kafka-生产者监听器(SpringBoot整合Kafka)
文章目录
- 1、生产者监听器
- 1.1、创建生产者监听器
- 1.2、创建生产者拦截器
- 1.3、发送消息测试
- 1.4、使用Java代码创建主题分区副本
- 1.5、application.yml配置----v1版
- 1.6、屏蔽 kafka debug 日志 logback.xml
- 1.7、引入spring-kafka依赖
- 1.8、控制台日志
1、生产者监听器
1.1、创建生产者监听器
package com.atguigu.kafka.listener; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.kafka.support.ProducerListener; @Component public class MyKafkaProducerListener implements ProducerListener { //生产者 ack 配置为 0 只要发送即成功 //ack为 1 leader落盘 broker ack之后 才成功 //ack为 -1 分区所有副本全部落盘 broker ack之后 才成功 @Override public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { //ProducerListener.super.onSuccess(producerRecord, recordMetadata); System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic() +",partition = "+producerRecord.partition() +",key = "+producerRecord.key() +",value = "+producerRecord.value() +",offset = "+recordMetadata.offset()); } //消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息 @Override public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) { System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic() +",partition = "+producerRecord.partition() +",key = "+producerRecord.key() +",value = "+producerRecord.value() +",offset = "+recordMetadata.offset()); System.out.println("异常信息:" + exception.getMessage()); } }1.2、创建生产者拦截器
package com.atguigu.kafka.interceptor; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.stereotype.Component; import java.util.Map; //拦截器必须手动注册给kafka生产者(KafkaTemplate) @Component public class MyKafkaInterceptor implements ProducerInterceptor { //kafka生产者发送消息前执行:拦截发送的消息预处理 @Override public ProducerRecord onSend(ProducerRecord producerRecord) { System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic() +",partition:"+producerRecord.partition() +",key = "+producerRecord.key() +",value = "+producerRecord.value()); return null; } //kafka broker 给出应答后执行 @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { //exception为空表示消息发送成功 if(e == null){ System.out.println("消息发送成功:topic = "+ recordMetadata.topic() +",partition:"+recordMetadata.partition() +",offset="+recordMetadata.offset() +",timestamp="+recordMetadata.timestamp()); } } @Override public void close() { } @Override public void configure(Map map) { } }1.3、发送消息测试
package com.atguigu.kafka.producer; import com.atguigu.kafka.interceptor.MyKafkaInterceptor; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.core.KafkaTemplate; import java.io.IOException; @SpringBootTest class KafkaProducerApplicationTests { //装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中 @Resource KafkaTemplate kafkaTemplate; @Resource MyKafkaInterceptor myKafkaInterceptor; @PostConstruct public void init() { kafkaTemplate.setProducerInterceptor(myKafkaInterceptor); } @Test void contextLoads() throws IOException { kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器"); //回调是等kafka,ack以后才执行,需要阻塞 System.in.read(); } }1.4、使用Java代码创建主题分区副本
package com.atguigu.kafka.config; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.kafka.config.TopicBuilder; import org.springframework.stereotype.Component; @Component public class KafkaTopicConfig { @Bean public NewTopic myTopic1() { //相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数) return TopicBuilder.name("my_topic1")//主题名称 .partitions(3)//主题分区 .replicas(3)//主题分区副本数 .build();//创建 } }1.5、application.yml配置----v1版
server: port: 8110 # v1 spring: kafka: bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097 producer: # producer 生产者 retries: 0 # 重试次数 0表示不重试 acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、-1/all) batch-size: 16384 # 批次大小 单位byte buffer-memory: 33554432 # 生产者缓冲区大小 单位byte key-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器 value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器1.6、屏蔽 kafka debug 日志 logback.xml
1.7、引入spring-kafka依赖
4.0.0 org.springframework.boot spring-boot-starter-parent 3.0.5 com.atguigu.kafka kafka-producer 0.0.1-SNAPSHOT kafka-producer kafka-producer 17 org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-test test org.springframework.kafka spring-kafka org.springframework.boot spring-boot-maven-plugin1.8、控制台日志
生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者监听器 消息发送成功:topic = my_topic1,partition:0,offset=0,timestamp=1717573749549 MyKafkaProducerListener消息发送成功:topic=my_topic1,partition = null,key = null,value = spring-kafka-生产者监听器,offset = 0
[ [ { "partition": 0, "offset": 0, "msg": "spring-kafka-生产者监听器", "timespan": 1717573749549, "date": "2024-06-05 07:49:09" } ] ]
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!


