- RocketMQ技术内幕:RocketMQ架构设计与实现原理(第2版)
- 丁威 张登 周继锋
- 884字
- 2022-08-25 16:04:35
3.5 批量消息发送
批量消息发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。当然,并不是在同一批次中发送的消息数量越多,性能就越好,判断依据是单条消息的长度,如果单条消息内容比较长,则打包发送多条消息会影响其他线程发送消息的响应时间,并且单批次消息发送总长度不能超过Default MQProducer#maxMessageSize。批量发送消息要解决的是如何将这些消息编码,以便服务端能够正确解码每条消息的内容。
那么RocketMQ如何编码多条消息呢?我们首先梳理一下RocketMQ网络请求命令的设计,其类图如图3-11所示。下面我们逐一介绍RemotingCommand的属性。
图3-11 RocketMQ请求命令类图
1)code:请求命令编码,请求命令类型。
2)version:版本号。
3)opaque:客户端请求序号。
4)flag:标记。倒数第一位表示请求类型,0表示请求;1表示返回。倒数第二位,1表示单向发送。
5)remark:描述。
6)extFields:扩展属性。
7)customeHeader:每个请求对应的请求头信息。
8)byte[] body:消息体内容。
发送单条消息时,消息体的内容将保存在body中。发送批量消息时,需要将多条消息体的内容存储在body中。如何存储更便于服务端正确解析每条消息呢?RocketMQ采取的方式是,对单条消息内容使用固定格式进行存储,如图3-12所示。
图3-12 RocetMQ消息封装格式
接下来梳理一下批量消息发送的核心流程,如代码清单3-29所示。
代码清单3-29 DefaultMQProducer#send消息批量发送
public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(batch(msgs)); }
首先在消息发送端,调用batch()方法,将一批消息封装成MessageBatch对象。Message-Batch继承自Message对象,内部持有List<Message> messages。这样一来,批量消息发送与单条消息发送的处理流程就完全一样了。MessageBatch只需要将该集合中每条消息的消息体聚合成一个byte[]数组,在消息服务端能够从该byte[]数组中正确解析出消息,如代码清单3-30所示。
代码清单3-30 Message’Batch#encode
public byte[] encode() { return MessageDecoder.encodeMessages(messages); }
在创建RemotingCommand对象时,调用messageBatch#encode方法填充到Remoting Command的body域中。多条消息编码格式可参考图3-12,如代码清单3-31所示。
代码清单3-31 MessageDecoder#encodeMessage
public static byte[] encodeMessage(Message message) { byte[] body = message.getBody(); int bodyLen = body.length; String properties = messageProperties2String(message.getProperties()); byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8); propertiesLength = (short) propertiesBytes.length; int sysFlag = message.getFlag(); int storeSize = 4 // 1 TOTALSIZE + 4 // 2 MAGICCOD + 4 // 3 BODYCRC + 4 // 4 FLAG + 4 + bodyLen // 4 BODY + 2 + propertiesLength; ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize); // 1 TOTALSIZE byteBuffer.putInt(storeSize); // 2 MAGICCODE byteBuffer.putInt(0); // 3 BODYCRC byteBuffer.putInt(0); // 4 FLAG int flag = message.getFlag(); byteBuffer.putInt(flag); // 5 BODY byteBuffer.putInt(bodyLen); byteBuffer.put(body); // 6 properties byteBuffer.putShort(propertiesLength); byteBuffer.put(propertiesBytes); return byteBuffer.array(); }
在消息发送端将会按照上述结构进行解码,整个发送流程与单个消息发送没有差异,就不一一介绍了。