RabbitMQ消息队列介绍


一提到消息队列,想必你即便没真正上手实践过,也一定对其有所耳闻,在之前的博客中以 Kafka 为例深入解析了消息队列模型以及其所解决的一些痛点。

针对 Kafka 队列部分,感兴趣的小伙伴可以去考古,传送门放这了:

  1. Kafka消息队列详解
  2. Kafka发布订阅实战

将视线收回到现在,不论是 Kafka 又或是今天的主角 RabbitMQ,其核心都在我们熟悉的名词消息队列上。二者自身的数据存储方式以及数据传输模型从而衍生出不同的特性以及适用场景。

Kafka 为例,其为队列数据提供了持久化功能,基于主题与消费组的形式拉取生产者所产生的数据,拥有极高的吞吐量广泛应用于大数据领域。

RabbitMQ 的一大特点即将交换器与队列相结合,通过交换器可实现复杂的数据路由功能。在数据传输中 RabbitMQ 虽然也提供了持久化,但默认还是以内存为主,当数据消费完成后即会被删除。同时与 Kafka 消费者主动拉取数据相比,RabbitMQ 则恰好相反由生产者主动推送数据,通过消息确认机制避免了数据丢失与重复消费。基于上述描述,RabbitMQ 凭借低延时的特性被广泛应用于系统或微服务间的数据通讯等方面。

下面通过项目集成实战介绍 RabbitMQ 的实现效果

一、服务部署

Kafka 类似,RabbitMQ 同样需要进行单独的应用服务部署,下面以 Docker 方式快速部署测试服务。

1. 容器创建

首先创建对应的数据映射目录并拉取镜像,这里注意一点拉取的镜像带 management 表示其内置了后台管理页面。

# 创建数据目录
mkdir -p /home/dockerdata/rabbitmq

# 拉取镜像
docker pull rabbitmq:management

完成镜像拉取之后就可以启动容器了,这里配置的默认账号为:ibudai/123456,可根据偏好自行调整。

docker run -id \
-p 15672:15672 \
-p 5672:5672 \
--name=rabbitmq \
-v /home/dockerdata/rabbitmq:/var/lib/rabbitmq \
-e RABBITMQ_DEFAULT_USER=ibudai \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:management

二、项目集成

1. 依赖导入

Spring 中提供了开箱即用的 rabbitmq 依赖,在工程 pom 文件引入下述依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 服务配置

在工程的 yaml 配置文件 RabbitMQ 对应的连接信息:

spring:
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: ibudai
        password: 123456

三、分流队列

1. 基本介绍

Topic Exchange 类型中,我们可以根据路由键将数据发送至不同的队列,不同队列之间相互独立。

如下图所示,当发送消息时经过路由只会发送到对应的消费者,而不是全局广播。

在下述配置中,创建了一个名 topic_logsTopic,并在此 Topic 依次绑定了两个队列。

@Configuration
public class RabbitMQTopicConfig {

    /**
     * 定义 Topic Exchange
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic_logs");
    }

    /**
     * 定义两个队列,分别处理 info 和 error 级别的日志
     */
    @Bean
    public Queue infoQueue() {
        return new Queue("infoQueue");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("errorQueue");
    }

    /**
     * 绑定队列到 Topic Exchange,使用 routing key 匹配 info 级别
     */
    @Bean
    public Binding bindingInfoQueue(Queue infoQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(infoQueue).to(topicExchange).with("log.info");
    }

    /**
     * 绑定队列到 Topic Exchange,使用 routing key 匹配 error 级别
     */
    @Bean
    public Binding bindingErrorQueue(Queue errorQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(errorQueue).to(topicExchange).with("log.error");
    }
}

2. 消息发送

在发送消息时,通过 RabbitTemplate 实例的 convertAndSend() 方法,传输的数据对象为字符串,对于复杂对象通常使用 JacksonGson 等类库序列化后传输。

通过路由键即可执行数据流向,其中 routingKey 取值即上述定义的 infoQueueerrorQueue

@Service
public class TopicProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String routingKey, String message) {
        // 发送消息
        rabbitTemplate.convertAndSend("topic_logs", routingKey, message);
    }
}

3. 消息消费

对于数据的消费同样十分简单,通过 @RabbitListener 注解即可,其中 queues 参数对应上述的 routingKey

@Service
public class TopicConsumer {

    @RabbitListener(queues = "infoQueue")
    public void receiveInfo(String message) {
        System.out.println("Received INFO message: " + message);
    }

    @RabbitListener(queues = "errorQueue")
    public void receiveError(String message) {
        System.out.println("Received ERROR message: " + message);
    }
}

四、广播队列

1. 基本介绍

广播队列默认即最常见的队列模型,当发送消息时所注册的多个队列都能同时收到消息。

同理,通过 FanoutExchange 实例用于绑定队列,代码示例如下:

@Configuration
public class RabbitMQTopicConfig {

    /**
     * 定义 Fanout Exchange
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_logs");
    }

    @Bean
    public Queue queue1() {
        return new Queue("queue1");
    }

    @Bean
    public Queue queue2() {
        return new Queue("queue2");
    }

    /**
     * 绑定两个队列到 Fanout Exchange
     */
    @Bean
    public Binding bindingQueue1(Queue queue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }
   
    @Bean
    public Binding bindingQueue2(Queue queue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
}

2. 消息发送

同样的,通过 convertAndSend() 执行消息的广播发送。

@Service
public class TopicProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend("fanout_logs", "", message);
    }
}

3. 消息消费

这里消费代码逻辑类似,就不再展开描述。

@Service
public class TopicConsumer {

    @RabbitListener(queues = "queue1")
    public void receive1(String message) {
        System.out.println("Received from queue1: " + message);
    }

    @RabbitListener(queues = "queue2")
    public void receive2(String message) {
        System.out.println("Received from queue2: " + message);
    }
}

文章作者: 烽火戏诸诸诸侯
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 烽火戏诸诸诸侯 !
  目录