3.4 消息发送基本流程

RocketMQ消息发送的关键点如图3-8所示。

068-1

图3-8 RocketMQ消息发送的关键点

消息发送流程主要的步骤为验证消息、查找路由、消息发送(包含异常处理机制),如代码清单3-8所示。

代码清单3-8 同步消息发送入口

DefaultMQProducer#send
public SendResult send(Message msg) throws MQClientException, RemotingException,
    MQBrokerException, InterruptedException{
    return this.defaultMQProducerImpl.send(msg);
}
DefaultMQProducerImpl#send
public SendResult send(Message msg) throws MQClientException, RemotingException,
        MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg,long timeout) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

默认消息以同步方式发送,默认超时时间为3s。

本节主要以SendResult sendMessage(Messsage message)方法为突破口,介绍消息发送的基本实现流程。

3.4.1 消息长度验证

在消息发送之前,首先确保生产者处于运行状态,然后验证消息是否符合相应的规范。具体的规范要求是主题名称、消息体不能为空,消息长度不能等于0且默认不能超过允许发送消息的最大长度4MB(maxMessageSize=1024×1024×4)。

3.4.2 查找主题路由信息

在消息发送之前,还需要获取主题的路由信息,只有获取了这些信息我们才能知道消息具体要发送到哪个Broker节点上,如代码清单3-9所示。

代码清单3-9 DefaultMQProducerImpl#tryToFindTopicPublishInfo

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic)
    { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok())
            { return topicPublishInfo;
    } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true,
                this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
    }
}

tryToFindTopicPublishInfo是查找主题的路由信息的方法。如果生产者中缓存了topic的路由信息,且该路由信息包含消息队列,则直接返回该路由信息。如果没有缓存或没有包含消息队列,则向NameServer查询该topic的路由信息。如果最终未找到路由信息,则抛出异常,表示无法找到主题相关路由信息异常。先看一下TopicPublishInfo的属性,如图3-9所示。

069-1

图3-9 RocketMQ TopicPublishInfo类图

下面逐一介绍TopicPublishInfo的属性。

  • orderTopic:是否是顺序消息。
  • List messageQueueList:该主题队列的消息队列。
  • sendWhichQueue:每选择一次消息队列,该值会自增1,如果超过Integer.MAX_VALUE,则重置为0,用于选择消息队列。
  • List queueDatas:topic队列元数据。
  • List brokerDatas:topic分布的broker元数据。
  • HashMapfilterServerTable:broker上过滤服务器的地址列表。

第一次发送消息时,本地没有缓存topic的路由信息,查询NameServer尝试获取路由信息,如果路由信息未找到,再次尝试用默认主题DefaultMQProducerImpl#createTopicKey去查询。如果BrokerConfig#autoCreateTopicEnable为true,NameServer将返回路由信息;如果autoCreateTopicEnable为false,将抛出无法找到topic路由异常。MQClientInstance#up dateTopicRouteInfoFromNameServer方法的功能是更新消息生产者和维护路由缓存,如代码清单3-10所示。

代码清单3-10 MQClientInstance#updateTopicRouteInfoFromNameServer

TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer
                    (defaultMQProducer.getCreateTopicKey(),1000 * 3);
    if (topicRouteData != null) {
        for (QueueData data : topicRouteData.getQueueDatas()) {
            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(),
                data.getReadQueueNums());
            data.setReadQueueNums(queueNums);
            data.setWriteQueueNums(queueNums);
        }
    }
} else {
    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic,
        1000 * 3);
}

第一步:如果isDefault为true,则使用默认主题查询,如果查询到路由信息,则将路由信息中读写队列的个数替换为消息生产者默认的队列个数(defaultTopicQueueNums);如果isDefault为false,则使用参数topic查询,如果未查询到路由信息,则返回false,表示路由信息未变化,如代码清单3-11所示。

代码清单3-11 MQClientInstance#updateTopicRouteInfoFromNameServer

TopicRouteData old = this.topicRouteTable.get(topic);
 boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
    changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
    log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old,
        topicRouteData);
}

第二步:如果找到路由信息,则与本地缓存中的路由信息进行对比,判断路由信息是否发生了改变,如果未发生变化,则直接返回false。

第三步:更新MQClientInstance Broker地址缓存表,如代码清单3-12所示。

代码清单3-12 MQClientInstance#updateTopicRouteInfoFromNameServer

{
    TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic,
        topicRouteData);
    publishInfo.setHaveTopicRouterInfo(true);
    Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet()
        .iterator();
    while (it.hasNext()) {
        Entry<String, MQProducerInner> entry = it.next();
        MQProducerInner impl = entry.getValue();
        if (impl != null) { impl.updateTopicPublishInfo(topic,
            publishInfo);
        }
    }
}

第四步:将topicRouteData中的List<QueueData> 转换成topicPublishInfo的List <MessageQueue>列表,具体实现在topicRouteData2TopicPublishInfo中。然后更新该MQClientInstance管辖的所有消息,发送关于topic的路由信息,如代码清单3-13所示。

代码清单3-13 MQClientInstance#TopicRouteData2TopicPublishInfo

List<QueueData> qds = route.getQueueDatas();
Collections.sort(qds);
for (QueueData qd : qds) {
    if (PermName.isWriteable(qd.getPerm())) {
        BrokerData brokerData = null;
        for (BrokerData bd : route.getBrokerDatas()) {
            if (bd.getBrokerName().equals(qd.getBrokerName())) {
                    brokerData = bd;
                    break;
                }
            }
            if (null == brokerData) {
                continue;
            }
            if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                continue;
            }
            for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                info.getMessageQueueList().add(mq);
            }
        }
    }
}

循环遍历路由信息的QueueData信息,如果队列没有写权限,则继续遍历下一个QueueData。根据brokerName找到brokerData信息,如果找不到或没有找到主节点,则遍历下一个QueueData。根据写队列个数,topic+序号创建MessageQueue,填充topicPublishInfo的List<MessageQueue>,完成消息发送的路由查找。

提示

温馨提示:在生产环境中不建议开启自动创建主题,因为这会导致新创建的主题只存在于集群中的部分节点上,具体原因建议大家结合路由寻址机制进行思考,关于该问题的详细分析可参考笔者“中间件兴趣圈”公众号中的博文:https://mp.weixin.qq.com/s/GbSlS3hi8IE0kznTynV4ZQ

3.4.3 选择消息队列

根据路由信息选择消息队列,返回的消息队列按照broker序号进行排序。举例说明,如果topicA在broker-a、broker-b上分别创建了4个队列,那么返回的消息队列为[{"brokerName":"broker-a"、"queueId":0}、{"brokerName":"broker-a"、"queueId":1}、{"brokerName":"broker-a"、"queueId":2}、{"brokerName":"broker-a"、"queueId":3}、{"brokerName":"broker-b"、"queueId":0}、{"brokerName":"broker-b"、"queueId":1}、{"brokerName":"broker-b"、"queueId":2}、{"brokerName":"broker-b"、"queueId":3}],那么RocketMQ如何选择消息队列呢?

首先消息发送端采用重试机制,由retryTimesWhenSendFailed指定同步方式重试次数,异步重试机制在收到消息发送结果执行回调之前进行重试,由retryTimesWhenSend AsyncFailed指定异常重试次数。接下来就是循环执行,选择消息队列、发送消息,发送成功则返回,收到异常则重试。选择消息队列有两种方式。

1)sendLatencyFaultEnable=false,默认不启用Broker故障延迟机制。

2)sendLatencyFaultEnable=true,启用Broker故障延迟机制。

1. 默认机制

如果sendLatencyFaultEnable=false,则调用TopicPublishInfo#selectOneMessageQueue,如代码清单3-14所示。

代码清单3-14 TopicPublishInfo#selectOneMessageQueue

public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if
    (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size(); if
            (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
        }
}
public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size(); if
    (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

在消息发送过程中,可能会多次执行选择消息队列这个方法,lastBrokerName就是上一次选择的执行发送消息失败的Broker。第一次执行消息队列选择时,lastBrokerName为null,此时直接用sendWhichQueue自增再获取值,与当前路由表中消息队列的个数取模,返回该位置的MessageQueue(selectOneMessageQueue()方法,如果消息发送失败,下次进行消息队列选择时规避上次MesageQueue所在的Broker,否则有可能再次失败。

或许有读者会问,Broker不可用后,路由信息中为什么还会包含该Broker的路由信息呢?其实这不难解释:首先,NameServer检测Broker是否可用是有延迟的,最短为一次心跳检测间隔(10s);其次,NameServer不是检测到Broker宕机后马上推送消息给消息生产者,而是消息生产者每隔30s更新一次路由信息,因此消息生产者最快感知Broker最新的路由信息也需要30s。这就需要引入一种机制,在Broker宕机期间,一次消息发送失败后,将该Broker暂时排除在消息队列的选择范围中。

2. Broker故障延迟机制

Broker故障延迟机制如代码清单3-15所示。

代码清单3-15 MQFaultStrategy#selectOneMessageQueue

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final
        String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName ||
                            mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                    }
                }
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() %
                            writeQueueNums);
                    }
                    return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

首先对上述代码进行解读。

1)轮询获取一个消息队列。

2)验证该消息队列是否可用,latencyFaultTolerance.isAvailable(mq.getBrokerName())是关键。

3)如果返回的MessageQueue可用,则移除latencyFaultTolerance关于该topic的条目,表明该Broker故障已经修复。

Broker故障延迟机制核心类如图3-10所示。

074-1

图3-10 RocketMQ故障延迟机制核心类

(1)LatencyFaultTolerance:延迟机制接口规范。

1)void updateFaultItem(T name, long currentLatency, long notAvailable Duration):更新失败条目。

  • name:Broker名称。
  • currentLatency:消息发送故障的延迟时间。
  • notAvailableDuration:不可用持续时长,在这个时间内,Broker将被规避。

2)boolean isAvailable(final T name):判断Broker是否可用。

3)void remove(final T name):移除失败条目,意味着Broker重新参与路由计算。

4)T pickOneAtLeast():尝试从规避的Broker中选择一个可用的Broker,如果没有找到,则返回null。

(2)FaultItem:失败条目(规避规则条目)。

1)private final String name:条目唯一键,这里为brokerName。

2)private volatile long currentLatency:本次消息发送的延迟时间。

3)private volatile long startTimestamp:故障规避的开始时间。

(3)MQFaultStrategy:消息失败策略,延迟实现的门面类。

1)long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}。

2)long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}。

根据currentLatency本次消息发送的延迟时间,从latencyMax尾部向前找到第一个比currentLatency小的索引index,如果没有找到,则返回0。然后根据这个索引从notAvailable-Duration数组中取出对应的时间,在这个时长内,Broker将设置为不可用。

下面从源码的角度分析updateFaultItem、isAvailable方法的实现原理,如代码清单3-16所示。

代码清单3-16 DefaultMQProducerImpl#sendDefaultImpl

beginTimestampPrev = System.currentTimeMillis();
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback,
    topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev,
    false);

如果在发送过程中抛出了异常,则调用DefaultMQProducerImpl#updateFaultItem方法,该方法直接调用MQFaultStrategy#updateFaultItem方法,如代码清单3-17所示。下面关注一下各个参数的含义。

  • brokerName:Broker名称。
  • currentLatency:本次消息发送的延迟时间。
  • isolation:是否规避Broker,该参数如果为true,则使用默认时长30s来计算Broker故障规避时长,如果为false,则使用本次消息发送延迟时间来计算Broker故障规避时长。

代码清单3-17 MQFaultStrategy#updateFaultItem

public void updateFaultItem(final String brokerName, final long currentLatency,
        boolean isolation) {
    if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 :
                currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency,
                    duration);
    }
}
private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
        }
        return 0;
    }
}

如果isolation为true,则使用30s作为computeNotAvailableDuration方法的参数。如果isolation为false,则使用本次消息发送时延作为computeNotAvailableDuration方法的参数。

computeNotAvailableDuration的作用是计算因本次消息发送故障需要规避Broker的时长,也就是接下来多长的时间内,该Broker将不参与消息发送队列负载。具体算法是,从latencyMax数组尾部开始寻找,找到第一个比currentLatency小的下标,然后从notAvailableDuration数组中获取需要规避的时长,该方法最终调用LatencyFaultTolerance的updateFaultItem()方法,如代码清单3-18所示。

代码清单3-18 LatencyFaultToleranceImpl#updateFaultItem

public void updateFaultItem(final String name, final long currentLatency, final long
        notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() +
            notAvailableDuration);
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() +
                notAvailableDuration);
        }
    } else {
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() +
            notAvailableDuration);
    }
}

根据Broker名称从缓存表中获取FaultItem,如果找到则更新FaultItem,否则创建FaultItem。这里有两个关键点。

1)currentLatency、startTimeStamp被volatile修饰。

2)startTimeStamp为当前系统时间加上需要规避的时长。startTimeStamp是判断Broker当前是否可用的直接依据,请看FaultItem#isAvailable方法,如代码清单3-19所示。

代码清单3-19 FaultItem#isAvailable方法

public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}

注意

开启与不开启sendLatencyFaultEnable机制在消息发送时都能规避故障的Broker,那么这两种机制有何区别呢?

开启所谓的故障延迟机制,即设置sendLatencyFaultEnable为ture,其实是一种较为悲观的做法。当消息发送者遇到一次消息发送失败后,就会悲观地认为Broker不可用,在接下来的一段时间内就不再向其发送消息,直接避开该Broker。而不开启延迟规避机制,就只会在本次消息发送的重试过程中规避该Broker,下一次消息发送还是会继续尝试。

3.4.4 消息发送

消息发送API核心入口DefaultMQProducerImpl#sendKernelImpl如代码清单3-20所示。

代码清单3-20 DefaultMQProducerImpl#sendKernelImpl

private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout)

如代码清单3-21所示,下面详细讲解消息发送参数。

1)Message msg:待发送消息。

2)MessageQueue mq:消息将发送到该消息队列上。

3)CommunicationMode communicationMode:消息发送模式,包括SYNC、ASYNC、ONEWAY。

4)SendCallback sendCallback:异步消息回调函数。

5)TopicPublishInfo topicPublishInfo:主题路由信息。

6)long timeout:消息发送超时时间。

代码清单3-21 DefaultMQProducerImpl#sendKernelImpl

String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish
    (mq.getBrokerName());
if (null == brokerAddr)
    { tryToFindTopicPublishInfo(mq.getTopic());
    brokerAddr = this.mQClientFactory.findBrokerAddressInPublish
        (mq.getBrokerName());
}

第一步:根据MessageQueue获取Broker的网络地址。如果MQClientInstance的brokerAddrTable未缓存该Broker的信息,则从NameServer主动更新topic的路由信息。如果路由更新后还是找不到Broker信息,则抛出MQClientException,提示Broker不存在,如代码清单3-22所示。

代码清单3-22 DefaultMQProducerImpl#sendKernelImpl

//对于MessageBatch,在生成过程中已设置ID
if (!(msg instanceof MessageBatch)) {
    MessageClientIDSetter.setUniqID(msg);
}
int sysFlag = 0;
if (this.tryToCompressMessage(msg)) {
    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

第二步:为消息分配全局唯一ID,如果消息体默认超过4KB(compressMsgBody-OverHowmuch),则对消息体采用zip压缩,并设置消息的系统标记为MessageSysFlag.COMPRESSED_FLAG。如果是事务Prepared消息,则设置消息的系统标记为MessageSysFlag. TRANSACTION_PREPARED_TYPE,如代码清单3-23所示。

代码清单3-23 DefaultMQProducerImpl#sendKernelImpl

if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }
    if (msg.getProperty(" STARTDELIVERTIME") != null ||
        msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null)
        { context.setMsgType(MessageType.Delay_Msg);
    }
    this.executeSendMessageHookBefore(context);
}

第三步:如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑。通过DefaultMQProducerImpl#registerSendMessageHook注册钩子处理类,并且可以注册多个。简单看一下钩子处理类接口,如代码清单3-24、代码清单3-25所示。

代码清单3-24 SendMessageHook

public interface SendMessageHook {
    String hookName();
    void sendMessageBefore(final SendMessageContext context);
    void sendMessageAfter(final SendMessageContext context);
}

代码清单3-25 DefaultMQProducerImpl#sendKernelImpl

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopic
        QueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getPr
    operties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
    if (reconsumeTimes != null) {
        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
        MessageAccessor.clearProperty(msg,
            MessageConst.PROPERTY_RECONSUME_TIME);
    }
    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
    if (maxReconsumeTimes != null) {
        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
        MessageAccessor.clearProperty(msg,
            MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
    }
}

第四步:构建消息发送请求包。

主要包含如下重要信息:生产者组、主题名称、默认创建主题key、该主题在单个Broker上的默认队列数、队列ID(队列序号)、消息系统标记(MessageSysFlag)、消息发送时间、消息标记(RocketMQ对消息中的标记不做任何处理,供应用程序使用)、消息扩展属性、消息重试次数、是否是批量消息等,如代码清单3-26所示。

代码清单3-26 MQClientAPIImpl#sendMessage

public SendResult sendMessage(final String addr,final String brokerName,
    final Message msg,final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,final CommunicationMode communicationMode, final
        SendCallback sendCallback,final TopicPublishInfo topicPublishInfo, final
        MQClientInstance instance,final int retryTimesWhenSendFailed, final
        SendMessageContext context,final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException
        { RemotingCommand request = null;
        if (sendSmartMsg || msg instanceof MessageBatch)
            { SendMessageRequestHeaderV2 requestHeaderV2 =
                SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requ
                    estHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof
                MessageBatch ? RequestCode.SEND_BATCH_MESSAGE :
                RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand
                (RequestCode.SEND_MESSAGE, requestHeader);
        }
        request.setBody(msg.getBody());
        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis,
                    request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis,
                    request);
            default:
                assert false;
                break;
        }
        return null;
    }

第五步:根据消息发送方式(同步、异步、单向)进行网络传输,如代码清单3-27所示。

代码清单3-27 DefaultMQProducerImpl#sendKernelImpl

if (this.hasSendMessageHook()) {
    context.setSendResult(sendResult);
    this.executeSendMessageHookAfter(context);
}

第六步:如果注册了消息发送钩子函数,则执行after逻辑。注意,就算消息发送过程中发生RemotingException、MQBrokerException、InterruptedException操作,该方法也会执行。

1. 同步发送

RocketMQ客户端发送消息的入口是MQClientAPIImpl#sendMessage。请求命令是Request Code.SEND_MESSAGE,我们可以找到该命令的处理类org.apache.rocketmq.broker.processor. SendMessageProcessor。入口方法在SendMessageProcessor#sendMessage中,如代码清单3-28所示。

代码清单3-28 AbstractSendMessageProcessor#msgCheck

protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,final
        SendMessageRequestHeader requestHeader, final RemotingCommand response) {
    if(!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerP ermission())&& this.brokerController.getTopicConfigManager(). isOrderTopic(requestHeader.getTopic())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" +
            this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending message is forbidden");
        return response;
    }
    if (!this.brokerController.getTopicConfigManager().
                isTopicCanSendMessage(requestHeader.getTopic())) {
        String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with
                system reserved words.";
        log.warn(errorMsg);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorMsg);
        return response;
    }
    TopicConfig topicConfig =
            this.brokerController.getTopicConfigManager().
            selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) {
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            if (requestHeader.getTopic().startsWith
                        (MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                } else {
                    topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
                }
            }
            log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(),
                    ctx.channel().remoteAddress());
            topicConfig = this.brokerController.getTopicConfigManager().
                createTopicInSendMessageMethod(
                        requestHeader.getTopic(),requestHeader.getDefaultTopic(),
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.getDefaultTopicQueueNums(), topicSysFlag);

            if (null == topicConfig) {
                if
                        (requestHeader.getTopic().startsWith(
                        MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    topicConfig = this.brokerController.getTopicConfigManager().
                            createTopicInSendMessageBackMethod(
                                requestHeader.getTopic(), 1, PermName.PERM_WRITE |
                                    PermName.PERM_READ,topicSysFlag);
            }
        }
        if (null == topicConfig) {
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("topic[" + requestHeader.getTopic() + "] not
                    exist, apply first please!"
                    + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
                return response;
            }
        }
        int queueIdInt = requestHeader.getQueueId();
        int idValid = Math.max(topicConfig.getWriteQueueNums(),
                    topicConfig.getReadQueueNums());
        if (queueIdInt >= idValid) {
            String errorInfo = String.format("request queueId[%d] is illegal, %s
                Producer: %s",
                queueIdInt,
                topicConfig.toString(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(errorInfo);
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(errorInfo);
            return response;
        }
    return response;
}

第一步:检查消息发送是否合理,这里完成了以下几件事。

1)检查Broker是否有写权限。

2)检查topic是否可以进行消息发送。主要针对默认主题,默认主题不能发送消息,仅供路由查找。

3)在NameServer端存储主题的配置信息,默认路径为${ROCKET_HOME}/store/ config/ topic.json。下面是主题存储信息。

  • order:是否是顺序消息。
  • perm:权限码。
  • readQueueNums:读队列数量。
  • writeQueueNums:写队列数量。
  • topicName:主题名称。
  • topicSysFlag:topic Flag,当前版本暂为保留。
  • topicFilterType:主题过滤方式,当前版本仅支持SINGLE_TAG。

4)检查队列,如果队列不合法,则返回错误码。

第二步:如果消息重试次数超过允许的最大重试次数,消息将进入DLQ死信队列。死信队列主题为%DLQ%+消费组名。

第三步:调用DefaultMessageStore#putMessage进行消息存储。关于消息存储的实现细节将在第4章重点讲解。

2. 异步发送

异步发送是指消息生产者调用发送的API后,无须等待消息服务器返回本次消息发送的结果,只需要提供一个回调函数,供消息发送客户端在收到响应结果后回调。异步发送方式相比于同步发送方式,虽然消息发送端的发送性能会显著提高,但是为了降低消息服务器的负载压力,RocketMQ对消息发送的异步消息进行了并发控制,通过参数clientAsyncSemaphoreValue实现,默认为65535。异步消息发送虽然也可以通过DefaultMQProducer#retryTimesWhenSendAsyncFailed属性来控制消息的发送重试次数,但是重试的调用入口是在收到服务端响应包时进行的,如果出现网络异常、网络超时等情况将不会重试。

3. 单向发送

单向发送是指消息生产者调用消息发送的API后,无须等待消息服务器返回本次消息发送的结果,并且无须提供回调函数,这表示压根就不关心本次消息发送是否成功,其实现原理与异步消息发送相同,只是消息发送客户端在收到响应结果后什么都不做了,并且没有重试机制。