RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

2024-05-13 1142阅读

1、前往RabbitMQ官网下载往RabbitMQ添加延迟消息的插件

RabbitMQ官网下载插件的网址:https://www.rabbitmq.com/community-plugins.html

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

2、下载rabbitmq_delayer_message_exchange插件(注:RabbitMQ是什么版本的,下载的插件就得是什么版本的,得对应上,以下截图为官方文档的对插件版本的要求说明) 

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

 3、把这个插件传输到服务器上

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

4、根据官网的指示把插件放到RabbitMQ指定的文件夹下

RabbitMQ官网指示安装插件步骤的网址:https://www.rabbitmq.com/installing-plugins.html

我这里安装RabbitMQ的系统是CentOS,所以放在/usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins这个目录下

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

5、拷贝插件到指定的目录下

例:

cp rabbitmq_delayed_message_exchange-3.10.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins/

效果图:

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

 6、安装延迟队列插件

输入以下命令安装延迟队列插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

效果图:

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

7、重启RabbitMQ

输入以下命令重启RabbitMQ

systemctl restart rabbitmq-server.service

效果图:

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

8、查看插件是否安装成功

 进入RabbitMQ的管理页面,进入Exchange的管理页面,新增Exchange,在Type里面可以看到x-delayed-message的选项,证明延迟队列插件安装成功RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

9、基于插件实现延迟队列的原理示意图

原先我们没下插件之前实现延迟队列是基于图下这种方式实现的

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

但我们下载插件后就能通过交换机延迟消息的方式来实现消息的延迟了(由步骤8可见,我们验证插件是否安装成功是从Exchange进去的,而不是从Queues进去的)

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

10、基于插件延迟队列的代码实现

(1)在config包里新建一个名为DelayedQueueConfig的类用于编写配置队列延迟的代码

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

代码如下:

package com.ken.springbootrqbbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedQueueConfig {
    //队列
    public static final String DELAYED_QUEUE_NAME = "delayed_queue";
    //交换机
    public static final String DELAYED_EXCHANGE_NAME = "DELAYED_EXCHANGE";
    //交换机
    public static final String DELAYED_ROUTING_KEY = "delayed";
    //声明延迟队列
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }
    //声明延迟交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map arguments = new HashMap(3);
        //设置延迟类型
        arguments.put("x-delayed-type","direct");
        /**
         * 声明自定义交换机
         * 第一个参数:交换机的名称
         * 第二个参数:交换机的类型
         * 第三个参数:是否需要持久化
         * 第四个参数:是否自动删除
         * 第五个参数:其他参数
         */
        return new CustomExchange(DELAYED_QUEUE_NAME,"x-delayed-message",true,false,arguments);
    }
    //绑定队列和延迟交换机
    @Bean
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
       @Qualifier("delayedExchange") Exchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

 (2)在SendMsgController类里写一个接口,让其能往延迟队列里发送消息

代码如下:

package com.ken.springbootrqbbitmq.controller;
import com.ken.springbootrqbbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
 * 发送延迟消息
 */
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date(),message);
        rabbitTemplate.convertAndSend("normal_exchange","normal01","消息来着ttl为10s的队列:" + message);
        rabbitTemplate.convertAndSend("normal_exchange","normal02","消息来着ttl为40s的队列:" + message);
    }
    @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {
        log.info("当前时间:{},发送一条时长{}毫秒的TTL消息给normal03队列:{}", new Date(),ttlTime,message);
        rabbitTemplate.convertAndSend("normal_exchange","normal03",message,msg -> {
            //发送消息的时候延迟时长
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }
    /**
     * 给延迟队列发送消息
     * @param message
     * @param delayTime
     */
    @GetMapping("/sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {
        log.info("当前时间:{},发送一条时长{}毫秒的消息给延迟队列:{}", new Date(),delayTime,message);
        rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_QUEUE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -> {
            //发送消息的时候延迟时长
            msg.getMessageProperties().setDelay(delayTime);
            return msg;
        });
    }
}

(3)在consumer包里新建一个名为DelayQueueConsumer的类用于编写消费延迟队列的消费者代码

效果图:

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

代码如下:

package com.ken.springbootrqbbitmq.consumer;
import com.ken.springbootrqbbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
 * 延迟队列消费者
 */
@Slf4j
@Component
public class DelayQueueConsumer {
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    private void receiveDelayQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("当前时间{},收到延迟队列的消息",new Date(),msg);
    }
}

(4)启动项目,往浏览器输入接口地址和参数,从而调用接口

[1]第一条消息

http://localhost:8080/ttl/sendDelayMsg/我是第一条消息/20000

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

[2]第二条消息

http://localhost:8080/ttl/sendDelayMsg/我是第二条消息/2000

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

效果图:

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

结论:基于测试发现在使用延迟插件的情况下,延迟时间短的消息会被先消费,这证明基于插件的延迟消息达到预期效果

VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]