一提到消息队列,想必你即便没真正上手实践过,也一定对其有所耳闻,在之前的博客中以 Kafka
为例深入解析了消息队列模型以及其所解决的一些痛点。
针对 Kafka
队列部分,感兴趣的小伙伴可以去考古,传送门放这了:
将视线收回到现在,不论是 Kafka
又或是今天的主角 RabbitMQ
,其核心都在消息队列上,基于二者的存储方式以及数据传输模型从而衍生出不同的特性以及适用场景。
以 Kafka
为例,其为队列数据提供了持久化功能,基于主题与消费组的形式拉取生产者所产生的数据,拥有极高的吞吐量广泛应用于大数据领域。
RabbitMQ
的特点即将交换器与队列相结合,通过交换机从而实现复杂的数据路由功能。同时与 Kafka
中主动拉取数据相比,RabbitMQ
则恰好相反由生产者主动推送数据,通过消息确认机制避免数据丢失与重复消费。
下面通过项目集成实战介绍
RabbitMQ
的集成应用
一、基础介绍
1. 核心概念
在 RabbitMQ
中包含了下述几个核心概念,让我们分别进行介绍。
(1) 虚拟机
虚拟主机 (Virtual Hosts)
是 RabbitMQ
中实现多租户的途径,通过不同的虚拟主机实现数据隔离。
(2) 交换机
顾名思义交互机 (Exchange)
即消息的中转站,当生产者发送数据后,消息将先发送至交换机,再进行转发的对应的队列。
(3) 路由KEY
路由键 (Router Key)
则是交换机与消息队列的缓冲层,通过路由键决定一个消息应该发送至哪个队列。
(4) 消息队列
消息队列即最终消息的临时存储媒介,经过路由转发的消息将暂存于队列中,等待被消费者所消费。
2. 模块解耦
从上述架构图中你或许会疑惑,Router Key
存在的意义是什么?为什么交换机与消息队列不能直接关联?
答案其实很简单,Router Key
的引入是 RabbitMQ
为解决功能耦合而提出的特性。通过 Router Key
不仅实现了队列与交换机的解耦,同时 Router Key
的引入提供了一系列更为灵活的消息映射方式。
试想一下如果没有 Router Key
,那消息将只能固定发到某个队列或广播到所有队列。而引入 Router Key
后交换机可根据绑定规则决定路由到哪些队列,同时也提供了实现多对多的路由能力。
不仅如此,当使用 Router Key
时,由于系统代码中所配置的并非具体的队列名称,在操作变更时无需修改代码仅需调整 RabbitMQ
中的路由规则配置即可,从而降低代码维护成本。
3. 队列类型
在 RabbitMQ
中消息队列存在 Classic
与 Quorum
两种类型,二者区别如下:
(1) Classic Queue
Classic Queue (CQs)
是 RabbitMQ
最早的队列类型。
默认单节点存储,若开启镜像队列 (Mirrored Queue)
可复制到多个节点。依赖 Erlang Mnesia
数据库存储,在 RabbitMQ 3.8
以后已逐渐被标记为“旧模型”。
基于主从复制模式,通过一个 master
与多个 slave
管理。其中子节点非实时同步数据,虽提高了性能但当 master
节点宕机需要选举新的 master
,这期间可能丢失未同步的数据。
(2) Quorum Queue
Quorum Queue
是在 RabbitMQ 3.8
引入用于替代镜像队列。
基于 Raft
共识算法实现复制和高可用,每条消息会写入多数派节点 (quorum)
从而保证一致性。
在 Quorum
模型中实现了类似 Kafka
的分片与副本,虽有一定的性能损耗,但保证的一致性。同时随着版本迭代 Classic
已逐渐被启用,在队列的选择上更推荐 Quorum Queue
。
二、项目集成
1. 服务部署
接下来就让我们了解如何在 Spring
工程中集成,开始前先以 Docker
部署 RabbitMQ
测试服务。
创建对应的数据映射目录并拉取镜像,需注意一点拉取的镜像带 management
表示其内置了后台管理页面。
# 创建数据目录
mkdir -p /home/dockerdata/rabbitmq
# 拉取镜像
docker pull rabbitmq:management
完成镜像拉取之后就可以启动容器了,这里配置的默认账号为:ibudai/123456
,可根据偏好自行调整。
部署完成后应用服务运行在 5672
端口,默认自带的后台管理在 15672
端口。
docker run -id \
-p 15672:15672 \
-p 5672:5672 \
--name=rabbitmq \
-v /home/dockerdata/rabbitmq:/var/lib/rabbitmq \
-e RABBITMQ_DEFAULT_USER=ibudai \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:management
2. 集成配置
在 Spring
中提供了开箱即用的 rabbitmq
依赖,在工程 pom
中引入相应的 starter
依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
当然必不可少的需要在工程的 yaml
配置中添加 RabbitMQ
对应的服务连接信息。
以刚才部署的测试服务为例,对应的配置内容如下:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: ibudai
password: 123456
三、交换模型
在 RabbitMQ
的交换机中存在多种模型,不同模型有着自己的传输特点,下面分别进行介绍。
1. 精准匹配
DirectExchange
顾名思义即直接交换,路由键与消息队列之间一一对应。
下述示例中创建了 topic_logs
交换机并绑定了两个消息队列,代码如下:
@Configuration
public class RabbitConfig {
/**
* 定义 Direct Exchange
*/
@Bean
public TopicExchange directExchange() {
return new DirectExchange("topic_logs");
}
@Bean
public Queue infoQueue() {
return new Queue("queue.info");
}
@Bean
public Queue errorQueue() {
return new Queue("queue.error");
}
@Bean
public Binding bindingInfoQueue(Queue infoQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(infoQueue)
// 绑定交换机
.to(topicExchange)
// 绑定路由 Key
.with("log.info");
}
@Bean
public Binding bindingErrorQueue(Queue errorQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(errorQueue
// 绑定交换机
.to(topicExchange)
// 绑定路由 Key
.with("log.error");
}
}
2. 模糊匹配
在 TopicExchange
传输模式中,除了与 DirectExchange
的精准匹配外,支持根据路由键模糊匹配。
如定义了路由键: test.m1.1
、test.m1.2
与 test.m2.1
,在发送消息时将路由键配置为 test.m1.*
则可以同时发送消息至 test.m1.1
与 test.m1.2
。
@Configuration
public class RabbitConfig {
/**
* 定义 Topic Exchange
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic_logs");
}
// 略去其它,同 DirectExchange
}
3. 消息广播
广播队列 (FanoutExchange)
即发送消息时交换机绑定的所有队列都能收到消息。
通过 FanoutExchange
创建交换机实例,因为是全局广播不存在路由键因此无需通过 with()
绑定。
相应的代码示例如下:
@Configuration
public class RabbitConfig {
/**
* 定义 Fanout Exchange
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_logs");
}
@Bean
public Queue queue1() {
return new Queue("queue.fan1");
}
@Bean
public Queue queue2() {
return new Queue("queue.fan2");
}
@Bean
public Binding bindingQueue1(Queue queue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
@Bean
public Binding bindingQueue2(Queue queue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
}
四、服务收发
1. 消息发送
在发送消息时,通过 RabbitTemplate
实例的 convertAndSend(exchange, routingKey, message)
方法,入参分别代表:交换机、路由 KEY
与需发送的消息。
传输的消息数据对象默认为字符串,对于复杂对象通常使用 Jackson
或 Gson
等类库序列化后传输。
在 FanoutExchange
传输模式下则不指定 routingKey
,表示发送消息至所有绑定的消息队列。
@Service
public class TopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage1() {
String message = "Hello World!";
// 精准匹配
rabbitTemplate.convertAndSend("topic_logs", "log.info", message);
}
public void sendMessage2() {
String message = "Hello World!";
// 模糊匹配
rabbitTemplate.convertAndSend("topic_logs", "log.*", message);
}
public void sendMessage3() {
String message = "Hello World!";
// 全局广播
rabbitTemplate.convertAndSend("fanout_logs", "", message);
}
}
2. 消息消费
对于数据的消费同样十分简单,通过 @RabbitListener
注解即可,其中 queues
参数即对应的消息队列名称。
@Service
public class TopicConsumer {
@RabbitListener(queues = "infoQueue")
public void receiveInfo(String message) {
System.out.println("Received INFO message: " + message);
}
@RabbitListener(queues = "errorQueue")
public void receiveError(String message) {
System.out.println("Received ERROR message: " + message);
}
}
五、多服务配置
1. 连接配置
在之前的示例中,我们是通过默认的配置连接服务收发消息,但在某些场景下系统可能会对接多个消息队列。
因此,在 Spring
中提供了自定义 RabbitTemplate
服务连接实例的能力。
如系统中存在两个 Rabbit
服务实例,连接配置如下:
rabbit:
primary:
host: host1
port: 5672
username: user1
password: pwd1
virtual-host: /
secondary:
host: host2
port: 5672
username: user2
password: pwd2
virtual-host: /
完成后通过 ConnectionFactory
创建连接工厂实例,其中虚拟主机 (VirtualHost)
是 RabbitMQ
提供的一种逻辑隔离机制,相当于一个命名空间,用来隔离不同的连接、交换机、队列、用户权限等。
定义 ConnectionFactory
后便可注入到 RabbitTemplate
实例之中。
@Configuration
public class RabbitConfig {
@Bean(name = "primaryConnectionFactory")
public ConnectionFactory primaryConnectionFactory(@Value("${rabbit.primary.host}") String host,
@Value("${rabbit.primary.port}") int port,
@Value("${rabbit.primary.username}") String username,
@Value("${rabbit.primary.password}") String password,
@Value("${rabbit.primary.virtual-host}") String virtualHost) {
CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
@Bean(name = "secondaryConnectionFactory")
public ConnectionFactory secondaryConnectionFactory(@Value("${rabbit.secondary.host}") String host,
@Value("${rabbit.secondary.port}") int port,
@Value("${rabbit.secondary.username}") String username,
@Value("${rabbit.secondary.password}") String password,
@Value("${rabbit.secondary.virtual-host}") String virtualHost) {
CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
@Bean(name = "primaryRabbitTemplate")
public RabbitTemplate primaryRabbitTemplate(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean(name = "secondaryRabbitTemplate")
public RabbitTemplate secondaryRabbitTemplate(@Qualifier("secondaryConnectionFactory") ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
}
那么现在即可通过对应的 RabbitTemplate
实例发送消息至指定服务节点。
@Service
public class TopicProducer {
@Autowired
private RabbitTemplate primaryRabbitTemplate;
@Autowired
private RabbitTemplate secondaryRabbitTemplate;
public void send() {
primaryRabbitTemplate.convertAndSend("topic_logs", "log.info", "hello world");
secondaryRabbitTemplate.convertAndSend("topic_logs", "log.info", "hello world");
}
}
2. 消费实例
同样的,在创建了多个连接实例后相应的也需要创建对应的消息接收工厂。
在之前 @RabbitListener
注解接收消息示例中,通过 queues
指定消息队列。而在多连接实例下,则可通过 containerFactory
配置对应的服务实例。
完成的配置示例如下,其中 ConnectionFactory
即上述的连接服务实例。
@Configuration
public class RabbitConfig {
@Bean
public SimpleRabbitListenerContainerFactory primaryContainerFactory(ConnectionFactory primaryConnectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(primaryConnectionFactory);
// 手动 ack
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
其中 AcknowledgeMode
为接收消息之后的确认机制,可选项参考下表:
方法 | 作用 |
---|---|
AcknowledgeMode.AUTO | 消息自动确认。 |
AcknowledgeMode.MANUAL | 消息手动 ACK 确认。 |
AcknowledgeMode.NONE | 消息不确认,可重复消费。 |
在定义实例之后,即可通过 containerFactory
属性进行指定,完整代码如下:
@RabbitListener(queues = "infoQueue", containerFactory = "primaryContainerFactory")
public void receiveMessage(String msg, Channel channel, Message message) {
// 手动 ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}