From b83035183064d4684e9f0922255a679c384b7573 Mon Sep 17 00:00:00 2001 From: Mayya Sharipova Date: Fri, 3 Sep 2021 11:33:40 -0400 Subject: [PATCH] Add segment sorter for data streams (#75195) (#77261) It is beneficial to sort the segments within a data stream's index in descending order of their maximum @timestamp value, so that the most recent (in terms of timestamp) segments come first. This speeds up queries sorted on @timestamp desc, which is the most common type of query for data streams, as we are mostly concerned with the recent data. This patch addresses this for writable indices. A segment sorter is different from index sorting. An index sort by itself is concerned with the order of docs within an individual segment (and not with how the segments are organized), while the segment sorter is only used during search and allows doc collection to start with the "right" segment, so we can terminate the collection faster. This PR adds a property `isDataStreamIndex` to IndexShard that indicates whether a shard is part of a data stream. 
Backport for #75195 --- .../indices/IndexingMemoryControllerIT.java | 2 +- .../cluster/metadata/DataStream.java | 23 ++++ .../index/engine/EngineConfig.java | 15 ++- .../index/engine/InternalEngine.java | 5 + .../index/engine/ReadOnlyEngine.java | 2 + .../elasticsearch/index/shard/IndexShard.java | 7 +- .../index/engine/InternalEngineTests.java | 5 +- .../index/shard/IndexShardTests.java | 2 +- .../index/shard/RefreshListenersTests.java | 3 +- .../IndexingMemoryControllerTests.java | 2 +- .../index/engine/EngineTestCase.java | 11 +- .../index/engine/FollowingEngineTests.java | 3 +- .../datastreams/DataStreamIT.java | 43 ++++++- .../xpack/MigrateToDataTiersIT.java | 2 - ...1_sort_segments_migrate_to_data_stream.yml | 107 ++++++++++++++++++ 15 files changed, 212 insertions(+), 20 deletions(-) create mode 100644 x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/131_sort_segments_migrate_to_data_stream.yml diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index 4c182321b2fc4..1322a8346dad4 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -56,7 +56,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } @Override diff --git 
a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index a239a402d7826..6157b4b0edda7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -8,6 +8,9 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.Version; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.PointValues; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; import org.elasticsearch.core.Nullable; @@ -26,6 +29,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -37,6 +41,25 @@ public final class DataStream extends AbstractDiffable implements To public static final String BACKING_INDEX_PREFIX = ".ds-"; public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd"); + // Datastreams' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations + public static Comparator DATASTREAM_LEAF_READERS_SORTER = + Comparator.comparingLong( + (LeafReader r) -> { + try { + PointValues points = r.getPointValues(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD); + if (points != null) { + byte[] sortValue = points.getMaxPackedValue(); + return LongPoint.decodeDimension(sortValue, 0); + } else if (r.numDocs() == 0) { + // points can be null if the segment contains only deleted documents + return Long.MIN_VALUE; + } + } catch (IOException e) { + } + throw new IllegalStateException("Can't access [" + + DataStream.TimestampField.FIXED_TIMESTAMP_FIELD + "] field for the data stream!"); + }) + .reversed(); /** * The version when data stream metadata, hidden and replicated 
data streams, and dates in backing index names was introduced. diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 2b678d6ac187f..c65382db05d87 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -9,6 +9,7 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; +import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; @@ -32,6 +33,7 @@ import org.elasticsearch.plugins.IndexStorePlugin; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.function.LongSupplier; @@ -70,6 +72,7 @@ public final class EngineConfig { private final CircuitBreakerService circuitBreakerService; private final LongSupplier globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; + private final Comparator leafSorter; /** * A supplier of the outstanding retention leases. 
This is used during merged operations to determine which operations that have been @@ -141,7 +144,8 @@ public EngineConfig( LongSupplier globalCheckpointSupplier, Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, - IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) { + IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier, + Comparator leafSorter) { this.shardId = shardId; this.indexSettings = indexSettings; this.threadPool = threadPool; @@ -179,6 +183,7 @@ public EngineConfig( this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier); this.primaryTermSupplier = primaryTermSupplier; this.snapshotCommitSupplier = snapshotCommitSupplier; + this.leafSorter = leafSorter; } /** @@ -371,4 +376,12 @@ public LongSupplier getPrimaryTermSupplier() { public IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier() { return snapshotCommitSupplier; } + + /** + * Returns how segments should be sorted for reading or @null if no sorting should be applied. + */ + @Nullable + public Comparator getLeafSorter() { + return leafSorter; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 8939351af44f5..af10fd26d13b8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2346,6 +2346,11 @@ private IndexWriterConfig getIndexWriterConfig() { if (config().getIndexSort() != null) { iwc.setIndexSort(config().getIndexSort()); } + // Provide a custom leaf sorter, so that index readers opened from this writer + // will have its leaves sorted according the given leaf sorter. 
+ if (engineConfig.getLeafSorter() != null) { + iwc.setLeafSorter(engineConfig.getLeafSorter()); + } return iwc; } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index e987fb72efa30..eb9fed1303ea3 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -202,6 +202,8 @@ protected final ElasticsearchDirectoryReader wrapReader(DirectoryReader reader, } protected DirectoryReader open(IndexCommit commit) throws IOException { + // TODO: provide engineConfig.getLeafSorter() when opening a DirectoryReader from a commit + // should be available from Lucene v 8.10 assert Transports.assertNotTransportThread("opening index commit of a read-only engine"); if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { if (lazilyLoadSoftDeletes) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index eee4d4f1efe61..e6250d70b6c21 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -9,6 +9,7 @@ package org.elasticsearch.index.shard; import com.carrotsearch.hppc.ObjectLongMap; + import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.analysis.Analyzer; @@ -187,6 +188,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static org.elasticsearch.cluster.metadata.DataStream.DATASTREAM_LEAF_READERS_SORTER; import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -284,6 +286,7 @@ Runnable getGlobalCheckpointSyncer() { private final AtomicReference 
pendingRefreshLocation = new AtomicReference<>(); private final RefreshPendingLocationListener refreshPendingLocationListener; private volatile boolean useRetentionLeasesInPeerRecovery; + private final boolean isDataStreamIndex; // if a shard is a part of data stream public IndexShard( final ShardRouting shardRouting, @@ -387,6 +390,7 @@ public boolean shouldCache(Query query) { persistMetadata(path, indexSettings, shardRouting, null, logger); this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); this.refreshPendingLocationListener = new RefreshPendingLocationListener(); + this.isDataStreamIndex = mapperService == null ? false : mapperService.mappingLookup().isDataStreamTimestampFieldEnabled(); } public ThreadPool getThreadPool() { @@ -3010,7 +3014,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { globalCheckpointSupplier, replicationTracker::getRetentionLeases, this::getOperationPrimaryTerm, - snapshotCommitSupplier); + snapshotCommitSupplier, + isDataStreamIndex ? DATASTREAM_LEAF_READERS_SORTER : null); } /** diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index c2e4676de3e2e..a115a0078d27c 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3306,7 +3306,8 @@ public void testRecoverFromForeignTranslog() throws IOException { () -> UNASSIGNED_SEQ_NO, () -> RetentionLeases.EMPTY, primaryTerm::get, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); engine = createEngine(store, primaryTranslogDir); // and recover again! 
@@ -6400,7 +6401,7 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.getQueryCachingPolicy(), translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); try (InternalEngine engine = createEngine(configWithWarmer)) { assertThat(warmedUpReaders, empty()); assertThat(expectThrows(Throwable.class, () -> engine.acquireSearcher("test")).getMessage(), diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index c9af31f72ad3c..0ea2e3b2571a7 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -4204,7 +4204,7 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, config.getLeafSorter()); return new InternalEngine(configWithWarmer); }); Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard))); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java 
b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index e892458320d46..852b784bd2b77 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -138,7 +138,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { () -> SequenceNumbers.NO_OPS_PERFORMED, () -> RetentionLeases.EMPTY, () -> primaryTerm, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); engine = new InternalEngine(config); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index 64b7be9601797..1282fb2a96f59 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -371,7 +371,7 @@ EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.Ref config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), internalRefreshListener, config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } ThreadPoolStats.Stats getRefreshThreadPoolStats() { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index d2dc76950d8ae..5eb263d23266a 100644 --- 
a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -242,7 +242,7 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSuppl config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), globalCheckpointSupplier, config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } public EngineConfig copy(EngineConfig config, Analyzer analyzer) { @@ -252,7 +252,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { @@ -262,7 +262,7 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } @Override @@ -674,7 +674,8 @@ public EngineConfig config( globalCheckpointSupplier, retentionLeasesSupplier, primaryTerm, - 
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); } protected EngineConfig config(EngineConfig config, Store store, Path translogPath) { @@ -688,7 +689,7 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index cf8c150c3ff3f..d1dc97a34cb98 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -272,7 +272,8 @@ public void onFailedEngine(String reason, Exception e) { globalCheckpoint::longValue, () -> RetentionLeases.EMPTY, () -> primaryTerm.get(), - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); } private static Store createStore( diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index 1eb535bcd15f4..549354ea78110 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ 
b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -64,6 +64,9 @@ import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.subphase.FieldAndFormat; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.core.action.CreateDataStreamAction; import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; @@ -1222,10 +1225,6 @@ public void testGetDataStream() throws Exception { assertThat(metricsFooDataStream.getIlmPolicy(), is(nullValue())); } - private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping) { - assertBackingIndex(backingIndex, timestampFieldPathInMapping, Map.of("type", "date")); - } - private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping, java.util.Map expectedMapping) { GetIndexResponse getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices(backingIndex)).actionGet(); assertThat(getIndexResponse.getSettings().get(backingIndex), notNullValue()); @@ -1490,6 +1489,42 @@ public void testMultiThreadedRollover() throws Exception { ); } + // Test that datastream's segments by default are sorted on @timestamp desc + public void testSegmentsSortedOnTimestampDesc() throws Exception { + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build(); + putComposableIndexTemplate("template_for_foo", null, List.of("metrics-foo*"), settings, null); + CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo"); + client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get(); + + // We index data in the 
increasing order of @timestamp field + int numDocs1 = randomIntBetween(2, 10); + indexDocs("metrics-foo", numDocs1); // 1st segment + int numDocs2 = randomIntBetween(2, 10); + indexDocs("metrics-foo", numDocs2); // 2nd segment + int numDocs3 = randomIntBetween(2, 10); + indexDocs("metrics-foo", numDocs3); // 3rd segment + int totalDocs = numDocs1 + numDocs2 + numDocs3; + + SearchSourceBuilder source = new SearchSourceBuilder(); + source.fetchField(new FieldAndFormat(DEFAULT_TIMESTAMP_FIELD, "epoch_millis")); + source.size(totalDocs); + SearchRequest searchRequest = new SearchRequest(new String[] { "metrics-foo" }, source); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(totalDocs, searchResponse.getHits().getTotalHits().value); + SearchHit[] hits = searchResponse.getHits().getHits(); + assertEquals(totalDocs, hits.length); + + // Test that when we read data, segments come in the reverse order with a segment with the latest date first + long timestamp1 = Long.valueOf(hits[0].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of 1st seg + long timestamp2 = Long.valueOf(hits[0 + numDocs3].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 2nd seg + long timestamp3 = Long.valueOf(hits[0 + numDocs3 + numDocs2].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 3rd seg + assertTrue(timestamp1 > timestamp2); + assertTrue(timestamp2 > timestamp3); + } + private static void verifyResolvability(String dataStream, ActionRequestBuilder requestBuilder, boolean fail) { verifyResolvability(dataStream, requestBuilder, fail, 0); } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java index 88697dde12b7c..4d3942ae2445c 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java +++ 
b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java @@ -60,8 +60,6 @@ import static org.hamcrest.Matchers.nullValue; public class MigrateToDataTiersIT extends ESRestTestCase { - private static final Logger logger = LogManager.getLogger(MigrateToDataTiersIT.class); - private String index; private String policy; private String alias; diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/131_sort_segments_migrate_to_data_stream.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/131_sort_segments_migrate_to_data_stream.yml new file mode 100644 index 0000000000000..a3e121abdddfe --- /dev/null +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/131_sort_segments_migrate_to_data_stream.yml @@ -0,0 +1,107 @@ +--- +"Test that datastream index segments are sorted on timestamp field desc after data stream migration": + - skip: + #TODO: adjust version after backport + version: " - 7.99.99" + reason: "sorting segments was added in 7.15" + features: allowed_warnings + + - do: + indices.put_index_template: + name: my-template + body: + index_patterns: [ my_ds ] + data_stream: { } + template: + settings: + number_of_shards: 1 + number_of_replicas: 0 + + - do: + indices.create: + index: test_index1 + body: + aliases: + my_ds: + is_write_index: true + + # 1st segment + - do: + index: + index: my_ds + body: { "foo": "bar1", "@timestamp": "2021-08-01" } + refresh: true + + # 2nd segment + - do: + index: + index: my_ds + body: { "foo": "bar2", "@timestamp": "2021-08-02" } + refresh: true + + # test that segments are sorted as indexed by @timestamp ASC + - do: + search: + index: my_ds + body: + fields: [{ "field":"@timestamp", "format":"yyyy-MM-dd" }] + - match: { hits.total.value: 2 } + - match: { hits.hits.0.fields.@timestamp: ["2021-08-01"] } + - match: { hits.hits.1.fields.@timestamp: ["2021-08-02"] } + + # migrate to data-stream + - do: + 
indices.migrate_to_data_stream: + name: my_ds + - is_true: acknowledged + + # test that segments are still sorted as indexed by @timestamp ASC + # as we don't reopen existing shards and index readers after migration + - do: + search: + index: my_ds + body: + fields: [{ "field":"@timestamp", "format":"yyyy-MM-dd" }] + - match: { hits.total.value: 2 } + - match: { hits.hits.0.fields.@timestamp: ["2021-08-01"] } + - match: { hits.hits.1.fields.@timestamp: ["2021-08-02"] } + + # rollover data stream to create new backing index + - do: + indices.rollover: + alias: "my_ds" + - match: { rolled_over: true } + # save the new backing index names for later use + - set: { new_index: idx0name } + + # 1st segment in the new backing index + - do: + index: + index: my_ds + body: { "foo": "bar3", "@timestamp": "2021-08-03" } + refresh: true + + # 2nd segment in the new backing index + - do: + index: + index: my_ds + body: { "foo": "bar4", "@timestamp": "2021-08-04" } + refresh: true + + + # test that segments are sorted by @timestamp DESC in the new backing index, + # as the newly created index and shard pick up the index leaf sorter + - do: + search: + index: $idx0name + body: + fields: [{ "field":"@timestamp", "format":"yyyy-MM-dd" }] + - match: { hits.total.value: 2 } + - match: { hits.hits.0.fields.@timestamp: ["2021-08-04"] } + - match: { hits.hits.1.fields.@timestamp: ["2021-08-03"] } + + + - do: + indices.delete_data_stream: + name: my_ds + - is_true: acknowledged