一、基本概念
1. 事件驱动
Netty
是一款基于事件驱动的非阻塞网络请求框架,在开始前先了解一下什么是事件驱动。
事件驱动模型是一种常见的编程模型,在事件驱动模型中,系统的运行和行为主要由事件的触发和处理来驱动,而不是通过线程的阻塞和轮询来等待和处理请求。模型的核心思想是将任务的执行委托给事件驱动的机制,以实现异步、非阻塞的处理方式。
2. 名词解释
(1) 事件
事件 (Event)
是系统中发生的某种特定的动作或状态变化,它可以是用户输入、网络消息、定时器触发等。事件驱动模型中的所有行为都是由事件触发和驱动的。
(2) 事件处理器
事件处理器 (Event Handler)
是用于处理特定类型事件的组件或函数。当某个事件发生时,系统会将事件传递给相应的事件处理器来进行处理。事件处理器通常被注册到事件监听器或者事件分发器中。
(3) 事件监听器
事件监听器 (Event Listener)
用于监听和接收特定类型的事件,并将事件分发给相应的事件处理器进行处理。它负责注册和管理事件处理器,以确保事件被正确处理。
(4) 事件循环
事件循环 (Event Loop)
是事件驱动模型的核心组件之一,它负责监听和接收事件,并将事件分发给相应的事件处理器进行处理。事件循环通常采用非阻塞的方式来等待事件的到来,以实现高效的事件处理。
(5) 回调
回调 (Callback)
是一种常见的事件处理机制,它通过在事件发生时调用预先注册的回调函数来处理事件。当某个事件发生时,系统会调用相应的回调函数来执行特定的逻辑。
(6) 异步
事件驱动模型通常采用异步 (Asynchronous)
的方式来处理事件,即事件的处理过程是非阻塞的,不会阻塞当前线程。通过异步处理,系统能够更好地利用资源,提高系统的并发性能和吞吐量。
二、对象描述
1. Bootstrap
在 netty
中由 Bootstrap
与 ServerBootstrap
分别用于建立服务端与客户端应用。
2. EventLoop
EventLoop
在 netty
中负责监听和接收事件,并将事件分发给相应的事件处理器。当存在一个或多个 EventLoop
通过 EventLoopGroup
进行管理,类似于线程池于线程的关系。
3. ChannelFuture
在 netty
中通过事件的处理都是异步非阻塞,而 ChannelFuture
继承于 Java
中的 Future
类,返回异步事件处理结果。
4. ChannelHandler
当 EventLoop
接收到事件时将提交至 ChannelHandler
进行处理,在此实现具体的业务处理逻辑。
5. ChannelPipeline
类似于 EventLoopGroup
,当存在多个 ChannelHandler
时可将其连接为 ChannelPipeline
,事件的处理将按照 ChannelHandler
在 ChannelPipeline
中添加的顺序有序执行。
三、工程管理
1. 依赖引入
新增 Maven
工程并在 pom
文件中引入相应依赖。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.89.Final</version>
</dependency>
四、服务端应用
1. 事件处理
通过继承类 SimpleChannelInboundHandler
编写数据业务处理逻辑,EventLoop
会将数据提交至此。
当存在多个处理器时,需要在 channelRead0()
处理完成时通过 fireChannelRead()
方法将数据传递至一处理器,若只存在单个处理器则可忽略该行。
public class SimpleServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 处理客户端请求,并向客户端发送响应结果
*
* @param channelHandlerContext 请求上下文信息
* @param msg 接收的数据
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println("Server read(simple), data: " + msg);
// 向下一级传递
channelHandlerContext.fireChannelRead(msg);
}
}
SimpleChannelInboundHandler
继承于 ChannelInboundHandlerAdapter
可用于编写简单的事件处理,若需要管理连接建立与异常事件处理则需自定义继承 ChannelInboundHandlerAdapter
。
其中 channelActive()
在首次通道建立时触发,由于通道的复用性其仅会触发一次;channelRead()
用于处理相应的业务逻辑,若执行耗时任务可通过 ctx.channel().eventLoop().execute()
新启线程异步执行;exceptionCaught()
则在通道异常时触发。
public class ChannelServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Server active.");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Server read(channel), data: " + msg.toString());
// 当处理耗时任务时可通过 eventLoop() 启动线程异步执行
// 同理通过 schedule() 启动一个延迟任务
ctx.channel().eventLoop().execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("Server read done.");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// 完成处理,响应客户端请求
ctx.writeAndFlush(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.err.println("Server exception");
cause.printStackTrace();
}
}
2. 链路配置
通过继承 ChannelInitializer
将定义的一个或多个处理器构建添加至 ChannelPipeline
中。
在通道处理时将会按照此处 addLast()
的顺序执行,先添加的处理器处理优先级更高,可理解为一个队列逐级向后传递。
public class ChannelServerInitializer extends ChannelInitializer<SocketChannel> {
/**
* 传输通道数据编解码配置
*
* @param sc 网络连接通道
*/
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 获取 ChannelPipeline 对象,用于配置网络处理器
ChannelPipeline pipeline = sc.pipeline();
// 添加字符串编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 添加自定义的业务处理器
pipeline.addLast(new SimpleServerHandler());
pipeline.addLast(new ChannelServerHandler());
}
}
3. 事件监听
在 netty
中可针对每一步操作添加监听器,用于监听处理事件的成功与失败。
netty
提供了 lamdba
表达式与基础的类实例两种实现方法,下述为定义监听器类示例:
public class ServerChannelListener implements ChannelFutureListener {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Server lister, message: successful!");
} else {
System.err.println("Server lister, message: " + channelFuture.cause().getMessage());
}
}
}
4. 服务建立
完成上述步骤后即可启动服务端应用,通过 ServerBootstrap
创建服务端引导类,负责设置服务器的启动配置,将不同的组件组合起来以构建服务器端应用,可以配置服务器的一些参数,如绑定端口、设置事件处理器、指定连接处理流程等。
通过 group()
配置线程组,其中 parentGroup
负责接受客户端连接,childGroup
负责处理连接的 I/O
事件,childHandler()
用于配置事件处理器;channel()
用于配置通道模型,对于异步通讯通常使用 NioServerSocketChannel
对象,对应的还存在 OIO
但其在新版中已被标记弃用。
完整的服务端应用启动代码如下:
public class NettyServerTest {
private final String HOST = "127.0.0.1";
private final int PORT = 9090;
private ServerBootstrap bootstrap;
private EventLoopGroup parentGroup, childGroup;
@Before
public void init() {
// parentGroup: 监听客户端请求
parentGroup = new NioEventLoopGroup();
// childGroup: 处理每条连接的数据读写
childGroup = new NioEventLoopGroup();
// bootstrap: 用于创建和配置服务器,从而启动服务器入口
bootstrap = new ServerBootstrap()
// 配置线程组的角色
.group(parentGroup, childGroup)
// 配置服务端的 IO 模型, 这里取 NIO
.channel(NioServerSocketChannel.class)
// 配置每条连接的数据读写和业务逻辑
.childHandler(new SimpleServerInitializer());
}
@Test
public void serverSyncDemo() {
try {
// 绑定监听端口,Host 未指定时使用本地地址
ChannelFuture future = bootstrap.bind(HOST, PORT)
.addListener(new ServerChannelListener())
.sync();
System.out.println("Bind done!");
// 等待关闭线程将阻塞。
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
在 netty
中每一步操作通过 sync()
或 await()
表示其为同步或异步操作,同时将返回一个 ChannelFuture
对象。即当以 sync()
操作时若任务耗时主线程将陷入阻塞,反之 await()
为异步则不会阻塞线程。
在 netty
中通过 close()
关闭传输通道,而 closeFuture()
用于获取关闭的结果。在上述示例中并未执行 close()
操作,因此 closeFuture().sync()
将会阻塞线程。
五、客户端应用
1. 事件处理
同理,客户端此处创建了一个处理器用于接收处理服务端返回的数据。
public class SimpleClientHandler extends SimpleChannelInboundHandler<String> {
/**
* 处理客户端请求,并向客户端发送响应结果
*
* @param channelHandlerContext 请求上下文信息
* @param msg 接收的数据
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println("Client read, data: " + msg);
}
}
2. 链路配置
ChannelPipeline
的配置与服务端类似,不再详细描述。
public class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {
/**
* 传输通道数据编解码配置
*
* @param sc 网络连接通道
*/
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 获取 ChannelPipeline 对象,用于配置网络处理器
ChannelPipeline pipeline = sc.pipeline();
// 添加字符串编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 添加自定义的业务处理器
pipeline.addLast(new SimpleClientHandler());
}
}
3. 传输数据
在 netty
中提供了 ByteBuf
用于承当数据容器,其对标 Java
中的 ByteBuffer
。
ByteBuf
提供了两种初始化方式:alloc().buffer()
与 alloc().directBuffer()
,分别对应申请的内存为堆内内存和堆外内存,其中堆外内存并不受 JVM
垃圾回收的管理,在使用时需要尤为注意。
ByteBuf byteBuf1 = channel.alloc().buffer();
ByteBuf byteBuf2 = channel.alloc().directBuffer();
byteBuf1.writeBytes("hello world".getBytes());
4. 服务建立
客户端的建立与服务端类似,通过 Bootstrap
对象构造,不同的是其 group()
只需配置单个 EventLoopGroup
。
public class NettyClientTest {
private final String HOST = "127.0.0.1";
private final int PORT = 9090;
private Bootstrap bootstrap;
private EventLoopGroup loopGroup;
@Before
public void init() {
loopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap()
// 指定线程模型
.group(loopGroup)
// 指定 IO 类型为 NIO
.channel(NioSocketChannel.class)
// 禁用 Nagle 算法,发送数据包时都会立即发送,不会等待数据包的合并发送。
.option(ChannelOption.TCP_NODELAY, true)
// IO 处理逻辑
.handler(new SimpleClientInitializer());
}
/**
* 客户端发送同步请求,程序阻塞
*/
@Test
public void clientSyncDemo() {
try {
// 建立同步连接通道
Channel channel = bootstrap.connect(HOST, PORT).sync().channel();
// 写入数据至 ByteBuf
for (int i = 0; i < 1; i++) {
String message = "The message of " + i;
ByteBuf byteBuf = channel.alloc().buffer();
byteBuf.writeBytes(message.getBytes());
// 发送数据至服务端
channel.writeAndFlush(byteBuf);
TimeUnit.SECONDS.sleep(1);
}
// 关闭与当前 Channel 关联的连接。
channel.close().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}