Skip to content

Commit

Permalink
Add segment sorter for data streams (#75195)
Browse files Browse the repository at this point in the history
It is beneficial to sort segments within a datastream's index
by desc order of their max timestamp field, so
that the most recent (in terms of timestamp) segments
will be first.

This allows to speed up sort query on @timestamp desc field,
which is the most common type of query for datastreams,
as we are mostly concerned with the recent data.
This patch addressed this for writable indices.

Segments' sorter is different from index sorting.
An index sorter by itself is  concerned about the order of docs
within an individual segment (and not how the segments are organized),
while the segment sorter is only used during search and allows
to start docs collection with the "right" segment,
so we can terminate the collection faster.

This PR adds a property to IndexShard `isDataStreamIndex` that
shows if a shard is a part of datastream.
  • Loading branch information
mayya-sharipova authored Sep 3, 2021
1 parent 574d643 commit f18b9d5
Show file tree
Hide file tree
Showing 15 changed files with 212 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
*/
package org.elasticsearch.cluster.metadata;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.PointValues;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.core.Nullable;
Expand All @@ -25,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
Expand All @@ -36,6 +40,25 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To

public static final String BACKING_INDEX_PREFIX = ".ds-";
public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd");
// Datastreams' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations
public static Comparator<LeafReader> DATASTREAM_LEAF_READERS_SORTER =
Comparator.comparingLong(
(LeafReader r) -> {
try {
PointValues points = r.getPointValues(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD);
if (points != null) {
byte[] sortValue = points.getMaxPackedValue();
return LongPoint.decodeDimension(sortValue, 0);
} else if (r.numDocs() == 0) {
// points can be null if the segment contains only deleted documents
return Long.MIN_VALUE;
}
} catch (IOException e) {
}
throw new IllegalStateException("Can't access [" +
DataStream.TimestampField.FIXED_TIMESTAMP_FIELD + "] field for the data stream!");
})
.reversed();

private final LongSupplier timeProvider;
private final String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
Expand All @@ -32,6 +33,7 @@
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -70,6 +72,7 @@ public final class EngineConfig {
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private final Comparator<LeafReader> leafSorter;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
Expand Down Expand Up @@ -131,7 +134,8 @@ public EngineConfig(
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) {
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
Comparator<LeafReader> leafSorter) {
this.shardId = shardId;
this.indexSettings = indexSettings;
this.threadPool = threadPool;
Expand Down Expand Up @@ -169,6 +173,7 @@ public EngineConfig(
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
this.primaryTermSupplier = primaryTermSupplier;
this.snapshotCommitSupplier = snapshotCommitSupplier;
this.leafSorter = leafSorter;
}

/**
Expand Down Expand Up @@ -353,4 +358,12 @@ public LongSupplier getPrimaryTermSupplier() {
public IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier() {
return snapshotCommitSupplier;
}

/**
* Returns how segments should be sorted for reading or @null if no sorting should be applied.
*/
@Nullable
public Comparator<LeafReader> getLeafSorter() {
return leafSorter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2193,6 +2193,11 @@ private IndexWriterConfig getIndexWriterConfig() {
if (config().getIndexSort() != null) {
iwc.setIndexSort(config().getIndexSort());
}
// Provide a custom leaf sorter, so that index readers opened from this writer
// will have its leaves sorted according the given leaf sorter.
if (engineConfig.getLeafSorter() != null) {
iwc.setLeafSorter(engineConfig.getLeafSorter());
}
return iwc;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ protected final ElasticsearchDirectoryReader wrapReader(DirectoryReader reader,
}

protected DirectoryReader open(IndexCommit commit) throws IOException {
// TODO: provide engineConfig.getLeafSorter() when opening a DirectoryReader from a commit
// should be available from Lucene v 8.10
assert Transports.assertNotTransportThread("opening index commit of a read-only engine");
if (lazilyLoadSoftDeletes) {
return new LazySoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit), Lucene.SOFT_DELETES_FIELD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.index.shard;

import com.carrotsearch.hppc.ObjectLongMap;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.analysis.Analyzer;
Expand Down Expand Up @@ -185,6 +186,7 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.metadata.DataStream.DATASTREAM_LEAF_READERS_SORTER;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

Expand Down Expand Up @@ -283,6 +285,7 @@ Runnable getGlobalCheckpointSyncer() {
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
private final RefreshPendingLocationListener refreshPendingLocationListener;
private volatile boolean useRetentionLeasesInPeerRecovery;
private final boolean isDataStreamIndex; // if a shard is a part of data stream

public IndexShard(
final ShardRouting shardRouting,
Expand Down Expand Up @@ -387,6 +390,7 @@ public boolean shouldCache(Query query) {
persistMetadata(path, indexSettings, shardRouting, null, logger);
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.isDataStreamIndex = mapperService == null ? false : mapperService.mappingLookup().isDataStreamTimestampFieldEnabled();
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -2912,7 +2916,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
globalCheckpointSupplier,
replicationTracker::getRetentionLeases,
this::getOperationPrimaryTerm,
snapshotCommitSupplier);
snapshotCommitSupplier,
isDataStreamIndex ? DATASTREAM_LEAF_READERS_SORTER : null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2948,7 +2948,8 @@ public void testRecoverFromForeignTranslog() throws IOException {
() -> UNASSIGNED_SEQ_NO,
() -> RetentionLeases.EMPTY,
primaryTerm::get,
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
null);
expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));

engine = createEngine(store, primaryTranslogDir); // and recover again!
Expand Down Expand Up @@ -6021,7 +6022,7 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception {
config.getQueryCachingPolicy(), translogConfig, config.getFlushMergesAfter(),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
try (InternalEngine engine = createEngine(configWithWarmer)) {
assertThat(warmedUpReaders, empty());
assertThat(expectThrows(Throwable.class, () -> engine.acquireSearcher("test")).getMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3928,7 +3928,7 @@ public void testCloseShardWhileEngineIsWarming() throws Exception {
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, config.getLeafSorter());
return new InternalEngine(configWithWarmer);
});
Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
() -> SequenceNumbers.NO_OPS_PERFORMED,
() -> RetentionLeases.EMPTY,
() -> primaryTerm,
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
null);
engine = new InternalEngine(config);
engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE);
listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.Ref
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), internalRefreshListener, config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
}

ThreadPoolStats.Stats getRefreshThreadPoolStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSuppl
config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(),
config.getCircuitBreakerService(), globalCheckpointSupplier, config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
}

public EngineConfig copy(EngineConfig config, Analyzer analyzer) {
Expand All @@ -248,7 +248,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) {
config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
}

public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
Expand All @@ -258,7 +258,7 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
}

@Override
Expand Down Expand Up @@ -669,7 +669,8 @@ public EngineConfig config(
globalCheckpointSupplier,
retentionLeasesSupplier,
primaryTerm,
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
null);
}

protected EngineConfig config(EngineConfig config, Store store, Path translogPath) {
Expand All @@ -683,7 +684,7 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat
translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(),
config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(),
config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
}

protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ public void onFailedEngine(String reason, Exception e) {
globalCheckpoint::longValue,
() -> RetentionLeases.EMPTY,
() -> primaryTerm.get(),
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
null);
}

private static Store createStore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FieldAndFormat;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
Expand Down Expand Up @@ -1213,10 +1216,6 @@ public void testGetDataStream() throws Exception {
assertThat(metricsFooDataStream.getIlmPolicy(), is(nullValue()));
}

private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping) {
assertBackingIndex(backingIndex, timestampFieldPathInMapping, Map.of("type", "date"));
}

private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping, Map<?, ?> expectedMapping) {
GetIndexResponse getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices(backingIndex)).actionGet();
assertThat(getIndexResponse.getSettings().get(backingIndex), notNullValue());
Expand Down Expand Up @@ -1481,6 +1480,42 @@ public void testMultiThreadedRollover() throws Exception {
);
}

// Test that datastream's segments by default are sorted on @timestamp desc
public void testSegmentsSortedOnTimestampDesc() throws Exception {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
putComposableIndexTemplate("template_for_foo", null, List.of("metrics-foo*"), settings, null);
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo");
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();

// We index data in the increasing order of @timestamp field
int numDocs1 = randomIntBetween(2, 10);
indexDocs("metrics-foo", numDocs1); // 1st segment
int numDocs2 = randomIntBetween(2, 10);
indexDocs("metrics-foo", numDocs2); // 2nd segment
int numDocs3 = randomIntBetween(2, 10);
indexDocs("metrics-foo", numDocs3); // 3rd segment
int totalDocs = numDocs1 + numDocs2 + numDocs3;

SearchSourceBuilder source = new SearchSourceBuilder();
source.fetchField(new FieldAndFormat(DEFAULT_TIMESTAMP_FIELD, "epoch_millis"));
source.size(totalDocs);
SearchRequest searchRequest = new SearchRequest(new String[] { "metrics-foo" }, source);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(totalDocs, searchResponse.getHits().getTotalHits().value);
SearchHit[] hits = searchResponse.getHits().getHits();
assertEquals(totalDocs, hits.length);

// Test that when we read data, segments come in the reverse order with a segment with the latest date first
long timestamp1 = Long.valueOf(hits[0].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of 1st seg
long timestamp2 = Long.valueOf(hits[0 + numDocs3].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 2nd seg
long timestamp3 = Long.valueOf(hits[0 + numDocs3 + numDocs2].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 3rd seg
assertTrue(timestamp1 > timestamp2);
assertTrue(timestamp2 > timestamp3);
}

private static void verifyResolvability(String dataStream, ActionRequestBuilder<?, ?> requestBuilder, boolean fail) {
verifyResolvability(dataStream, requestBuilder, fail, 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@
import static org.hamcrest.Matchers.nullValue;

public class MigrateToDataTiersIT extends ESRestTestCase {
private static final Logger logger = LogManager.getLogger(MigrateToDataTiersIT.class);

private String index;
private String policy;
private String alias;
Expand Down
Loading

0 comments on commit f18b9d5

Please sign in to comment.