Skip to content

Commit

Permalink
introducing zstd compression codec plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <[email protected]>
  • Loading branch information
sarthakaggarwal97 committed Aug 31, 2023
1 parent 79e5aee commit 0d0aea6
Show file tree
Hide file tree
Showing 29 changed files with 518 additions and 114 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [BWC and API enforcement] Decorate the existing APIs with proper annotations (part 1) ([#9520](https://github.com/opensearch-project/OpenSearch/pull/9520))
- Add concurrent segment search related metrics to node and index stats ([#9622](https://github.com/opensearch-project/OpenSearch/issues/9622))
- Decouple replication lag from logic to fail stale replicas ([#9507](https://github.com/opensearch-project/OpenSearch/pull/9507))
- Introducing ZStd as a plugin ([#9658](https://github.com/opensearch-project/OpenSearch/pull/9658))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
1 change: 1 addition & 0 deletions modules/reindex/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ dependencies {
testImplementation project(':modules:transport-netty4')
// for parent/child testing
testImplementation project(':modules:parent-join')
testImplementation project(':plugins:custom-codecs')
}

restResources {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.codec.customcodecs.CustomCodecPlugin;
import org.opensearch.index.engine.Segment;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.ReindexAction;
import org.opensearch.index.reindex.ReindexModulePlugin;
import org.opensearch.index.reindex.ReindexRequestBuilder;
import org.opensearch.index.reindex.ReindexTestCase;
import org.opensearch.plugins.Plugin;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
Expand All @@ -40,6 +45,11 @@

public class MultiCodecReindexIT extends ReindexTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(CustomCodecPlugin.class, ReindexModulePlugin.class);
}

public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException {
internalCluster().ensureAtLeastNumDataNodes(1);
Map<String, String> codecMap = Map.of(
Expand Down
29 changes: 29 additions & 0 deletions plugins/custom-codecs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

apply plugin: 'opensearch.opensearchplugin'
apply plugin: 'opensearch.yaml-rest-test'
apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
name 'custom-codecs'
description 'A plugin that implements custom compression codecs.'
classname 'org.opensearch.index.codec.customcodecs.CustomCodecPlugin'
licenseFile rootProject.file('licenses/APACHE-LICENSE-2.0.txt')
noticeFile rootProject.file('NOTICE.txt')
}

dependencies {
api "com.github.luben:zstd-jni:1.5.5-5"
}

yamlRestTest.enabled = false;
testingConventions.enabled = false;
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,24 @@
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.codec.customcodecs.CustomCodecPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_CODEC;
import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_NO_DICT_CODEC;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class CodecCompressionLevelIT extends OpenSearchIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(CustomCodecPlugin.class);
}

public void testLuceneCodecsCreateIndexWithCompressionLevel() {

Expand Down Expand Up @@ -62,7 +72,7 @@ public void testZStandardCodecsCreateIndexWithCompressionLevel() {
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.codec", randomFrom(CodecService.ZSTD_CODEC, CodecService.ZSTD_NO_DICT_CODEC))
.put("index.codec", randomFrom(ZSTD_CODEC, ZSTD_NO_DICT_CODEC))
.put("index.codec.compression_level", randomIntBetween(1, 6))
.build()
);
Expand All @@ -81,7 +91,7 @@ public void testZStandardToLuceneCodecsWithCompressionLevel() throws ExecutionEx
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.codec", randomFrom(CodecService.ZSTD_CODEC, CodecService.ZSTD_NO_DICT_CODEC))
.put("index.codec", randomFrom(ZSTD_CODEC, ZSTD_NO_DICT_CODEC))
.put("index.codec.compression_level", randomIntBetween(1, 6))
.build()
);
Expand Down Expand Up @@ -164,7 +174,7 @@ public void testLuceneToZStandardCodecsWithCompressionLevel() throws ExecutionEx
.updateSettings(
new UpdateSettingsRequest(index).settings(
Settings.builder()
.put("index.codec", randomFrom(CodecService.ZSTD_CODEC, CodecService.ZSTD_NO_DICT_CODEC))
.put("index.codec", randomFrom(ZSTD_CODEC, ZSTD_NO_DICT_CODEC))
.put("index.codec.compression_level", randomIntBetween(1, 6))
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.codec.customcodecs.CustomCodecPlugin;
import org.opensearch.index.engine.Segment;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand All @@ -40,6 +44,11 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class MultiCodecMergeIT extends OpenSearchIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(CustomCodecPlugin.class);
}

public void testForceMergeMultipleCodecs() throws ExecutionException, InterruptedException {

Map<String, String> codecMap = Map.of(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.customcodecs;

import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecServiceFactory;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.Plugin;

import java.util.Optional;

/**
* A plugin that implements custom codecs. Supports these codecs:
* <ul>
* <li>ZSTD
* <li>ZSTDNODICT
* </ul>
*
* @opensearch.internal
*/
public final class CustomCodecPlugin extends Plugin implements EnginePlugin {

/**
* Creates a new instance
*/
public CustomCodecPlugin() {}

/**
* @param indexSettings is the default indexSettings
* @return the engine factory
*/
@Override
public Optional<CodecServiceFactory> getCustomCodecServiceFactory(final IndexSettings indexSettings) {
String codecName = indexSettings.getValue(EngineConfig.INDEX_CODEC_SETTING);
if (codecName.equals(CustomCodecService.ZSTD_NO_DICT_CODEC) || codecName.equals(CustomCodecService.ZSTD_CODEC)) {
return Optional.of(new CustomCodecServiceFactory());
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.customcodecs;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.Codec;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.mapper.MapperService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;

import static org.opensearch.index.engine.EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING;

/**
* CustomCodecService provides ZSTD and ZSTD_NO_DICT compression codecs.
*/
public class CustomCodecService extends CodecService {
private final Map<String, Codec> codecs;
/**
* ZStandard codec
*/
public static final String ZSTD_CODEC = "zstd";
/**
* ZStandard without dictionary codec
*/
public static final String ZSTD_NO_DICT_CODEC = "zstd_no_dict";

/**
* Creates a new CustomCodecService.
*
* @param mapperService The mapper service.
* @param indexSettings The index settings.
* @param logger The logger.
*/
public CustomCodecService(MapperService mapperService, IndexSettings indexSettings, Logger logger) {
super(mapperService, indexSettings, logger);
int compressionLevel = indexSettings.getValue(INDEX_CODEC_COMPRESSION_LEVEL_SETTING);
final MapBuilder<String, Codec> codecs = MapBuilder.<String, Codec>newMapBuilder();
if (mapperService == null) {
codecs.put(ZSTD_CODEC, new ZstdCodec(compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(compressionLevel));
} else {
codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger, compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger, compressionLevel));
}
this.codecs = codecs.immutableMap();
}

@Override
public Codec codec(String name) {
Codec codec = codecs.get(name);
if (codec == null) {
return super.codec(name);
}
return codec;
}

@Override
public String[] availableCodecs() {
ArrayList<String> ac = new ArrayList<String>(Arrays.asList(super.availableCodecs()));
ac.addAll(codecs.keySet());
return ac.toArray(new String[0]);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.customcodecs;

import org.opensearch.index.codec.CodecService;
import org.opensearch.index.codec.CodecServiceConfig;
import org.opensearch.index.codec.CodecServiceFactory;

/**
* A factory for creating new {@link CodecService} instance
*/
public class CustomCodecServiceFactory implements CodecServiceFactory {

/** Creates a new instance. */
public CustomCodecServiceFactory() {}

@Override
public CodecService createCodecService(CodecServiceConfig config) {
return new CustomCodecService(config.getMapperService(), config.getIndexSettings(), config.getLogger());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@
* @opensearch.internal
*/
public abstract class Lucene95CustomCodec extends FilterCodec {

/** Default compression level used for compression */
public static final int DEFAULT_COMPRESSION_LEVEL = 3;

/** Each mode represents a compression algorithm. */
public enum Mode {
/** ZStandard mode with dictionary*/
ZSTD,
/** ZStandard mode without dictionary*/
ZSTD_NO_DICT
}

Expand Down Expand Up @@ -55,6 +59,16 @@ public Lucene95CustomCodec(Mode mode, int compressionLevel) {
this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel);
}

/**
* Creates a new compression codec with the given compression level. We use
* lowercase letters when registering the codec so that we remain consistent with
* the other compression codecs: default, lucene_default, and best_compression.
*
* @param mode The compression codec (ZSTD or ZSTDNODICT).
* @param compressionLevel The compression level.
* @param mapperService The mapper service.
* @param logger The logger.
*/
public Lucene95CustomCodec(Mode mode, int compressionLevel, MapperService mapperService, Logger logger) {
super("Lucene95CustomCodec", new PerFieldMappingPostingFormatCodec(Lucene95Codec.Mode.BEST_SPEED, mapperService, logger));
this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ Lucene95CustomCodec.Mode getMode() {
return mode;
}

/**
* Returns the compression level.
*/
public int getCompressionLevel() {
return compressionLevel;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.customcodecs;

import org.opensearch.index.mapper.MapperService;

/** PerFieldMappingPostingFormatCodec. {@link org.opensearch.index.codec.PerFieldMappingPostingFormatCodec} */
public class PerFieldMappingPostingFormatCodec extends Lucene95CustomCodec {

/**
* Creates a new instance.
*
* @param compressionMode The compression mode (ZSTD or ZSTDNODICT).
* @param mapperService The mapper service.
*/
public PerFieldMappingPostingFormatCodec(Lucene95CustomCodec.Mode compressionMode, MapperService mapperService) {
super(compressionMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ public ZstdCodec(int compressionLevel) {
super(Mode.ZSTD, compressionLevel);
}

/**
* Creates a new ZstdCodec instance.
*
* @param mapperService The mapper service.
* @param logger The logger.
* @param compressionLevel The compression level.
*/
public ZstdCodec(MapperService mapperService, Logger logger, int compressionLevel) {
super(Mode.ZSTD, compressionLevel, mapperService, logger);
}
Expand Down
Loading

0 comments on commit 0d0aea6

Please sign in to comment.