RocketMQ 消息发送原理与源码解析
RocketMQ 是一款开源的分布式消息中间件,在高性能、高可用性和大规模分布式场景中表现优异。本文将以图文并茂的形式详细解析 RocketMQ 消息发送的原理以及核心源码,帮助读者深入理解其内部机制。
一、消息发送的整体流程
在 RocketMQ 中,消息发送分为以下几个关键步骤:
消息创建:Producer 构造消息实例(Message)。
路由查找:Producer 根据 Topic 查询对应的 Broker 路由信息。
消息发送:Producer 将消息通过网络发送到目标 Broker。
响应处理:Broker 返回消息发送结果,Producer 处理响应。
下图展示了 RocketMQ 消息发送的整体流程:
graph LR
A[Producer] --> B[查询路由信息]
B --> C[选定目标Broker]
C --> D[发送消息到Broker]
D --> E[Broker返回结果]
E --> F[Producer处理结果]
二、核心组件及角色
1. Producer
消息的生产者,负责创建并发送消息。
DefaultMQProducer:Producer 的核心实现类。
发送模式:支持同步、异步和单向三种模式。
2. NameServer
提供路由服务,Producer 根据 Topic 查询路由信息。
3. Broker
消息的存储和转发节点,接收 Producer 的消息并存储。
三、源码解析
以下以 DefaultMQProducer 为例,分析其消息发送的关键源码。
1. 发送消息的入口
Producer 发送消息的入口方法是 DefaultMQProducer#send,其核心逻辑如下:
/**
* DEFAULT SYNC -------------------------------------------------------
*/
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
可以看到,send 方法直接调用了 defaultMQProducerImpl 的 send 方法。
2. 路由查找
DefaultMQProducerImpl 会调用 MQClientInstance 查询路由信息:
public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic);
}
该方法通过 NameServer 获取 Topic 的路由信息,包括可用的 Broker 列表。
3. 选择消息队列
获取路由信息后,Producer 会选择一个消息队列进行发送:
private MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo) {
int index = this.sendWhichQueue.incrementAndGet();
return tpInfo.getMessageQueueList().get(index % tpInfo.getMessageQueueList().size());
}
Broker 优先选择逻辑
Producer 是如何决定先发送到哪个 Broker?核心逻辑如下:
消息队列轮询机制:
每次发送消息时,Producer 内部维护一个 sendWhichQueue 变量,该变量会递增。
根据递增后的值,对消息队列列表长度取模,得到当前选择的队列索引。
路由策略:
Broker 路由信息存储在 TopicPublishInfo 中,包含了所有可用的队列。
Producer 从这些队列中选择目标队列。
容错机制:
如果某个 Broker 或队列发生故障,Producer 会跳过故障节点,选择下一个可用的 Broker。
通过 updateFaultItem 记录故障 Broker 的状态,并在重试时规避这些 Broker。
private MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// 遍历所有队列
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// 轮询选择队列
int index = this.sendWhichQueue.incrementAndGet();
MessageQueue mq = tpInfo.getMessageQueueList().get(index % tpInfo.getMessageQueueList().size());
// 跳过上次失败的 Broker
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return null;
}
随机化优化:
当多线程并发发送消息时,轮询机制能够尽可能均匀分布负载到不同的 Broker 上。
4. 消息发送
选择目标队列后,Producer 将消息发送到对应的 Broker:
SendResult sendKernelImpl(
Message msg, MessageQueue mq, CommunicationMode communicationMode, SendCallback sendCallback) {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
return this.mQClientAPIImpl.sendMessage(
brokerAddr, mq.getTopic(), msg, communicationMode, sendCallback
);
}
消息通过网络发送到目标 Broker,并根据通信模式(同步、异步或单向)处理响应。
四、消息发送的核心细节
1. sendDefaultImpl 方法分析
sendDefaultImpl 是 RocketMQ 消息发送的核心方法,包含消息重试、多线程支持及不同模式的发送逻辑。以下是关键代码解析:
1.1 路由信息的获取
在方法中,首先会调用 tryToFindTopicPublishInfo 获取 Topic 的路由信息:
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
如果路由信息不可用,则会抛出异常或提示重新检查 NameServer 配置。
1.2 消息队列选择与发送
通过循环机制重试消息发送:
for (; times < timesTotal; times++) {
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
if (sendResult.getSendStatus() == SendStatus.SEND_OK || !retryEnabled) {
return sendResult;
}
}
}
1.3 重试机制
重试条件:默认只在同步发送模式下重试。
Broker 路由选择:每次重试会重新选择 Broker,避免路由故障。
2. 三种消息发送模式的实现
2.1 同步发送
同步发送通过 sendKernelImpl 方法实现,等待发送结果返回:
SendResult sendResult = this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, topicPublishInfo, timeout);
return sendResult;
特点:高可靠性,适合关键数据。
重试机制:默认重试 2 次。
2.2 异步发送
异步发送通过注册回调函数处理结果:
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, topicPublishInfo, timeout);
特点:高性能,适合非关键数据。
回调处理:使用 SendCallback 处理成功或失败的响应。
2.3 单向发送
单向发送不关心发送结果,直接调用核心方法:
this.sendKernelImpl(msg, mq, CommunicationMode.ONEWAY, null, topicPublishInfo, timeout);
特点:最低延迟,适合日志等非关键数据。
五、总结
RocketMQ 消息发送的核心流程包括消息创建、路由查找、消息发送和响应处理,其实现依赖于多个核心组件和机制,例如路由缓存、消息队列选择和重试机制。通过 sendDefaultImpl 方法的源码分析,可以深入理解同步、异步及单向发送的实现原理。
作者:nice_smile
链接:https://juejin.cn