Loading...

RocketMQ:消息发送机制

2025-03-12

作者:Hap Tool

来聊聊这个老掉牙的话题,从自己的角度说下他们的概况和使用,同时也巩固下自己的八股文。
根据RocketMQ的常见分类,通常分为同步消息、异步消息和单向消息。同步消息指的是消息发送后等待Broker的响应。异步消息指的是发送后不等待Broker的响应,而是通过回调函数来处理发送结果。单向消息就更为简单,消息发出去就不管了,不管响应也不管回调。

使用说明

在消息的生产者如何发送这三类消息,请看如下代码:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper; // 同步消息发送
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 初始化生产者,指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("sync_producer_group");
// 设置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();

try {
// 创建消息实例,指定 Topic、Tag 和消息体
Message msg = new Message("TestTopic",
"TagA",
"Hello RocketMQ Sync Message".getBytes(RemotingHelper.DEFAULT_CHARSET));

// 发送消息并获取发送结果
producer.send(msg);
System.out.println("同步消息发送成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭生产者
producer.shutdown();
}
}
}


import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper; // 异步消息发送
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();

// 设置异步发送失败时的重试次数
producer.setRetryTimesWhenSendAsyncFailed(0); Message msg = new Message("TestTopic",
"TagB",
"Hello RocketMQ Async Message".getBytes(RemotingHelper.DEFAULT_CHARSET));

// 异步发送消息,注册回调接口处理结果
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步消息发送成功: " + sendResult);
} @Override
public void onException(Throwable e) {
System.out.println("异步消息发送失败");
e.printStackTrace();
}
}); // 等待异步发送完成(实际生产环境中不需要 sleep)
Thread.sleep(3000);
producer.shutdown();
}
}


import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper; // 单向消息发送
public class OnewayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("oneway_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();

Message msg = new Message("TestTopic",
"TagC",
"Hello RocketMQ Oneway Message".getBytes(RemotingHelper.DEFAULT_CHARSET));

// 单向发送,不等待响应
producer.sendOneway(msg);
System.out.println("单向消息已发送");

producer.shutdown();
}
}
同步消息发送和单向消息发送都比较简单,异步消息发送也比较简单,只是在send方法加了一个待实现的接口SendCallback,实现onSuccess方法即可。

源码实现

那在源码层面是如何实现的呢?请看方法MQClientAPIImpl.sendMessage。
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
RocketMQ的底层网络通信完全依赖Netty,在这个三个方法调用中,均是是将调用数据封装为RemotingCommand对象,并通过channel.writeAndFlush方法将数据发送至broker,这也是RocketMQ的核心网络通信机制,心跳、路由信息同步都是相同的方式。

实际项目使用情况

本文的中是想和大家分享下在实际项目中的使用情况。当然,在很多项目中很多人并不在乎使用的那种消息方式,因为本身的流量和项目严谨的要求并不是很高,容错性也比较强。
同步消息:适用于强一致性的业务场景,比如我所在的金融交易、订单支付,确保消息一定要投递成功才能进行下一步操作。我所在的信贷行业以及保险行业,订单数据并不会产生极高并发的情况。
异步消息:秒杀、大促等营销场景、说白了就是丢掉几个也无所谓的情况,不要设计交易、金额这块的业务场景。
单向发送:日志收集或者监控类,广播数据方式,追求极致性能。但其实这样的场景下我建议用Kafka,我也并未用过单向发送方式。