springboot的kafka动态工具类(动态创建topic、监听和监听方法)

2024-07-06 1851阅读

springboot的kafka动态工具类(动态创建topic、监听和监听方法)

一、使用场景

需要动态创建topic,然后动态创建该topic的监听容器,同时可以指定该监听容器的处理方法,避免增删监听topic时需要重启操作等情况。

很多情况下,使用kafka一般都会主动创建好队列(Topic)和消费者监听(@KafkaListener),特别是监听者,一般都是动态创建好后,然后使用@KafkaListener指定Topic后创建。

上述情况的优点在于:可以明确topic和消费者,启动时程序主动就创建好对应topic的消费容器和消费方法,直接消费即可。

缺点:如果需要监听新的topic,则需要添加@KafkaListener的配置并且重新启动项目,对于灵活性要求高或者线上的程序是比较麻烦的。

二、工具类概述

所以基于上述情况,为了更加灵活的创建和使用Kafka的topic和listener,专门写了一个kafka相关的工具类:

topic相关的包含了:topic创建、删除、列表、是否存在等方法。

Listener相关包含了:容器的创建、启动、停止、暂停、恢复等方法。

这里有个概念需要先了解下,监听容器里有两个状态,可以简单理解为:一个是容器的运行状态running,一个是容器的监听状态pauseRequest,再容器运行状态开启的基础上,监听状态开启,才能够正常消费消息。

三、代码展示

  1. 那么老规矩,万事先依赖:
    org.springframework.kafka
    spring-kafka
    private static final String kafkaServer = "kafka-ip:9092";//kafka地址
    /**
     * @Title producerFactory
     * @Description TODO 生产者工厂类,设置生产者相关配置
     * @return org.springframework.kafka.core.ProducerFactory
        Map
        return new KafkaTemplate
        Map
        ConcurrentKafkaListenerContainerFactory
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        AdminClient adminClient = AdminClient.create(props);
        return adminClient;
    }
}

    private static AdminClient adminClient;
    private static KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    private static KafkaTemplate kafkaTemplate;
    /**
     * @Title KafkaUtil
     * @Description 构造函数注入
     * @param adminClient kafka客户端对象
     * @param kafkaListenerEndpointRegistry kafka监听容器注册对象
     * @param kafkaListenerEndpointRegistry kafka生产者工具类
     * @return
     */
    @Autowired
    public KafkaUtil(AdminClient adminClient, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry, KafkaTemplate kafkaTemplate) {
        KafkaUtil.adminClient = adminClient;
        KafkaUtil.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
        KafkaUtil.kafkaTemplate = kafkaTemplate;
    }
    //region topic相关方法
    /**
     * @Title createTopic
     * @Description 创建kafka topic
     * @param topicName topic名
     * @param partitions 分区数
     * @param replicas 副本数(short)
     * @return void
     */
    public static void createTopic(String topicName, int partitions, short replicas) throws Exception {
        NewTopic newTopic = new NewTopic(topicName, partitions, replicas);
        CreateTopicsResult topics = adminClient.createTopics(Collections.singleton(newTopic));
        topics.all().get();
        log.info("【{}】topic创建成功", topicName);
    }
    /**
     * @Title deleteTopic
     * @Description 删除topic
     * @param topicName  topic名称
     * @return void
     */
    public static void deleteTopic(String topicName) throws Exception {
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singleton(topicName));
        deleteTopicsResult.all().get();
        log.info("【{}】topic删除成功", topicName);
    }
    /**
     * @Title updateTopicRetention
     * @Description 修改topic的过期时间
     * @param topicName  topic名称
     * @param ms  过期时间(毫秒值)
     * @return void
     */
    public static void updateTopicRetention(String topicName, String ms) throws Exception {
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
        ConfigEntry configEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, ms);
        Config config = new Config(Collections.singleton(configEntry));
        // 创建AlterConfigsOptions
        AlterConfigsOptions alterConfigsOptions = new AlterConfigsOptions().timeoutMs(10000);
        // 执行修改操作
        adminClient.alterConfigs(Collections.singletonMap(resource, config), alterConfigsOptions).all().get();
        log.info("【{}】topic过期时间设置完成,过期时间为:{}毫秒", topicName, ms);
    }
    /**
     * @Title listTopic
     * @Description 获取topic列表
     * @return java.util.Set
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        Set
        Set
            return false;
        }
        return strings.contains(topicName);
    }
    //endregion
    //region 生产者发送消息示例
    /**
     * @Title sendMsg
     * @Description 通过注册信息找到对应的容器并启动
     * @param topic 队列名称
     * @param msg 消息
     * @return void
     */
    public static void sendMsg(String topic, Object msg) throws Exception {
        kafkaTemplate.send(topic, msg);
        //kafkaTemplate.send(topic,2,"key",msg);//带有分区和key值的
    }
    //endregion
    //region 消费者监听容器相关方法
    /**
     * @Title existListenerContainer
     * @Description TODO 根据ID查询容器是否存在
     * @param id 监听容器id
     * @return boolean
     */
    public static boolean existListenerContainer(String id) throws Exception {
        Set
        //判断id是否存在
        if (existListenerContainer(id)) {
            //如果当前id的容器已存在,不添加
            log.info("当前id为{}的容器已存在,不进行添加操作!", id);
            return;
        }
        //判断所有队列是否存在
        for (String topic : topics) {
            if (!existTopic(topic)) {
                //如果存在topic不存在,不添加
                log.info("【{}】topic不存在,不进行添加操作!", topic);
                return;
            }
        }
        MethodKafkaListenerEndpoint
//            if (!KafkaConfig.notExistTopicCreateContainerFlag && !nameTopics.contains(topicName)) {
//                log.info("【{}】topic不存在,不创建容器!", topicName);
//                continue;
//            }
//            //创建一个kafka监听器端点对象
//            MethodKafkaListenerEndpoint
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);
        if (listenerContainer == null) {
            log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id);
            return;
        }
        listenerContainer.start();
        log.info("Kafka监听容器操作:ID为{}的容器已【开启】", id);
    }
    /**
     * @Title stopListenerContainer
     * @Description TODO 根据id停止监听容器的运行状态
     * @param id 监听容器的id
     * @return void
     */
    public static void stopListenerContainer(String id) throws Exception {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);
        if (listenerContainer == null) {
            log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id);
            return;
        }
        listenerContainer.stop();
        log.info("Kafka监听容器操作:ID为{}的容器已【停止】", id);
    }
    /**
     * @Title pauseListenerContainer
     * @Description TODO 根据id暂停监听容器的监听状态
     * @param id 监听容器的id
     * @return void
     */
    public static void pauseListenerContainer(String id) throws Exception {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);
        if (listenerContainer == null) {
            log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id);
            return;
        }
        listenerContainer.pause();
        log.info("Kafka监听容器操作:ID为{}的容器已【暂停】", id);
    }
    /**
     * @Title resumeListenerContainer
     * @Description TODO  根据id恢复监听容器的监听状态
     * @param id 监听容器的id
     * @return void
     */
    public static void resumeListenerContainer(String id) throws Exception {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);
        if (listenerContainer == null) {
            log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id);
            return;
        }
        listenerContainer.resume();
        log.info("Kafka监听容器操作:ID为{}的容器已【恢复】", id);
    }
    /**
     * @Title isNormalStateListenerContainer
     * @Description 是否是正常状态的容器
     * (kafka监听容器的运行状态标志是running,监听状态标志是pauseRequested,停止是关闭了资源,暂停是停止消费)
     *  只有running是true,并且pauseRequested是false,监听容器才能正常消费消息
     * @param id 监听容器的id
     * @return boolean
     */
    public static boolean isNormalStateListenerContainer(String id) throws Exception {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);
        //如果不存在此id容器,则返回false
        if (listenerContainer == null) {
            return false;
        }
        //存在则返回容器的运行状态和非暂停状态
        return listenerContainer.isRunning() && !listenerContainer.isPauseRequested();
    }
    /**
     * @Title getPauseStateListenerContainer
     * @Description 获取监听容器的暂停状态(监听的状态)
     * @param id 监听容器id
     * @return boolean
     */
    public static boolean getPauseStateListenerContainer(String id) throws Exception {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);
        if (listenerContainer == null) {
            return true;
        }
        return listenerContainer.isPauseRequested();
    }
    /**
     * @Title getRunningStateListenerContainer
     * @Description 获取监听容器的运行状态(容器的状态)
     * @param id 监听容器id
     * @return boolean
     */
    public static boolean getRunningStateListenerContainer(String id) throws Exception {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);
        if (listenerContainer == null) {
            return false;
        }
        return listenerContainer.isRunning();
    }
    /**
     * @Title setStateNormalListenerContainer
     * @Description 使容器的运行状态和监听状态都是正常
     * @param id 监听容器的id
     * @return boolean 正常返回true,非正常返回false
     */
    public static boolean setStateNormalListenerContainer(String id) throws Exception {
        if (!existListenerContainer(id)) {
            log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id);
            return false;
        }
        //先判断容器运行状态是否正常,如果不正常,则开启
        if (!getRunningStateListenerContainer(id)) {
            startListenerContainer(id);
        }
        //再判断容器监听状态是否正常,如果不正常,则恢复
        if (getPauseStateListenerContainer(id)) {
            resumeListenerContainer(id);
        }
        //设置完后,再查询状态并返回。
        return isNormalStateListenerContainer(id);
    }
    //endregion
}

    @RequestMapping("kafkatest")
    public void test() {
        try {
            String topicName = "kafka-test-1";
            KafkaUtil.createTopic(topicName, 1, (short) 1);//创建topic
            KafkaUtil.updateTopicRetention(topicName, String.valueOf(1000000));//更新topic的过期时间
            Set
            throw new RuntimeException(e);
        }
    }
    @RequestMapping("del")
    public void deleteTopic() throws Exception {
        String topicName = "kafka-test-1";
        KafkaUtil.deleteTopic(topicName);//删除topic
    }
    @RequestMapping("send")
    public void sendMsg() throws Exception {
        String topicName = "kafka-test-1";
        KafkaUtil.sendMsg(topicName, "haha");
        boolean b = KafkaUtil.existTopic(topicName);//查询topic是否存在
        System.out.println("topic-是否存在:" + b);
    }
    /**
     * @Title consumerMessage
     * @Description TODO 消费监听处理消息的方法
     * @param message 接受来自kakfa的参数
     * @param ack 消息确认参数
     * @return void
     */
    public void consumerMessage(List
        System.out.println("收到消息:" + message);
        //消息确认
        ack.acknowledge();
    }
VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]