Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cluster setting to enable failure store #118662

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
b06c9e8
Add cluster setting to enable failure store
PeteGillinElastic Dec 10, 2024
c06ddae
Merge remote-tracking branch 'upstream/main' into failure-store-globa…
PeteGillinElastic Dec 13, 2024
9fb84be
Respond to review comments
PeteGillinElastic Dec 16, 2024
e63698c
Swap the order of the settings and `isInternal` checks to an order mo…
PeteGillinElastic Dec 16, 2024
e60fa31
Reword javadoc for readability
PeteGillinElastic Dec 16, 2024
1a09efe
Revert "Swap the order of the settings and `isInternal` checks to an …
PeteGillinElastic Dec 16, 2024
8599a60
Add more test cases in `DataStreamOptionsIT`
PeteGillinElastic Dec 17, 2024
b3ca98c
Make `TransportGetDataStreamsActionTests` more readable
PeteGillinElastic Dec 17, 2024
2bddabe
Add a `NodeFeature` and require it in the YAML test
PeteGillinElastic Dec 17, 2024
b1f0860
spotlessApply
PeteGillinElastic Dec 17, 2024
5e3af98
Make YAML test fully reset flag by setting it to null rather than emp…
PeteGillinElastic Dec 17, 2024
95f78d0
Simplify YAML tests using indices.create_data_stream
PeteGillinElastic Dec 17, 2024
3dff7f5
Merge remote-tracking branch 'upstream/main' into failure-store-globa…
PeteGillinElastic Dec 17, 2024
2e6ab06
Clarify some javadoc
PeteGillinElastic Dec 18, 2024
b122cba
Another javadoc clarification
PeteGillinElastic Dec 18, 2024
e98ff13
Do date math resolution on name before matching it against the settin…
PeteGillinElastic Dec 18, 2024
26f2adc
Merge remote-tracking branch 'upstream/main' into failure-store-globa…
PeteGillinElastic Dec 18, 2024
38996ac
Only register cluster setting if feature flag is enabled
PeteGillinElastic Dec 18, 2024
9dbe937
Merge remote-tracking branch 'upstream/main' into failure-store-globa…
PeteGillinElastic Dec 18, 2024
0dafe53
Revert "Add a `NodeFeature` and require it in the YAML test"
PeteGillinElastic Dec 18, 2024
c1a0f3f
Add a capability on the PUT /_cluster/settings API and require that i…
PeteGillinElastic Dec 18, 2024
89aea85
Merge remote-tracking branch 'upstream/main' into failure-store-globa…
PeteGillinElastic Dec 18, 2024
ecc2612
Merge remote-tracking branch 'upstream/main' into failure-store-globa…
PeteGillinElastic Dec 19, 2024
5e2a3a0
Merge remote-tracking branch 'upstream/main' into failure-store-globa…
PeteGillinElastic Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -122,13 +125,25 @@ public void testExplicitlyResetDataStreamOptions() throws IOException {
assertOK(client().performRequest(otherRequest));
}

public void testEnableDisableFailureStore() throws IOException {
public void testBehaviorWithEachFailureStoreOptionAndClusterSetting() throws IOException {
{
// Default data stream options
assertAcknowledged(client().performRequest(new Request("DELETE", "/_data_stream/" + DATA_STREAM_NAME + "/_options")));
assertFailureStore(false, 1);
setDataStreamFailureStoreClusterSetting(DATA_STREAM_NAME);
assertDataStreamOptions(null);
assertFailureStoreValuesInGetDataStreamResponse(true, 1);
assertRedirectsDocWithBadMappingToFailureStore();
setDataStreamFailureStoreClusterSetting("does-not-match-failure-data-stream");
assertDataStreamOptions(null);
assertFailureStoreValuesInGetDataStreamResponse(false, 1);
assertFailsDocWithBadMapping();
setDataStreamFailureStoreClusterSetting(null); // should get same behaviour as when we set it to something non-matching
assertDataStreamOptions(null);
assertFailureStoreValuesInGetDataStreamResponse(false, 1);
assertFailsDocWithBadMapping();
}
{
// Data stream options with failure store enabled
Request enableRequest = new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME + "/_options");
enableRequest.setJsonEntity("""
{
Expand All @@ -137,11 +152,21 @@ public void testEnableDisableFailureStore() throws IOException {
}
}""");
assertAcknowledged(client().performRequest(enableRequest));
assertFailureStore(true, 1);
setDataStreamFailureStoreClusterSetting(DATA_STREAM_NAME);
assertDataStreamOptions(true);
assertFailureStoreValuesInGetDataStreamResponse(true, 1);
assertRedirectsDocWithBadMappingToFailureStore();
setDataStreamFailureStoreClusterSetting("does-not-match-failure-data-stream"); // should have no effect as enabled in options
assertDataStreamOptions(true);
assertFailureStoreValuesInGetDataStreamResponse(true, 1);
assertRedirectsDocWithBadMappingToFailureStore();
gmarouli marked this conversation as resolved.
Show resolved Hide resolved
setDataStreamFailureStoreClusterSetting(null); // same as previous
assertDataStreamOptions(true);
assertFailureStoreValuesInGetDataStreamResponse(true, 1);
assertRedirectsDocWithBadMappingToFailureStore();
}

{
// Data stream options with failure store disabled
Request disableRequest = new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME + "/_options");
disableRequest.setJsonEntity("""
{
Expand All @@ -150,13 +175,23 @@ public void testEnableDisableFailureStore() throws IOException {
}
}""");
assertAcknowledged(client().performRequest(disableRequest));
assertFailureStore(false, 1);
setDataStreamFailureStoreClusterSetting(DATA_STREAM_NAME); // should have no effect as disabled in options
assertDataStreamOptions(false);
assertFailureStoreValuesInGetDataStreamResponse(false, 1);
assertFailsDocWithBadMapping();
setDataStreamFailureStoreClusterSetting("does-not-match-failure-data-stream");
gmarouli marked this conversation as resolved.
Show resolved Hide resolved
assertDataStreamOptions(false);
assertFailureStoreValuesInGetDataStreamResponse(false, 1);
assertFailsDocWithBadMapping();
setDataStreamFailureStoreClusterSetting(null);
assertDataStreamOptions(false);
assertFailureStoreValuesInGetDataStreamResponse(false, 1);
assertFailsDocWithBadMapping();
}
}

@SuppressWarnings("unchecked")
private void assertFailureStore(boolean failureStoreEnabled, int failureStoreSize) throws IOException {
private void assertFailureStoreValuesInGetDataStreamResponse(boolean failureStoreEnabled, int failureStoreSize) throws IOException {
final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + DATA_STREAM_NAME));
List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
assertThat(dataStreams.size(), is(1));
Expand Down Expand Up @@ -198,4 +233,32 @@ private List<String> getIndices(Map<String, Object> response) {
List<Map<String, String>> indices = (List<Map<String, String>>) response.get("indices");
return indices.stream().map(index -> index.get("index_name")).toList();
}

private static void setDataStreamFailureStoreClusterSetting(String value) throws IOException {
updateClusterSettings(
Settings.builder().put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), value).build()
);
}

private Response putDocumentWithBadMapping() throws IOException {
Request request = new Request("POST", DATA_STREAM_NAME + "/_doc");
request.setJsonEntity("""
{
"@timestamp": "not a timestamp",
"foo": "bar"
}
""");
return client().performRequest(request);
}

private void assertRedirectsDocWithBadMappingToFailureStore() throws IOException {
Response response = putDocumentWithBadMapping();
String failureStoreResponse = (String) entityAsMap(response).get("failure_store");
assertThat(failureStoreResponse, is("used"));
}

private void assertFailsDocWithBadMapping() {
ResponseException e = assertThrows(ResponseException.class, this::putDocumentWithBadMapping);
assertThat(e.getResponse().getStatusLine().getStatusCode(), is(RestStatus.BAD_REQUEST.getStatus()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -64,6 +65,7 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadAction
private final SystemIndices systemIndices;
private final ClusterSettings clusterSettings;
private final DataStreamGlobalRetentionSettings globalRetentionSettings;
private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
private final Client client;

@Inject
Expand All @@ -75,6 +77,7 @@ public TransportGetDataStreamsAction(
IndexNameExpressionResolver indexNameExpressionResolver,
SystemIndices systemIndices,
DataStreamGlobalRetentionSettings globalRetentionSettings,
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
Client client
) {
super(
Expand All @@ -91,6 +94,7 @@ public TransportGetDataStreamsAction(
this.systemIndices = systemIndices;
this.globalRetentionSettings = globalRetentionSettings;
clusterSettings = clusterService.getClusterSettings();
this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
this.client = new OriginSettingClient(client, "stack");
}

Expand Down Expand Up @@ -122,6 +126,7 @@ public void onResponse(DataStreamsStatsAction.Response response) {
systemIndices,
clusterSettings,
globalRetentionSettings,
dataStreamFailureStoreSettings,
maxTimestamps
)
);
Expand All @@ -134,7 +139,16 @@ public void onFailure(Exception e) {
});
} else {
listener.onResponse(
innerOperation(state, request, indexNameExpressionResolver, systemIndices, clusterSettings, globalRetentionSettings, null)
innerOperation(
state,
request,
indexNameExpressionResolver,
systemIndices,
clusterSettings,
globalRetentionSettings,
dataStreamFailureStoreSettings,
null
)
);
}
}
Expand All @@ -146,11 +160,16 @@ static GetDataStreamAction.Response innerOperation(
SystemIndices systemIndices,
ClusterSettings clusterSettings,
DataStreamGlobalRetentionSettings globalRetentionSettings,
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
@Nullable Map<String, Long> maxTimestamps
) {
List<DataStream> dataStreams = getDataStreams(state, indexNameExpressionResolver, request);
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = new ArrayList<>(dataStreams.size());
for (DataStream dataStream : dataStreams) {
// For this action, we are returning whether the failure store is effectively enabled, either in metadata or by cluster setting.
// Users can use the get data stream options API to find out whether it is explicitly enabled in metadata.
boolean failureStoreEffectivelyEnabled = DataStream.isFailureStoreFeatureFlagEnabled()
&& dataStream.isFailureStoreEffectivelyEnabled(dataStreamFailureStoreSettings);
gmarouli marked this conversation as resolved.
Show resolved Hide resolved
final String indexTemplate;
boolean indexTemplatePreferIlmValue = true;
String ilmPolicyName = null;
Expand Down Expand Up @@ -254,6 +273,7 @@ public int compareTo(IndexInfo o) {
dataStreamInfos.add(
new GetDataStreamAction.Response.DataStreamInfo(
dataStream,
failureStoreEffectivelyEnabled,
streamHealth.getStatus(),
indexTemplate,
ilmPolicyName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti

Response.DataStreamInfo dataStreamInfo = new Response.DataStreamInfo(
logs,
true,
ClusterHealthStatus.GREEN,
"index-template",
null,
Expand Down Expand Up @@ -205,6 +206,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti

Response.DataStreamInfo dataStreamInfo = new Response.DataStreamInfo(
logs,
true,
ClusterHealthStatus.GREEN,
"index-template",
null,
Expand Down Expand Up @@ -282,14 +284,15 @@ public void testManagedByDisplayValuesDontAccidentalyChange() {

private Response.DataStreamInfo mutateInstance(Response.DataStreamInfo instance) {
var dataStream = instance.getDataStream();
var failureStoreEffectivelyEnabled = instance.isFailureStoreEffectivelyEnabled();
var status = instance.getDataStreamStatus();
var indexTemplate = instance.getIndexTemplate();
var ilmPolicyName = instance.getIlmPolicy();
var timeSeries = instance.getTimeSeries();
var indexSettings = instance.getIndexSettingsValues();
var templatePreferIlm = instance.templatePreferIlmValue();
var maximumTimestamp = instance.getMaximumTimestamp();
switch (randomIntBetween(0, 7)) {
switch (randomIntBetween(0, 8)) {
case 0 -> dataStream = randomValueOtherThan(dataStream, DataStreamTestHelper::randomInstance);
case 1 -> status = randomValueOtherThan(status, () -> randomFrom(ClusterHealthStatus.values()));
case 2 -> indexTemplate = randomBoolean() && indexTemplate != null ? null : randomAlphaOfLengthBetween(2, 10);
Expand All @@ -314,9 +317,11 @@ private Response.DataStreamInfo mutateInstance(Response.DataStreamInfo instance)
case 7 -> maximumTimestamp = (maximumTimestamp == null)
? randomNonNegativeLong()
: (usually() ? randomValueOtherThan(maximumTimestamp, ESTestCase::randomNonNegativeLong) : null);
case 8 -> failureStoreEffectivelyEnabled = failureStoreEffectivelyEnabled ? false : true;
}
return new Response.DataStreamInfo(
dataStream,
failureStoreEffectivelyEnabled,
status,
indexTemplate,
ilmPolicyName,
Expand Down Expand Up @@ -355,6 +360,7 @@ private Response.DataStreamInfo generateRandomDataStreamInfo() {
List<Tuple<Instant, Instant>> timeSeries = randomBoolean() ? generateRandomTimeSeries() : null;
return new Response.DataStreamInfo(
DataStreamTestHelper.randomInstance(),
randomBoolean(),
ClusterHealthStatus.GREEN,
randomAlphaOfLengthBetween(2, 10),
randomAlphaOfLengthBetween(2, 10),
Expand Down
Loading