闪电疯狂赛车
91.44M · 2026-03-23
2.x 或 3.x4.9.x 或 5.x在 pom.xml 中添加 RocketMQ 官方 Spring Boot Starter 依赖:
<!-- RocketMQ Spring Boot Starter 核心依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version> <!-- 与 RocketMQ 服务端版本匹配 -->
</dependency>
<!-- 可选:测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
spring:
application:
name: rocketmq-spring-boot-demo
# RocketMQ 核心配置
rocketmq:
# NameServer 地址(集群用分号分隔)
name-server: 127.0.0.1:9876
# 生产者配置
producer:
# 生产者组名(必填,建议按业务命名)
group: demo-producer-group
# 发送超时时间(默认 3000ms)
send-message-timeout: 3000
# 同步发送失败重试次数(默认 2)
retry-times-when-send-failed: 2
# 异步发送失败重试次数(默认 2)
retry-times-when-send-async-failed: 2
# 消息最大长度(默认 4194304 字节 = 4MB)
max-message-size: 4194304
# 压缩阈值(默认 4096 字节,超过自动压缩)
compress-message-body-threshold: 4096
# 消费者配置(全局默认,可在消费端注解覆盖)
consumer:
group: demo-consumer-group
# 消费线程数(默认 20)
consume-thread-min: 10
consume-thread-max: 20
# 批量消费最大条数(默认 1)
consume-message-batch-max-size: 1
# 最大重试次数(默认 -1 表示 16 次)
max-reconsume-times: 3
| 配置项 | 作用 | 生产建议 |
|---|---|---|
name-server | NameServer 地址 | 集群环境配置多个(用分号分隔),避免单点 |
producer.group | 生产者组名 | 按业务划分(如 order-producer-group) |
producer.send-message-timeout | 发送超时 | 核心业务设为 5000ms,避免超时过短 |
consumer.group | 消费者组名 | 一个业务逻辑对应一个组,不可复用 |
consumer.max-reconsume-times | 最大重试次数 | 非核心业务设为 3 次,核心业务设为 5 次 |
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
@Component
public class NormalMessageProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 同步发送普通消息
* @param topic 消息主题
* @param msgContent 消息内容
* @param msgKey 消息唯一键(用于追踪/幂等)
*/
public void sendNormalMessage(String topic, String msgContent, String msgKey) {
// 构建消息(支持自定义 Header)
Message<String> message = MessageBuilder
.withPayload(msgContent)
// 设置消息 Key(必填,用于幂等/追踪)
.setHeader(RocketMQHeaders.KEYS, msgKey)
// 可选:设置 Tag
.setHeader(RocketMQHeaders.TAGS, "normal_tag")
.build();
// 同步发送(topic:tag 格式指定 Tag)
rocketMQTemplate.syncSend(topic + ":normal_tag", message);
System.out.println("普通消息发送成功:" + msgKey);
}
}
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 普通消息消费者
* - consumerGroup:消费者组(必填)
* - topic:订阅主题(必填)
* - selectorExpression:Tag 过滤(* 表示所有)
* - messageModel:消费模式(CLUSTERING 集群/ BROADCASTING 广播)
* - consumeMode:消费模式(CONCURRENTLY 并发/ ORDERLY 顺序)
*/
@Component
@RocketMQMessageListener(
consumerGroup = "demo-consumer-group",
topic = "normal_topic",
selectorExpression = "normal_tag",
messageModel = MessageModel.CLUSTERING,
consumeMode = ConsumeMode.CONCURRENTLY
)
public class NormalMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
// 消费逻辑(需保证幂等)
System.out.println("收到普通消息:" + msg);
// 业务处理...
}
}
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class OrderedMessageProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 发送顺序消息(按业务键哈希选择队列)
* @param topic 主题
* @param msgContent 内容
* @param orderId 业务唯一键(如订单ID,保证同ID入同队列)
*/
public void sendOrderedMessage(String topic, String msgContent, String orderId) {
// 构建消息
String message = "订单" + orderId + ":" + msgContent;
// 发送顺序消息(指定 hashKey 为 orderId)
rocketMQTemplate.syncSendOrderly(
topic + ":order_tag",
MessageBuilder.withPayload(message).build(),
orderId // 关键:hashKey,保证同值入同队列
);
System.out.println("顺序消息发送成功:" + orderId);
}
}
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
consumerGroup = "order-consumer-group",
topic = "order_topic",
selectorExpression = "order_tag",
// 核心:顺序消费必须设为 ORDERLY
consumeMode = ConsumeMode.ORDERLY
)
public class OrderedMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
// 单线程消费,保证顺序
System.out.println("收到顺序消息:" + msg);
// 业务处理(如订单创建→支付→发货)
}
}
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class DelayMessageProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 发送延迟消息(固定等级)
* @param topic 主题
* @param msgContent 内容
* @param delayLevel 延迟等级(1=1s,5=1m,18=2h)
*/
public void sendDelayMessage(String topic, String msgContent, int delayLevel) {
// 构建消息
org.apache.rocketmq.common.message.Message rocketMsg = new org.apache.rocketmq.common.message.Message(
topic,
"delay_tag",
msgContent.getBytes()
);
// 设置延迟等级
rocketMsg.setDelayTimeLevel(delayLevel);
// 发送延迟消息
rocketMQTemplate.getProducer().send(rocketMsg);
System.out.println("延迟消息发送成功(等级" + delayLevel + "):" + msgContent);
}
}
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
consumerGroup = "delay-consumer-group",
topic = "delay_topic",
selectorExpression = "delay_tag"
)
public class DelayMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("收到延迟消息:" + msg);
// 业务处理(如订单超时取消)
}
}
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* 事务消息器
* - txProducerGroup:对应生产者组名
*/
@RocketMQTransactionListener(txProducerGroup = "tx-producer-group")
@Component
public class TransactionListener implements RocketMQLocalTransactionListener {
/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 获取消息内容
String msgContent = new String((byte[]) msg.getPayload());
String orderId = msg.getHeaders().get("KEYS").toString();
try {
// 执行本地事务(如扣减库存、创建订单)
boolean success = executeLocalDBTransaction(orderId);
if (success) {
// 提交消息
return RocketMQLocalTransactionState.COMMIT;
} else {
// 回滚消息
return RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
// 未知状态,等待回查
return RocketMQLocalTransactionState.UNKNOWN;
}
}
/**
* 事务回查(Broker 主动调用)
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String orderId = msg.getHeaders().get("KEYS").toString();
// 查询本地事务状态
boolean isSuccess = queryLocalTransactionStatus(orderId);
return isSuccess ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
}
// 模拟本地事务执行
private boolean executeLocalDBTransaction(String orderId) {
// 实际业务逻辑:操作数据库/缓存等
return true;
}
// 模拟查询本地事务状态
private boolean queryLocalTransactionStatus(String orderId) {
// 实际查询数据库状态
return true;
}
}
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class TransactionMessageProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 发送事务消息
* @param topic 主题
* @param msgContent 内容
* @param orderId 订单ID(消息Key)
*/
public void sendTransactionMessage(String topic, String msgContent, String orderId) {
// 构建消息
Message<String> message = MessageBuilder
.withPayload(msgContent)
.setHeader("KEYS", orderId)
.setHeader("TAGS", "tx_tag")
.build();
// 发送事务消息(最后一个参数为 arg,会传给器)
rocketMQTemplate.sendMessageInTransaction(
topic + ":tx_tag",
message,
null // 自定义参数,可传业务对象
);
System.out.println("事务消息发送成功(半消息):" + orderId);
}
}
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
consumerGroup = "tx-consumer-group",
topic = "tx_topic",
selectorExpression = "tx_tag"
)
public class TransactionMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("收到事务消息:" + msg);
// 业务处理(如通知物流、更新积分)
}
}
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
@Component
public class BatchMessageProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 发送批量消息
* @param topic 主题
*/
public void sendBatchMessage(String topic) {
// 构建批量消息列表(必须同Topic、同Tag、无延迟/事务)
List<Message> msgList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Message msg = new Message(
topic,
"batch_tag",
("批量消息" + i).getBytes()
);
msgList.add(msg);
}
// 发送批量消息
SendResult result = rocketMQTemplate.getProducer().send(msgList);
System.out.println("批量消息发送成功:" + result.getMsgId());
}
}
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQBatchListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@RocketMQMessageListener(
consumerGroup = "batch-consumer-group",
topic = "batch_topic",
selectorExpression = "batch_tag",
// 批量消费最大条数(需与配置项一致)
consumeMessageBatchMaxSize = 10
)
public class BatchMessageConsumer implements RocketMQBatchListener<String> {
@Override
public void onMessage(List<String> msgs) {
// 批量处理消息
System.out.println("收到批量消息,共" + msgs.size() + "条:");
msgs.forEach(msg -> System.out.println("- " + msg));
}
}
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
@SpringBootTest
public class RocketMQTest {
@Resource
private NormalMessageProducer normalMessageProducer;
@Resource
private OrderedMessageProducer orderedMessageProducer;
@Resource
private DelayMessageProducer delayMessageProducer;
@Resource
private TransactionMessageProducer transactionMessageProducer;
@Resource
private BatchMessageProducer batchMessageProducer;
// 测试普通消息
@Test
public void testNormalMessage() {
normalMessageProducer.sendNormalMessage("normal_topic", "Hello RocketMQ", "MSG_001");
}
// 测试顺序消息
@Test
public void testOrderedMessage() {
String orderId = "ORDER_1001";
orderedMessageProducer.sendOrderedMessage("order_topic", "创建", orderId);
orderedMessageProducer.sendOrderedMessage("order_topic", "支付", orderId);
orderedMessageProducer.sendOrderedMessage("order_topic", "发货", orderId);
}
// 测试延迟消息(等级5=1分钟)
@Test
public void testDelayMessage() {
delayMessageProducer.sendDelayMessage("delay_topic", "订单超时取消", 5);
}
// 测试事务消息
@Test
public void testTransactionMessage() {
transactionMessageProducer.sendTransactionMessage("tx_topic", "订单支付成功", "ORDER_10086");
}
// 测试批量消息
@Test
public void testBatchMessage() {
batchMessageProducer.sendBatchMessage("batch_topic");
}
}
msgKey 做幂等(如 Redis 分布式锁、数据库唯一键);MQClientException,实现失败重试/降级;brokerOffset - consumerOffset);%DLQ%{consumerGroup});name-server 和 producer/consumer.group 是必填项,需按业务规范命名;hashKey,消费端设为 ORDERLY;delayTimeLevel;RocketMQLocalTransactionListener,处理本地事务和回查;RocketMQBatchListener;