kafka-生产者监听器(SpringBoot整合Kafka)

2024-06-09 1663阅读

文章目录

  • 1、生产者监听器
    • 1.1、创建生产者监听器
    • 1.2、创建生产者拦截器
    • 1.3、发送消息测试
    • 1.4、使用Java代码创建主题分区副本
    • 1.5、application.yml配置----v1版
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、控制台日志

      1、生产者监听器

      1.1、创建生产者监听器

      package com.atguigu.kafka.listener;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;
      import org.springframework.kafka.support.ProducerListener;
      @Component
      public class MyKafkaProducerListener implements ProducerListener {
          //生产者 ack 配置为 0 只要发送即成功
          //ack为 1  leader落盘  broker ack之后 才成功
          //ack为 -1 分区所有副本全部落盘  broker ack之后 才成功
          @Override
          public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
              //ProducerListener.super.onSuccess(producerRecord, recordMetadata);
              System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()
              +",partition = "+producerRecord.partition()
              +",key = "+producerRecord.key()
              +",value = "+producerRecord.value()
              +",offset = "+recordMetadata.offset());
          }
          //消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息
          @Override
          public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
              System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()
                      +",partition = "+producerRecord.partition()
                      +",key = "+producerRecord.key()
                      +",value = "+producerRecord.value()
                      +",offset = "+recordMetadata.offset());
              System.out.println("异常信息:" + exception.getMessage());
          }
      }
      

      1.2、创建生产者拦截器

      package com.atguigu.kafka.interceptor;
      import org.apache.kafka.clients.producer.ProducerInterceptor;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;
      import org.springframework.stereotype.Component;
      import java.util.Map;
      //拦截器必须手动注册给kafka生产者(KafkaTemplate)
      @Component
      public class MyKafkaInterceptor implements ProducerInterceptor {
          //kafka生产者发送消息前执行:拦截发送的消息预处理
          @Override
          public ProducerRecord onSend(ProducerRecord producerRecord) {
              System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()
              +",partition:"+producerRecord.partition()
              +",key = "+producerRecord.key()
              +",value = "+producerRecord.value());
              return null;
          }
          //kafka broker 给出应答后执行
          @Override
          public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
              //exception为空表示消息发送成功
              if(e == null){
                  System.out.println("消息发送成功:topic = "+ recordMetadata.topic()
                          +",partition:"+recordMetadata.partition()
                          +",offset="+recordMetadata.offset()
                  +",timestamp="+recordMetadata.timestamp());
              }
          }
          @Override
          public void close() {
          }
          @Override
          public void configure(Map map) {
          }
      }
      

      1.3、发送消息测试

      package com.atguigu.kafka.producer;
      import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
      import jakarta.annotation.PostConstruct;
      import jakarta.annotation.Resource;
      import org.junit.jupiter.api.Test;
      import org.springframework.boot.test.context.SpringBootTest;
      import org.springframework.kafka.core.KafkaTemplate;
      import java.io.IOException;
      @SpringBootTest
      class KafkaProducerApplicationTests {
          //装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中
          @Resource
          KafkaTemplate kafkaTemplate;
          @Resource
          MyKafkaInterceptor myKafkaInterceptor;
          @PostConstruct
          public void init() {
              kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);
          }
          @Test
          void contextLoads() throws IOException {
              kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");
              //回调是等kafka,ack以后才执行,需要阻塞
              System.in.read();
          }
      }
      

      1.4、使用Java代码创建主题分区副本

      package com.atguigu.kafka.config;
      import org.apache.kafka.clients.admin.NewTopic;
      import org.springframework.context.annotation.Bean;
      import org.springframework.kafka.config.TopicBuilder;
      import org.springframework.stereotype.Component;
      @Component
      public class KafkaTopicConfig {
          @Bean
          public NewTopic myTopic1() {
              //相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)
              return TopicBuilder.name("my_topic1")//主题名称
                      .partitions(3)//主题分区
                      .replicas(3)//主题分区副本数
                      .build();//创建
          }
      }
      

      1.5、application.yml配置----v1版

      server:
        port: 8110
      # v1
      spring:
        kafka:
          bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
          producer: # producer 生产者
            retries: 0 # 重试次数 0表示不重试
            acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、-1/all)
            batch-size: 16384 # 批次大小 单位byte
            buffer-memory: 33554432 # 生产者缓冲区大小 单位byte
            key-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器
            value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器
      

      1.6、屏蔽 kafka debug 日志 logback.xml

            
          
          
      
      

      1.7、引入spring-kafka依赖

      
          4.0.0
          
              org.springframework.boot
              spring-boot-starter-parent
              3.0.5
               
          
          
          
          com.atguigu.kafka
          kafka-producer
          0.0.1-SNAPSHOT
          kafka-producer
          kafka-producer
          
              17
          
          
              
                  org.springframework.boot
                  spring-boot-starter
              
              
                  org.springframework.boot
                  spring-boot-starter-test
                  test
              
              
                  org.springframework.kafka
                  spring-kafka
              
          
          
              
                  
                      org.springframework.boot
                      spring-boot-maven-plugin
                  
              
          
      
      

      1.8、控制台日志

      生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者监听器
      消息发送成功:topic = my_topic1,partition:0,offset=0,timestamp=1717573749549
      MyKafkaProducerListener消息发送成功:topic=my_topic1,partition = null,key = null,value = spring-kafka-生产者监听器,offset = 0
      

      kafka-生产者监听器(SpringBoot整合Kafka)

      kafka-生产者监听器(SpringBoot整合Kafka)

      [
        [
          {
            "partition": 0,
            "offset": 0,
            "msg": "spring-kafka-生产者监听器",
            "timespan": 1717573749549,
            "date": "2024-06-05 07:49:09"
          }
        ]
      ]
      
VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]