kafka-消费者组(SpringBoot整合Kafka)

2024-06-09 1298阅读

文章目录

  • 1、消费者组
    • 1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本
    • 1.2、创建生产者发送消息
    • 1.3、application.yml配置
    • 1.4、创建消费者监听器
    • 1.5、创建SpringBoot启动类
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、消费者控制台:

      1、消费者组

      1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本

      kafka-消费者组(SpringBoot整合Kafka)

      1.2、创建生产者发送消息

      package com.atguigu.spring.kafka.consumer;
      import jakarta.annotation.Resource;
      import org.junit.jupiter.api.Test;
      import org.springframework.boot.test.context.SpringBootTest;
      import org.springframework.kafka.core.KafkaTemplate;
      @SpringBootTest
      class SpringKafkaConsumerApplicationTests {
          @Resource
          KafkaTemplate kafkaTemplate;
          @Test
          void contextLoads() {
              for (int i = 0; i  
      

      kafka-消费者组(SpringBoot整合Kafka)

      kafka-消费者组(SpringBoot整合Kafka)

      1.3、application.yml配置

      server:
        port: 8120
      # v1
      spring:
        Kafka:
          bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
          consumer:
            # read-committed读事务已提交的消息 解决脏读问题
            isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息
            # 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量
            enable-auto-commit: true 
            # 消费者提交ack时多长时间批量提交一次
            auto-commit-interval: 1000
            # 消费者第一次消费主题消息时从哪个位置开始
            auto-offset-reset: earliest  #指定Offset消费:earliest | latest | none
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      

      1.4、创建消费者监听器

      package com.atguigu.spring.kafka.consumer.listener;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.springframework.kafka.annotation.KafkaListener;
      import org.springframework.stereotype.Component;
      @Component
      public class MyKafkaListener {
          @KafkaListener(topics ={"my_topic1"},groupId = "my_group1")
          public void onMessage1(ConsumerRecord record) {
              System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()
                      +",partition:"+record.partition()
                      +",offset = "+record.offset()
                      +",key = "+record.key()
                      +",value = "+record.value());
          }
          @KafkaListener(topics ={"my_topic1"},groupId = "my_group1")
          public void onMessage2(ConsumerRecord record) {
              System.out.println("my_group1消费者2获取到消息:topic = "+ record.topic()
                      +",partition:"+record.partition()
                      +",offset = "+record.offset()
                      +",key = "+record.key()
                      +",value = "+record.value());
          }
          @KafkaListener(topics ={"my_topic1"},groupId = "my_group2")
          public void onMessage3(ConsumerRecord record) {
              System.out.println("my_group2消费者获取到消息:topic = "+ record.topic()
                      +",partition:"+record.partition()
                      +",offset = "+record.offset()
                      +",key = "+record.key()
                      +",value = "+record.value());
          }
      }
      

      1.5、创建SpringBoot启动类

      package com.atguigu.spring.kafka.consumer;
      import org.springframework.boot.SpringApplication;
      import org.springframework.boot.autoconfigure.SpringBootApplication;
      // Generated by https://start.springboot.io
      // 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
      @SpringBootApplication
      public class SpringKafkaConsumerApplication {
          public static void main(String[] args) {
              SpringApplication.run(SpringKafkaConsumerApplication.class, args);
          }
      }
      

      1.6、屏蔽 kafka debug 日志 logback.xml

            
          
          
      
      

      1.7、引入spring-kafka依赖

      
          4.0.0
          
              org.springframework.boot
              spring-boot-starter-parent
              3.0.5
               
          
          
          
          com.atguigu
          spring-kafka-consumer
          0.0.1-SNAPSHOT
          spring-kafka-consumer
          spring-kafka-consumer
          
              17
          
          
              
                  org.springframework.boot
                  spring-boot-starter
              
              
                  org.springframework.boot
                  spring-boot-starter-test
                  test
              
              
                  org.springframework.boot
                  spring-boot-starter-web
              
              
                  org.springframework.kafka
                  spring-kafka
              
          
          
              
                  
                      org.springframework.boot
                      spring-boot-maven-plugin
                  
              
          
      
      

      1.8、消费者控制台:

        .   ____          _            __ _ _
       /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
      ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
       \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
        '  |____| .__|_| |_|_| |_\__, | / / / /
       =========|_|==============|___/=/_/_/_/
       :: Spring Boot ::                (v3.0.5)
      my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
      my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
      my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
      my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6
      my_group2消费者获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
      my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
      my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
      my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
      my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
      my_group2消费者获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
      my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
      my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
      my_group1消费者1获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
      my_group1消费者1获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
      my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
      my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
      my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
      my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
      my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
      my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6
      

      kafka-消费者组(SpringBoot整合Kafka)

VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]