主题
ChannelPipeline分析
概览
在Netty中,ChannelPipeline和ChannelHandler类似于Servlet和Filter过滤器,这类拦截器实际上是责任链模式的一种变形,这么设计是为了方便事件的拦截和用户业务逻辑的定制。
Servlet和Filter是JAVAEE中的基础组件,初学JAVAEE的小伙伴再熟悉不过了,它能够拦截到HTTP请求以及响应,并读出请求和响应的数据用作业务逻辑的处理,通过这种方式Servlet和Filter可以对Web应用程序进行预处理和后置处理。
Netty的Channel过滤器实现原理与ServletFilter机制一致,它将Channel的数据管道抽象为ChannelPipeline, 消息在ChannelPipeline中流动和传递。ChannelPipeline 持有I/O事件拦截器ChannelHandler的链表,由ChanneIHandler对I/O事件进行拦截和处理,可以方便地通过新增和删除ChannelHandler来实现不同的业务逻辑定制,不需要对已有的ChannelHandler进行修改,能够实现对修改封闭和对扩展的支持。总的而言,在Netty中,pipeline相当于Netty的大动脉,负责Netty的读写事件的传播。
ChannelPipeline原理分析
ChannelPipeline是ChannelHandler的容器,它负责ChannelHandler的管理和事件拦截。下面用一张图来展示在Netty中,一个“消息”被ChannelPipeline拦截,然后被ChannelHandler处理的过程,流程如下:
- 底层的SocketChannel read(方法读取ByteBuf, 触发ChannelRead 事件,由I/O线程NioEventLoop 调用ChannelPipeline 的fireChannelRead(Object msg)方法, 将消息 (ByteBuf)传输到ChannelPipeline中
- 消息依次被HeadHandler、ChannelHandler1、 ChannelHander2.....TailHandler 拦截和处理,在这个过程中,任何ChannelHandler都可以中断当前的流程,结束消息的传递
- 调用ChannelHandlerContext的write方法发送消息,消息从TailHandler开始,途经ChannelHanderN.....ChannelHandlerl. HeadHandler, 最终被添加到消息发送缓冲区中等待刷新和发送,在此过程中也可以中断消息的传递,例如当编码失败时,就需要中断流程,构造异常的Future返回
Netty中的事件分为inbound事件和outbound事件.inbound事件通常由1/O线程触发,例如TCP链路建立事件、链路关闭事件、读事件、异常通知事件等,它对应上图的左半部分。
触发inbound事件的方法如下
- ChannelHandlerContext#fireChannelRegistered(): Channel 注册事件
- ChannelHandlerContext#fireChannelActive(): TCP链路建立成功, Channel激活事件
- ChannelHandlerContext#fireChannelRead(Object): 读事件
- ChannelHandlerContext#fireChannelReadComplete(): 读操作完成通知事件;
- ChannelHandlerContext#fireExceptionCaught(Throwable): 异常通知事件;
- ChannelHandlerContext#fireUserEventTriggered(Object): 用户自定义事件:
- ChannelHandlerContext#fireChannelWritabilityChanged(): Channel 的可写状态变化通知事件;
- ChannelHandlerContext#fireChannellnactive(): TCP连接关闭,链路不可用通知事件。
Outbound事件通常是由用户主动发起的网络I/O操作,例如用户发起的连接操作、绑定操作、消息发送等操作,它对应上图的右半部分。
触发outbound事件的方法如下
- ChannelHandlerContext#bind( SocketAddress, ChannelPromise):绑定本地地址事件
- ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise):连接服务端事件
- ChannelHandlerContext#write(Object, ChannelPromise):发送事件
- ChannelHandlerContext#flushO): 刷新事件
- ChannelHandlerContext#read(): 读事件
- ChannelHandlerContext#disconnect(ChannelPromise): 断开连接事件
- ChannelHandlerContext#close(ChannelPromise): 关闭当前Channel事件
ChannelPipeline接口
为了接下来能够方便的学习原理以及阅读源码,我们先看下ChannelPipeline的接口的继承关系图:
可以发现,ChannelPipeline接口还继承了ChannelOutboundInvoker以及ChannelInboundInvoker,这两个invoker接口作为ChannelPipeline接口的增强接口。分别看下ChannelPipeline和ChannelOutboundInvoker、ChannelInboundInvoker这三个接口定义。
对于ChannelPipeline接口,方法分别可以分为以下几组类别方法:
第一组是向ChannelPipeline中添加ChannelHandler,如下图所示:
这里需要提前知道的是,ChannelPipeline维护这一组双向链表的数据结构。
addFirst是向ChannelPipeline双向链表头补添加节点,addLast是向ChannelPipeline双向链表尾部添加节点,addBefore是向ChannelPipeline双向链表中指定的ChannelHandler之前添加一个新的节点,addAfter是向ChannelPipeline双向链表中指定的ChannelHandler之后添加一个节点。
第二组是向ChannelPipeline中移除ChannelHandler
第三组是向获取ChannelHandlerContext对象
第四组是ChannelInboundInvoker接口增强而来的方法
第五组是ChannelOutboundInvoker接口增强而来的方法
在Netty中,ChannelPipeline是一个双向链表的数据结构,那么链表节点是什么呢?答案就是ChannelHandlerContext对象。
在Netty中,ChannelHandlerContext对象就是存在ChannelPipeline双向链表中的节点元素,在ChannelPipeline中,Netty会为其初始化Head头结点和Tail尾结点,在ChannelPipeline实现类:DefaultChannelPipeline中可以看到定义:
java
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
DefaultChannelPipeline构造方法中,对head和tail进行了初始化
java
protected DefaultChannelPipeline(Channel channel) {
// 给channel赋值channel对象
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
// 节点对象是AbstractChannelHandlerContext对象,是用于进行业务处理
// HeadContext和TailContext就是用户可以模仿实现的ChannelHandler实现类
// channelPipeline双向连表的头节点
tail = new TailContext(this);
// channelPipeline双向连表的尾结点
head = new HeadContext(this);
// channelPipeline: head -> tail
head.next = tail;
tail.prev = head;
}
当Netty初始化完DefaultChannelPipeline对象之后,ChannelPipeline中就已经存在了head和tail两个节点了,自然Netty会通过前面介绍的addXxx方法来添加,下面看下ChannelPipeline的addXxx方法源代码:
DefaultChannelPipeline.java
java
@Override
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
/**
* 生成一个新的ChannelHandlerContext对象,这里返回的是DefaultChannelHandlerContext对象
*/
newCtx = newContext(group, filterName(name, handler), handler);
/**
* 向pipeline链表中添加一个新的节点
*/
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
// 触发handlerAdded方法,并开始传播handlerAdded事件,此处最终会调用ChannelInitializer#handlerAdded方法,并最终调用到initChannel方法。
callHandlerCallbackLater(newCtx, true);
return this;
}
/**
* 从NioEventLoopGroup中获取到NioEventLoop对象
*/
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 调用HandlerAdded方法
callHandlerAdded0(newCtx);
return this;
}
// 向尾结点添加一个节点,并移动指针位置
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
那么,对于拥有双向链表结构的ChannelPipeline来说,是如何让事件在链表结构中进行转移执行的?
就拿fireChannelRead方法来分析:
这里需要提前知道的一点是,AbstractCHannelHandlerContext#fireChannelRead方法会被复写了channelRead方法的ChannelHandler调用。
AbstractCHannelHandlerContext.java
java
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
/**
* findContextInbound 返回的-> AbstractChannelHandlerContext对象
*/
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
/**
* 查找下一个Inbound节点
* @param mask
* @return
*/
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}
private static boolean skipContext(
AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
// Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
return (ctx.executionMask & (onlyMask | mask)) == 0 ||
// We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
// everything to preserve ordering.
//
// See https://github.com/netty/netty/issues/10067
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}
经过while循环遍历出下一个节点之后,变调用DefaultChannelPipeline#invokeChannelRead方法。
DefaultChannelPipeline
java
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
// 在Netty线程中,则直接调用
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
// 不在Netty线程中,则另开一个线程来调用ChanelRead方法
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
// 过滤handler的状态
if (invokeHandler()) {
try {
// 调用inboundHandler的channelRead方法
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}
就这样,前一个节点的ChannelRead方法执行完,就会遍历出下一个节点的ChannelRead并执行,以此达到了在双向链表中移动节点元素的效果。
ChannelPipeline对象是在哪里创建的
Netty中,ChannelPipeline对象是在Channel被创建的时候生成的,看源码。
AbstractChannel.java
java
protected AbstractChannel(Channel parent) {
this.parent = parent;
// channel的标识
id = newId();
// channel的unsafe类
// NioSocketChannel和NioServerSocketChannel的unsafe对象都一样
unsafe = newUnsafe();
// 新建pipeline
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
// 随后调用DefaultChannelPipeline对象构造方法,在构造方法中生成TailContext和HeadContext,并维护好他们的链表关系
return new DefaultChannelPipeline(this);
}
ChannelHandler原理分析
ChannelPipeline是通过ChannelHandler接口来实现事件的拦截和处理,一般ChannelHandler只需要继承ChannelHandlerAdapter,然后覆盖自己关心的方法即可。
对于ChannelHandler接口,先看下其接口实现图:
可以看到ChannelHandler接口的子类实现图中,有两个重要的子接口:ChannelInboundHandler、ChannelOutboundHandlerAdapter,这两个子接口扩展了ChannelHandler的功能,分别对应着ChannelPipeline章节中介绍的inbound和outbound事件功能。先看看ChannelHandler接口定义了哪些方法。
ChannelHandler接口
java
public interface ChannelHandler {
/**
* 添加ChannelHandler的回调
*/
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
/**
* 移除ChannelHandler的回调
*/
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
/**
*
*/
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
// no value
}
}
ChannelHandler是作为业务处理器保存在ChannelPipeline中的,它的其他功能都是在子类实现或者是子接口继承的,下面看下: ChannelHandlerAdapter
java
public abstract class ChannelHandlerAdapter implements ChannelHandler {
/**
* 判断当前这个ChannelHandler是否有@Shareble修饰,有的话该ChannelHandler就可以在不同的ChannelPipeline之间共享
*/
public boolean isSharable() {
/**
* Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a
* {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different
* {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of
* {@link Thread}s are quite limited anyway.
*
* See <a href="https://github.com/netty/netty/issues/2289">#2289</a>.
*/
Class<?> clazz = getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
@Skip
@Override
@Deprecated
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
可以看到,ChannelHandlerAdapter作为抽象类只实现了顶级接口ChannelHandler的两个方法:isShareble和exceptionCaught,这里是Netty的风格之一,就是定义完顶级接口后,分别有公共抽象子类、子接口来对功能进行增强。那么对于ChannelHandler的功能增强,则由:ChannelOutboundHandler、ChannelInboundHandler来进行的增强。
ChannelOutboundHandler、ChannelInboundHandler接口
java
public interface ChannelInboundHandler extends ChannelHandler {
/**
* 通道注册完成的回调方法,方法中多以fireChannelRegistered方法为主,作用是往pipeline中传播channelRegistered事件
*/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/**
* 通道解除注册的回调方法
*/
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
/**
* 通道触发
*/
void channelActive(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
* end of lifetime.
*/
void channelInactive(ChannelHandlerContext ctx) throws Exception;
/**
* 通道读取到消息
*/
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
/**
* Invoked when the last message read by the current read operation has been consumed by
* {@link #channelRead(ChannelHandlerContext, Object)}. If {@link ChannelOption#AUTO_READ} is off, no further
* attempt to read an inbound data from the current {@link Channel} will be made until
* {@link ChannelHandlerContext#read()} is called.
*/
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called if an user event was triggered.
*/
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
/**
* Gets called once the writable state of a {@link Channel} changed. You can check the state with
* {@link Channel#isWritable()}.
*/
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called if a {@link Throwable} was thrown.
*/
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
java
public interface ChannelOutboundHandler extends ChannelHandler {
/**
* 绑定socket事件回调
*/
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
* socket连接回调
*/
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
* socket断开连接回调
*/
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* socket关闭回调
*/
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Called once a deregister operation is made from the current registered {@link EventLoop}.
*
* @param ctx the {@link ChannelHandlerContext} for which the close operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Intercepts {@link ChannelHandlerContext#read()}.
*/
void read(ChannelHandlerContext ctx) throws Exception;
/**
*
*/
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
/**
*
*/
void flush(ChannelHandlerContext ctx) throws Exception;
}
ChannelInitializer抽象类
在来看看ChannelInitializer这个抽象类,定义了什么功能。
java
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
// initMap用于保存在不同pipeline之间共享的ChannelHandler对象,减少开销
private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(
new ConcurrentHashMap<ChannelHandlerContext, Boolean>());
/**
* 初始化channel的抽象方法,具体由子类提供实现逻辑
*/
protected abstract void initChannel(C ch) throws Exception;
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// 通道注册完成后,对通道进行初始化
if (initChannel(ctx)) {
// 将通道注册完这个事件往pipeline里传播
ctx.pipeline().fireChannelRegistered();
// We are done with init the Channel, removing all the state for the Channel now.
removeState(ctx);
} else {
ctx.fireChannelRegistered();
}
}
/**
* Handle the {@link Throwable} by logging and closing the {@link Channel}. Sub-classes may override this.
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (logger.isWarnEnabled()) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause);
}
ctx.close();
}
/**
* {@inheritDoc} If override this method ensure you call super!
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now.
removeState(ctx);
}
}
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
initMap.remove(ctx);
}
@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
// 调用抽象方法initChannel(channel)
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}
// 将Channelhandler从initMap中移除
private void removeState(final ChannelHandlerContext ctx) {
// The removal may happen in an async fashion if the EventExecutor we use does something funky.
if (ctx.isRemoved()) {
initMap.remove(ctx);
} else {
// The context is not removed yet which is most likely the case because a custom EventExecutor is used.
// Let's schedule it on the EventExecutor to give it some more time to be completed in case it is offloaded.
ctx.executor().execute(new Runnable() {
@Override
public void run() {
initMap.remove(ctx);
}
});
}
}
}
对于ChannelInboundHandlerAdapter这个抽象类来说,已经实现了ChannelInboundHandler这个接口的所有方法了,而ChannelOutboundHandlerAdapter抽象类同样已经实现了ChannelOutboundHandler接口的所有方法,因此继承了ChannelInitializer的实现类,只需要实现initChannel(Channel ch)方法即可。
下面看一个ChannelInitializer的例子
java
public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
public HttpHelloWorldServerInitializer(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new HttpServerCodec());
p.addLast(new HttpServerExpectContinueHandler());
p.addLast(new HttpHelloWorldServerHandler());
}
}
由上面的例子知道,对于initChannel(Channel ch)方法而言,主要是用于往pipeline中添加ChannelHandler的。
ChannelHandlerContext源码分析
ChannelHandlerContext接口定义
java
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
/**
* 获得一个channel对象
*/
Channel channel();
/**
* 获取一个EventExecutor对象,这里实际获得的是NioEventLoop
*/
EventExecutor executor();
/**
* 获取ChannelHandler的名称
*/
String name();
/**
* 绑定在ChannelHandlerContext上的ChannelHandler
*/
ChannelHandler handler();
boolean isRemoved();
@Override
ChannelHandlerContext fireChannelRegistered();
@Override
ChannelHandlerContext fireChannelUnregistered();
@Override
ChannelHandlerContext fireChannelActive();
@Override
ChannelHandlerContext fireChannelInactive();
@Override
ChannelHandlerContext fireExceptionCaught(Throwable cause);
@Override
ChannelHandlerContext fireUserEventTriggered(Object evt);
@Override
ChannelHandlerContext fireChannelRead(Object msg);
@Override
ChannelHandlerContext fireChannelReadComplete();
@Override
ChannelHandlerContext fireChannelWritabilityChanged();
@Override
ChannelHandlerContext read();
@Override
ChannelHandlerContext flush();
/**
* 获取pipeline
*/
ChannelPipeline pipeline();
/**
* 获取一个内存分配器
*/
ByteBufAllocator alloc();
/**
* @deprecated Use {@link Channel#attr(AttributeKey)}
*/
@Deprecated
@Override
<T> Attribute<T> attr(AttributeKey<T> key);
/**
* @deprecated Use {@link Channel#hasAttr(AttributeKey)}
*/
@Deprecated
@Override
<T> boolean hasAttr(AttributeKey<T> key);
}
可以看到ChannelHandlerContext分别继承了ChannelInboundInvoker和ChannelOutboundInvoker接口,在分析Channelpipeline章节时,介绍过其二者定义的功能,ChannelInboundInvoker多以fireXxxx方法构成,代表的是触发的Xxx事件的传播,例如:
java
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
return this;
}
fireChannelRegistered方法就是触发了ChannelRegistered事件能够在ChannelPipeline中进行传播。
而ChannelOutboundInvoker多以socket的绑定、连接、读和写为住,常见的方法由write、flush以及writeAndFlush。
java
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise);
return promise;
}
按照Netty架构的习惯,在给定一个接口之后,一般都会有对应的公共抽象类来定义公共的方法,并将需要定制的方法定义为抽象方法供不同的子类实现,照着这个思路可以找到AbstractChannelHandlerContext这个抽象类。
AbstractChannelHandlerContext源码分析
成员变量定义
在分析AbstractChannelHandlerContext源码之前,我们先看下它的成员变量定义,入下图所示,定义了两个volatile对象:
- volatile AbstractChannelHandlerContext next
- volatile AbstractChannelHandlerContext prev
这两个AbstractChannelHandlerContext对象作为指针实现了ChannelPipeline作为双向链表的数据结构。
java
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
关于XxxFieldUpdater类,可以阅读:https://my.oschina.net/u/4072299/blog/3115164
接着是AbstractChannelHandlerContext的状态:
- private static final int ADD_PENDING = 1
- private static final int ADD_COMPLETE = 2
- private static final int REMOVE_COMPLETE = 3
ADD_PENDING表示正在调用handlerAdded,ADD_COMPLETE表示已经调用完成了handlerAdded,而REMOVE_COMPLETE表示已经调用完handlerRemoved方法。
而ChannelHandlerContext中还会存有ChannelPipeline。
java
private final DefaultChannelPipeline pipeline
还有一个handlerState变量,用于定义当前ChannelHandler对象的状态,初始为INIT状态,表示handlerAdded和handlerRemove都还未调用过。
java
private volatile int handlerState = INIT
对于ChannelPipeline中的ChannelHandler是如何被调用以及如何移动双向链表中的对象的,实现原理就在这几个方法之间:
AbstractChannelHandlerContext.java
java
/*
* 触发ChannelActive事件
*/
@Override
public ChannelHandlerContext fireChannelActive() {
invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
return this;
}
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
// 获取NioEventLoop线程
EventExecutor executor = next.executor();
// 如果获取到的NioEventLoop线程是当前的线程
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
// 另开一个线程去执行
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
private void invokeChannelActive() {
// 检查ChannelHandler的状态
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
// 遍历下一个节点,重新调用下一个节点
fireChannelActive();
}
}
/*
* 找到inbound节点
*/
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
// 拿到当前的NioEventLoop线程
EventExecutor currentExecutor = executor();
do {
// 获取ChannelPipeline中的next节点
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}
// 判断是否跳过此节点
private static boolean skipContext(
AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
return (ctx.executionMask & (onlyMask | mask)) == 0 ||
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}
当调用来到
java
((ChannelInboundHandler) handler()).channelActive(this);
会触发下一个Handler的channelActive,此处就拿Tail节点的channelActive来分析。
java
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
readIfIsAutoRead();
}
可以看到,调用又来到了ChannelHandlerContext#fireChannelActive,这样又要进行节点的遍历,就这样把事件传播了下去。
DefaultChannelPipeline源码分析
在DefaultChannelPipeline中,定义了一个head“头结点”和一个tail“尾结点”,它们都是AbstractChannelhandlerContext类的节点,我们都知道在ChannelPipeline中AbstractChannelHandlerContext就是节点元素的抽象类实现,而这个handlerContext持有ChannelHandler。
在Netty中我们还需要知道inbound和outbound类型的ChannelHandler节点的执行顺序。
下面来先看下一个Netty的demo
该Netty的demo中,分别定义了六个Handler,分为两组,一组是inboundHandler,另一组是outboundHandler。
InBoundHandlerA
java
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerA: " + msg);
ctx.fireChannelRead(msg);
}
}
InBoundHandlerB
java
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerB: " + msg);
ctx.write(msg, promise);
}
@Override
public void handlerAdded(final ChannelHandlerContext ctx) {
ctx.executor().schedule(() -> {
ctx.channel().write("ctx.channel().write -> hello world");
ctx.write("hello world");
}, 3, TimeUnit.SECONDS);
}
}
InBoundHandlerC
java
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerC: " + msg);
ctx.fireChannelRead(msg);
}
}
java
public final class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
执行结果如下:
InBoundHandlerA: hello world
InBoundHandlerB: hello world
InBoundHandlerC: hello world
可以发现Netty中,对于inboundHandler来说是按照顺序执行操作的。
接着在看看outboundHandler定义如下
OutBoundHandlerA
java
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerA: " + msg);
ctx.write(msg, promise);
}
}
OutBoundHandlerB
java
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerB: " + msg);
ctx.write(msg, promise);
}
}
OutBoundHandlerC
java
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerC: " + msg);
ctx.write(msg, promise);
}
}
然后修改Server类为如下,
java
public final class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
执行结果如下:
OutBoundHandlerC: ctx.channel().write -> hello world
OutBoundHandlerB: ctx.channel().write -> hello world
OutBoundHandlerA: ctx.channel().write -> hello world
OutBoundHandlerA: hello world
可以看到在Netty中对于ountboundHandler来说,是倒序执行的。
整个Netty执行ChannelHandler可以用下图来描述。
上图描述的Head节点顺序执行,Tail节点逆序执行的源码是在DefaultChannelPipeline中,在《Netty-ChannelPipeline-上》文章开头就已经说明了,对于inboundHandler类型的Handler,主要还是用于监听Channel的read、register、active、exceptionCaught等事件,而对于outboundHandler类型来说,主要是用于bind、connect、write、flush等事件,回顾了这一点后,我们在继续看DefaultChannelPipeline源码
java
public class DefaultChannelPipeline implements ChannelPipeline {
... 省略
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
... 省略
}
分别以inbound类型的channelRead和outbound类型的write来分析。
DefaultChannelPipeline.java
java
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
在AbstractChannelHandlerContext#invokeChannelRead方法中,传入了一个重要的入参:head,这里就是传入的Head头结点,这一重要调用得以让inbound类型handler在ChannelPipeline中按顺序执行。
AbstractChannelHandlerContext.java
java
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
// 在NioEventLoop线程内,next这里传入的是head头结点
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}
ChannelInboundHandler#channelRead的调用,会最终来到InBoundHandlerA里的channelRead方法。
java
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerA: " + msg);
ctx.fireChannelRead(msg);
}
}
经过AbstractChannelHandlerContext#fireChannelRead,会在ChannelPipeline中寻找下一个inbound,然后继续执行channelRead。
java
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
细看OutBoundHandlerB#handlerAdded方法由两个write,一个是ctx.channel.write,另一个是ctx.write,这两个有啥区别呢?为啥输出结果是三条:ctx.channel().write -> hello world,一条hello world呢?
启动Server启动类之后,再cmd窗口输入连接socket的命令debug之后分析得
telnet 127.0.0.1 8888
在客户端socket连接进Netty之后,会先注册channel并init初始化,这时会调用Server类里ServerBootstrap注入的ChannelInitilizer的initChannel方法,最终得以向ChannelPipeline里添加进OutBoundHandlerA、OutBoundHandlerB、OutBoundHandlerC,随后调用
java
ch.pipeline().addLast(new xxxx)
只有会触发DefaultChannelPipeline#callHandlerAdded0()方法,最终来到OutBoundHandler里的handlerAdded()方法,并向Netty的定时任务队列里添加了一个匿名内部任务,也就是:
java
@Override
public void handlerAdded(final ChannelHandlerContext ctx) {
ctx.executor().schedule(() -> {
ctx.channel().write("ctx.channel().write -> hello world");
ctx.write("hello world");
}, 3, TimeUnit.SECONDS);
}
随后完成客户端Socket的初始化工作。此时服务端的selector继续执行for死循环,执行到任务队列,此时发现任务队列中有一个定时任务需要执行,则拿出任务并执行任务,执行过程会跳转到上面的匿名内部类,并依次执行ctx.channel().write()和ctx.write()两个方法。
java
ctx.channel().write()
方法会从ChannelPipeline的尾部tail开始执行(上文已经总结过,outboundHandler都是从tail节点开始执行handler) ,所以字符串“ctx.channel().write -> hello world”就会按outboundHandlerC、outboundHandlerB、outboundHandlerC这个顺序开始执行,执行完head节点之后会一路往上返回到Ctx.channel().write() 方法,并最后去执行ctx.write()方法,而ctx.write()方法会从当前的handler节点开始向前执行,所以当前outboundHandlerB的前节点是outboundHandlerA,所以最终控制台打印出:
OutBoundHandlerC: ctx.channel().write -> hello world
OutBoundHandlerB: ctx.channel().write -> hello world
OutBoundHandlerA: ctx.channel().write -> hello world
OutBoundHandlerA: hello world
整个过程比较复杂,也比较绕,下面用一张流程图来描述整个过程。
- TODO ChannelPipeline优化?MASK
- TODO SimpleChannelInboundHandler源码分析