From ae95f6d8ab1b0472df1dd2d869b4ea8aea32de6a Mon Sep 17 00:00:00 2001 From: Baoyi Chen Date: Wed, 28 Oct 2020 12:39:12 +0800 Subject: [PATCH] Fix issue #5499 this PR let the ByteAccumulator recyclable. after invoke ByteAccumulator.transferTo method we can invoke ByteAccumulator.recycle method to reuse byte[] via ByteAccumulator.newByteArray method Signed-off-by: Baoyi Chen --- .../extensions/compress/ByteAccumulator.java | 51 +++++++++++-- .../compress/CompressExtension.java | 16 +++- .../compress/DeflateFrameExtension.java | 5 +- .../compress/PerMessageDeflateExtension.java | 3 +- .../compress/ByteAccumulatorTest.java | 73 +++++++++++++++++++ 5 files changed, 133 insertions(+), 15 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java index 3b56753e7b8e..02bd0c8821e1 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java @@ -27,9 +27,25 @@ public class ByteAccumulator { - private final List chunks = new ArrayList<>(); + private class ByteArray + { + private byte[] buf; + private int offset; + private int length; + + private ByteArray(byte[] buf, int offset, int length) + { + this.buf = buf; + this.offset = offset; + this.length = length; + } + } + + private List prevChunks = null; + private List nextChunks = new ArrayList<>(); private final int maxSize; private int length = 0; + private int index; public ByteAccumulator(int maxOverallBufferSize) { @@ -43,11 +59,7 @@ public void copyChunk(byte[] buf, int offset, int length) String err = String.format("Resulting message size [%,d] is too large for configured max of [%,d]", this.length + length, maxSize); throw new MessageTooLargeException(err); } - - byte[] copy = new byte[length - offset]; - System.arraycopy(buf, offset, copy, 0, length); - - chunks.add(copy); + nextChunks.add(new ByteArray(buf, offset, length)); this.length += length; } @@ -56,6 +68,21 @@ public int getLength() return length; } + byte[] newByteArray(int size) + { + byte[] bytes; + if (prevChunks != null && prevChunks.size() > index) + { + bytes = prevChunks.get(index).buf; + } + else + { + bytes = new byte[size]; + } + index++; + return bytes; + } + public void transferTo(ByteBuffer buffer) { if (buffer.remaining() < length) @@ -65,10 +92,18 @@ public void transferTo(ByteBuffer buffer) } int position = buffer.position(); - for (byte[] chunk : chunks) + for (ByteArray chunk : nextChunks) { - buffer.put(chunk, 0, chunk.length); + buffer.put(chunk.buf, chunk.offset, chunk.length); } BufferUtil.flipToFlush(buffer, position); } + + void recycle() + { + index = 0; + length = 0; + prevChunks = nextChunks; + nextChunks = new ArrayList<>(); + } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index 6952ccb67ea0..633d13815c1e 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -46,6 +46,11 @@ public abstract class CompressExtension extends AbstractExtension protected static final byte[] TAIL_BYTES = new byte[]{0x00, 0x00, (byte)0xFF, (byte)0xFF}; protected static final ByteBuffer TAIL_BYTES_BUF = ByteBuffer.wrap(TAIL_BYTES); private static final Logger LOG = Log.getLogger(CompressExtension.class); + + /** + * thread local accumulator + */ + protected ThreadLocal accumulator = ThreadLocal.withInitial(() -> newByteAccumulator()); /** * Never drop tail bytes 0000FFFF, from any frame type @@ -185,10 +190,10 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da { return; } - byte[] output = new byte[DECOMPRESS_BUF_SIZE]; + Inflater inflater = getInflater(); - + while (buf.hasRemaining() && inflater.needsInput()) { if (!supplyInput(inflater, buf)) @@ -199,9 +204,12 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da } int read; - while ((read = inflater.inflate(output)) >= 0) + + while (true) { - if (read == 0) + byte[] output = accumulator.newByteArray(DECOMPRESS_BUF_SIZE); + read = inflater.inflate(output); + if (read <= 0) { if (LOG.isDebugEnabled()) LOG.debug("Decompress: read 0 {}", toDetail(inflater)); diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java index 0476c0fcc441..da14dffd3701 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java @@ -47,7 +47,7 @@ int getTailDropMode() { return TAIL_DROP_ALWAYS; } - + @Override public void incomingFrame(Frame frame) { @@ -63,10 +63,11 @@ public void incomingFrame(Frame frame) try { - ByteAccumulator accumulator = newByteAccumulator(); + ByteAccumulator accumulator = this.accumulator.get(); decompress(accumulator, frame.getPayload()); decompress(accumulator, TAIL_BYTES_BUF.slice()); forwardIncoming(frame, accumulator); + accumulator.recycle(); } catch (DataFormatException e) { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java index 37482f8bd678..d7620b5421d1 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java @@ -78,7 +78,7 @@ public void incomingFrame(Frame frame) throw new ProtocolException("Invalid RSV1 set on permessage-deflate CONTINUATION frame"); } - ByteAccumulator accumulator = newByteAccumulator(); + ByteAccumulator accumulator = this.accumulator.get(); try { @@ -90,6 +90,7 @@ public void incomingFrame(Frame frame) } forwardIncoming(frame, accumulator); + accumulator.recycle(); } catch (DataFormatException e) { diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java index 5aa41765e070..2ef37c4c279c 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java @@ -91,4 +91,77 @@ public void testCopyChunkNotEnoughSpace() MessageTooLargeException e = assertThrows(MessageTooLargeException.class, () -> accumulator.copyChunk(world, 0, world.length)); assertThat(e.getMessage(), containsString("too large for configured max")); } + + @Test + public void testRecycle() + { + ByteAccumulator accumulator = new ByteAccumulator(10_000); + ByteBuffer out0 = ByteBuffer.allocate(200); + ByteBuffer out1 = ByteBuffer.allocate(200); + { + // 1 + byte[] bytes = accumulator.newByteArray(10); + byte[] hello = "Hello".getBytes(UTF_8); + System.arraycopy(hello, 0, bytes, 0, hello.length); + accumulator.copyChunk(bytes, 0, hello.length); + + // 2 + bytes = accumulator.newByteArray(10); + byte[] space = " ".getBytes(UTF_8); + System.arraycopy(space, 0, bytes, 0, space.length); + accumulator.copyChunk(bytes, 0, space.length); + + // 3 + bytes = accumulator.newByteArray(10); + byte[] world = "World".getBytes(UTF_8); + System.arraycopy(world, 0, bytes, 0, world.length); + accumulator.copyChunk(bytes, 0, world.length); + + assertThat("Length", accumulator.getLength(), is(hello.length + space.length + world.length)); + + accumulator.transferTo(out0); + + // reuse that byte[] + accumulator.recycle(); + } + + { + // 1 + byte[] bytes = accumulator.newByteArray(10); + byte[] olleh = "olleH".getBytes(UTF_8); + System.arraycopy(olleh, 0, bytes, 0, olleh.length); + accumulator.copyChunk(bytes, 0, olleh.length); + + // 2 + bytes = accumulator.newByteArray(10); + byte[] space = " ".getBytes(UTF_8); + System.arraycopy(space, 0, bytes, 0, space.length); + accumulator.copyChunk(bytes, 0, space.length); + + // 3 + bytes = accumulator.newByteArray(10); + byte[] dlrow = "dlroW".getBytes(UTF_8); + System.arraycopy(dlrow, 0, bytes, 0, dlrow.length); + accumulator.copyChunk(bytes, 0, dlrow.length); + + // 4 + bytes = accumulator.newByteArray(10); + byte[] done = " enoD".getBytes(UTF_8); + System.arraycopy(done, 0, bytes, 0, done.length); + accumulator.copyChunk(bytes, 0, done.length); + + assertThat("Length", accumulator.getLength(), is(olleh.length + space.length + dlrow.length + done.length)); + + accumulator.transferTo(out1); + + // reuse that byte[] + accumulator.recycle(); + } + + String result0 = BufferUtil.toUTF8String(out0); + assertThat("result", result0, is("Hello World")); + + String result1 = BufferUtil.toUTF8String(out1); + assertThat("result", result1, is("olleH dlroW enoD")); + } }