From 347f4def9a17e194c2dac650ef84eb28782049a9 Mon Sep 17 00:00:00 2001 From: Andrey Pleskach Date: Tue, 13 Jun 2023 21:25:30 +0200 Subject: [PATCH] Remove COMPRESSOR variable from CompressorFactory (#7907) Removed a deprecated COMPRESSOR variable from CompressorFactory and use DEFLATE_COMPRESSOR instea Signed-off-by: Andrey Pleskach Signed-off-by: Shivansh Arora --- CHANGELOG.md | 1 + .../cluster/coordination/CompressedStreamUtils.java | 2 +- .../common/compress/CompressedXContent.java | 4 ++-- .../opensearch/common/compress/CompressorFactory.java | 11 ++++++----- .../repositories/blobstore/BlobStoreRepository.java | 4 ++-- .../transport/CompressibleBytesOutputStream.java | 2 +- .../opensearch/transport/TransportDecompressor.java | 8 +++++--- .../org/opensearch/transport/TransportLogger.java | 2 +- .../index/mapper/BinaryFieldMapperTests.java | 2 +- .../transport/CompressibleBytesOutputStreamTests.java | 10 ++++++---- .../transport/TransportDecompressorTests.java | 9 ++++++--- 11 files changed, 32 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b76adc925210..f88b4c4b18a43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -81,6 +81,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Remove LegacyESVersion.V_7_10_ Constants ([#5018](https://github.com/opensearch-project/OpenSearch/pull/5018)) - Remove Version.V_1_ Constants ([#5021](https://github.com/opensearch-project/OpenSearch/pull/5021)) - Remove custom Map, List and Set collection classes ([#6871](https://github.com/opensearch-project/OpenSearch/pull/6871)) +- Remove COMPRESSOR variable from CompressorFactory and use DEFLATE_COMPRESSOR instead ([7907](https://github.com/opensearch-project/OpenSearch/pull/7907)) ### Fixed - Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827)) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java b/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java index 57359f553b5a5..97a93a8fa99d4 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java @@ -37,7 +37,7 @@ public final class CompressedStreamUtils { public static BytesReference createCompressedStream(Version version, CheckedConsumer outputConsumer) throws IOException { final BytesStreamOutput bStream = new BytesStreamOutput(); - try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) { + try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.defaultCompressor().threadLocalOutputStream(bStream))) { stream.setVersion(version); outputConsumer.accept(stream); } diff --git a/server/src/main/java/org/opensearch/common/compress/CompressedXContent.java b/server/src/main/java/org/opensearch/common/compress/CompressedXContent.java index 462943e974f1e..f8c4dc3346b8d 100644 --- a/server/src/main/java/org/opensearch/common/compress/CompressedXContent.java +++ b/server/src/main/java/org/opensearch/common/compress/CompressedXContent.java @@ -85,7 +85,7 @@ private CompressedXContent(byte[] compressed, int crc32) { */ public CompressedXContent(ToXContent xcontent, ToXContent.Params params) throws IOException { BytesStreamOutput bStream = new BytesStreamOutput(); - OutputStream compressedStream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream); + OutputStream compressedStream = CompressorFactory.defaultCompressor().threadLocalOutputStream(bStream); CRC32 crc32 = new CRC32(); OutputStream checkedStream = new CheckedOutputStream(compressedStream, crc32); try (XContentBuilder builder = XContentFactory.jsonBuilder(checkedStream)) { @@ -113,7 +113,7 @@ public CompressedXContent(BytesReference data) throws IOException { this.bytes = BytesReference.toBytes(data); this.crc32 = crc32(uncompressed()); } else { - this.bytes = BytesReference.toBytes(CompressorFactory.COMPRESSOR.compress(data)); + this.bytes = BytesReference.toBytes(CompressorFactory.defaultCompressor().compress(data)); this.crc32 = crc32(data); } assertConsistent(); diff --git a/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java b/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java index 23a8cac349bd2..4b8d0cfea57f0 100644 --- a/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java +++ b/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java @@ -48,9 +48,6 @@ public class CompressorFactory { public static final Compressor DEFLATE_COMPRESSOR = new DeflateCompressor(); - @Deprecated - public static final Compressor COMPRESSOR = DEFLATE_COMPRESSOR; - public static final Compressor ZSTD_COMPRESSOR = new ZstdCompressor(); public static final Compressor NONE_COMPRESSOR = new NoneCompressor(); @@ -59,14 +56,18 @@ public static boolean isCompressed(BytesReference bytes) { return compressor(bytes) != null; } + public static Compressor defaultCompressor() { + return DEFLATE_COMPRESSOR; + } + @Nullable public static Compressor compressor(BytesReference bytes) { - if (COMPRESSOR.isCompressed(bytes)) { + if (DEFLATE_COMPRESSOR.isCompressed(bytes)) { // bytes should be either detected as compressed or as xcontent, // if we have bytes that can be either detected as compressed or // as a xcontent, we have a problem assert XContentHelper.xContentType(bytes) == null; - return COMPRESSOR; + return DEFLATE_COMPRESSOR; } else if (ZSTD_COMPRESSOR.isCompressed(bytes)) { assert XContentHelper.xContentType(bytes) == null; return ZSTD_COMPRESSOR; diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index b67ffc8107d96..13550117a24a4 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -1720,7 +1720,7 @@ private void cacheRepositoryData(BytesReference updated, long generation) { if (cacheRepositoryData && bestEffortConsistency == false) { final BytesReference serialized; try { - serialized = CompressorFactory.COMPRESSOR.compress(updated); + serialized = CompressorFactory.defaultCompressor().compress(updated); final int len = serialized.length(); if (len > ByteSizeUnit.KB.toBytes(500)) { logger.debug( @@ -1756,7 +1756,7 @@ private void cacheRepositoryData(BytesReference updated, long generation) { } private RepositoryData repositoryDataFromCachedEntry(Tuple cacheEntry) throws IOException { - try (InputStream input = CompressorFactory.COMPRESSOR.threadLocalInputStream(cacheEntry.v2().streamInput())) { + try (InputStream input = CompressorFactory.defaultCompressor().threadLocalInputStream(cacheEntry.v2().streamInput())) { return RepositoryData.snapshotsFromXContent( XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, input), cacheEntry.v1() diff --git a/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java b/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java index 6877d9679eb81..58cbc406612f4 100644 --- a/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java +++ b/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java @@ -68,7 +68,7 @@ final class CompressibleBytesOutputStream extends StreamOutput { this.bytesStreamOutput = bytesStreamOutput; this.shouldCompress = shouldCompress; if (shouldCompress) { - this.stream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStreamOutput)); + this.stream = CompressorFactory.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(bytesStreamOutput)); } else { this.stream = bytesStreamOutput; } diff --git a/server/src/main/java/org/opensearch/transport/TransportDecompressor.java b/server/src/main/java/org/opensearch/transport/TransportDecompressor.java index 7690bfdf35d8a..6ecb1dba7ad16 100644 --- a/server/src/main/java/org/opensearch/transport/TransportDecompressor.java +++ b/server/src/main/java/org/opensearch/transport/TransportDecompressor.java @@ -37,6 +37,7 @@ import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.bytes.ReleasableBytesReference; +import org.opensearch.common.compress.Compressor; import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.recycler.Recycler; import org.opensearch.common.util.PageCacheRecycler; @@ -69,7 +70,8 @@ public TransportDecompressor(PageCacheRecycler recycler) { public int decompress(BytesReference bytesReference) throws IOException { int bytesConsumed = 0; if (hasReadHeader == false) { - if (CompressorFactory.COMPRESSOR.isCompressed(bytesReference) == false) { + final Compressor compressor = CompressorFactory.defaultCompressor(); + if (compressor.isCompressed(bytesReference) == false) { int maxToRead = Math.min(bytesReference.length(), 10); StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead) .append("] content bytes out of [") @@ -85,7 +87,7 @@ public int decompress(BytesReference bytesReference) throws IOException { throw new IllegalStateException(sb.toString()); } hasReadHeader = true; - int headerLength = CompressorFactory.COMPRESSOR.headerLength(); + int headerLength = compressor.headerLength(); bytesReference = bytesReference.slice(headerLength, bytesReference.length() - headerLength); bytesConsumed += headerLength; } @@ -135,7 +137,7 @@ public int decompress(BytesReference bytesReference) throws IOException { } public boolean canDecompress(int bytesAvailable) { - return hasReadHeader || bytesAvailable >= CompressorFactory.COMPRESSOR.headerLength(); + return hasReadHeader || bytesAvailable >= CompressorFactory.defaultCompressor().headerLength(); } public boolean isEOS() { diff --git a/server/src/main/java/org/opensearch/transport/TransportLogger.java b/server/src/main/java/org/opensearch/transport/TransportLogger.java index c97b35a150e91..1f7facbca49ec 100644 --- a/server/src/main/java/org/opensearch/transport/TransportLogger.java +++ b/server/src/main/java/org/opensearch/transport/TransportLogger.java @@ -179,7 +179,7 @@ private static String format(TcpChannel channel, InboundMessage message, String private static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException { if (TransportStatus.isCompress(status) && streamInput.available() > 0) { try { - return new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(streamInput)); + return new InputStreamStreamInput(CompressorFactory.defaultCompressor().threadLocalInputStream(streamInput)); } catch (IllegalArgumentException e) { throw new IllegalStateException("stream marked as compressed, but is missing deflate header"); } diff --git a/server/src/test/java/org/opensearch/index/mapper/BinaryFieldMapperTests.java b/server/src/test/java/org/opensearch/index/mapper/BinaryFieldMapperTests.java index 2ebbb5c841cbd..878fd48e3fb02 100644 --- a/server/src/test/java/org/opensearch/index/mapper/BinaryFieldMapperTests.java +++ b/server/src/test/java/org/opensearch/index/mapper/BinaryFieldMapperTests.java @@ -119,7 +119,7 @@ public void testStoredValue() throws IOException { // case 2: a value that looks compressed: this used to fail in 1.x BytesStreamOutput out = new BytesStreamOutput(); - try (OutputStream compressed = CompressorFactory.COMPRESSOR.threadLocalOutputStream(out)) { + try (OutputStream compressed = CompressorFactory.defaultCompressor().threadLocalOutputStream(out)) { new BytesArray(binaryValue1).writeTo(compressed); } final byte[] binaryValue2 = BytesReference.toBytes(out.bytes()); diff --git a/server/src/test/java/org/opensearch/transport/CompressibleBytesOutputStreamTests.java b/server/src/test/java/org/opensearch/transport/CompressibleBytesOutputStreamTests.java index 643b6665cd659..1c300bc5e2adf 100644 --- a/server/src/test/java/org/opensearch/transport/CompressibleBytesOutputStreamTests.java +++ b/server/src/test/java/org/opensearch/transport/CompressibleBytesOutputStreamTests.java @@ -56,7 +56,7 @@ public void testStreamWithoutCompression() throws IOException { // Closing compression stream does not close underlying stream stream.close(); - assertFalse(CompressorFactory.COMPRESSOR.isCompressed(bytesRef)); + assertFalse(CompressorFactory.defaultCompressor().isCompressed(bytesRef)); StreamInput streamInput = bytesRef.streamInput(); byte[] actualBytes = new byte[expectedBytes.length]; @@ -83,9 +83,11 @@ public void testStreamWithCompression() throws IOException { BytesReference bytesRef = stream.materializeBytes(); stream.close(); - assertTrue(CompressorFactory.COMPRESSOR.isCompressed(bytesRef)); + assertTrue(CompressorFactory.defaultCompressor().isCompressed(bytesRef)); - StreamInput streamInput = new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(bytesRef.streamInput())); + StreamInput streamInput = new InputStreamStreamInput( + CompressorFactory.defaultCompressor().threadLocalInputStream(bytesRef.streamInput()) + ); byte[] actualBytes = new byte[expectedBytes.length]; streamInput.readBytes(actualBytes, 0, expectedBytes.length); @@ -108,7 +110,7 @@ public void testCompressionWithCallingMaterializeFails() throws IOException { stream.write(expectedBytes); StreamInput streamInput = new InputStreamStreamInput( - CompressorFactory.COMPRESSOR.threadLocalInputStream(bStream.bytes().streamInput()) + CompressorFactory.defaultCompressor().threadLocalInputStream(bStream.bytes().streamInput()) ); byte[] actualBytes = new byte[expectedBytes.length]; EOFException e = expectThrows(EOFException.class, () -> streamInput.readBytes(actualBytes, 0, expectedBytes.length)); diff --git a/server/src/test/java/org/opensearch/transport/TransportDecompressorTests.java b/server/src/test/java/org/opensearch/transport/TransportDecompressorTests.java index 08308f80ab09d..4675f12b3e086 100644 --- a/server/src/test/java/org/opensearch/transport/TransportDecompressorTests.java +++ b/server/src/test/java/org/opensearch/transport/TransportDecompressorTests.java @@ -53,7 +53,10 @@ public class TransportDecompressorTests extends OpenSearchTestCase { public void testSimpleCompression() throws IOException { try (BytesStreamOutput output = new BytesStreamOutput()) { byte randomByte = randomByte(); - try (OutputStream deflateStream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(output))) { + try ( + OutputStream deflateStream = CompressorFactory.defaultCompressor() + .threadLocalOutputStream(Streams.flushOnCloseStream(output)) + ) { deflateStream.write(randomByte); } @@ -74,7 +77,7 @@ public void testMultiPageCompression() throws IOException { try (BytesStreamOutput output = new BytesStreamOutput()) { try ( StreamOutput deflateStream = new OutputStreamStreamOutput( - CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(output)) + CompressorFactory.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(output)) ) ) { for (int i = 0; i < 10000; ++i) { @@ -106,7 +109,7 @@ public void testIncrementalMultiPageCompression() throws IOException { try (BytesStreamOutput output = new BytesStreamOutput()) { try ( StreamOutput deflateStream = new OutputStreamStreamOutput( - CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(output)) + CompressorFactory.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(output)) ) ) { for (int i = 0; i < 10000; ++i) {