2 Star 1 Fork 0

zhrun8899 / learning-notes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
netty 学习.md 11.68 KB
一键复制 编辑 原始数据 按行查看 历史

netty 学习

Bootstrap,一个Netty应用通常由一个Bootstrap开始,它主要作用是配置整个Netty程序,串联起各个组件。

​ Handler,为了支持各种协议和处理数据的方式,便诞生了Handler组件。Handler主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。

ChannelInboundHandler,一个最常用的Handler。这个Handler的作用就是处理接收到数据时的事件,也就是说,我们的业务逻辑一般就是写在这个Handler里面的,ChannelInboundHandler就是用来处理我们的核心业务逻辑。

如何使用channelInboundHandler?

​ ChannelInitializer,当一个链接建立时,我们需要知道怎么来接收或者发送数据,当然,我们有各种各样的Handler实现来处理它,那么ChannelInitializer便是用来配置这些Handler,它会提供一个ChannelPipeline,并把Handler加入到ChannelPipeline。

​ ChannelPipeline,一个Netty应用基于ChannelPipeline机制,这种机制需要依赖于EventLoop和EventLoopGroup,因为它们三个都和事件或者事件处理相关。

​ EventLoops的目的是为Channel处理IO操作,一个EventLoop可以为多个Channel服务。

​ EventLoopGroup会包含多个EventLoop。

?什么时候需要定义多个group?

Netty 的服务器端的 acceptor 阶段, 没有使用到多线程,也就是说n设置1和10是没有区别的。

服务器端的 ServerSocketChannel 只绑定到了 bossGroup 中的一个线程, 因此在调用 Java NIO 的 Selector.select 处理客户端的连接请求时, 实际上是在一个线程中的, 所以对只有一个服务的应用来说, bossGroup 设置多个线程是没有什么作用的, 反而还会造成资源浪费

是不是可以这样理解:有多个server时需要EventLoopGroup bossGroup = new NioEventLoopGroup(n);

注意:n最好是2 的m次幂.

NioEventLoopGroup(其实是MultithreadEventExecutorGroup) 内部维护一个类型为 EventExecutor children 数组, 其大小是 nThreads, 这样就构成了一个线程池

如果我们在实例化 NioEventLoopGroup 时, 如果指定线程池大小, 则 nThreads 就是指定的值, 反之是处理器核心数 * 2

MultithreadEventExecutorGroup 中会调用 newChild 抽象方法来初始化 children 数组。抽象方法 newChild 是在 NioEventLoopGroup 中实现的, 它返回一个 NioEventLoop 实例.

  1. NioEventLoop 前面一节说道了newChild创建NioEventLoop实例,下面开始分析NioEventLoop。

NioEventLoop 继承于 SingleThreadEventLoop, 而 SingleThreadEventLoop 又继承于 SingleThreadEventExecutor. SingleThreadEventExecutor 是 Netty 中对本地线程的抽象, 它内部有一个 Thread thread 属性, 存储了一个本地 Java 线程. 因此我们可以认为, 一个 NioEventLoop 其实和一个特定的线程绑定, 并且在其生命周期内, 绑定的线程都不会再改变。 一个 NioEventLoop 通常需要肩负起两种任务, 第一个是作为 IO 任务, 处理 IO 操作,如accept、connect、read、write等。第二个就是非IO任务, 处理 taskQueue 中的任务,,如register0、bind0等任务。先回顾一下NIO中Selector的使用流程: ioRatio 的作用就是限制执行任务队列的时间。如果 ioRatio 比例是100 的话,则这个比例无作用。公式则是建立在 IO 时间上的,公式为 ioTime * (100 - ioRatio) / ioRatio ; 也就是说,当 ioRatio 是 10 的时候,IO 任务执行了 100 纳秒,则非IO任务将会执行 900 纳秒,直到没有任务可执行

原文链接:https://blog.csdn.net/TheLudlows/article/details/82961193

​ Channel代表了一个Socket链接,或者其它和IO操作相关的组件,它和EventLoop一起用来参与IO处理。

​ Future,在Netty中所有的IO操作都是异步的,因此,你不能立刻得知消息是否被正确处理,但是我们可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发。总之,所有的操作都会返回一个ChannelFuture。

?业务处理能否写到future中?

Domain Logic

​ 其实我们最最关心的事情就是如何处理接收到的解码后的数据,我们真正的业务逻辑便是处理接收到的数据。Netty提供了一个最常用的基类SimpleChannelInboundHandler,其中T就是这个Handler处理的数据的类型(上一个Handler已经替我们解码好了),消息到达这个Handler时,Netty会自动调用这个Handler中的channelRead0(ChannelHandlerContext,T)方法,T是传递过来的数据对象,在这个方法中我们便可以任意写我们的业务逻辑了。

NioEventLoopGroup 相当于 1 个事件循环组,这个组里包含多个事件循环 NioEventLoop,每个 NioEventLoop 包含 1 个 Selector 和 1 个事件循环线程。

每个 Boss NioEventLoop 循环执行的任务包含 3 步:

  • 轮询 Accept 事件。
  • 处理 Accept I/O 事件,与 Client 建立连接,生成 NioSocketChannel,并将 NioSocketChannel 注册到某个 Worker NioEventLoop 的 Selector 上。
  • 处理任务队列中的任务,runAllTasks。任务队列中的任务包括用户调用 eventloop.execute 或 schedule 执行的任务,或者其他线程提交到该 eventloop 的任务。

每个 Worker NioEventLoop 循环执行的任务包含 3 步:

  • 轮询 Read、Write 事件。
  • 处理 I/O 事件,即 Read、Write 事件,在 NioSocketChannel 可读、可写事件发生时进行处理。
  • 处理任务队列中的任务,runAllTasks。

其中任务队列中的 Task 有 3 种典型使用场景。

①用户程序自定义的普通任务

ctx.channel().eventLoop().execute( newRunnable() {

@Override

publicvoidrun(){

//...

}

});

②非当前 Reactor 线程调用 Channel 的各种方法

例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到任务队列中后被异步消费。

③用户自定义定时任务

ctx.channel().eventLoop().schedule( newRunnable() {

@Override

publicvoidrun(){

}

}, 60, TimeUnit.SECONDS);

https://my.oschina.net/u/3967312/blog/2873872

public class NettyConfig {

/**
 * 存储每一个客户端接入进来时的channel对象
 */
public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

} public class ServerHandler extends ChannelInboundHandlerAdapter {

/**
 * 客户端与服务端创建连接的时候调用
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("客户端与服务端连接开始...");
    NettyConfig.group.add(ctx.channel());
}
/**
     * 服务端接收客户端发送过来的数据结束之后调用
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        System.out.println("信息接收完毕...");
    }



 /**
     * 服务端处理客户端websocket请求的核心方法,这里接收了客户端发来的信息
     */
    @Override
	public void channelRead(ChannelHandlerContext channelHandlerContext, Object info) throws Exception {
    	System.out.println("我是服务端,我接受到了:" + ((RequestInfo)info).getInfo());
    	//服务端使用这个就能向 每个连接上来的客户端群发消息
    	NettyConfig.group.writeAndFlush(info);
    	Iterator<Channel> iterator = NettyConfig.group.iterator();
    	while(iterator.hasNext()){
    		//打印出所有客户端的远程地址
    		System.out.println((iterator.next()).remoteAddress());
    	}
//    	//单独回复客户端信息
//    	channelHandlerContext.writeAndFlush(info);
    }
 * 服务端处理客户端websocket请求的核心方法,这里接收了客户端发来的信息
      */
    @Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object info) throws Exception {
    	System.out.println("我是服务端,我接受到了:" + ((RequestInfo)info).getInfo());
    	//服务端使用这个就能向 每个连接上来的客户端群发消息
    	NettyConfig.group.writeAndFlush(info);
    	Iterator<Channel> iterator = NettyConfig.group.iterator();
    	while(iterator.hasNext()){
    		//打印出所有客户端的远程地址
    		System.out.println((iterator.next()).remoteAddress());
    	}
//    	//单独回复客户端信息
//    	channelHandlerContext.writeAndFlush(info);
    }

server中加入如下代码:

public void sendMessage(Object msg){
		if(serverSocketChannel != null){
			serverSocketChannel.writeAndFlush(msg);
		}
	}
 //使用原子技术 
    private static final AtomicLong al = new AtomicLong(0);

    public ClientRequest(){
        //请求唯一标识id 每次都会自增加1
        id = al.incrementAndGet();
    }
public class TcpServerInitalizer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()[0]));
        //添加客户端和服务端之间的心跳检查状态
        ch.pipeline().addLast(new IdleStateHandler(6, 2, 1, TimeUnit.SECONDS));
        ch.pipeline().addLast(new TcpServerHandler());
        ch.pipeline().addLast(new StringEncoder());
    }
}

Netty系列之实现长连接

https://blog.csdn.net/yuliantao/article/details/82593100

alibaba SOFA Bolt Handbook

https://github.com/sofastack/sofa-bolt/wiki/SOFA-Bolt-Handbook

netty server程序所需功能:

1.需要能记忆登陆的客户的信息与socket对应关系,可能需要保存更多信息.

2.心跳包

3.处理线程来处理所收到的包

4.回包

5.decoder,encoder功能

6.日志功能

7.不会无故退出

netty客户端:

1.重新连接功能

2.日志功能

3.登录功能

4.处理线程

5.心跳包

说一下逻辑吧:发送消息时,除了心跳消息、握手消息、状态报告消息外,消息都加入消息发送超时管理器,立马开启一个定时器,比如每隔5秒执行一次,共执行3次,在这个周期内,如果消息没有发送成功,会进行3次重发,达到3次重发后如果还是没有发送成功,那就放弃重发,移除该消息,同时通过消息转发器通知应用层,由应用层决定是否再次重发。如果消息发送成功,服务端会返回一个消息发送状态报告,客户端收到该状态报告后,从消息发送超时管理器移除该消息,同时停止该消息对应的定时器即可。 另外,在用户握手认证成功时,应该检查消息发送超时管理器里是否有发送超时的消息,如果有,则全部重发:

可以看到,利用userEventTriggered()方法回调,通过IdleState类型,可以判断读超时/写超时/读写超时,这个在添加IdleStateHandler时可以配置,下面会贴上代码。首先我们可以在READER_IDLE事件里,检测是否在规定时间内没有收到服务端心跳包响应,如果是,那就触发重连操作。在WRITER_IDEL事件可以检测客户端是否在规定时间内没有向服务端发送心跳包,如果是,那就主动发送一个心跳包。发送心跳包是在子线程中执行,我们可以利用之前写的work线程池进行线程管理。 addHeartbeatHandler()代码如下:从图上可看到,在IdleStateHandler

onConnectStatusCallback(int connectStatus)为连接状态回调,以及一些公共逻辑处理:

1
https://gitee.com/zhrun8899/learning-notes.git
git@gitee.com:zhrun8899/learning-notes.git
zhrun8899
learning-notes
learning-notes
master

搜索帮助