From d406e5807ef0079d886128fb0e763e7768519705 Mon Sep 17 00:00:00 2001 From: Navneet Verma Date: Tue, 7 May 2024 10:45:23 -0700 Subject: [PATCH] Add capability to disable source recovery_source for an index Signed-off-by: Navneet Verma --- CHANGELOG.md | 1 + .../recovery/FullRollingRestartIT.java | 143 ++++++++++++++++++ .../index/mapper/SourceFieldMapper.java | 117 ++++++++++++-- .../index/mapper/SourceFieldMapperTests.java | 137 ++++++++++++++++- 4 files changed, 385 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 098405f8f1d44..597e0ae31b054 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819)) - [Remote State] Add async remote state deletion task running on an interval, configurable by a setting ([#13131](https://github.com/opensearch-project/OpenSearch/pull/13131)) - Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776)) +- Add capability to disable source recovery_source for an index ([#13590](https://github.com/opensearch-project/OpenSearch/pull/13590)) - Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304)) - [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027)) diff --git a/server/src/internalClusterTest/java/org/opensearch/recovery/FullRollingRestartIT.java b/server/src/internalClusterTest/java/org/opensearch/recovery/FullRollingRestartIT.java index 0752ab7c9d0f1..d9e3cec426edf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/recovery/FullRollingRestartIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/recovery/FullRollingRestartIT.java @@ -36,6 +36,7 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthRequestBuilder; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; @@ -45,6 +46,8 @@ import org.opensearch.common.collect.MapBuilder; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; import org.opensearch.test.OpenSearchIntegTestCase.Scope; @@ -253,4 +256,144 @@ public void testNoRebalanceOnRollingRestart() throws Exception { ); } } + + public void testFullRollingRestart_withNoRecoveryPayloadAndSource() throws Exception { + internalCluster().startNode(); + XContentBuilder builder = XContentFactory.jsonBuilder() + .startObject() + .startObject("_source") + .field("enabled") + .value(false) + .field("recovery_source_enabled") + .value(false) + .endObject() + .endObject(); + CreateIndexResponse response = prepareCreate("test").setMapping(builder).get(); + logger.info("Create index response is : {}", response); + + final String healthTimeout = "1m"; + + for (int i = 0; i < 1000; i++) { + client().prepareIndex("test") + .setId(Long.toString(i)) + .setSource(MapBuilder.newMapBuilder().put("test", "value" + i).map()) + .execute() + .actionGet(); + } + + for (int i = 1000; i < 2000; i++) { + client().prepareIndex("test") + .setId(Long.toString(i)) + .setSource(MapBuilder.newMapBuilder().put("test", "value" + i).map()) + .execute() + .actionGet(); + } + // ensuring all docs are committed to file system + flush(); + + logger.info("--> now start adding nodes"); + internalCluster().startNode(); + internalCluster().startNode(); + + // make sure the cluster state is green, and all has been recovered + assertTimeout( + client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setTimeout(healthTimeout) + .setWaitForGreenStatus() + .setWaitForNoRelocatingShards(true) + .setWaitForNodes("3") + ); + + logger.info("--> add two more nodes"); + internalCluster().startNode(); + internalCluster().startNode(); + + // make sure the cluster state is green, and all has been recovered + assertTimeout( + client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setTimeout(healthTimeout) + .setWaitForGreenStatus() + .setWaitForNoRelocatingShards(true) + .setWaitForNodes("5") + ); + + logger.info("--> refreshing and checking data"); + refreshAndWaitForReplication(); + for (int i = 0; i < 10; i++) { + assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000L); + } + + // now start shutting nodes down + internalCluster().stopRandomDataNode(); + // make sure the cluster state is green, and all has been recovered + assertTimeout( + client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setTimeout(healthTimeout) + .setWaitForGreenStatus() + .setWaitForNoRelocatingShards(true) + .setWaitForNodes("4") + ); + + internalCluster().stopRandomDataNode(); + // make sure the cluster state is green, and all has been recovered + assertTimeout( + client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setTimeout(healthTimeout) + .setWaitForGreenStatus() + .setWaitForNoRelocatingShards(true) + .setWaitForNodes("3") + ); + + logger.info("--> stopped two nodes, verifying data"); + refreshAndWaitForReplication(); + for (int i = 0; i < 10; i++) { + assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000L); + } + + // closing the 3rd node + internalCluster().stopRandomDataNode(); + // make sure the cluster state is green, and all has been recovered + assertTimeout( + client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setTimeout(healthTimeout) + .setWaitForGreenStatus() + .setWaitForNoRelocatingShards(true) + .setWaitForNodes("2") + ); + + internalCluster().stopRandomDataNode(); + + // make sure the cluster state is yellow, and all has been recovered + assertTimeout( + client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setTimeout(healthTimeout) + .setWaitForYellowStatus() + .setWaitForNoRelocatingShards(true) + .setWaitForNodes("1") + ); + + logger.info("--> one node left, verifying data"); + refreshAndWaitForReplication(); + for (int i = 0; i < 10; i++) { + assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000L); + } + } } diff --git a/server/src/main/java/org/opensearch/index/mapper/SourceFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/SourceFieldMapper.java index a2d769d486a0f..54f1528c04bf5 100644 --- a/server/src/main/java/org/opensearch/index/mapper/SourceFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/SourceFieldMapper.java @@ -74,6 +74,7 @@ public class SourceFieldMapper extends MetadataFieldMapper { public static final String CONTENT_TYPE = "_source"; private final Function, Map> filter; + private final Function, Map> recoverySourceFilter; /** * Default parameters for source fields @@ -119,13 +120,64 @@ public static class Builder extends MetadataFieldMapper.Builder { Collections.emptyList() ); + /** + * A mapping parameter which define whether the recovery_source should be added or not. Default value is true. + *

+ * Recovery source gets added if source is disabled or there are filters that are applied on _source using + * {@link #includes}/{@link #excludes}, which has the possibility to change the original document provided by + * customer. Recovery source is not a permanent field and gets removed during merges. Refer this merge + * policy: org.opensearch.index.engine.RecoverySourcePruneMergePolicy + *

+ * The main reason for adding the _recovery_source was to ensure Peer to Peer recovery if segments + * are not flushed to the disk. If you are disabling the recovery source, then ensure that you are calling + * flush operation of Opensearch periodically to ensure that segments are flushed to the disk and if required + * Peer to Peer recovery can happen using segment files rather than replaying traffic by querying Lucene + * snapshot. + * + *

+ * This is an expert mapping parameter. + * + */ + private final Parameter recoverySourceEnabled = Parameter.boolParam( + "recovery_source_enabled", + false, + m -> toType(m).recoverySourceEnabled, + Defaults.ENABLED + ); + + /** + * Provides capability to add specific fields in the recovery_source. + *

+ * Refer {@link #recoverySourceEnabled} for more details + * This is an expert parameter. + */ + private final Parameter> recoverySourceIncludes = Parameter.stringArrayParam( + "recovery_source_includes", + false, + m -> Arrays.asList(toType(m).recoverySourceIncludes), + Collections.emptyList() + ); + + /** + * Provides capability to remove specific fields in the recovery_source. + * + * Refer {@link #recoverySourceEnabled} for more details + * This is an expert parameter. + */ + private final Parameter> recoverySourceExcludes = Parameter.stringArrayParam( + "recovery_source_excludes", + false, + m -> Arrays.asList(toType(m).recoverySourceExcludes), + Collections.emptyList() + ); + public Builder() { super(Defaults.NAME); } @Override protected List> getParameters() { - return Arrays.asList(enabled, includes, excludes); + return Arrays.asList(enabled, includes, excludes, recoverySourceEnabled, recoverySourceIncludes, recoverySourceExcludes); } @Override @@ -133,7 +185,10 @@ public SourceFieldMapper build(BuilderContext context) { return new SourceFieldMapper( enabled.getValue(), includes.getValue().toArray(new String[0]), - excludes.getValue().toArray(new String[0]) + excludes.getValue().toArray(new String[0]), + recoverySourceEnabled.getValue(), + recoverySourceIncludes.getValue().toArray(new String[0]), + recoverySourceExcludes.getValue().toArray(new String[0]) ); } } @@ -173,17 +228,27 @@ public Query termQuery(Object value, QueryShardContext context) { } private final boolean enabled; + private final boolean recoverySourceEnabled; /** indicates whether the source will always exist and be complete, for use by features like the update API */ private final boolean complete; private final String[] includes; private final String[] excludes; + private final String[] recoverySourceIncludes; + private final String[] recoverySourceExcludes; private SourceFieldMapper() { - this(Defaults.ENABLED, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY); + this(Defaults.ENABLED, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY, Defaults.ENABLED, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY); } - private SourceFieldMapper(boolean enabled, String[] includes, String[] excludes) { + private SourceFieldMapper( + boolean enabled, + String[] includes, + String[] excludes, + boolean recoverySourceEnabled, + String[] recoverySourceIncludes, + String[] recoverySourceExcludes + ) { super(new SourceFieldType(enabled)); this.enabled = enabled; this.includes = includes; @@ -191,6 +256,16 @@ private SourceFieldMapper(boolean enabled, String[] includes, String[] excludes) final boolean filtered = CollectionUtils.isEmpty(includes) == false || CollectionUtils.isEmpty(excludes) == false; this.filter = enabled && filtered ? XContentMapValues.filter(includes, excludes) : null; this.complete = enabled && CollectionUtils.isEmpty(includes) && CollectionUtils.isEmpty(excludes); + + // Set parameters for recovery source + this.recoverySourceEnabled = recoverySourceEnabled; + this.recoverySourceIncludes = recoverySourceIncludes; + this.recoverySourceExcludes = recoverySourceExcludes; + final boolean recoverySourcefiltered = CollectionUtils.isEmpty(recoverySourceIncludes) == false + || CollectionUtils.isEmpty(recoverySourceExcludes) == false; + this.recoverySourceFilter = this.recoverySourceEnabled && recoverySourcefiltered + ? XContentMapValues.filter(recoverySourceIncludes, recoverySourceExcludes) + : null; } public boolean enabled() { @@ -212,22 +287,40 @@ public void preParse(ParseContext context) throws IOException { context.doc().add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length)); } - if (originalSource != null && adaptedSource != originalSource) { - // if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery - BytesRef ref = originalSource.toBytesRef(); - context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length)); - context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1)); + if (recoverySourceEnabled) { + if (originalSource != null && adaptedSource != originalSource) { + final BytesReference adaptedRecoverySource = applyFilters( + originalSource, + contentType, + recoverySourceEnabled, + recoverySourceFilter + ); + // if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery + BytesRef ref = adaptedRecoverySource.toBytesRef(); + context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length)); + context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1)); + } } } @Nullable public BytesReference applyFilters(@Nullable BytesReference originalSource, @Nullable MediaType contentType) throws IOException { - if (enabled && originalSource != null) { + return applyFilters(originalSource, contentType, enabled, filter); + } + + @Nullable + private BytesReference applyFilters( + @Nullable BytesReference originalSource, + @Nullable MediaType contentType, + boolean isProvidedSourceEnabled, + @Nullable final Function, Map> filters + ) throws IOException { + if (isProvidedSourceEnabled && originalSource != null) { // Percolate and tv APIs may not set the source and that is ok, because these APIs will not index any data - if (filter != null) { + if (filters != null) { // we don't update the context source if we filter, we want to keep it as is... Tuple> mapTuple = XContentHelper.convertToMap(originalSource, true, contentType); - Map filteredSource = filter.apply(mapTuple.v2()); + Map filteredSource = filters.apply(mapTuple.v2()); BytesStreamOutput bStream = new BytesStreamOutput(); MediaType actualContentType = mapTuple.v1(); XContentBuilder builder = MediaTypeRegistry.contentBuilder(actualContentType, bStream).map(filteredSource); diff --git a/server/src/test/java/org/opensearch/index/mapper/SourceFieldMapperTests.java b/server/src/test/java/org/opensearch/index/mapper/SourceFieldMapperTests.java index 83d42fd423f08..4f3a4530b5475 100644 --- a/server/src/test/java/org/opensearch/index/mapper/SourceFieldMapperTests.java +++ b/server/src/test/java/org/opensearch/index/mapper/SourceFieldMapperTests.java @@ -90,7 +90,8 @@ public void testNoFormat() throws Exception { XContentType.SMILE ) ); - + final IndexableField recoverySourceIndexableField = doc.rootDoc().getField("_recovery_source"); + assertNull(recoverySourceIndexableField); assertThat(MediaTypeRegistry.xContentType(doc.source()), equalTo(XContentType.SMILE)); } @@ -128,6 +129,52 @@ public void testIncludes() throws Exception { ) ); + IndexableField sourceField = doc.rootDoc().getField("_source"); + Map sourceAsMap; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, new BytesArray(sourceField.binaryValue()))) { + sourceAsMap = parser.map(); + } + final IndexableField recoverySourceIndexableField = doc.rootDoc().getField("_recovery_source"); + assertNotNull(recoverySourceIndexableField); + assertThat(sourceAsMap.containsKey("path1"), equalTo(true)); + assertThat(sourceAsMap.containsKey("path2"), equalTo(false)); + } + + public void testIncludesForRecoverySource() throws Exception { + String mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject("type") + .startObject("_source") + .array("includes", new String[] { "path1*" }) + .array("recovery_source_includes", new String[] { "path2*" }) + .endObject() + .endObject() + .endObject() + .toString(); + + DocumentMapper documentMapper = createIndex("test").mapperService() + .documentMapperParser() + .parse("type", new CompressedXContent(mapping)); + + ParsedDocument doc = documentMapper.parse( + new SourceToParse( + "test", + "1", + BytesReference.bytes( + XContentFactory.jsonBuilder() + .startObject() + .startObject("path1") + .field("field1", "value1") + .endObject() + .startObject("path2") + .field("field2", "value2") + .endObject() + .endObject() + ), + MediaTypeRegistry.JSON + ) + ); + IndexableField sourceField = doc.rootDoc().getField("_source"); Map sourceAsMap; try (XContentParser parser = createParser(JsonXContent.jsonXContent, new BytesArray(sourceField.binaryValue()))) { @@ -135,6 +182,39 @@ public void testIncludes() throws Exception { } assertThat(sourceAsMap.containsKey("path1"), equalTo(true)); assertThat(sourceAsMap.containsKey("path2"), equalTo(false)); + + final IndexableField recoverySourceIndexableField = doc.rootDoc().getField("_recovery_source"); + assertNotNull(recoverySourceIndexableField); + Map recoverySourceAsMap; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, new BytesArray(recoverySourceIndexableField.binaryValue()))) { + recoverySourceAsMap = parser.map(); + } + + assertThat(recoverySourceAsMap.containsKey("path1"), equalTo(false)); + assertThat(recoverySourceAsMap.containsKey("path2"), equalTo(true)); + } + + public void testNoRecoverySourceAndNoSource_whenBothAreDisabled() throws Exception { + String mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject("type") + .startObject("_source") + .field("enabled", "false") + .field("recovery_source_enabled", "false") + .endObject() + .endObject() + .endObject() + .toString(); + + DocumentMapperParser parser = createIndex("test").mapperService().documentMapperParser(); + DocumentMapper documentMapper = parser.parse("type", new CompressedXContent(mapping)); + BytesReference source = BytesReference.bytes(XContentFactory.jsonBuilder().startObject().field("field", "value").endObject()); + ParsedDocument doc = documentMapper.parse(new SourceToParse("test", "1", source, MediaTypeRegistry.JSON)); + + final IndexableField sourceIndexableField = doc.rootDoc().getField("_source"); + final IndexableField recoverySourceIndexableField = doc.rootDoc().getField("_recovery_source"); + assertNull(recoverySourceIndexableField); + assertNull(sourceIndexableField); } public void testExcludes() throws Exception { @@ -171,6 +251,52 @@ public void testExcludes() throws Exception { ) ); + IndexableField sourceField = doc.rootDoc().getField("_source"); + Map sourceAsMap; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, new BytesArray(sourceField.binaryValue()))) { + sourceAsMap = parser.map(); + } + final IndexableField recoverySourceIndexableField = doc.rootDoc().getField("_recovery_source"); + assertNotNull(recoverySourceIndexableField); + assertThat(sourceAsMap.containsKey("path1"), equalTo(false)); + assertThat(sourceAsMap.containsKey("path2"), equalTo(true)); + } + + public void testExcludesForRecoverySource() throws Exception { + String mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject("type") + .startObject("_source") + .array("excludes", "path1*") + .array("recovery_source_excludes", "path2*") + .endObject() + .endObject() + .endObject() + .toString(); + + DocumentMapper documentMapper = createIndex("test").mapperService() + .documentMapperParser() + .parse("type", new CompressedXContent(mapping)); + + ParsedDocument doc = documentMapper.parse( + new SourceToParse( + "test", + "1", + BytesReference.bytes( + XContentFactory.jsonBuilder() + .startObject() + .startObject("path1") + .field("field1", "value1") + .endObject() + .startObject("path2") + .field("field2", "value2") + .endObject() + .endObject() + ), + MediaTypeRegistry.JSON + ) + ); + IndexableField sourceField = doc.rootDoc().getField("_source"); Map sourceAsMap; try (XContentParser parser = createParser(JsonXContent.jsonXContent, new BytesArray(sourceField.binaryValue()))) { @@ -178,6 +304,15 @@ public void testExcludes() throws Exception { } assertThat(sourceAsMap.containsKey("path1"), equalTo(false)); assertThat(sourceAsMap.containsKey("path2"), equalTo(true)); + + final IndexableField recoverySourceIndexableField = doc.rootDoc().getField("_recovery_source"); + assertNotNull(recoverySourceIndexableField); + Map recoverySourceAsMap; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, new BytesArray(recoverySourceIndexableField.binaryValue()))) { + recoverySourceAsMap = parser.map(); + } + assertThat(recoverySourceAsMap.containsKey("path1"), equalTo(true)); + assertThat(recoverySourceAsMap.containsKey("path2"), equalTo(false)); } public void testEnabledNotUpdateable() throws Exception {