diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index 19dc61ef36049..cb4481b42d1d8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -57,7 +57,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index d176f0cc4ef2f..70557878772e6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -7,6 +7,9 @@ */ package org.elasticsearch.cluster.metadata; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.PointValues; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; import org.elasticsearch.core.Nullable; @@ -25,6 +28,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -36,6 +40,25 @@ public final class DataStream extends AbstractDiffable implements To public 
static final String BACKING_INDEX_PREFIX = ".ds-"; public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd"); + // Datastreams' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations + public static Comparator DATASTREAM_LEAF_READERS_SORTER = + Comparator.comparingLong( + (LeafReader r) -> { + try { + PointValues points = r.getPointValues(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD); + if (points != null) { + byte[] sortValue = points.getMaxPackedValue(); + return LongPoint.decodeDimension(sortValue, 0); + } else if (r.numDocs() == 0) { + // points can be null if the segment contains only deleted documents + return Long.MIN_VALUE; + } + } catch (IOException e) { + } + throw new IllegalStateException("Can't access [" + + DataStream.TimestampField.FIXED_TIMESTAMP_FIELD + "] field for the data stream!"); + }) + .reversed(); private final LongSupplier timeProvider; private final String name; diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 9b7ba7986167b..1714dad668e2c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -9,6 +9,7 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; +import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; @@ -32,6 +33,7 @@ import org.elasticsearch.plugins.IndexStorePlugin; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.function.LongSupplier; @@ -70,6 +72,7 @@ public final class EngineConfig { private final CircuitBreakerService circuitBreakerService; private final LongSupplier 
globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; + private final Comparator leafSorter; /** * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been @@ -131,7 +134,8 @@ public EngineConfig( LongSupplier globalCheckpointSupplier, Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, - IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) { + IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier, + Comparator leafSorter) { this.shardId = shardId; this.indexSettings = indexSettings; this.threadPool = threadPool; @@ -169,6 +173,7 @@ public EngineConfig( this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier); this.primaryTermSupplier = primaryTermSupplier; this.snapshotCommitSupplier = snapshotCommitSupplier; + this.leafSorter = leafSorter; } /** @@ -353,4 +358,12 @@ public LongSupplier getPrimaryTermSupplier() { public IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier() { return snapshotCommitSupplier; } + + /** + * Returns how segments should be sorted for reading or {@code null} if no sorting should be applied. + */ + @Nullable + public Comparator getLeafSorter() { + return leafSorter; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index ebcdcd742f430..67014b0ca824c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2193,6 +2193,11 @@ private IndexWriterConfig getIndexWriterConfig() { if (config().getIndexSort() != null) { iwc.setIndexSort(config().getIndexSort()); } + // Provide a custom leaf sorter, so that index readers opened from this writer + // will have their leaves sorted according to the given leaf sorter. 
+ if (engineConfig.getLeafSorter() != null) { + iwc.setLeafSorter(engineConfig.getLeafSorter()); + } return iwc; } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 54c10dbef9449..0f2441dcbc621 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -199,6 +199,8 @@ protected final ElasticsearchDirectoryReader wrapReader(DirectoryReader reader, } protected DirectoryReader open(IndexCommit commit) throws IOException { + // TODO: provide engineConfig.getLeafSorter() when opening a DirectoryReader from a commit + // should be available from Lucene v 8.10 assert Transports.assertNotTransportThread("opening index commit of a read-only engine"); if (lazilyLoadSoftDeletes) { return new LazySoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit), Lucene.SOFT_DELETES_FIELD); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 71ea3f1b103d4..dc82261235d20 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -9,6 +9,7 @@ package org.elasticsearch.index.shard; import com.carrotsearch.hppc.ObjectLongMap; + import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.analysis.Analyzer; @@ -185,6 +186,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static org.elasticsearch.cluster.metadata.DataStream.DATASTREAM_LEAF_READERS_SORTER; import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -283,6 +285,7 @@ Runnable getGlobalCheckpointSyncer() { 
private final AtomicReference pendingRefreshLocation = new AtomicReference<>(); private final RefreshPendingLocationListener refreshPendingLocationListener; private volatile boolean useRetentionLeasesInPeerRecovery; + private final boolean isDataStreamIndex; // if a shard is a part of data stream public IndexShard( final ShardRouting shardRouting, @@ -387,6 +390,7 @@ public boolean shouldCache(Query query) { persistMetadata(path, indexSettings, shardRouting, null, logger); this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); this.refreshPendingLocationListener = new RefreshPendingLocationListener(); + this.isDataStreamIndex = mapperService == null ? false : mapperService.mappingLookup().isDataStreamTimestampFieldEnabled(); } public ThreadPool getThreadPool() { @@ -2912,7 +2916,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { globalCheckpointSupplier, replicationTracker::getRetentionLeases, this::getOperationPrimaryTerm, - snapshotCommitSupplier); + snapshotCommitSupplier, + isDataStreamIndex ? DATASTREAM_LEAF_READERS_SORTER : null); } /** diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 9386a47d78f7e..78299276d55cf 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2948,7 +2948,8 @@ public void testRecoverFromForeignTranslog() throws IOException { () -> UNASSIGNED_SEQ_NO, () -> RetentionLeases.EMPTY, primaryTerm::get, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); engine = createEngine(store, primaryTranslogDir); // and recover again! 
@@ -6021,7 +6022,7 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.getQueryCachingPolicy(), translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); try (InternalEngine engine = createEngine(configWithWarmer)) { assertThat(warmedUpReaders, empty()); assertThat(expectThrows(Throwable.class, () -> engine.acquireSearcher("test")).getMessage(), diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index caad76894d9a9..469ab57145fde 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -3928,7 +3928,7 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, config.getLeafSorter()); return new InternalEngine(configWithWarmer); }); Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard))); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java 
b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index a59e2defec763..bb022ddad0a41 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -139,7 +139,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { () -> SequenceNumbers.NO_OPS_PERFORMED, () -> RetentionLeases.EMPTY, () -> primaryTerm, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); engine = new InternalEngine(config); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index de7073d3cc15b..8aafddd59df43 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -371,7 +371,7 @@ EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.Ref config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), internalRefreshListener, config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } ThreadPoolStats.Stats getRefreshThreadPoolStats() { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 4b28476055856..cb21ca131024d 100644 --- 
a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -238,7 +238,7 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSuppl config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), globalCheckpointSupplier, config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } public EngineConfig copy(EngineConfig config, Analyzer analyzer) { @@ -248,7 +248,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { @@ -258,7 +258,7 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } @Override @@ -669,7 +669,8 @@ public EngineConfig config( globalCheckpointSupplier, retentionLeasesSupplier, primaryTerm, - 
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); } protected EngineConfig config(EngineConfig config, Store store, Path translogPath) { @@ -683,7 +684,7 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index f2cd8a103b998..1fcecf918c606 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -272,7 +272,8 @@ public void onFailedEngine(String reason, Exception e) { globalCheckpoint::longValue, () -> RetentionLeases.EMPTY, () -> primaryTerm.get(), - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); } private static Store createStore( diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index 199c990636e92..0568335ebd674 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ 
b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -62,6 +62,9 @@ import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.subphase.FieldAndFormat; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.core.action.CreateDataStreamAction; import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; @@ -1213,10 +1216,6 @@ public void testGetDataStream() throws Exception { assertThat(metricsFooDataStream.getIlmPolicy(), is(nullValue())); } - private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping) { - assertBackingIndex(backingIndex, timestampFieldPathInMapping, Map.of("type", "date")); - } - private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping, Map expectedMapping) { GetIndexResponse getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices(backingIndex)).actionGet(); assertThat(getIndexResponse.getSettings().get(backingIndex), notNullValue()); @@ -1481,6 +1480,42 @@ public void testMultiThreadedRollover() throws Exception { ); } + // Test that datastream's segments by default are sorted on @timestamp desc + public void testSegmentsSortedOnTimestampDesc() throws Exception { + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build(); + putComposableIndexTemplate("template_for_foo", null, List.of("metrics-foo*"), settings, null); + CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo"); + client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get(); + + // We index data in the 
increasing order of @timestamp field + int numDocs1 = randomIntBetween(2, 10); + indexDocs("metrics-foo", numDocs1); // 1st segment + int numDocs2 = randomIntBetween(2, 10); + indexDocs("metrics-foo", numDocs2); // 2nd segment + int numDocs3 = randomIntBetween(2, 10); + indexDocs("metrics-foo", numDocs3); // 3rd segment + int totalDocs = numDocs1 + numDocs2 + numDocs3; + + SearchSourceBuilder source = new SearchSourceBuilder(); + source.fetchField(new FieldAndFormat(DEFAULT_TIMESTAMP_FIELD, "epoch_millis")); + source.size(totalDocs); + SearchRequest searchRequest = new SearchRequest(new String[] { "metrics-foo" }, source); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(totalDocs, searchResponse.getHits().getTotalHits().value); + SearchHit[] hits = searchResponse.getHits().getHits(); + assertEquals(totalDocs, hits.length); + + // Test that when we read data, segments come in the reverse order with a segment with the latest date first + long timestamp1 = Long.valueOf(hits[0].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of 1st seg + long timestamp2 = Long.valueOf(hits[0 + numDocs3].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 2nd seg + long timestamp3 = Long.valueOf(hits[0 + numDocs3 + numDocs2].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 3rd seg + assertTrue(timestamp1 > timestamp2); + assertTrue(timestamp2 > timestamp3); + } + private static void verifyResolvability(String dataStream, ActionRequestBuilder requestBuilder, boolean fail) { verifyResolvability(dataStream, requestBuilder, fail, 0); } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java index 88697dde12b7c..4d3942ae2445c 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java +++ 
b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java @@ -60,8 +60,6 @@ import static org.hamcrest.Matchers.nullValue; public class MigrateToDataTiersIT extends ESRestTestCase { - private static final Logger logger = LogManager.getLogger(MigrateToDataTiersIT.class); - private String index; private String policy; private String alias; diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/131_sort_segments_migrate_to_data_stream.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/131_sort_segments_migrate_to_data_stream.yml new file mode 100644 index 0000000000000..a3e121abdddfe --- /dev/null +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/131_sort_segments_migrate_to_data_stream.yml @@ -0,0 +1,107 @@ +--- +"Test that datastream index segments are sorted on timestamp field desc after data stream migration": + - skip: + #TODO: adjust version after backport + version: " - 7.99.99" + reason: "sorting segments was added in 7.15" + features: allowed_warnings + + - do: + indices.put_index_template: + name: my-template + body: + index_patterns: [ my_ds ] + data_stream: { } + template: + settings: + number_of_shards: 1 + number_of_replicas: 0 + + - do: + indices.create: + index: test_index1 + body: + aliases: + my_ds: + is_write_index: true + + # 1st segment + - do: + index: + index: my_ds + body: { "foo": "bar1", "@timestamp": "2021-08-01" } + refresh: true + + # 2nd segment + - do: + index: + index: my_ds + body: { "foo": "bar2", "@timestamp": "2021-08-02" } + refresh: true + + # test that segments are sorted as indexed by @timestamp ASC + - do: + search: + index: my_ds + body: + fields: [{ "field":"@timestamp", "format":"yyyy-MM-dd" }] + - match: { hits.total.value: 2 } + - match: { hits.hits.0.fields.@timestamp: ["2021-08-01"] } + - match: { hits.hits.1.fields.@timestamp: ["2021-08-02"] } + + # migrate to data-stream + - do: + 
indices.migrate_to_data_stream: + name: my_ds + - is_true: acknowledged + + # test that segments are still sorted as indexed by @timestamp ASC + # as we don't reopen existing shards and index readers after migration + - do: + search: + index: my_ds + body: + fields: [{ "field":"@timestamp", "format":"yyyy-MM-dd" }] + - match: { hits.total.value: 2 } + - match: { hits.hits.0.fields.@timestamp: ["2021-08-01"] } + - match: { hits.hits.1.fields.@timestamp: ["2021-08-02"] } + + # rollover data stream to create new backing index + - do: + indices.rollover: + alias: "my_ds" + - match: { rolled_over: true } + # save the new backing index names for later use + - set: { new_index: idx0name } + + # 1st segment in the new backing index + - do: + index: + index: my_ds + body: { "foo": "bar3", "@timestamp": "2021-08-03" } + refresh: true + + # 2nd segment in the new backing index + - do: + index: + index: my_ds + body: { "foo": "bar4", "@timestamp": "2021-08-04" } + refresh: true + + + # test that segments are sorted by @timestamp DESC in the new backing index, + # as the newly created index and shard pick up the index leaf sorter + - do: + search: + index: $idx0name + body: + fields: [{ "field":"@timestamp", "format":"yyyy-MM-dd" }] + - match: { hits.total.value: 2 } + - match: { hits.hits.0.fields.@timestamp: ["2021-08-04"] } + - match: { hits.hits.1.fields.@timestamp: ["2021-08-03"] } + + + - do: + indices.delete_data_stream: + name: my_ds + - is_true: acknowledged