在上一篇文章中介绍了传统I/O编程 的弊端,本章将概述NIO
的由来和和一些基本概念……
类库简介 2002
年的时候,Sum
公司推出了JDK1.4
并且新增了NIO
的类库,弥补了原来同步阻塞I/O
带来的不足,官方称之为New I/O
,寓意指新的I/O
编程模型,但是由于旧版的Block I/O
在民间更喜欢称它为Non Block I/O
(非阻塞I/O)编程模型,在NIO
的类库中,将原本java.net.Socket
以及 java.net.ServerSocket
分别升级成 java.nio.SocketChannel
和 java.nio.ServerSocketChannel
,它们都支持阻塞与非阻塞模式,前者性能与可靠性较差,后者却恰恰相反,在开发过程中可以选取适合自己的模式,一般来说,低负载、低并发的应用程序可以选择同步阻塞IO以降低编程复杂度。但是对于高负载、高并发的网络应用,需要使用NIO的非阻塞模式进行开发….
Buffer Buffer
是一个含读写数据操的作对象,在NIO
库中,所有的对象都是用缓冲处理的,读写数据操作时都是通过缓冲区来处理,实际上它是一个数组,但通常它是一个字节数组(ByteBuffer),也可以使用其它种类的数组,但是一个缓冲区不仅仅是一个数组,缓冲区提供了对数据结构化访问及维护读写位置(limit)等信息…
Channel Channel
是一个全双工
的通道(同时支持双向传输,在BIO
中都是单向流,即InputStream
和 OutputStream
),因为是双向的,所以它可以更好的映射底层操作系统的API,特别是在UNIX网络编程模型中,底层操作系统的通道都是双全工的,同时支持读写…
Selector Selector
是NIO
中的基础,对NIO
编程至关重要,它是一个多路复用器,提供选择已经准备就绪的任务功能,会不断轮训注册在它上面的Channel
,如果某个Channel
上面有新的TCP请求接入,它就会处于就绪状态,供Selector
轮训出来,然后可与通过SelectorKey
获取就绪的Channel
集合,从而进行I/O
操作…
异步非阻塞服务端实现
注意事项
与Selector
使用的 Channel
必须处于非阻塞模式
每次使用Selector
时应该先判断下Selector
是否已经被关闭,否则容易出现java.nio.channels.ClosedSelectorException
错误
1 2 3 4 5 public static void main (String[] args) { int port = 4040 ; MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); new Thread(timeServer,"NIO-MultiplexerTimeServer-1" ).start(); }
通过TimeServer
的时序图,我们来看下实现的异步非阻塞的TimeServer
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel serverSocketChannel; public MultiplexerTimeServer (int port) { try { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false ); serverSocketChannel.bind(new InetSocketAddress(port), 1024 ); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("TimeServer启动成功,当前监听的端口 : " + port); } catch (IOException e) { e.printStackTrace(); System.exit(1 ); } } @Override public void run () { while (true ) { try { if (!selector.isOpen()) { System.out.println("selector is closed" ); break ; } int select = selector.select(); if (select > 0 ) { Set<SelectionKey> selectionKeys = this .selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null ) { key.cancel(); if (key.channel() != null ) key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); } } } private void handleInput (SelectionKey key) throws IOException { if (key.isValid()) { if (key.isAcceptable()) { SocketChannel accept = ((ServerSocketChannel) key.channel()).accept(); accept.configureBlocking(false ); accept.register(this .selector, SelectionKey.OP_READ); } if (key.isReadable()) { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024 ); int read = sc.read(buffer); if (read > 0 ) { buffer.flip(); byte [] bytes = new byte [buffer.remaining()]; buffer.get(bytes); String msg = new String(bytes, "UTF-8" ); System.out.println("TimeServer 接收到的消息 :" + msg); doWrite(sc, "挽歌君老帅了..." ); } else if (read < 0 ) { key.cancel(); sc.close(); } else { } } } } private void doWrite (SocketChannel channel, String resp) throws IOException { if (resp != null && resp.trim().length() > 0 ) { byte [] bytes = resp.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } }
select()阻塞到至少有一个通道在你注册的事件上就绪了。
select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。
selectNow()不会阻塞,不管什么通道就绪都立刻返回(译者注:此方法执行非阻塞的选择操作。如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零。)。
select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。
如果调用select()方法,因为有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。
如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。
调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。
客户端实现
1 2 3 4 5 6 7 public class TimeClient { public static void main (String[] args) { int port = 4040 ; new Thread(new TimeClientHandler("127.0.0.1" ,port),"NIO-MultiplexerTimeServer-1" ).start(); } }
与之前的TimeClient
不同的是这次通过TimeClientHandler
线程来处理异步连接和读写操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 public class TimeClientHandler implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClientHandler (String host, int port) { this .host = host == null ? "127.0.0.1" : host; this .port = port; try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false ); } catch (IOException e) { e.printStackTrace(); System.exit(1 ); } } @Override public void run () { try { doConnect(); } catch (Exception e) { e.printStackTrace(); System.exit(1 ); } while (!stop) { try { if (selector.select() > 0 ) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); try { handleInput(selectionKey); } catch (Exception e) { if (selectionKey != null ) { selectionKey.cancel(); if (selectionKey.channel() != null ) { selectionKey.channel().close(); } } } } } } catch (IOException e) { System.exit(1 ); } } if (selector != null ) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } private void handleInput (SelectionKey key) throws IOException { if (key.isValid()) { SocketChannel channel = (SocketChannel) key.channel(); if (key.isConnectable()) { if (channel.finishConnect()) { channel.register(this .selector, SelectionKey.OP_READ); doWrite(channel); } else { System.exit(1 ); } ByteBuffer readBuffer = ByteBuffer.allocate(1024 ); int read = channel.read(readBuffer); if (read > 0 ) { readBuffer.flip(); byte [] bytes = new byte [readBuffer.remaining()]; readBuffer.get(bytes); String msg = new String(bytes, "UTF-8" ); System.out.println("TimeClient 接收到的消息:" + msg); } this .stop = true ; } } } private void doConnect () throws IOException { if (socketChannel.connect(new InetSocketAddress(host, port))) { socketChannel.register(this .selector, SelectionKey.OP_READ); doWrite(socketChannel); } else { socketChannel.register(this .selector, SelectionKey.OP_CONNECT); } } private void doWrite (SocketChannel channel) throws IOException { byte [] req = "挽歌君帅不帅" .getBytes(); ByteBuffer buffer = ByteBuffer.allocate(req.length); buffer.put(req); buffer.flip(); channel.write(buffer); if (!buffer.hasRemaining()) { System.out.println("消息发送成功" ); } } }
测试 分别启动TimeServer
和 TimeClient
服务端
1 TimeServer 接收到的消息 :挽歌君帅不帅
客户端
1 2 消息发送成功 TimeClient 接收到的消息:挽歌君老帅了...
- 遗留问题 1.该处存在一个小BUG,客户端向服务端发送请求,服务端第一次数据回写客户端未接收到,但是后续的请求都有接收到,这应该是异步请求导致
- 说点什么 参考:Netty权威指南
全文代码:https://git.oschina.net/battcn/battcn-netty/tree/master/Chapter2-1/battcn-netty-nio
个人QQ:1837307557
battcn开源群(适合新手):391619659
微信公众号:battcn
(欢迎调戏)