RocketMQ源码学习笔记:Producer发送消息流程

2024-07-17 1620阅读

这是本人学习的总结,主要学习资料如下

  • 马士兵教育
  • rocketMq官方文档

    目录

    • 1、Overview
    • 2、验证消息
    • 3、查找路由
    • 4、选择消息发送队列
      • 4.1、选择队列的策略
      • 4.2、源码阅读
        • 4.2.1、轮询规避
        • 4.2.2、故障延迟规避
          • 4.2.2.1、计算规避时间
          • 4.2.2.2、选择队列
          • 4.2.3、ThreadLocal的使用
          • 5、发送消息
            • 5.1、客户端建立的时间

              1、Overview

              消息发送主要可以分成下面四个步骤。

              1. 验证消息
              2. 查找路由
              3. 选择队列
              4. 消息发送

              之后从源码查看四个步骤的具体内容。

              我们建立一个DefaultMQProducer之后,调用DefaultMQProducer#send()方法就可发送信息。

              查看send()的代码最终会来到DefaultMQProducerImpl#sendDefaultImpl(),我们从这里开始看源码。

              2、验证消息

              发送前必然验证一下消息。

              主要是检验消息的状态,一些必要的值不能为空等。

              this.makeSureStateOK();
              // 1、检查消息
              Validators.checkMessage(msg, this.defaultMQProducer);
              

              公司内部想设置一些新的规则用来发送前拦截信息就适合放在checkMessage()里。

              这部分没有太多内容。

              3、查找路由

              所谓的路由是指可用的Broker的信息,包括地址,具体的消息队列等。

              下面这一句获取到路由。

              TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
              

              在DefaultMQProducer内部缓存这路由信息,维护在ConcurrentHashMap中

              private final ConcurrentMap topicPublishInfoTable = new ConcurrentHashMap();
              

              在tryToFindTopicPublishInfo()中会先检查路由信息是否存在,不存在还需要从NameServer中获取路由列表。

              this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
              

              更新的时候会加上一个ReentrantLock,更新结束后释放。

              获取到路由信息后开始选择队列发送消息。

              4、选择消息发送队列

              4.1、选择队列的策略

              得到路由信息后就开始选择消息队列发送信息。

              选择队列有两种策略

              • 轮询规避:轮询选择队列。如果上次发送消息失败,那就消息需要重新发送,这时就需要规避掉上次发送失败的队列,寻找下一个队列发送。
              • 故障延迟策略:在选择队列发送时根据以往发送时长判断该队列的Broker是否可用。对于发送失败的Broker,Producer会规避该Broker一段时间。

                这是发送消息的流程图。

                假设我们的Broker是集群,有两个Broker。消息会选择其中一个Broker发送消息,如果失败就重试,直到发送成功或者超过重试次数。

                RocketMQ源码学习笔记:Producer发送消息流程

                4.2、源码阅读

                这里会探索源码如何实现这两种队列选择策略。

                选择队列的入口在DefaultMQProducerImpl#sendDefaultImpl -> this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

                RocketMQ源码学习笔记:Producer发送消息流程

                从入口进入查看代码,最终在MQFaultStrategy#selectOneMessageQueue,代码通过sendLatencyFaultEnable这个字段来选择不同的选择策略。

                RocketMQ源码学习笔记:Producer发送消息流程

                4.2.1、轮询规避

                这是轮询规避的源码。

                RocketMQ源码学习笔记:Producer发送消息流程

                其中lastBrokerName是上一次消息发送时选择的broker。这代表该消息上一次发送失败了,所以记录着上一次失败的broker以在这次选择Broker时规避他。

                所以lastBrokerName==null时该消息是第一次发送,不需要规避,直接随机选择一个队列发送。

                如果上一次发送失败,则开始轮询选择一个队列,保证这个新选出的队列和上一个不同后就可以返回。

                4.2.2、故障延迟规避

                4.2.2.1、计算规避时间

                故障延迟规避策略需要记录发送时间并计算。在看选择Broker的代码时需要看看源码如何记录发送时间并计算出规避时间的。

                计算规避时间的代码在this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);。这时消息正常发送会调用一次这个方法;如果出现异常在catch快也会调用这个方法计算规避时间。

                RocketMQ源码学习笔记:Producer发送消息流程

                进入这个方法,代码如下。

                需要注意isolation=true时表示消息发送出现异常,这时便认为延迟时长是30000ms。

                同时也可以看到sendLatencyFaultEnable==true表示开启故障规避策略,这种情况才需要计算规避时间。选择Broker时也是通过这个属性判断使用过故障规避还是轮询规避。

                RocketMQ源码学习笔记:Producer发送消息流程

                规避时间的计算比较简单,阿里根据自己的经验设置了一个对照表来计算时间,如下图所示。比如延迟是550ms以内的Broker不用规避;延迟在550~1000ms的需要规避30s。

                RocketMQ源码学习笔记:Producer发送消息流程


                这里跳过计算规避时间的代码细节,进入下一行代码this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);。

                代码如下,它其实就是将不可用的Broker维护在faultItemTable中,并且记录着解禁时间。以后选择Broker会通过这个集合查看Broker是否可用。

                RocketMQ源码学习笔记:Producer发送消息流程


                4.2.2.2、选择队列

                我们回到选择Broker的代码MQFaultStrategy#selectOneMessageQueue,下图是相关代码。

                RocketMQ源码学习笔记:Producer发送消息流程

                它的大概流程是,轮询队列,如果可用就返回。实在找不到可用的就随机选择一个Broker发送。

                它通过latencyFaultTolerance.isAvailable(mq.getBrokerName())判断队列是否可用,里面实际就是通过前面讲到的faultItemTable来查看队列是否可用。

                4.2.3、ThreadLocal的使用

                在选择队列时,无论是轮询规避还是故障延迟规避都需要循环遍历messageQueue找到适合的queue发送信息。

                获取下标的方式用到了ThreadLocal。如下图所示,sendWhichQueue本质上就是一个ThreadLocal对象。

                RocketMQ源码学习笔记:Producer发送消息流程

                生产者发送信息时可能会有多个线程同时发信息。

                这些线程发送信息时应该各自维护一个消息队列的下标,这样每个线程发送信息时才会比较均匀地向每个队列都发送信息。

                另外这些线程发送信息时可能会指定消息队列的id,所以线程各自维护一个消息队列的下标是很有必要的。

                这个场景就很适合ThreadLocal,选择消息队列时用ThreadLocal来维护下标。



                5、发送消息

                5.1、客户端建立的时间

                客户端发送消息时,建立HTTP连接是在send()方法中而不是在start()方法中。

                站在设计者的角度需要考虑到,开发者在start()方法后可能还需要过一段时间才会真正发送信息,甚至不发信息。

                那么建立HTTP连接放在start()就比较浪费资源,所以建立HTTP连接放在了send()方法中。

VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]