From f18b9d5ac8981b01848bfb76156189bba1b51f1e Mon Sep 17 00:00:00 2001 From: Mayya Sharipova Date: Fri, 3 Sep 2021 09:42:48 -0400 Subject: [PATCH] Add segment sorter for data streams (#75195) It is beneficial to sort segments within a data stream's index in descending order of their max timestamp field value, so that the most recent (in terms of timestamp) segments come first. This speeds up queries sorted on the @timestamp field in descending order, which are the most common type of query for data streams, as we are mostly concerned with recent data. This patch addresses this for writable indices only (read-only engines do not apply the sorter yet). The segment sorter is different from index sorting. Index sorting by itself is concerned with the order of docs within an individual segment (and not with how the segments are organized), while the segment sorter is only used during search and allows doc collection to start with the "right" segment, so we can terminate the collection faster. This PR adds a property `isDataStreamIndex` to IndexShard that indicates whether a shard is part of a data stream. 
--- .../indices/IndexingMemoryControllerIT.java | 2 +- .../cluster/metadata/DataStream.java | 23 ++++ .../index/engine/EngineConfig.java | 15 ++- .../index/engine/InternalEngine.java | 5 + .../index/engine/ReadOnlyEngine.java | 2 + .../elasticsearch/index/shard/IndexShard.java | 7 +- .../index/engine/InternalEngineTests.java | 5 +- .../index/shard/IndexShardTests.java | 2 +- .../index/shard/RefreshListenersTests.java | 3 +- .../IndexingMemoryControllerTests.java | 2 +- .../index/engine/EngineTestCase.java | 11 +- .../index/engine/FollowingEngineTests.java | 3 +- .../datastreams/DataStreamIT.java | 43 ++++++- .../xpack/MigrateToDataTiersIT.java | 2 - ...1_sort_segments_migrate_to_data_stream.yml | 107 ++++++++++++++++++ 15 files changed, 212 insertions(+), 20 deletions(-) create mode 100644 x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/131_sort_segments_migrate_to_data_stream.yml diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index 19dc61ef36049..cb4481b42d1d8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -57,7 +57,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } @Override diff --git 
a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index d176f0cc4ef2f..70557878772e6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -7,6 +7,9 @@ */ package org.elasticsearch.cluster.metadata; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.PointValues; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; import org.elasticsearch.core.Nullable; @@ -25,6 +28,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -36,6 +40,25 @@ public final class DataStream extends AbstractDiffable implements To public static final String BACKING_INDEX_PREFIX = ".ds-"; public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd"); + // Datastreams' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations + public static Comparator DATASTREAM_LEAF_READERS_SORTER = + Comparator.comparingLong( + (LeafReader r) -> { + try { + PointValues points = r.getPointValues(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD); + if (points != null) { + byte[] sortValue = points.getMaxPackedValue(); + return LongPoint.decodeDimension(sortValue, 0); + } else if (r.numDocs() == 0) { + // points can be null if the segment contains only deleted documents + return Long.MIN_VALUE; + } + } catch (IOException e) { + } + throw new IllegalStateException("Can't access [" + + DataStream.TimestampField.FIXED_TIMESTAMP_FIELD + "] field for the data stream!"); + }) + .reversed(); private final LongSupplier timeProvider; private final String name; diff --git 
a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 9b7ba7986167b..1714dad668e2c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -9,6 +9,7 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; +import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; @@ -32,6 +33,7 @@ import org.elasticsearch.plugins.IndexStorePlugin; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.function.LongSupplier; @@ -70,6 +72,7 @@ public final class EngineConfig { private final CircuitBreakerService circuitBreakerService; private final LongSupplier globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; + private final Comparator leafSorter; /** * A supplier of the outstanding retention leases. 
This is used during merged operations to determine which operations that have been @@ -131,7 +134,8 @@ public EngineConfig( LongSupplier globalCheckpointSupplier, Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, - IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) { + IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier, + Comparator leafSorter) { this.shardId = shardId; this.indexSettings = indexSettings; this.threadPool = threadPool; @@ -169,6 +173,7 @@ public EngineConfig( this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier); this.primaryTermSupplier = primaryTermSupplier; this.snapshotCommitSupplier = snapshotCommitSupplier; + this.leafSorter = leafSorter; } /** @@ -353,4 +358,12 @@ public LongSupplier getPrimaryTermSupplier() { public IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier() { return snapshotCommitSupplier; } + + /** + * Returns how segments should be sorted for reading or @null if no sorting should be applied. + */ + @Nullable + public Comparator getLeafSorter() { + return leafSorter; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index ebcdcd742f430..67014b0ca824c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2193,6 +2193,11 @@ private IndexWriterConfig getIndexWriterConfig() { if (config().getIndexSort() != null) { iwc.setIndexSort(config().getIndexSort()); } + // Provide a custom leaf sorter, so that index readers opened from this writer + // will have its leaves sorted according the given leaf sorter. 
+ if (engineConfig.getLeafSorter() != null) { + iwc.setLeafSorter(engineConfig.getLeafSorter()); + } return iwc; } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 54c10dbef9449..0f2441dcbc621 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -199,6 +199,8 @@ protected final ElasticsearchDirectoryReader wrapReader(DirectoryReader reader, } protected DirectoryReader open(IndexCommit commit) throws IOException { + // TODO: provide engineConfig.getLeafSorter() when opening a DirectoryReader from a commit + // should be available from Lucene v 8.10 assert Transports.assertNotTransportThread("opening index commit of a read-only engine"); if (lazilyLoadSoftDeletes) { return new LazySoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit), Lucene.SOFT_DELETES_FIELD); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 71ea3f1b103d4..dc82261235d20 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -9,6 +9,7 @@ package org.elasticsearch.index.shard; import com.carrotsearch.hppc.ObjectLongMap; + import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.analysis.Analyzer; @@ -185,6 +186,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static org.elasticsearch.cluster.metadata.DataStream.DATASTREAM_LEAF_READERS_SORTER; import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -283,6 +285,7 @@ Runnable getGlobalCheckpointSyncer() { 
private final AtomicReference pendingRefreshLocation = new AtomicReference<>(); private final RefreshPendingLocationListener refreshPendingLocationListener; private volatile boolean useRetentionLeasesInPeerRecovery; + private final boolean isDataStreamIndex; // if a shard is a part of data stream public IndexShard( final ShardRouting shardRouting, @@ -387,6 +390,7 @@ public boolean shouldCache(Query query) { persistMetadata(path, indexSettings, shardRouting, null, logger); this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); this.refreshPendingLocationListener = new RefreshPendingLocationListener(); + this.isDataStreamIndex = mapperService == null ? false : mapperService.mappingLookup().isDataStreamTimestampFieldEnabled(); } public ThreadPool getThreadPool() { @@ -2912,7 +2916,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { globalCheckpointSupplier, replicationTracker::getRetentionLeases, this::getOperationPrimaryTerm, - snapshotCommitSupplier); + snapshotCommitSupplier, + isDataStreamIndex ? DATASTREAM_LEAF_READERS_SORTER : null); } /** diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 9386a47d78f7e..78299276d55cf 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2948,7 +2948,8 @@ public void testRecoverFromForeignTranslog() throws IOException { () -> UNASSIGNED_SEQ_NO, () -> RetentionLeases.EMPTY, primaryTerm::get, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); engine = createEngine(store, primaryTranslogDir); // and recover again! 
@@ -6021,7 +6022,7 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.getQueryCachingPolicy(), translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); try (InternalEngine engine = createEngine(configWithWarmer)) { assertThat(warmedUpReaders, empty()); assertThat(expectThrows(Throwable.class, () -> engine.acquireSearcher("test")).getMessage(), diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index caad76894d9a9..469ab57145fde 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -3928,7 +3928,7 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, config.getLeafSorter()); return new InternalEngine(configWithWarmer); }); Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard))); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java 
b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index a59e2defec763..bb022ddad0a41 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -139,7 +139,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { () -> SequenceNumbers.NO_OPS_PERFORMED, () -> RetentionLeases.EMPTY, () -> primaryTerm, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); engine = new InternalEngine(config); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index de7073d3cc15b..8aafddd59df43 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -371,7 +371,7 @@ EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.Ref config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), internalRefreshListener, config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } ThreadPoolStats.Stats getRefreshThreadPoolStats() { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 4b28476055856..cb21ca131024d 100644 --- 
a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -238,7 +238,7 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSuppl config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), globalCheckpointSupplier, config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } public EngineConfig copy(EngineConfig config, Analyzer analyzer) { @@ -248,7 +248,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { @@ -258,7 +258,7 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } @Override @@ -669,7 +669,8 @@ public EngineConfig config( globalCheckpointSupplier, retentionLeasesSupplier, primaryTerm, - 
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); } protected EngineConfig config(EngineConfig config, Store store, Path translogPath) { @@ -683,7 +684,7 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index f2cd8a103b998..1fcecf918c606 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -272,7 +272,8 @@ public void onFailedEngine(String reason, Exception e) { globalCheckpoint::longValue, () -> RetentionLeases.EMPTY, () -> primaryTerm.get(), - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); } private static Store createStore( diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index 199c990636e92..0568335ebd674 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ 
b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -62,6 +62,9 @@ import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.subphase.FieldAndFormat; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.core.action.CreateDataStreamAction; import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; @@ -1213,10 +1216,6 @@ public void testGetDataStream() throws Exception { assertThat(metricsFooDataStream.getIlmPolicy(), is(nullValue())); } - private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping) { - assertBackingIndex(backingIndex, timestampFieldPathInMapping, Map.of("type", "date")); - } - private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping, Map expectedMapping) { GetIndexResponse getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices(backingIndex)).actionGet(); assertThat(getIndexResponse.getSettings().get(backingIndex), notNullValue()); @@ -1481,6 +1480,42 @@ public void testMultiThreadedRollover() throws Exception { ); } + // Test that datastream's segments by default are sorted on @timestamp desc + public void testSegmentsSortedOnTimestampDesc() throws Exception { + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build(); + putComposableIndexTemplate("template_for_foo", null, List.of("metrics-foo*"), settings, null); + CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo"); + client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get(); + + // We index data in the 
increasing order of @timestamp field + int numDocs1 = randomIntBetween(2, 10); + indexDocs("metrics-foo", numDocs1); // 1st segment + int numDocs2 = randomIntBetween(2, 10); + indexDocs("metrics-foo", numDocs2); // 2nd segment + int numDocs3 = randomIntBetween(2, 10); + indexDocs("metrics-foo", numDocs3); // 3rd segment + int totalDocs = numDocs1 + numDocs2 + numDocs3; + + SearchSourceBuilder source = new SearchSourceBuilder(); + source.fetchField(new FieldAndFormat(DEFAULT_TIMESTAMP_FIELD, "epoch_millis")); + source.size(totalDocs); + SearchRequest searchRequest = new SearchRequest(new String[] { "metrics-foo" }, source); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(totalDocs, searchResponse.getHits().getTotalHits().value); + SearchHit[] hits = searchResponse.getHits().getHits(); + assertEquals(totalDocs, hits.length); + + // Test that when we read data, segments come in the reverse order with a segment with the latest date first + long timestamp1 = Long.valueOf(hits[0].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of 1st seg + long timestamp2 = Long.valueOf(hits[0 + numDocs3].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 2nd seg + long timestamp3 = Long.valueOf(hits[0 + numDocs3 + numDocs2].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 3rd seg + assertTrue(timestamp1 > timestamp2); + assertTrue(timestamp2 > timestamp3); + } + private static void verifyResolvability(String dataStream, ActionRequestBuilder requestBuilder, boolean fail) { verifyResolvability(dataStream, requestBuilder, fail, 0); } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java index 88697dde12b7c..4d3942ae2445c 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java +++ 
b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java @@ -60,8 +60,6 @@ import static org.hamcrest.Matchers.nullValue; public class MigrateToDataTiersIT extends ESRestTestCase { - private static final Logger logger = LogManager.getLogger(MigrateToDataTiersIT.class); - private String index; private String policy; private String alias; diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/131_sort_segments_migrate_to_data_stream.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/131_sort_segments_migrate_to_data_stream.yml new file mode 100644 index 0000000000000..a3e121abdddfe --- /dev/null +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/131_sort_segments_migrate_to_data_stream.yml @@ -0,0 +1,107 @@ +--- +"Test that datastream index segments are sorted on timestamp field desc after data stream migration": + - skip: + #TODO: adjust version after backport + version: " - 7.99.99" + reason: "sorting segments was added in 7.15" + features: allowed_warnings + + - do: + indices.put_index_template: + name: my-template + body: + index_patterns: [ my_ds ] + data_stream: { } + template: + settings: + number_of_shards: 1 + number_of_replicas: 0 + + - do: + indices.create: + index: test_index1 + body: + aliases: + my_ds: + is_write_index: true + + # 1st segment + - do: + index: + index: my_ds + body: { "foo": "bar1", "@timestamp": "2021-08-01" } + refresh: true + + # 2nd segment + - do: + index: + index: my_ds + body: { "foo": "bar2", "@timestamp": "2021-08-02" } + refresh: true + + # test that segments are sorted as indexed by @timestamp ASC + - do: + search: + index: my_ds + body: + fields: [{ "field":"@timestamp", "format":"yyyy-MM-dd" }] + - match: { hits.total.value: 2 } + - match: { hits.hits.0.fields.@timestamp: ["2021-08-01"] } + - match: { hits.hits.1.fields.@timestamp: ["2021-08-02"] } + + # migrate to data-stream + - do: + 
indices.migrate_to_data_stream: + name: my_ds + - is_true: acknowledged + + # test that segments are still sorted as indexed by @timestamp ASC + # as we don't reopen existing shards and index readers after migration + - do: + search: + index: my_ds + body: + fields: [{ "field":"@timestamp", "format":"yyyy-MM-dd" }] + - match: { hits.total.value: 2 } + - match: { hits.hits.0.fields.@timestamp: ["2021-08-01"] } + - match: { hits.hits.1.fields.@timestamp: ["2021-08-02"] } + + # rollover data stream to create new backing index + - do: + indices.rollover: + alias: "my_ds" + - match: { rolled_over: true } + # save the new backing index names for later use + - set: { new_index: idx0name } + + # 1st segment in the new backing index + - do: + index: + index: my_ds + body: { "foo": "bar3", "@timestamp": "2021-08-03" } + refresh: true + + # 2nd segment in the new backing index + - do: + index: + index: my_ds + body: { "foo": "bar4", "@timestamp": "2021-08-04" } + refresh: true + + + # test that segments are sorted by @timestamp DESC in the new backing index, + # as the newly created index and shard pick up the index leaf sorter + - do: + search: + index: $idx0name + body: + fields: [{ "field":"@timestamp", "format":"yyyy-MM-dd" }] + - match: { hits.total.value: 2 } + - match: { hits.hits.0.fields.@timestamp: ["2021-08-04"] } + - match: { hits.hits.1.fields.@timestamp: ["2021-08-03"] } + + + - do: + indices.delete_data_stream: + name: my_ds + - is_true: acknowledged