Previously we analyzed how the Netty EventLoop is created and started. Next we look at two other important Netty components: ChannelHandler and Pipeline.
Netty version: 4.1.30
 
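When covering Channel creation, we mentioned the Pipeline only in passing, in the AbstractChannel constructor. Now we look at how it works in depth.

Overview

Netty channel lifecycle

Earlier, when analyzing the Netty channel source code, we covered how a Channel is created, initialized, registered, and bound. In Netty, a channel's lifecycle consists of the following states:
ChannelRegistered: the Channel has been registered with an EventLoop
ChannelActive: the Channel is active, connected to a remote peer, and can send and receive data
ChannelInactive: the connection has been closed
ChannelUnregistered: the Channel has been deregistered from its EventLoop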
 
Netty channelHandler

Every change in a Channel's state produces a corresponding event, and each such event is delivered to a ChannelHandler.
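Among the ChannelHandler types in the UML diagram, the two most important are ChannelInboundHandler, which handles inbound events, and ChannelOutboundHandler, which handles outbound events.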
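Netty ChannelPipeline

As we saw when analyzing Channel creation, every newly created Channel is assigned a new ChannelPipeline. A ChannelPipeline is a chain of ChannelHandler instances that intercept the inbound and outbound events flowing through a Channel, as the figure illustrates.
A Channel contains exactly one ChannelPipeline. Internally, the ChannelPipeline is a doubly linked list of ChannelHandlerContext nodes with two fixed nodes at its ends, HeadContext and TailContext. Each user-defined ChannelHandler is wrapped in a ChannelHandlerContext to become a Pipeline node, and takes part in intercepting the inbound events, outbound events, and associated data flow triggered over the Channel's lifecycle.
Depending on its origin, an event is handled by either a ChannelInboundHandler or a ChannelOutboundHandler. It is then forwarded, through a call on the ChannelHandlerContext implementation, to the next ChannelHandler of the same supertype, as the figure illustrates.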
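The direction rule described above can be illustrated with a small stand-alone model. This is only a sketch and not Netty code: `DirectionDemo`, `Node`, `link`, `fireInbound`, and `fireOutbound` are hypothetical names, the two boolean flags stand in for the ChannelInboundHandler and ChannelOutboundHandler supertypes, and, unlike Netty, the walk starts at the given node itself rather than at its neighbor.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of event direction in a pipeline; not Netty code.
class DirectionDemo {

    // Hypothetical node: the two flags stand in for the handler's supertype.
    static final class Node {
        final String name;
        final boolean inbound;
        final boolean outbound;
        Node prev;
        Node next;

        Node(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    // Link the given nodes into a doubly linked list, in argument order.
    static void link(Node... nodes) {
        for (int i = 0; i + 1 < nodes.length; i++) {
            nodes[i].next = nodes[i + 1];
            nodes[i + 1].prev = nodes[i];
        }
    }

    // Inbound events walk toward the tail and visit inbound nodes only.
    static List<String> fireInbound(Node from) {
        List<String> visited = new ArrayList<>();
        for (Node n = from; n != null; n = n.next) {
            if (n.inbound) {
                visited.add(n.name);
            }
        }
        return visited;
    }

    // Outbound events walk toward the head and visit outbound nodes only.
    static List<String> fireOutbound(Node from) {
        List<String> visited = new ArrayList<>();
        for (Node n = from; n != null; n = n.prev) {
            if (n.outbound) {
                visited.add(n.name);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        Node decoder = new Node("decoder", true, false);
        Node encoder = new Node("encoder", false, true);
        Node business = new Node("business", true, false);
        link(decoder, encoder, business);

        System.out.println(fireInbound(decoder));   // [decoder, business]
        System.out.println(fireOutbound(business)); // [encoder]
    }
}
```

The inbound walk skips the encoder and the outbound walk skips both inbound nodes, which is the "next handler of the same supertype" behavior in miniature.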
Pipeline UML

Let's first look at the class diagrams of ChannelPipeline and ChannelHandlerContext. Both extend the ChannelInboundInvoker and ChannelOutboundInvoker interfaces.
Pipeline initialization

The AbstractChannel constructor looks like this:

```java
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    // Create the pipeline that belongs to this channel
    pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}
```

The DefaultChannelPipeline constructor looks like this:

```java
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);

    // Tail node
    tail = new TailContext(this);
    // Head node
    head = new HeadContext(this);

    // Link head and tail into a doubly linked list
    head.next = tail;
    tail.prev = head;
}
```
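As the constructor shows, the Pipeline is a doubly linked list. Right after initialization it contains only two nodes, head and tail, as the figure illustrates.
Next, let's look at the object that makes up each Pipeline node: ChannelHandlerContext.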
ChannelHandlerContext

ChannelHandlerContext extends the AttributeMap, ChannelInboundInvoker, and ChannelOutboundInvoker interfaces. Event propagation in the Pipeline is the job of ChannelHandlerContext, which passes each event from one node to the next.

ChannelHandlerContext interface

```java
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

    // The Channel this context is bound to
    Channel channel();

    // The EventExecutor that runs this node's tasks
    EventExecutor executor();

    // The name of this context
    String name();

    // The ChannelHandler wrapped by this context
    ChannelHandler handler();

    // Whether the handler has been removed from the pipeline
    boolean isRemoved();

    @Override
    ChannelHandlerContext fireChannelRegistered();

    @Override
    ChannelHandlerContext fireChannelUnregistered();

    @Override
    ChannelHandlerContext fireChannelActive();

    @Override
    ChannelHandlerContext fireChannelInactive();

    @Override
    ChannelHandlerContext fireExceptionCaught(Throwable cause);

    @Override
    ChannelHandlerContext fireUserEventTriggered(Object evt);

    @Override
    ChannelHandlerContext fireChannelRead(Object msg);

    @Override
    ChannelHandlerContext fireChannelReadComplete();

    @Override
    ChannelHandlerContext fireChannelWritabilityChanged();

    @Override
    ChannelHandlerContext read();

    @Override
    ChannelHandlerContext flush();

    // The pipeline this context belongs to
    ChannelPipeline pipeline();

    // The ByteBufAllocator assigned to the channel
    ByteBufAllocator alloc();
}
```
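AttributeMap interface

Extending AttributeMap means a ChannelHandlerContext node can store user-defined attributes.

```java
public interface AttributeMap {

    // Get the Attribute for the given key
    <T> Attribute<T> attr(AttributeKey<T> key);

    // Whether an Attribute exists for the given key
    <T> boolean hasAttr(AttributeKey<T> key);
}
```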
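To make the idea of a typed attribute store concrete, here is a minimal sketch built on the JDK only. It is not Netty's DefaultAttributeMap: `SimpleAttributeMap` and `Key` are hypothetical names, and an `AtomicReference` stands in for Netty's `Attribute` holder.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

// Simplified typed attribute store; not Netty's implementation.
class SimpleAttributeMap {

    // Hypothetical typed key. Keys compare by identity, so they are
    // meant to be created once and reused as constants.
    static final class Key<T> {
        final String name;

        Key(String name) {
            this.name = name;
        }
    }

    private final ConcurrentMap<Key<?>, AtomicReference<Object>> attrs =
            new ConcurrentHashMap<>();

    // Like attr(key): return the holder for the key, creating it on first use.
    @SuppressWarnings("unchecked")
    <T> AtomicReference<T> attr(Key<T> key) {
        AtomicReference<?> holder =
                attrs.computeIfAbsent(key, k -> new AtomicReference<>());
        return (AtomicReference<T>) holder;
    }

    // Like hasAttr(key): true once a holder has been created for the key.
    <T> boolean hasAttr(Key<T> key) {
        return attrs.containsKey(key);
    }

    public static void main(String[] args) {
        Key<Integer> retries = new Key<>("retries");
        SimpleAttributeMap map = new SimpleAttributeMap();

        System.out.println(map.hasAttr(retries));    // false
        map.attr(retries).set(3);
        System.out.println(map.attr(retries).get()); // 3
    }
}
```

The type parameter on the key is what lets callers read the value back without a cast at the call site.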
ChannelInboundInvoker interface

Extending ChannelInboundInvoker means a node can propagate inbound events.

```java
public interface ChannelInboundInvoker {

    // The Channel was registered with its EventLoop
    ChannelInboundInvoker fireChannelRegistered();

    // The Channel was deregistered from its EventLoop
    ChannelInboundInvoker fireChannelUnregistered();

    // The Channel became active
    ChannelInboundInvoker fireChannelActive();

    // The Channel became inactive
    ChannelInboundInvoker fireChannelInactive();

    // An exception was caught during an inbound operation
    ChannelInboundInvoker fireExceptionCaught(Throwable cause);

    // A user-defined event was received
    ChannelInboundInvoker fireUserEventTriggered(Object event);

    // The Channel received a message
    ChannelInboundInvoker fireChannelRead(Object msg);

    // The current read operation completed
    ChannelInboundInvoker fireChannelReadComplete();

    // The writable state of the Channel changed
    ChannelInboundInvoker fireChannelWritabilityChanged();
}
```
ChannelOutboundInvoker interface

Extending ChannelOutboundInvoker means a node can be used to handle outbound events.

```java
public interface ChannelOutboundInvoker {

    // Bind to a local address
    ChannelFuture bind(SocketAddress localAddress);
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);

    // Connect to a remote address
    ChannelFuture connect(SocketAddress remoteAddress);
    ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

    // Disconnect from the remote peer
    ChannelFuture disconnect();
    ChannelFuture disconnect(ChannelPromise promise);

    // Close the Channel
    ChannelFuture close();
    ChannelFuture close(ChannelPromise promise);

    // Deregister from the EventLoop
    ChannelFuture deregister();
    ChannelFuture deregister(ChannelPromise promise);

    // Request to read data from the Channel
    ChannelOutboundInvoker read();

    // Write a message (not flushed yet)
    ChannelFuture write(Object msg);
    ChannelFuture write(Object msg, ChannelPromise promise);

    // Flush all pending messages
    ChannelOutboundInvoker flush();

    // Write and flush in one call
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
    ChannelFuture writeAndFlush(Object msg);

    ChannelPromise newPromise();
    ChannelProgressivePromise newProgressivePromise();
    ChannelFuture newSucceededFuture();
    ChannelFuture newFailedFuture(Throwable cause);
    ChannelPromise voidPromise();
}
```
TailContext & HeadContext

Next, let's look at the head and tail nodes of the Pipeline.

TailContext node

TailContext is the tail node. It is an inbound node and mainly performs the cleanup work at the end of the Pipeline's data flow.

```java
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

    TailContext(DefaultChannelPipeline pipeline) {
        // inbound = true, outbound = false
        super(pipeline, null, TAIL_NAME, true, false);
        // Mark this node as added
        setAddComplete();
    }

    // The tail node acts as its own handler
    @Override
    public ChannelHandler handler() {
        return this;
    }

    ...

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        onUnhandledInboundException(cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        onUnhandledInboundMessage(msg);
    }

    ...
}

// An exception reached the tail without being handled: log a warning
protected void onUnhandledInboundException(Throwable cause) {
    try {
        logger.warn(
            "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
            "It usually means the last handler in the pipeline did not handle the exception.",
            cause);
    } finally {
        ReferenceCountUtil.release(cause);
    }
}

// A message reached the tail without being handled: log it and release it
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
            "Discarded inbound message {} that reached at the tail of the pipeline. " +
            "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

// Move the handler state to ADD_COMPLETE unless the node was already removed
final void setAddComplete() {
    for (;;) {
        int oldState = handlerState;
        if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return;
        }
    }
}
```
AbstractChannelHandlerContext

AbstractChannelHandlerContext is the abstract implementation of ChannelHandlerContext:

```java
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    ...

    // Next node
    volatile AbstractChannelHandlerContext next;
    // Previous node
    volatile AbstractChannelHandlerContext prev;

    // Whether this node handles inbound events
    private final boolean inbound;
    // Whether this node handles outbound events
    private final boolean outbound;
    // The pipeline this node belongs to
    private final DefaultChannelPipeline pipeline;
    // Node name
    private final String name;
    private final boolean ordered;

    // Executor for this node; may be null
    final EventExecutor executor;

    ...

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

    ...
}
```
DefaultChannelHandlerContext

```java
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        // The inbound/outbound flags are derived from the handler's type
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }

    // Inbound if the handler implements ChannelInboundHandler
    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    // Outbound if the handler implements ChannelOutboundHandler
    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}
```
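The `instanceof` classification above also covers handlers that implement both interfaces. The following stand-alone sketch shows this with hypothetical stand-in types (`Handler`, `InboundHandler`, `OutboundHandler`, `Decoder`, `Encoder`, `Codec`); none of them are Netty classes.

```java
// Simplified model of how a handler is classified by its supertypes; not Netty code.
class HandlerKinds {

    // Hypothetical stand-ins for ChannelHandler and its two sub-interfaces.
    interface Handler {}
    interface InboundHandler extends Handler {}
    interface OutboundHandler extends Handler {}

    static final class Decoder implements InboundHandler {}
    static final class Encoder implements OutboundHandler {}
    // A duplex handler implements both interfaces.
    static final class Codec implements InboundHandler, OutboundHandler {}

    static boolean isInbound(Handler h) {
        return h instanceof InboundHandler;
    }

    static boolean isOutbound(Handler h) {
        return h instanceof OutboundHandler;
    }

    static String describe(Handler h) {
        return h.getClass().getSimpleName()
                + ": inbound=" + isInbound(h) + ", outbound=" + isOutbound(h);
    }

    public static void main(String[] args) {
        System.out.println(describe(new Decoder())); // Decoder: inbound=true, outbound=false
        System.out.println(describe(new Encoder())); // Encoder: inbound=false, outbound=true
        System.out.println(describe(new Codec()));   // Codec: inbound=true, outbound=true
    }
}
```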
HeadContext

HeadContext is the head node. It is an outbound node, used to propagate events and to perform low-level socket operations.

```java
final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        // inbound = false, outbound = true
        super(pipeline, null, HEAD_NAME, false, true);
        // The channel's Unsafe performs the actual socket operations
        unsafe = pipeline.channel().unsafe();
        // Mark this node as added
        setAddComplete();
    }

    // The head node acts as its own handler
    @Override
    public ChannelHandler handler() {
        return this;
    }

    // Outbound operations end here and are delegated to Unsafe

    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }

    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.disconnect(promise);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.close(promise);
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.deregister(promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }

    // Inbound events start here and are passed on to the next node

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
        // Tear down the pipeline once the channel is closed
        if (!channel.isOpen()) {
            destroy();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
        // Start reading if auto-read is enabled
        readIfIsAutoRead();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
        readIfIsAutoRead();
    }

    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }
}
```
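Adding Pipeline nodes

Having covered the basic structure of the Pipeline, we now look at how a node (that is, a handler) is added to it. The process has three steps:
Check whether the handler has already been added
Create the node and append it to the linked list
Invoke the handler-added callback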
 
Take this common piece of code as an example:

```java
ServerBootstrap b = new ServerBootstrap();
b.group(group)
        .channel(NioServerSocketChannel.class)
        .localAddress(new InetSocketAddress(port))
        .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
        .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                // Add the user-defined handler to the child channel's pipeline
                ch.pipeline().addLast(serverHandler);
            }
        });

ChannelFuture f = b.bind().sync();
```
We start from the ChannelPipeline.addLast() method:

```java
public class DefaultChannelPipeline implements ChannelPipeline {

    ...

    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }
        // Add the handlers one by one
        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }
        return this;
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 1. Check whether the handler has already been added
            checkMultiplicity(handler);
            // 2. Create the node
            newCtx = newContext(group, filterName(name, handler), handler);
            // 3. Append the node to the linked list
            addLast0(newCtx);

            // The channel is not registered with an EventLoop yet:
            // defer the handlerAdded callback until it is
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                // Not on the node's executor thread: run the callback as a task
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        // 4. Invoke the handler-added callback
        callHandlerAdded0(newCtx);
        return this;
    }

    // Check whether the handler has already been added
    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            // A handler that is not @Sharable may be added only once
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }

    // Create the node
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

    // Insert the node just before the tail
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

    // Invoke the handler-added callback
    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            // Mark the node as added
            ctx.setAddComplete();
            // Call the handler's handlerAdded method
            ctx.handler().handlerAdded(ctx);
        } catch (Throwable t) {
            ...
            // Remove the node if the callback failed
            remove0(ctx);
            ...
        }
    }

    ...
}
```
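The first two steps (the duplicate check and the list insertion) can be reproduced in a few lines without Netty. The sketch below is an illustration under stated assumptions: `MiniPipeline`, `Handler`, and `Ctx` are hypothetical types, the `sharable` flag plays the role of the @Sharable annotation, and the callback step is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of addLast; not Netty code.
class MiniPipeline {

    // Hypothetical handler: 'sharable' stands in for @Sharable,
    // 'added' mirrors the flag that checkMultiplicity sets.
    static final class Handler {
        final boolean sharable;
        boolean added;

        Handler(boolean sharable) {
            this.sharable = sharable;
        }
    }

    static final class Ctx {
        final String name;
        final Handler handler;
        Ctx prev;
        Ctx next;

        Ctx(String name, Handler handler) {
            this.name = name;
            this.handler = handler;
        }
    }

    private final Ctx head = new Ctx("head", null);
    private final Ctx tail = new Ctx("tail", null);

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    // Step 1: a non-sharable handler may be added only once.
    private static void checkMultiplicity(Handler h) {
        if (!h.sharable && h.added) {
            throw new IllegalStateException("handler is not sharable and was already added");
        }
        h.added = true;
    }

    // Steps 1 and 2; the handlerAdded callback (step 3) is omitted here.
    synchronized MiniPipeline addLast(String name, Handler h) {
        checkMultiplicity(h);
        Ctx newCtx = new Ctx(name, h);
        Ctx prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
        return this;
    }

    // Names of the user nodes between head and tail, in order.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Ctx c = head.next; c != tail; c = c.next) {
            out.add(c.name);
        }
        return out;
    }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline();
        Handler shared = new Handler(true);
        Handler single = new Handler(false);

        p.addLast("a", shared).addLast("b", shared).addLast("c", single);
        System.out.println(p.names()); // [a, b, c]

        try {
            p.addLast("d", single);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the check runs before the node is linked, a rejected handler leaves the list unchanged.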
Let's look at the setAddComplete() method:

```java
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    ...

    final void setAddComplete() {
        for (;;) {
            int oldState = handlerState;
            // Leave the state alone if the node was already removed;
            // otherwise retry the CAS until the state is ADD_COMPLETE
            if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return;
            }
        }
    }

    ...

    final void setAddPending() {
        boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
        // Only a node in the INIT state can become ADD_PENDING
        assert updated;
    }

    ...
}
```
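The same compare-and-set pattern can be tried in isolation with the JDK's `AtomicIntegerFieldUpdater`. This is a minimal sketch, not Netty code: `HandlerState`, `setRemoved`, and `state()` are hypothetical names, and the numeric values of the constants are arbitrary choices for the example.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Simplified handler state machine driven by CAS; not Netty code.
class HandlerState {
    // Constant values are arbitrary for this sketch.
    static final int INIT = 0;
    static final int ADD_PENDING = 1;
    static final int ADD_COMPLETE = 2;
    static final int REMOVE_COMPLETE = 3;

    private static final AtomicIntegerFieldUpdater<HandlerState> UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HandlerState.class, "state");

    // The updater requires a volatile int field.
    private volatile int state = INIT;

    // Retry the CAS until ADD_COMPLETE is set, unless the node was already removed.
    void setAddComplete() {
        for (;;) {
            int old = state;
            if (old == REMOVE_COMPLETE || UPDATER.compareAndSet(this, old, ADD_COMPLETE)) {
                return;
            }
        }
    }

    // Succeeds only from INIT; returns whether the transition happened.
    boolean setAddPending() {
        return UPDATER.compareAndSet(this, INIT, ADD_PENDING);
    }

    void setRemoved() {
        state = REMOVE_COMPLETE;
    }

    int state() {
        return state;
    }

    public static void main(String[] args) {
        HandlerState s = new HandlerState();
        System.out.println(s.setAddPending());              // true
        s.setAddComplete();
        System.out.println(s.state() == ADD_COMPLETE);      // true

        s.setRemoved();
        s.setAddComplete(); // no effect once removed
        System.out.println(s.state() == REMOVE_COMPLETE);   // true
    }
}
```

The early return on REMOVE_COMPLETE is what keeps a removed node from being resurrected by a late add-complete call.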
The callback then reaches the handlerAdded method of the user-defined handler:

```java
@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {

    ...

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.printf("ServerHandler added ....");
    }

    ...
}
```
ChannelInitializer

The most common use of the handlerAdded() callback is ChannelInitializer, which we use to add our own ChannelHandlers. Once the ChannelInitializer itself has been added to the pipeline, its initChannel method is called back.
Next, let's look at ChannelInitializer:

```java
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    private final ConcurrentMap<ChannelHandlerContext, Boolean> initMap = PlatformDependent.newConcurrentHashMap();

    // Implemented by the user to add handlers to the channel's pipeline
    protected abstract void initChannel(C ch) throws Exception;

    ...

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            // The channel is already registered: initialize it right away
            initChannel(ctx);
        }
    }

    @SuppressWarnings("unchecked")
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        // Guard against re-entrance
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
            try {
                // Call the user's initChannel
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                // Remove the ChannelInitializer itself from the pipeline
                remove(ctx);
            }
            return true;
        }
        return false;
    }

    private void remove(ChannelHandlerContext ctx) {
        try {
            ChannelPipeline pipeline = ctx.pipeline();
            // Look up the node that wraps this handler
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        } finally {
            initMap.remove(ctx);
        }
    }
}
```
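The combination of a `putIfAbsent` guard and removal in a `finally` block can be sketched on its own. The model below is an assumption-laden illustration, not Netty code: `InitializerDemo`, `Pipeline`, `OneShotInitializer`, and `onAdded` are hypothetical names, a plain list stands in for the pipeline, and a `Consumer` stands in for the user's initChannel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// Simplified model of a self-removing initializer; not Netty code.
class InitializerDemo {

    // Hypothetical pipeline: just an ordered list of handlers.
    static final class Pipeline {
        final List<Object> handlers = new ArrayList<>();
    }

    static final class OneShotInitializer {
        private final ConcurrentMap<Pipeline, Boolean> initMap = new ConcurrentHashMap<>();
        private final Consumer<Pipeline> userInit;

        OneShotInitializer(Consumer<Pipeline> userInit) {
            this.userInit = userInit;
        }

        // Plays the role of handlerAdded -> initChannel(ctx).
        boolean onAdded(Pipeline p) {
            // Guard against re-entrance while initialization is running.
            if (initMap.putIfAbsent(p, Boolean.TRUE) == null) {
                try {
                    userInit.accept(p);
                } finally {
                    // The initializer has done its job: remove it from the pipeline.
                    p.handlers.remove(this);
                    initMap.remove(p);
                }
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        Pipeline p = new Pipeline();
        OneShotInitializer init = new OneShotInitializer(pl -> {
            pl.handlers.add("decoder");
            pl.handlers.add("businessHandler");
        });

        p.handlers.add(init);
        init.onAdded(p);
        System.out.println(p.handlers); // [decoder, businessHandler]
    }
}
```

After initialization only the handlers installed by the user remain, which matches what the pipeline looks like once a ChannelInitializer has run.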
The context() method walks the ChannelHandlerContext nodes to find the one that wraps the given ChannelHandler instance:

```java
public class DefaultChannelPipeline implements ChannelPipeline {

    ...

    @Override
    public final ChannelHandlerContext context(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }

        // Start from the first node after head
        AbstractChannelHandlerContext ctx = head.next;
        for (;;) {
            if (ctx == null) {
                return null;
            }
            // Match by handler identity
            if (ctx.handler() == handler) {
                return ctx;
            }
            ctx = ctx.next;
        }
    }

    ...
}
```
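Besides addLast, the Pipeline also offers addFirst, addBefore, addAfter, and similar methods. Their logic is much the same and is left for the reader to explore.

Removing Pipeline nodes

Having covered how Pipeline nodes are added, in this section we look at how they are removed.
One of Netty's most notable features is that handlers are pluggable, so a pipeline can be composed dynamically. For example, a connection may need to be authenticated when it is first established. Once authentication passes, that context can be removed, and later events propagating through the pipeline no longer reach the authentication handler.
Below is the simplest implementation of an authentication handler. The first packet carries the credentials. If verification passes, the handler removes itself; otherwise it closes the connection.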
```java
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {

    ...

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
        if (verify(data)) {
            // Authenticated: remove this handler from the pipeline
            ctx.pipeline().remove(this);
        } else {
            // Authentication failed: close the connection
            ctx.close();
        }
    }

    private boolean verify(ByteBuf byteBuf) {
        ...
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("AuthHandler has been removed ! ");
    }
}
```
Let's look at the remove method in DefaultChannelPipeline:

```java
public class DefaultChannelPipeline implements ChannelPipeline {

    ...

    // Remove the node that wraps the given handler
    @Override
    public final ChannelPipeline remove(ChannelHandler handler) {
        remove(getContextOrDie(handler));
        return this;
    }

    ...

    // Find the node for the handler, or throw if there is none
    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }

    ...

    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
        // The head and tail nodes can never be removed
        assert ctx != head && ctx != tail;

        synchronized (this) {
            // Unlink the node from the list
            remove0(ctx);

            // The channel is not registered yet: defer the handlerRemoved callback
            if (!registered) {
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }

            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        // Invoke the handler-removed callback
        callHandlerRemoved0(ctx);
        return ctx;
    }

    ...

    // Unlink the node by connecting its neighbors to each other
    private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }

    ...

    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
        try {
            try {
                // Call the handler's handlerRemoved method
                ctx.handler().handlerRemoved(ctx);
            } finally {
                // Mark the node as removed
                ctx.setRemoved();
            }
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }

    ...
}
```
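The lookup-then-unlink sequence can be exercised with a small stand-alone model. This is a sketch rather than Netty code: `RemovalDemo` and `Ctx` are hypothetical names, nodes are looked up by name instead of by handler identity, and the `removed` flag plays the role of setRemoved(). As in remove0 above, the removed node's own prev and next pointers are left untouched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Simplified model of node removal; not Netty code.
class RemovalDemo {

    static final class Ctx {
        final String name;
        Ctx prev;
        Ctx next;
        boolean removed;

        Ctx(String name) {
            this.name = name;
        }
    }

    private final Ctx head = new Ctx("head");
    private final Ctx tail = new Ctx("tail");

    RemovalDemo() {
        head.next = tail;
        tail.prev = head;
    }

    synchronized void addLast(String name) {
        Ctx ctx = new Ctx(name);
        Ctx prev = tail.prev;
        ctx.prev = prev;
        ctx.next = tail;
        prev.next = ctx;
        tail.prev = ctx;
    }

    // Walk the user nodes and return the one with the given name, or null.
    Ctx context(String name) {
        for (Ctx c = head.next; c != tail; c = c.next) {
            if (c.name.equals(name)) {
                return c;
            }
        }
        return null;
    }

    // Look the node up, fail if it is missing, then unlink it.
    synchronized void remove(String name) {
        Ctx ctx = context(name);
        if (ctx == null) {
            throw new NoSuchElementException(name);
        }
        ctx.prev.next = ctx.next;
        ctx.next.prev = ctx.prev;
        ctx.removed = true;
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Ctx c = head.next; c != tail; c = c.next) {
            out.add(c.name);
        }
        return out;
    }

    public static void main(String[] args) {
        RemovalDemo p = new RemovalDemo();
        p.addLast("auth");
        p.addLast("decoder");
        p.addLast("business");

        p.remove("auth");
        System.out.println(p.names()); // [decoder, business]
    }
}
```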
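That concludes the analysis of the removal logic.

Summary

In this part we analyzed how the Pipeline is created and looked at its linked-list structure and the data structure of each node. We also analyzed how nodes are added to and removed from the Pipeline. Next, we will analyze how the Pipeline propagates events.

References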