Skip to content

Commit

Permalink
Data streams fix failure store delete (#104281) (#104341)
Browse files Browse the repository at this point in the history
This PR adds the any failure store indices to the list of indices to be deleted when deleting a data stream.

(cherry picked from commit 2a79d78)
  • Loading branch information
jbaiera authored Jan 12, 2024
1 parent 5fba67e commit d79ce66
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 5 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/104281.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 104281
summary: Data streams fix failure store delete
area: Data streams
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ static ClusterState removeDataStream(
DataStream dataStream = currentState.metadata().dataStreams().get(dataStreamName);
assert dataStream != null;
backingIndicesToRemove.addAll(dataStream.getIndices());
backingIndicesToRemove.addAll(dataStream.getFailureIndices());
}

// first delete the data streams and then the indices:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.test.ESTestCase;
import org.junit.Assume;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -55,6 +56,30 @@ public void testDeleteDataStream() {
}
}

public void testDeleteDataStreamWithFailureStore() {
Assume.assumeTrue(DataStream.isFailureStoreEnabled());

final String dataStreamName = "my-data-stream";
final List<String> otherIndices = randomSubsetOf(List.of("foo", "bar", "baz"));

ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(
List.of(new Tuple<>(dataStreamName, 2)),
otherIndices,
System.currentTimeMillis(),
Settings.EMPTY,
1,
false,
true
);
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(new String[] { dataStreamName });
ClusterState newState = DeleteDataStreamTransportAction.removeDataStream(iner, cs, req, validator, Settings.EMPTY);
assertThat(newState.metadata().dataStreams().size(), equalTo(0));
assertThat(newState.metadata().indices().size(), equalTo(otherIndices.size()));
for (String indexName : otherIndices) {
assertThat(newState.metadata().indices().get(indexName).getIndex().getName(), equalTo(indexName));
}
}

public void testDeleteMultipleDataStreams() {
String[] dataStreamNames = { "foo", "bar", "baz", "eggplant" };
ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ public void testGetTimeSeriesMixedDataStream() {
instant.toEpochMilli(),
Settings.EMPTY,
0,
false,
false
);
DataStreamTestHelper.getClusterStateWithDataStream(mBuilder, dataStream1, List.of(new Tuple<>(twoHoursAgo, twoHoursAhead)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,11 @@ setup:
name: failure-data-stream2
- is_true: acknowledged

- do:
indices.delete_index_template:
name: my-template4
- is_true: acknowledged

---
"Create data stream with invalid name":
- skip:
Expand Down Expand Up @@ -532,6 +537,80 @@ setup:
indices.get:
index: $idx0name

---
"Delete data stream with failure stores":
- skip:
version: " - 8.11.99"
reason: "data streams only supported in 8.12+"

- do:
allowed_warnings:
- "index template [my-template4] has index patterns [failure-data-stream1, failure-data-stream2] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template4] will take precedence during new index creation"
indices.put_index_template:
name: my-template4
body:
index_patterns: [ failure-data-stream1 ]
data_stream:
failure_store: true

- do:
indices.create_data_stream:
name: failure-data-stream1
- is_true: acknowledged

- do:
indices.create:
index: test_index
body:
settings:
number_of_shards: 1
number_of_replicas: 1

# save the backing index names for later use
- do:
indices.get_data_stream:
name: failure-data-stream1

- set: { data_streams.0.indices.0.index_name: idx0name }
- set: { data_streams.0.failure_indices.0.index_name: fs0name }

- do:
indices.get:
index: ['.ds-failure-data-stream1-*000001', 'test_index']

- is_true: test_index.settings
- is_true: .$idx0name.settings

- do:
indices.get_data_stream: {}
- match: { data_streams.0.name: failure-data-stream1 }
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
- match: { data_streams.0.generation: 1 }
- length: { data_streams.0.indices: 1 }
- match: { data_streams.0.indices.0.index_name: '/\.ds-failure-data-stream1-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- length: { data_streams.0.failure_indices: 1 }
- match: { data_streams.0.failure_indices.0.index_name: '/\.fs-failure-data-stream1-(\d{4}\.\d{2}\.\d{2}-)?000001/' }

- do:
indices.delete_data_stream:
name: failure-data-stream1
- is_true: acknowledged

- do:
catch: missing
indices.get:
index: $idx0name

- do:
catch: missing
indices.get:
index: $fs0name

- do:
indices.delete_index_template:
name: my-template4
- is_true: acknowledged

---
"Delete data stream missing behaviour":
- skip:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import static org.elasticsearch.cluster.metadata.DataStream.BACKING_INDEX_PREFIX;
import static org.elasticsearch.cluster.metadata.DataStream.DATE_FORMATTER;
import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
import static org.elasticsearch.cluster.metadata.DataStream.getDefaultFailureStoreName;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
import static org.elasticsearch.test.ESTestCase.generateRandomStringArray;
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
Expand Down Expand Up @@ -111,7 +112,19 @@ public static DataStream newInstance(
boolean replicated,
@Nullable DataStreamLifecycle lifecycle
) {
return new DataStream(name, indices, generation, metadata, false, replicated, false, false, null, lifecycle, false, List.of());
return newInstance(name, indices, generation, metadata, replicated, lifecycle, List.of());
}

public static DataStream newInstance(
String name,
List<Index> indices,
long generation,
Map<String, Object> metadata,
boolean replicated,
@Nullable DataStreamLifecycle lifecycle,
List<Index> failureStores
) {
return new DataStream(name, indices, generation, metadata, false, replicated, false, false, null, lifecycle, false, failureStores);
}

public static String getLegacyDefaultBackingIndexName(
Expand Down Expand Up @@ -317,9 +330,21 @@ public static ClusterState getClusterStateWithDataStreams(
Settings settings,
int replicas,
boolean replicated
) {
return getClusterStateWithDataStreams(dataStreams, indexNames, currentTime, settings, replicas, replicated, false);
}

public static ClusterState getClusterStateWithDataStreams(
List<Tuple<String, Integer>> dataStreams,
List<String> indexNames,
long currentTime,
Settings settings,
int replicas,
boolean replicated,
boolean storeFailures
) {
Metadata.Builder builder = Metadata.builder();
getClusterStateWithDataStreams(builder, dataStreams, indexNames, currentTime, settings, replicas, replicated);
getClusterStateWithDataStreams(builder, dataStreams, indexNames, currentTime, settings, replicas, replicated, storeFailures);
return ClusterState.builder(new ClusterName("_name")).metadata(builder).build();
}

Expand All @@ -330,13 +355,16 @@ public static void getClusterStateWithDataStreams(
long currentTime,
Settings settings,
int replicas,
boolean replicated
boolean replicated,
boolean storeFailures
) {
builder.put(
"template_1",
ComposableIndexTemplate.builder()
.indexPatterns(List.of("*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.dataStreamTemplate(
new ComposableIndexTemplate.DataStreamTemplate(false, false, DataStream.isFailureStoreEnabled() && storeFailures)
)
.build()
);

Expand All @@ -350,12 +378,29 @@ public static void getClusterStateWithDataStreams(
}
allIndices.addAll(backingIndices);

List<IndexMetadata> failureStores = new ArrayList<>();
if (DataStream.isFailureStoreEnabled() && storeFailures) {
for (int failureStoreNumber = 1; failureStoreNumber <= dsTuple.v2(); failureStoreNumber++) {
failureStores.add(
createIndexMetadata(
getDefaultFailureStoreName(dsTuple.v1(), failureStoreNumber, currentTime),
true,
settings,
replicas
)
);
}
allIndices.addAll(failureStores);
}

DataStream ds = DataStreamTestHelper.newInstance(
dsTuple.v1(),
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
dsTuple.v2(),
null,
replicated
replicated,
null,
failureStores.stream().map(IndexMetadata::getIndex).collect(Collectors.toList())
);
builder.put(ds);
}
Expand Down

0 comments on commit d79ce66

Please sign in to comment.