Java消息队列机制详解


一、基本介绍

1. 消息队列

在应用开发中,消息队列可谓无处不在,它作为消息的载体,承担着异步通信、流量削峰等作用。

以常见的服务响应为例,当存在大批量用户执行请求服务时,系统可能无法同时处理所有的请求,意味着请求将会陷入排队阻塞的情况,导致系统无法接受处理新的请求,从而引发系统的瘫痪。

而通过消息队列,即可巧妙的实现请求的异步处理,提高系统服务的可用性,同样为上述示例引入队列异步处理后其运行流程如下:

2. 事件模型

对于消息队列中间件,在大数据领域 Kafka 是当之无愧的老大哥,通过分布式的机制更是进一步提高的服务的高可用。因此,在多服务系统的场景下,Kafka 往往都扮演着重要的角色。

但对于单体的服务而言,在许多场景下 Kafka 由于需要额外引入中间件,其很难发挥其最大的优势。针对此类场景,本地消息队列服务则能取到更高的回报率,本文也将围绕本地消息队列服务展开介绍。

二、工具介绍

1. 依赖引入

Disruptor 是一个高性能队列本地消息类库,能够实现类似 Kafka 的消息传递,而无需依赖外部中间件。

Maven 工程中引入以下依赖即可导入依赖信息。

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>4.0.0</version>
</dependency>

需要注意 4.x 版本需要 JDK11 及以上版本,因此要在 pom.xml 中加入下述配置指定版本。

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.11.0</version>
            <configuration>
                <source>11</source>
                <target>11</target>
            </configuration>
        </plugin>
    </plugins>
</build>

2. 事件处理

Disruptor 模型结构中同样包含三个重要概念:事件 (Event)、事件处理器 (EventHandler) 与事件发送者 (EventPublisher)。其中,Event 用于封装消息内容,EventHandler 类似于事件消费者,EventPublisher 类似于生产者用于发送事件消息。

新建 MessageEvent 用于封装传输的事件消息,这里通过泛型类用于指定多种类型消息。

public class MessageEvent<T> {

    private T t;

    public T getValue() {
        return t;
    }

    public void setValue(T t) {
        this.t = t;
    }
}

新建 MessageEventHandler 用于处理事件消息,当发送者发送事件消息时,则会执行处理器的 onEvent() 方法。

在下述示例中,当接受到事件消息后,则将事件内容体输出到控制台。

public class MessageEventHandler<T> implements EventHandler<MessageEvent<T>> {

    @Override
    public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) {
        System.out.println("Event receive: " + event.getValue());
    }
}

3. 事件发送

Disruptor 中由 RingBuffer 负责事件的传输,当绑定事件并启动后即可通过其获取 RingBuffer 对象,进而执行事件的发布动作。

下述为一个简单的事件发布示例,完整的介绍文档参考官网手册:Disruptor 使用手册

public class MessageTest {

    @Test
    public void demo1() {
        int bufferSize = 1024;
        DaemonThreadFactory factory = DaemonThreadFactory.INSTANCE;
        Disruptor<MessageEvent<String>> disruptor = new Disruptor<>(MessageEvent::new, bufferSize, factory);
        // Set event handler
        disruptor.handleEventsWith(new MessageEventHandler<>());
        disruptor.start();

        // publish event
        RingBuffer<MessageEvent<String>> ringBuffer = disruptor.getRingBuffer();
        ringBuffer.publishEvent((event, sequence) -> {
            event.setValue("hello");
        });
        // stop disruptor
        disruptor.shutdown();
    }
}

三、仿造实现

1. 功能剖析

了解了 Disruptor 的应用效果后,我们可以自己仿造一个简易的事件监听发布模型。

消息传输模型的核心在于两方面,事件的发布 (Publish) 和事件的监听 (Handle)。同时,为了实现消息的异步处理,在事件发布的具体实现中,需要通过线程池执行具体发布操作。

2. 事件监听

新建 Observer 接口类用于定义消息的监听器,同理将其设计为泛型类,其中 R 为消息类型。

public interface Observer<R> {

    void onServer(R t);
}

MessageObserver 则为具体的消息处理实现,作用效果等价于上述的 MessageEventHandler

public class MessageObserver<R> implements Observer<R> {

    @Override
    public void onServer(R t) {
        System.out.println("Observer data: " + t);
    }
}

3. 服务注册

新建 QueueListener 接口用于处理监听器的绑定和消息发送事件,由于后续涉及到线程池的管理,因此这里同时继承于 AutoCloseable 接口。

public interface QueueListener<T, R> extends AutoCloseable {

    void register(T t);

    boolean remove(T t);

    void publish(R r);
}

新建 MessageQueueListener 用于实现具体的监听器注册逻辑,通过 List 存储注册的监听器,需要注意初始化其通过 Collections.synchronizedList() 创建线程安全对象,防止多线程操作异常。

同时,通过 ExecutorService 线程池从而实现事件异步的发送处理,为了方便此时使用的线程池由 ExecutorService 初始化,实际应用时可替换为 ThreadPoolExecutor 实现更细粒话的控制。

public class MessageQueueListener<T extends Observer<R>, R>
        implements QueueListener<T, R> {

    private final ExecutorService executor;

    private final List<T> list;

    public MessageQueueListener(int poolSize) {
        executor = Executors.newFixedThreadPool(poolSize);
        list = Collections.synchronizedList(new ArrayList<>());
    }

    @Override
    public void register(T t) {
        list.add(t);
    }

    @Override
    public boolean remove(T t) {
        boolean success = false;
        boolean contains = list.contains(t);
        if (contains) {
            success = list.remove(t);
        }
        return success;
    }

    @Override
    public void publish(R r) {
        if (list.isEmpty()) {
            throw new IllegalArgumentException("Queue is empty, register first");
        }

        for (T t : list) {
            executor.execute(() -> t.onServer(r));
        }
    }

    @Override
    public void close() throws Exception {
        try {
            executor.shutdown();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

4. 消息发布

完成上述的定义之后即可测试事件的发布效果,具体的测试代码如下:

public class MessageTest {

    @Test
    public void demo2() {
        Observer<String> observer1 = new MessageObserver<>();
        Observer<String> observer2 = new MessageObserver<>();

        int poolSize = 3;
        try (QueueListener<Observer<String>, String> listener = new MessageQueueListener<>(poolSize)) {
            // register handler
            listener.register(observer1);
            listener.register(observer2);
            // publish data
            listener.publish("hello");

            // remove handler
            listener.remove(observer2);
            // publish data
            listener.publish("hello");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

文章作者: 烽火戏诸诸诸侯
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 烽火戏诸诸诸侯 !
  目录