拼图
109.2MB · 2026-02-28
大家好,这是我写的第一篇博客!作为一名刚入门后端的大二学生,以前写单体项目时,遇到业务都是一套同步代码从头调到尾,主打一个“能跑就行”。
但直到我接手了项目里的**“秒杀下单”**模块,才体会到了什么叫“社会的毒打”。在秒杀场景下,瞬间涌入的成千上万个请求如果直接砸进 MySQL,数据库怕是当场就要“拔管”。为了保住可怜的数据库,我们要引入 RabbitMQ。
在刚学的时候,我总觉得这玩意儿把系统搞复杂了,但真正用到高并发场景才知道它有多香。总结起来就是经典的三板斧:
没学之前看官方文档一头雾水,其实把它当成**“寄快递”**秒懂:
了解完这些高大上的概念,我以为我已经无敌了,我的目标特别朴素:消息发出去,消费者能收到,就算成功。
结果真上手秒杀场景后,现实直接给我一记重锤。
我一开始写完 MQ 逻辑,测试没问题就提交了,结果遇到了以下情况:
这就很尴尬:
用户以为自己下单成功,我以为系统挺稳定,数据库表示“关我啥事”。
再加上这些常见的灵魂拷问:
说白了,刚开始的我只做到了“能发能收”,离“生产可落地”还差一大截。
我后面把问题拆成了 4 层来排查,每层都要有兜底:
我当时给自己立了个目标:
下面是我项目里最关键的几段代码(按链路顺序放),注释我写得比较啰嗦,方便面试和复盘。
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1. Redis + Lua 原子校验:库存是否足够、是否重复下单
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString()
);
int r = result.intValue();
if (r != 0) {
// r=1 库存不足;r=2 重复下单
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 2. 校验通过,构造订单消息体(这里只是消息,不直接落库)
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
// 3. 发送 MQ:让消费端异步落库,削峰填谷
try {
rabbitTemplate.convertAndSend(MqConfig.EXCHANGE, MqConfig.KEY, voucherOrder);
} catch (Exception e) {
// 4. 发送失败:Redis 里已经做了预扣和下单标记,必须补偿回滚
log.error("秒杀下单消息发送失败,触发补偿回滚。voucherId={}, userId={}, orderId={}",
voucherId, userId, orderId, e);
seckillCompensationService.compensate(voucherId, userId, "MQ发送异常");
return Result.fail("系统繁忙,请稍后重试");
}
// 5. 快速返回:用户先拿到“排队中”
return Result.ok(orderId);
}
@Configuration
public class MqConfig {
// 秒杀主链路
public static final String EXCHANGE = "seckill.topic";
public static final String QUEUE = "seckill.queue";
public static final String KEY = "seckill.order";
// 死信链路
public static final String DLX_EXCHANGE = "seckill.dlx.topic";
public static final String DLX_QUEUE = "seckill.dlx.queue";
public static final String DLX_KEY = "seckill.dlx";
@Bean
public TopicExchange seckillExchange() {
return new TopicExchange(EXCHANGE);
}
@Bean
public Queue seckillQueue() {
// 关键:业务队列绑定死信交换机
// 当消息被 NACK 且 requeue=false 时,会进 DLQ
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", DLX_EXCHANGE);
args.put("x-dead-letter-routing-key", DLX_KEY);
return new Queue(QUEUE, true, false, false, args);
}
@Bean
public TopicExchange seckillDlxExchange() {
return new TopicExchange(DLX_EXCHANGE);
}
@Bean
public Queue seckillDlxQueue() {
return new Queue(DLX_QUEUE, true);
}
@Bean
public Binding bindSeckillQueue() {
return BindingBuilder.bind(seckillQueue()).to(seckillExchange()).with(KEY);
}
@Bean
public Binding bindSeckillDlxQueue() {
return BindingBuilder.bind(seckillDlxQueue()).to(seckillDlxExchange()).with(DLX_KEY);
}
@Bean
public SimpleRabbitListenerContainerFactory manualAckContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 手动ACK:业务处理成功后再确认消息
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 每次只拉一条,避免未ACK消息堆在消费者内存里
factory.setPrefetchCount(1);
return factory;
}
}
@RabbitListener(queues = MqConfig.QUEUE, containerFactory = "manualAckContainerFactory")
public void listenSeckillQueue(VoucherOrder voucherOrder, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 1. 用户维度加锁:避免并发重复处理同一用户订单
RLock lock = redissonClient.getLock("lock:order:" + voucherOrder.getUserId());
boolean isLock = lock.tryLock();
if (!isLock) {
// 当前有相同用户订单在处理,直接ACK丢弃当前消息,防止重复消费风暴
channel.basicAck(deliveryTag, false);
return;
}
try {
// 2. 真正执行业务(事务方法)
proxy.createVoucherOrder(voucherOrder);
// 3. 成功后手动ACK
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 4. 读取自定义重试次数
Integer retry = (Integer) message.getMessageProperties().getHeaders().getOrDefault("x-retry", 0);
if (retry < 3) {
int nextRetry = retry + 1;
// 5. 重新投递一条新消息,并把 x-retry +1
MessagePostProcessor mpp = m -> {
m.getMessageProperties().setHeader("x-retry", nextRetry);
return m;
};
rabbitTemplate.convertAndSend(MqConfig.EXCHANGE, MqConfig.KEY, voucherOrder, mpp);
// 6. 重投成功后,ACK 当前失败消息,避免原消息反复立即重投
channel.basicAck(deliveryTag, false);
} else {
// 7. 超过重试上限:NACK 且不重回队列,交给 DLQ 处理
channel.basicNack(deliveryTag, false, false);
}
} finally {
// 8. finally 解锁,防止死锁
lock.unlock();
}
}
@PostConstruct
public void initCallbacks() {
// 1. ConfirmCallback:确认消息是否到达 Broker
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String id = correlationData == null ? null : correlationData.getId();
if (ack) {
log.debug("MQ Confirm ACK, correlationId={}", id);
} else {
// 这里只能说明“Broker没收到”,通常要配合本地消息表/重发任务
log.error("MQ Confirm NACK, correlationId={}, cause={}", id, cause);
}
});
// 2. ReturnCallback:消息到达交换机但无法路由到队列
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
try {
String body = new String(message.getBody());
log.error("MQ Return(不可路由): replyCode={}, replyText={}, exchange={}, routingKey={}, body={}",
replyCode, replyText, exchange, routingKey, body);
// 解析消息,触发补偿,避免Redis状态和DB状态“各走各的”
Map<String, Object> map = objectMapper.readValue(body, Map.class);
Long voucherId = map.get("voucherId") == null ? null : Long.valueOf(map.get("voucherId").toString());
Long userId = map.get("userId") == null ? null : Long.valueOf(map.get("userId").toString());
seckillCompensationService.compensate(voucherId, userId, "消息不可路由(Return)");
} catch (Exception e) {
log.error("处理 MQ Return 时异常", e);
}
});
}
@RabbitListener(queues = MqConfig.DLX_QUEUE, containerFactory = "manualAckContainerFactory")
public void listenSeckillDlx(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String body = new String(message.getBody());
VoucherOrder voucherOrder = objectMapper.readValue(body, VoucherOrder.class);
Long orderId = voucherOrder.getId();
Long voucherId = voucherOrder.getVoucherId();
Long userId = voucherOrder.getUserId();
// 1. 如果DB已经有订单,说明只是消息链路问题,不需要补偿
boolean orderExists = voucherOrderService.query().eq("id", orderId).count() > 0;
// 2. 如果DB没有订单,执行补偿:回滚Redis标记、校准库存
if (!orderExists) {
seckillCompensationService.compensate(voucherId, userId, "消费失败进入DLQ");
}
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 3. DLQ处理再失败也别无限循环,直接ACK并记录日志
log.error("处理秒杀死信消息异常,直接确认丢弃", e);
channel.basicAck(deliveryTag, false);
}
}
public void compensate(Long voucherId, Long userId, String reason) {
if (voucherId == null || userId == null) {
log.error("补偿参数非法:voucherId={}, userId={}, reason={}", voucherId, userId, reason);
return;
}
// 1. 回滚“已下单”标记,让用户可再次尝试
stringRedisTemplate.opsForSet().remove("seckill:order:" + voucherId, userId.toString());
// 2. 按数据库库存校准 Redis 库存(以DB为准)
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
int dbStock = (voucher == null || voucher.getStock() == null) ? 0 : voucher.getStock();
stringRedisTemplate.opsForValue().set("seckill:stock:" + voucherId, String.valueOf(dbStock));
log.warn("秒杀补偿完成:voucherId={}, userId={}, dbStock={}, reason={}", voucherId, userId, dbStock, reason);
}
这部分是我真的“撞墙”撞出来的,建议你直接收藏:
Confirm + Return。如果你也是刚开始搞 RabbitMQ 的同学,我真心建议你别只停在“会发会收”。
把失败链路补齐的那一刻,你会有种“这玩意终于像生产系统了”的踏实感。希望能给各位踩坑路上的小伙伴一点参考!欢迎评论区指正~