RabbitMQ消息队列入门


一提到消息队列,想必你即便没真正上手实践过,也一定对其有所耳闻,在之前的博客中以 Kafka 为例深入解析了消息队列模型以及其所解决的一些痛点。

针对 Kafka 队列部分,感兴趣的小伙伴可以去考古,传送门放这了:

将视线收回到现在,不论是 Kafka 又或是今天的主角 RabbitMQ,其核心都在消息队列上,基于二者的存储方式以及数据传输模型从而衍生出不同的特性以及适用场景。

Kafka 为例,其为队列数据提供了持久化功能,基于主题与消费组的形式拉取生产者所产生的数据,拥有极高的吞吐量广泛应用于大数据领域。而 RabbitMQ 的一大特点即将交换器与队列相结合,通过交换器可实现复杂的数据路由功能。

在数据传输中 RabbitMQ 虽然也提供了持久化,但默认还是以内存为主,当数据消费完成后即会被删除。同时与 Kafka 消费者主动拉取数据相比,RabbitMQ 则恰好相反由生产者主动推送数据,通过消息确认机制避免了数据丢失与重复消费。

下面通过项目集成实战介绍 RabbitMQ 的集成应用

一、基础介绍

1. 服务部署

RabbitMQ 需要单独的应用服务部署,在开始前先以 Docker 方式快速部署测试服务。

创建对应的数据映射目录并拉取镜像,这里注意一点拉取的镜像带 management 表示其内置了后台管理页面。

# 创建数据目录
mkdir -p /home/dockerdata/rabbitmq

# 拉取镜像
docker pull rabbitmq:management

完成镜像拉取之后就可以启动容器了,这里配置的默认账号为:ibudai/123456,可根据偏好自行调整。

完成部署后应用服务运行在 5672 端口,默认自带的后台管理在 15672 端口。

docker run -id \
-p 15672:15672 \
-p 5672:5672 \
--name=rabbitmq \
-v /home/dockerdata/rabbitmq:/var/lib/rabbitmq \
-e RABBITMQ_DEFAULT_USER=ibudai \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:management

2. 核心概念

RabbitMQ 中包含了三个核心概念,下面分别进行讲解。

(1) 交换机

顾名思义交互机 (Exchange) 即消息的中转站,当生产者发送数据后,消息将先发送至交换机,再进行转发的对应的队列。

(2) 路由KEY

路由键 (Router Key) 则是交换机与消息队列的缓冲层,通过路由键决定一个消息应该发送至哪个队列。

通过路由键的存在实现交换机与队列之间的解耦,并且能够实现更复杂的消息转发机制。

(3) 消息队列

消息队列即最终消息的临时存储媒介,经过路由转发的消息将暂存于队列中,等待被消费者所消费。

二、项目集成

接下来就让我们开始正式介绍如何在 Spring 工程中集成使用。

1. 依赖导入

Spring 中提供了开箱即用的 rabbitmq 依赖,在工程 pom 中引入相应的 starter 依赖。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 服务配置

当然必不可少的需要在工程的 yaml 配置中添加 RabbitMQ 对应的服务连接信息。

以刚才部署的测试服务为例,对应的配置内容如下:

spring:
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: ibudai
        password: 123456

三、交换模型

RabbitMQ 的交换机中存在多种模型,不同模型有着自己的传输特点,下面分别进行介绍。

1. 精准匹配

DirectExchange 顾名思义即直接交换,路由键与消息队列之间一一对应。

下述示例中创建了 topic_logs 交换机并绑定了两个消息队列,代码如下:

@Configuration
public class RabbitConfig {

    /**
     * 定义 Direct Exchange
     */
    @Bean
    public TopicExchange directExchange() {
        return new DirectExchange("topic_logs");
    }

    @Bean
    public Queue infoQueue() {
        return new Queue("queue.info");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("queue.error");
    }

    @Bean
    public Binding bindingInfoQueue(Queue infoQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(infoQueue)
                // 绑定交换机
                .to(topicExchange)
                // 绑定路由 Key
                .with("log.info");
    }

    @Bean
    public Binding bindingErrorQueue(Queue errorQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(errorQueue
                // 绑定交换机
                .to(topicExchange)
                // 绑定路由 Key
                .with("log.error");
    }
}

2. 模糊匹配

TopicExchange 传输模式中,除了与 DirectExchange 的精准匹配外,支持根据路由键模糊匹配。

如定义了路由键: test.m1.1test.m1.2test.m2.1,在发送消息时将路由键配置为 test.m1.* 则可以同时发送消息至 test.m1.1test.m1.2

@Configuration
public class RabbitConfig {

    /**
     * 定义 Topic Exchange
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic_logs");
    }

    // 略去其它,同 DirectExchange
}

3. 消息广播

广播队列 (FanoutExchange) 即发送消息时交换机绑定的所有队列都能收到消息。

通过 FanoutExchange 创建交换机实例,因为是全局广播不存在路由键因此无需通过 with() 绑定。

相应的代码示例如下:

@Configuration
public class RabbitConfig {

    /**
     * 定义 Fanout Exchange
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_logs");
    }

    @Bean
    public Queue queue1() {
        return new Queue("queue.fan1");
    }

    @Bean
    public Queue queue2() {
        return new Queue("queue.fan2");
    }

    @Bean
    public Binding bindingQueue1(Queue queue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }
   
    @Bean
    public Binding bindingQueue2(Queue queue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
}

四、服务收发

1. 消息发送

在发送消息时,通过 RabbitTemplate 实例的 convertAndSend(exchange, routingKey, message) 方法,入参分别代表:交换机、路由 KEY 与需发送的消息。

传输的消息数据对象默认为字符串,对于复杂对象通常使用 JacksonGson 等类库序列化后传输。

FanoutExchange 传输模式下则不指定 routingKey,表示发送消息至所有绑定的消息队列。

@Service
public class TopicProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage1() {
        String message = "Hello World!";
        // 精准匹配
        rabbitTemplate.convertAndSend("topic_logs", "log.info", message);
    }

    public void sendMessage2() {
        String message = "Hello World!";
        // 模糊匹配
        rabbitTemplate.convertAndSend("topic_logs", "log.*", message);
    }

    public void sendMessage3() {
        String message = "Hello World!";
        // 全局广播
        rabbitTemplate.convertAndSend("fanout_logs", "", message);
    }
}

2. 消息消费

对于数据的消费同样十分简单,通过 @RabbitListener 注解即可,其中 queues 参数即对应的消息队列名称。

@Service
public class TopicConsumer {

    @RabbitListener(queues = "infoQueue")
    public void receiveInfo(String message) {
        System.out.println("Received INFO message: " + message);
    }

    @RabbitListener(queues = "errorQueue")
    public void receiveError(String message) {
        System.out.println("Received ERROR message: " + message);
    }
}

五、多服务配置

1. 连接配置

在之前的示例中,我们是通过默认的配置连接服务收发消息,但在某些场景下系统可能会对接多个消息队列。

因此,在 Spring 中提供了自定义 RabbitTemplate 服务连接实例的能力。

如系统中存在两个 Rabbit 服务实例,连接配置如下:

rabbit:
  primary:
    host: host1
    port: 5672
    username: user1
    password: pwd1
    virtual-host: /
  secondary:
    host: host2
    port: 5672
    username: user2
    password: pwd2
    virtual-host: /

完成后通过 ConnectionFactory 创建连接工厂实例,其中虚拟主机 (VirtualHost)RabbitMQ 提供的一种逻辑隔离机制,相当于一个命名空间,用来隔离不同的连接、交换机、队列、用户权限等。

定义 ConnectionFactory 后便可注入到 RabbitTemplate 实例之中。

@Configuration
public class RabbitConfig {

    @Bean(name = "primaryConnectionFactory")
    public ConnectionFactory primaryConnectionFactory(@Value("${rabbit.primary.host}") String host,
                                                      @Value("${rabbit.primary.port}") int port,
                                                      @Value("${rabbit.primary.username}") String username,
                                                      @Value("${rabbit.primary.password}") String password,
                                                      @Value("${rabbit.primary.virtual-host}") String virtualHost) {
        CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }

    @Bean(name = "secondaryConnectionFactory")
    public ConnectionFactory secondaryConnectionFactory(@Value("${rabbit.secondary.host}") String host,
                                                        @Value("${rabbit.secondary.port}") int port,
                                                        @Value("${rabbit.secondary.username}") String username,
                                                        @Value("${rabbit.secondary.password}") String password,
                                                        @Value("${rabbit.secondary.virtual-host}") String virtualHost) {
        CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }

    @Bean(name = "primaryRabbitTemplate")
    public RabbitTemplate primaryRabbitTemplate(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean(name = "secondaryRabbitTemplate")
    public RabbitTemplate secondaryRabbitTemplate(@Qualifier("secondaryConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}

那么现在即可通过对应的 RabbitTemplate 实例发送消息至指定服务节点。

@Service
public class TopicProducer {

    @Autowired
    private RabbitTemplate primaryRabbitTemplate;

    @Autowired
    private RabbitTemplate secondaryRabbitTemplate;


    public void send() {
        primaryRabbitTemplate.convertAndSend("topic_logs", "log.info", "hello world");

        secondaryRabbitTemplate.convertAndSend("topic_logs", "log.info", "hello world");
    }
}

2. 消费实例

同样的,在创建了多个连接实例后相应的也需要创建对应的消息接收工厂。

在之前 @RabbitListener 注解接收消息示例中,通过 queues 指定消息队列。而在多连接实例下,则可通过 containerFactory 配置对应的服务实例。

完成的配置示例如下,其中 ConnectionFactory 即上述的连接服务实例。

@Configuration
public class RabbitConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory primaryContainerFactory(ConnectionFactory primaryConnectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(primaryConnectionFactory);
        // 手动 ack
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); 
        return factory;
    }
}

其中 AcknowledgeMode 为接收消息之后的确认机制,可选项参考下表:

方法 作用
AcknowledgeMode.AUTO 消息自动确认。
AcknowledgeMode.MANUAL 消息手动 ACK 确认。
AcknowledgeMode.NONE 消息不确认,可重复消费。

在定义实例之后,即可通过 containerFactory 属性进行指定,完整代码如下:

@RabbitListener(queues = "infoQueue", containerFactory = "primaryContainerFactory")
public void receiveMessage(String msg, Channel channel, Message message) {
    // 手动 ack
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

文章作者: 烽火戏诸诸诸侯
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 烽火戏诸诸诸侯 !
  目录