diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/DataStreamOptionsIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/DataStreamOptionsIT.java index de6b7a682324e..482867d072fc2 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/DataStreamOptionsIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/DataStreamOptionsIT.java @@ -12,6 +12,9 @@ import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; +import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.RestStatus; import org.junit.Before; import java.io.IOException; @@ -122,13 +125,25 @@ public void testExplicitlyResetDataStreamOptions() throws IOException { assertOK(client().performRequest(otherRequest)); } - public void testEnableDisableFailureStore() throws IOException { + public void testBehaviorWithEachFailureStoreOptionAndClusterSetting() throws IOException { { + // Default data stream options assertAcknowledged(client().performRequest(new Request("DELETE", "/_data_stream/" + DATA_STREAM_NAME + "/_options"))); - assertFailureStore(false, 1); + setDataStreamFailureStoreClusterSetting(DATA_STREAM_NAME); assertDataStreamOptions(null); + assertFailureStoreValuesInGetDataStreamResponse(true, 1); + assertRedirectsDocWithBadMappingToFailureStore(); + setDataStreamFailureStoreClusterSetting("does-not-match-failure-data-stream"); + assertDataStreamOptions(null); + assertFailureStoreValuesInGetDataStreamResponse(false, 1); + assertFailsDocWithBadMapping(); + setDataStreamFailureStoreClusterSetting(null); // should get same behaviour as when we set it to something non-matching + assertDataStreamOptions(null); + assertFailureStoreValuesInGetDataStreamResponse(false, 1); + assertFailsDocWithBadMapping(); } { + // Data stream options with failure store enabled Request enableRequest = new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME + "/_options"); enableRequest.setJsonEntity(""" { @@ -137,11 +152,21 @@ public void testEnableDisableFailureStore() throws IOException { } }"""); assertAcknowledged(client().performRequest(enableRequest)); - assertFailureStore(true, 1); + setDataStreamFailureStoreClusterSetting(DATA_STREAM_NAME); + assertDataStreamOptions(true); + assertFailureStoreValuesInGetDataStreamResponse(true, 1); + assertRedirectsDocWithBadMappingToFailureStore(); + setDataStreamFailureStoreClusterSetting("does-not-match-failure-data-stream"); // should have no effect as enabled in options assertDataStreamOptions(true); + assertFailureStoreValuesInGetDataStreamResponse(true, 1); + assertRedirectsDocWithBadMappingToFailureStore(); + setDataStreamFailureStoreClusterSetting(null); // same as previous + assertDataStreamOptions(true); + assertFailureStoreValuesInGetDataStreamResponse(true, 1); + assertRedirectsDocWithBadMappingToFailureStore(); } - { + // Data stream options with failure store disabled Request disableRequest = new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME + "/_options"); disableRequest.setJsonEntity(""" { @@ -150,13 +175,23 @@ public void testEnableDisableFailureStore() throws IOException { } }"""); assertAcknowledged(client().performRequest(disableRequest)); - assertFailureStore(false, 1); + setDataStreamFailureStoreClusterSetting(DATA_STREAM_NAME); // should have no effect as disabled in options assertDataStreamOptions(false); + assertFailureStoreValuesInGetDataStreamResponse(false, 1); + assertFailsDocWithBadMapping(); + setDataStreamFailureStoreClusterSetting("does-not-match-failure-data-stream"); + assertDataStreamOptions(false); + assertFailureStoreValuesInGetDataStreamResponse(false, 1); + assertFailsDocWithBadMapping(); + setDataStreamFailureStoreClusterSetting(null); + assertDataStreamOptions(false); + assertFailureStoreValuesInGetDataStreamResponse(false, 1); + assertFailsDocWithBadMapping(); } } @SuppressWarnings("unchecked") - private void assertFailureStore(boolean failureStoreEnabled, int failureStoreSize) throws IOException { + private void assertFailureStoreValuesInGetDataStreamResponse(boolean failureStoreEnabled, int failureStoreSize) throws IOException { final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + DATA_STREAM_NAME)); List dataStreams = (List) entityAsMap(dataStreamResponse).get("data_streams"); assertThat(dataStreams.size(), is(1)); @@ -198,4 +233,32 @@ private List getIndices(Map response) { List> indices = (List>) response.get("indices"); return indices.stream().map(index -> index.get("index_name")).toList(); } + + private static void setDataStreamFailureStoreClusterSetting(String value) throws IOException { + updateClusterSettings( + Settings.builder().put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), value).build() + ); + } + + private Response putDocumentWithBadMapping() throws IOException { + Request request = new Request("POST", DATA_STREAM_NAME + "/_doc"); + request.setJsonEntity(""" + { + "@timestamp": "not a timestamp", + "foo": "bar" + } + """); + return client().performRequest(request); + } + + private void assertRedirectsDocWithBadMappingToFailureStore() throws IOException { + Response response = putDocumentWithBadMapping(); + String failureStoreResponse = (String) entityAsMap(response).get("failure_store"); + assertThat(failureStoreResponse, is("used")); + } + + private void assertFailsDocWithBadMapping() { + ResponseException e = assertThrows(ResponseException.class, this::putDocumentWithBadMapping); + assertThat(e.getResponse().getStatusLine().getStatusCode(), is(RestStatus.BAD_REQUEST.getStatus())); + } } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java index ffa2447f5f5aa..2d310fef0be7e 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -64,6 +65,7 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadAction private final SystemIndices systemIndices; private final ClusterSettings clusterSettings; private final DataStreamGlobalRetentionSettings globalRetentionSettings; + private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings; private final Client client; @Inject @@ -75,6 +77,7 @@ public TransportGetDataStreamsAction( IndexNameExpressionResolver indexNameExpressionResolver, SystemIndices systemIndices, DataStreamGlobalRetentionSettings globalRetentionSettings, + DataStreamFailureStoreSettings dataStreamFailureStoreSettings, Client client ) { super( @@ -91,6 +94,7 @@ public TransportGetDataStreamsAction( this.systemIndices = systemIndices; this.globalRetentionSettings = globalRetentionSettings; clusterSettings = clusterService.getClusterSettings(); + this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings; this.client = new OriginSettingClient(client, "stack"); } @@ -122,6 +126,7 @@ public void onResponse(DataStreamsStatsAction.Response response) { systemIndices, clusterSettings, globalRetentionSettings, + dataStreamFailureStoreSettings, maxTimestamps ) ); @@ -134,7 +139,16 @@ public void onFailure(Exception e) { }); } else { listener.onResponse( - innerOperation(state, request, indexNameExpressionResolver, systemIndices, clusterSettings, globalRetentionSettings, null) + innerOperation( + state, + request, + indexNameExpressionResolver, + systemIndices, + clusterSettings, + globalRetentionSettings, + dataStreamFailureStoreSettings, + null + ) ); } } @@ -146,11 +160,16 @@ static GetDataStreamAction.Response innerOperation( SystemIndices systemIndices, ClusterSettings clusterSettings, DataStreamGlobalRetentionSettings globalRetentionSettings, + DataStreamFailureStoreSettings dataStreamFailureStoreSettings, @Nullable Map maxTimestamps ) { List dataStreams = getDataStreams(state, indexNameExpressionResolver, request); List dataStreamInfos = new ArrayList<>(dataStreams.size()); for (DataStream dataStream : dataStreams) { + // For this action, we are returning whether the failure store is effectively enabled, either in metadata or by cluster setting. + // Users can use the get data stream options API to find out whether it is explicitly enabled in metadata. + boolean failureStoreEffectivelyEnabled = DataStream.isFailureStoreFeatureFlagEnabled() + && dataStream.isFailureStoreEffectivelyEnabled(dataStreamFailureStoreSettings); final String indexTemplate; boolean indexTemplatePreferIlmValue = true; String ilmPolicyName = null; @@ -254,6 +273,7 @@ public int compareTo(IndexInfo o) { dataStreamInfos.add( new GetDataStreamAction.Response.DataStreamInfo( dataStream, + failureStoreEffectivelyEnabled, streamHealth.getStatus(), indexTemplate, ilmPolicyName, diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java index 710ea8c15b66e..9414943cbb439 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java @@ -102,6 +102,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti Response.DataStreamInfo dataStreamInfo = new Response.DataStreamInfo( logs, + true, ClusterHealthStatus.GREEN, "index-template", null, @@ -205,6 +206,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti Response.DataStreamInfo dataStreamInfo = new Response.DataStreamInfo( logs, + true, ClusterHealthStatus.GREEN, "index-template", null, @@ -282,6 +284,7 @@ public void testManagedByDisplayValuesDontAccidentalyChange() { private Response.DataStreamInfo mutateInstance(Response.DataStreamInfo instance) { var dataStream = instance.getDataStream(); + var failureStoreEffectivelyEnabled = instance.isFailureStoreEffectivelyEnabled(); var status = instance.getDataStreamStatus(); var indexTemplate = instance.getIndexTemplate(); var ilmPolicyName = instance.getIlmPolicy(); @@ -289,7 +292,7 @@ private Response.DataStreamInfo mutateInstance(Response.DataStreamInfo instance) var indexSettings = instance.getIndexSettingsValues(); var templatePreferIlm = instance.templatePreferIlmValue(); var maximumTimestamp = instance.getMaximumTimestamp(); - switch (randomIntBetween(0, 7)) { + switch (randomIntBetween(0, 8)) { case 0 -> dataStream = randomValueOtherThan(dataStream, DataStreamTestHelper::randomInstance); case 1 -> status = randomValueOtherThan(status, () -> randomFrom(ClusterHealthStatus.values())); case 2 -> indexTemplate = randomBoolean() && indexTemplate != null ? null : randomAlphaOfLengthBetween(2, 10); @@ -314,9 +317,11 @@ private Response.DataStreamInfo mutateInstance(Response.DataStreamInfo instance) case 7 -> maximumTimestamp = (maximumTimestamp == null) ? randomNonNegativeLong() : (usually() ? randomValueOtherThan(maximumTimestamp, ESTestCase::randomNonNegativeLong) : null); + case 8 -> failureStoreEffectivelyEnabled = failureStoreEffectivelyEnabled ? false : true; } return new Response.DataStreamInfo( dataStream, + failureStoreEffectivelyEnabled, status, indexTemplate, ilmPolicyName, @@ -355,6 +360,7 @@ private Response.DataStreamInfo generateRandomDataStreamInfo() { List> timeSeries = randomBoolean() ? generateRandomTimeSeries() : null; return new Response.DataStreamInfo( DataStreamTestHelper.randomInstance(), + randomBoolean(), ClusterHealthStatus.GREEN, randomAlphaOfLengthBetween(2, 10), randomAlphaOfLengthBetween(2, 10), diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsActionTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsActionTests.java index 2efe881266c1b..ba4627f8955a1 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsActionTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsActionTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; @@ -39,6 +40,8 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; public class TransportGetDataStreamsActionTests extends ESTestCase { @@ -48,6 +51,9 @@ public class TransportGetDataStreamsActionTests extends ESTestCase { private final DataStreamGlobalRetentionSettings dataStreamGlobalRetentionSettings = DataStreamGlobalRetentionSettings.create( ClusterSettings.createBuiltInClusterSettings() ); + private final DataStreamFailureStoreSettings emptyDataStreamFailureStoreSettings = DataStreamFailureStoreSettings.create( + ClusterSettings.createBuiltInClusterSettings() + ); public void testGetDataStream() { final String dataStreamName = "my-data-stream"; @@ -166,6 +172,7 @@ public void testGetTimeSeriesDataStream() { systemIndices, ClusterSettings.createBuiltInClusterSettings(), dataStreamGlobalRetentionSettings, + emptyDataStreamFailureStoreSettings, null ); assertThat( @@ -197,6 +204,7 @@ public void testGetTimeSeriesDataStream() { systemIndices, ClusterSettings.createBuiltInClusterSettings(), dataStreamGlobalRetentionSettings, + emptyDataStreamFailureStoreSettings, null ); assertThat( @@ -248,6 +256,7 @@ public void testGetTimeSeriesDataStreamWithOutOfOrderIndices() { systemIndices, ClusterSettings.createBuiltInClusterSettings(), dataStreamGlobalRetentionSettings, + emptyDataStreamFailureStoreSettings, null ); assertThat( @@ -292,6 +301,7 @@ public void testGetTimeSeriesMixedDataStream() { systemIndices, ClusterSettings.createBuiltInClusterSettings(), dataStreamGlobalRetentionSettings, + emptyDataStreamFailureStoreSettings, null ); @@ -338,6 +348,7 @@ public void testPassingGlobalRetention() { systemIndices, ClusterSettings.createBuiltInClusterSettings(), dataStreamGlobalRetentionSettings, + emptyDataStreamFailureStoreSettings, null ); assertThat(response.getGlobalRetention(), nullValue()); @@ -363,8 +374,102 @@ public void testPassingGlobalRetention() { systemIndices, ClusterSettings.createBuiltInClusterSettings(), withGlobalRetentionSettings, + emptyDataStreamFailureStoreSettings, null ); assertThat(response.getGlobalRetention(), equalTo(globalRetention)); } + + public void testDataStreamIsFailureStoreEffectivelyEnabled_disabled() { + var metadata = new Metadata.Builder(); + DataStreamTestHelper.getClusterStateWithDataStreams( + metadata, + List.of(Tuple.tuple("data-stream-1", 2)), + List.of(), + System.currentTimeMillis(), + Settings.EMPTY, + 0, + false, + false + ); + ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(metadata).build(); + + var req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] {}); + var response = TransportGetDataStreamsAction.innerOperation( + state, + req, + resolver, + systemIndices, + ClusterSettings.createBuiltInClusterSettings(), + dataStreamGlobalRetentionSettings, + emptyDataStreamFailureStoreSettings, + null + ); + assertThat(response.getDataStreams(), hasSize(1)); + assertThat(response.getDataStreams().getFirst().isFailureStoreEffectivelyEnabled(), is(false)); + } + + public void testDataStreamIsFailureStoreEffectivelyEnabled_enabledExplicitly() { + var metadata = new Metadata.Builder(); + DataStreamTestHelper.getClusterStateWithDataStreams( + metadata, + List.of(Tuple.tuple("data-stream-1", 2)), + List.of(), + System.currentTimeMillis(), + Settings.EMPTY, + 0, + false, + true + ); + ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(metadata).build(); + + var req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] {}); + var response = TransportGetDataStreamsAction.innerOperation( + state, + req, + resolver, + systemIndices, + ClusterSettings.createBuiltInClusterSettings(), + dataStreamGlobalRetentionSettings, + emptyDataStreamFailureStoreSettings, + null + ); + assertThat(response.getDataStreams(), hasSize(1)); + assertThat(response.getDataStreams().getFirst().isFailureStoreEffectivelyEnabled(), is(true)); + } + + public void testDataStreamIsFailureStoreEffectivelyEnabled_enabledByClusterSetting() { + var metadata = new Metadata.Builder(); + DataStreamTestHelper.getClusterStateWithDataStreams( + metadata, + List.of(Tuple.tuple("data-stream-1", 2)), + List.of(), + System.currentTimeMillis(), + Settings.EMPTY, + 0, + false, + false + ); + ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(metadata).build(); + + var req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] {}); + var response = TransportGetDataStreamsAction.innerOperation( + state, + req, + resolver, + systemIndices, + ClusterSettings.createBuiltInClusterSettings(), + dataStreamGlobalRetentionSettings, + DataStreamFailureStoreSettings.create( + ClusterSettings.createBuiltInClusterSettings( + Settings.builder() + .put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), "data-stream-*") + .build() + ) + ), + null + ); + assertThat(response.getDataStreams(), hasSize(1)); + assertThat(response.getDataStreams().getFirst().isFailureStoreEffectivelyEnabled(), is(true)); + } } diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/200_require_data_stream.yml similarity index 100% rename from modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml rename to modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/200_require_data_stream.yml diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/200_rollover_failure_store.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/210_rollover_failure_store.yml similarity index 100% rename from modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/200_rollover_failure_store.yml rename to modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/210_rollover_failure_store.yml diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/220_failure_store_cluster_setting.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/220_failure_store_cluster_setting.yml new file mode 100644 index 0000000000000..90bd6fe406b57 --- /dev/null +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/220_failure_store_cluster_setting.yml @@ -0,0 +1,222 @@ +setup: + - requires: + reason: "Data stream options was added in 8.18+" + test_runner_features: [ capabilities, allowed_warnings, contains ] + capabilities: + - method: POST + path: /{index}/_doc + capabilities: [ 'failure_store_status' ] + - method: POST + path: /_index_template/{template} + capabilities: [ 'failure_store_in_template' ] + - method: PUT + path: /_cluster/settings + capabilities: [ 'data_stream_failure_store_cluster_setting' ] + + - do: + cluster.put_settings: + body: + persistent: + data_streams.failure_store.enabled: '*-matching' + + - do: + ingest.put_pipeline: + id: "failing_pipeline" + body: > + { + "processors": [ + { + "fail": { + "message" : "pipeline go boom" + } + } + ] + } + + - do: + indices.put_index_template: + name: index_template_default_fs + body: + index_patterns: default-fs-* + data_stream: {} + template: + settings: + number_of_shards: 1 + number_of_replicas: 1 + + - do: + cluster.put_component_template: + name: component_template_disabled_fs + body: + template: + data_stream_options: + failure_store: + enabled: false + + - do: + indices.put_index_template: + name: index_template_disabled_fs + body: + index_patterns: disabled-fs-* + data_stream: {} + composed_of: + - component_template_disabled_fs + template: + settings: + number_of_shards: 1 + number_of_replicas: 1 + +--- +teardown: + - do: + indices.delete_data_stream: + name: default-fs-matching + ignore: 404 + + - do: + indices.delete_data_stream: + name: disabled-fs-matching + ignore: 404 + + - do: + indices.delete_index_template: + name: index_template_disabled_fs + ignore: 404 + + - do: + cluster.delete_component_template: + name: component_template_disabled_fs + ignore: 404 + + - do: + indices.delete_index_template: + name: index_template_default_fs + ignore: 404 + + - do: + ingest.delete_pipeline: + id: "failing_pipeline" + ignore: 404 + + - do: + cluster.put_settings: + body: + persistent: + data_streams.failure_store.enabled: null + +--- +"Redirect ingest failure when auto-creating data stream to failure store when enabled by setting": + - do: + index: + index: default-fs-matching + refresh: true + pipeline: 'failing_pipeline' + body: + '@timestamp': '2020-12-12' + foo: bar + - match: { failure_store: used } + - match: { _index: '/\.fs-default-fs-matching-(\d{4}\.\d{2}\.\d{2}-)?\d{6}/' } + +--- +"Redirect ingest failure into pre-existing data stream to failure store when enabled by setting": + - do: + indices.create_data_stream: + name: default-fs-matching + + - do: + index: + index: default-fs-matching + refresh: true + pipeline: 'failing_pipeline' + body: + '@timestamp': '2020-12-12' + foo: bar + - match: { failure_store: used } + - match: { _index: '/\.fs-default-fs-matching-(\d{4}\.\d{2}\.\d{2}-)?\d{6}/' } + +--- +"Do not redirect ingest failure when auto-creating data stream to failure store when enabled by setting but disabled in template": + - do: + index: + index: disabled-fs-matching + refresh: true + pipeline: 'failing_pipeline' + body: + '@timestamp': '2020-12-12' + foo: bar + catch: '/pipeline go boom/' + +--- +"Do not redirect ingest failure into pre-existing data stream to failure store when enabled by setting but disabled in template": + - do: + indices.create_data_stream: + name: disabled-fs-matching + + - do: + index: + index: disabled-fs-matching + refresh: true + pipeline: 'failing_pipeline' + body: + '@timestamp': '2020-12-12' + foo: bar + catch: '/pipeline go boom/' + +--- +"Redirect mapping failure when auto-creating data stream to failure store when enabled by setting": + - do: + index: + index: default-fs-matching + refresh: true + body: + '@timestamp': 'not a timestamp' + foo: bar + - match: { failure_store: used } + - match: { _index: '/\.fs-default-fs-matching-(\d{4}\.\d{2}\.\d{2}-)?\d{6}/' } + +--- +"Redirect mapping failure into pre-existing data stream to failure store when enabled by setting": + - do: + indices.create_data_stream: + name: default-fs-matching + + - do: + index: + index: default-fs-matching + refresh: true + body: + '@timestamp': 'not a timestamp' + foo: bar + - match: { failure_store: used } + - match: { _index: '/\.fs-default-fs-matching-(\d{4}\.\d{2}\.\d{2}-)?\d{6}/' } + +--- +"Do not redirect mapping failure when auto-creating data stream to failure store when enabled by setting but disabled in template": + - do: + index: + index: disabled-fs-matching + refresh: true + body: + '@timestamp': 'not a timestamp' + foo: bar + catch: '/failed to parse field/' + +--- +"Do not redirect mapping failure into pre-existing data stream to failure store when enabled by setting but disabled in template": + - do: + indices.create_data_stream: + name: disabled-fs-matching + + - do: + index: + index: disabled-fs-matching + refresh: true + body: + '@timestamp': 'not a timestamp' + foo: bar + catch: '/failed to parse field/' + +# See also DataStreamOptionsIT for tests of the interaction between the failure store cluster setting and using +# the /_data_stream/{name}/_options API to explicitly enable and disable the failure store. (At time of writing, these +# can only be done in a Java REST test case, not a YAML one, because the failure store is behind a feature gate and so +# the REST API spec has not been added.) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 9bc2487f89b12..ab8b66e765e91 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -144,6 +144,7 @@ static TransportVersion def(int id) { public static final TransportVersion EQL_ALLOW_PARTIAL_SEARCH_RESULTS = def(8_809_00_0); public static final TransportVersion NODE_VERSION_INFORMATION_WITH_MIN_READ_ONLY_INDEX_VERSION = def(8_810_00_0); public static final TransportVersion ERROR_TRACE_IN_TRANSPORT_HEADER = def(8_811_00_0); + public static final TransportVersion FAILURE_STORE_ENABLED_BY_CLUSTER_SETTING = def(8_812_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java index c233ed57b748e..cc96954c8a8e4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java @@ -480,13 +480,19 @@ public CreateIndexRequest requireDataStream(boolean requireDataStream) { return this; } + /** + * Returns whether the failure store should be initialized. N.B. If true, failure store index creation will be performed regardless of + * whether the template indicates that the failure store is enabled. + */ public boolean isInitializeFailureStore() { return initializeFailureStore; } /** * Set whether this CreateIndexRequest should initialize the failure store on data stream creation. This can be necessary when, for - * example, a failure occurs while trying to ingest a document into a data stream that has to be auto-created. + * example, a failure occurs while trying to ingest a document into a data stream that has to be auto-created. N.B. If true, failure + * store index creation will be performed regardless of whether the template indicates that the failure store is enabled. It is the + * caller's responsibility to ensure that this is correct. */ public CreateIndexRequest initializeFailureStore(boolean initializeFailureStore) { this.initializeFailureStore = initializeFailureStore; diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java index 4df228240add5..b137809047d18 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -98,6 +99,7 @@ final class BulkOperation extends ActionRunnable { private final Set failedRolloverRequests = ConcurrentCollections.newConcurrentSet(); private final Map shortCircuitShardFailures = ConcurrentCollections.newConcurrentMap(); private final FailureStoreMetrics failureStoreMetrics; + private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings; BulkOperation( Task task, @@ -111,7 +113,8 @@ final class BulkOperation extends ActionRunnable { LongSupplier relativeTimeProvider, long startTimeNanos, ActionListener listener, - FailureStoreMetrics failureStoreMetrics + FailureStoreMetrics failureStoreMetrics, + DataStreamFailureStoreSettings dataStreamFailureStoreSettings ) { this( task, @@ -127,7 +130,8 @@ final class BulkOperation extends ActionRunnable { listener, new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext()), new FailureStoreDocumentConverter(), - failureStoreMetrics + failureStoreMetrics, + dataStreamFailureStoreSettings ); } @@ -145,7 +149,8 @@ final class BulkOperation extends ActionRunnable { ActionListener listener, ClusterStateObserver observer, FailureStoreDocumentConverter failureStoreDocumentConverter, - FailureStoreMetrics failureStoreMetrics + FailureStoreMetrics failureStoreMetrics, + DataStreamFailureStoreSettings dataStreamFailureStoreSettings ) { super(listener); this.task = task; @@ -164,6 +169,7 @@ final class BulkOperation extends ActionRunnable { this.rolloverClient = new OriginSettingClient(client, LAZY_ROLLOVER_ORIGIN); this.shortCircuitShardFailures.putAll(bulkRequest.incrementalState().shardLevelFailures()); this.failureStoreMetrics = failureStoreMetrics; + this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings; } @Override @@ -544,7 +550,7 @@ private IndexDocFailureStoreStatus processFailure(BulkItemRequest bulkItemReques // Do not redirect documents to a failure store that were already headed to one. var isFailureStoreRequest = isFailureStoreRequest(docWriteRequest); if (isFailureStoreRequest == false - && failureStoreCandidate.isFailureStoreEnabled() + && failureStoreCandidate.isFailureStoreEffectivelyEnabled(dataStreamFailureStoreSettings) && error instanceof VersionConflictEngineException == false && error instanceof EsRejectedExecutionException == false) { // Prepare the data stream failure store if necessary @@ -577,7 +583,7 @@ private IndexDocFailureStoreStatus processFailure(BulkItemRequest bulkItemReques if (isFailureStoreRequest) { return IndexDocFailureStoreStatus.FAILED; } - if (failureStoreCandidate.isFailureStoreEnabled() == false) { + if (failureStoreCandidate.isFailureStoreEffectivelyEnabled(dataStreamFailureStoreSettings) == false) { return IndexDocFailureStoreStatus.NOT_ENABLED; } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index e83bca4b661c9..24534826f8e3e 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -343,9 +343,11 @@ public boolean isForceExecution() { * @param indexName The index name to check. * @param metadata Cluster state metadata. * @param epochMillis A timestamp to use when resolving date math in the index name. - * @return true if this is not a simulation, and the given index name corresponds to a data stream with a failure store - * or if it matches a template that has a data stream failure store enabled. Returns false if the index name corresponds to a - * data stream, but it doesn't have the failure store enabled. Returns null when it doesn't correspond to a data stream. + * @return true if this is not a simulation, and the given index name corresponds to a data stream with a failure store, or if it + * matches a template that has a data stream failure store enabled, or if it matches a data stream template with no failure store + * option specified and the name matches the cluster setting to enable the failure store. Returns false if the index name + * corresponds to a data stream, but it doesn't have the failure store enabled by one of those conditions. Returns null when it + * doesn't correspond to a data stream. */ protected abstract Boolean resolveFailureStore(String indexName, Metadata metadata, long epochMillis); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index e2c73349b93ec..65264faf50129 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -17,7 +17,6 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.DocWriteRequest.OpType; import org.elasticsearch.action.admin.indices.create.AutoCreateAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -35,6 +34,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; import org.elasticsearch.cluster.metadata.DataStreamOptions; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -85,6 +85,7 @@ public class TransportBulkAction extends TransportAbstractBulkAction { private final IndexNameExpressionResolver indexNameExpressionResolver; private final OriginSettingClient rolloverClient; private final FailureStoreMetrics failureStoreMetrics; + private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings; @Inject public TransportBulkAction( @@ -98,7 +99,8 @@ public TransportBulkAction( IndexNameExpressionResolver indexNameExpressionResolver, IndexingPressure indexingPressure, SystemIndices systemIndices, - FailureStoreMetrics failureStoreMetrics + FailureStoreMetrics failureStoreMetrics, + DataStreamFailureStoreSettings dataStreamFailureStoreSettings ) { this( threadPool, @@ -112,7 +114,8 @@ public TransportBulkAction( indexingPressure, systemIndices, threadPool::relativeTimeInNanos, - failureStoreMetrics + failureStoreMetrics, + dataStreamFailureStoreSettings ); } @@ -128,7 +131,8 @@ public TransportBulkAction( IndexingPressure indexingPressure, SystemIndices systemIndices, LongSupplier relativeTimeProvider, - FailureStoreMetrics failureStoreMetrics + FailureStoreMetrics failureStoreMetrics, + DataStreamFailureStoreSettings dataStreamFailureStoreSettings ) { this( TYPE, @@ -144,7 +148,8 @@ public TransportBulkAction( indexingPressure, systemIndices, relativeTimeProvider, - failureStoreMetrics + failureStoreMetrics, + dataStreamFailureStoreSettings ); } @@ -162,7 +167,8 @@ public TransportBulkAction( IndexingPressure indexingPressure, SystemIndices systemIndices, LongSupplier relativeTimeProvider, - FailureStoreMetrics failureStoreMetrics + FailureStoreMetrics failureStoreMetrics, + DataStreamFailureStoreSettings dataStreamFailureStoreSettings ) { super( bulkAction, @@ -176,6 +182,7 @@ public TransportBulkAction( systemIndices, relativeTimeProvider ); + this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings; Objects.requireNonNull(relativeTimeProvider); this.featureService = featureService; this.client = client; @@ -282,7 +289,7 @@ private void populateMissingTargets( for (DocWriteRequest request : bulkRequest.requests) { // Delete requests should not attempt to create the index (if the index does not exist), unless an external versioning is used. - if (request.opType() == OpType.DELETE + if (request.opType() == DocWriteRequest.OpType.DELETE && request.versionType() != VersionType.EXTERNAL && request.versionType() != VersionType.EXTERNAL_GTE) { continue; @@ -492,7 +499,7 @@ private void failRequestsWhenPrerequisiteActionFailed( static void prohibitAppendWritesInBackingIndices(DocWriteRequest writeRequest, IndexAbstraction indexAbstraction) { DocWriteRequest.OpType opType = writeRequest.opType(); - if ((opType == OpType.CREATE || opType == OpType.INDEX) == false) { + if ((opType == DocWriteRequest.OpType.CREATE || opType == DocWriteRequest.OpType.INDEX) == false) { // op type not create or index, then bail early return; } @@ -588,7 +595,8 @@ void executeBulk( relativeTimeNanosProvider, startTimeNanos, listener, - failureStoreMetrics + failureStoreMetrics, + dataStreamFailureStoreSettings ).run(); } @@ -596,7 +604,7 @@ void executeBulk( * See {@link #resolveFailureStore(String, Metadata, long)} */ // Visibility for testing - static Boolean resolveFailureInternal(String indexName, Metadata metadata, long epochMillis) { + Boolean resolveFailureInternal(String indexName, Metadata metadata, long epochMillis) { if (DataStream.isFailureStoreFeatureFlagEnabled() == false) { return null; } @@ -604,7 +612,7 @@ static Boolean resolveFailureInternal(String indexName, Metadata metadata, long if (resolution != null) { return resolution; } - return resolveFailureStoreFromTemplate(indexName, metadata); + return resolveFailureStoreFromTemplate(indexName, metadata, epochMillis); } @Override @@ -619,7 +627,7 @@ protected Boolean resolveFailureStore(String indexName, Metadata metadata, long * @param epochMillis A timestamp to use when resolving date math in the index name. * @return true if the given index name corresponds to an existing data stream with a failure store enabled. */ - private static Boolean resolveFailureStoreFromMetadata(String indexName, Metadata metadata, long epochMillis) { + private Boolean resolveFailureStoreFromMetadata(String indexName, Metadata metadata, long epochMillis) { if (indexName == null) { return null; } @@ -636,7 +644,7 @@ private static Boolean resolveFailureStoreFromMetadata(String indexName, Metadat DataStream targetDataStream = DataStream.resolveDataStream(indexAbstraction, metadata); // We will store the failure if the write target belongs to a data stream with a failure store. - return targetDataStream != null && targetDataStream.isFailureStoreEnabled(); + return targetDataStream != null && targetDataStream.isFailureStoreEffectivelyEnabled(dataStreamFailureStoreSettings); } /** @@ -644,18 +652,20 @@ private static Boolean resolveFailureStoreFromMetadata(String indexName, Metadat * a data stream feature, the method returns true/false only if it is a data stream template, otherwise null. * @param indexName The index name to check. * @param metadata Cluster state metadata. - * @return true the associated index template has failure store enabled, false if the failure store is disabled or it's not specified, - * and null if the template is not a data stream template. - * Visible for testing + * @param epochMillis A timestamp to use when resolving date math in the index name. + * @return true the associated index template has failure store enabled, false if the failure store is disabled, true or false according + * to the cluster setting if there is a data stream template with no failure store option specified, and null if no template is + * found or if the template is not a data stream template. */ @Nullable - static Boolean resolveFailureStoreFromTemplate(String indexName, Metadata metadata) { + private Boolean resolveFailureStoreFromTemplate(String indexName, Metadata metadata, long epochMillis) { if (indexName == null) { return null; } // Check to see if the index name matches any templates such that an index would have been attributed // We don't check v1 templates at all because failure stores can only exist on data streams via a v2 template + // N.B. This currently does date math resolution itself and does *not* use epochMillis (it gets the system time again) String template = MetadataIndexTemplateService.findV2Template(metadata, indexName, false); if (template != null) { // Check if this is a data stream template or if it is just a normal index. @@ -666,7 +676,12 @@ static Boolean resolveFailureStoreFromTemplate(String indexName, Metadata metada composableIndexTemplate, metadata.componentTemplates() ).mapAndGet(DataStreamOptions.Template::toDataStreamOptions); - return dataStreamOptions != null && dataStreamOptions.isFailureStoreEnabled(); + return DataStream.isFailureStoreEffectivelyEnabled( + dataStreamOptions, + dataStreamFailureStoreSettings, + IndexNameExpressionResolver.resolveDateMathExpression(indexName, epochMillis), + systemIndices + ); } } diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java index 93c40ad18cc8a..883fc543749c2 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java @@ -234,6 +234,7 @@ public static class DataStreamInfo implements SimpleDiffable, To private final DataStream dataStream; private final ClusterHealthStatus dataStreamStatus; + private final boolean failureStoreEffectivelyEnabled; // Must be serialized independently of dataStream as depends on settings @Nullable private final String indexTemplate; @Nullable @@ -247,6 +248,7 @@ public static class DataStreamInfo implements SimpleDiffable, To public DataStreamInfo( DataStream dataStream, + boolean failureStoreEffectivelyEnabled, ClusterHealthStatus dataStreamStatus, @Nullable String indexTemplate, @Nullable String ilmPolicyName, @@ -256,6 +258,7 @@ public DataStreamInfo( @Nullable Long maximumTimestamp ) { this.dataStream = dataStream; + this.failureStoreEffectivelyEnabled = failureStoreEffectivelyEnabled; this.dataStreamStatus = dataStreamStatus; this.indexTemplate = indexTemplate; this.ilmPolicyName = ilmPolicyName; @@ -267,22 +270,32 @@ public DataStreamInfo( @SuppressWarnings("unchecked") DataStreamInfo(StreamInput in) throws IOException { - this( - DataStream.read(in), - ClusterHealthStatus.readFrom(in), - in.readOptionalString(), - in.readOptionalString(), - in.getTransportVersion().onOrAfter(TransportVersions.V_8_3_0) ? in.readOptionalWriteable(TimeSeries::new) : null, - in.getTransportVersion().onOrAfter(V_8_11_X) ? in.readMap(Index::new, IndexProperties::new) : Map.of(), - in.getTransportVersion().onOrAfter(V_8_11_X) ? in.readBoolean() : true, - in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readOptionalVLong() : null - ); + this.dataStream = DataStream.read(in); + this.failureStoreEffectivelyEnabled = in.getTransportVersion() + .onOrAfter(TransportVersions.FAILURE_STORE_ENABLED_BY_CLUSTER_SETTING) + ? in.readBoolean() + : dataStream.isFailureStoreExplicitlyEnabled(); // Revert to the behaviour before this field was added + this.dataStreamStatus = ClusterHealthStatus.readFrom(in); + this.indexTemplate = in.readOptionalString(); + this.ilmPolicyName = in.readOptionalString(); + this.timeSeries = in.getTransportVersion().onOrAfter(TransportVersions.V_8_3_0) + ? in.readOptionalWriteable(TimeSeries::new) + : null; + this.indexSettingsValues = in.getTransportVersion().onOrAfter(V_8_11_X) + ? in.readMap(Index::new, IndexProperties::new) + : Map.of(); + this.templatePreferIlmValue = in.getTransportVersion().onOrAfter(V_8_11_X) ? in.readBoolean() : true; + this.maximumTimestamp = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readOptionalVLong() : null; } public DataStream getDataStream() { return dataStream; } + public boolean isFailureStoreEffectivelyEnabled() { + return failureStoreEffectivelyEnabled; + } + public ClusterHealthStatus getDataStreamStatus() { return dataStreamStatus; } @@ -318,6 +331,9 @@ public Long getMaximumTimestamp() { @Override public void writeTo(StreamOutput out) throws IOException { dataStream.writeTo(out); + if (out.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_ENABLED_BY_CLUSTER_SETTING)) { + out.writeBoolean(failureStoreEffectivelyEnabled); + } dataStreamStatus.writeTo(out); out.writeOptionalString(indexTemplate); out.writeOptionalString(ilmPolicyName); @@ -398,7 +414,7 @@ public XContentBuilder toXContent( } if (DataStream.isFailureStoreFeatureFlagEnabled()) { builder.startObject(DataStream.FAILURE_STORE_FIELD.getPreferredName()); - builder.field(FAILURE_STORE_ENABLED.getPreferredName(), dataStream.isFailureStoreEnabled()); + builder.field(FAILURE_STORE_ENABLED.getPreferredName(), failureStoreEffectivelyEnabled); builder.field( DataStream.ROLLOVER_ON_WRITE_FIELD.getPreferredName(), dataStream.getFailureIndices().isRolloverOnWrite() @@ -477,6 +493,7 @@ public boolean equals(Object o) { DataStreamInfo that = (DataStreamInfo) o; return templatePreferIlmValue == that.templatePreferIlmValue && Objects.equals(dataStream, that.dataStream) + && failureStoreEffectivelyEnabled == that.failureStoreEffectivelyEnabled && dataStreamStatus == that.dataStreamStatus && Objects.equals(indexTemplate, that.indexTemplate) && Objects.equals(ilmPolicyName, that.ilmPolicyName) @@ -490,6 +507,7 @@ public int hashCode() { return Objects.hash( dataStream, dataStreamStatus, + failureStoreEffectivelyEnabled, indexTemplate, ilmPolicyName, timeSeries, diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 9f4231c25dfca..4343451256920 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -908,12 +908,18 @@ public IndexRequest setRequireAlias(boolean requireAlias) { } /** - * Transient flag denoting that the local request should be routed to a failure store. Not persisted across the wire. + * Returns a transient flag denoting that the local request should be routed to a failure store. Not persisted across the wire. N.B. If + * true, the failure store will be used regardless of whether the metadata indicates that the failure store is enabled. */ public boolean isWriteToFailureStore() { return writeToFailureStore; } + /** + * Sets a transient flag denoting that the local request should be routed to a failure store. Not persisted across the wire. N.B. If + * true, the failure store will be used regardless of whether the metadata indicates that the failure store is enabled. It is the + * caller's responsibility to ensure that this is correct. + */ public IndexRequest setWriteToFailureStore(boolean writeToFailureStore) { this.writeToFailureStore = writeToFailureStore; return this; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index db602ef6ef291..c1b015dc3700b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; @@ -117,6 +118,7 @@ public static boolean isFailureStoreFeatureFlagEnabled() { private final DataStreamIndices backingIndices; private final DataStreamIndices failureIndices; + // visible for testing public DataStream( String name, List indices, @@ -150,7 +152,6 @@ public DataStream( ); } - // visible for testing DataStream( String name, long generation, @@ -299,7 +300,15 @@ public boolean rolloverOnWrite() { * @return true if it's a system index or has a dot-prefixed name. */ public boolean isInternal() { - return isSystem() || name.charAt(0) == '.'; + return isSystem() || isDotPrefixName(name); + } + + private static boolean isInternalName(String name, SystemIndices systemIndices) { + return isDotPrefixName(name) || systemIndices.isSystemDataStream(name); + } + + private static boolean isDotPrefixName(String name) { + return name.charAt(0) == '.'; } /** @@ -418,12 +427,55 @@ public boolean isAllowCustomRouting() { } /** - * Determines if this data stream has its failure store enabled or not. Currently, the failure store - * is enabled only when a user has explicitly requested it. - * @return true, if the user has explicitly enabled the failure store. + * Determines whether this data stream has its failure store enabled explicitly in its metadata. */ - public boolean isFailureStoreEnabled() { - return dataStreamOptions.isFailureStoreEnabled(); + public boolean isFailureStoreExplicitlyEnabled() { + return dataStreamOptions.failureStore() != null && Boolean.TRUE.equals(dataStreamOptions.failureStore().enabled()); + } + + /** + * Returns whether this data stream has its failure store enabled, either explicitly in its metadata or implicitly via settings. + * + *

If the failure store is either explicitly enabled or explicitly disabled in its options metadata, that value is returned. If not, + * it checks whether its name matches one of the patterns in the settings, and that the data stream is not internal (i.e. neither a + * dot-prefixed nor a system data stream). + * + * @param dataStreamFailureStoreSettings The settings to use to determine whether the failure store should be implicitly enabled + */ + public boolean isFailureStoreEffectivelyEnabled(DataStreamFailureStoreSettings dataStreamFailureStoreSettings) { + return isFailureStoreEffectivelyEnabled(dataStreamOptions, dataStreamFailureStoreSettings, name, isInternal()); + } + + /** + * Returns whether a data stream has its failure store enabled, either explicitly in its metadata or implicitly via settings, based + * on the given parameters. The logic is equivalent to that in + * {@link #isFailureStoreEffectivelyEnabled(DataStreamFailureStoreSettings)}. + * + * @param options The {@link DataStreamOptions} for the data stream (which may be null) + * @param dataStreamFailureStoreSettings The settings to use to determine whether the failure store should be implicitly enabled + * @param name The name of the data stream + * @param systemIndices The {@link SystemIndices} instance to use to determine whether this is a system data stream + */ + public static boolean isFailureStoreEffectivelyEnabled( + @Nullable DataStreamOptions options, + DataStreamFailureStoreSettings dataStreamFailureStoreSettings, + String name, + SystemIndices systemIndices + ) { + return isFailureStoreEffectivelyEnabled(options, dataStreamFailureStoreSettings, name, isInternalName(name, systemIndices)); + } + + private static boolean isFailureStoreEffectivelyEnabled( + DataStreamOptions options, + DataStreamFailureStoreSettings dataStreamFailureStoreSettings, + String name, + boolean isInternal + ) { + if (options != null && options.failureStore() != null && options.failureStore().enabled() != null) { + return options.failureStore().enabled(); + } else { + return (isInternal == false) && dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName(name); + } } @Nullable @@ -1106,7 +1158,7 @@ public void writeTo(StreamOutput out) throws IOException { } if (out.getTransportVersion() .between(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION, DataStream.ADD_DATA_STREAM_OPTIONS_VERSION)) { - out.writeBoolean(isFailureStoreEnabled()); + out.writeBoolean(isFailureStoreExplicitlyEnabled()); } if (out.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)) { out.writeCollection(failureIndices.indices); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreSettings.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreSettings.java new file mode 100644 index 0000000000000..c5076d01eabb0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreSettings.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.metadata; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; + +import java.util.List; +import java.util.function.Predicate; + +import static org.elasticsearch.core.Predicates.never; + +/** + * Holder for the data stream global settings relating to the data stream failure store. This defines, validates, and monitors the settings. + */ +public class DataStreamFailureStoreSettings { + + private static final Logger logger = LogManager.getLogger(DataStreamFailureStoreSettings.class); + + public static final Setting> DATA_STREAM_FAILURE_STORED_ENABLED_SETTING = Setting.stringListSetting( + "data_streams.failure_store.enabled", + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private Predicate failureStoreEnabledByName; + + private DataStreamFailureStoreSettings() { + failureStoreEnabledByName = never(); + } + + /** + * Creates an instance and initialises the cluster settings listeners. + * + * @param clusterSettings The cluster settings to initialize the instance from and to watch for updates to + */ + public static DataStreamFailureStoreSettings create(ClusterSettings clusterSettings) { + DataStreamFailureStoreSettings dataStreamFailureStoreSettings = new DataStreamFailureStoreSettings(); + if (DataStream.isFailureStoreFeatureFlagEnabled()) { + clusterSettings.initializeAndWatch( + DATA_STREAM_FAILURE_STORED_ENABLED_SETTING, + dataStreamFailureStoreSettings::setEnabledByNamePatterns + ); + } + return dataStreamFailureStoreSettings; + } + + /** + * Returns whether the settings indicate that the failure store should be enabled by the cluster settings for the given name. + * + * @param name The data stream name + */ + public boolean failureStoreEnabledForDataStreamName(String name) { + assert DataStream.isFailureStoreFeatureFlagEnabled() : "Testing whether failure store is enabled should be behind by feature flag"; + return failureStoreEnabledByName.test(name); + } + + private void setEnabledByNamePatterns(List patterns) { + failureStoreEnabledByName = Regex.simpleMatcher(patterns.toArray(String[]::new)); + logger.info("Updated data stream name patterns for enabling failure store to [{}]", patterns); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamOptions.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamOptions.java index 51e13c05e6892..423b698442581 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamOptions.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamOptions.java @@ -70,16 +70,6 @@ public boolean isEmpty() { return failureStore == null; } - /** - * Determines if this data stream has its failure store enabled or not. Currently, the failure store - * is enabled only when a user has explicitly requested it. - * - * @return true, if the user has explicitly enabled the failure store. - */ - public boolean isFailureStoreEnabled() { - return failureStore != null && Boolean.TRUE.equals(failureStore.enabled()); - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(failureStore); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java index 2ce91b66fa789..a2211cf8ea893 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java @@ -621,13 +621,11 @@ private static boolean shouldTrackConcreteIndex(Context context, Index index) { IndicesOptions options = context.getOptions(); if (DataStream.isFailureStoreFeatureFlagEnabled() && context.options.allowFailureIndices() == false) { DataStream parentDataStream = context.getState().metadata().getIndicesLookup().get(index.getName()).getParentDataStream(); - if (parentDataStream != null && parentDataStream.isFailureStoreEnabled()) { - if (parentDataStream.isFailureStoreIndex(index.getName())) { - if (options.ignoreUnavailable()) { - return false; - } else { - throw new FailureIndexNotSupportedException(index); - } + if (parentDataStream != null && parentDataStream.isFailureStoreIndex(index.getName())) { + if (options.ignoreUnavailable()) { + return false; + } else { + throw new FailureIndexNotSupportedException(index); } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index a4fa139043e50..f8545c6cf1686 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -2486,7 +2486,6 @@ private static boolean assertContainsIndexIfDataStream(DataStream parent, IndexM assert parent == null || parent.getIndices().stream().anyMatch(index -> indexMetadata.getIndex().getName().equals(index.getName())) || (DataStream.isFailureStoreFeatureFlagEnabled() - && parent.isFailureStoreEnabled() && parent.getFailureIndices() .getIndices() .stream() @@ -2512,7 +2511,7 @@ private static void collectDataStreams( for (Index i : dataStream.getIndices()) { indexToDataStreamLookup.put(i.getName(), dataStream); } - if (DataStream.isFailureStoreFeatureFlagEnabled() && dataStream.isFailureStoreEnabled()) { + if (DataStream.isFailureStoreFeatureFlagEnabled()) { for (Index i : dataStream.getFailureIndices().getIndices()) { indexToDataStreamLookup.put(i.getName(), dataStream); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index 0de87c7226380..254646f8e71a9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -205,7 +205,8 @@ static ClusterState createDataStream( * @param request The create data stream request * @param backingIndices List of backing indices. May be empty * @param writeIndex Write index for the data stream. If null, a new write index will be created. - * @param initializeFailureStore Whether the failure store should be initialized + * @param initializeFailureStore Whether the failure store should be initialized (N.B. if true, failure store index creation will be + * performed regardless of whether the template indicates that the failure store is enabled) * @return Cluster state containing the new data stream */ static ClusterState createDataStream( @@ -265,12 +266,13 @@ static ClusterState createDataStream( ? MetadataIndexTemplateService.resolveDataStreamOptions(template, systemDataStreamDescriptor.getComponentTemplates()) : MetadataIndexTemplateService.resolveDataStreamOptions(template, metadata.componentTemplates()); final DataStreamOptions dataStreamOptions = dataStreamOptionsTemplate.mapAndGet(DataStreamOptions.Template::toDataStreamOptions); - var isFailureStoreEnabled = dataStreamOptions != null && dataStreamOptions.isFailureStoreEnabled(); // If we need to create a failure store, do so first. Do not reroute during the creation since we will do - // that as part of creating the backing index if required. + // that as part of creating the backing index if required. N.B. This is done if initializeFailureStore, + // regardless of whether the template indicates that the failure store is enabled: it is the caller's + // responsibility to check that before setting. IndexMetadata failureStoreIndex = null; - if (isFailureStoreEnabled && initializeFailureStore) { + if (initializeFailureStore) { if (isSystem) { throw new IllegalArgumentException("Failure stores are not supported on system data streams"); } @@ -308,8 +310,7 @@ static ClusterState createDataStream( } assert writeIndex != null; assert writeIndex.mapping() != null : "no mapping found for backing index [" + writeIndex.getIndex().getName() + "]"; - assert isFailureStoreEnabled == false || initializeFailureStore == false || failureStoreIndex != null - : "failure store should have an initial index"; + assert initializeFailureStore == false || failureStoreIndex != null : "failure store should have an initial index"; assert failureStoreIndex == null || failureStoreIndex.mapping() != null : "no mapping found for failure store [" + failureStoreIndex.getIndex().getName() + "]"; diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index a01571b8c237d..aecc750bd4e39 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -37,6 +37,8 @@ import org.elasticsearch.cluster.coordination.MasterHistory; import org.elasticsearch.cluster.coordination.NoMasterBlockService; import org.elasticsearch.cluster.coordination.Reconfigurator; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.cluster.metadata.IndexGraveyard; @@ -133,8 +135,12 @@ import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.watcher.ResourceWatcherService; +import java.util.Objects; import java.util.Set; import java.util.function.Predicate; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toSet; /** * Encapsulates all valid cluster level settings. @@ -205,7 +211,7 @@ public void apply(Settings value, Settings current, Settings previous) { } } - public static final Set> BUILT_IN_CLUSTER_SETTINGS = Set.of( + public static final Set> BUILT_IN_CLUSTER_SETTINGS = Stream.of( AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, @@ -616,6 +622,7 @@ public void apply(Settings value, Settings current, Settings previous) { TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE, DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING, DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING, - ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME - ); + ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME, + DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null + ).filter(Objects::nonNull).collect(toSet()); } diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 5cfe1c104d45e..212820594d43e 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -42,6 +42,7 @@ import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.coordination.MasterHistoryService; import org.elasticsearch.cluster.coordination.StableMasterHealthIndicatorService; +import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings; import org.elasticsearch.cluster.metadata.IndexMetadataVerifier; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -627,7 +628,6 @@ private ScriptService createScriptService(SettingsModule settingsModule, ThreadP } private DataStreamGlobalRetentionSettings createDataStreamServicesAndGlobalRetentionResolver( - Settings settings, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService, @@ -637,6 +637,10 @@ private DataStreamGlobalRetentionSettings createDataStreamServicesAndGlobalReten clusterService.getClusterSettings() ); modules.bindToInstance(DataStreamGlobalRetentionSettings.class, dataStreamGlobalRetentionSettings); + modules.bindToInstance( + DataStreamFailureStoreSettings.class, + DataStreamFailureStoreSettings.create(clusterService.getClusterSettings()) + ); modules.bindToInstance( MetadataCreateDataStreamService.class, new MetadataCreateDataStreamService(threadPool, clusterService, metadataCreateIndexService) @@ -859,7 +863,6 @@ private void construct( ); final DataStreamGlobalRetentionSettings dataStreamGlobalRetentionSettings = createDataStreamServicesAndGlobalRetentionResolver( - settings, threadPool, clusterService, indicesService, diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterUpdateSettingsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterUpdateSettingsAction.java index cd154d2e5c50a..b4641a49d6977 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterUpdateSettingsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterUpdateSettingsAction.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; @@ -33,6 +34,9 @@ public class RestClusterUpdateSettingsAction extends BaseRestHandler { private static final String PERSISTENT = "persistent"; private static final String TRANSIENT = "transient"; + // TODO: Remove this and use a single cluster feature / capability for the whole failure store feature when the feature flag is removed + private static final String DATA_STREAM_FAILURE_STORE_CLUSTER_SETTING_CAPABILITY = "data_stream_failure_store_cluster_setting"; + @Override public List routes() { return List.of(new Route(PUT, "/_cluster/settings")); @@ -73,4 +77,9 @@ protected Set responseParams() { public boolean canTripCircuitBreaker() { return false; } + + @Override + public Set supportedCapabilities() { + return DataStream.isFailureStoreFeatureFlagEnabled() ? Set.of(DATA_STREAM_FAILURE_STORE_CLUSTER_SETTING_CAPABILITY) : Set.of(); + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java index 1a30fae1ebc00..eb034196f0d38 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java @@ -753,7 +753,7 @@ public void testValidation() throws Exception { // ensure no replicate data stream .promoteDataStream(); rolloverTarget = dataStream.getName(); - if (dataStream.isFailureStoreEnabled() && randomBoolean()) { + if (dataStream.isFailureStoreExplicitlyEnabled() && randomBoolean()) { defaultSelectorOptions = IndicesOptions.SelectorOptions.FAILURES; sourceIndexName = dataStream.getFailureStoreWriteIndex().getName(); defaultRolloverIndexName = DataStream.getDefaultFailureStoreName( diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java index 9a12b05d1cfd8..9360ce1719634 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.coordination.NoMasterBlockService; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; import org.elasticsearch.cluster.metadata.DataStreamOptions; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -40,6 +41,7 @@ import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -89,6 +91,7 @@ public class BulkOperationTests extends ESTestCase { private final String dataStreamName = "my_data_stream"; private final String fsDataStreamName = "my_failure_store_data_stream"; private final String fsRolloverDataStreamName = "my_failure_store_to_be_rolled_over_data_stream"; + private final String fsBySettingsDataStreamName = "my_failure_store_enabled_by_setting_data_stream"; private final IndexMetadata indexMetadata = IndexMetadata.builder(indexName) .settings( @@ -120,6 +123,12 @@ public class BulkOperationTests extends ESTestCase { private final IndexMetadata ds3FailureStore2 = DataStreamTestHelper.createFailureStore(fsRolloverDataStreamName, 2, millis) .numberOfShards(1) .build(); + private final IndexMetadata ds4BackingIndex1 = DataStreamTestHelper.createBackingIndex(fsBySettingsDataStreamName, 1, millis) + .numberOfShards(2) + .build(); + private final IndexMetadata ds4FailureStore1 = DataStreamTestHelper.createFailureStore(fsBySettingsDataStreamName, 1, millis) + .numberOfShards(1) + .build(); private final DataStream dataStream1 = DataStreamTestHelper.newInstance( dataStreamName, @@ -137,6 +146,11 @@ public class BulkOperationTests extends ESTestCase { DataStream.DataStreamIndices.failureIndicesBuilder(List.of(ds3FailureStore1.getIndex())).setRolloverOnWrite(true).build() ) .build(); + private final DataStream dataStream4 = DataStream.builder(fsBySettingsDataStreamName, List.of(ds4BackingIndex1.getIndex())) + .setGeneration(1) + .setDataStreamOptions(DataStreamOptions.EMPTY) + .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(List.of(ds4FailureStore1.getIndex())).build()) + .build(); private final ClusterState DEFAULT_STATE = ClusterState.builder(ClusterName.DEFAULT) .metadata( @@ -172,11 +186,24 @@ public class BulkOperationTests extends ESTestCase { ds3BackingIndex1.getIndex().getName(), ds3BackingIndex1, ds3FailureStore1.getIndex().getName(), - ds3FailureStore1 + ds3FailureStore1, + ds4BackingIndex1.getIndex().getName(), + ds4BackingIndex1, + ds4FailureStore1.getIndex().getName(), + ds4FailureStore1 ) ) .dataStreams( - Map.of(dataStreamName, dataStream1, fsDataStreamName, dataStream2, fsRolloverDataStreamName, dataStream3), + Map.of( + dataStreamName, + dataStream1, + fsDataStreamName, + dataStream2, + fsRolloverDataStreamName, + dataStream3, + fsBySettingsDataStreamName, + dataStream4 + ), Map.of() ) .build() @@ -409,6 +436,58 @@ public void testFailingDocumentRedirectsToFailureStore() throws Exception { assertThat(failedItem.getFailureStoreStatus(), equalTo(IndexDocFailureStoreStatus.USED)); } + public void testFailingDocumentRedirectsToFailureStoreWhenEnabledByClusterSetting() { + Assume.assumeTrue(DataStream.isFailureStoreFeatureFlagEnabled()); + + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add( + new IndexRequest(fsBySettingsDataStreamName).id("1").source(Map.of("key", "val")).opType(DocWriteRequest.OpType.CREATE) + ); + + NodeClient client = getNodeClient( + thatFailsDocuments( + Map.of(new IndexAndId(ds4BackingIndex1.getIndex().getName(), "1"), () -> new MapperException("mapping go boom")) + ) + ); + DataStreamFailureStoreSettings dataStreamFailureStoreSettings = DataStreamFailureStoreSettings.create( + ClusterSettings.createBuiltInClusterSettings( + Settings.builder() + .put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), fsBySettingsDataStreamName) + .build() + ) + ); + + // Without the cluster setting, this bulk request should fail: + BulkResponse bulkItemResponsesWithoutClusterSetting = safeAwait(l -> newBulkOperation(client, bulkRequest, l).run()); + assertThat(bulkItemResponsesWithoutClusterSetting.hasFailures(), is(true)); + BulkItemResponse failedItem = Arrays.stream(bulkItemResponsesWithoutClusterSetting.getItems()) + .filter(BulkItemResponse::isFailed) + .findFirst() + .orElseThrow(() -> new AssertionError("Could not find redirected item")); + assertThat(failedItem.getFailure().getCause(), is(instanceOf(MapperException.class))); + assertThat(failedItem.getFailure().getCause().getMessage(), is(equalTo("mapping go boom"))); + + // With a cluster setting to enable the failure store for this data stream, the same request should be redirected: + BulkResponse bulkItemResponsesUsingClusterSetting = safeAwait( + l -> newBulkOperation( + DEFAULT_STATE, + client, + bulkRequest, + new AtomicArray<>(bulkRequest.numberOfActions()), + mockObserver(DEFAULT_STATE), + l, + new FailureStoreDocumentConverter(), + dataStreamFailureStoreSettings + ).run() + ); + assertThat(bulkItemResponsesUsingClusterSetting.hasFailures(), is(false)); + BulkItemResponse redirectedItem = Arrays.stream(bulkItemResponsesUsingClusterSetting.getItems()) + .filter(item -> item.getIndex().equals(ds4FailureStore1.getIndex().getName())) + .findFirst() + .orElseThrow(() -> new AssertionError("Could not find redirected item")); + assertThat(redirectedItem.getFailureStoreStatus(), equalTo(IndexDocFailureStoreStatus.USED)); + } + /** * A bulk operation to a data stream with a failure store enabled may still partially fail if the redirected documents experience * a shard-level failure while writing to the failure store indices. @@ -957,6 +1036,7 @@ private static BulkItemResponse requestToFailedResponse(BulkItemRequest itemRequ /** * Create a client that redirects expected actions to the provided function and fails if an unexpected operation happens. + * * @param onShardAction Called when TransportShardBulkAction is executed. * @return A node client for the test. */ @@ -966,6 +1046,7 @@ private NodeClient getNodeClient(BiConsumer listener, FailureStoreDocumentConverter failureStoreDocumentConverter + ) { + return newBulkOperation( + state, + client, + request, + existingResponses, + observer, + listener, + failureStoreDocumentConverter, + DataStreamFailureStoreSettings.create(ClusterSettings.createBuiltInClusterSettings()) + ); + } + + private BulkOperation newBulkOperation( + ClusterState state, + NodeClient client, + BulkRequest request, + AtomicArray existingResponses, + ClusterStateObserver observer, + ActionListener listener, + FailureStoreDocumentConverter failureStoreDocumentConverter, + DataStreamFailureStoreSettings dataStreamFailureStoreSettings ) { // Time provision long timeZero = TimeUnit.MILLISECONDS.toNanos(randomMillisUpToYear9999() - TimeUnit.DAYS.toMillis(1)); @@ -1105,7 +1208,8 @@ private BulkOperation newBulkOperation( listener, observer, failureStoreDocumentConverter, - FailureStoreMetrics.NOOP + FailureStoreMetrics.NOOP, + dataStreamFailureStoreSettings ); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 96b62056b6dc4..50885fc399c89 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; @@ -36,6 +37,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.TriConsumer; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.core.Nullable; @@ -157,7 +159,8 @@ class TestTransportBulkAction extends TransportBulkAction { TestIndexNameExpressionResolver.newInstance(), new IndexingPressure(SETTINGS), EmptySystemIndices.INSTANCE, - FailureStoreMetrics.NOOP + FailureStoreMetrics.NOOP, + DataStreamFailureStoreSettings.create(ClusterSettings.createBuiltInClusterSettings()) ); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 6bc08995b932e..0032093459a0d 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -28,6 +28,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; +import org.elasticsearch.cluster.metadata.DataStreamOptions; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexAbstraction.ConcreteIndex; @@ -39,6 +41,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.features.FeatureService; @@ -84,6 +87,8 @@ public class TransportBulkActionTests extends ESTestCase { + private final ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(); + /** Services needed by bulk action */ private TransportService transportService; private ClusterService clusterService; @@ -112,7 +117,8 @@ class TestTransportBulkAction extends TransportBulkAction { new Resolver(), new IndexingPressure(Settings.EMPTY), EmptySystemIndices.INSTANCE, - FailureStoreMetrics.NOOP + FailureStoreMetrics.NOOP, + DataStreamFailureStoreSettings.create(clusterSettings) ); } @@ -392,36 +398,52 @@ public void testRejectionAfterCreateIndexIsPropagated() { public void testResolveFailureStoreFromMetadata() throws Exception { assumeThat(DataStream.isFailureStoreFeatureFlagEnabled(), is(true)); - String dataStreamWithFailureStore = "test-data-stream-failure-enabled"; - String dataStreamWithoutFailureStore = "test-data-stream-failure-disabled"; + String dataStreamWithFailureStoreEnabled = "test-data-stream-failure-enabled"; + String dataStreamWithFailureStoreDefault = "test-data-stream-failure-default"; + String dataStreamWithFailureStoreDisabled = "test-data-stream-failure-disabled"; long testTime = randomMillisUpToYear9999(); - IndexMetadata backingIndex1 = DataStreamTestHelper.createFirstBackingIndex(dataStreamWithFailureStore, testTime).build(); - IndexMetadata backingIndex2 = DataStreamTestHelper.createFirstBackingIndex(dataStreamWithoutFailureStore, testTime).build(); - IndexMetadata failureStoreIndex1 = DataStreamTestHelper.createFirstFailureStore(dataStreamWithFailureStore, testTime).build(); + IndexMetadata backingIndex1 = DataStreamTestHelper.createFirstBackingIndex(dataStreamWithFailureStoreEnabled, testTime).build(); + IndexMetadata backingIndex2 = DataStreamTestHelper.createFirstBackingIndex(dataStreamWithFailureStoreDefault, testTime).build(); + IndexMetadata backingIndex3 = DataStreamTestHelper.createFirstBackingIndex(dataStreamWithFailureStoreDisabled, testTime).build(); + IndexMetadata failureStoreIndex1 = DataStreamTestHelper.createFirstFailureStore(dataStreamWithFailureStoreEnabled, testTime) + .build(); Metadata metadata = Metadata.builder() .dataStreams( Map.of( - dataStreamWithFailureStore, + dataStreamWithFailureStoreEnabled, DataStreamTestHelper.newInstance( - dataStreamWithFailureStore, + dataStreamWithFailureStoreEnabled, List.of(backingIndex1.getIndex()), 1L, Map.of(), false, null, - List.of(failureStoreIndex1.getIndex()) + List.of(), + DataStreamOptions.FAILURE_STORE_ENABLED ), - dataStreamWithoutFailureStore, + dataStreamWithFailureStoreDefault, DataStreamTestHelper.newInstance( - dataStreamWithoutFailureStore, + dataStreamWithFailureStoreDefault, List.of(backingIndex2.getIndex()), 1L, Map.of(), false, null, - List.of() + List.of(), + DataStreamOptions.EMPTY + ), + dataStreamWithFailureStoreDisabled, + DataStreamTestHelper.newInstance( + dataStreamWithFailureStoreDisabled, + List.of(backingIndex3.getIndex()), + 1L, + Map.of(), + false, + null, + List.of(), + DataStreamOptions.FAILURE_STORE_DISABLED ) ), Map.of() @@ -432,6 +454,8 @@ public void testResolveFailureStoreFromMetadata() throws Exception { backingIndex1, backingIndex2.getIndex().getName(), backingIndex2, + backingIndex3.getIndex().getName(), + backingIndex3, failureStoreIndex1.getIndex().getName(), failureStoreIndex1 ) @@ -439,38 +463,56 @@ public void testResolveFailureStoreFromMetadata() throws Exception { .build(); // Data stream with failure store should store failures - assertThat(TransportBulkAction.resolveFailureInternal(dataStreamWithFailureStore, metadata, testTime), is(true)); - // Data stream without failure store should not - assertThat(TransportBulkAction.resolveFailureInternal(dataStreamWithoutFailureStore, metadata, testTime), is(false)); + assertThat(bulkAction.resolveFailureInternal(dataStreamWithFailureStoreEnabled, metadata, testTime), is(true)); + // Data stream with the default failure store options should not... + assertThat(bulkAction.resolveFailureInternal(dataStreamWithFailureStoreDefault, metadata, testTime), is(false)); + // ...unless we change the cluster setting to enable it that way. + clusterSettings.applySettings( + Settings.builder() + .put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), dataStreamWithFailureStoreDefault) + .build() + ); + assertThat(bulkAction.resolveFailureInternal(dataStreamWithFailureStoreDefault, metadata, testTime), is(true)); + // Data stream with failure store explicitly disabled should not store failures even if it matches the cluster setting + clusterSettings.applySettings( + Settings.builder() + .put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), dataStreamWithFailureStoreDisabled) + .build() + ); + assertThat(bulkAction.resolveFailureInternal(dataStreamWithFailureStoreDisabled, metadata, testTime), is(false)); // An index should not be considered for failure storage - assertThat(TransportBulkAction.resolveFailureInternal(backingIndex1.getIndex().getName(), metadata, testTime), is(nullValue())); + assertThat(bulkAction.resolveFailureInternal(backingIndex1.getIndex().getName(), metadata, testTime), is(nullValue())); // even if that index is itself a failure store - assertThat( - TransportBulkAction.resolveFailureInternal(failureStoreIndex1.getIndex().getName(), metadata, testTime), - is(nullValue()) - ); + assertThat(bulkAction.resolveFailureInternal(failureStoreIndex1.getIndex().getName(), metadata, testTime), is(nullValue())); } public void testResolveFailureStoreFromTemplate() throws Exception { assumeThat(DataStream.isFailureStoreFeatureFlagEnabled(), is(true)); - String dsTemplateWithFailureStore = "test-data-stream-failure-enabled"; - String dsTemplateWithoutFailureStore = "test-data-stream-failure-disabled"; + String dsTemplateWithFailureStoreEnabled = "test-data-stream-failure-enabled"; + String dsTemplateWithFailureStoreDefault = "test-data-stream-failure-default"; + String dsTemplateWithFailureStoreDisabled = "test-data-stream-failure-disabled"; String indexTemplate = "test-index"; long testTime = randomMillisUpToYear9999(); Metadata metadata = Metadata.builder() .indexTemplates( Map.of( - dsTemplateWithFailureStore, + dsTemplateWithFailureStoreEnabled, ComposableIndexTemplate.builder() - .indexPatterns(List.of(dsTemplateWithFailureStore + "-*")) + .indexPatterns(List.of(dsTemplateWithFailureStoreEnabled + "-*")) .template(Template.builder().dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true))) .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) .build(), - dsTemplateWithoutFailureStore, + dsTemplateWithFailureStoreDefault, + ComposableIndexTemplate.builder() + .indexPatterns(List.of(dsTemplateWithFailureStoreDefault + "-*")) + .template(Template.builder().dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(null))) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build(), + dsTemplateWithFailureStoreDisabled, ComposableIndexTemplate.builder() - .indexPatterns(List.of(dsTemplateWithoutFailureStore + "-*")) + .indexPatterns(List.of(dsTemplateWithFailureStoreDisabled + "-*")) .template(Template.builder().dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(false))) .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) .build(), @@ -481,11 +523,36 @@ public void testResolveFailureStoreFromTemplate() throws Exception { .build(); // Data stream with failure store should store failures - assertThat(TransportBulkAction.resolveFailureInternal(dsTemplateWithFailureStore + "-1", metadata, testTime), is(true)); - // Data stream without failure store should not - assertThat(TransportBulkAction.resolveFailureInternal(dsTemplateWithoutFailureStore + "-1", metadata, testTime), is(false)); + assertThat(bulkAction.resolveFailureInternal(dsTemplateWithFailureStoreEnabled + "-1", metadata, testTime), is(true)); + // Same if date math is used + assertThat(bulkAction.resolveFailureInternal("<" + dsTemplateWithFailureStoreEnabled + "-{now}>", metadata, testTime), is(true)); + // Data stream with the default failure store options should not... + assertThat(bulkAction.resolveFailureInternal(dsTemplateWithFailureStoreDefault + "-1", metadata, testTime), is(false)); + assertThat(bulkAction.resolveFailureInternal("<" + dsTemplateWithFailureStoreDefault + "-{now}>", metadata, testTime), is(false)); + // ...unless we change the cluster setting to enable it that way. + clusterSettings.applySettings( + Settings.builder() + .put( + DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), + dsTemplateWithFailureStoreDefault + "*" + ) + .build() + ); + assertThat(bulkAction.resolveFailureInternal(dsTemplateWithFailureStoreDefault + "-1", metadata, testTime), is(true)); + assertThat(bulkAction.resolveFailureInternal("<" + dsTemplateWithFailureStoreDefault + "-{now}>", metadata, testTime), is(true)); + // Data stream with failure store explicitly disabled should not store failures even if it matches the cluster setting + clusterSettings.applySettings( + Settings.builder() + .put( + DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), + dsTemplateWithFailureStoreDisabled + "*" + ) + .build() + ); + assertThat(bulkAction.resolveFailureInternal(dsTemplateWithFailureStoreDisabled + "-1", metadata, testTime), is(false)); + assertThat(bulkAction.resolveFailureInternal("<" + dsTemplateWithFailureStoreDisabled + "-{now}>", metadata, testTime), is(false)); // An index template should not be considered for failure storage - assertThat(TransportBulkAction.resolveFailureInternal(indexTemplate + "-1", metadata, testTime), is(nullValue())); + assertThat(bulkAction.resolveFailureInternal(indexTemplate + "-1", metadata, testTime), is(nullValue())); } /** @@ -558,38 +625,6 @@ public void testFailuresDuringPrerequisiteActions() throws InterruptedException assertNull(bulkRequest.requests.get(2)); } - public void testFailureStoreFromTemplateResolution() { - Metadata metadata = Metadata.builder() - .indexTemplates( - Map.of( - "my-index-template", - ComposableIndexTemplate.builder().indexPatterns(List.of("my-index*")).build(), - "my-enabled-fs-template", - ComposableIndexTemplate.builder() - .indexPatterns(List.of("my-enabled*")) - .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) - .template(Template.builder().dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true))) - .build(), - "my-disabled-fs-template", - ComposableIndexTemplate.builder() - .indexPatterns(List.of("my-disabled*")) - .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) - .template(Template.builder().dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(false))) - .build(), - "my-no-fs-template", - ComposableIndexTemplate.builder() - .indexPatterns(List.of("my-no*")) - .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) - .build() - ) - ) - .build(); - assertThat(TransportBulkAction.resolveFailureStoreFromTemplate("my-index", metadata), nullValue()); - assertThat(TransportBulkAction.resolveFailureStoreFromTemplate("my-enabled-fs", metadata), equalTo(true)); - assertThat(TransportBulkAction.resolveFailureStoreFromTemplate("my-disabled-fs", metadata), equalTo(false)); - assertThat(TransportBulkAction.resolveFailureStoreFromTemplate("my-no-fs", metadata), equalTo(false)); - } - private BulkRequest buildBulkRequest(List indices) { BulkRequest request = new BulkRequest(); for (String index : indices) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index b3d3ebe5e1357..2f033e4b5a383 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -21,11 +21,13 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -252,7 +254,8 @@ static class TestTransportBulkAction extends TransportBulkAction { new IndexingPressure(Settings.EMPTY), EmptySystemIndices.INSTANCE, relativeTimeProvider, - FailureStoreMetrics.NOOP + FailureStoreMetrics.NOOP, + DataStreamFailureStoreSettings.create(ClusterSettings.createBuiltInClusterSettings()) ); } } diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java index b0b3531f54b48..feb00728c858e 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java @@ -85,6 +85,7 @@ private static GetDataStreamAction.Response.DataStreamInfo newDataStreamInfo(boo DataStream dataStream = newDataStreamInstance(isSystem, retention); return new GetDataStreamAction.Response.DataStreamInfo( dataStream, + randomBoolean(), randomFrom(ClusterHealthStatus.values()), null, null, diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreSettingsTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreSettingsTests.java new file mode 100644 index 0000000000000..709d026cae034 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreSettingsTests.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; + +import java.util.stream.Stream; + +import static com.carrotsearch.randomizedtesting.generators.RandomStrings.randomAsciiAlphanumOfLengthBetween; +import static org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING; +import static org.hamcrest.Matchers.is; + +public class DataStreamFailureStoreSettingsTests extends ESTestCase { + + public void testFailureStoreEnabledForDataStreamName_defaultSettings() { + DataStreamFailureStoreSettings dataStreamFailureStoreSettings = DataStreamFailureStoreSettings.create( + ClusterSettings.createBuiltInClusterSettings() + ); + + // The default should return false for any input. + // The following will include some illegal names, but it's still valid to test how the method treats them. + Stream.generate(() -> randomAsciiAlphanumOfLengthBetween(random(), 1, 20)) + .limit(100) + .forEach(name -> assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName(name), is(false))); + Stream.generate(() -> randomUnicodeOfLengthBetween(1, 20)) + .limit(100) + .forEach(name -> assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName(name), is(false))); + } + + public void testFailureStoreEnabledForDataStreamName_exactMatches() { + DataStreamFailureStoreSettings dataStreamFailureStoreSettings = DataStreamFailureStoreSettings.create( + ClusterSettings.createBuiltInClusterSettings( + // Match exactly 'foo' and 'bar' — whitespace should be stripped: + Settings.builder().put(DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), " foo , bar ").build() + ) + ); + + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName("foo"), is(true)); + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName("bar"), is(true)); + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName("food"), is(false)); + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName("tbar"), is(false)); + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName(".foo"), is(false)); + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName("barf"), is(false)); + } + + public void testFailureStoreEnabledForDataStreamName_wildcardMatches() { + DataStreamFailureStoreSettings dataStreamFailureStoreSettings = DataStreamFailureStoreSettings.create( + ClusterSettings.createBuiltInClusterSettings( + Settings.builder().put(DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), " foo* , *bar , a*z ").build() + ) + ); + + // These tests aren't exhaustive as the library used is tested thoroughly, but they provide a basic check of the correct usage: + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName("foo"), is(true)); + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName("bar"), is(true)); + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName("food"), is(true)); + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName("tbar"), is(true)); + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName("az"), is(true)); + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName("a123z"), is(true)); + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName(".foo"), is(false)); + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName("barf"), is(false)); + } + + public void testFailureStoreEnabledForDataStreamName_respondsToSettingsChange() { + ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings( + Settings.builder().put(DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), "foo").build() + ); + DataStreamFailureStoreSettings dataStreamFailureStoreSettings = DataStreamFailureStoreSettings.create(clusterSettings); + + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName("foo"), is(true)); + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName("bar"), is(false)); + + clusterSettings.applySettings(Settings.builder().put(DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), "bar").build()); + + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName("foo"), is(false)); + assertThat(dataStreamFailureStoreSettings.failureStoreEnabledForDataStreamName("bar"), is(true)); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index cfdcfe48c8d9a..f7f299683c3fc 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; @@ -27,6 +28,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.test.AbstractXContentSerializingTestCase; import org.elasticsearch.test.ESTestCase; @@ -66,6 +68,8 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class DataStreamTests extends AbstractXContentSerializingTestCase { @@ -2280,6 +2284,157 @@ public void testInternalDataStream() { assertThat(dotPrefixedDataStream.isInternal(), is(true)); } + public void testIsFailureStoreExplicitlyEnabled() { + DataStream dataStreamNoFailureStoreOptions = createTestInstance().copy().setDataStreamOptions(DataStreamOptions.EMPTY).build(); + DataStream dataStreamFailureStoreDisabled = createTestInstance().copy() + .setDataStreamOptions(DataStreamOptions.FAILURE_STORE_DISABLED) + .build(); + DataStream dataStreamFailureStoreEnabled = createTestInstance().copy() + .setDataStreamOptions(DataStreamOptions.FAILURE_STORE_ENABLED) + .build(); + assertThat(dataStreamNoFailureStoreOptions.isFailureStoreExplicitlyEnabled(), is(false)); + assertThat(dataStreamFailureStoreDisabled.isFailureStoreExplicitlyEnabled(), is(false)); + assertThat(dataStreamFailureStoreEnabled.isFailureStoreExplicitlyEnabled(), is(true)); + } + + public void testIsFailureStoreEffectivelyEnabled_instanceMethod() { + DataStream dataStreamNoFailureStoreOptions = createTestInstance().copy() + .setName("my-data-stream-no-failure-store-options") + .setDataStreamOptions(DataStreamOptions.EMPTY) + .build(); + DataStream dataStreamFailureStoreExplicitlyDisabled = createTestInstance().copy() + .setName("my-data-stream-failure-store-explicitly-disabled") + .setDataStreamOptions(DataStreamOptions.FAILURE_STORE_DISABLED) + .build(); + DataStream dataStreamFailureStoreExplicitlyEnabled = createTestInstance().copy() + .setName("my-data-stream-failure-store-explicitly-enabled") + .setDataStreamOptions(DataStreamOptions.FAILURE_STORE_ENABLED) + .build(); + DataStream dotPrefixDataStreamNoFailureStoreOptions = createTestInstance().copy() + .setName(".my-data-stream-no-failure-store-options") + .setDataStreamOptions(DataStreamOptions.EMPTY) + .build(); + DataStream systemDataStreamNoFailureStoreOptions = createTestInstance().copy() + .setName("my-data-stream-system-no-failure-store-options") + .setDataStreamOptions(DataStreamOptions.EMPTY) + .setSystem(true) + .setHidden(true) // system indices must be hidden + .build(); + DataStreamFailureStoreSettings matchingSettings = DataStreamFailureStoreSettings.create( + ClusterSettings.createBuiltInClusterSettings( + Settings.builder() + .put( + DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), + String.join(",", "my-data-stream-*", ".my-data-stream-*") + ) + .build() + ) + ); + DataStreamFailureStoreSettings nonMatchingSettings = DataStreamFailureStoreSettings.create( + ClusterSettings.createBuiltInClusterSettings( + Settings.builder() + .put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), "not-my-data-stream-*") + .build() + ) + ); + assertThat(dataStreamNoFailureStoreOptions.isFailureStoreEffectivelyEnabled(matchingSettings), is(true)); + assertThat(dataStreamNoFailureStoreOptions.isFailureStoreEffectivelyEnabled(nonMatchingSettings), is(false)); + assertThat(dataStreamFailureStoreExplicitlyDisabled.isFailureStoreEffectivelyEnabled(matchingSettings), is(false)); + assertThat(dataStreamFailureStoreExplicitlyDisabled.isFailureStoreEffectivelyEnabled(nonMatchingSettings), is(false)); + assertThat(dataStreamFailureStoreExplicitlyEnabled.isFailureStoreEffectivelyEnabled(matchingSettings), is(true)); + assertThat(dataStreamFailureStoreExplicitlyEnabled.isFailureStoreEffectivelyEnabled(nonMatchingSettings), is(true)); + assertThat(dotPrefixDataStreamNoFailureStoreOptions.isFailureStoreEffectivelyEnabled(matchingSettings), is(false)); + assertThat(dotPrefixDataStreamNoFailureStoreOptions.isFailureStoreEffectivelyEnabled(nonMatchingSettings), is(false)); + assertThat(systemDataStreamNoFailureStoreOptions.isFailureStoreEffectivelyEnabled(matchingSettings), is(false)); + assertThat(systemDataStreamNoFailureStoreOptions.isFailureStoreEffectivelyEnabled(nonMatchingSettings), is(false)); + } + + public void testIsFailureStoreEffectivelyEnabled_staticHelperMethod() { + String regularDataStreamName = "my-data-stream"; + String dotPrefixedDataStreamName = ".my-dot-prefixed-data-stream"; + String systemDataStreamName = "my-system-data-stream-name"; + DataStreamFailureStoreSettings matchingSettings = DataStreamFailureStoreSettings.create( + ClusterSettings.createBuiltInClusterSettings( + Settings.builder() + .put( + DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), + String.join(",", regularDataStreamName, dotPrefixedDataStreamName, systemDataStreamName) + ) + .build() + ) + ); + DataStreamFailureStoreSettings nonMatchingSettings = DataStreamFailureStoreSettings.create( + ClusterSettings.createBuiltInClusterSettings( + Settings.builder() + .put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), "not-my-data-stream") + .build() + ) + ); + // At time of writing, SystemDataStreamDescriptor does not allow us to declare system data streams which aren't also dot-prefixed. + // But we code defensively to do the system data stream and dot-prefix tests independently, as implied in the requirements. + // We use a mock SystemIndices instance for testing, so that we can make it treat a non-dot-prefixed name as a system data stream. + SystemIndices systemIndices = mock(SystemIndices.class); + when(systemIndices.isSystemDataStream(systemDataStreamName)).thenReturn(true); + + assertThat( + DataStream.isFailureStoreEffectivelyEnabled(DataStreamOptions.EMPTY, matchingSettings, regularDataStreamName, systemIndices), + is(true) + ); + assertThat( + DataStream.isFailureStoreEffectivelyEnabled(DataStreamOptions.EMPTY, nonMatchingSettings, regularDataStreamName, systemIndices), + is(false) + ); + assertThat( + DataStream.isFailureStoreEffectivelyEnabled( + DataStreamOptions.EMPTY, + matchingSettings, + dotPrefixedDataStreamName, + systemIndices + ), + is(false) + ); + assertThat( + DataStream.isFailureStoreEffectivelyEnabled(DataStreamOptions.EMPTY, matchingSettings, systemDataStreamName, systemIndices), + is(false) + ); + assertThat( + DataStream.isFailureStoreEffectivelyEnabled( + DataStreamOptions.FAILURE_STORE_DISABLED, + matchingSettings, + regularDataStreamName, + systemIndices + ), + is(false) + ); + assertThat( + DataStream.isFailureStoreEffectivelyEnabled( + DataStreamOptions.FAILURE_STORE_DISABLED, + nonMatchingSettings, + regularDataStreamName, + systemIndices + ), + is(false) + ); + assertThat( + DataStream.isFailureStoreEffectivelyEnabled( + DataStreamOptions.FAILURE_STORE_ENABLED, + matchingSettings, + regularDataStreamName, + systemIndices + ), + is(true) + ); + assertThat( + DataStream.isFailureStoreEffectivelyEnabled( + DataStreamOptions.FAILURE_STORE_ENABLED, + nonMatchingSettings, + regularDataStreamName, + systemIndices + ), + is(true) + ); + } + private record DataStreamMetadata(Long creationTimeInMillis, Long rolloverTimeInMillis, Long originationTimeInMillis) { public static DataStreamMetadata dataStreamMetadata(Long creationTimeInMillis, Long rolloverTimeInMillis) { return new DataStreamMetadata(creationTimeInMillis, rolloverTimeInMillis, null); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index b7f33151961ea..7a07e407024ce 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -93,6 +93,7 @@ import org.elasticsearch.cluster.coordination.LeaderHeartbeatService; import org.elasticsearch.cluster.coordination.Reconfigurator; import org.elasticsearch.cluster.coordination.StatefulPreVoteCollector; +import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadataVerifier; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -2408,7 +2409,8 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { indexNameExpressionResolver, new IndexingPressure(settings), EmptySystemIndices.INSTANCE, - FailureStoreMetrics.NOOP + FailureStoreMetrics.NOOP, + DataStreamFailureStoreSettings.create(ClusterSettings.createBuiltInClusterSettings()) ) ); final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction( diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index 06e4e486e78b5..c3ce32d4ce333 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -146,13 +146,35 @@ public static DataStream newInstance( boolean replicated, @Nullable DataStreamLifecycle lifecycle, List failureStores + ) { + return newInstance( + name, + indices, + generation, + metadata, + replicated, + lifecycle, + failureStores, + failureStores.isEmpty() ? DataStreamOptions.EMPTY : DataStreamOptions.FAILURE_STORE_ENABLED + ); + } + + public static DataStream newInstance( + String name, + List indices, + long generation, + Map metadata, + boolean replicated, + DataStreamLifecycle lifecycle, + List failureStores, + DataStreamOptions dataStreamOptions ) { return DataStream.builder(name, indices) .setGeneration(generation) .setMetadata(metadata) .setReplicated(replicated) .setLifecycle(lifecycle) - .setDataStreamOptions(failureStores.isEmpty() ? DataStreamOptions.EMPTY : DataStreamOptions.FAILURE_STORE_ENABLED) + .setDataStreamOptions(dataStreamOptions) .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build()) .build(); } @@ -457,7 +479,7 @@ public static void getClusterStateWithDataStreams( Settings settings, int replicas, boolean replicated, - boolean storeFailures + Boolean storeFailures ) { builder.put( "template_1", @@ -466,7 +488,7 @@ public static void getClusterStateWithDataStreams( .template( Template.builder() .dataStreamOptions( - DataStream.isFailureStoreFeatureFlagEnabled() && storeFailures ? createDataStreamOptionsTemplate(true) : null + DataStream.isFailureStoreFeatureFlagEnabled() ? createDataStreamOptionsTemplate(storeFailures) : null ) ) .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) @@ -484,7 +506,7 @@ public static void getClusterStateWithDataStreams( allIndices.addAll(backingIndices); List failureStores = new ArrayList<>(); - if (DataStream.isFailureStoreFeatureFlagEnabled() && storeFailures) { + if (DataStream.isFailureStoreFeatureFlagEnabled() && Boolean.TRUE.equals(storeFailures)) { for (int failureStoreNumber = 1; failureStoreNumber <= dsTuple.v2(); failureStoreNumber++) { failureStores.add( createIndexMetadata( diff --git a/x-pack/plugin/core/src/javaRestTest/java/org/elasticsearch/xpack/core/DataStreamRestIT.java b/x-pack/plugin/core/src/javaRestTest/java/org/elasticsearch/xpack/core/DataStreamRestIT.java index 0b6e187930c01..0fb3144e12040 100644 --- a/x-pack/plugin/core/src/javaRestTest/java/org/elasticsearch/xpack/core/DataStreamRestIT.java +++ b/x-pack/plugin/core/src/javaRestTest/java/org/elasticsearch/xpack/core/DataStreamRestIT.java @@ -9,6 +9,7 @@ import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; +import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -51,7 +52,8 @@ public void testDSXpackUsage() throws Exception { assertThat(dataStreams.get("data_streams"), equalTo(0)); assertThat(dataStreams, hasKey("failure_store")); Map failureStoreStats = (Map) dataStreams.get("failure_store"); - assertThat(failureStoreStats.get("enabled_count"), equalTo(0)); + assertThat(failureStoreStats.get("explicitly_enabled_count"), equalTo(0)); + assertThat(failureStoreStats.get("effectively_enabled_count"), equalTo(0)); assertThat(failureStoreStats.get("failure_indices_count"), equalTo(0)); assertBusy(() -> { Map logsTemplate = (Map) ((List) getLocation("/_index_template/logs").get("index_templates")).get(0); @@ -85,8 +87,21 @@ public void testDSXpackUsage() throws Exception { assertThat("got: " + dataStreams, dataStreams.get("data_streams"), equalTo(2)); assertThat("got: " + dataStreams, dataStreams.get("indices_count"), equalTo(3)); failureStoreStats = (Map) dataStreams.get("failure_store"); - assertThat(failureStoreStats.get("enabled_count"), equalTo(1)); + assertThat(failureStoreStats.get("explicitly_enabled_count"), equalTo(1)); + assertThat(failureStoreStats.get("effectively_enabled_count"), equalTo(1)); assertThat(failureStoreStats.get("failure_indices_count"), equalTo(1)); + + // Enable the failure store for logs-mysql-default using the cluster setting... + updateClusterSettings( + Settings.builder() + .put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), "logs-mysql-default") + .build() + ); + // ...and assert that it counts towards effectively_enabled_count but not explicitly_enabled_count: + dataStreams = (Map) getLocation("/_xpack/usage").get("data_streams"); + failureStoreStats = (Map) dataStreams.get("failure_store"); + assertThat(failureStoreStats.get("explicitly_enabled_count"), equalTo(1)); + assertThat(failureStoreStats.get("effectively_enabled_count"), equalTo(2)); } Map getLocation(String path) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamUsageTransportAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamUsageTransportAction.java index 26f3bdd7654ca..7eece9177cf2b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamUsageTransportAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamUsageTransportAction.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.injection.guice.Inject; @@ -24,13 +25,16 @@ public class DataStreamUsageTransportAction extends XPackUsageFeatureTransportAction { + private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings; + @Inject public DataStreamUsageTransportAction( TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver + IndexNameExpressionResolver indexNameExpressionResolver, + DataStreamFailureStoreSettings dataStreamFailureStoreSettings ) { super( XPackUsageFeatureAction.DATA_STREAMS.name(), @@ -40,6 +44,7 @@ public DataStreamUsageTransportAction( actionFilters, indexNameExpressionResolver ); + this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings; } @Override @@ -51,13 +56,17 @@ protected void masterOperation( ) { final Map dataStreams = state.metadata().dataStreams(); long backingIndicesCounter = 0; - long failureStoreEnabledCounter = 0; + long failureStoreExplicitlyEnabledCounter = 0; + long failureStoreEffectivelyEnabledCounter = 0; long failureIndicesCounter = 0; for (DataStream ds : dataStreams.values()) { backingIndicesCounter += ds.getIndices().size(); if (DataStream.isFailureStoreFeatureFlagEnabled()) { - if (ds.isFailureStoreEnabled()) { - failureStoreEnabledCounter++; + if (ds.isFailureStoreExplicitlyEnabled()) { + failureStoreExplicitlyEnabledCounter++; + } + if (ds.isFailureStoreEffectivelyEnabled(dataStreamFailureStoreSettings)) { + failureStoreEffectivelyEnabledCounter++; } if (ds.getFailureIndices().getIndices().isEmpty() == false) { failureIndicesCounter += ds.getFailureIndices().getIndices().size(); @@ -67,7 +76,8 @@ protected void masterOperation( final DataStreamFeatureSetUsage.DataStreamStats stats = new DataStreamFeatureSetUsage.DataStreamStats( dataStreams.size(), backingIndicesCounter, - failureStoreEnabledCounter, + failureStoreExplicitlyEnabledCounter, + failureStoreEffectivelyEnabledCounter, failureIndicesCounter ); final DataStreamFeatureSetUsage usage = new DataStreamFeatureSetUsage(stats); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/DataStreamFeatureSetUsage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/DataStreamFeatureSetUsage.java index 1a964f3c57dbb..f8cb9b913b4ae 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/DataStreamFeatureSetUsage.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/DataStreamFeatureSetUsage.java @@ -52,7 +52,8 @@ protected void innerXContent(XContentBuilder builder, Params params) throws IOEx builder.field("indices_count", streamStats.indicesBehindDataStream); if (DataStream.isFailureStoreFeatureFlagEnabled()) { builder.startObject("failure_store"); - builder.field("enabled_count", streamStats.failureStoreEnabledDataStreamCount); + builder.field("explicitly_enabled_count", streamStats.failureStoreExplicitlyEnabledDataStreamCount); + builder.field("effectively_enabled_count", streamStats.failureStoreEffectivelyEnabledDataStreamCount); builder.field("failure_indices_count", streamStats.failureStoreIndicesCount); builder.endObject(); } @@ -83,7 +84,8 @@ public boolean equals(Object obj) { public record DataStreamStats( long totalDataStreamCount, long indicesBehindDataStream, - long failureStoreEnabledDataStreamCount, + long failureStoreExplicitlyEnabledDataStreamCount, + long failureStoreEffectivelyEnabledDataStreamCount, long failureStoreIndicesCount ) implements Writeable { @@ -92,6 +94,7 @@ public DataStreamStats(StreamInput in) throws IOException { in.readVLong(), in.readVLong(), in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0) ? in.readVLong() : 0, + in.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_ENABLED_BY_CLUSTER_SETTING) ? in.readVLong() : 0, in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0) ? in.readVLong() : 0 ); } @@ -101,7 +104,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(this.totalDataStreamCount); out.writeVLong(this.indicesBehindDataStream); if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) { - out.writeVLong(this.failureStoreEnabledDataStreamCount); + out.writeVLong(this.failureStoreExplicitlyEnabledDataStreamCount); + if (out.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_ENABLED_BY_CLUSTER_SETTING)) { + out.writeVLong(failureStoreEffectivelyEnabledDataStreamCount); + } out.writeVLong(this.failureStoreIndicesCount); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/DataStreamFeatureSetUsageTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/DataStreamFeatureSetUsageTests.java index 3ff36c52229e7..5f879d508a52b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/DataStreamFeatureSetUsageTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/DataStreamFeatureSetUsageTests.java @@ -20,6 +20,7 @@ protected DataStreamFeatureSetUsage createTestInstance() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong() ) );