一提到消息队列,想必你即便没真正上手实践过,也一定对其有所耳闻,在之前的博客中以 Kafka
为例深入解析了消息队列模型以及其所解决的一些痛点。
针对 Kafka
队列部分,感兴趣的小伙伴可以去考古,传送门放这了:
将视线收回到现在,不论是 Kafka
又或是今天的主角 RabbitMQ
,其核心都在消息队列上,基于二者的存储方式以及数据传输模型从而衍生出不同的特性以及适用场景。
以 Kafka
为例,其为队列数据提供了持久化功能,基于主题与消费组的形式拉取生产者所产生的数据,拥有极高的吞吐量广泛应用于大数据领域。而 RabbitMQ
的一大特点即将交换器与队列相结合,通过交换器可实现复杂的数据路由功能。
在数据传输中 RabbitMQ
虽然也提供了持久化,但默认还是以内存为主,当数据消费完成后即会被删除。同时与 Kafka
消费者主动拉取数据相比,RabbitMQ
则恰好相反由生产者主动推送数据,通过消息确认机制避免了数据丢失与重复消费。
下面通过项目集成实战介绍
RabbitMQ
的集成应用
一、基础介绍
1. 服务部署
RabbitMQ
需要单独的应用服务部署,在开始前先以 Docker
方式快速部署测试服务。
创建对应的数据映射目录并拉取镜像,这里注意一点拉取的镜像带 management
表示其内置了后台管理页面。
# 创建数据目录
mkdir -p /home/dockerdata/rabbitmq
# 拉取镜像
docker pull rabbitmq:management
完成镜像拉取之后就可以启动容器了,这里配置的默认账号为:ibudai/123456
,可根据偏好自行调整。
完成部署后应用服务运行在 5672
端口,默认自带的后台管理在 15672
端口。
docker run -id \
-p 15672:15672 \
-p 5672:5672 \
--name=rabbitmq \
-v /home/dockerdata/rabbitmq:/var/lib/rabbitmq \
-e RABBITMQ_DEFAULT_USER=ibudai \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:management
2. 核心概念
在 RabbitMQ
中包含了三个核心概念,下面分别进行讲解。
(1) 交换机
顾名思义交互机 (Exchange)
即消息的中转站,当生产者发送数据后,消息将先发送至交换机,再进行转发的对应的队列。
(2) 路由KEY
路由键 (Router Key)
则是交换机与消息队列的缓冲层,通过路由键决定一个消息应该发送至哪个队列。
通过路由键的存在实现交换机与队列之间的解耦,并且能够实现更复杂的消息转发机制。
(3) 消息队列
消息队列即最终消息的临时存储媒介,经过路由转发的消息将暂存于队列中,等待被消费者所消费。
二、项目集成
接下来就让我们开始正式介绍如何在 Spring
工程中集成使用。
1. 依赖导入
在 Spring
中提供了开箱即用的 rabbitmq
依赖,在工程 pom
中引入相应的 starter
依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 服务配置
当然必不可少的需要在工程的 yaml
配置中添加 RabbitMQ
对应的服务连接信息。
以刚才部署的测试服务为例,对应的配置内容如下:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: ibudai
password: 123456
三、交换模型
在 RabbitMQ
的交换机中存在多种模型,不同模型有着自己的传输特点,下面分别进行介绍。
1. 精准匹配
DirectExchange
顾名思义即直接交换,路由键与消息队列之间一一对应。
下述示例中创建了 topic_logs
交换机并绑定了两个消息队列,代码如下:
@Configuration
public class RabbitConfig {
/**
* 定义 Direct Exchange
*/
@Bean
public TopicExchange directExchange() {
return new DirectExchange("topic_logs");
}
@Bean
public Queue infoQueue() {
return new Queue("queue.info");
}
@Bean
public Queue errorQueue() {
return new Queue("queue.error");
}
@Bean
public Binding bindingInfoQueue(Queue infoQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(infoQueue)
// 绑定交换机
.to(topicExchange)
// 绑定路由 Key
.with("log.info");
}
@Bean
public Binding bindingErrorQueue(Queue errorQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(errorQueue
// 绑定交换机
.to(topicExchange)
// 绑定路由 Key
.with("log.error");
}
}
2. 模糊匹配
在 TopicExchange
传输模式中,除了与 DirectExchange
的精准匹配外,支持根据路由键模糊匹配。
如定义了路由键: test.m1.1
、test.m1.2
与 test.m2.1
,在发送消息时将路由键配置为 test.m1.*
则可以同时发送消息至 test.m1.1
与 test.m1.2
。
@Configuration
public class RabbitConfig {
/**
* 定义 Topic Exchange
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic_logs");
}
// 略去其它,同 DirectExchange
}
3. 消息广播
广播队列 (FanoutExchange)
即发送消息时交换机绑定的所有队列都能收到消息。
通过 FanoutExchange
创建交换机实例,因为是全局广播不存在路由键因此无需通过 with()
绑定。
相应的代码示例如下:
@Configuration
public class RabbitConfig {
/**
* 定义 Fanout Exchange
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_logs");
}
@Bean
public Queue queue1() {
return new Queue("queue.fan1");
}
@Bean
public Queue queue2() {
return new Queue("queue.fan2");
}
@Bean
public Binding bindingQueue1(Queue queue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
@Bean
public Binding bindingQueue2(Queue queue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
}
四、服务收发
1. 消息发送
在发送消息时,通过 RabbitTemplate
实例的 convertAndSend(exchange, routingKey, message)
方法,入参分别代表:交换机、路由 KEY
与需发送的消息。
传输的消息数据对象默认为字符串,对于复杂对象通常使用 Jackson
或 Gson
等类库序列化后传输。
在 FanoutExchange
传输模式下则不指定 routingKey
,表示发送消息至所有绑定的消息队列。
@Service
public class TopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage1() {
String message = "Hello World!";
// 精准匹配
rabbitTemplate.convertAndSend("topic_logs", "log.info", message);
}
public void sendMessage2() {
String message = "Hello World!";
// 模糊匹配
rabbitTemplate.convertAndSend("topic_logs", "log.*", message);
}
public void sendMessage3() {
String message = "Hello World!";
// 全局广播
rabbitTemplate.convertAndSend("fanout_logs", "", message);
}
}
2. 消息消费
对于数据的消费同样十分简单,通过 @RabbitListener
注解即可,其中 queues
参数即对应的消息队列名称。
@Service
public class TopicConsumer {
@RabbitListener(queues = "infoQueue")
public void receiveInfo(String message) {
System.out.println("Received INFO message: " + message);
}
@RabbitListener(queues = "errorQueue")
public void receiveError(String message) {
System.out.println("Received ERROR message: " + message);
}
}
五、多服务配置
1. 连接配置
在之前的示例中,我们是通过默认的配置连接服务收发消息,但在某些场景下系统可能会对接多个消息队列。
因此,在 Spring
中提供了自定义 RabbitTemplate
服务连接实例的能力。
如系统中存在两个 Rabbit
服务实例,连接配置如下:
rabbit:
primary:
host: host1
port: 5672
username: user1
password: pwd1
virtual-host: /
secondary:
host: host2
port: 5672
username: user2
password: pwd2
virtual-host: /
完成后通过 ConnectionFactory
创建连接工厂实例,其中虚拟主机 (VirtualHost)
是 RabbitMQ
提供的一种逻辑隔离机制,相当于一个命名空间,用来隔离不同的连接、交换机、队列、用户权限等。
定义 ConnectionFactory
后便可注入到 RabbitTemplate
实例之中。
@Configuration
public class RabbitConfig {
@Bean(name = "primaryConnectionFactory")
public ConnectionFactory primaryConnectionFactory(@Value("${rabbit.primary.host}") String host,
@Value("${rabbit.primary.port}") int port,
@Value("${rabbit.primary.username}") String username,
@Value("${rabbit.primary.password}") String password,
@Value("${rabbit.primary.virtual-host}") String virtualHost) {
CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
@Bean(name = "secondaryConnectionFactory")
public ConnectionFactory secondaryConnectionFactory(@Value("${rabbit.secondary.host}") String host,
@Value("${rabbit.secondary.port}") int port,
@Value("${rabbit.secondary.username}") String username,
@Value("${rabbit.secondary.password}") String password,
@Value("${rabbit.secondary.virtual-host}") String virtualHost) {
CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
@Bean(name = "primaryRabbitTemplate")
public RabbitTemplate primaryRabbitTemplate(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean(name = "secondaryRabbitTemplate")
public RabbitTemplate secondaryRabbitTemplate(@Qualifier("secondaryConnectionFactory") ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
}
那么现在即可通过对应的 RabbitTemplate
实例发送消息至指定服务节点。
@Service
public class TopicProducer {
@Autowired
private RabbitTemplate primaryRabbitTemplate;
@Autowired
private RabbitTemplate secondaryRabbitTemplate;
public void send() {
primaryRabbitTemplate.convertAndSend("topic_logs", "log.info", "hello world");
secondaryRabbitTemplate.convertAndSend("topic_logs", "log.info", "hello world");
}
}
2. 消费实例
同样的,在创建了多个连接实例后相应的也需要创建对应的消息接收工厂。
在之前 @RabbitListener
注解接收消息示例中,通过 queues
指定消息队列。而在多连接实例下,则可通过 containerFactory
配置对应的服务实例。
完成的配置示例如下,其中 ConnectionFactory
即上述的连接服务实例。
@Configuration
public class RabbitConfig {
@Bean
public SimpleRabbitListenerContainerFactory primaryContainerFactory(ConnectionFactory primaryConnectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(primaryConnectionFactory);
// 手动 ack
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
其中 AcknowledgeMode
为接收消息之后的确认机制,可选项参考下表:
方法 | 作用 |
---|---|
AcknowledgeMode.AUTO | 消息自动确认。 |
AcknowledgeMode.MANUAL | 消息手动 ACK 确认。 |
AcknowledgeMode.NONE | 消息不确认,可重复消费。 |
在定义实例之后,即可通过 containerFactory
属性进行指定,完整代码如下:
@RabbitListener(queues = "infoQueue", containerFactory = "primaryContainerFactory")
public void receiveMessage(String msg, Channel channel, Message message) {
// 手动 ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}