噗噗的冒险乐园
49.73M · 2026-02-26
消息队列中间件(Message Queue Middleware,简称MOM)是一种基于消息传递的分布式系统基础软件,它充当应用程序之间的"消息邮局"或"异步通信桥梁",允许应用通过发送和接收消息进行通信,而无需直接建立连接或同时在线。
主流消息队列中间件产品
| 产品 | 开发语言 | 核心协议 | 适用场景 | 特点 |
|---|---|---|---|---|
| RabbitMQ | Erlang | AMQP | 企业级应用、复杂路由 | 功能丰富、生态成熟、延迟低 |
| Apache Kafka | Scala/Java | 自定义协议 | 大数据流处理、日志收集 | 高吞吐、持久化、分区机制 |
| RocketMQ | Java | 自定义协议 | 金融级应用、高可靠 | 阿里开源、事务消息、延迟消息 |
| ActiveMQ | Java | JMS/AMQP | 传统企业集成 | 老牌产品、支持 JMS 规范 |
| Pulsar | Java | 自定义协议 | 云原生、多租户 | 存算分离、水平扩展强 |
| ZeroMQ | C++ | 自定义协议 | 高性能嵌入式 | 无 Broker、库级别实现 |
消息队列(Message Queue,简称MQ)是一种异步的通信机制,它让应用程序能够通过发送和接收消息来交换数据,而无需直接连接或同时运行。简单来说,它是一个暂存消息的"缓冲区" ,遵循先进先出(FIFO) 的原则。
定义:调用方发送请求后,必须阻塞,等待被调用方处理完成并返回结果,才能继续执行后续代码。
举个例子来说就是打电话:
定义:调用方发送请求后立即返回,无需等待被调用方处理,被调用方完成后通过回调、通知或轮询方式告知结果。
举个例子来说就是发微信
| 优缺点 | 同步流程 | 异步流程 |
|---|---|---|
| 响应时间 | 慢(累加所有服务耗时) | 快(仅主逻辑耗时) |
| 系统耦合 | 高(直接依赖下游服务) | 低(通过队列解耦) |
| 容错性 | 差(下游故障影响上游) | 强(故障隔离,可重试) |
| 资源利用 | 低(线程阻塞等待) | 高(线程立即释放) |
| 数据一致性 | 强(实时确认结果) | 最终一致性(短暂延迟) |
| 编程复杂度 | 简单(线性思维) | 复杂(需处理回调、幂等) |
| 适用场景 | 查询、强一致性事务 | 通知、耗时操作、削峰 |
在上面我们学到了同步调用与异步调用,我们也列举了优缺点,那么我们知道消息队列是一种异步通信机制,我们来看下面这张图,这是一个非常典型的一个支付流程。
在上述支付流程当中有三步,如果我们使用同步调用机制,那么每执行一步就要等待一次返回结果,然后根据返回结果选择是否执行下一步。
如果这时候突然有两个新的需求,那么你不仅要写新需求的代码,还要更改之前的代码以适配新的需求,并且如果后续还是有新的需求,使用同步调用所完成的时间只会越来越长。
RabbitMQ是一个开源的消息队列中间件,它实现了高级消息队列协议(AMQP),用于在分布式系统中存储和转发消息。
在安装完rabbitmq后进入到控制页面,类似于minio的控制页面,如下图所示
给各位翻译一下再看看
其中最主要的就是如下这几个,在第五条中我们简单的介绍了MQ中存在的几类概念。
我们来看看队列界面
翻译一下,很简单,下面有一个创建队列,我们先创建两个队列
我们简单的写一个案例,不涉及到交换机,就是从生产者到队列再到消费者
我们需要的是Spring AMQP
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
然后在applicant.yml中配置mq地址与用户
spring:
application:
name: learnMQ
rabbitmq:
host: ip地址
port: 5672(如果是云服务器别忘了放行)
virtual-host: /(你自己的)
username: admin(你自己的)
password: admin(你自己的)
阅读如下代码,这是一个生产者发送消息的
package com.example;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class SpringAmqpTest {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
String queueName = "queue.a";//消息
String message = "你好,mq1";
rabbitTemplate.convertAndSend(queueName, message);
}
}
首先我们在创建一个模块模拟两个微服务之间的通讯,然后创建代码
编写消费者代码
package com.example.listeners;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MqListener {
@RabbitListener(queues = "queue.a")
public void listensimpleQueue(String msg){
System.out.println("消费者收到了queue.a的消息:【" + msg + "】");
}
}
首先我们先运行消费者的代码,然后运行生产者的测试程序,在控制台就能看到以下打印信息