Skip to content

Commit

Permalink
Allow regular data streams to be migrated to tsdb data streams. (elas…
Browse files Browse the repository at this point in the history
…tic#83843)

A regular data stream can be migrated to a tsdb data stream if in template that created the data stream, the `index_mode` field is set to `time_series` and the data stream's `index_mode` property is either not specified or set to `standard`. Then on the next rollover the data stream is migrated to be a tsdb data stream.

When that happens the data stream's `index_mode` property is set to `time_series` and the new backing index's `index.mode` index setting is also set to `time_series`.

Closes elastic#83520
  • Loading branch information
martijnvg authored and probakowski committed Feb 23, 2022
1 parent d9fbed5 commit 90e837c
Show file tree
Hide file tree
Showing 8 changed files with 425 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,27 @@
package org.elasticsearch.datastreams;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class TsdbDataStreamRestIT extends ESRestTestCase {

Expand Down Expand Up @@ -84,6 +88,57 @@ public class TsdbDataStreamRestIT extends ESRestTestCase {
}
}""";

private static final String NON_TSDB_TEMPLATE = """
{
"index_patterns": ["k8s*"],
"template": {
"settings":{
"index": {
"number_of_replicas": 0,
"number_of_shards": 2
}
},
"mappings":{
"properties": {
"@timestamp" : {
"type": "date"
},
"metricset": {
"type": "keyword"
},
"k8s": {
"properties": {
"pod": {
"properties": {
"uid": {
"type": "keyword"
},
"name": {
"type": "keyword"
},
"ip": {
"type": "ip"
},
"network": {
"properties": {
"tx": {
"type": "long"
},
"rx": {
"type": "long"
}
}
}
}
}
}
}
}
}
},
"data_stream": {}
}""";

private static final String DOC = """
{
"@timestamp": "$time",
Expand Down Expand Up @@ -235,6 +290,82 @@ public void testSubsequentRollovers() throws Exception {
}
}

public void testMigrateRegularDataStreamToTsdbDataStream() throws Exception {
// Create a non tsdb template
var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
putComposableIndexTemplateRequest.setJsonEntity(NON_TSDB_TEMPLATE);
assertOK(client().performRequest(putComposableIndexTemplateRequest));

// Index a few docs and sometimes rollover
int numRollovers = 4;
int numDocs = 32;
var currentTime = Instant.now();
var currentMinus30Days = currentTime.minus(30, ChronoUnit.DAYS);
for (int i = 0; i < numRollovers; i++) {
for (int j = 0; j < numDocs; j++) {
var indexRequest = new Request("POST", "/k8s/_doc");
var time = Instant.ofEpochMilli(randomLongBetween(currentMinus30Days.toEpochMilli(), currentTime.toEpochMilli()));
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time)));
var response = client().performRequest(indexRequest);
assertOK(response);
var responseBody = entityAsMap(response);
// i rollovers and +1 offset:
assertThat((String) responseBody.get("_index"), backingIndexEqualTo("k8s", i + 1));
}
var rolloverRequest = new Request("POST", "/k8s/_rollover");
var rolloverResponse = client().performRequest(rolloverRequest);
assertOK(rolloverResponse);
var rolloverResponseBody = entityAsMap(rolloverResponse);
assertThat(rolloverResponseBody.get("rolled_over"), is(true));
}

var getDataStreamsRequest = new Request("GET", "/_data_stream");
var getDataStreamResponse = client().performRequest(getDataStreamsRequest);
assertOK(getDataStreamResponse);
var dataStreams = entityAsMap(getDataStreamResponse);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(5));
for (int i = 0; i < 5; i++) {
String backingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices." + i + ".index_name");
assertThat(backingIndex, backingIndexEqualTo("k8s", i + 1));
var indices = getIndex(backingIndex);
var escapedBackingIndex = backingIndex.replace(".", "\\.");
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), nullValue());
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"), nullValue());
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"), nullValue());
}

// Update template
putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE);
assertOK(client().performRequest(putComposableIndexTemplateRequest));

var rolloverRequest = new Request("POST", "/k8s/_rollover");
var rolloverResponse = client().performRequest(rolloverRequest);
assertOK(rolloverResponse);
var rolloverResponseBody = entityAsMap(rolloverResponse);
assertThat(rolloverResponseBody.get("rolled_over"), is(true));
var newIndex = (String) rolloverResponseBody.get("new_index");
assertThat(newIndex, backingIndexEqualTo("k8s", 6));

// Ingest documents that will land in the new tsdb backing index:
for (int i = 0; i < numDocs; i++) {
var indexRequest = new Request("POST", "/k8s/_doc");
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(currentTime)));
var response = client().performRequest(indexRequest);
assertOK(response);
var responseBody = entityAsMap(response);
assertThat((String) responseBody.get("_index"), backingIndexEqualTo("k8s", 6));
}

// Fail if documents target older non tsdb backing index:
var indexRequest = new Request("POST", "/k8s/_doc");
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(currentMinus30Days)));
var e = expectThrows(ResponseException.class, () -> client().performRequest(indexRequest));
assertThat(e.getMessage(), containsString("is outside of ranges of currently writable indices"));
}

private static Map<?, ?> getIndex(String indexName) throws IOException {
var getIndexRequest = new Request("GET", "/" + indexName + "?human");
var response = client().performRequest(getIndexRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,16 @@ public Settings getAdditionalIndexSettings(
) {
if (dataStreamName != null) {
DataStream dataStream = metadata.dataStreams().get(dataStreamName);
// First backing index is created and then data stream is rolled over (in a single cluster state update).
// So at this point we can't check index_mode==time_series,
// so checking that index_mode==null|standard and templateIndexMode == TIME_SERIES
boolean migrating = dataStream != null
&& (dataStream.getIndexMode() == null || dataStream.getIndexMode() == IndexMode.STANDARD)
&& templateIndexMode == IndexMode.TIME_SERIES;
IndexMode indexMode;
if (dataStream != null) {
if (migrating) {
indexMode = IndexMode.TIME_SERIES;
} else if (dataStream != null) {
indexMode = dataStream.getIndexMode();
} else {
indexMode = templateIndexMode;
Expand All @@ -50,7 +58,7 @@ public Settings getAdditionalIndexSettings(
TimeValue lookAheadTime = IndexSettings.LOOK_AHEAD_TIME.get(allSettings);
final Instant start;
final Instant end;
if (dataStream == null) {
if (dataStream == null || migrating) {
start = resolvedAt.minusMillis(lookAheadTime.getMillis());
end = resolvedAt.plusMillis(lookAheadTime.getMillis());
} else {
Expand Down
Loading

0 comments on commit 90e837c

Please sign in to comment.