目 录CONTENT

文章目录

netty - 高效的通信框架

FatFish1
2024-12-11 / 0 评论 / 0 点赞 / 51 阅读 / 0 字 / 正在检测是否收录...

netty简谈

netty是封装nio实现的一套JAVA高性能通信框架,它简化了网络通信编程

了解IO的发展史,大概可以看到是从最早的BIO到NIO,从阻塞到非阻塞的过程

如果使用BIO写通信框架,在通信中就会有大量的阻塞线程,产生巨大的消耗,如果消息消费漫长,服务的性能就会拉胯

如果使用NIO写通信框架,实际上性能是提高了,但是每次都要使用NIO写一套Buffer+Channel+Selector的代码,很麻烦

在这基础上,就出现了netty框架,简化逻辑编写

netty编程案例

依赖

<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>...</version>
</dependency>

服务端架设

public class NettyServer {

    public void server() {
        new ServerBootstrap()
                // 1. group-事件组:对thread和nio.selector的封装,在netty中用于选择IO组件
                .group(new NioEventLoopGroup())
                // 2. channel-通道:对nio.channels的封装
                .channel(NioServerSocketChannel.class)
                // 3. 添加处理链
                .childHandler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // 4.1 使用一些预置的处理器
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new LoggingHandler());
                        // 4.2 自定义处理器
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(msg);
                            }
                        });
                    }
                })
                // 4. 绑定端口
                .bind(8080);
    }
}

可以看到,主要有四个步骤:

  1. 设置group事件组:group是对thread和selector的封装,在netty中用于选择IO组件

  2. 设置channel通道:是对nio.channels的封装

  3. 添加处理链:这里可以添加一些预置的处理器(在io.netty.handler包下),也可以添加自定义的处理器

  4. 绑定监听端口

客户端开发

public class NettyClient {
    
    public void client() throws InterruptedException {
        new Bootstrap()
                // 1. 设置事件组
                .group(new NioEventLoopGroup())
                // 2. 设置channel
                .channel(NioSocketChannel.class)
                // 3. 设置处理链
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());    
                    }
                })
                // 4. 连接服务器
                .connect("localhost", 8080)
                // 5. 同步通信
                .sync()
                // 6. 代表连接对象
                .channel()
                // 7. 发送数据
                .writeAndFlush("hello server");
    }
}

客户端的逻辑与服务端类似,前三个是一样的,后面就是连接流程

通信测试

public class NettyTest {
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                new NettyServer().server();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    new NettyClient().client();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

源码阅读

group - 事件组

grou是对nio.selector的封装,首先明确两个概念:

  • EventLoop:事件循环对象,是对Thread的封装,它的本质就是一个单线程,它内部封装了nio.selector,可以在一个线程内多多个io进行轮询

  • EventLoopGroup:事件组,是对EventLoop的封装,它本质一个ExecutorService,通过维护多个EventLoop实现多线程控制io,真正开发的过程中,一般使用EventLoopGroup,不使用EventLoop

.group(new NioEventLoopGroup()) 方法往下看

NioEventLoopGroup的构造函数

public NioEventLoopGroup() {
    this(0);
}

首先是实现类的无参构造,传入了一个nthread=0

向上看,可以看到对nThread进行默认值判断

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

// -- MultithreadEventLoopGroup -- 
static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

在类的static方法中有对默认值的初始化

继续向上看,终于找到其构造函数

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args)
if (executor == null) {
    executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

线程池和命名逻辑

首先构造一个线程池

nio实现了自己的线程池,具有以下特性:

  • newDefaultThreadFactory()方法构造了一个DefaultThreadFactory工工厂

// io.netty.util.concurrent.MultithreadEventExecutorGroup
protected ThreadFactory newDefaultThreadFactory() {
    return new DefaultThreadFactory(getClass());
}
  • ThreadPerTaskExecutor重写了execute方法,可以看到每执行一个任务,都会执行DefaultThreadFactory#newThread方法创建一条新线程

// io.netty.util.concurrent.ThreadPerTaskExecutor
public void execute(Runnable command) {
    threadFactory.newThread(command).start();
}
  • DefaultThreadFactory的命名逻辑是nioEventLoopGroup-m-nn,m是代表第几个group,nn是第几个eventLoop

public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
    this(toPoolName(poolType), daemon, priority);
}

可见命名的逻辑来源于这个构造函数,调用toPoolName()方法,这里传进来的poolType是nioEventLoopGroup

EventExecutor的初始化

children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
    boolean success = false;
    try {
        children[i] = newChild(executor, args);
        success = true;
    } catch (Exception e) {
……

继续根据MultithreadEventExecutorGroup的构造方法,到了这里,实际上是在初始化EventExecutor数组

这里使用的是newChiled(Executor executor, Object... args) 这个方法构造EventLoop,可见NioEventLoopGroup中封装的实际上就是EventLoop

要注意的是,该方法是在NioEventLoopGroup中重写的

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    ……
    return new NioEventLoop(this, executor, selectorProvider,
            selectStrategyFactory.newSelectStrategy(),
            rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}

NioEventLoop

根据NioEventLoopGroup的构造函数逻辑,可以看到是初始化了一个EventExecutor[],并且按照线程数量,初始化一定量的NioEventLoop存在里面

NioEventLoop的特性包括:

  • 封装了selector,具备nio的选择器能力

  • 实现Thread,具备任务启动能力

核心成员变量

private Selector selector;

可见里面是封装了nio.selector的

构造函数

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
    super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
            rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

完成了对selector的初始化操作

register

封装了nio.channel的register逻辑

if (inEventLoop()) {
    register0(ch, interestOps, task);
} else {
……

// register0
private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
    try {
        ch.register(unwrappedSelector, interestOps, task);
    } catch (Exception e) {
        throw new EventLoopException("failed to register a channel", e);
    }
}

run方法

因为NioEventLoop实现了Thread的能力,因此也具备任务启动的能力

for (;;) {
    try {
        int strategy;
        try {
            // 1. 判断策略
            strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
            switch (strategy) {
            ……
            case SelectStrategy.SELECT:
                long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                if (curDeadlineNanos == -1L) {
                    curDeadlineNanos = NONE; // nothing on the calendar
                }
                nextWakeupNanos.set(curDeadlineNanos);
                try {
                    if (!hasTasks()) {
                        strategy = select(curDeadlineNanos);
                    }
                } finally {
                    // This update is just to help block unnecessary selector wakeups
                    // so use of lazySet is ok (no race condition)
                    nextWakeupNanos.lazySet(AWAKE);
                }
                // fall through
            default:
            }
        } catch (IOException e) {

执行一个死循环,首先判断策略

public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

正常有任务,这里判断的应该是SELECT策略,这时执行strategy = select(curDeadlineNanos) 方法,选择自己管理的nio.channel

private int select(long deadlineNanos) throws IOException {
    if (deadlineNanos == NONE) {
        return selector.select();
    }
    // Timeout will only be 0 if deadline is within 5 microsecs
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

可见调用的是nio.selector的select()方法

后面根据select结果,做对应处理

channel - nio通道封装

netty.channel并不是nio.channels的直接实现,而是对其进行功能增强版本的封装

还是从其使用开始看

.channel(NioServerSocketChannel.class)

这里调用的是ServerBootStrap#channel方法,则先从该方法开始分析netty.channel的创建流程

netty.channel的创建流程

首先是AbstractBootStrap#channel方法

return channelFactory(new ReflectiveChannelFactory<C>(
        ObjectUtil.checkNotNull(channelClass, "channelClass")
));

可以看出来是一个工厂模式的反射工厂,看ReflectiveChannelFactory的构造函数

public ReflectiveChannelFactory(Class<? extends T> clazz) {
    ObjectUtil.checkNotNull(clazz, "clazz");
    try {
        this.constructor = clazz.getConstructor();

这里取到了类的constructor,备用,看它使用的地方是在下面的newChannel方法、

public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}

可见这里调用的是指定class的无参构造函数,还记得一开始传进去的是NioServerSocketChannel.class,因此调用的目标就是它的无参构造,这里成功获取到了一个netty.channel的实例

相似的,客户端侧使用BootStrap#channel方法获取的是NioSocketChannel实例

再ctrl+alt+h分析下ReflectiveChannelFactory#newChannel的调用点,可以看到两个:

  • BootStrap#connect():客户端与特定ip建立连接的时候

  • ServerBootStrap#bind():服务端开始监听指定端口的时候

可见netty.channel的构造时机是在建连的时候

NioSocketChannel与NioServerSocketChannel

从无参构造开始看

public NioServerSocketChannel() {
    this(DEFAULT_SELECTOR_PROVIDER);
}

public NioServerSocketChannel(SelectorProvider provider) {
    this(provider, null);
}

// 函数3
public NioServerSocketChannel(SelectorProvider provider, InternetProtocolFamily family) {
    this(newChannel(provider, family));
}

// 函数4
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

// 函数5
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    ……
    try {
        ch.configureBlocking(false);
    ……

// 函数6
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

在函数3中调用newChannel()方法构造nio.channels实例

在函数4中指定关心的selectKey为OP_ACCEPT

在函数5中已经调用到父类AbstractNioChannel,这里把传进来的nio.channels实例绑定到成员属性中,同时还设置了非阻塞

在函数6中初始化了自己的channelPipeline,是后面加handler用的

看newInstance方法

private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {
    try {
        ServerSocketChannel channel =
                SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family);
        return channel == null ? provider.openServerSocketChannel() : channel;
    ……

这里可以得到结论:NioServerSocketChannel是对ServerSocketChannel的封装,而非简单的重写,同理,NioSocketChannel也是对SocketChannel的封装

Future&Promise - netty的异步任务增强

前面已经看到了可见netty.channel的构造时机是在建连的时候,服务端是在ServerBootStrap#bind()方法,客户端是在BootStrap#connect()方法,以connect方法为例看下

connect - 连接建立的流程

public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
    validate();
    return doResolveAndConnect(remoteAddress, localAddress);
}

校验,剩下流程继续向下看

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    // 1. 初始化netty.channel,包括注册
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    // 2. 如果完成了就直接连接服务端即可
    if (regFuture.isDone()) {
        if (!regFuture.isSuccess()) {
            return regFuture;
        }
        // 连接服务端的具体代码
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
        // 3. 如果暂时没有完成注册,则添加监听器,在注册完成后执行对应操作
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            // 4. 回调的具体实现
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    promise.setFailure(cause);
                } else {
  
                    promise.registered();
                    doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

首先在initAndRegister()方法中完成netty.channel的初始化过程,生成了一个ChannelFuture

当这个ChannelFuture实例是刚初始化的,需要向其中添加listener监听器,其中使用promise做连接

那么这个Future和Promise到底是啥呢

Future和Promise相关能力

ChannelFuture

public interface ChannelFuture extends Future<Void>

ChannelFuture是netty对JDK的Future的二次封装,它提供的能力主要是对于监听器的增删改,说明了这个接口的基本功能:回调功能,以及监听器的管理(曾删改)

Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

需要注意的是,这里返回的Future类型是netty.Future,已经对jdk的Future封装过一层了

GenericFutureListener

监听器类型

public interface GenericFutureListener<F extends Future<?>> extends EventListener {
    void operationComplete(F future) throws Exception;
}

提供了一个方法,即针对成功的结果做相应的操作

Promise

Promise是对Future的拓展,比较重要的点,还是对回调监听器的管理,增加了判断状态的能力新添加的功能就是:判断future的状态,同时决定是否要唤醒监听器,或者是否要抛异常

public interface Promise<V> extends Future<V> {
    Promise<V> setSuccess(V result);
    boolean trySuccess(V result);
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);
    boolean setUncancellable();
    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    @Override
    Promise<V> await() throws InterruptedException;
    @Override
    Promise<V> awaitUninterruptibly();
    @Override
    Promise<V> sync() throws InterruptedException;
    @Override
    Promise<V> syncUninterruptibly();
}

ChannelPromise

public interface ChannelPromise extends ChannelFuture, Promise<Void>

这个玩意继承了ChannelFuture和Promise的能力,可知它提供的功能包括:

  • Promise提供的对状态的判断,以及根据状态判断是否唤醒监听器

  • Future提供的回调管理,对监听器的增删改查能力

ChannelPromise = Channel +future + Promise

其实换句话说,Future提供的是回调能力,即处理完了,我要做什么,而Promise提供的是提前获取处理结果的能力,因此它还相当于一个数据容器,即我可以从容器中取出一部分数据,或发现异常,提早抛出,不必等异步完全执行结束

pipeline - netty中的处理链

.childHandler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // 4.1 使用一些预置的处理器
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new LoggingHandler());
                        // 4.2 自定义处理器
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(msg);
                            }
                        });
                    }
                })

还是先看应用,这里通过BootStrap#childHandler方法添加处理链,重新ChannelInitializer#initChannel方法添加自己的链

netty系统提供了一些默认的处理器,也可以自定义处理器

处理器的分类

  • ChannelInboundHandlerAdapter:入站处理器,数据从网络流向应用程序的过程被称为“入站”,即这类处理器处理的是网络中发来的数据,例如:

    • StringDecoder用于处理传进来的ByteBuffer,对其进行解码

    • LoggingHandler用于接收数据后的日志记录

  • ChannelOutboundHandlerAdapter:出站处理器,数据从应用程序流向网络的过程被称为“出站”,即这类处理器处理的是待发送的数据,处理完,数据就可以发送到网络了

这是两个默认的实现类,提供了一些处理方法的默认实现,例如:

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
    ……

    @Skip
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    ……
}

public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
    ……

    @Skip
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }
    
    ……
}

0

评论区