Skip to content

Commit

Permalink
Remove COMPRESSOR variable from CompressorFactory (opensearch-project…
Browse files Browse the repository at this point in the history
…#7907)

Removed a deprecated COMPRESSOR variable from
CompressorFactory and use DEFLATE_COMPRESSOR instea

Signed-off-by: Andrey Pleskach <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
willyborankin authored and shiv0408 committed Apr 25, 2024
1 parent 874c641 commit 347f4de
Show file tree
Hide file tree
Showing 11 changed files with 32 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove LegacyESVersion.V_7_10_ Constants ([#5018](https://github.com/opensearch-project/OpenSearch/pull/5018))
- Remove Version.V_1_ Constants ([#5021](https://github.com/opensearch-project/OpenSearch/pull/5021))
- Remove custom Map, List and Set collection classes ([#6871](https://github.com/opensearch-project/OpenSearch/pull/6871))
- Remove COMPRESSOR variable from CompressorFactory and use DEFLATE_COMPRESSOR instead ([7907](https://github.com/opensearch-project/OpenSearch/pull/7907))

### Fixed
- Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public final class CompressedStreamUtils {
public static BytesReference createCompressedStream(Version version, CheckedConsumer<StreamOutput, IOException> outputConsumer)
throws IOException {
final BytesStreamOutput bStream = new BytesStreamOutput();
try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) {
try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.defaultCompressor().threadLocalOutputStream(bStream))) {
stream.setVersion(version);
outputConsumer.accept(stream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private CompressedXContent(byte[] compressed, int crc32) {
*/
public CompressedXContent(ToXContent xcontent, ToXContent.Params params) throws IOException {
BytesStreamOutput bStream = new BytesStreamOutput();
OutputStream compressedStream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream);
OutputStream compressedStream = CompressorFactory.defaultCompressor().threadLocalOutputStream(bStream);
CRC32 crc32 = new CRC32();
OutputStream checkedStream = new CheckedOutputStream(compressedStream, crc32);
try (XContentBuilder builder = XContentFactory.jsonBuilder(checkedStream)) {
Expand Down Expand Up @@ -113,7 +113,7 @@ public CompressedXContent(BytesReference data) throws IOException {
this.bytes = BytesReference.toBytes(data);
this.crc32 = crc32(uncompressed());
} else {
this.bytes = BytesReference.toBytes(CompressorFactory.COMPRESSOR.compress(data));
this.bytes = BytesReference.toBytes(CompressorFactory.defaultCompressor().compress(data));
this.crc32 = crc32(data);
}
assertConsistent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ public class CompressorFactory {

public static final Compressor DEFLATE_COMPRESSOR = new DeflateCompressor();

@Deprecated
public static final Compressor COMPRESSOR = DEFLATE_COMPRESSOR;

public static final Compressor ZSTD_COMPRESSOR = new ZstdCompressor();

public static final Compressor NONE_COMPRESSOR = new NoneCompressor();
Expand All @@ -59,14 +56,18 @@ public static boolean isCompressed(BytesReference bytes) {
return compressor(bytes) != null;
}

public static Compressor defaultCompressor() {
return DEFLATE_COMPRESSOR;
}

@Nullable
public static Compressor compressor(BytesReference bytes) {
if (COMPRESSOR.isCompressed(bytes)) {
if (DEFLATE_COMPRESSOR.isCompressed(bytes)) {
// bytes should be either detected as compressed or as xcontent,
// if we have bytes that can be either detected as compressed or
// as a xcontent, we have a problem
assert XContentHelper.xContentType(bytes) == null;
return COMPRESSOR;
return DEFLATE_COMPRESSOR;
} else if (ZSTD_COMPRESSOR.isCompressed(bytes)) {
assert XContentHelper.xContentType(bytes) == null;
return ZSTD_COMPRESSOR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1720,7 +1720,7 @@ private void cacheRepositoryData(BytesReference updated, long generation) {
if (cacheRepositoryData && bestEffortConsistency == false) {
final BytesReference serialized;
try {
serialized = CompressorFactory.COMPRESSOR.compress(updated);
serialized = CompressorFactory.defaultCompressor().compress(updated);
final int len = serialized.length();
if (len > ByteSizeUnit.KB.toBytes(500)) {
logger.debug(
Expand Down Expand Up @@ -1756,7 +1756,7 @@ private void cacheRepositoryData(BytesReference updated, long generation) {
}

private RepositoryData repositoryDataFromCachedEntry(Tuple<Long, BytesReference> cacheEntry) throws IOException {
try (InputStream input = CompressorFactory.COMPRESSOR.threadLocalInputStream(cacheEntry.v2().streamInput())) {
try (InputStream input = CompressorFactory.defaultCompressor().threadLocalInputStream(cacheEntry.v2().streamInput())) {
return RepositoryData.snapshotsFromXContent(
XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, input),
cacheEntry.v1()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ final class CompressibleBytesOutputStream extends StreamOutput {
this.bytesStreamOutput = bytesStreamOutput;
this.shouldCompress = shouldCompress;
if (shouldCompress) {
this.stream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStreamOutput));
this.stream = CompressorFactory.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(bytesStreamOutput));
} else {
this.stream = bytesStreamOutput;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.bytes.ReleasableBytesReference;
import org.opensearch.common.compress.Compressor;
import org.opensearch.common.compress.CompressorFactory;
import org.opensearch.common.recycler.Recycler;
import org.opensearch.common.util.PageCacheRecycler;
Expand Down Expand Up @@ -69,7 +70,8 @@ public TransportDecompressor(PageCacheRecycler recycler) {
public int decompress(BytesReference bytesReference) throws IOException {
int bytesConsumed = 0;
if (hasReadHeader == false) {
if (CompressorFactory.COMPRESSOR.isCompressed(bytesReference) == false) {
final Compressor compressor = CompressorFactory.defaultCompressor();
if (compressor.isCompressed(bytesReference) == false) {
int maxToRead = Math.min(bytesReference.length(), 10);
StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead)
.append("] content bytes out of [")
Expand All @@ -85,7 +87,7 @@ public int decompress(BytesReference bytesReference) throws IOException {
throw new IllegalStateException(sb.toString());
}
hasReadHeader = true;
int headerLength = CompressorFactory.COMPRESSOR.headerLength();
int headerLength = compressor.headerLength();
bytesReference = bytesReference.slice(headerLength, bytesReference.length() - headerLength);
bytesConsumed += headerLength;
}
Expand Down Expand Up @@ -135,7 +137,7 @@ public int decompress(BytesReference bytesReference) throws IOException {
}

public boolean canDecompress(int bytesAvailable) {
return hasReadHeader || bytesAvailable >= CompressorFactory.COMPRESSOR.headerLength();
return hasReadHeader || bytesAvailable >= CompressorFactory.defaultCompressor().headerLength();
}

public boolean isEOS() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private static String format(TcpChannel channel, InboundMessage message, String
private static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException {
if (TransportStatus.isCompress(status) && streamInput.available() > 0) {
try {
return new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(streamInput));
return new InputStreamStreamInput(CompressorFactory.defaultCompressor().threadLocalInputStream(streamInput));
} catch (IllegalArgumentException e) {
throw new IllegalStateException("stream marked as compressed, but is missing deflate header");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void testStoredValue() throws IOException {

// case 2: a value that looks compressed: this used to fail in 1.x
BytesStreamOutput out = new BytesStreamOutput();
try (OutputStream compressed = CompressorFactory.COMPRESSOR.threadLocalOutputStream(out)) {
try (OutputStream compressed = CompressorFactory.defaultCompressor().threadLocalOutputStream(out)) {
new BytesArray(binaryValue1).writeTo(compressed);
}
final byte[] binaryValue2 = BytesReference.toBytes(out.bytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void testStreamWithoutCompression() throws IOException {
// Closing compression stream does not close underlying stream
stream.close();

assertFalse(CompressorFactory.COMPRESSOR.isCompressed(bytesRef));
assertFalse(CompressorFactory.defaultCompressor().isCompressed(bytesRef));

StreamInput streamInput = bytesRef.streamInput();
byte[] actualBytes = new byte[expectedBytes.length];
Expand All @@ -83,9 +83,11 @@ public void testStreamWithCompression() throws IOException {
BytesReference bytesRef = stream.materializeBytes();
stream.close();

assertTrue(CompressorFactory.COMPRESSOR.isCompressed(bytesRef));
assertTrue(CompressorFactory.defaultCompressor().isCompressed(bytesRef));

StreamInput streamInput = new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(bytesRef.streamInput()));
StreamInput streamInput = new InputStreamStreamInput(
CompressorFactory.defaultCompressor().threadLocalInputStream(bytesRef.streamInput())
);
byte[] actualBytes = new byte[expectedBytes.length];
streamInput.readBytes(actualBytes, 0, expectedBytes.length);

Expand All @@ -108,7 +110,7 @@ public void testCompressionWithCallingMaterializeFails() throws IOException {
stream.write(expectedBytes);

StreamInput streamInput = new InputStreamStreamInput(
CompressorFactory.COMPRESSOR.threadLocalInputStream(bStream.bytes().streamInput())
CompressorFactory.defaultCompressor().threadLocalInputStream(bStream.bytes().streamInput())
);
byte[] actualBytes = new byte[expectedBytes.length];
EOFException e = expectThrows(EOFException.class, () -> streamInput.readBytes(actualBytes, 0, expectedBytes.length));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ public class TransportDecompressorTests extends OpenSearchTestCase {
public void testSimpleCompression() throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
byte randomByte = randomByte();
try (OutputStream deflateStream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(output))) {
try (
OutputStream deflateStream = CompressorFactory.defaultCompressor()
.threadLocalOutputStream(Streams.flushOnCloseStream(output))
) {
deflateStream.write(randomByte);
}

Expand All @@ -74,7 +77,7 @@ public void testMultiPageCompression() throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
try (
StreamOutput deflateStream = new OutputStreamStreamOutput(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(output))
CompressorFactory.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(output))
)
) {
for (int i = 0; i < 10000; ++i) {
Expand Down Expand Up @@ -106,7 +109,7 @@ public void testIncrementalMultiPageCompression() throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
try (
StreamOutput deflateStream = new OutputStreamStreamOutput(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(output))
CompressorFactory.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(output))
)
) {
for (int i = 0; i < 10000; ++i) {
Expand Down

0 comments on commit 347f4de

Please sign in to comment.