Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add ZSTD compression for snapshotting (#2996) #7906

Merged
merged 1 commit into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `org.gradle.test-retry` from 1.5.2 to 1.5.3 (#7810)
- Bump `com.diffplug.spotless` from 6.17.0 to 6.18.0 (#7896)
- Bump `jackson` from 2.15.1 to 2.15.2 ([#7897](https://github.com/opensearch-project/OpenSearch/pull/7897))
- Add `com.github.luben:zstd-jni` version 1.5.5-3 ([#2996](https://github.com/opensearch-project/OpenSearch/pull/2996))

### Changed
- Replace jboss-annotations-api_1.2_spec with jakarta.annotation-api ([#7836](https://github.com/opensearch-project/OpenSearch/pull/7836))
- Reduce memory copy in zstd compression ([#7681](https://github.com/opensearch-project/OpenSearch/pull/7681))
- Add min, max, average and thread info to resource stats in tasks API ([#7673](https://github.com/opensearch-project/OpenSearch/pull/7673))
- Add ZSTD compression for snapshotting ([#2996](https://github.com/opensearch-project/OpenSearch/pull/2996))

### Deprecated

Expand Down
3 changes: 3 additions & 0 deletions buildSrc/version.properties
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,6 @@ bytebuddy = 1.14.3

# benchmark dependencies
jmh = 1.35

# compression
zstd = 1.5.5-3
3 changes: 0 additions & 3 deletions distribution/tools/plugin-cli/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ thirdPartyAudit.ignoreViolations(
)

thirdPartyAudit.ignoreMissingClasses(
'com.github.luben.zstd.BufferPool',
'com.github.luben.zstd.ZstdInputStream',
'com.github.luben.zstd.ZstdOutputStream',
'org.brotli.dec.BrotliInputStream',
'org.objectweb.asm.AnnotationVisitor',
'org.objectweb.asm.Attribute',
Expand Down
1 change: 0 additions & 1 deletion modules/transport-netty4/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ thirdPartyAudit {
'org.slf4j.LoggerFactory',
'org.slf4j.spi.LocationAwareLogger',

'com.github.luben.zstd.Zstd',
'com.google.protobuf.nano.CodedOutputByteBufferNano',
'com.google.protobuf.nano.MessageNano',
'com.jcraft.jzlib.Deflater',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ public static final class Repository {
MAX_CHUNK_SIZE,
Property.NodeScope
);
public static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false, Property.NodeScope);
public static final Setting<Boolean> READONLY_SETTING = Setting.boolSetting("readonly", false, Property.NodeScope);
}

private final BlobPath basePath;
Expand All @@ -118,7 +116,7 @@ public AzureRepository(
) {
super(
metadata,
Repository.COMPRESS_SETTING.get(metadata.settings()),
COMPRESS_SETTING.get(metadata.settings()),
namedXContentRegistry,
clusterService,
recoverySettings,
Expand All @@ -142,8 +140,8 @@ public AzureRepository(
// If the user explicitly did not define a readonly value, we set it by ourselves depending on the location mode setting.
// For secondary_only setting, the repository should be read only
final LocationMode locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
if (Repository.READONLY_SETTING.exists(metadata.settings())) {
this.readonly = Repository.READONLY_SETTING.get(metadata.settings());
if (READONLY_SETTING.exists(metadata.settings())) {
this.readonly = READONLY_SETTING.get(metadata.settings());
} else {
this.readonly = locationMode == LocationMode.SECONDARY_ONLY;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.util.function.Function;

import static org.opensearch.common.settings.Setting.Property;
import static org.opensearch.common.settings.Setting.boolSetting;
import static org.opensearch.common.settings.Setting.byteSizeSetting;
import static org.opensearch.common.settings.Setting.simpleString;

Expand All @@ -70,7 +69,6 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {

static final Setting<String> BUCKET = simpleString("bucket", Property.NodeScope, Property.Dynamic);
static final Setting<String> BASE_PATH = simpleString("base_path", Property.NodeScope, Property.Dynamic);
static final Setting<Boolean> COMPRESS = boolSetting("compress", false, Property.NodeScope, Property.Dynamic);
static final Setting<ByteSizeValue> CHUNK_SIZE = byteSizeSetting(
"chunk_size",
MAX_CHUNK_SIZE,
Expand All @@ -94,7 +92,14 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
final ClusterService clusterService,
final RecoverySettings recoverySettings
) {
super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
super(
metadata,
getSetting(COMPRESS_SETTING, metadata),
namedXContentRegistry,
clusterService,
recoverySettings,
buildLocation(metadata)
);
this.storageService = storageService;

String basePath = BASE_PATH.get(metadata.settings());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public HdfsRepository(
final ClusterService clusterService,
final RecoverySettings recoverySettings
) {
super(metadata, metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, clusterService, recoverySettings);
super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService, recoverySettings);

this.environment = environment;
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,6 @@ class S3Repository extends MeteredBlobStoreRepository {
new ByteSizeValue(5, ByteSizeUnit.TB)
);

/**
* When set to true metadata files are stored in compressed format. This setting doesn’t affect index
* files that are already compressed by default. Defaults to false.
*/
static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false);

/**
* Sets the S3 storage class type for the backup files. Values may be standard, reduced_redundancy,
* standard_ia, onezone_ia and intelligent_tiering. Defaults to standard.
Expand Down
1 change: 0 additions & 1 deletion plugins/transport-nio/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ thirdPartyAudit {
'org.slf4j.LoggerFactory',
'org.slf4j.spi.LocationAwareLogger',

'com.github.luben.zstd.Zstd',
'com.google.protobuf.nano.CodedOutputByteBufferNano',
'com.google.protobuf.nano.MessageNano',
'com.jcraft.jzlib.Deflater',
Expand Down
2 changes: 1 addition & 1 deletion sandbox/plugins/custom-codecs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ opensearchplugin {
}

dependencies {
api "com.github.luben:zstd-jni:1.5.5-1"
api "com.github.luben:zstd-jni:${versions.zstd}"
}

yamlRestTest.enabled = false;
Expand Down

This file was deleted.

3 changes: 3 additions & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ dependencies {
api "com.google.protobuf:protobuf-java:${versions.protobuf}"
api "jakarta.annotation:jakarta.annotation-api:${versions.jakarta_annotation}"

//zstd
api "com.github.luben:zstd-jni:${versions.zstd}"

testImplementation(project(":test:framework")) {
// tests use the locally compiled version of server
exclude group: 'org.opensearch', module: 'server'
Expand Down
1 change: 1 addition & 0 deletions server/licenses/zstd-jni-1.5.5-3.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
488dd9b15c9e8cf87d857f65f5cd6359c2853381
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@
*/
public class CompressorFactory {

public static final Compressor COMPRESSOR = new DeflateCompressor();
public static final Compressor DEFLATE_COMPRESSOR = new DeflateCompressor();

@Deprecated
public static final Compressor COMPRESSOR = DEFLATE_COMPRESSOR;

public static final Compressor ZSTD_COMPRESSOR = new ZstdCompressor();

public static final Compressor NONE_COMPRESSOR = new NoneCompressor();

public static boolean isCompressed(BytesReference bytes) {
return compressor(bytes) != null;
Expand All @@ -61,6 +68,9 @@ public static Compressor compressor(BytesReference bytes) {
// as a xcontent, we have a problem
assert XContentHelper.xContentType(bytes) == null;
return COMPRESSOR;
} else if (ZSTD_COMPRESSOR.isCompressed(bytes)) {
assert XContentHelper.xContentType(bytes) == null;
return ZSTD_COMPRESSOR;
}

XContentType contentType = XContentHelper.xContentType(bytes);
Expand All @@ -81,7 +91,6 @@ private static boolean isAncient(BytesReference bytes) {

/**
* Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(BytesReference)}.
* @throws NullPointerException a NullPointerException will be thrown when bytes is null
*/
public static BytesReference uncompressIfNeeded(BytesReference bytes) throws IOException {
Compressor compressor = compressor(Objects.requireNonNull(bytes, "the BytesReference must not be null"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.compress;

/**
* Supported compression types
*
* @opensearch.internal
*/
public enum CompressorType {

DEFLATE {
@Override
public Compressor compressor() {
return CompressorFactory.DEFLATE_COMPRESSOR;
}
},

ZSTD {
@Override
public Compressor compressor() {
return CompressorFactory.ZSTD_COMPRESSOR;
}
},

NONE {
@Override
public Compressor compressor() {
return CompressorFactory.NONE_COMPRESSOR;
}
};

public abstract Compressor compressor();
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,9 @@ public InputStream threadLocalInputStream(InputStream in) throws IOException {
* @return decompressing stream
*/
public static InputStream inputStream(InputStream in, boolean threadLocal) throws IOException {
final byte[] headerBytes = new byte[HEADER.length];
int len = 0;
while (len < headerBytes.length) {
final int read = in.read(headerBytes, len, headerBytes.length - len);
if (read == -1) {
break;
}
len += read;
}
if (len != HEADER.length || Arrays.equals(headerBytes, HEADER) == false) {
final byte[] header = in.readNBytes(HEADER.length);

if (Arrays.equals(header, HEADER) == false) {
throw new IllegalArgumentException("Input stream is not compressed with DEFLATE!");
}

Expand Down Expand Up @@ -252,9 +245,11 @@ public BytesReference uncompress(BytesReference bytesReference) throws IOExcepti
} finally {
inflater.reset();
}
final BytesReference res = buffer.copyBytes();
buffer.reset();
return res;
try {
return buffer.copyBytes();
} finally {
buffer.reset();
}
}

// Reusable Deflater reference. Note: This is a separate instance from the one used for the compressing stream wrapper because we
Expand All @@ -271,8 +266,10 @@ public BytesReference compress(BytesReference bytesReference) throws IOException
} finally {
deflater.reset();
}
final BytesReference res = buffer.copyBytes();
buffer.reset();
return res;
try {
return buffer.copyBytes();
} finally {
buffer.reset();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.compress;

import org.opensearch.common.bytes.BytesReference;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* {@link Compressor} no compressor implementation.
*
* @opensearch.internal
*/
public class NoneCompressor implements Compressor {
@Override
public boolean isCompressed(BytesReference bytes) {
return false;
}

@Override
public int headerLength() {
return 0;
}

@Override
public InputStream threadLocalInputStream(InputStream in) throws IOException {
return in;
}

@Override
public OutputStream threadLocalOutputStream(OutputStream out) throws IOException {
return out;
}

@Override
public BytesReference uncompress(BytesReference bytesReference) throws IOException {
return bytesReference;
}

@Override
public BytesReference compress(BytesReference bytesReference) throws IOException {
return bytesReference;
}

}
Loading