[8.12] Data streams fix failure store delete (#104281) #104341

Merged (1 commit, Jan 12, 2024)
docs/changelog/104281.yaml (5 changes: 5 additions & 0 deletions)
@@ -0,0 +1,5 @@
pr: 104281
summary: Data streams fix failure store delete
area: Data streams
type: bug
issues: []
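
For context, the essence of the fix can be sketched with simplified stand-in types (SimpleDataStream and indicesToRemove are illustrative names, not the real Elasticsearch classes): deleting a data stream has to collect its failure-store indices for removal in addition to its backing indices.

// Minimal sketch of the behavior this change fixes, using simplified
// stand-in types rather than the real Elasticsearch classes.
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class DataStreamDeletionSketch {

    // Stand-in for a data stream that tracks both backing and failure-store indices.
    record SimpleDataStream(String name, List<String> backingIndices, List<String> failureIndices) {}

    // Collect every concrete index that must be removed together with the data stream.
    static Set<String> indicesToRemove(SimpleDataStream dataStream) {
        Set<String> toRemove = new HashSet<>(dataStream.backingIndices());
        // Before the fix, the equivalent of this step was missing, so
        // failure-store indices were left behind after the delete.
        toRemove.addAll(dataStream.failureIndices());
        return toRemove;
    }

    public static void main(String[] args) {
        SimpleDataStream ds = new SimpleDataStream(
            "failure-data-stream1",
            List.of(".ds-failure-data-stream1-2024.01.12-000001"),
            List.of(".fs-failure-data-stream1-2024.01.12-000001")
        );
        // Prints both the backing index and the failure-store index.
        System.out.println(indicesToRemove(ds));
    }
}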
@@ -155,6 +155,7 @@ static ClusterState removeDataStream(
DataStream dataStream = currentState.metadata().dataStreams().get(dataStreamName);
assert dataStream != null;
backingIndicesToRemove.addAll(dataStream.getIndices());
backingIndicesToRemove.addAll(dataStream.getFailureIndices());
}

// first delete the data streams and then the indices:
@@ -26,6 +26,7 @@
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.test.ESTestCase;
import org.junit.Assume;

import java.util.Collections;
import java.util.List;
@@ -55,6 +56,30 @@ public void testDeleteDataStream() {
}
}

public void testDeleteDataStreamWithFailureStore() {
Assume.assumeTrue(DataStream.isFailureStoreEnabled());

final String dataStreamName = "my-data-stream";
final List<String> otherIndices = randomSubsetOf(List.of("foo", "bar", "baz"));

ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(
List.of(new Tuple<>(dataStreamName, 2)),
otherIndices,
System.currentTimeMillis(),
Settings.EMPTY,
1,
false,
true
);
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(new String[] { dataStreamName });
ClusterState newState = DeleteDataStreamTransportAction.removeDataStream(iner, cs, req, validator, Settings.EMPTY);
assertThat(newState.metadata().dataStreams().size(), equalTo(0));
assertThat(newState.metadata().indices().size(), equalTo(otherIndices.size()));
for (String indexName : otherIndices) {
assertThat(newState.metadata().indices().get(indexName).getIndex().getName(), equalTo(indexName));
}
}

public void testDeleteMultipleDataStreams() {
String[] dataStreamNames = { "foo", "bar", "baz", "eggplant" };
ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(
@@ -215,6 +215,7 @@ public void testGetTimeSeriesMixedDataStream() {
instant.toEpochMilli(),
Settings.EMPTY,
0,
false,
false
);
DataStreamTestHelper.getClusterStateWithDataStream(mBuilder, dataStream1, List.of(new Tuple<>(twoHoursAgo, twoHoursAhead)));
@@ -305,6 +305,11 @@ setup:
name: failure-data-stream2
- is_true: acknowledged

- do:
indices.delete_index_template:
name: my-template4
- is_true: acknowledged

---
"Create data stream with invalid name":
- skip:
@@ -532,6 +537,80 @@ setup:
indices.get:
index: $idx0name

---
"Delete data stream with failure stores":
- skip:
version: " - 8.11.99"
reason: "data streams only supported in 8.12+"

- do:
allowed_warnings:
- "index template [my-template4] has index patterns [failure-data-stream1, failure-data-stream2] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template4] will take precedence during new index creation"
indices.put_index_template:
name: my-template4
body:
index_patterns: [ failure-data-stream1 ]
data_stream:
failure_store: true

- do:
indices.create_data_stream:
name: failure-data-stream1
- is_true: acknowledged

- do:
indices.create:
index: test_index
body:
settings:
number_of_shards: 1
number_of_replicas: 1

# save the backing index names for later use
- do:
indices.get_data_stream:
name: failure-data-stream1

- set: { data_streams.0.indices.0.index_name: idx0name }
- set: { data_streams.0.failure_indices.0.index_name: fs0name }

- do:
indices.get:
index: ['.ds-failure-data-stream1-*000001', 'test_index']

- is_true: test_index.settings
- is_true: .$idx0name.settings

- do:
indices.get_data_stream: {}
- match: { data_streams.0.name: failure-data-stream1 }
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
- match: { data_streams.0.generation: 1 }
- length: { data_streams.0.indices: 1 }
- match: { data_streams.0.indices.0.index_name: '/\.ds-failure-data-stream1-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- length: { data_streams.0.failure_indices: 1 }
- match: { data_streams.0.failure_indices.0.index_name: '/\.fs-failure-data-stream1-(\d{4}\.\d{2}\.\d{2}-)?000001/' }

- do:
indices.delete_data_stream:
name: failure-data-stream1
- is_true: acknowledged

- do:
catch: missing
indices.get:
index: $idx0name

- do:
catch: missing
indices.get:
index: $fs0name

- do:
indices.delete_index_template:
name: my-template4
- is_true: acknowledged

---
"Delete data stream missing behaviour":
- skip:
@@ -68,6 +68,7 @@
import static org.elasticsearch.cluster.metadata.DataStream.BACKING_INDEX_PREFIX;
import static org.elasticsearch.cluster.metadata.DataStream.DATE_FORMATTER;
import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
import static org.elasticsearch.cluster.metadata.DataStream.getDefaultFailureStoreName;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
import static org.elasticsearch.test.ESTestCase.generateRandomStringArray;
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
@@ -111,7 +112,19 @@ public static DataStream newInstance(
boolean replicated,
@Nullable DataStreamLifecycle lifecycle
) {
return new DataStream(name, indices, generation, metadata, false, replicated, false, false, null, lifecycle, false, List.of());
return newInstance(name, indices, generation, metadata, replicated, lifecycle, List.of());
}

public static DataStream newInstance(
String name,
List<Index> indices,
long generation,
Map<String, Object> metadata,
boolean replicated,
@Nullable DataStreamLifecycle lifecycle,
List<Index> failureStores
) {
return new DataStream(name, indices, generation, metadata, false, replicated, false, false, null, lifecycle, false, failureStores);
}

public static String getLegacyDefaultBackingIndexName(
@@ -317,9 +330,21 @@ public static ClusterState getClusterStateWithDataStreams(
Settings settings,
int replicas,
boolean replicated
) {
return getClusterStateWithDataStreams(dataStreams, indexNames, currentTime, settings, replicas, replicated, false);
}

public static ClusterState getClusterStateWithDataStreams(
List<Tuple<String, Integer>> dataStreams,
List<String> indexNames,
long currentTime,
Settings settings,
int replicas,
boolean replicated,
boolean storeFailures
) {
Metadata.Builder builder = Metadata.builder();
getClusterStateWithDataStreams(builder, dataStreams, indexNames, currentTime, settings, replicas, replicated);
getClusterStateWithDataStreams(builder, dataStreams, indexNames, currentTime, settings, replicas, replicated, storeFailures);
return ClusterState.builder(new ClusterName("_name")).metadata(builder).build();
}

@@ -330,13 +355,16 @@ public static void getClusterStateWithDataStreams(
long currentTime,
Settings settings,
int replicas,
boolean replicated
boolean replicated,
boolean storeFailures
) {
builder.put(
"template_1",
ComposableIndexTemplate.builder()
.indexPatterns(List.of("*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.dataStreamTemplate(
new ComposableIndexTemplate.DataStreamTemplate(false, false, DataStream.isFailureStoreEnabled() && storeFailures)
)
.build()
);

@@ -350,12 +378,29 @@ public static void getClusterStateWithDataStreams(
}
allIndices.addAll(backingIndices);

List<IndexMetadata> failureStores = new ArrayList<>();
if (DataStream.isFailureStoreEnabled() && storeFailures) {
for (int failureStoreNumber = 1; failureStoreNumber <= dsTuple.v2(); failureStoreNumber++) {
failureStores.add(
createIndexMetadata(
getDefaultFailureStoreName(dsTuple.v1(), failureStoreNumber, currentTime),
true,
settings,
replicas
)
);
}
allIndices.addAll(failureStores);
}

DataStream ds = DataStreamTestHelper.newInstance(
dsTuple.v1(),
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
dsTuple.v2(),
null,
replicated
replicated,
null,
failureStores.stream().map(IndexMetadata::getIndex).collect(Collectors.toList())
);
builder.put(ds);
}