Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing ZStd compression codec plugin #9658

Merged
merged 18 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add support for Remote Translog Store stats in `_remotestore/stats/` API ([#9263](https://github.com/opensearch-project/OpenSearch/pull/9263))
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))
- Cleanup Unreferenced file on segment merge failure ([#9503](https://github.com/opensearch-project/OpenSearch/pull/9503))
- Move ZStd to a plugin ([#9658](https://github.com/opensearch-project/OpenSearch/pull/9658))

### Deprecated

Expand Down
2 changes: 2 additions & 0 deletions modules/reindex/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ dependencies {
testImplementation project(':modules:transport-netty4')
// for parent/child testing
testImplementation project(':modules:parent-join')
testImplementation project(':plugins:custom-codecs')
}

restResources {
Expand All @@ -95,4 +96,5 @@ forbiddenPatterns {
tasks.named("bundlePlugin").configure {
dependsOn("copyParentJoinMetadata")
dependsOn("copyTransportNetty4Metadata")
dependsOn("copyCustomCodecsMetadata")
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.codec.customcodecs.CustomCodecPlugin;
import org.opensearch.index.engine.Segment;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.ReindexAction;
import org.opensearch.index.reindex.ReindexModulePlugin;
import org.opensearch.index.reindex.ReindexRequestBuilder;
import org.opensearch.index.reindex.ReindexTestCase;
import org.opensearch.plugins.Plugin;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
Expand All @@ -40,6 +45,11 @@

public class MultiCodecReindexIT extends ReindexTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(CustomCodecPlugin.class, ReindexModulePlugin.class);
sarthakaggarwal97 marked this conversation as resolved.
Show resolved Hide resolved
}

public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException {
internalCluster().ensureAtLeastNumDataNodes(1);
Map<String, String> codecMap = Map.of(
Expand Down
27 changes: 27 additions & 0 deletions plugins/custom-codecs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

apply plugin: 'opensearch.opensearchplugin'
apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
name 'custom-codecs'
sarthakaggarwal97 marked this conversation as resolved.
Show resolved Hide resolved
description 'A plugin that implements custom compression codecs.'
classname 'org.opensearch.index.codec.customcodecs.CustomCodecPlugin'
licenseFile rootProject.file('licenses/APACHE-LICENSE-2.0.txt')
noticeFile rootProject.file('NOTICE.txt')
}

dependencies {
api "com.github.luben:zstd-jni:1.5.5-5"
}

testingConventions.enabled = false;
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,24 @@
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.codec.customcodecs.CustomCodecPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_CODEC;
import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_NO_DICT_CODEC;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class CodecCompressionLevelIT extends OpenSearchIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(CustomCodecPlugin.class);
}

public void testLuceneCodecsCreateIndexWithCompressionLevel() {

Expand Down Expand Up @@ -62,7 +72,7 @@ public void testZStandardCodecsCreateIndexWithCompressionLevel() {
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.codec", randomFrom(CodecService.ZSTD_CODEC, CodecService.ZSTD_NO_DICT_CODEC))
.put("index.codec", randomFrom(ZSTD_CODEC, ZSTD_NO_DICT_CODEC))
.put("index.codec.compression_level", randomIntBetween(1, 6))
.build()
);
Expand All @@ -81,7 +91,7 @@ public void testZStandardToLuceneCodecsWithCompressionLevel() throws ExecutionEx
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.codec", randomFrom(CodecService.ZSTD_CODEC, CodecService.ZSTD_NO_DICT_CODEC))
.put("index.codec", randomFrom(ZSTD_CODEC, ZSTD_NO_DICT_CODEC))
.put("index.codec.compression_level", randomIntBetween(1, 6))
.build()
);
Expand Down Expand Up @@ -164,7 +174,7 @@ public void testLuceneToZStandardCodecsWithCompressionLevel() throws ExecutionEx
.updateSettings(
new UpdateSettingsRequest(index).settings(
Settings.builder()
.put("index.codec", randomFrom(CodecService.ZSTD_CODEC, CodecService.ZSTD_NO_DICT_CODEC))
.put("index.codec", randomFrom(ZSTD_CODEC, ZSTD_NO_DICT_CODEC))
.put("index.codec.compression_level", randomIntBetween(1, 6))
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.codec.customcodecs.CustomCodecPlugin;
import org.opensearch.index.engine.Segment;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand All @@ -40,6 +44,11 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class MultiCodecMergeIT extends OpenSearchIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(CustomCodecPlugin.class);
}

public void testForceMergeMultipleCodecs() throws ExecutionException, InterruptedException {

Map<String, String> codecMap = Map.of(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.customcodecs;

import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecServiceFactory;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.Plugin;

import java.util.Optional;

/**
* A plugin that implements custom codecs. Supports these codecs:
* <ul>
* <li>ZSTD
* <li>ZSTDNODICT
* </ul>
*
* @opensearch.internal
*/
public final class CustomCodecPlugin extends Plugin implements EnginePlugin {

/**
* Creates a new instance
*/
public CustomCodecPlugin() {}

/**
* @param indexSettings is the default indexSettings
* @return the engine factory
*/
@Override
public Optional<CodecServiceFactory> getCustomCodecServiceFactory(final IndexSettings indexSettings) {
String codecName = indexSettings.getValue(EngineConfig.INDEX_CODEC_SETTING);
if (codecName.equals(CustomCodecService.ZSTD_NO_DICT_CODEC) || codecName.equals(CustomCodecService.ZSTD_CODEC)) {
return Optional.of(new CustomCodecServiceFactory());
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.customcodecs;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.Codec;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.mapper.MapperService;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Stream;

import static org.opensearch.index.engine.EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING;

/**
* CustomCodecService provides ZSTD and ZSTD_NO_DICT compression codecs.
*/
public class CustomCodecService extends CodecService {
sarthakaggarwal97 marked this conversation as resolved.
Show resolved Hide resolved
private final Map<String, Codec> codecs;
/**
* ZStandard codec
*/
public static final String ZSTD_CODEC = "zstd";
/**
* ZStandard without dictionary codec
*/
public static final String ZSTD_NO_DICT_CODEC = "zstd_no_dict";

/**
* Creates a new CustomCodecService.
*
* @param mapperService The mapper service.
* @param indexSettings The index settings.
* @param logger The logger.
*/
public CustomCodecService(MapperService mapperService, IndexSettings indexSettings, Logger logger) {
super(mapperService, indexSettings, logger);
int compressionLevel = indexSettings.getValue(INDEX_CODEC_COMPRESSION_LEVEL_SETTING);
final MapBuilder<String, Codec> codecs = MapBuilder.<String, Codec>newMapBuilder();
if (mapperService == null) {
codecs.put(ZSTD_CODEC, new ZstdCodec(compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(compressionLevel));
} else {
codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger, compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger, compressionLevel));
}
this.codecs = codecs.immutableMap();
}

@Override
public Codec codec(String name) {
Codec codec = codecs.get(name);
if (codec == null) {
return super.codec(name);
}
return codec;
}

@Override
public String[] availableCodecs() {
return Stream.concat(Arrays.stream(super.availableCodecs()), codecs.keySet().stream()).toArray(String[]::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.customcodecs;

import org.opensearch.index.codec.CodecService;
import org.opensearch.index.codec.CodecServiceConfig;
import org.opensearch.index.codec.CodecServiceFactory;

/**
* A factory for creating new {@link CodecService} instance
*/
public class CustomCodecServiceFactory implements CodecServiceFactory {

/** Creates a new instance. */
public CustomCodecServiceFactory() {}

@Override
public CodecService createCodecService(CodecServiceConfig config) {
return new CustomCodecService(config.getMapperService(), config.getIndexSettings(), config.getLogger());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import org.opensearch.index.codec.PerFieldMappingPostingFormatCodec;
import org.opensearch.index.mapper.MapperService;

import java.util.Collections;
import java.util.Set;

/**
*
* Extends {@link FilterCodec} to reuse the functionality of Lucene Codec.
Expand All @@ -23,12 +26,48 @@
* @opensearch.internal
*/
public abstract class Lucene95CustomCodec extends FilterCodec {

/** Default compression level used for compression */
public static final int DEFAULT_COMPRESSION_LEVEL = 3;

/** Each mode represents a compression algorithm. */
public enum Mode {
ZSTD,
ZSTD_NO_DICT
/**
* ZStandard mode with dictionary
*/
ZSTD("ZSTD", Set.of("zstd")),
/**
* ZStandard mode without dictionary
*/
ZSTD_NO_DICT("ZSTDNODICT", Set.of("zstd_no_dict")),
/**
* Deprecated ZStandard mode, added for backward compatibility to support indices created in 2.9.0 where
* both ZSTD and ZSTD_NO_DICT used Lucene95CustomCodec underneath. This should not be used to
* create new indices.
*/
ZSTD_DEPRECATED("Lucene95CustomCodec", Collections.emptySet());

private final String codec;
private final Set<String> aliases;

Mode(String codec, Set<String> aliases) {
this.codec = codec;
this.aliases = aliases;
}

/**
* Returns the Codec that is registered with Lucene
*/
public String getCodec() {
return codec;
}

/**
* Returns the aliases of the Codec
*/
public Set<String> getAliases() {
return aliases;
}
}

private final StoredFieldsFormat storedFieldsFormat;
Expand All @@ -51,12 +90,22 @@ public Lucene95CustomCodec(Mode mode) {
* @param compressionLevel The compression level.
*/
public Lucene95CustomCodec(Mode mode, int compressionLevel) {
super("Lucene95CustomCodec", new Lucene95Codec());
super(mode.getCodec(), new Lucene95Codec());
this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel);
}

/**
* Creates a new compression codec with the given compression level. We use
* lowercase letters when registering the codec so that we remain consistent with
* the other compression codecs: default, lucene_default, and best_compression.
*
* @param mode The compression codec (ZSTD or ZSTDNODICT).
* @param compressionLevel The compression level.
* @param mapperService The mapper service.
* @param logger The logger.
*/
public Lucene95CustomCodec(Mode mode, int compressionLevel, MapperService mapperService, Logger logger) {
super("Lucene95CustomCodec", new PerFieldMappingPostingFormatCodec(Lucene95Codec.Mode.BEST_SPEED, mapperService, logger));
super(mode.getCodec(), new PerFieldMappingPostingFormatCodec(Lucene95Codec.Mode.BEST_SPEED, mapperService, logger));
this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel);
}

Expand Down
Loading