From f5586b3a4959600e9b2481704166c912d81d9d1e Mon Sep 17 00:00:00 2001 From: larryTheCoder Date: Fri, 30 Jun 2023 16:14:11 +0800 Subject: [PATCH] Improve ZSTD compression method. Upgrade zstd-jni to v1.5.5-4 Metrics is now available with the hybrid zstd compression enabled. Encoding a packet buffer for compression will now use a composite direct buffer. Increased ZLIB raw inflater decompressed size to 12MB. Inline both compressor and decompressor into one duplex channel handler. --- pom.xml | 2 +- .../compression/ZstdCompressionCodec.java | 136 ++++++++++++++++++ .../decoder/PartialDecompressor.java | 43 ------ .../encoder/DataPackEncoder.java | 60 -------- .../proxytransport/encoder/ZStdEncoder.java | 38 ----- .../impl/TransportChannelInitializer.java | 13 +- 6 files changed, 142 insertions(+), 150 deletions(-) create mode 100644 src/main/java/org/nethergames/proxytransport/compression/ZstdCompressionCodec.java delete mode 100644 src/main/java/org/nethergames/proxytransport/decoder/PartialDecompressor.java delete mode 100644 src/main/java/org/nethergames/proxytransport/encoder/DataPackEncoder.java delete mode 100644 src/main/java/org/nethergames/proxytransport/encoder/ZStdEncoder.java diff --git a/pom.xml b/pom.xml index ad79324..f2b9363 100644 --- a/pom.xml +++ b/pom.xml @@ -42,7 +42,7 @@ com.github.luben zstd-jni - 1.5.4-2 + 1.5.5-4 dev.waterdog.waterdogpe diff --git a/src/main/java/org/nethergames/proxytransport/compression/ZstdCompressionCodec.java b/src/main/java/org/nethergames/proxytransport/compression/ZstdCompressionCodec.java new file mode 100644 index 0000000..9505039 --- /dev/null +++ b/src/main/java/org/nethergames/proxytransport/compression/ZstdCompressionCodec.java @@ -0,0 +1,136 @@ +package org.nethergames.proxytransport.compression; + +import com.github.luben.zstd.Zstd; +import dev.waterdog.waterdogpe.ProxyServer; +import dev.waterdog.waterdogpe.network.NetworkMetrics; +import dev.waterdog.waterdogpe.network.PacketDirection; +import dev.waterdog.waterdogpe.network.connection.client.ClientConnection; +import dev.waterdog.waterdogpe.network.connection.codec.BedrockBatchWrapper; +import dev.waterdog.waterdogpe.network.connection.codec.compression.SnappyCompressionCodec; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageCodec; +import io.netty.util.ReferenceCountUtil; +import lombok.AllArgsConstructor; +import org.cloudburstmc.protocol.common.util.Zlib; +import org.nethergames.proxytransport.utils.CompressionType; + +import java.nio.ByteBuffer; +import java.util.List; + +@AllArgsConstructor +public class ZstdCompressionCodec extends MessageToMessageCodec { + public static final String NAME = "compression-codec"; + + private static final SnappyCompressionCodec snappyCompressionCodec = new SnappyCompressionCodec(); + + private final int compressionLevel; + private final ClientConnection connection; + + @Override + protected void encode(ChannelHandlerContext ctx, BedrockBatchWrapper msg, List out) { + CompositeByteBuf buf = ctx.alloc().compositeDirectBuffer(2); + + try { + NetworkMetrics metrics = ctx.channel().attr(NetworkMetrics.ATTRIBUTE).get(); + PacketDirection direction = ctx.channel().attr(PacketDirection.ATTRIBUTE).get(); + + // The batch is already compressed correctly, we can send the buffer straight to the server + if (!msg.isModified() && msg.getCompressed() != null) { + buf.addComponent(true, ctx.alloc().ioBuffer(1).writeByte(msg.getAlgorithm().getBedrockAlgorithm().ordinal())); + buf.addComponent(true, msg.getCompressed().retainedSlice()); + + if (metrics != null) { + metrics.passedThroughBytes(msg.getCompressed().readableBytes(), direction); + } + } else { + // The batch was modified or the wrapper has no compressed data while still retaining + // the uncompressed data. + if ((msg.isModified() || msg.getCompressed() == null) && msg.getUncompressed() == null) { + throw new IllegalArgumentException("BedrockPacket is not encoded."); + } + + msg.setCompressed(encode0(ctx, msg.getUncompressed())); + + buf.addComponent(true, ctx.alloc().ioBuffer(1).writeByte(CompressionType.METHOD_ZSTD.ordinal())); + buf.addComponent(true, msg.getCompressed().retainedSlice()); + + if (metrics != null) { + metrics.compressedBytes(msg.getCompressed().readableBytes(), direction); + } + } + + out.add(buf.retain()); + } catch (Throwable err) { + ProxyServer.getInstance().getLogger().error("An exception were thrown while encoding packet", err); + } finally { + ReferenceCountUtil.release(buf); + } + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf compressed, List out) { + BedrockBatchWrapper msg = BedrockBatchWrapper.newInstance(compressed.readRetainedSlice(compressed.readableBytes()), null); + + try { + msg.setAlgorithm(connection.getPlayer().getCompression()); + switch (connection.getPlayer().getCompression().getBedrockAlgorithm()) { + case ZLIB -> msg.setUncompressed(Zlib.RAW.inflate(msg.getCompressed().slice(), 1024 * 1024 * 12)); + case SNAPPY -> msg.setUncompressed(snappyCompressionCodec.decode0(ctx, msg.getCompressed().slice())); + } + + if (msg.getUncompressed() == null) { + throw new UnsupportedOperationException("The given compression algorithm is not supported by ProxyTransport"); + } + + NetworkMetrics metrics = ctx.channel().attr(NetworkMetrics.ATTRIBUTE).get(); + if (metrics != null) { + PacketDirection direction = ctx.channel().attr(PacketDirection.ATTRIBUTE).get(); + metrics.decompressedBytes(msg.getUncompressed().readableBytes(), direction); + } + + out.add(msg.retain()); + } catch (Throwable err) { + ProxyServer.getInstance().getLogger().error("An exception were thrown while decoding packet", err); + } finally { + ReferenceCountUtil.release(msg); + } + } + + private ByteBuf encode0(ChannelHandlerContext ctx, ByteBuf source) { + ByteBuf direct; + if (!source.isDirect() || source instanceof CompositeByteBuf) { + direct = ctx.alloc().ioBuffer(source.readableBytes()); + direct.writeBytes(source); + } else { + direct = source; + } + + ByteBuf output = ctx.alloc().directBuffer(); + try { + int uncompressedLength = direct.readableBytes(); + int maxLength = (int) Zstd.compressBound(uncompressedLength); + + output.ensureWritable(maxLength); + + int compressedLength; + if (direct.hasMemoryAddress()) { + compressedLength = (int) Zstd.compressUnsafe(output.memoryAddress(), maxLength, direct.memoryAddress() + direct.readerIndex(), uncompressedLength, compressionLevel); + } else { + ByteBuffer sourceNio = direct.nioBuffer(direct.readerIndex(), direct.readableBytes()); + ByteBuffer targetNio = output.nioBuffer(0, maxLength); + + compressedLength = Zstd.compress(targetNio, sourceNio, compressionLevel); + } + + output.writerIndex(compressedLength); + return output.retain(); + } finally { + ReferenceCountUtil.release(output); + if (direct != source) { + ReferenceCountUtil.release(direct); + } + } + } +} diff --git a/src/main/java/org/nethergames/proxytransport/decoder/PartialDecompressor.java b/src/main/java/org/nethergames/proxytransport/decoder/PartialDecompressor.java deleted file mode 100644 index fab02f5..0000000 --- a/src/main/java/org/nethergames/proxytransport/decoder/PartialDecompressor.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.nethergames.proxytransport.decoder; - -import dev.waterdog.waterdogpe.network.connection.client.ClientConnection; -import dev.waterdog.waterdogpe.network.connection.codec.BedrockBatchWrapper; -import dev.waterdog.waterdogpe.network.connection.codec.compression.CompressionAlgorithm; -import dev.waterdog.waterdogpe.network.connection.codec.compression.SnappyCompressionCodec; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; -import io.netty.handler.codec.MessageToMessageDecoder; -import lombok.RequiredArgsConstructor; -import org.cloudburstmc.protocol.common.util.Zlib; - -import java.util.List; - -@RequiredArgsConstructor -public class PartialDecompressor extends MessageToMessageDecoder { - public static final String NAME = "decompress"; - private final ClientConnection connection; - private static final SnappyCompressionCodec snappyCompressionCodec = new SnappyCompressionCodec(); - - - @Override - protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf compressed, List list) throws Exception { - ByteBuf decompressed = null; - - compressed.markReaderIndex(); - switch (connection.getPlayer().getCompression().getBedrockAlgorithm()){ - case ZLIB -> decompressed = Zlib.RAW.inflate(compressed, 4 * 1024 * 1024); - case SNAPPY -> decompressed = snappyCompressionCodec.decode0(channelHandlerContext, compressed); - } - - if(decompressed == null){ - throw new UnsupportedOperationException("The given compression algorithm is not supported by ProxyTransport"); - } - - compressed.resetReaderIndex(); - BedrockBatchWrapper wrapper = BedrockBatchWrapper.newInstance(compressed.retain(), decompressed); - wrapper.setAlgorithm(connection.getPlayer().getCompression()); - list.add(wrapper); - - } -} diff --git a/src/main/java/org/nethergames/proxytransport/encoder/DataPackEncoder.java b/src/main/java/org/nethergames/proxytransport/encoder/DataPackEncoder.java deleted file mode 100644 index 7d14025..0000000 --- a/src/main/java/org/nethergames/proxytransport/encoder/DataPackEncoder.java +++ /dev/null @@ -1,60 +0,0 @@ -package org.nethergames.proxytransport.encoder; - -import dev.waterdog.waterdogpe.ProxyServer; -import dev.waterdog.waterdogpe.network.connection.client.ClientConnection; -import dev.waterdog.waterdogpe.network.connection.codec.BedrockBatchWrapper; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageEncoder; -import lombok.RequiredArgsConstructor; -import org.nethergames.proxytransport.utils.CompressionType; - -import java.util.List; - -@RequiredArgsConstructor -public class DataPackEncoder extends MessageToMessageEncoder { - public static final String NAME = "data-pack-encoder"; - private final ClientConnection clientConnection; - - @Override - protected void encode(ChannelHandlerContext channelHandlerContext, BedrockBatchWrapper wrapper, List out) { - ByteBuf buf = channelHandlerContext.alloc().ioBuffer(); - ByteBuf dir = null; - ByteBuf com = null; - - try { - // The batch was modified or the wrapper has no compressed data while still retaining - // the uncompressed data. - if ((wrapper.isModified() || wrapper.getCompressed() == null) && wrapper.getUncompressed() != null) { - - buf.writeByte(CompressionType.METHOD_ZSTD.ordinal()); - ByteBuf source = wrapper.getUncompressed(); - - if (!source.isDirect() || source instanceof CompositeByteBuf) { - // ZStd-jni needs direct buffers to function properly - // Composite Buffers or indirect buffers will not generate valid NIO ByteBuffers - - dir = channelHandlerContext.alloc().ioBuffer(source.readableBytes()); - dir.writeBytes(source); - - com = ZStdEncoder.compress(dir); - } else { - com = ZStdEncoder.compress(source); - } - - buf.writeBytes(com); - } else if (!wrapper.isModified() && wrapper.getCompressed() != null) { // The batch is already compressed correctly and we can yeet the buffer straight to the server - buf.writeByte(CompressionType.METHOD_ZLIB.ordinal()); - buf.writeBytes(wrapper.getCompressed()); - } - } catch (Throwable t) { - ProxyServer.getInstance().getLogger().error("Error in DataPack Encoding", t); - } finally { - if (com != null) com.release(); - if (dir != null) dir.release(); - } - - out.add(buf); - } -} diff --git a/src/main/java/org/nethergames/proxytransport/encoder/ZStdEncoder.java b/src/main/java/org/nethergames/proxytransport/encoder/ZStdEncoder.java deleted file mode 100644 index 33a691f..0000000 --- a/src/main/java/org/nethergames/proxytransport/encoder/ZStdEncoder.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.nethergames.proxytransport.encoder; - -import com.github.luben.zstd.Zstd; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageEncoder; - -import java.nio.ByteBuffer; -import java.util.List; - -@ChannelHandler.Sharable -public class ZStdEncoder extends MessageToMessageEncoder { - - public static ByteBuf compress(ByteBuf buf) { - int decompressedSize = buf.readableBytes(); - int compressedSize = (int) Zstd.compressBound(decompressedSize); - - ByteBuf compressed = buf.alloc().directBuffer(compressedSize); - - if (buf.hasMemoryAddress()) { - compressedSize = (int) Zstd.compressUnsafe(compressed.memoryAddress(), compressedSize, buf.memoryAddress() + buf.readerIndex(), decompressedSize, 3); - } else { - ByteBuffer compressedNio = compressed.nioBuffer(0, compressedSize); - ByteBuffer decompressedNio = buf.nioBuffer(buf.readerIndex(), buf.readableBytes()); - - compressedSize = Zstd.compress(compressedNio, decompressedNio, 3); - } - - compressed.writerIndex(compressedSize); - return compressed; - } - - @Override - protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { - list.add(compress(byteBuf)); - } -} \ No newline at end of file diff --git a/src/main/java/org/nethergames/proxytransport/impl/TransportChannelInitializer.java b/src/main/java/org/nethergames/proxytransport/impl/TransportChannelInitializer.java index e7624e8..dadcabd 100644 --- a/src/main/java/org/nethergames/proxytransport/impl/TransportChannelInitializer.java +++ b/src/main/java/org/nethergames/proxytransport/impl/TransportChannelInitializer.java @@ -5,8 +5,6 @@ import dev.waterdog.waterdogpe.network.connection.client.ClientConnection; import dev.waterdog.waterdogpe.network.connection.codec.batch.BedrockBatchDecoder; import dev.waterdog.waterdogpe.network.connection.codec.batch.BedrockBatchEncoder; -import dev.waterdog.waterdogpe.network.connection.codec.client.ClientEventHandler; -import dev.waterdog.waterdogpe.network.connection.codec.compression.CompressionAlgorithm; import dev.waterdog.waterdogpe.network.connection.codec.packet.BedrockPacketCodec; import dev.waterdog.waterdogpe.network.serverinfo.ServerInfo; import dev.waterdog.waterdogpe.player.ProxiedPlayer; @@ -18,14 +16,14 @@ import org.cloudburstmc.netty.channel.raknet.RakChannel; import org.cloudburstmc.netty.channel.raknet.config.RakChannelOption; import org.cloudburstmc.netty.channel.raknet.config.RakMetrics; -import org.nethergames.proxytransport.decoder.PartialDecompressor; -import org.nethergames.proxytransport.encoder.DataPackEncoder; +import org.nethergames.proxytransport.compression.ZstdCompressionCodec; import org.nethergames.proxytransport.integration.CustomClientEventHandler; -import static dev.waterdog.waterdogpe.network.connection.codec.initializer.ProxiedSessionInitializer.BATCH_DECODER; -import static dev.waterdog.waterdogpe.network.connection.codec.initializer.ProxiedSessionInitializer.getPacketCodec; +import static dev.waterdog.waterdogpe.network.connection.codec.initializer.ProxiedSessionInitializer.*; public class TransportChannelInitializer extends ChannelInitializer { + private static final int ZSTD_COMPRESSION_LEVEL = 3; + private final ProxiedPlayer player; private final ServerInfo serverInfo; private final Promise promise; @@ -60,8 +58,7 @@ protected void initChannel(Channel channel) { ClientConnection connection = this.createConnection(channel); channel.pipeline() - .addLast(DataPackEncoder.NAME, new DataPackEncoder(connection)) - .addLast(PartialDecompressor.NAME, new PartialDecompressor(connection)) + .addLast(ZstdCompressionCodec.NAME, new ZstdCompressionCodec(ZSTD_COMPRESSION_LEVEL, connection)) .addLast(BedrockBatchDecoder.NAME, BATCH_DECODER) .addLast(BedrockBatchEncoder.NAME, new BedrockBatchEncoder()) .addLast(BedrockPacketCodec.NAME, getPacketCodec(rakVersion));