Netty自带编解码器及自定义编解码器

上一篇文章Netty应用层处理粘包以及半包记录了使用Netty处理TCP粘包及半包。包含定长补齐,特殊字符分割,变长协议三种方案,并且使用了Netty自带的FixedLengthFrameDecoder定长解码器,StringDecoder字符串解码器,DelimiterBasedFrameDecoder特殊字符解码器,LengthFieldBasedFrameDecoder变长解码器LengthFieldPrepender编码器。本篇文章我们记录除了这些编解码器之外的Netty编解码器及自定义编解码器。针对Netty自带的编解码器,我们持续更新。


先看看Netty编解码依赖,如上图所示,Netty将编解码器分为为了13个种类,编解码器分类如下

  • Base64编解码器
  • byte字节编解码器
  • 解压缩编解码器
  • json编解码器
  • marshalling编解码器
  • protobuf编解码器
  • Java序列化编解码器
  • String编解码器
  • Xml编解码器
  • 定长编解码器
  • 变长编解码器
  • 特殊字符编解码器
  • dns类型编解码器
  • haproxy编解码器
  • http编解码器
  • http2编解码器
  • memcache编解码器
  • mqtt编解码器
  • redis编解码器
  • smtp编解码器
  • socks编解码器
  • stomp编解码器

上面的所有编解码器中,String编解码器,定长编解码器,变长编解码器,特殊字符编解码器可以查看Netty应用层处理粘包以及半包。剩余的编解码器我们一一尝试。
因为Netty最原始的数据是通过ByteBuf包装,实现不同的编解码器实际上就是将ByteBuf转换成对应的对象即可。

Base64编解码器

Base64是一种编码方式,无需任何条件即可进行编码与解码,所以不要误用于加密。

  • Base64主要的应用场景如前端内置小图标,直接使用Base64编码即可展示,不用从服务器加载资源
  • 还有在文件传输领域也比较常见,比如邮箱发送附件,文件上传等。

上一篇文章我们使用指定分割符处理粘包与半包问题,我们使用了$_,如果应用层数据包本身就包含这串字符怎么办呢?这里我们就可以使用Base64编码,先将数据进行Base64编码,然后在服务端进行解码就可以了。下面是Base64编码表

可以看出如果使用Base64编码,我们可选的特殊字符大大增加,并且永远不会出现分隔符冲突问题。现在我们重构一下之前的特殊字符分割,添加Base64编码解码功能

服务端实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class Base64Server {

public void run(int port){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer(Delimiter.DELIMITER_ONE.getBytes(StandardCharsets.UTF_8));
// 添加特殊分隔符解码器
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535,delimiter));
// 将bytebuf转为string
ch.pipeline().addLast(new StringDecoder());
// 将string解码
ch.pipeline().addLast(new Base64Decoder());
// 处理接收到的string消息
ch.pipeline().addLast(new Base64ServerHandler());
// 如果服务端需要向客户端发送数据,则在最后添加编码器
ch.pipeline().addLast(new Base64Encoder());
}
});
ChannelFuture future = null;
try {
future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}

}
}

// 自定义handler只是单纯的打印数据

public class Base64ServerHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = Logger.getLogger(Base64ServerHandler.class.getName());
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String data = (String) msg;
logger.info("服务端接收到数据:"+data);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

客户端实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// 客户端添加的消息处理器基本与服务端一致
public class Base64Client {

public void run(String host,int port){
EventLoopGroup rw = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(rw).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer(Delimiter.DELIMITER_ONE.getBytes(StandardCharsets.UTF_8));
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65536,delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new Base64Decoder());
ch.pipeline().addLast(new Base64ClientHandler());
ch.pipeline().addLast(new Base64Encoder());
}
});
ChannelFuture future = null;
try {
future = bootstrap.connect(host,port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}

// 连接成功后向服务端发送100次消息

public class Base64ClientHandler extends ChannelInboundHandlerAdapter {

private final Logger logger = Logger.getLogger(Base64ClientHandler.class.getName());
private final AtomicInteger counter = new AtomicInteger(0);

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0;i<100;i++){
ByteBuf buf = Unpooled.copiedBuffer(("李四"+ Delimiter.DELIMITER_ONE).getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(buf);
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String buf = (String) msg;
logger.info("客户端收到消息: "+buf +" counter-------"+counter.incrementAndGet());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.info(cause.getMessage());
ctx.close();
}
}

运行日志如下:

作者

Labradors

发布于

2022-06-06

更新于

2022-06-06

许可协议

评论