3.5 批量消息发送

批量消息发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。当然,并不是在同一批次中发送的消息数量越多,性能就越好,判断依据是单条消息的长度,如果单条消息内容比较长,则打包发送多条消息会影响其他线程发送消息的响应时间,并且单批次消息发送总长度不能超过Default MQProducer#maxMessageSize。批量发送消息要解决的是如何将这些消息编码,以便服务端能够正确解码每条消息的内容。

那么RocketMQ如何编码多条消息呢?我们首先梳理一下RocketMQ网络请求命令的设计,其类图如图3-11所示。下面我们逐一介绍RemotingCommand的属性。

084-1

图3-11 RocketMQ请求命令类图

1)code:请求命令编码,请求命令类型。

2)version:版本号。

3)opaque:客户端请求序号。

4)flag:标记。倒数第一位表示请求类型,0表示请求;1表示返回。倒数第二位,1表示单向发送。

5)remark:描述。

6)extFields:扩展属性。

7)customeHeader:每个请求对应的请求头信息。

8)byte[] body:消息体内容。

发送单条消息时,消息体的内容将保存在body中。发送批量消息时,需要将多条消息体的内容存储在body中。如何存储更便于服务端正确解析每条消息呢?RocketMQ采取的方式是,对单条消息内容使用固定格式进行存储,如图3-12所示。

085-1

图3-12 RocetMQ消息封装格式

接下来梳理一下批量消息发送的核心流程,如代码清单3-29所示。

代码清单3-29 DefaultMQProducer#send消息批量发送

public SendResult send(Collection<Message> msgs) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs));
}

首先在消息发送端,调用batch()方法,将一批消息封装成MessageBatch对象。Message-Batch继承自Message对象,内部持有List<Message> messages。这样一来,批量消息发送与单条消息发送的处理流程就完全一样了。MessageBatch只需要将该集合中每条消息的消息体聚合成一个byte[]数组,在消息服务端能够从该byte[]数组中正确解析出消息,如代码清单3-30所示。

代码清单3-30 Message’Batch#encode

public byte[] encode() {
    return MessageDecoder.encodeMessages(messages);
}

在创建RemotingCommand对象时,调用messageBatch#encode方法填充到Remoting Command的body域中。多条消息编码格式可参考图3-12,如代码清单3-31所示。

代码清单3-31 MessageDecoder#encodeMessage

public static byte[] encodeMessage(Message message) {
    byte[] body = message.getBody(); int
    bodyLen = body.length;
    String properties = messageProperties2String(message.getProperties());
    byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
    propertiesLength = (short) propertiesBytes.length; int
    sysFlag = message.getFlag();
    int storeSize = 4 // 1 TOTALSIZE
        + 4 // 2 MAGICCOD
        + 4 // 3 BODYCRC
        + 4 // 4 FLAG
        + 4 + bodyLen // 4 BODY
        + 2 + propertiesLength;
    ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
    // 1 TOTALSIZE
    byteBuffer.putInt(storeSize);
    // 2 MAGICCODE
    byteBuffer.putInt(0);
    // 3 BODYCRC
    byteBuffer.putInt(0);

    // 4 FLAG
    int flag = message.getFlag();
    byteBuffer.putInt(flag);
    // 5 BODY
    byteBuffer.putInt(bodyLen);
    byteBuffer.put(body);
    // 6 properties
    byteBuffer.putShort(propertiesLength);
    byteBuffer.put(propertiesBytes);
    return byteBuffer.array();
}

在消息发送端将会按照上述结构进行解码,整个发送流程与单个消息发送没有差异,就不一一介绍了。