Netty框架入门教程


一、基本概念

1. 事件驱动

Netty 是一款基于事件驱动的非阻塞网络请求框架,在开始前先了解一下什么是事件驱动。

事件驱动模型是一种常见的编程模型,在事件驱动模型中,系统的运行和行为主要由事件的触发和处理来驱动,而不是通过线程的阻塞和轮询来等待和处理请求。模型的核心思想是将任务的执行委托给事件驱动的机制,以实现异步、非阻塞的处理方式。

2. 名词解释

(1) 事件

事件 (Event) 是系统中发生的某种特定的动作或状态变化,它可以是用户输入、网络消息、定时器触发等。事件驱动模型中的所有行为都是由事件触发和驱动的。

(2) 事件处理器

事件处理器 (Event Handler) 是用于处理特定类型事件的组件或函数。当某个事件发生时,系统会将事件传递给相应的事件处理器来进行处理。事件处理器通常被注册到事件监听器或者事件分发器中。

(3) 事件监听器

事件监听器 (Event Listener) 用于监听和接收特定类型的事件,并将事件分发给相应的事件处理器进行处理。它负责注册和管理事件处理器,以确保事件被正确处理。

(4) 事件循环

事件循环 (Event Loop) 是事件驱动模型的核心组件之一,它负责监听和接收事件,并将事件分发给相应的事件处理器进行处理。事件循环通常采用非阻塞的方式来等待事件的到来,以实现高效的事件处理。

(5) 回调

回调 (Callback) 是一种常见的事件处理机制,它通过在事件发生时调用预先注册的回调函数来处理事件。当某个事件发生时,系统会调用相应的回调函数来执行特定的逻辑。

(6) 异步

事件驱动模型通常采用异步 (Asynchronous) 的方式来处理事件,即事件的处理过程是非阻塞的,不会阻塞当前线程。通过异步处理,系统能够更好地利用资源,提高系统的并发性能和吞吐量。

二、对象描述

1. Bootstrap

netty 中由 BootstrapServerBootstrap 分别用于建立服务端与客户端应用。

2. EventLoop

EventLoopnetty 中负责监听和接收事件,并将事件分发给相应的事件处理器。当存在一个或多个 EventLoop 通过 EventLoopGroup 进行管理,类似于线程池于线程的关系。

3. ChannelFuture

netty 中通过事件的处理都是异步非阻塞,而 ChannelFuture 继承于 Java 中的 Future 类,返回异步事件处理结果。

4. ChannelHandler

EventLoop 接收到事件时将提交至 ChannelHandler 进行处理,在此实现具体的业务处理逻辑。

5. ChannelPipeline

类似于 EventLoopGroup,当存在多个 ChannelHandler 时可将其连接为 ChannelPipeline,事件的处理将按照 ChannelHandlerChannelPipeline 中添加的顺序有序执行。

三、工程管理

1. 依赖引入

新增 Maven 工程并在 pom 文件中引入相应依赖。

 <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.89.Final</version>
</dependency>

四、服务端应用

1. 事件处理

通过继承类 SimpleChannelInboundHandler 编写数据业务处理逻辑,EventLoop 会将数据提交至此。

当存在多个处理器时,需要在 channelRead0() 处理完成时通过 fireChannelRead() 方法将数据传递至一处理器,若只存在单个处理器则可忽略该行。

public class SimpleServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * 处理客户端请求,并向客户端发送响应结果
     *
     * @param channelHandlerContext 请求上下文信息
     * @param msg                   接收的数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        System.out.println("Server read(simple), data: " + msg);
        // 向下一级传递
        channelHandlerContext.fireChannelRead(msg);
    }
}

SimpleChannelInboundHandler 继承于 ChannelInboundHandlerAdapter 可用于编写简单的事件处理,若需要管理连接建立与异常事件处理则需自定义继承 ChannelInboundHandlerAdapter

其中 channelActive() 在首次通道建立时触发,由于通道的复用性其仅会触发一次;channelRead() 用于处理相应的业务逻辑,若执行耗时任务可通过 ctx.channel().eventLoop().execute() 新启线程异步执行;exceptionCaught() 则在通道异常时触发。

public class ChannelServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Server active.");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Server read(channel), data: " + msg.toString());
        // 当处理耗时任务时可通过 eventLoop() 启动线程异步执行
        // 同理通过 schedule() 启动一个延迟任务
        ctx.channel().eventLoop().execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println("Server read done.");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        // 完成处理,响应客户端请求
        ctx.writeAndFlush(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println("Server exception");
        cause.printStackTrace();
    }
}

2. 链路配置

通过继承 ChannelInitializer 将定义的一个或多个处理器构建添加至 ChannelPipeline 中。

在通道处理时将会按照此处 addLast() 的顺序执行,先添加的处理器处理优先级更高,可理解为一个队列逐级向后传递。

public class ChannelServerInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * 传输通道数据编解码配置
     *
     * @param sc 网络连接通道
     */
    @Override
    protected void initChannel(SocketChannel sc) throws Exception {
        // 获取 ChannelPipeline 对象,用于配置网络处理器
        ChannelPipeline pipeline = sc.pipeline();
        // 添加字符串编解码器
        pipeline.addLast(new StringDecoder());
        pipeline.addLast(new StringEncoder());
        // 添加自定义的业务处理器
        pipeline.addLast(new SimpleServerHandler());
        pipeline.addLast(new ChannelServerHandler());
    }
}

3. 事件监听

netty 中可针对每一步操作添加监听器,用于监听处理事件的成功与失败。

netty 提供了 lamdba 表达式与基础的类实例两种实现方法,下述为定义监听器类示例:

public class ServerChannelListener implements ChannelFutureListener {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (channelFuture.isSuccess()) {
            System.out.println("Server lister, message: successful!");
        } else {
            System.err.println("Server lister, message: " + channelFuture.cause().getMessage());
        }
    }
}

4. 服务建立

完成上述步骤后即可启动服务端应用,通过 ServerBootstrap 创建服务端引导类,负责设置服务器的启动配置,将不同的组件组合起来以构建服务器端应用,可以配置服务器的一些参数,如绑定端口、设置事件处理器、指定连接处理流程等。

通过 group() 配置线程组,其中 parentGroup 负责接受客户端连接,childGroup 负责处理连接的 I/O 事件,childHandler() 用于配置事件处理器;channel() 用于配置通道模型,对于异步通讯通常使用 NioServerSocketChannel 对象,对应的还存在 OIO 但其在新版中已被标记弃用。

完整的服务端应用启动代码如下:

public class NettyServerTest {

    private final String HOST = "127.0.0.1";
    private final int PORT = 9090;

    private ServerBootstrap bootstrap;
    private EventLoopGroup parentGroup, childGroup;

    @Before
    public void init() {
        // parentGroup: 监听客户端请求
        parentGroup = new NioEventLoopGroup();
        // childGroup: 处理每条连接的数据读写
        childGroup = new NioEventLoopGroup();
        // bootstrap: 用于创建和配置服务器,从而启动服务器入口
        bootstrap = new ServerBootstrap()
                // 配置线程组的角色
                .group(parentGroup, childGroup)
                // 配置服务端的 IO 模型, 这里取 NIO
                .channel(NioServerSocketChannel.class)
                // 配置每条连接的数据读写和业务逻辑
                .childHandler(new SimpleServerInitializer());
    }
 
    @Test
    public void serverSyncDemo() {
        try {
            // 绑定监听端口,Host 未指定时使用本地地址
            ChannelFuture future = bootstrap.bind(HOST, PORT)
                    .addListener(new ServerChannelListener())
                    .sync();
            System.out.println("Bind done!");
            // 等待关闭线程将阻塞。
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

netty 中每一步操作通过 sync()await() 表示其为同步或异步操作,同时将返回一个 ChannelFuture 对象。即当以 sync() 操作时若任务耗时主线程将陷入阻塞,反之 await() 为异步则不会阻塞线程。

netty 中通过 close() 关闭传输通道,而 closeFuture() 用于获取关闭的结果。在上述示例中并未执行 close() 操作,因此 closeFuture().sync() 将会阻塞线程。

五、客户端应用

1. 事件处理

同理,客户端此处创建了一个处理器用于接收处理服务端返回的数据。

public class SimpleClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * 处理客户端请求,并向客户端发送响应结果
     *
     * @param channelHandlerContext 请求上下文信息
     * @param msg                   接收的数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        System.out.println("Client read, data: " + msg);
    }
}

2. 链路配置

ChannelPipeline 的配置与服务端类似,不再详细描述。

public class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * 传输通道数据编解码配置
     *
     * @param sc 网络连接通道
     */
    @Override
    protected void initChannel(SocketChannel sc) throws Exception {
        // 获取 ChannelPipeline 对象,用于配置网络处理器
        ChannelPipeline pipeline = sc.pipeline();
        // 添加字符串编解码器
        pipeline.addLast(new StringDecoder());
        pipeline.addLast(new StringEncoder());
        // 添加自定义的业务处理器
        pipeline.addLast(new SimpleClientHandler());
    }
}

3. 传输数据

netty 中提供了 ByteBuf 用于承当数据容器,其对标 Java 中的 ByteBuffer

ByteBuf 提供了两种初始化方式:alloc().buffer()alloc().directBuffer(),分别对应申请的内存为堆内内存和堆外内存,其中堆外内存并不受 JVM 垃圾回收的管理,在使用时需要尤为注意。

ByteBuf byteBuf1 = channel.alloc().buffer();
ByteBuf byteBuf2 = channel.alloc().directBuffer();
byteBuf1.writeBytes("hello world".getBytes());

4. 服务建立

客户端的建立与服务端类似,通过 Bootstrap 对象构造,不同的是其 group() 只需配置单个 EventLoopGroup

public class NettyClientTest {

    private final String HOST = "127.0.0.1";
    private final int PORT = 9090;

    private Bootstrap bootstrap;
    private EventLoopGroup loopGroup;

    @Before
    public void init() {
        loopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap()
                // 指定线程模型
                .group(loopGroup)
                // 指定 IO 类型为 NIO
                .channel(NioSocketChannel.class)
                // 禁用 Nagle 算法,发送数据包时都会立即发送,不会等待数据包的合并发送。
                .option(ChannelOption.TCP_NODELAY, true)
                // IO 处理逻辑
                .handler(new SimpleClientInitializer());
    }

    /**
     * 客户端发送同步请求,程序阻塞
     */
    @Test
    public void clientSyncDemo() {
        try {
            // 建立同步连接通道
            Channel channel = bootstrap.connect(HOST, PORT).sync().channel();
            // 写入数据至 ByteBuf
            for (int i = 0; i < 1; i++) {
                String message = "The message of " + i;
                ByteBuf byteBuf = channel.alloc().buffer();
                byteBuf.writeBytes(message.getBytes());
                // 发送数据至服务端
                channel.writeAndFlush(byteBuf);
                TimeUnit.SECONDS.sleep(1);
            }
            // 关闭与当前 Channel 关联的连接。
            channel.close().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

文章作者: 烽火戏诸诸诸侯
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 烽火戏诸诸诸侯 !
  目录