23、Netty的ReplayingDecoder源码分析与特性解读
1、 ReplayingDecoder[*]ReplayingDecoder S 是指一个枚举,如果不必要指定Void即可
[*]ReplayingDecoder 不必要判断(ByteBuf)中的数量是否足够
public abstract class ReplayingDecoder extends ByteToMessageDecoder {https://p26.toutiaoimg.com/large/pgc-image/4c472f805462416ba92d236f362395cdReplayingDecoder
[*]可见ByteToMessageDecoder的子类。类界说中的泛型 S 是一个用于记录解码状态的状态机枚举类,在state(S s)、checkpoint(S s)等方法中会用到。在简单解码时也可以用java.lang.Void来占位。
[*]与ByteToMessageDecoder不同,该类可以在接收到所必要长度的字节之后再调用decode方法,而不用一遍又一遍的手动检查流中的字节长度
[*]从源码上看ReplayingDecoder重写了ByteToMessageDecoder的callDecode()方法
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List out) { replayable.setCumulation(in); try { while (in.isReadable()) { int oldReaderIndex = checkpoint = in.readerIndex(); int outSize = out.size(); if (outSize > 0) { fireChannelRead(ctx, out, outSize); out.clear();//清除 //在继续解码之前,检查这个处置惩罚程序是否已被删除。 //如果它被移除,继续在缓冲区上操纵是不安全的。 // See: // - https://github.com/netty/netty/issues/4635 if (ctx.isRemoved()) { break; } outSize = 0; } S oldState = state; int oldInputLength = in.readableBytes(); try { //解码移除再入保护 decodeRemovalReentryProtection(ctx, replayable, out); //在继续循环之前,检查是否删除了这个处置惩罚程序。 //如果它被移除,继续在缓冲区上操纵是不安全的。 // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } if (outSize == out.size()) { if (oldInputLength == in.readableBytes() && oldState == state) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " + "data or change its state if it did not decode anything."); } else { //以前的数据已被丢弃或导致状态转换。 //也许它还在继续读。 continue; } } } catch (Signal replay) { replay.expect(REPLAY); //在继续循环之前,检查是否删除了此处置惩罚程序。 //如果它被移除,继续在缓冲区上操纵是不安全的。 // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } //返回到这个检查点(或是旧的位置)和重试 int checkpoint = this.checkpoint; if (checkpoint >= 0) { in.readerIndex(checkpoint); } else { } break; } if (oldReaderIndex == in.readerIndex() && oldState == state) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " + "or change its state if it decoded something."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); } }2、ReplayingDecoder如何工作?
[*]ReplayingDecoder传递一个专门的ByteBuf实现,当缓冲区中没有足够的数据时,这个实现会抛出某种类型的错误。在上面的IntegerHeaderFrameDecoder中,您只是假设在调用buf.readInt()时,缓冲区中有4个或更多字节。如果缓冲区中确实有4个字节,它将像您盼望的那样返回整数报头。否则,将引发错误并将控制返回到ReplayingDecoder。如果ReplayingDecoder捕捉到错误,那么它会将缓冲区的readerIndex倒回“初始”位置(即缓冲区的开始),并在缓冲区接收到更多数据时再次调用decode(..)方法。
[*]请注意,ReplayingDecoder总是抛出雷同的缓存错误实例,以避免每次抛出时创建新错误并填充其堆栈跟踪的开销。
3、ReplayingDecoder如何提高性能
[*]幸运的是,使用checkpoint()方法可以明显提高复杂解码器实现的性能。checkpoint()方法更新缓冲区的“初始”位置,以便重新播放解码器将缓冲区的readerIndex回滚到调用checkpoint点()方法的最后位置。
4、ReplayingDecoder使用枚举Enum调用 checkpoint(T)
[*]即使您可以只使用checkpoint()方法并自己管明白码器的状态,但是管明白码器状态的最简单方法是创建一个表示解码器当前状态的Enum类型,并在状态发生变化时调用checkpoint(T)方法。根据要解码的消息的复杂性,可以有任意多个状态:
public enum MyDecoderState { READ_LENGTH, READ_CONTENT; } public class IntegerHeaderFrameDecoder extends ReplayingDecoder { private int length; public IntegerHeaderFrameDecoder() { // Set the initial state. super(MyDecoderState.READ_LENGTH); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List out) throws Exception { switch (state()) { case READ_LENGTH: length = buf.readInt(); checkpoint(MyDecoderState.READ_CONTENT); case READ_CONTENT: ByteBuf frame = buf.readBytes(length); checkpoint(MyDecoderState.READ_LENGTH); out.add(frame); break; default: throw new Error("Shouldn't reach here."); } } }5、ReplayingDecoder调用没有参数的checkpoint()方法
[*]管明白码器状态的另一种方法是自己管理它。
public class IntegerHeaderFrameDecoder extends ReplayingDecoder {}6、 ReplayingDecoder用管道中的另一个解码器替换一个解码器
[*]如果您要编写一个协议多路复用器,您可能必要用另一个重放解码器(ByteToMessageDecoder或MessageToMessageDecoder,实际的协议解码器)替换ReplayingDecoder(协议检测器)。仅仅通过调用ChannelPipeline是不可能实现这一点的。替换(ChannelHandler, String, ChannelHandler),但必要一些额外的步骤:
public class FirstDecoder extends ReplayingDecoder { @Override protected void decode(ChannelHandlerContext ctx,ByteBuf buf, List out) { ... // Decode the first message Object firstMessage = ...; // Add the second decoder ctx.pipeline().addLast("second", new SecondDecoder()); if (buf.isReadable()) { // Hand off the remaining data to the second decoder out.add(firstMessage); out.add(buf.readBytes(super.actualReadableBytes())); } else { // Nothing to hand off out.add(firstMessage); } // Remove the first decoder (me) ctx.pipeline().remove(this); }} 7、开发过程中编写解码器与编码器的发起(注意点)
[*]发起要么解码器,要么编码器,遵循单一原则,不会那么困扰,方便开发与维护
页:
[1]