# Netty是什麼? - Netty是一個基於NIO的client-server(客戶端服務器)框架,使用它可以快速簡單地開發網絡應用程序。 它極大地簡化並優化了TCP 和UDP 套接字服務器等網絡編程,並且性能以及安全性等很多方面甚至都要更好。 支持多種協議如FTP,SMTP,HTTP以及各種二進制和基於文本的傳統協議。 # Netty有什麼樣的應用場景? 1.實作一個HTTP服務器 - 通過Netty我們可以自己實現一個簡單的HTTP服務器 2.實作一個即時通訊系統 - 可以實現一個可以聊天類似Line的即時通訊系統 3.實現推播系統 - 市面上有很多推播系統都是基於Netty來做的 - # 阻塞式IO-BIO(Blocking IO) * *實現模式為一個連接一個執行緒* * *當併發數很大,就會創造大量執行緒,佔用很大的資源* ![](https://i.imgur.com/4jqg8Ci.png) - # 非阻塞式IO- NIO(Non-Blocking IO) * 實現模式為一個執行緒處理多個請求,(因為每個連接不是時時刻刻都是在讀寫數據)Selector會去輪詢,發現有I/O請求就進行處理。 **NIO概念圖** ![](https://i.imgur.com/DfK29Ue.png) --- - # NIO三大核心-Selector(選擇器),Channel(通道),Buffer(緩衝區) NIO的Channel類似流Stream,但是Stream是單向的,而Channel是雙向的,也就是說Channel可以從Buffer讀取資料,也可以寫數據到Buffer GroupCharServer.java ``` import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; /** * @author swshawnwu@gmail.com(ShawnWu) */ public class GroupChatServer { private ServerSocketChannel serverChannel; private Selector selector; private static final int PORT = 7777; public GroupChatServer() { try { selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.socket().bind(new InetSocketAddress(PORT)); serverChannel.configureBlocking(false); serverChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); } } private void listen() { while (true) { try { int count = selector.select(2000); if (count > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) {// 處理Accept SocketChannel socketChannel = serverChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); System.out.println(socketChannel.getRemoteAddress() + "上線了"); } if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); readMessage(socketChannel); } iterator.remove(); } } } catch (IOException e) { e.printStackTrace(); } } } private void readMessage(SocketChannel socketChannel) { try { ByteBuffer buffer = ByteBuffer.allocate(1024); int read = socketChannel.read(buffer); if (read > 0) { String msg = new String(buffer.array()); System.out.println("來自Client端:" + msg); sendToOtherClients(msg, socketChannel); } } catch (IOException e) { e.printStackTrace(); } } private void sendToOtherClients(String msg, SocketChannel self) throws IOException { System.out.println("Server轉發訊息中"); for (SelectionKey key : selector.keys()) { Channel targetChannel = key.channel(); if(targetChannel instanceof SocketChannel && targetChannel != self){ SocketChannel otherChannel = (SocketChannel) targetChannel; ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); otherChannel.write(buffer); } } } public static void main(String[] args) { GroupChatServer groupChatServer = new GroupChatServer(); groupChatServer.listen(); } } ``` GroupChatClient.java ``` import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Scanner; /** * @author swshawnwu@gmail.com(ShawnWu) */ public class GroupChatClient { private static final String Host = "127.0.0.1"; private static final int PORT = 7777; private Selector selector; private SocketChannel socketChannel; private String username; public GroupChatClient() throws IOException { selector = Selector.open(); socketChannel = SocketChannel.open(new InetSocketAddress(Host, PORT)); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); username = socketChannel.getLocalAddress().toString(); System.out.println(username + " ready to start chatting."); } private void readMessage() { try { int readChannels = selector.select(); if (readChannels > 0) { for (SelectionKey key : selector.selectedKeys()) { if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); socketChannel.read(buffer); String msg = new String(buffer.array()); System.out.println(msg.trim()); } } } } catch (IOException e) { e.printStackTrace(); } } public void sendMessage(String msg) { msg = username + "說:" + msg; try { socketChannel.write(ByteBuffer.wrap(msg.getBytes())); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException { GroupChatClient groupChatClient = new GroupChatClient(); new Thread(() -> { while (true) { try { Thread.sleep(3000); groupChatClient.readMessage(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()){ String s = scanner.nextLine(); groupChatClient.sendMessage(s); } } } ``` 這種概念又稱Reactor模式 - # Reactor模式的三種實現 * 單Reactor單執行緒 ![](https://i.imgur.com/WyvRSIk.png) 優點:模型簡單 沒有多執行緒。 缺點:只有一個問題,無法發揮多核心CPU性能,若是執行緒被關掉,會導致整個通訊模組不可用 ---- * 單Reactor多執行緒 ![](https://i.imgur.com/AbRKj9N.png) 優點: 可以充分利用到多核心CPU處理能力 缺點: Reactor處理所有事件的監聽和響應,在單個執行緒運行,當請求變多,一樣容易遇到效能瓶頸 --- * 主從Reactor多執行緒 * ![](https://i.imgur.com/vcYigTN.png) 優點: 1.父執行緒與子執行緒的資料互動簡單職責明確,父執行緒只需要接收新連線,子執行緒完成後續的業務處理 2.Reactor模型本身與具體事件處理邏輯無關 缺點: 設計複雜度高 --- Netty 主要架構基於**主從Reactor多執行緒** ![](https://i.imgur.com/BDyk2Ow.png) --- NettyServer.java ``` /** * @author swshawnwu@gmail.com(ShawnWu) */ package netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyClient { public static void main(String[] args) { EventLoopGroup eventExecutors = new NioEventLoopGroup(); ChannelFuture channelFuture; try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventExecutors) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new MyNettyClientHandler()); } }); System.out.println("Client端OK"); channelFuture = bootstrap.connect("127.0.0.1", NettyServer.PORT).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { eventExecutors.shutdownGracefully(); } } } ``` NettyServer.java ``` /** * @author swshawnwu@gmail.com(ShawnWu) */ package netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class NettyServer { static final int PORT = 6668; public static void main(String[] args) { /* 分別創建bossGroup和WorkerGroup(兩個執行緒集合,線程組) */ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //server端 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new MyNettyServerHandler()); } }); System.out.println("Server準備好了"); ChannelFuture channelFuture = bootstrap.bind(PORT).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } ``` MyNettyServerHandler.java ``` /** * @author swshawnwu@gmail.com(ShawnWu) */ package netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.EventExecutorGroup; public class MyNettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Server ctx"+ctx); ByteBuf buf = (ByteBuf)msg; System.out.println("Client的訊息"+ buf.toString(CharsetUtil.UTF_8)); System.out.println("Client的地址"+ ctx.channel().remoteAddress()); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Client", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } ``` MyNettyClientHandler.java ``` /** * @author swshawnwu@gmail.com(ShawnWu) */ package netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.EventExecutorGroup; public class MyNettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, server", CharsetUtil.UTF_8)); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf)msg; System.out.println("Server回覆訊息:"+ byteBuf.toString(CharsetUtil.UTF_8)); System.out.println("Server Address:"+ ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } ```