在[基礎介紹](https://hackmd.io/@Iv5IzxMYT4OfZ08k6tYFMw/SyLbPCfBa)有提到,`ChannelHandler`是Netty 中用來處理IO 事件的介面,不過它提供的方法只有三個: - **handlerAdded()**:會在`ChannelHandler`被添加到`ChannelPipeline`時調用,可以在這個方法中進行一些初始化的操作或者準備工作。 - **handlerRemoved()**:與`handlerAdded()`相反,當`ChannelHandler`從`ChannelPipeline`中移除時被調用。 - **exceptionCaught()**:當`ChannelHandler`在處理業務邏輯的過程中出現異常時被調用。 很明顯,只有三個方法對於龐大的業務需求來說是不夠的,因此通常是實現豐富的子類別,但是這邊要先介紹兩個物件,`ChannelHandlerContext`和`ByteBuf`。 --- ## 核心物件 ### ChannelHandlerContext 雖然在基本介紹中沒有提及,但其實`ChannelHandlerContext`在Netty 中扮演著關鍵的角色,它用於管理與操作`ChannelHandler`,同時負責在`ChannelPipeline`中傳遞事件。 它的功能包括: 1. **管理處理器(Handler)鏈**:`ChannelHandlerContext`維護著一個雙向鏈結列表,連接了`ChannelHandler`物件。這個鏈結按照處理器添加的順序排列,允許處理器有序地處理傳入和傳出的事件。 2. **事件傳播**:`ChannelHandlerContext`提供了觸發事件傳播的方法,例如 `fireChannelRead()`用於觸發資料讀取事件,`fireExceptionCaught()`用於觸發異常事件等。這些方法能夠沿著處理器鏈傳播事件,讓每個處理器有機會對事件進行處理或將其傳遞給鏈中的下一個處理器。 3. **與`Channel`的關聯**:每個`ChannelHandlerContext`都與一個特定的`Channel`相關聯,因此它能夠提供有關這個`Channel`的信息,例如底層連接的狀態、寫入和刷新操作等。 4. **操作`ChannelPipeline`**:`ChannelHandlerContext`可以對`ChannelPipeline`進行操作,例如動態地添加、移除或替換`ChannelHandler`。這使得在運行時靈活地調整處理器鏈成為可能。 5. **線程安全**:`ChannelHandlerContext`是線程安全的,因此可以安全地在多個線程之間共享,用於觸發事件或訪問相關聯的`Channel`。 ### ByteBuf byte 是網路傳輸資料的基本單位,Java NIO 有提供了ByteBuffer 作為byte 容器,但這個類別用起來過於複雜,Netty 為了簡化和優化資料的處理,另外提供`ByteBuf`來代替ByteBuffer。 --- ## 入站資料 在Netty 當中,會將資料流分為**入站資料**和**出站資料**,也就是**從Client 接收到的資料**和**要發送給Client 的資料**,根據資料流的不同,會使用不同的處理器。 ### ChannelInboundHandler ```java public interface ChannelInboundHandler extends ChannelHandler { void channelRegistered(ChannelHandlerContext var1) throws Exception; void channelUnregistered(ChannelHandlerContext var1) throws Exception; void channelActive(ChannelHandlerContext var1) throws Exception; void channelInactive(ChannelHandlerContext var1) throws Exception; void channelRead(ChannelHandlerContext var1, Object var2) throws Exception; void channelReadComplete(ChannelHandlerContext var1) throws Exception; void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception; void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception; void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception; } ``` `ChannelHandler`的第一層子介面,負責提供**入站資料**處理方法的介面,也就是**處理從Client 接收到的資料**,通常也是使用這個介面的實作類進行開發,它提供了以下九種方法: - **channelRegistered()**:在`Channel`被建立完成後會被註冊到某個`EventLoop`上,當註冊成功時被調用。 - **channelUnregistered()**:與`channelRegistered()`相反,當取消註冊時被調用。 - **channelActive()**:當`Channel`處於活躍狀態(連線就緒)時被調用。 - **channelInactive()**:當`Channel`不再處於活躍狀態且即將關閉時被調用。 - **channelRead()**:當從`Channel`讀取到新的資料時被調用,也就是撰寫業務邏輯的方法。 - **channelReadComplete()**:當`Channel`上的一個讀操作完成時被調用,也就是`channelRead()`之後。 - **userEventTriggered()**:當`ChannelPipeline`上的一個用戶事件觸發時被調用,處理用戶自定義事件。 - **channelWritabilityChanged()**:當`Channel`的可寫狀態改變時被調用。 - **exceptionCaught()**:在處理入站資料的業務邏輯時,發生異常時被調用。 ### ChannelInboundHandlerAdapter ```java public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { public ChannelInboundHandlerAdapter() { } @Skip public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); } @Skip public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); } @Skip public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); } @Skip public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } @Skip public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } @Skip public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); } @Skip public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } @Skip public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } @Skip public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } } ``` `ChannelInboundHandler`的實作類,**提供所有方法的默認實現**,只要根據需求覆寫對應的方法即可。 這邊需要注意的點是`channelRead()`的資源釋放,在Netty 中,通常是使用`ByteBuf`代表入站資料,預設是使用`ChannelHandlerContext`的`fireChannelRead()`將資料傳遞給下一個處理器。 若是該筆資料在傳遞之後不會再被使用,可以使用`ReferenceCountUtil.release()`或是`ByteBuf.release()`兩種方法來釋放`ByteBuf`物件的資源,這樣**處理完資料之後釋放相關資源可以避免內存泄露**。 ### SimpleChannelInboundHandler ```java public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter { private final TypeParameterMatcher matcher; private final boolean autoRelease; protected SimpleChannelInboundHandler() { this(true); } protected SimpleChannelInboundHandler(boolean autoRelease) { matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I"); this.autoRelease = autoRelease; } protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) { this(inboundMessageType, true); } protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) { matcher = TypeParameterMatcher.get(inboundMessageType); this.autoRelease = autoRelease; } public boolean acceptInboundMessage(Object msg) throws Exception { return matcher.match(msg); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } } } protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception; } ``` `ChannelInboundHandlerAdapter`的子類別,覆寫了`channelRead()`方法,**實作了釋放資源的邏輯**,改提供`channelRead0()`給開發人員使用,讓開發人員只需要專注在業務邏輯的內容,不用再自行撰寫釋放的業務。 另外還支援泛型,讓`channelRead0()`只需要處理特定類型的資料,不需要額外撰寫邏輯判斷。 --- ## 出站資料 出站資料的類別與入站資料的差不多,中間的單字Inbound 改成Outbound 而已,應該很好記憶。 ### ChannelOutboundHandler ```java public interface ChannelOutboundHandler extends ChannelHandler { void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception; void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception; void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception; void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception; void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception; void read(ChannelHandlerContext var1) throws Exception; void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception; void flush(ChannelHandlerContext var1) throws Exception; } ``` 與`ChannelInboundHandler`相同,是`ChannelHandler`的第一層子介面,負責提供**入站資料**處理方法的介面,也就是**處理從Client 接收到的資料**,它提供了以下八種方法: - **bind()**:將`Channel`綁定到指定的本地地址。 - **connect()**:連接到遠程節點。 - **disconnect()**:斷開與遠程節點的連接。 - **close()**:關閉`Channel`。 - **deregister()**:解除`Channel`的註冊。 - **read()**:請求從`Channel`讀取資料。 - **write()**:將訊息寫入`Channel`。 - **flush()**:將出站緩衝區的資料刷新到遠程節點。 ### ChannelOutboundHandlerAdapter ```java public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler { public ChannelOutboundHandlerAdapter() { } @Skip public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.bind(localAddress, promise); } @Skip public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.connect(remoteAddress, localAddress, promise); } @Skip public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.disconnect(promise); } @Skip public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.close(promise); } @Skip public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.deregister(promise); } @Skip public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } @Skip public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); } @Skip public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } } ``` 對應`ChannelInboundHandlerAdapter`是`ChannelOutboundHandler`的**默認實作**類,它的用途包括但不限於: - 資料編碼、加密、壓縮或其他處理。 - 流量控制與管理。 - 錯誤處理與日誌記錄。 - 協議處理。 --- ## 雙向處理器 `ChannelDuplexHandler`是Netty 有一個特殊的處理器,它繼承了`ChannelInboundHandlerAdapter`並實作了`ChannelOutboundHandlerAdapter`,能夠**處理入站和出站事件**,因此被稱為雙向處理器。 基本上就是包含了上面提到的所有方法,這邊就不再贅述了。 --- ## 總結 關於`ChannelHandlerContext`和`ByteBuf`的內容並沒有很詳細的介紹,它們使用方法雖然簡單,但其實底層也很多東西可以寫,有興趣的話可以自行查詢相關資料。 `ChannelHandler`的部分,大部分都是實作入站資料的處理器,可以根據需求去判斷應該在哪個階段的哪個方法使用: 1. **建立`Channel`連線**:handlerAdded()→channelRegistered()→channelActive() 2. **處理`Channel`的IO 事件**:channelRead()→channelReadComplete() 3. **關閉`Channel`連線**:channelInactive()→channelUnregistered()→handlerRemoved() 4. **`Channel`處理過程發生異常**:exceptionCaught() 總體來說,`ChannelHandler`的方法包山包海,可以將資料的處理流程分成好幾個階段,根據需求內容撰寫邏輯在對應的方法上,可以讓整個程式碼更加簡潔且好維護。 --- ## 參考資料 [Netty Github](https://github.com/netty/netty) [Netty ServerBootstrap 服务端启动类](https://blog.csdn.net/W664160450/article/details/123404768) [netty的引导程序ServerBootStrap](https://blog.csdn.net/u010853261/article/details/53738060) [关于netty配置的理解serverBootstrap.option和serverBootstrap.childOption](https://www.cnblogs.com/xiaoyao-001/p/10855249.html)