From 9c679cbbfcf685e3865d2cf06b8f4e10c3082d49 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Tue, 8 Mar 2022 18:42:32 -0500 Subject: [PATCH] MapperService has to be passed in as null for EnginePlugins CodecService constructor (#2177) * MapperService has to be passed in as null for EnginePlugins CodecService constructor Signed-off-by: Andriy Redko * Addressing code review comments Signed-off-by: Andriy Redko * Delayed CodecService instantiation up to the shard initialization Signed-off-by: Andriy Redko * Added logger (associated with shard) to CodecServiceConfig Signed-off-by: Andriy Redko * Refactored the EngineConfigFactory / IndexShard instantiation of the CodecService Signed-off-by: Andriy Redko --- .../index/codec/CodecServiceConfig.java | 45 ++++++++++++++ .../index/codec/CodecServiceFactory.java | 22 +++++++ .../index/engine/EngineConfigFactory.java | 59 ++++++++++++++++-- .../opensearch/index/shard/IndexShard.java | 3 +- .../org/opensearch/plugins/EnginePlugin.java | 16 +++++ .../engine/EngineConfigFactoryTests.java | 62 +++++++++++++++++++ 6 files changed, 200 insertions(+), 7 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/codec/CodecServiceConfig.java create mode 100644 server/src/main/java/org/opensearch/index/codec/CodecServiceFactory.java diff --git a/server/src/main/java/org/opensearch/index/codec/CodecServiceConfig.java b/server/src/main/java/org/opensearch/index/codec/CodecServiceConfig.java new file mode 100644 index 0000000000000..313c0d359bb02 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/CodecServiceConfig.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec; + +import org.apache.logging.log4j.Logger; +import org.opensearch.common.Nullable; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.mapper.MapperService; + +import java.util.Objects; + +/** + * The configuration parameters necessary for the {@link CodecService} instance construction. + */ +public final class CodecServiceConfig { + private final IndexSettings indexSettings; + private final MapperService mapperService; + private final Logger logger; + + public CodecServiceConfig(IndexSettings indexSettings, @Nullable MapperService mapperService, @Nullable Logger logger) { + this.indexSettings = Objects.requireNonNull(indexSettings); + this.mapperService = mapperService; + this.logger = logger; + } + + public IndexSettings getIndexSettings() { + return indexSettings; + } + + @Nullable + public MapperService getMapperService() { + return mapperService; + } + + @Nullable + public Logger getLogger() { + return logger; + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/CodecServiceFactory.java b/server/src/main/java/org/opensearch/index/codec/CodecServiceFactory.java new file mode 100644 index 0000000000000..da28c5f06b035 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/CodecServiceFactory.java @@ -0,0 +1,22 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec; + +/** + * A factory for creating new {@link CodecService} instance + */ +@FunctionalInterface +public interface CodecServiceFactory { + /** + * Create new {@link CodecService} instance + * @param config code service configuration + * @return new {@link CodecService} instance + */ + CodecService createCodecService(CodecServiceConfig config); +} diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java index dffdb58bfec1e..a78a5e5a4820a 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java @@ -8,6 +8,7 @@ package org.opensearch.index.engine; +import org.apache.logging.log4j.Logger; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.search.QueryCache; @@ -15,9 +16,13 @@ import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.Sort; import org.apache.lucene.search.similarities.Similarity; +import org.opensearch.common.Nullable; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.IndexSettings; import org.opensearch.index.codec.CodecService; +import org.opensearch.index.codec.CodecServiceConfig; +import org.opensearch.index.codec.CodecServiceFactory; +import org.opensearch.index.mapper.MapperService; import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; @@ -39,7 +44,7 @@ * A factory to create an EngineConfig based on custom plugin overrides */ public class EngineConfigFactory { - private final CodecService codecService; + private final CodecServiceFactory codecServiceFactory; private final TranslogDeletionPolicyFactory translogDeletionPolicyFactory; /** default ctor primarily used for tests without plugins */ @@ -58,6 +63,8 @@ public EngineConfigFactory(PluginsService pluginsService, IndexSettings idxSetti EngineConfigFactory(Collection enginePlugins, IndexSettings idxSettings) { Optional codecService = Optional.empty(); String codecServiceOverridingPlugin = null; + Optional codecServiceFactory = Optional.empty(); + String codecServiceFactoryOverridingPlugin = null; Optional translogDeletionPolicyFactory = Optional.empty(); String translogDeletionPolicyOverridingPlugin = null; for (EnginePlugin enginePlugin : enginePlugins) { @@ -65,7 +72,7 @@ public EngineConfigFactory(PluginsService pluginsService, IndexSettings idxSetti if (codecService.isPresent() == false) { codecService = enginePlugin.getCustomCodecService(idxSettings); codecServiceOverridingPlugin = enginePlugin.getClass().getName(); - } else { + } else if (enginePlugin.getCustomCodecService(idxSettings).isPresent()) { throw new IllegalStateException( "existing codec service already overridden in: " + codecServiceOverridingPlugin @@ -76,7 +83,7 @@ public EngineConfigFactory(PluginsService pluginsService, IndexSettings idxSetti if (translogDeletionPolicyFactory.isPresent() == false) { translogDeletionPolicyFactory = enginePlugin.getCustomTranslogDeletionPolicyFactory(); translogDeletionPolicyOverridingPlugin = enginePlugin.getClass().getName(); - } else { + } else if (enginePlugin.getCustomTranslogDeletionPolicyFactory().isPresent()) { throw new IllegalStateException( "existing TranslogDeletionPolicyFactory is already overridden in: " + translogDeletionPolicyOverridingPlugin @@ -84,12 +91,37 @@ public EngineConfigFactory(PluginsService pluginsService, IndexSettings idxSetti + enginePlugin.getClass().getName() ); } + // get overriding CodecServiceFactory from EnginePlugin + if (codecServiceFactory.isPresent() == false) { + codecServiceFactory = enginePlugin.getCustomCodecServiceFactory(idxSettings); + codecServiceFactoryOverridingPlugin = enginePlugin.getClass().getName(); + } else if (enginePlugin.getCustomCodecServiceFactory(idxSettings).isPresent()) { + throw new IllegalStateException( + "existing codec service factory already overridden in: " + + codecServiceFactoryOverridingPlugin + + " attempting to override again by: " + + enginePlugin.getClass().getName() + ); + } + } + + if (codecService.isPresent() && codecServiceFactory.isPresent()) { + throw new IllegalStateException( + "both codec service and codec service factory are present, codec service provided by: " + + codecServiceOverridingPlugin + + " conflicts with codec service factory provided by: " + + codecServiceFactoryOverridingPlugin + ); } - this.codecService = codecService.orElse(null); + + final CodecService instance = codecService.orElse(null); + this.codecServiceFactory = (instance != null) ? (config) -> instance : codecServiceFactory.orElse(null); this.translogDeletionPolicyFactory = translogDeletionPolicyFactory.orElse((idxs, rtls) -> null); } - /** Instantiates a new EngineConfig from the provided custom overrides */ + /** + * Instantiates a new EngineConfig from the provided custom overrides + */ public EngineConfig newEngineConfig( ShardId shardId, ThreadPool threadPool, @@ -114,6 +146,10 @@ public EngineConfig newEngineConfig( LongSupplier primaryTermSupplier, EngineConfig.TombstoneDocSupplier tombstoneDocSupplier ) { + CodecService codecServiceToUse = codecService; + if (codecService == null && this.codecServiceFactory != null) { + codecServiceToUse = newCodecServiceOrDefault(indexSettings, null, null, null); + } return new EngineConfig( shardId, @@ -124,7 +160,7 @@ public EngineConfig newEngineConfig( mergePolicy, analyzer, similarity, - this.codecService != null ? this.codecService : codecService, + codecServiceToUse, eventListener, queryCache, queryCachingPolicy, @@ -141,4 +177,15 @@ public EngineConfig newEngineConfig( tombstoneDocSupplier ); } + + public CodecService newCodecServiceOrDefault( + IndexSettings indexSettings, + @Nullable MapperService mapperService, + Logger logger, + CodecService defaultCodecService + ) { + return this.codecServiceFactory != null + ? this.codecServiceFactory.createCodecService(new CodecServiceConfig(indexSettings, mapperService, logger)) + : defaultCodecService; + } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 1474785f5f4e9..df0edd02d4f48 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3155,6 +3155,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { this.warmer.warm(reader); } }; + return this.engineConfigFactory.newEngineConfig( shardId, threadPool, @@ -3164,7 +3165,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { indexSettings.getMergePolicy(), mapperService != null ? mapperService.indexAnalyzer() : null, similarityService.similarity(mapperService), - codecService, + engineConfigFactory.newCodecServiceOrDefault(indexSettings, mapperService, logger, codecService), shardEventListener, indexCache != null ? indexCache.query() : null, cachingPolicy, diff --git a/server/src/main/java/org/opensearch/plugins/EnginePlugin.java b/server/src/main/java/org/opensearch/plugins/EnginePlugin.java index ee285e8be8c2f..4c3a07d7b98d9 100644 --- a/server/src/main/java/org/opensearch/plugins/EnginePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/EnginePlugin.java @@ -34,6 +34,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.codec.CodecService; +import org.opensearch.index.codec.CodecServiceFactory; import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.translog.TranslogDeletionPolicy; @@ -63,11 +64,26 @@ public interface EnginePlugin { * to determine if a custom {@link CodecService} should be provided for the given index. A plugin that is not overriding * the {@link CodecService} through the plugin can ignore this method and the Codec specified in the {@link IndexSettings} * will be used. + * + * @deprecated Please use {@code getCustomCodecServiceFactory()} instead as it provides more context for {@link CodecService} + * instance construction. */ + @Deprecated default Optional getCustomCodecService(IndexSettings indexSettings) { return Optional.empty(); } + /** + * EXPERT: + * When an index is created this method is invoked for each engine plugin. Engine plugins can inspect the index settings + * to determine if a custom {@link CodecServiceFactory} should be provided for the given index. A plugin that is not overriding + * the {@link CodecServiceFactory} through the plugin can ignore this method and the default Codec specified in the + * {@link IndexSettings} will be used. + */ + default Optional getCustomCodecServiceFactory(IndexSettings indexSettings) { + return Optional.empty(); + } + /** * When an index is created this method is invoked for each engine plugin. Engine plugins that need to provide a * custom {@link TranslogDeletionPolicy} can override this method to return a function that takes the {@link IndexSettings} diff --git a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java index df3e8deb6d90a..a6bc87d53c004 100644 --- a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java @@ -14,6 +14,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.index.IndexSettings; import org.opensearch.index.codec.CodecService; +import org.opensearch.index.codec.CodecServiceFactory; import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.translog.TranslogDeletionPolicy; import org.opensearch.index.translog.TranslogDeletionPolicyFactory; @@ -84,6 +85,18 @@ public void testCreateEngineConfigFromFactoryMultipleCodecServiceIllegalStateExc expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings)); } + public void testCreateEngineConfigFromFactoryMultipleCodecServiceAndFactoryIllegalStateException() { + IndexMetadata meta = IndexMetadata.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + List plugins = Arrays.asList(new FooEnginePlugin(), new BakEnginePlugin()); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings()); + + expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings)); + } + public void testCreateEngineConfigFromFactoryMultipleCustomTranslogDeletionPolicyFactoryIllegalStateException() { IndexMetadata meta = IndexMetadata.builder("test") .settings(settings(Version.CURRENT)) @@ -96,6 +109,43 @@ public void testCreateEngineConfigFromFactoryMultipleCustomTranslogDeletionPolic expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings)); } + public void testCreateCodecServiceFromFactory() { + IndexMetadata meta = IndexMetadata.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + List plugins = Arrays.asList(new BakEnginePlugin()); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings()); + + EngineConfigFactory factory = new EngineConfigFactory(plugins, indexSettings); + EngineConfig config = factory.newEngineConfig( + null, + null, + indexSettings, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + TimeValue.timeValueMinutes(5), + null, + null, + null, + null, + null, + () -> new RetentionLeases(0, 0, Collections.emptyList()), + null, + null + ); + assertNotNull(config.getCodec()); + } + private static class FooEnginePlugin extends Plugin implements EnginePlugin { @Override public Optional getEngineFactory(final IndexSettings indexSettings) { @@ -125,6 +175,18 @@ public Optional getCustomCodecService(IndexSettings indexSettings) } } + private static class BakEnginePlugin extends Plugin implements EnginePlugin { + @Override + public Optional getEngineFactory(final IndexSettings indexSettings) { + return Optional.empty(); + } + + @Override + public Optional getCustomCodecServiceFactory(IndexSettings indexSettings) { + return Optional.of(config -> new CodecService(config.getMapperService(), LogManager.getLogger(getClass()))); + } + } + private static class BazEnginePlugin extends Plugin implements EnginePlugin { @Override public Optional getEngineFactory(final IndexSettings indexSettings) {