一提到消息队列,想必你即便没真正上手实践过,也一定对其有所耳闻,在之前的博客中以 Kafka
为例深入解析了消息队列模型以及其所解决的一些痛点。
针对 Kafka
队列部分,感兴趣的小伙伴可以去考古,传送门放这了:
将视线收回到现在,不论是 Kafka
又或是今天的主角 RabbitMQ
,其核心都在我们熟悉的名词消息队列上。二者自身的数据存储方式以及数据传输模型从而衍生出不同的特性以及适用场景。
以 Kafka
为例,其为队列数据提供了持久化功能,基于主题与消费组的形式拉取生产者所产生的数据,拥有极高的吞吐量广泛应用于大数据领域。
而 RabbitMQ
的一大特点即将交换器与队列相结合,通过交换器可实现复杂的数据路由功能。在数据传输中 RabbitMQ
虽然也提供了持久化,但默认还是以内存为主,当数据消费完成后即会被删除。同时与 Kafka
消费者主动拉取数据相比,RabbitMQ
则恰好相反由生产者主动推送数据,通过消息确认机制避免了数据丢失与重复消费。基于上述描述,RabbitMQ
凭借低延时的特性被广泛应用于系统或微服务间的数据通讯等方面。
下面通过项目集成实战介绍
RabbitMQ
的实现效果
一、服务部署
与 Kafka
类似,RabbitMQ
同样需要进行单独的应用服务部署,下面以 Docker
方式快速部署测试服务。
1. 容器创建
首先创建对应的数据映射目录并拉取镜像,这里注意一点拉取的镜像带 management
表示其内置了后台管理页面。
# 创建数据目录
mkdir -p /home/dockerdata/rabbitmq
# 拉取镜像
docker pull rabbitmq:management
完成镜像拉取之后就可以启动容器了,这里配置的默认账号为:ibudai/123456
,可根据偏好自行调整。
docker run -id \
-p 15672:15672 \
-p 5672:5672 \
--name=rabbitmq \
-v /home/dockerdata/rabbitmq:/var/lib/rabbitmq \
-e RABBITMQ_DEFAULT_USER=ibudai \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:management
二、项目集成
1. 依赖导入
在 Spring
中提供了开箱即用的 rabbitmq
依赖,在工程 pom
文件引入下述依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 服务配置
在工程的 yaml
配置文件 RabbitMQ
对应的连接信息:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: ibudai
password: 123456
三、分流队列
1. 基本介绍
在 Topic Exchange
类型中,我们可以根据路由键将数据发送至不同的队列,不同队列之间相互独立。
如下图所示,当发送消息时经过路由只会发送到对应的消费者,而不是全局广播。
在下述配置中,创建了一个名 topic_logs
的 Topic
,并在此 Topic
依次绑定了两个队列。
@Configuration
public class RabbitMQTopicConfig {
/**
* 定义 Topic Exchange
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic_logs");
}
/**
* 定义两个队列,分别处理 info 和 error 级别的日志
*/
@Bean
public Queue infoQueue() {
return new Queue("infoQueue");
}
@Bean
public Queue errorQueue() {
return new Queue("errorQueue");
}
/**
* 绑定队列到 Topic Exchange,使用 routing key 匹配 info 级别
*/
@Bean
public Binding bindingInfoQueue(Queue infoQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(infoQueue).to(topicExchange).with("log.info");
}
/**
* 绑定队列到 Topic Exchange,使用 routing key 匹配 error 级别
*/
@Bean
public Binding bindingErrorQueue(Queue errorQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(errorQueue).to(topicExchange).with("log.error");
}
}
2. 消息发送
在发送消息时,通过 RabbitTemplate
实例的 convertAndSend()
方法,传输的数据对象为字符串,对于复杂对象通常使用 Jackson
或 Gson
等类库序列化后传输。
通过路由键即可执行数据流向,其中 routingKey
取值即上述定义的 infoQueue
与 errorQueue
。
@Service
public class TopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String routingKey, String message) {
// 发送消息
rabbitTemplate.convertAndSend("topic_logs", routingKey, message);
}
}
3. 消息消费
对于数据的消费同样十分简单,通过 @RabbitListener
注解即可,其中 queues
参数对应上述的 routingKey
。
@Service
public class TopicConsumer {
@RabbitListener(queues = "infoQueue")
public void receiveInfo(String message) {
System.out.println("Received INFO message: " + message);
}
@RabbitListener(queues = "errorQueue")
public void receiveError(String message) {
System.out.println("Received ERROR message: " + message);
}
}
四、广播队列
1. 基本介绍
广播队列默认即最常见的队列模型,当发送消息时所注册的多个队列都能同时收到消息。
同理,通过 FanoutExchange
实例用于绑定队列,代码示例如下:
@Configuration
public class RabbitMQTopicConfig {
/**
* 定义 Fanout Exchange
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_logs");
}
@Bean
public Queue queue1() {
return new Queue("queue1");
}
@Bean
public Queue queue2() {
return new Queue("queue2");
}
/**
* 绑定两个队列到 Fanout Exchange
*/
@Bean
public Binding bindingQueue1(Queue queue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
@Bean
public Binding bindingQueue2(Queue queue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
}
2. 消息发送
同样的,通过 convertAndSend()
执行消息的广播发送。
@Service
public class TopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend("fanout_logs", "", message);
}
}
3. 消息消费
这里消费代码逻辑类似,就不再展开描述。
@Service
public class TopicConsumer {
@RabbitListener(queues = "queue1")
public void receive1(String message) {
System.out.println("Received from queue1: " + message);
}
@RabbitListener(queues = "queue2")
public void receive2(String message) {
System.out.println("Received from queue2: " + message);
}
}