Data source

Flume tails a local file and ships it to Kafka.

# Name the agent's sources, sinks, and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source
a1.sources.r1.type = TAILDIR
# Records each tailed file's inode and offset; after Flume restarts, collection resumes from the last recorded position instead of re-reading the file
a1.sources.r1.positionFile = /opt/specialwu/flumeconf/kafka_taildir_position.json
a1.sources.r1.filegroups = f1
# Interval (ms) at which positions are written to the JSON file above
a1.sources.r1.writePosInterval = 1000
a1.sources.r1.filegroups.f1 = /usr/share/nginx/logs/access.log

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Topic on your cluster; keep it consistent with the topic the consumers subscribe to
a1.sinks.k1.kafka.topic = topic0329
a1.sinks.k1.kafka.bootstrap.servers = specialwu2:9092,specialwu3:9092,specialwu4:9092
# Number of messages to take from the channel per batch
a1.sinks.k1.kafka.flumeBatchSize = 2000
# acks=1: a write succeeds as soon as the leader has received it, without waiting for replicas
a1.sinks.k1.kafka.producer.acks = 1
# Wait up to 1 s so records can be grouped into larger batches before sending
a1.sinks.k1.kafka.producer.linger.ms = 1000
# Compress producer batches with snappy
a1.sinks.k1.kafka.producer.compression.type = snappy
# Producer batch size in bytes (the sink name must match k1 declared above)
a1.sinks.k1.kafka.producer.batch.size = 1048576

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# transactionCapacity must be at least the sink's flumeBatchSize (2000), and capacity at least transactionCapacity
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 2000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
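Before wiring up the Spring consumer, it is worth confirming that log lines actually reach the topic. Below is a minimal sketch using the plain kafka-clients consumer, with the brokers and topic taken from the config above; the group id verify-group is made up for this one-off check.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same brokers as the Flume sink above
        props.put("bootstrap.servers", "specialwu2:9092,specialwu3:9092,specialwu4:9092");
        // Hypothetical group id, used only for this verification
        props.put("group.id", "verify-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic0329"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                // Each value should be one raw nginx access-log line
                System.out.println(record.partition() + " -> " + record.value());
            }
        }
    }
}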

Intermediate stage

Entering a URL in the browser starts or stops consumption (see the controller endpoints at the end of the code below).
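A listener that takes a List<ConsumerRecord> plus an Acknowledgment only works when its container factory enables batch listening and manual acks; the commented-out annotation below references such a kafkaListenerContainerFactory bean. The original configuration is not shown, so here is a minimal sketch, assuming Spring Kafka 2.3+ and the same brokers as the Flume sink:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Broker list assumed to match the Flume sink above
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "specialwu2:9092,specialwu3:9092,specialwu4:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Manual acknowledgment requires auto-commit to be off
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Deliver records to the listener as a List<ConsumerRecord>
        factory.setBatchListener(true);
        // The listener calls ack.acknowledge() itself
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}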

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // Holds the most recently consumed record so /startspecialwukafka can report it
    private String message = null;

    @KafkaListener(id = "myfirstconsumer", topics = "topic04291", groupId = "myfirstconsumer")
    //@KafkaListener(id = "myfirstconsumer", topics = "topic0429", containerFactory = "kafkaListenerContainerFactory")
    //@KafkaListener(topics = "topic0329", groupId = "ss1")
    public void simpleConsumer(List<ConsumerRecord<String, String>> records, Acknowledgment ack) throws ParseException {
        System.out.println("Batch consumption");
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Received value " + record.value() + " from partition " + record.partition());
            message = "Latest value: " + record.value() + ", partition: " + record.partition();
        }

        jdbcTemplate.batchUpdate("insert into web_visitors (visit_ip,visit_time,city,country) values (?,?,?,?)", new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                // Each Kafka value is one raw nginx access-log line; split it on whitespace
                ConsumerRecord<String, String> record = records.get(i);
                String[] fields = record.value().split("\\s+");
                String visitIp = fields[0];
                // IP-to-region lookup, disabled during testing
                //String visitCity = JSON.parseObject(GetIP.findIpAddrRegion(fields[0])).getString("city");
                //String visitCountry = JSON.parseObject(GetIP.findIpAddrRegion(fields[0])).getString("country");

                try {
                    // fields[3] holds "[dd/MMM/yyyy:HH:mm:ss"; strip the leading '['
                    String visitTime = Dateutile1.dateFormat(fields[3].substring(1));
                    System.out.println("Visit time " + fields[3].substring(1));
                    ps.setString(1, visitIp);
                    ps.setString(2, visitTime);
                    // Placeholders while the region lookup above is disabled
                    ps.setString(3, "visitCity");
                    ps.setString(4, "visitCountry");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public int getBatchSize() {
                // One insert per consumed record
                return records.size();
            }
        });

        // Commit offsets only after the batch insert has gone through
        ack.acknowledge();
    }

    @KafkaListener(topics = "topic0329",groupId = "s1")
    public void simpleConsumer1(List<ConsumerRecord<String, String>> record)  {
        System.out.println("消费者s2_______________");
        for (ConsumerRecord<String, String> record1:record){
            System.out.println("传过来的值是"+record1.value()+"分区是"+record1.partition());

        }

    }

    @RequestMapping("/stopspecialwukafka")
   public String testKafka(String listennerId){
        if (registry.getListenerContainer(listennerId) == null){
            return "没有该listener_id ";
        }
        if(registry.getListenerContainer(listennerId).isRunning()){
            registry.getListenerContainer(listennerId).pause();
            return "已成功停止该Listener";
        }
        return "输入的 "+listennerId+" 已经停止";
   }

    @RequestMapping("/startspecialwukafka")
    public String startKafka(String listennerId){
        if (registry.getListenerContainer(listennerId)==null){
            return "没有该listener_id ";
        }
        if(!registry.getListenerContainer(listennerId).isRunning()){
            registry.getListenerContainer(listennerId).start();
        }
        if (message==null){
            message="刚开始消费暂无数据";
        }
        return message;
    }
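
Dateutile1.dateFormat above is the author's own helper and is not shown in this section. Here is a plausible sketch, assuming it turns nginx's default time_local value (e.g. 29/Mar/2021:10:15:30 once the leading '[' is stripped) into a MySQL-friendly timestamp string; the patterns below are guesses, not the original implementation:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

public class Dateutile1 {
    // nginx default access-log time: dd/MMM/yyyy:HH:mm:ss with English month abbreviations
    private static final String NGINX_PATTERN = "dd/MMM/yyyy:HH:mm:ss";
    // Target format for the visit_time column
    private static final String MYSQL_PATTERN = "yyyy-MM-dd HH:mm:ss";

    public static String dateFormat(String nginxTime) throws ParseException {
        // SimpleDateFormat is not thread-safe; fresh instances per call keep this sketch simple
        SimpleDateFormat in = new SimpleDateFormat(NGINX_PATTERN, Locale.ENGLISH);
        SimpleDateFormat out = new SimpleDateFormat(MYSQL_PATTERN);
        return out.format(in.parse(nginxTime));
    }
}

With the listener id set to myfirstconsumer, the endpoints can then be driven from a browser, e.g. http://host:port/stopspecialwukafka?listenerId=myfirstconsumer and http://host:port/startspecialwukafka?listenerId=myfirstconsumer.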

Final presentation