rocketmq的消息发送原理
2025-01-01 09:25 阅读(148)

RocketMQ 消息发送原理与源码解析

RocketMQ 是一款开源的分布式消息中间件,在高性能、高可用性和大规模分布式场景中表现优异。本文将以图文并茂的形式详细解析 RocketMQ 消息发送的原理以及核心源码,帮助读者深入理解其内部机制。


一、消息发送的整体流程

在 RocketMQ 中,消息发送分为以下几个关键步骤:


消息创建:Producer 构造消息实例(Message)。

路由查找:Producer 根据 Topic 查询对应的 Broker 路由信息。

消息发送:Producer 将消息通过网络发送到目标 Broker。

响应处理:Broker 返回消息发送结果,Producer 处理响应。


下图展示了 RocketMQ 消息发送的整体流程:

graph LR
A[Producer] --> B[查询路由信息]
B --> C[选定目标Broker]
C --> D[发送消息到Broker]
D --> E[Broker返回结果]
E --> F[Producer处理结果]

二、核心组件及角色

1. Producer

消息的生产者,负责创建并发送消息。


DefaultMQProducer:Producer 的核心实现类。

发送模式:支持同步、异步和单向三种模式。


2. NameServer

提供路由服务,Producer 根据 Topic 查询路由信息。

3. Broker

消息的存储和转发节点,接收 Producer 的消息并存储。


三、源码解析

以下以 DefaultMQProducer 为例,分析其消息发送的关键源码。

1. 发送消息的入口

Producer 发送消息的入口方法是 DefaultMQProducer#send,其核心逻辑如下:

/**
 * DEFAULT SYNC -------------------------------------------------------
 */
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

可以看到,send 方法直接调用了 defaultMQProducerImpl 的 send 方法。


2. 路由查找

DefaultMQProducerImpl 会调用 MQClientInstance 查询路由信息:

public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
    this.makeSureStateOK();
    return this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic);
}

该方法通过 NameServer 获取 Topic 的路由信息,包括可用的 Broker 列表。


3. 选择消息队列

获取路由信息后,Producer 会选择一个消息队列进行发送:

private MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo) {
    int index = this.sendWhichQueue.incrementAndGet();
    return tpInfo.getMessageQueueList().get(index % tpInfo.getMessageQueueList().size());
}

Broker 优先选择逻辑

Producer 是如何决定先发送到哪个 Broker?核心逻辑如下:



消息队列轮询机制:


每次发送消息时,Producer 内部维护一个 sendWhichQueue 变量,该变量会递增。

根据递增后的值,对消息队列列表长度取模,得到当前选择的队列索引。




路由策略:


Broker 路由信息存储在 TopicPublishInfo 中,包含了所有可用的队列。

Producer 从这些队列中选择目标队列。




容错机制:


如果某个 Broker 或队列发生故障,Producer 会跳过故障节点,选择下一个可用的 Broker。

通过 updateFaultItem 记录故障 Broker 的状态,并在重试时规避这些 Broker。

private MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // 遍历所有队列
    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
        // 轮询选择队列
        int index = this.sendWhichQueue.incrementAndGet();
        MessageQueue mq = tpInfo.getMessageQueueList().get(index % tpInfo.getMessageQueueList().size());
        // 跳过上次失败的 Broker
        if (!mq.getBrokerName().equals(lastBrokerName)) {
            return mq;
        }
    }
    return null;
}

随机化优化:


当多线程并发发送消息时,轮询机制能够尽可能均匀分布负载到不同的 Broker 上。

4. 消息发送

选择目标队列后,Producer 将消息发送到对应的 Broker:

SendResult sendKernelImpl(
    Message msg, MessageQueue mq, CommunicationMode communicationMode, SendCallback sendCallback) {
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    return this.mQClientAPIImpl.sendMessage(
        brokerAddr, mq.getTopic(), msg, communicationMode, sendCallback
    );
}

消息通过网络发送到目标 Broker,并根据通信模式(同步、异步或单向)处理响应。


四、消息发送的核心细节

1. sendDefaultImpl 方法分析

sendDefaultImpl 是 RocketMQ 消息发送的核心方法,包含消息重试、多线程支持及不同模式的发送逻辑。以下是关键代码解析:

1.1 路由信息的获取

在方法中,首先会调用 tryToFindTopicPublishInfo 获取 Topic 的路由信息:

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

如果路由信息不可用,则会抛出异常或提示重新检查 NameServer 配置。


1.2 消息队列选择与发送

通过循环机制重试消息发送:


for (; times < timesTotal; times++) {
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    if (mqSelected != null) {
        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
        if (sendResult.getSendStatus() == SendStatus.SEND_OK || !retryEnabled) {
            return sendResult;
        }
    }
}

1.3 重试机制


重试条件:默认只在同步发送模式下重试。

Broker 路由选择:每次重试会重新选择 Broker,避免路由故障。


2. 三种消息发送模式的实现

2.1 同步发送

同步发送通过 sendKernelImpl 方法实现,等待发送结果返回:

SendResult sendResult = this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, topicPublishInfo, timeout);
return sendResult;

特点:高可靠性,适合关键数据。

重试机制:默认重试 2 次。

2.2 异步发送

异步发送通过注册回调函数处理结果:

this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, topicPublishInfo, timeout);

特点:高性能,适合非关键数据。

回调处理:使用 SendCallback 处理成功或失败的响应。

2.3 单向发送

单向发送不关心发送结果,直接调用核心方法:

this.sendKernelImpl(msg, mq, CommunicationMode.ONEWAY, null, topicPublishInfo, timeout);

特点:最低延迟,适合日志等非关键数据。



五、总结

RocketMQ 消息发送的核心流程包括消息创建、路由查找、消息发送和响应处理,其实现依赖于多个核心组件和机制,例如路由缓存、消息队列选择和重试机制。通过 sendDefaultImpl 方法的源码分析,可以深入理解同步、异步及单向发送的实现原理。


作者:nice_smile

链接:https://juejin.cn