使用 RocketMQ 实现消息的顺序消费
在分布式系统中,保持消息的顺序性是一个常见且重要的问题。RocketMQ 提供了一种有效的方式来确保消息的顺序消费。本文将通过代码示例,介绍如何使用 RocketMQ 实现消息的顺序生产和消费。
(图片来源网络,侵删)
环境准备
在开始之前,请确保您已经配置好 RocketMQ 环境,并且在 MqConstant 类中定义了 RocketMQ 的 NameServer 地址。
顺序消息的生产
首先,我们需要编写生产者代码来发送顺序消息。我们会创建两个示例,一个简单的顺序生产示例,另一个则是基于业务逻辑(如订单流程)的顺序生产示例。
简单的顺序生产者
package com.takumilove.demo;
import com.takumilove.constant.MqConstant;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.junit.Test;
public class FOrderlyTest {
@Test
public void orderlyProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
for (int i = 0; i
基于业务逻辑的顺序生产者
在这个示例中,我们假设有一个 Order 类表示订单,订单包含了 id、orderNumber、price、date 和 status 等信息。
package com.takumilove.demo;
import com.takumilove.constant.MqConstant;
import com.takumilove.domain.Order;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Test;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
public class FOrderlyTest {
@Test
public void orderlyProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
List orderList = Arrays.asList(
new Order(1, 111, 59D, new Date(), "下订单"),
new Order(2, 111, 59D, new Date(), "物流"),
new Order(3, 111, 59D, new Date(), "签收"),
new Order(4, 112, 89D, new Date(), "下订单"),
new Order(5, 112, 89D, new Date(), "物流"),
new Order(6, 112, 89D, new Date(), "拒收")
);
orderList.forEach(order -> {
Message message = new Message("orderlyTopic", order.toString().getBytes());
try {
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List list, Message message, Object o) {
int queueNumber = list.size();
Integer i = (Integer) o;
return list.get(i % queueNumber);
}
}, order.getOrderNumber());
} catch (Exception e) {
System.out.println("发送失败" + e.getMessage());
}
});
producer.shutdown();
System.out.println("发送完毕:");
}
}
顺序消息的消费
接下来,我们编写消费者代码来消费这些顺序消息。我们将分别展示简单顺序消费者和基于业务逻辑的顺序消费者。
简单的顺序消费者
package com.takumilove.demo;
import com.takumilove.constant.MqConstant;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Test;
import java.util.List;
public class FOrderlyTest {
@Test
public void orderlyConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("orderlyTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List list,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println(new String(list.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
}
基于业务逻辑的顺序消费者
package com.takumilove.demo;
import com.takumilove.constant.MqConstant;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Test;
import java.util.List;
public class FOrderlyTest {
@Test
public void orderlyConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("orderlyTopic", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List list,
ConsumeOrderlyContext consumeOrderlyContext) {
MessageExt messageExt = list.get(0);
System.out.println(new String(messageExt.getBody()));
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.in.read();
}
}
总结
通过以上示例,我们展示了如何使用 RocketMQ 实现消息的顺序生产和消费。无论是简单的消息还是基于业务逻辑的消息,都可以通过 RocketMQ 提供的顺序消费机制来保证消息的有序性。这对于订单系统等需要严格顺序的场景尤为重要。
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!
