塞瑞亚免安装正式中文版
1.37G · 2025-10-16
[入门精通] TLjava高薪扩展训练VIP系列179G69---获课地址:666it.top/13872/
在分布式系统架构中,消息中间件(MQ)与分布式锁已成为解决高并发、解耦服务、保证数据一致性的核心技术。本文将以RocketMQ为核心消息中间件,结合分布式锁的多种实现方案,通过企业级实战案例,深入剖析其技术原理与最佳实践。
消息中间件通过异步通信机制,解决了传统同步调用中响应延迟、系统耦合度高的问题。典型场景包括:
特性 | RocketMQ | Kafka | RabbitMQ |
---|---|---|---|
语言 | Java(阿里双11验证) | Scala/Java(大数据生态) | Erlang(高并发低延迟) |
协议 | 自研协议 | 自研协议 | AMQP/STOMP |
吞吐量 | 10万级TPS(低延迟) | 百万级TPS(批量处理) | 万级TPS(单线程模型) |
可靠性 | 同步/异步刷盘、主从复制 | 多副本复制(ISR机制) | 持久化队列、镜像队列 |
功能 | 事务消息、延迟消息、顺序消息 | 日志聚合、流处理 | 灵活路由、死信队列 |
适用场景 | 电商交易、金融系统 | 大数据分析、日志收集 | 任务队列、微服务通知 |
选型建议:
前置要求:
安装步骤:
# 下载RocketMQ 4.9.4版本
wget https://dist.apache.rocketmq.com/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip
unzip rocketmq-all-4.9.4-bin-release.zip
cd rocketmq-all-4.9.4-bin-release
# 配置环境变量
export ROCKETMQ_HOME=$PWD
export PATH=$PATH:$ROCKETMQ_HOME/bin
# 启动NameServer
nohup sh mqnamesrv &
# 启动Broker(配置autoCreateTopicEnable=true)
nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true &
可视化插件安装:
git clone https://github.com/apache/rocketmq-externals.git
cd rocketmq-externals/rocketmq-console
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=localhost:9876
访问 http://localhost:8080
查看控制台。
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("sync_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message(
"OrderTopic",
"TagA",
"OrderCreated".getBytes(StandardCharsets.UTF_8)
);
SendResult result = producer.send(msg);
System.out.println("消息ID: " + result.getMsgId());
producer.shutdown();
}
}
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message(
"VideoTopic",
"TagB",
"VideoUploaded".getBytes(StandardCharsets.UTF_8)
);
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步发送成功: " + sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.err.println("异步发送失败: " + e.getMessage());
}
});
Thread.sleep(1000); // 等待异步回调完成
producer.shutdown();
}
}
public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("OrderTopic", "*"); // 订阅所有Tag
consumer.setMessageModel(MessageModel.BROADCASTING); // 广播模式
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("收到广播消息: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("广播消费者启动成功");
}
}
SETNX + EXPIRE
非原子性导致死锁,主从同步延迟引发锁失效。tldb通过MQ的队列机制实现锁的争用与释放,核心逻辑如下:
// Java客户端实现
public class MqLockClient {
private SimpleClient mqClient;
public MqLockClient(String wsUrl, String authToken) {
this.mqClient = new SimpleClient(wsUrl, authToken);
mqClient.connect();
}
// 阻塞式加锁(带超时)
public String lock(String lockKey, long timeoutSeconds) {
return mqClient.lock(lockKey, timeoutSeconds);
}
// 非阻塞式加锁
public boolean tryLock(String lockKey, long timeoutSeconds) {
return mqClient.tryLock(lockKey, timeoutSeconds);
}
// 释放锁
public void unlock(String lockKey) {
mqClient.unLock(lockKey);
}
}
// 使用示例
public class OrderService {
private MqLockClient lockClient = new MqLockClient("ws://127.0.0.1:5001", "token123");
public void createOrder(String userId) {
String lockKey = "order_lock_" + userId;
try {
String key = lockClient.lock(lockKey, 5); // 5秒超时
// 执行业务逻辑(数据库操作、消息发送等)
System.out.println("用户" + userId + "创建订单成功");
} finally {
lockClient.unlock(lockKey);
}
}
}
Lock4j支持Redis、Redisson、Zookeeper等多种底层实现,通过注解简化分布式锁使用:
@Service
public class InventoryService {
@Lock4j(
name = "inventory_lock",
key = "#orderId",
executor = RedissonLockExecutor.class, // 指定执行器
acquireTimeout = 3000, // 获取锁超时时间
leaseTime = 10000 // 锁持有时间
)
public void deductInventory(String orderId, int quantity) {
// 扣减库存逻辑
System.out.println("订单" + orderId + "扣减库存" + quantity + "件");
}
}
配置文件:
spring:
redis:
host: localhost
port: 6379
lock4j:
enabled: true
redis-template:
enable: true
redisson:
enable: false
[订单微服务] → (RocketMQ) → [短信微服务]
↑
[用户微服务] → (RocketMQ) → [日志服务]
@RestController
@RequestMapping("/orders")
public class OrderController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@PostMapping
public ResponseEntity<?> createOrder(@RequestBody OrderDTO orderDTO) {
// 1. 保存订单到数据库
Order order = orderService.saveOrder(orderDTO);
// 2. 发送消息到MQ(异步通知短信服务)
Message<String> message = MessageBuilder.withPayload(order.getOrderId())
.setHeader(MessageConst.PROPERTY_KEYS, "sms_notify")
.build();
rocketMQTemplate.syncSend("OrderTopic:SMS", message);
return ResponseEntity.ok("订单创建成功");
}
}
@RocketMQMessageListener(
topic = "OrderTopic",
selectorExpression = "SMS", // 消费Tag为SMS的消息
consumerGroup = "sms_consumer_group"
)
@Service
public class SmsConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String orderId) {
// 1. 查询订单信息
Order order = orderService.getOrderById(orderId);
// 2. 发送短信(模拟)
System.out.println("发送短信给用户" + order.getUserId() +
": 您的订单" + orderId + "已创建");
// 3. 记录日志
logService.record("短信发送成功,订单ID: " + orderId);
}
}
curl -X POST http://localhost:8080/orders
-H "Content-Type: application/json"
-d '{"userId": "1001", "productId": "P001", "quantity": 2}'
订单微服务: 订单创建成功,ID: ORD20251014001
短信微服务: 发送短信给用户1001: 您的订单ORD20251014001已创建
通过深耕MQ消息中间件与分布式锁技术,开发者能够构建出高可用、低延迟、易扩展的企业级分布式系统,为业务创新提供坚实的技术底座。
1.37G · 2025-10-16
9.29G · 2025-10-16
9.55G · 2025-10-16
799~1199 元雷蛇清姬 V2 / X 直播摄像头发布:4K 画质 + AI 智能自动取景
卫星互联网低轨 12 组卫星发射成功,我国长征系列火箭达成 600 次里程碑、速度创纪录