一、基本介绍
1. 消息队列
在应用开发中,消息队列可谓无处不在,它作为消息的载体,承担着异步通信、流量削峰等作用。
以常见的服务响应为例,当存在大批量用户执行请求服务时,系统可能无法同时处理所有的请求,意味着请求将会陷入排队阻塞的情况,导致系统无法接受处理新的请求,从而引发系统的瘫痪。
而通过消息队列,即可巧妙的实现请求的异步处理,提高系统服务的可用性,同样为上述示例引入队列异步处理后其运行流程如下:
2. 事件模型
对于消息队列中间件,在大数据领域 Kafka
是当之无愧的老大哥,通过分布式的机制更是进一步提高的服务的高可用。因此,在多服务系统的场景下,Kafka
往往都扮演着重要的角色。
但对于单体的服务而言,在许多场景下 Kafka
由于需要额外引入中间件,其很难发挥其最大的优势。针对此类场景,本地消息队列服务则能取到更高的回报率,本文也将围绕本地消息队列服务展开介绍。
二、工具介绍
1. 依赖引入
Disruptor
是一个高性能队列本地消息类库,能够实现类似 Kafka
的消息传递,而无需依赖外部中间件。
在 Maven
工程中引入以下依赖即可导入依赖信息。
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>4.0.0</version>
</dependency>
需要注意 4.x
版本需要 JDK11
及以上版本,因此要在 pom.xml
中加入下述配置指定版本。
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>
2. 事件处理
Disruptor
模型结构中同样包含三个重要概念:事件 (Event)
、事件处理器 (EventHandler)
与事件发送者 (EventPublisher)
。其中,Event
用于封装消息内容,EventHandler
类似于事件消费者,EventPublisher
类似于生产者用于发送事件消息。
新建 MessageEvent
用于封装传输的事件消息,这里通过泛型类用于指定多种类型消息。
public class MessageEvent<T> {
private T t;
public T getValue() {
return t;
}
public void setValue(T t) {
this.t = t;
}
}
新建 MessageEventHandler
用于处理事件消息,当发送者发送事件消息时,则会执行处理器的 onEvent()
方法。
在下述示例中,当接受到事件消息后,则将事件内容体输出到控制台。
public class MessageEventHandler<T> implements EventHandler<MessageEvent<T>> {
@Override
public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) {
System.out.println("Event receive: " + event.getValue());
}
}
3. 事件发送
在 Disruptor
中由 RingBuffer
负责事件的传输,当绑定事件并启动后即可通过其获取 RingBuffer
对象,进而执行事件的发布动作。
下述为一个简单的事件发布示例,完整的介绍文档参考官网手册:Disruptor 使用手册。
public class MessageTest {
@Test
public void demo1() {
int bufferSize = 1024;
DaemonThreadFactory factory = DaemonThreadFactory.INSTANCE;
Disruptor<MessageEvent<String>> disruptor = new Disruptor<>(MessageEvent::new, bufferSize, factory);
// Set event handler
disruptor.handleEventsWith(new MessageEventHandler<>());
disruptor.start();
// publish event
RingBuffer<MessageEvent<String>> ringBuffer = disruptor.getRingBuffer();
ringBuffer.publishEvent((event, sequence) -> {
event.setValue("hello");
});
// stop disruptor
disruptor.shutdown();
}
}
三、仿造实现
1. 功能剖析
了解了 Disruptor
的应用效果后,我们可以自己仿造一个简易的事件监听发布模型。
消息传输模型的核心在于两方面,事件的发布 (Publish)
和事件的监听 (Handle)
。同时,为了实现消息的异步处理,在事件发布的具体实现中,需要通过线程池执行具体发布操作。
2. 事件监听
新建 Observer
接口类用于定义消息的监听器,同理将其设计为泛型类,其中 R
为消息类型。
public interface Observer<R> {
void onServer(R t);
}
MessageObserver
则为具体的消息处理实现,作用效果等价于上述的 MessageEventHandler
。
public class MessageObserver<R> implements Observer<R> {
@Override
public void onServer(R t) {
System.out.println("Observer data: " + t);
}
}
3. 服务注册
新建 QueueListener
接口用于处理监听器的绑定和消息发送事件,由于后续涉及到线程池的管理,因此这里同时继承于 AutoCloseable
接口。
public interface QueueListener<T, R> extends AutoCloseable {
void register(T t);
boolean remove(T t);
void publish(R r);
}
新建 MessageQueueListener
用于实现具体的监听器注册逻辑,通过 List
存储注册的监听器,需要注意初始化其通过 Collections.synchronizedList()
创建线程安全对象,防止多线程操作异常。
同时,通过 ExecutorService
线程池从而实现事件异步的发送处理,为了方便此时使用的线程池由 ExecutorService
初始化,实际应用时可替换为 ThreadPoolExecutor
实现更细粒话的控制。
public class MessageQueueListener<T extends Observer<R>, R>
implements QueueListener<T, R> {
private final ExecutorService executor;
private final List<T> list;
public MessageQueueListener(int poolSize) {
executor = Executors.newFixedThreadPool(poolSize);
list = Collections.synchronizedList(new ArrayList<>());
}
@Override
public void register(T t) {
list.add(t);
}
@Override
public boolean remove(T t) {
boolean success = false;
boolean contains = list.contains(t);
if (contains) {
success = list.remove(t);
}
return success;
}
@Override
public void publish(R r) {
if (list.isEmpty()) {
throw new IllegalArgumentException("Queue is empty, register first");
}
for (T t : list) {
executor.execute(() -> t.onServer(r));
}
}
@Override
public void close() throws Exception {
try {
executor.shutdown();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
4. 消息发布
完成上述的定义之后即可测试事件的发布效果,具体的测试代码如下:
public class MessageTest {
@Test
public void demo2() {
Observer<String> observer1 = new MessageObserver<>();
Observer<String> observer2 = new MessageObserver<>();
int poolSize = 3;
try (QueueListener<Observer<String>, String> listener = new MessageQueueListener<>(poolSize)) {
// register handler
listener.register(observer1);
listener.register(observer2);
// publish data
listener.publish("hello");
// remove handler
listener.remove(observer2);
// publish data
listener.publish("hello");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}