kafka-消费者组(SpringBoot整合Kafka)
文章目录
- 1、消费者组
- 1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本
- 1.2、创建生产者发送消息
- 1.3、application.yml配置
- 1.4、创建消费者监听器
- 1.5、创建SpringBoot启动类
- 1.6、屏蔽 kafka debug 日志 logback.xml
- 1.7、引入spring-kafka依赖
- 1.8、消费者控制台:
1、消费者组
1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本
1.2、创建生产者发送消息
package com.atguigu.spring.kafka.consumer; import jakarta.annotation.Resource; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.core.KafkaTemplate; @SpringBootTest class SpringKafkaConsumerApplicationTests { @Resource KafkaTemplate kafkaTemplate; @Test void contextLoads() { for (int i = 0; i1.3、application.yml配置
server: port: 8120 # v1 spring: Kafka: bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097 consumer: # read-committed读事务已提交的消息 解决脏读问题 isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息 # 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量 enable-auto-commit: true # 消费者提交ack时多长时间批量提交一次 auto-commit-interval: 1000 # 消费者第一次消费主题消息时从哪个位置开始 auto-offset-reset: earliest #指定Offset消费:earliest | latest | none key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer1.4、创建消费者监听器
package com.atguigu.spring.kafka.consumer.listener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class MyKafkaListener { @KafkaListener(topics ={"my_topic1"},groupId = "my_group1") public void onMessage1(ConsumerRecord record) { System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic() +",partition:"+record.partition() +",offset = "+record.offset() +",key = "+record.key() +",value = "+record.value()); } @KafkaListener(topics ={"my_topic1"},groupId = "my_group1") public void onMessage2(ConsumerRecord record) { System.out.println("my_group1消费者2获取到消息:topic = "+ record.topic() +",partition:"+record.partition() +",offset = "+record.offset() +",key = "+record.key() +",value = "+record.value()); } @KafkaListener(topics ={"my_topic1"},groupId = "my_group2") public void onMessage3(ConsumerRecord record) { System.out.println("my_group2消费者获取到消息:topic = "+ record.topic() +",partition:"+record.partition() +",offset = "+record.offset() +",key = "+record.key() +",value = "+record.value()); } }1.5、创建SpringBoot启动类
package com.atguigu.spring.kafka.consumer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; // Generated by https://start.springboot.io // 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn @SpringBootApplication public class SpringKafkaConsumerApplication { public static void main(String[] args) { SpringApplication.run(SpringKafkaConsumerApplication.class, args); } }1.6、屏蔽 kafka debug 日志 logback.xml
1.7、引入spring-kafka依赖
4.0.0 org.springframework.boot spring-boot-starter-parent 3.0.5 com.atguigu spring-kafka-consumer 0.0.1-SNAPSHOT spring-kafka-consumer spring-kafka-consumer 17 org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-starter-web org.springframework.kafka spring-kafka org.springframework.boot spring-boot-maven-plugin1.8、消费者控制台:
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v3.0.5) my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2 my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8 my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0 my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6 my_group2消费者获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5 my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3 my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9 my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1 my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7 my_group2消费者获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4 my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2 my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8 my_group1消费者1获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5 my_group1消费者1获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4 my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3 my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9 my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1 my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7 my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0 my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!




