Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add segment sorter for data streams #75195

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
5360987
Add default leaf sorter for data streams
mayya-sharipova Jul 9, 2021
97b9f72
Revert "Add default leaf sorter for data streams"
mayya-sharipova Jul 22, 2021
a9d44d9
Add segment sorter for data streams
mayya-sharipova Jul 26, 2021
8845d8d
Merge remote-tracking branch 'upstream/master' into datastreams-leaf-…
mayya-sharipova Aug 4, 2021
e73bb4c
IndexShard to use mappingLookup().isDataStreamTimestampFieldEnabled()
mayya-sharipova Aug 4, 2021
72fa961
Merge remote-tracking branch 'upstream/master' into datastreams-leaf-…
mayya-sharipova Aug 4, 2021
711a52b
Merge remote-tracking branch 'upstream/master' into datastreams-leaf-…
mayya-sharipova Aug 5, 2021
4365ed0
Check for datastream index only when MapperService is enabled
mayya-sharipova Aug 9, 2021
71befb8
Merge remote-tracking branch 'upstream/master' into datastreams-leaf-…
mayya-sharipova Aug 9, 2021
770a179
Temp
mayya-sharipova Aug 9, 2021
6a04a57
Throw an exception when datastream doesn't contain
mayya-sharipova Sep 2, 2021
730d40b
Merge remote-tracking branch 'upstream/master' into datastreams-leaf-…
mayya-sharipova Sep 2, 2021
8724cde
Remove temp debugging
mayya-sharipova Sep 2, 2021
fbb937b
Change IllegalArgument to IllegatState exception
mayya-sharipova Sep 2, 2021
77941bc
Return Long.MIN_VALUE for datastream sorter
mayya-sharipova Sep 2, 2021
4e0e2a2
Merge remote-tracking branch 'upstream/master' into datastreams-leaf-…
mayya-sharipova Sep 2, 2021
4c5bddc
Change assertion to if statement
mayya-sharipova Sep 3, 2021
6d8eefc
Merge remote-tracking branch 'upstream/master' into datastreams-leaf-…
mayya-sharipova Sep 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
*/
package org.elasticsearch.cluster.metadata;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.PointValues;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.core.Nullable;
Expand All @@ -25,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
Expand All @@ -36,6 +40,23 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To

public static final String BACKING_INDEX_PREFIX = ".ds-";
public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd");
// Datastreams' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations
public static Comparator<LeafReader> DATASTREAM_LEAF_READERS_SORTER =
Comparator.comparingLong(
(LeafReader r) -> {
try {
PointValues points = r.getPointValues(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD);
if (points != null) {
byte[] sortValue = points.getMaxPackedValue();
return LongPoint.decodeDimension(sortValue, 0);
}
} catch (IOException e) {
mayya-sharipova marked this conversation as resolved.
Show resolved Hide resolved
assert false : "Datastream index segment doesn't contain an expected " +
DataStream.TimestampField.FIXED_TIMESTAMP_FIELD + " field!";
}
// this should not happen, as all data stream segments must contain @timestamp field
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we have an assert here too ?

return Long.MAX_VALUE; })
.reversed();

private final LongSupplier timeProvider;
private final String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
Expand All @@ -32,6 +33,7 @@
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -70,6 +72,7 @@ public final class EngineConfig {
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private final Comparator<LeafReader> leafSorter;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
Expand Down Expand Up @@ -131,7 +134,8 @@ public EngineConfig(
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) {
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
Comparator<LeafReader> leafSorter) {
this.shardId = shardId;
this.indexSettings = indexSettings;
this.threadPool = threadPool;
Expand Down Expand Up @@ -169,6 +173,7 @@ public EngineConfig(
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
this.primaryTermSupplier = primaryTermSupplier;
this.snapshotCommitSupplier = snapshotCommitSupplier;
this.leafSorter = leafSorter;
}

/**
Expand Down Expand Up @@ -353,4 +358,12 @@ public LongSupplier getPrimaryTermSupplier() {
public IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier() {
return snapshotCommitSupplier;
}

/**
* Returns how segments should be sorted for reading or @null if no sorting should be applied.
*/
@Nullable
public Comparator<LeafReader> getLeafSorter() {
return leafSorter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2181,6 +2181,11 @@ private IndexWriterConfig getIndexWriterConfig() {
if (config().getIndexSort() != null) {
iwc.setIndexSort(config().getIndexSort());
}
// Provide a custom leaf sorter, so that index readers opened from this writer
// will have its leaves sorted according the given leaf sorter.
if (engineConfig.getLeafSorter() != null) {
iwc.setLeafSorter(engineConfig.getLeafSorter());
}
return iwc;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ protected final ElasticsearchDirectoryReader wrapReader(DirectoryReader reader,
}

protected DirectoryReader open(IndexCommit commit) throws IOException {
// TODO: provide engineConfig.getLeafSorter() when opening a DirectoryReader from a commit
// should be available from Lucene v 8.10
assert Transports.assertNotTransportThread("opening index commit of a read-only engine");
if (lazilyLoadSoftDeletes) {
return new LazySoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit), Lucene.SOFT_DELETES_FIELD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.metadata.DataStream.DATASTREAM_LEAF_READERS_SORTER;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

Expand Down Expand Up @@ -283,6 +284,7 @@ Runnable getGlobalCheckpointSyncer() {
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
private final RefreshPendingLocationListener refreshPendingLocationListener;
private volatile boolean useRetentionLeasesInPeerRecovery;
private final boolean isDataStreamIndex; // if a shard is a part of data stream

public IndexShard(
final ShardRouting shardRouting,
Expand Down Expand Up @@ -387,6 +389,7 @@ public boolean shouldCache(Query query) {
persistMetadata(path, indexSettings, shardRouting, null, logger);
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.isDataStreamIndex = mapperService == null ? false : mapperService.mappingLookup().isDataStreamTimestampFieldEnabled();
mayya-sharipova marked this conversation as resolved.
Show resolved Hide resolved
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -2912,7 +2915,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
globalCheckpointSupplier,
replicationTracker::getRetentionLeases,
this::getOperationPrimaryTerm,
snapshotCommitSupplier);
snapshotCommitSupplier,
isDataStreamIndex ? DATASTREAM_LEAF_READERS_SORTER : null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2945,7 +2945,8 @@ public void testRecoverFromForeignTranslog() throws IOException {
() -> UNASSIGNED_SEQ_NO,
() -> RetentionLeases.EMPTY,
primaryTerm::get,
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
null);
expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));

engine = createEngine(store, primaryTranslogDir); // and recover again!
Expand Down Expand Up @@ -6018,7 +6019,7 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception {
config.getQueryCachingPolicy(), translogConfig, config.getFlushMergesAfter(),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
try (InternalEngine engine = createEngine(configWithWarmer)) {
assertThat(warmedUpReaders, empty());
assertThat(expectThrows(Throwable.class, () -> engine.acquireSearcher("test")).getMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3925,7 +3925,7 @@ public void testCloseShardWhileEngineIsWarming() throws Exception {
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, config.getLeafSorter());
return new InternalEngine(configWithWarmer);
});
Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
() -> SequenceNumbers.NO_OPS_PERFORMED,
() -> RetentionLeases.EMPTY,
() -> primaryTerm,
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
null);
engine = new InternalEngine(config);
engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE);
listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.Ref
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), internalRefreshListener, config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
}

ThreadPoolStats.Stats getRefreshThreadPoolStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSuppl
config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(),
config.getCircuitBreakerService(), globalCheckpointSupplier, config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
}

public EngineConfig copy(EngineConfig config, Analyzer analyzer) {
Expand All @@ -248,7 +248,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) {
config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
}

public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
Expand All @@ -258,7 +258,7 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
}

@Override
Expand Down Expand Up @@ -669,7 +669,8 @@ public EngineConfig config(
globalCheckpointSupplier,
retentionLeasesSupplier,
primaryTerm,
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
null);
}

protected EngineConfig config(EngineConfig config, Store store, Path translogPath) {
Expand All @@ -683,7 +684,7 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat
translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(),
config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(),
config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
}

protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ public void onFailedEngine(String reason, Exception e) {
globalCheckpoint::longValue,
() -> RetentionLeases.EMPTY,
() -> primaryTerm.get(),
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
null);
}

private static Store createStore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FieldAndFormat;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
Expand Down Expand Up @@ -1213,10 +1216,6 @@ public void testGetDataStream() throws Exception {
assertThat(metricsFooDataStream.getIlmPolicy(), is(nullValue()));
}

private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping) {
assertBackingIndex(backingIndex, timestampFieldPathInMapping, Map.of("type", "date"));
}

private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping, Map<?, ?> expectedMapping) {
GetIndexResponse getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices(backingIndex)).actionGet();
assertThat(getIndexResponse.getSettings().get(backingIndex), notNullValue());
Expand Down Expand Up @@ -1481,6 +1480,42 @@ public void testMultiThreadedRollover() throws Exception {
);
}

// Test that datastream's segments by default are sorted on @timestamp desc
public void testSegmentsSortedOnTimestampDesc() throws Exception {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
putComposableIndexTemplate("template_for_foo", null, List.of("metrics-foo*"), settings, null);
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo");
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();

// We index data in the increasing order of @timestamp field
int numDocs1 = randomIntBetween(2, 10);
indexDocs("metrics-foo", numDocs1); // 1st segment
int numDocs2 = randomIntBetween(2, 10);
indexDocs("metrics-foo", numDocs2); // 2nd segment
int numDocs3 = randomIntBetween(2, 10);
indexDocs("metrics-foo", numDocs3); // 3rd segment
int totalDocs = numDocs1 + numDocs2 + numDocs3;

SearchSourceBuilder source = new SearchSourceBuilder();
source.fetchField(new FieldAndFormat(DEFAULT_TIMESTAMP_FIELD, "epoch_millis"));
source.size(totalDocs);
SearchRequest searchRequest = new SearchRequest(new String[] { "metrics-foo" }, source);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(totalDocs, searchResponse.getHits().getTotalHits().value);
SearchHit[] hits = searchResponse.getHits().getHits();
assertEquals(totalDocs, hits.length);

// Test that when we read data, segments come in the reverse order with a segment with the latest date first
long timestamp1 = Long.valueOf(hits[0].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of 1st seg
long timestamp2 = Long.valueOf(hits[0 + numDocs3].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 2nd seg
long timestamp3 = Long.valueOf(hits[0 + numDocs3 + numDocs2].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 3rd seg
assertTrue(timestamp1 > timestamp2);
assertTrue(timestamp2 > timestamp3);
}

private static void verifyResolvability(String dataStream, ActionRequestBuilder<?, ?> requestBuilder, boolean fail) {
verifyResolvability(dataStream, requestBuilder, fail, 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@
import static org.hamcrest.Matchers.nullValue;

public class MigrateToDataTiersIT extends ESRestTestCase {
private static final Logger logger = LogManager.getLogger(MigrateToDataTiersIT.class);

private String index;
private String policy;
private String alias;
Expand Down
Loading