From 90e837c1994881fbbae92c6cb2aaecac0b80fd28 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 17 Feb 2022 15:16:26 +0100 Subject: [PATCH] Allow regular data streams to be migrated to tsdb data streams. (#83843) A regular data stream can be migrated to a tsdb data stream if in template that created the data stream, the `index_mode` field is set to `time_series` and the data stream's `index_mode` property is either not specified or set to `standard`. Then on the next rollover the data stream is migrated to be a tsdb data stream. When that happens the data stream's `index_mode` property is set to `time_series` and the new backing index's `index.mode` index setting is also set to `time_series`. Closes #83520 --- .../datastreams/TsdbDataStreamRestIT.java | 131 +++++++++++++ .../DataStreamIndexSettingsProvider.java | 12 +- ...etadataDataStreamRolloverServiceTests.java | 183 +++++++++++++++++- .../rollover/MetadataRolloverService.java | 4 +- .../cluster/metadata/DataStream.java | 49 +++-- .../cluster/metadata/DataStreamTests.java | 61 +++++- .../metadata/DataStreamTestHelper.java | 4 + .../ReactiveStorageDeciderService.java | 2 +- 8 files changed, 425 insertions(+), 21 deletions(-) diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java index 14ea7ddc3793d..b3a1629176770 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java @@ -8,6 +8,7 @@ package org.elasticsearch.datastreams; import org.elasticsearch.client.Request; +import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.time.FormatNames; import org.elasticsearch.test.rest.ESRestTestCase; @@ -15,16 +16,19 @@ import java.io.IOException; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Map; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo; import static org.hamcrest.Matchers.aMapWithSize; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; public class TsdbDataStreamRestIT extends ESRestTestCase { @@ -84,6 +88,57 @@ public class TsdbDataStreamRestIT extends ESRestTestCase { } }"""; + private static final String NON_TSDB_TEMPLATE = """ + { + "index_patterns": ["k8s*"], + "template": { + "settings":{ + "index": { + "number_of_replicas": 0, + "number_of_shards": 2 + } + }, + "mappings":{ + "properties": { + "@timestamp" : { + "type": "date" + }, + "metricset": { + "type": "keyword" + }, + "k8s": { + "properties": { + "pod": { + "properties": { + "uid": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "ip": { + "type": "ip" + }, + "network": { + "properties": { + "tx": { + "type": "long" + }, + "rx": { + "type": "long" + } + } + } + } + } + } + } + } + } + }, + "data_stream": {} + }"""; + private static final String DOC = """ { "@timestamp": "$time", @@ -235,6 +290,82 @@ public void testSubsequentRollovers() throws Exception { } } + public void testMigrateRegularDataStreamToTsdbDataStream() throws Exception { + // Create a non tsdb template + var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1"); + putComposableIndexTemplateRequest.setJsonEntity(NON_TSDB_TEMPLATE); + assertOK(client().performRequest(putComposableIndexTemplateRequest)); + + // Index a few docs and sometimes rollover + int numRollovers = 4; + int numDocs = 32; + var currentTime = Instant.now(); + var currentMinus30Days = currentTime.minus(30, ChronoUnit.DAYS); + for (int i = 0; i < numRollovers; i++) { + for (int j = 0; j < numDocs; j++) { + var indexRequest = new Request("POST", "/k8s/_doc"); + var time = Instant.ofEpochMilli(randomLongBetween(currentMinus30Days.toEpochMilli(), currentTime.toEpochMilli())); + indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time))); + var response = client().performRequest(indexRequest); + assertOK(response); + var responseBody = entityAsMap(response); + // i rollovers and +1 offset: + assertThat((String) responseBody.get("_index"), backingIndexEqualTo("k8s", i + 1)); + } + var rolloverRequest = new Request("POST", "/k8s/_rollover"); + var rolloverResponse = client().performRequest(rolloverRequest); + assertOK(rolloverResponse); + var rolloverResponseBody = entityAsMap(rolloverResponse); + assertThat(rolloverResponseBody.get("rolled_over"), is(true)); + } + + var getDataStreamsRequest = new Request("GET", "/_data_stream"); + var getDataStreamResponse = client().performRequest(getDataStreamsRequest); + assertOK(getDataStreamResponse); + var dataStreams = entityAsMap(getDataStreamResponse); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s")); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(5)); + for (int i = 0; i < 5; i++) { + String backingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices." + i + ".index_name"); + assertThat(backingIndex, backingIndexEqualTo("k8s", i + 1)); + var indices = getIndex(backingIndex); + var escapedBackingIndex = backingIndex.replace(".", "\\."); + assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s")); + assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), nullValue()); + assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"), nullValue()); + assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"), nullValue()); + } + + // Update template + putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1"); + putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE); + assertOK(client().performRequest(putComposableIndexTemplateRequest)); + + var rolloverRequest = new Request("POST", "/k8s/_rollover"); + var rolloverResponse = client().performRequest(rolloverRequest); + assertOK(rolloverResponse); + var rolloverResponseBody = entityAsMap(rolloverResponse); + assertThat(rolloverResponseBody.get("rolled_over"), is(true)); + var newIndex = (String) rolloverResponseBody.get("new_index"); + assertThat(newIndex, backingIndexEqualTo("k8s", 6)); + + // Ingest documents that will land in the new tsdb backing index: + for (int i = 0; i < numDocs; i++) { + var indexRequest = new Request("POST", "/k8s/_doc"); + indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(currentTime))); + var response = client().performRequest(indexRequest); + assertOK(response); + var responseBody = entityAsMap(response); + assertThat((String) responseBody.get("_index"), backingIndexEqualTo("k8s", 6)); + } + + // Fail if documents target older non tsdb backing index: + var indexRequest = new Request("POST", "/k8s/_doc"); + indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(currentMinus30Days))); + var e = expectThrows(ResponseException.class, () -> client().performRequest(indexRequest)); + assertThat(e.getMessage(), containsString("is outside of ranges of currently writable indices")); + } + private static Map getIndex(String indexName) throws IOException { var getIndexRequest = new Request("GET", "/" + indexName + "?human"); var response = client().performRequest(getIndexRequest); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java index 41126f6d9f0df..531dcc1ac9a19 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java @@ -36,8 +36,16 @@ public Settings getAdditionalIndexSettings( ) { if (dataStreamName != null) { DataStream dataStream = metadata.dataStreams().get(dataStreamName); + // First backing index is created and then data stream is rolled over (in a single cluster state update). + // So at this point we can't check index_mode==time_series, + // so checking that index_mode==null|standard and templateIndexMode == TIME_SERIES + boolean migrating = dataStream != null + && (dataStream.getIndexMode() == null || dataStream.getIndexMode() == IndexMode.STANDARD) + && templateIndexMode == IndexMode.TIME_SERIES; IndexMode indexMode; - if (dataStream != null) { + if (migrating) { + indexMode = IndexMode.TIME_SERIES; + } else if (dataStream != null) { indexMode = dataStream.getIndexMode(); } else { indexMode = templateIndexMode; @@ -50,7 +58,7 @@ public Settings getAdditionalIndexSettings( TimeValue lookAheadTime = IndexSettings.LOOK_AHEAD_TIME.get(allSettings); final Instant start; final Instant end; - if (dataStream == null) { + if (dataStream == null || migrating) { start = resolvedAt.minusMillis(lookAheadTime.getMillis()); end = resolvedAt.plusMillis(lookAheadTime.getMillis()); } else { diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java index 84c897f4717be..d7d2652481534 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java @@ -44,6 +44,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; public class MetadataDataStreamRolloverServiceTests extends ESTestCase { @@ -63,7 +64,7 @@ public void testRolloverClusterStateForDataStream() throws Exception { IndexMode.TIME_SERIES ); ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of(dataStream.getName() + "*")) - .template(new Template(Settings.builder().put("index.mode", "time_series").build(), null, null)) + .template(new Template(Settings.builder().put("index.routing_path", "uid").build(), null, null)) .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, IndexMode.TIME_SERIES)) .build(); Metadata.Builder builder = Metadata.builder(); @@ -75,6 +76,7 @@ public void testRolloverClusterStateForDataStream() throws Exception { .put("index.hidden", true) .put(SETTING_INDEX_UUID, dataStream.getWriteIndex().getUUID()) .put("index.mode", "time_series") + .put("index.routing_path", "uid") .put("index.time_series.start_time", FORMATTER.format(now.minus(4, ChronoUnit.HOURS))) .put("index.time_series.end_time", FORMATTER.format(now.minus(2, ChronoUnit.HOURS))) ) @@ -144,4 +146,183 @@ public void testRolloverClusterStateForDataStream() throws Exception { } } + public void testRolloverAndMigrateDataStream() throws Exception { + Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); + String dataStreamName = "logs-my-app"; + IndexMode dsIndexMode = randomBoolean() ? null : IndexMode.STANDARD; + final DataStream dataStream = new DataStream( + dataStreamName, + new DataStream.TimestampField("@timestamp"), + List.of(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 1, now.toEpochMilli()), "uuid")), + 1, + null, + false, + false, + false, + false, + dsIndexMode + ); + ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of(dataStream.getName() + "*")) + .template(new Template(Settings.builder().put("index.routing_path", "uid").build(), null, null)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, IndexMode.TIME_SERIES)) + .build(); + Metadata.Builder builder = Metadata.builder(); + builder.put("template", template); + Settings.Builder indexSettings = ESTestCase.settings(Version.CURRENT) + .put("index.hidden", true) + .put(SETTING_INDEX_UUID, dataStream.getWriteIndex().getUUID()); + if (dsIndexMode != null) { + indexSettings.put("index.mode", dsIndexMode.getName()); + } + builder.put( + IndexMetadata.builder(dataStream.getWriteIndex().getName()).settings(indexSettings).numberOfShards(1).numberOfReplicas(0) + ); + builder.put(dataStream); + final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build(); + + ThreadPool testThreadPool = new TestThreadPool(getTestName()); + try { + MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService( + dataStream, + testThreadPool, + Set.of(new DataStreamIndexSettingsProvider()), + xContentRegistry() + ); + MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); + List> metConditions = Collections.singletonList(condition); + CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_"); + + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + clusterState, + dataStream.getName(), + null, + createIndexRequest, + metConditions, + now, + randomBoolean(), + false + ); + + String sourceIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration()); + String newIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1); + assertEquals(sourceIndexName, rolloverResult.sourceIndexName()); + assertEquals(newIndexName, rolloverResult.rolloverIndexName()); + Metadata rolloverMetadata = rolloverResult.clusterState().metadata(); + assertEquals(dataStream.getIndices().size() + 1, rolloverMetadata.indices().size()); + + // Assert data stream's index_mode has been changed to time_series. + assertThat(rolloverMetadata.dataStreams().get(dataStreamName), notNullValue()); + assertThat(rolloverMetadata.dataStreams().get(dataStreamName).getIndexMode(), equalTo(IndexMode.TIME_SERIES)); + + // Nothing changed for the original backing index: + IndexMetadata im = rolloverMetadata.index(rolloverMetadata.dataStreams().get(dataStreamName).getIndices().get(0)); + assertThat(IndexSettings.MODE.get(im.getSettings()), equalTo(IndexMode.STANDARD)); + assertThat(IndexSettings.TIME_SERIES_START_TIME.exists(im.getSettings()), is(false)); + assertThat(IndexSettings.TIME_SERIES_END_TIME.exists(im.getSettings()), is(false)); + // New backing index is a tsdb index: + im = rolloverMetadata.index(rolloverMetadata.dataStreams().get(dataStreamName).getIndices().get(1)); + assertThat(IndexSettings.MODE.get(im.getSettings()), equalTo(IndexMode.TIME_SERIES)); + Instant startTime = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings()); + Instant endTime = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings()); + assertThat(startTime.isBefore(endTime), is(true)); + assertThat(startTime, equalTo(now.minus(2, ChronoUnit.HOURS))); + assertThat(endTime, equalTo(now.plus(2, ChronoUnit.HOURS))); + } finally { + testThreadPool.shutdown(); + } + } + + public void testChangingIndexModeFromTimeSeriesToSomethingElseNoEffectOnExistingDataStreams() throws Exception { + Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); + String dataStreamName = "logs-my-app"; + final DataStream dataStream = new DataStream( + dataStreamName, + new DataStream.TimestampField("@timestamp"), + List.of(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 1, now.toEpochMilli()), "uuid")), + 1, + null, + false, + false, + false, + false, + IndexMode.TIME_SERIES + ); + ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of(dataStream.getName() + "*")) + .template(new Template(Settings.builder().put("index.routing_path", "uid").build(), null, null)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, randomBoolean() ? IndexMode.STANDARD : null)) + .build(); + Metadata.Builder builder = Metadata.builder(); + builder.put("template", template); + builder.put( + IndexMetadata.builder(dataStream.getWriteIndex().getName()) + .settings( + ESTestCase.settings(Version.CURRENT) + .put("index.hidden", true) + .put(SETTING_INDEX_UUID, dataStream.getWriteIndex().getUUID()) + .put("index.mode", "time_series") + .put("index.routing_path", "uid") + .put("index.time_series.start_time", FORMATTER.format(now.minus(4, ChronoUnit.HOURS))) + .put("index.time_series.end_time", FORMATTER.format(now.minus(2, ChronoUnit.HOURS))) + ) + .numberOfShards(1) + .numberOfReplicas(0) + ); + builder.put(dataStream); + final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build(); + + ThreadPool testThreadPool = new TestThreadPool(getTestName()); + try { + MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService( + dataStream, + testThreadPool, + Set.of(new DataStreamIndexSettingsProvider()), + xContentRegistry() + ); + MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); + List> metConditions = Collections.singletonList(condition); + CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_"); + + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + clusterState, + dataStream.getName(), + null, + createIndexRequest, + metConditions, + now, + randomBoolean(), + false + ); + + String sourceIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration()); + String newIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1); + assertEquals(sourceIndexName, rolloverResult.sourceIndexName()); + assertEquals(newIndexName, rolloverResult.rolloverIndexName()); + Metadata rolloverMetadata = rolloverResult.clusterState().metadata(); + assertEquals(dataStream.getIndices().size() + 1, rolloverMetadata.indices().size()); + + // Assert data stream's index_mode remains time_series. + assertThat(rolloverMetadata.dataStreams().get(dataStreamName), notNullValue()); + assertThat(rolloverMetadata.dataStreams().get(dataStreamName).getIndexMode(), equalTo(IndexMode.TIME_SERIES)); + + // Nothing changed for the original tsdb backing index: + IndexMetadata im = rolloverMetadata.index(rolloverMetadata.dataStreams().get(dataStreamName).getIndices().get(0)); + assertThat(IndexSettings.MODE.exists(im.getSettings()), is(true)); + Instant startTime = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings()); + Instant endTime = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings()); + assertThat(startTime.isBefore(endTime), is(true)); + assertThat(startTime, equalTo(now.minus(4, ChronoUnit.HOURS))); + assertThat(endTime, equalTo(now.minus(2, ChronoUnit.HOURS))); + // New backing index is also a tsdb index: + im = rolloverMetadata.index(rolloverMetadata.dataStreams().get(dataStreamName).getIndices().get(1)); + assertThat(IndexSettings.MODE.get(im.getSettings()), equalTo(IndexMode.TIME_SERIES)); + startTime = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings()); + endTime = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings()); + assertThat(startTime.isBefore(endTime), is(true)); + assertThat(startTime, equalTo(now.minus(2, ChronoUnit.HOURS))); + assertThat(endTime, equalTo(now.plus(2, ChronoUnit.HOURS))); + } finally { + testThreadPool.shutdown(); + } + } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java index a0b0f7341ce10..cdea744b6fec1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java @@ -278,7 +278,9 @@ private RolloverResult rolloverDataStream( currentState, createIndexClusterStateRequest, silent, - (builder, indexMetadata) -> builder.put(ds.rollover(indexMetadata.getIndex(), newGeneration)) + (builder, indexMetadata) -> builder.put( + ds.rollover(indexMetadata.getIndex(), newGeneration, templateV2.getDataStreamTemplate().getIndexMode()) + ) ); RolloverInfo rolloverInfo = new RolloverInfo(dataStreamName, metConditions, threadPool.absoluteTimeInMillis()); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 3e1ef209c3ea6..f16812a1570ef 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -171,8 +171,14 @@ public Index selectTimeSeriesWriteIndex(Instant timestamp, Metadata metadata) { Index index = indices.get(i); IndexMetadata im = metadata.index(index); - // TODO: make start and end time fields in IndexMetadata class. + // TODO: make index_mode, start and end time fields in IndexMetadata class. // (this to avoid the overhead that occurs when reading a setting) + if (IndexSettings.MODE.get(im.getSettings()) != IndexMode.TIME_SERIES) { + // Not a tsdb backing index, so skip. + // (This can happen is this is a migrated tsdb data stream) + continue; + } + Instant start = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings()); Instant end = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings()); // Check should be in sync with DataStreamTimestampFieldMapper#validateTimestamp(...) method @@ -192,12 +198,19 @@ public Index selectTimeSeriesWriteIndex(Instant timestamp, Metadata metadata) { public void validate(Function imSupplier) { if (indexMode == IndexMode.TIME_SERIES) { // Get a sorted overview of each backing index with there start and end time range: - var startAndEndTimes = indices.stream().map(index -> imSupplier.apply(index.getName())).map(im -> { - Instant start = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings()); - Instant end = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings()); - assert end.isAfter(start); // This is also validated by TIME_SERIES_END_TIME setting. - return new Tuple<>(im.getIndex().getName(), new Tuple<>(start, end)); - }) + var startAndEndTimes = indices.stream() + .map(index -> imSupplier.apply(index.getName())) + .filter( + // Migrated tsdb data streams have non tsdb backing indices: + im -> IndexSettings.TIME_SERIES_START_TIME.exists(im.getSettings()) + && IndexSettings.TIME_SERIES_END_TIME.exists(im.getSettings()) + ) + .map(im -> { + Instant start = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings()); + Instant end = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings()); + assert end.isAfter(start); // This is also validated by TIME_SERIES_END_TIME setting. + return new Tuple<>(im.getIndex().getName(), new Tuple<>(start, end)); + }) .sorted(Comparator.comparing(entry -> entry.v2().v1())) // Sort by start time .collect(Collectors.toList()); @@ -265,21 +278,29 @@ public IndexMode getIndexMode() { * Performs a rollover on a {@code DataStream} instance and returns a new instance containing * the updated list of backing indices and incremented generation. * - * @param writeIndex new write index - * @param generation new generation + * @param writeIndex new write index + * @param generation new generation + * @param indexModeFromTemplate the index mode as is defined in the template that created this data stream * * @return new {@code DataStream} instance with the rollover operation applied */ - public DataStream rollover(Index writeIndex, long generation) { + public DataStream rollover(Index writeIndex, long generation, IndexMode indexModeFromTemplate) { ensureNotReplicated(); - return unsafeRollover(writeIndex, generation); + return unsafeRollover(writeIndex, generation, indexModeFromTemplate); } /** - * Like {@link #rollover(Index, long)}, but does no validation, use with care only. + * Like {@link #rollover(Index, long, IndexMode)}, but does no validation, use with care only. */ - public DataStream unsafeRollover(Index writeIndex, long generation) { + public DataStream unsafeRollover(Index writeIndex, long generation, IndexMode indexModeFromTemplate) { + IndexMode indexMode = this.indexMode; + // This allows for migrating a data stream to be a tsdb data stream: + // (only if index_mode=null|standard then allow it to be set to time_series) + if ((indexMode == null || indexMode == IndexMode.STANDARD) && indexModeFromTemplate == IndexMode.TIME_SERIES) { + indexMode = IndexMode.TIME_SERIES; + } + List backingIndices = new ArrayList<>(indices); backingIndices.add(writeIndex); return new DataStream( @@ -298,7 +319,7 @@ public DataStream unsafeRollover(Index writeIndex, long generation) { /** * Performs a dummy rollover on a {@code DataStream} instance and returns the tuple of the next write index name and next generation - * that this {@code DataStream} should roll over to using {@link #rollover(Index, long)}. + * that this {@code DataStream} should roll over to using {@link #rollover(Index, long, IndexMode)}. * * @param clusterMetadata Cluster metadata * diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 472d69b19c619..63b73cbaa77cd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.test.AbstractSerializingTestCase; @@ -60,7 +61,7 @@ protected DataStream createTestInstance() { public void testRollover() { DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream(); Tuple newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA); - final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2()); + final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), null); assertThat(rolledDs.getName(), equalTo(ds.getName())); assertThat(rolledDs.getTimeStampField(), equalTo(ds.getTimeStampField())); assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1)); @@ -86,13 +87,69 @@ public void testRolloverWithConflictingBackingIndexName() { } final Tuple newCoordinates = ds.nextWriteIndexAndGeneration(builder.build()); - final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2()); + final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), null); assertThat(rolledDs.getName(), equalTo(ds.getName())); assertThat(rolledDs.getTimeStampField(), equalTo(ds.getTimeStampField())); assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + numConflictingIndices + 1)); assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1)); assertTrue(rolledDs.getIndices().containsAll(ds.getIndices())); assertTrue(rolledDs.getIndices().contains(rolledDs.getWriteIndex())); + assertThat(rolledDs.getIndexMode(), equalTo(ds.getIndexMode())); + } + + public void testRolloverIndexMode() { + IndexMode indexMode = randomBoolean() ? IndexMode.STANDARD : null; + DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream(); + // Unsure index_mode=null + ds = new DataStream( + ds.getName(), + ds.getTimeStampField(), + ds.getIndices(), + ds.getGeneration(), + ds.getMetadata(), + ds.isHidden(), + ds.isReplicated(), + ds.isSystem(), + ds.isAllowCustomRouting(), + indexMode + ); + var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA); + + var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), IndexMode.TIME_SERIES); + assertThat(rolledDs.getName(), equalTo(ds.getName())); + assertThat(rolledDs.getTimeStampField(), equalTo(ds.getTimeStampField())); + assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1)); + assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1)); + assertTrue(rolledDs.getIndices().containsAll(ds.getIndices())); + assertTrue(rolledDs.getIndices().contains(rolledDs.getWriteIndex())); + assertThat(rolledDs.getIndexMode(), equalTo(IndexMode.TIME_SERIES)); + } + + public void testRolloverIndexMode_keepIndexMode() { + DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream(); + ds = new DataStream( + ds.getName(), + ds.getTimeStampField(), + ds.getIndices(), + ds.getGeneration(), + ds.getMetadata(), + ds.isHidden(), + ds.isReplicated(), + ds.isSystem(), + ds.isAllowCustomRouting(), + IndexMode.TIME_SERIES + ); + var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA); + + IndexMode indexMode = randomBoolean() ? IndexMode.STANDARD : null; + var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), indexMode); + assertThat(rolledDs.getName(), equalTo(ds.getName())); + assertThat(rolledDs.getTimeStampField(), equalTo(ds.getTimeStampField())); + assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1)); + assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1)); + assertTrue(rolledDs.getIndices().containsAll(ds.getIndices())); + assertTrue(rolledDs.getIndices().contains(rolledDs.getWriteIndex())); + assertThat(rolledDs.getIndexMode(), equalTo(IndexMode.TIME_SERIES)); } public void testRemoveBackingIndex() { diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index 584a2a130afac..3e81b71292390 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -289,6 +289,10 @@ public static ClusterState getClusterStateWithDataStreams( boolean replicated ) { Metadata.Builder builder = Metadata.builder(); + builder.put( + "template_1", + new ComposableIndexTemplate(List.of("*"), null, null, null, null, null, new ComposableIndexTemplate.DataStreamTemplate()) + ); List allIndices = new ArrayList<>(); for (Tuple dsTuple : dataStreams) { diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java index 9a10b5a98952c..1989fece08243 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java @@ -558,7 +558,7 @@ private SingleForecast forecast(Metadata metadata, IndexAbstraction.DataStream s for (int i = 0; i < numberNewIndices; ++i) { final String uuid = UUIDs.randomBase64UUID(); final Tuple rolledDataStreamInfo = dataStream.unsafeNextWriteIndexAndGeneration(state.metadata()); - dataStream = dataStream.unsafeRollover(new Index(rolledDataStreamInfo.v1(), uuid), rolledDataStreamInfo.v2()); + dataStream = dataStream.unsafeRollover(new Index(rolledDataStreamInfo.v1(), uuid), rolledDataStreamInfo.v2(), null); // this unintentionally copies the in-sync allocation ids too. This has the fortunate effect of these indices // not being regarded new by the disk threshold decider, thereby respecting the low watermark threshold even for primaries.