Data source
Flume tails a local file and ships it to Kafka.
# name the components of agent a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source: tail the Nginx access log
a1.sources.r1.type = TAILDIR
# records each tailed file's inode and offset, so that after Flume restarts
# collection resumes from the last recorded position instead of starting over
a1.sources.r1.positionFile = /opt/specialwu/flumeconf/kafka_taildir_position.json
a1.sources.r1.filegroups = f1
# interval (in ms) at which positions are written to the JSON file above
a1.sources.r1.writePosInterval = 1000
a1.sources.r1.filegroups.f1 = /usr/share/nginx/logs/access.log
# sink: write events to Kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# topic on your own cluster; keep the name consistent with the source topic where possible
a1.sinks.k1.kafka.topic = topic0329
a1.sinks.k1.kafka.bootstrap.servers = specialwu2:9092,specialwu3:9092,specialwu4:9092
# maximum number of events sent to Kafka in one batch
a1.sinks.k1.kafka.flumeBatchSize = 2000
# acks = 1: the producer treats a write as successful once the leader has received it
a1.sinks.k1.kafka.producer.acks = 1
# the producer waits up to 1 second for more records before sending a batch
a1.sinks.k1.kafka.producer.linger.ms = 1000
# compress producer batches with snappy
a1.sinks.k1.kafka.producer.compression.type = snappy
# producer batch size in bytes
a1.sinks.k1.kafka.producer.batch.size = 1048576
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
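After starting the agent (e.g. flume-ng agent --name a1 --conf-file <path to this config>), it is worth checking that log lines actually reach the topic before wiring up the Spring consumer. Below is a minimal stand-alone check, assuming the kafka-clients dependency is on the classpath; the class name TopicCheck and the group id flume-check are placeholders, while the topic and bootstrap servers are taken from the sink configuration above.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "specialwu2:9092,specialwu3:9092,specialwu4:9092");
        props.put("group.id", "flume-check");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic0329"));
            // print whatever Flume has shipped so far, then exit
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.partition() + " -> " + record.value());
                }
            }
        }
    }
}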
Intermediate stage
Start and stop Kafka consumption by hitting a URL in the browser. The listener and the two endpoints below are an excerpt from a Spring Boot controller (class declaration and imports omitted).
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private KafkaListenerEndpointRegistry registry;

private String message = null;

@KafkaListener(id = "myfirstconsumer", topics = "topic04291", groupId = "myfirstconsumer")
//@KafkaListener(id = "myfirstconsumer", topics = "topic0429", containerFactory = "kafkaListenerContainerFactory")
//@KafkaListener(topics = "topic0329", groupId = "ss1")
public void simpleConsumer(List<ConsumerRecord<String, String>> records, Acknowledgment ack) throws ParseException {
    System.out.println("batch consumption");
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("received value: " + record.value() + ", partition: " + record.partition());
        message = "current data: " + record.value() + ", partition: " + record.partition();
    }
    jdbcTemplate.batchUpdate("insert into web_visitors (visit_ip,visit_time,city,country) values (?,?,?,?)", new BatchPreparedStatementSetter() {
        @Override
        public void setValues(PreparedStatement ps, int i) throws SQLException {
            // test data section: parse one Nginx access-log line
            ConsumerRecord<String, String> consumerRecord = records.get(i);
            String[] splitRecord = consumerRecord.value().split("\\s+");
            String visitIp = splitRecord[0];
            //String visitCity = JSON.parseObject(GetIP.findIpAddrRegion(splitRecord[0])).getString("city");
            //String visitCountry = JSON.parseObject(GetIP.findIpAddrRegion(splitRecord[0])).getString("country");
            try {
                // splitRecord[3] is "[dd/MMM/yyyy:HH:mm:ss"; substring(1) strips the leading "["
                String visitTime = Dateutile1.dateFormat(splitRecord[3].substring(1));
                System.out.println("visit time: " + splitRecord[3].substring(1));
                ps.setString(1, visitIp);
                ps.setString(2, visitTime);
                // placeholder values while the IP-to-region lookup above is commented out
                ps.setString(3, "visitCity");
                ps.setString(4, "visitcountry");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public int getBatchSize() {
            // number of rows inserted in this batch
            return records.size();
        }
    });
    ack.acknowledge();
}
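// The Dateutile1.dateFormat helper used above is not shown in this post. A minimal sketch of
// what such a helper could look like, assuming the timestamp is the Nginx common-log form
// "dd/MMM/yyyy:HH:mm:ss" (e.g. "29/Apr/2021:10:15:32") and MySQL expects "yyyy-MM-dd HH:mm:ss".
// (Requires java.text.SimpleDateFormat, java.text.ParseException and java.util.Locale.)
public class Dateutile1 {
    public static String dateFormat(String nginxTime) throws ParseException {
        SimpleDateFormat in = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        SimpleDateFormat out = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return out.format(in.parse(nginxTime));
    }
}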
@KafkaListener(topics = "topic0329", groupId = "s1")
public void simpleConsumer1(List<ConsumerRecord<String, String>> records) {
    System.out.println("consumer s1 _______________");
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("received value: " + record.value() + ", partition: " + record.partition());
    }
}
@RequestMapping("/stopspecialwukafka")
public String testKafka(String listennerId){
if (registry.getListenerContainer(listennerId) == null){
return "没有该listener_id ";
}
if(registry.getListenerContainer(listennerId).isRunning()){
registry.getListenerContainer(listennerId).pause();
return "已成功停止该Listener";
}
return "输入的 "+listennerId+" 已经停止";
}
@RequestMapping("/startspecialwukafka")
public String startKafka(String listennerId){
if (registry.getListenerContainer(listennerId)==null){
return "没有该listener_id ";
}
if(!registry.getListenerContainer(listennerId).isRunning()){
registry.getListenerContainer(listennerId).start();
}
if (message==null){
message="刚开始消费暂无数据";
}
return message;
}
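With the application running, the consumer can be paused and resumed from the browser, for example http://localhost:8080/stopspecialwukafka?listenerId=myfirstconsumer and http://localhost:8080/startspecialwukafka?listenerId=myfirstconsumer (host and port depend on your deployment; myfirstconsumer is the id given in the @KafkaListener annotation).

The batch signature (List<ConsumerRecord<String, String>>) together with the manual Acknowledgment only takes effect if the listener container factory enables batch listening and manual acks. That configuration is not shown in this post; below is a minimal sketch, assuming spring-kafka 2.3+ and reusing the kafkaListenerContainerFactory bean name referenced in the commented-out annotation above.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "specialwu2:9092,specialwu3:9092,specialwu4:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // manual acks require auto-commit to be off
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // deliver records to the listener as a List<ConsumerRecord<...>>
        factory.setBatchListener(true);
        // the listener calls ack.acknowledge() itself
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

With Spring Boot, a similar effect can also be reached through the spring.kafka.listener.type=batch and spring.kafka.listener.ack-mode=manual_immediate properties instead of defining the factory by hand.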