TCP
以流的形式传输,在上一章,我们讲了粘包和拆包,以及LineBaseFrameDecoder
使用和源码探讨,接下来讲讲Netty
为我们实现的其它解码器…..
TCP
以流的方式进行数据传输,上层的应用为了对消息进行区分,往往采用如下方式
- 固定消息长度,累计读取到长度和定长
LEN
的报文后,就认为读取到了个完整的消息,然后将计数器位置重置在读取下一个报文内容
- 将回车换行符作为消息结束符
\r\n
,列如FTP
协议,这种方式在文本中应用比较广泛
- 将特殊分隔符作为消息结束符标志位,回车换行符就是一个特殊结束分隔符(
DelimiterBasedFrameDecoder
)
- 通过在消息头定义一个长度字段来标示消息的总长度(
FixedLengthFrameDecoder
)
Netty
对以上4种做个统一抽象封装,提供了四种不同解码器来解决对应问题,使用起来也非常的方便,了解了它们,我们就不需要自己对读取的报文人工解码,也不需要考虑TCP
粘包和拆包的问题了…
Delimiter自定义分隔符
我将公共的部分做了一层抽离,定义成常量方便调用
1 2 3 4 5 6 7
| public interface EchoConstant { String SEPARATOR = "$_"; Integer ECHO_DELIMITER_PORT = 4040; Integer ECHO_LENGTH_PORT = 5050; String HOST = "127.0.0.1"; Integer FRAME_LENGTH = 10; }
|
定义EchoDelimiterServer
,毫无疑问大部分代码和以前类似,区别是多了一个日志输出以及DelimiterBasedFrameDecoder
的使用
划重点:在做开发调试的时候,我们可以使用Netty
为我们提供的LoggingHandler
输出日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public static void bind(int port) { EventLoopGroup masterGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(masterGroup, workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer(EchoConstant.SEPARATOR.getBytes()); channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); channel.pipeline().addLast(new StringDecoder()); channel.pipeline().addLast(new EchoServerHandler()); } }); System.out.println("绑定端口,同步等待成功......"); ChannelFuture future = bootstrap.bind(port).sync(); future.channel().closeFuture().sync(); System.out.println("等待服务端监听端口关闭......"); } catch (Exception e) { e.printStackTrace(); } finally { masterGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); System.out.println("优雅退出释放线程池......"); } }
|
1 2 3 4 5 6 7 8
| @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("EchoDelimiterServer 接收到的消息 :" + body + "; 当前统计:" + ++counter); body = body + EchoConstant.SEPARATOR; ByteBuf echo = Unpooled.copiedBuffer(body.getBytes()); ctx.writeAndFlush(echo); }
|
定义EchoDelimiterClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public static void connect(String host, int port) { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); ChannelFuture future = null; try { bootstrap.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer(EchoConstant.SEPARATOR.getBytes()); channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); channel.pipeline().addLast(new StringDecoder()); channel.pipeline().addLast(new EchoClientHandler());
} }); future = bootstrap.connect(host, port).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } }
|
创建EchoClientHandler
继承ChannelHandlerAdapter
,重写读取和写出事件
1 2 3 4 5 6 7 8 9 10 11 12
| @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 10; i++) { ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes())); } }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("EchoDelimiterClient 接收到的消息 :" + body + "; 当前统计:" + ++counter); }
|
试验一把
分别启动EchoDelimiterServer
和EchoDelimiterClient
,输出如下日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| 绑定端口,同步等待成功...... 九月 04, 2017 10:41:27 下午 io.netty.handler.logging.LoggingHandler channelRegistered 信息: [id: 0x3b1849e8] REGISTERED 九月 04, 2017 10:41:27 下午 io.netty.handler.logging.LoggingHandler bind 信息: [id: 0x3b1849e8] BIND: 0.0.0.0/0.0.0.0:4040 九月 04, 2017 10:41:27 下午 io.netty.handler.logging.LoggingHandler channelActive 信息: [id: 0x3b1849e8, /0:0:0:0:0:0:0:0:4040] ACTIVE 九月 04, 2017 10:41:33 下午 io.netty.handler.logging.LoggingHandler channelRead 信息: [id: 0x3b1849e8, /0:0:0:0:0:0:0:0:4040] RECEIVED: [id: 0xa45511cd, /127.0.0.1:50226 => /127.0.0.1:4040] EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:1 EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:2 EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:3 EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:4 EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:5 EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:6 EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:7 EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:8 EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:9 EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:10
------------------------------------------------------------------------------------------------
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:1 EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:2 EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:3 EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:4 EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:5 EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:6 EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:7 EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:8 EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:9 EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:10
|
FixedLength指定消息长度
FixedLengthFrameDecoder
的方式比较极端,就是解析固定长度的报文消息,举个例子:假设我的报文长度为50
,解析长度为30
,那么这个数据包会被拆分成2
次来解析,反之亦然…..
创建EchoLengthServer
,重写初始化通道与读取事件
1 2 3 4 5 6 7 8 9 10 11 12
| @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(new FixedLengthFrameDecoder(EchoConstant.FRAME_LENGTH)); channel.pipeline().addLast(new StringDecoder()); channel.pipeline().addLast(new EchoLengthServer.EchoServerHandler()); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("EchoLengthServer 接收到的消息 :" + body + "; 当前统计:" + ++counter); }
|
创建EchoLengthClient
,重写初始化通道,写出与读取事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(new FixedLengthFrameDecoder(EchoConstant.FRAME_LENGTH)); channel.pipeline().addLast(new StringDecoder()); channel.pipeline().addLast(new EchoLengthServer.EchoServerHandler()); }
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes())); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("EchoLengthClient 接收到的消息 :" + body + "; 当前统计:" + ++counter); }
|
试验一把
分别启动EchoLengthServer
和EchoLengthClient
,输出如下日志
1 2 3 4
| 绑定端口,同步等待成功...... EchoLengthServer 接收到的消息 :EchoLength; 当前统计:1 EchoLengthServer 接收到的消息 :Client Hi; 当前统计:2 EchoLengthServer 接收到的消息 : .Welcome ; 当前统计:3
|
警告: 前面说过,如果切割内容过长或者过断都会出现拆包或者粘包情况,所以这种方式需要根据具体业务需求来…..
总结
由此可以看到DelimiterBasedFrameDecoder
用于对使用分割符结尾的消息进行自动解码,FixedLengthFrameDecoder
用于对固定长度的消息进行自动解码,有了以上两种解码器在结合其它的解码器,如(StringDecoder
),可以轻松完成大部分消息的自动解码,且无需考虑TCP粘包/拆包
导致的半读问题,极大的提升了开发效率…..
- 说点什么
全文代码:https://git.oschina.net/battcn/battcn-netty/tree/master/Chapter5-1/battcn-netty-5-1-1
- 个人QQ:1837307557
- battcn开源群(适合新手):391619659
微信公众号:battcn
(欢迎调戏)