From 53609871addeb9a7918192bb58debcabc561730f Mon Sep 17 00:00:00 2001 From: Mayya Sharipova Date: Fri, 9 Jul 2021 15:07:36 -0400 Subject: [PATCH 01/11] Add default leaf sorter for data streams It is beneficial to add a leaf sorter for data streams that orders segments in descending order of their max timestamp field, so that the most recent (in terms of timestamp) segments come first. This speeds up queries sorted on the @timestamp field in descending order, which is the most common type of query for data streams, as we are mostly concerned with the recent data. This patch addresses this for writable indices. This patch also adds a new flag isDataStreamIndex to IndexMetadata that shows whether the index is part of a data stream or not. TODO: - Should we also do this for frozen or readonly indices? --- .../indices/IndexingMemoryControllerIT.java | 2 +- .../cluster/metadata/DataStream.java | 19 +++++++ .../cluster/metadata/IndexMetadata.java | 53 +++++++++++++++++-- .../metadata/MetadataCreateIndexService.java | 6 ++- .../SystemIndexMetadataUpgradeService.java | 8 ++- .../index/engine/EngineConfig.java | 15 +++++- .../index/engine/InternalEngine.java | 5 ++ .../elasticsearch/index/shard/IndexShard.java | 14 +++-- .../reroute/ClusterRerouteResponseTests.java | 6 ++- .../cluster/ClusterStateTests.java | 12 +++-- .../cluster/metadata/IndexMetadataTests.java | 5 ++ .../MetadataCreateIndexServiceTests.java | 2 +- .../metadata/ToAndFromJsonMetadataTests.java | 10 ++-- .../index/engine/InternalEngineTests.java | 5 +- .../index/shard/IndexShardTests.java | 2 +- .../index/shard/RefreshListenersTests.java | 3 +- .../IndexingMemoryControllerTests.java | 2 +- .../metadata/DataStreamTestHelper.java | 6 ++- .../index/engine/EngineTestCase.java | 11 ++-- .../index/engine/FollowingEngineTests.java | 3 +- .../datastreams/DataStreamIT.java | 39 ++++++++++++++ 21 files changed, 191 insertions(+), 37 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java 
b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index 19dc61ef36049..d8ee45cfabb0c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -57,7 +57,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), null); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index d176f0cc4ef2f..248ccb5c73c75 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -7,6 +7,9 @@ */ package org.elasticsearch.cluster.metadata; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.PointValues; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; import org.elasticsearch.core.Nullable; @@ -25,6 +28,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -36,6 +40,21 @@ public final class DataStream extends AbstractDiffable implements To public static final String BACKING_INDEX_PREFIX = ".ds-"; public static final DateFormatter DATE_FORMATTER = 
DateFormatter.forPattern("uuuu.MM.dd"); + // Datastreams' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations + public static Comparator DATASTREAM_LEAF_READERS_SORTER = + Comparator.comparingLong( + (LeafReader r) -> { + try { + PointValues points = r.getPointValues(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD); + if (points != null) { + byte[] sortValue = points.getMaxPackedValue(); + return LongPoint.decodeDimension(sortValue, 0); + } + } catch (IOException e) { + } + // this should not happen, as all data stream segments must contain timestamp field + return Long.MAX_VALUE; }) + .reversed(); private final LongSupplier timeProvider; private final String name; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index 1dd640d0a8d35..a6534dcde66a2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -339,11 +339,13 @@ public static APIBlock readFrom(StreamInput input) throws IOException { static final String KEY_ROLLOVER_INFOS = "rollover_info"; static final String KEY_SYSTEM = "system"; static final String KEY_TIMESTAMP_RANGE = "timestamp_range"; + static final String KEY_DATASTREAM_INDEX = "datastream_index"; public static final String KEY_PRIMARY_TERMS = "primary_terms"; public static final String INDEX_STATE_FILE_PREFIX = "state-"; static final Version SYSTEM_INDEX_FLAG_ADDED = Version.V_7_10_0; + static final Version DATASTREAM_INDEX_FLAG_ADDED = Version.CURRENT; //TODO: correct to 7.15 private final int routingNumShards; private final int routingFactor; @@ -387,6 +389,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException { private final ActiveShardCount waitForActiveShards; private final ImmutableOpenMap rolloverInfos; private final boolean isSystem; + 
private final boolean isDataStreamIndex; private final IndexLongFieldRange timestampRange; @@ -415,7 +418,8 @@ private IndexMetadata( final ActiveShardCount waitForActiveShards, final ImmutableOpenMap rolloverInfos, final boolean isSystem, - final IndexLongFieldRange timestampRange) { + final IndexLongFieldRange timestampRange, + final boolean isDataStreamIndex) { this.index = index; this.version = version; @@ -448,6 +452,7 @@ private IndexMetadata( this.rolloverInfos = rolloverInfos; this.isSystem = isSystem; this.timestampRange = timestampRange; + this.isDataStreamIndex = isDataStreamIndex; assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards; } @@ -674,6 +679,9 @@ public boolean equals(Object o) { if (isSystem != that.isSystem) { return false; } + if (isDataStreamIndex != that.isDataStreamIndex) { + return false; + } return true; } @@ -692,6 +700,7 @@ public int hashCode() { result = 31 * result + inSyncAllocationIds.hashCode(); result = 31 * result + rolloverInfos.hashCode(); result = 31 * result + Boolean.hashCode(isSystem); + result = 31 * result + Boolean.hashCode(isDataStreamIndex); return result; } @@ -733,6 +742,7 @@ private static class IndexMetadataDiff implements Diff { private final Diff> rolloverInfos; private final boolean isSystem; private final IndexLongFieldRange timestampRange; + private final boolean isDataStreamIndex; IndexMetadataDiff(IndexMetadata before, IndexMetadata after) { index = after.index.getName(); @@ -751,6 +761,7 @@ private static class IndexMetadataDiff implements Diff { DiffableUtils.getVIntKeySerializer(), DiffableUtils.StringSetValueSerializer.getInstance()); rolloverInfos = DiffableUtils.diff(before.rolloverInfos, after.rolloverInfos, DiffableUtils.getStringKeySerializer()); isSystem = after.isSystem; + isDataStreamIndex = after.isDataStreamIndex; timestampRange = after.timestampRange; } @@ -790,6 +801,11 @@ private static class IndexMetadataDiff implements 
Diff { isSystem = false; } timestampRange = IndexLongFieldRange.readFrom(in); + if (in.getVersion().onOrAfter(DATASTREAM_INDEX_FLAG_ADDED)) { + isDataStreamIndex = in.readBoolean(); + } else { + isDataStreamIndex = false; + } } @Override @@ -814,6 +830,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(isSystem); } timestampRange.writeTo(out); + if (out.getVersion().onOrAfter(DATASTREAM_INDEX_FLAG_ADDED)) { + out.writeBoolean(isDataStreamIndex); + } } @Override @@ -833,6 +852,7 @@ public IndexMetadata apply(IndexMetadata part) { builder.inSyncAllocationIds.putAll(inSyncAllocationIds.apply(part.inSyncAllocationIds)); builder.rolloverInfos.putAll(rolloverInfos.apply(part.rolloverInfos)); builder.system(isSystem); + builder.dataStreamIndex(isDataStreamIndex); builder.timestampRange(timestampRange); return builder.build(); } @@ -880,6 +900,9 @@ public static IndexMetadata readFrom(StreamInput in) throws IOException { builder.system(in.readBoolean()); } builder.timestampRange(IndexLongFieldRange.readFrom(in)); + if (in.getVersion().onOrAfter(DATASTREAM_INDEX_FLAG_ADDED)) { + builder.dataStreamIndex(in.readBoolean()); + } return builder.build(); } @@ -922,12 +945,19 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(isSystem); } timestampRange.writeTo(out); + if (out.getVersion().onOrAfter(DATASTREAM_INDEX_FLAG_ADDED)) { + out.writeBoolean(isDataStreamIndex); + } } public boolean isSystem() { return isSystem; } + public boolean isDataStreamIndex() { + return isDataStreamIndex; + } + public static Builder builder(String index) { return new Builder(index); } @@ -954,6 +984,7 @@ public static class Builder { private Integer routingNumShards; private boolean isSystem; private IndexLongFieldRange timestampRange = IndexLongFieldRange.NO_SHARDS; + private boolean isDataStreamIndex; public Builder(String index) { this.index = index; @@ -963,6 +994,7 @@ public Builder(String index) { this.inSyncAllocationIds = 
ImmutableOpenIntMap.builder(); this.rolloverInfos = ImmutableOpenMap.builder(); this.isSystem = false; + this.isDataStreamIndex = false; } public Builder(IndexMetadata indexMetadata) { @@ -982,6 +1014,7 @@ public Builder(IndexMetadata indexMetadata) { this.rolloverInfos = ImmutableOpenMap.builder(indexMetadata.rolloverInfos); this.isSystem = indexMetadata.isSystem; this.timestampRange = indexMetadata.timestampRange; + this.isDataStreamIndex = indexMetadata.isDataStreamIndex; } public Builder index(String index) { @@ -1203,6 +1236,16 @@ public IndexLongFieldRange getTimestampRange() { return timestampRange; } + // Sets if this index is a part of a datastream + public Builder dataStreamIndex(boolean isDataStreamIndex) { + this.isDataStreamIndex = isDataStreamIndex; + return this; + } + + public boolean isDataStreamIndex() { + return isDataStreamIndex; + } + public IndexMetadata build() { ImmutableOpenMap.Builder tmpAliases = aliases; Settings tmpSettings = settings; @@ -1307,7 +1350,8 @@ public IndexMetadata build() { waitForActiveShards, rolloverInfos.build(), isSystem, - timestampRange); + timestampRange, + isDataStreamIndex); } public static void toXContent(IndexMetadata indexMetadata, XContentBuilder builder, ToXContent.Params params) throws IOException { @@ -1408,11 +1452,10 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build } builder.endObject(); builder.field(KEY_SYSTEM, indexMetadata.isSystem); - builder.startObject(KEY_TIMESTAMP_RANGE); indexMetadata.timestampRange.toXContent(builder, params); builder.endObject(); - + builder.field(KEY_DATASTREAM_INDEX, indexMetadata.isDataStreamIndex); builder.endObject(); } @@ -1536,6 +1579,8 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti builder.setRoutingNumShards(parser.intValue()); } else if (KEY_SYSTEM.equals(currentFieldName)) { builder.system(parser.booleanValue()); + } else if (KEY_DATASTREAM_INDEX.equals(currentFieldName)) { + 
builder.dataStreamIndex(parser.booleanValue()); } else { throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]"); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index 66abae64a7a6d..e8aaeea81feb3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -422,7 +422,7 @@ private ClusterState applyCreateIndexWithTemporaryService(final ClusterState cur try { indexMetadata = buildIndexMetadata(request.index(), aliases, indexService.mapperService()::documentMapper, temporaryIndexMeta.getSettings(), temporaryIndexMeta.getRoutingNumShards(), sourceMetadata, - temporaryIndexMeta.isSystem()); + temporaryIndexMeta.isSystem(), temporaryIndexMeta.isDataStreamIndex()); } catch (Exception e) { logger.info("failed to build index metadata [{}]", request.index()); throw e; @@ -457,6 +457,7 @@ private IndexMetadata buildAndValidateTemporaryIndexMetadata(final Settings aggr tmpImdBuilder.setRoutingNumShards(routingNumShards); tmpImdBuilder.settings(indexSettings); tmpImdBuilder.system(isSystem); + tmpImdBuilder.dataStreamIndex(request.dataStreamName() != null); // Set up everything, now locally create the index to see that things are ok, and apply IndexMetadata tempMetadata = tmpImdBuilder.build(); @@ -914,9 +915,10 @@ static ClusterState clusterStateCreateIndex(ClusterState currentState, Set aliases, Supplier documentMapperSupplier, Settings indexSettings, int routingNumShards, - @Nullable IndexMetadata sourceMetadata, boolean isSystem) { + @Nullable IndexMetadata sourceMetadata, boolean isSystem, boolean isDataStreamIndex) { IndexMetadata.Builder indexMetadataBuilder = createIndexMetadataBuilder(indexName, sourceMetadata, indexSettings, routingNumShards); 
indexMetadataBuilder.system(isSystem); + indexMetadataBuilder.dataStreamIndex(isDataStreamIndex); // now, update the mappings with the actual source Map mappingsMetadata = new HashMap<>(); DocumentMapper mapper = documentMapperSupplier.get(); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java index 4fd926218f1c3..8d8995bb518b1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java @@ -82,8 +82,8 @@ public ClusterState execute(ClusterState currentState) throws Exception { final List updatedMetadata = new ArrayList<>(); for (ObjectObjectCursor cursor : indexMetadataMap) { if (cursor.value != lastIndexMetadataMap.get(cursor.key)) { - final boolean isSystem = systemIndices.isSystemIndex(cursor.value.getIndex()) || - systemIndices.isSystemIndexBackingDataStream(cursor.value.getIndex().getName()); + final boolean isSystemDataStream = systemIndices.isSystemIndexBackingDataStream(cursor.value.getIndex().getName()); + final boolean isSystem = systemIndices.isSystemIndex(cursor.value.getIndex()) || isSystemDataStream; IndexMetadata.Builder builder = IndexMetadata.builder(cursor.value); boolean updated = false; if (isSystem != cursor.value.isSystem()) { @@ -96,6 +96,10 @@ public ClusterState execute(ClusterState currentState) throws Exception { .put(IndexMetadata.SETTING_INDEX_HIDDEN, false)); updated = true; } + if (isSystemDataStream != cursor.value.isDataStreamIndex()) { + builder.dataStreamIndex(isSystemDataStream); + updated = true; + } if (updated) { updatedMetadata.add(builder.build()); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 
9b7ba7986167b..1714dad668e2c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -9,6 +9,7 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; +import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; @@ -32,6 +33,7 @@ import org.elasticsearch.plugins.IndexStorePlugin; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.function.LongSupplier; @@ -70,6 +72,7 @@ public final class EngineConfig { private final CircuitBreakerService circuitBreakerService; private final LongSupplier globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; + private final Comparator leafSorter; /** * A supplier of the outstanding retention leases. 
This is used during merged operations to determine which operations that have been @@ -131,7 +134,8 @@ public EngineConfig( LongSupplier globalCheckpointSupplier, Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, - IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) { + IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier, + Comparator leafSorter) { this.shardId = shardId; this.indexSettings = indexSettings; this.threadPool = threadPool; @@ -169,6 +173,7 @@ public EngineConfig( this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier); this.primaryTermSupplier = primaryTermSupplier; this.snapshotCommitSupplier = snapshotCommitSupplier; + this.leafSorter = leafSorter; } /** @@ -353,4 +358,12 @@ public LongSupplier getPrimaryTermSupplier() { public IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier() { return snapshotCommitSupplier; } + + /** + * Returns how segments should be sorted for reading or @null if no sorting should be applied. + */ + @Nullable + public Comparator getLeafSorter() { + return leafSorter; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3e2ca3f905e88..2c11305fe279e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2182,6 +2182,11 @@ private IndexWriterConfig getIndexWriterConfig() { if (config().getIndexSort() != null) { iwc.setIndexSort(config().getIndexSort()); } + // Provide a custom leaf sorter, so that index readers opened from this writer + // will have its leaves sorted according the given leaf sorter. 
+ if (engineConfig.getLeafSorter() != null) { + iwc.setLeafSorter(engineConfig.getLeafSorter()); + } return iwc; } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 71ea3f1b103d4..60d0054141a58 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -185,6 +185,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static org.elasticsearch.cluster.metadata.DataStream.DATASTREAM_LEAF_READERS_SORTER; import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -440,6 +441,14 @@ public boolean isSystem() { return indexSettings.getIndexMetadata().isSystem(); } + /** + * Returns if this index shard for an index that is part of a datastream + * @return if this index part of datastream or not. + */ + public boolean isDataStreamIndex() { + return indexSettings.getIndexMetadata().isDataStreamIndex(); + } + /** * USE THIS METHOD WITH CARE! * Returns the primary term the index shard is supposed to be on. 
In case of primary promotion or when a replica learns about @@ -1776,8 +1785,6 @@ public ShardLongFieldRange getTimestampRange() { if (mappedFieldType instanceof DateFieldMapper.DateFieldType == false) { return ShardLongFieldRange.UNKNOWN; // field missing or not a date } - final DateFieldMapper.DateFieldType dateFieldType = (DateFieldMapper.DateFieldType) mappedFieldType; - final ShardLongFieldRange rawTimestampFieldRange; try { rawTimestampFieldRange = getEngine().getRawFieldRange(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD); @@ -2912,7 +2919,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { globalCheckpointSupplier, replicationTracker::getRetentionLeases, this::getOperationPrimaryTerm, - snapshotCommitSupplier); + snapshotCommitSupplier, + isDataStreamIndex() ? DATASTREAM_LEAF_READERS_SORTER : null); } /** diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java index a5a90501a0cb3..1c98761a24632 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java @@ -128,7 +128,8 @@ public void testToXContent() throws IOException { " \"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " }\n" + + " },\n" + + " \"datastream_index\" : false\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -229,7 +230,8 @@ public void testToXContent() throws IOException { " \"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " }\n" + + " },\n" + + " \"datastream_index\" : false\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java 
b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java index 13184246d0da3..18ffede686252 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java @@ -249,7 +249,8 @@ public void testToXContent() throws IOException { " \"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " }\n" + + " },\n" + + " \"datastream_index\" : false\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -448,7 +449,8 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti " \"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " }\n" + + " },\n" + + " \"datastream_index\" : false\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -654,7 +656,8 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti " \"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " }\n" + + " },\n" + + " \"datastream_index\" : false\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -784,7 +787,8 @@ public void testToXContentSameTypeName() throws IOException { " \"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " }\n" + + " },\n" + + " \"datastream_index\" : false\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java index 727b43e4cbdce..cf3010c9b309d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java @@ -66,6 +66,7 @@ public void testIndexMetadataSerialization() throws IOException { Integer numShard = randomFrom(1, 2, 4, 8, 16); int numberOfReplicas = randomIntBetween(0, 10); final boolean system = randomBoolean(); 
+ final boolean dataStreamIndex = randomBoolean(); Map customMap = new HashMap<>(); customMap.put(randomAlphaOfLength(5), randomAlphaOfLength(10)); customMap.put(randomAlphaOfLength(10), randomAlphaOfLength(15)); @@ -79,6 +80,7 @@ public void testIndexMetadataSerialization() throws IOException { .primaryTerm(0, 2) .setRoutingNumShards(32) .system(system) + .dataStreamIndex(dataStreamIndex) .putCustom("my_custom", customMap) .putRolloverInfo( new RolloverInfo(randomAlphaOfLength(5), @@ -90,6 +92,7 @@ public void testIndexMetadataSerialization() throws IOException { ), randomNonNegativeLong())).build(); assertEquals(system, metadata.isSystem()); + assertEquals(dataStreamIndex, metadata.isDataStreamIndex()); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -109,6 +112,7 @@ public void testIndexMetadataSerialization() throws IOException { assertEquals(metadata.getRoutingFactor(), fromXContentMeta.getRoutingFactor()); assertEquals(metadata.primaryTerm(0), fromXContentMeta.primaryTerm(0)); assertEquals(metadata.isSystem(), fromXContentMeta.isSystem()); + assertEquals(metadata.isDataStreamIndex(), fromXContentMeta.isDataStreamIndex()); ImmutableOpenMap.Builder expectedCustomBuilder = ImmutableOpenMap.builder(); expectedCustomBuilder.put("my_custom", new DiffableStringMap(customMap)); ImmutableOpenMap expectedCustom = expectedCustomBuilder.build(); @@ -133,6 +137,7 @@ public void testIndexMetadataSerialization() throws IOException { assertEquals(deserialized.getCustomData(), expectedCustom); assertEquals(metadata.getCustomData(), deserialized.getCustomData()); assertEquals(metadata.isSystem(), deserialized.isSystem()); + assertEquals(metadata.isDataStreamIndex(), deserialized.isDataStreamIndex()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java index 5da822595a037..9f3656f3de887 
100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -874,7 +874,7 @@ public void testBuildIndexMetadata() { .put(SETTING_NUMBER_OF_SHARDS, 1) .build(); List aliases = List.of(AliasMetadata.builder("alias1").build()); - IndexMetadata indexMetadata = buildIndexMetadata("test", aliases, () -> null, indexSettings, 4, sourceIndexMetadata, false); + IndexMetadata indexMetadata = buildIndexMetadata("test", aliases, () -> null, indexSettings, 4, sourceIndexMetadata, false, false); assertThat(indexMetadata.getAliases().size(), is(1)); assertThat(indexMetadata.getAliases().keys().iterator().next().value, is("alias1")); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java index 505c77a1217eb..19f2565f64ebf 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java @@ -152,7 +152,6 @@ public void testSimpleJsonFromAndTo() throws IOException { } private static final String MAPPING_SOURCE1 = "{\"mapping1\":{\"text1\":{\"type\":\"string\"}}}"; - private static final String MAPPING_SOURCE2 = "{\"mapping2\":{\"text2\":{\"type\":\"string\"}}}"; private static final String ALIAS_FILTER1 = "{\"field1\":\"value1\"}"; private static final String ALIAS_FILTER2 = "{\"field2\":\"value2\"}"; @@ -289,7 +288,8 @@ public void testToXContentAPI_SameTypeName() throws IOException { " \"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " }\n" + + " },\n" + + " \"datastream_index\" : false\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -449,7 +449,8 @@ public void testToXContentAPI_FlatSettingTrue_ReduceMappingFalse() throws IOExce " 
\"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " }\n" + + " },\n" + + " \"datastream_index\" : false\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -554,7 +555,8 @@ public void testToXContentAPI_FlatSettingFalse_ReduceMappingTrue() throws IOExce " \"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " }\n" + + " },\n" + + " \"datastream_index\" : false\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 7d65ca3d0d16c..135cb6522058d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2946,7 +2946,8 @@ public void testRecoverFromForeignTranslog() throws IOException { () -> UNASSIGNED_SEQ_NO, () -> RetentionLeases.EMPTY, primaryTerm::get, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); engine = createEngine(store, primaryTranslogDir); // and recover again! 
@@ -6020,7 +6021,7 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.getQueryCachingPolicy(), translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), null); try (InternalEngine engine = createEngine(configWithWarmer)) { assertThat(warmedUpReaders, empty()); assertThat(expectThrows(Throwable.class, () -> engine.acquireSearcher("test")).getMessage(), diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index d39a7261378ce..8c570db4a5ac1 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -4093,7 +4093,7 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, null); return new InternalEngine(configWithWarmer); }); Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard))); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 
a59e2defec763..bb022ddad0a41 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -139,7 +139,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { () -> SequenceNumbers.NO_OPS_PERFORMED, () -> RetentionLeases.EMPTY, () -> primaryTerm, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); engine = new InternalEngine(config); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index de7073d3cc15b..9ca05f4aaa4d6 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -371,7 +371,7 @@ EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.Ref config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), internalRefreshListener, config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), null); } ThreadPoolStats.Stats getRefreshThreadPoolStats() { diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index 3ea4671316f79..47c5caaebd0f0 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ 
b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -79,14 +79,16 @@ public static IndexMetadata.Builder createBackingIndex(String dataStreamName, in return IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, generation, epochMillis)) .settings(SETTINGS) .numberOfShards(NUMBER_OF_SHARDS) - .numberOfReplicas(NUMBER_OF_REPLICAS); + .numberOfReplicas(NUMBER_OF_REPLICAS) + .dataStreamIndex(true); } public static IndexMetadata.Builder getIndexMetadataBuilderForIndex(Index index) { return IndexMetadata.builder(index.getName()) .settings(Settings.builder().put(SETTINGS.build()).put(SETTING_INDEX_UUID, index.getUUID())) .numberOfShards(NUMBER_OF_SHARDS) - .numberOfReplicas(NUMBER_OF_REPLICAS); + .numberOfReplicas(NUMBER_OF_REPLICAS) + .dataStreamIndex(true); } public static DataStream.TimestampField createTimestampField(String fieldName) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index f061b12e790d3..edb9547e73a23 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -239,7 +239,7 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSuppl config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), globalCheckpointSupplier, config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } public EngineConfig copy(EngineConfig config, Analyzer analyzer) { @@ -249,7 +249,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.getTranslogConfig(), 
config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { @@ -259,7 +259,7 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } @Override @@ -672,7 +672,8 @@ public EngineConfig config( globalCheckpointSupplier, retentionLeasesSupplier, primaryTerm, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); } protected EngineConfig config(EngineConfig config, Store store, Path translogPath) { @@ -686,7 +687,7 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), null); } protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) { diff --git 
a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index e0d43cab06cbb..0bff20277e67b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -270,7 +270,8 @@ public void onFailedEngine(String reason, Exception e) { globalCheckpoint::longValue, () -> RetentionLeases.EMPTY, () -> primaryTerm.get(), - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); } private static Store createStore( diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index 0189ff997b32a..0fcf027329aac 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -62,6 +62,9 @@ import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.subphase.FieldAndFormat; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.core.action.CreateDataStreamAction; import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; @@ -1367,6 +1370,42 @@ public void testMultiThreadedRollover() throws Exception { ); } + // Test that datastream's segments by default are sorted on @timestamp desc + public void testReaderOnTimestampDesc() throws Exception { + Settings settings 
= Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build(); + putComposableIndexTemplate("template_for_foo", null, List.of("metrics-foo*"), settings, null); + CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo"); + client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get(); + + // We index data in the increasing order of @timestamp field + int numDocs1 = randomIntBetween(2, 10); + indexDocs("metrics-foo", numDocs1); // 1st segment + int numDocs2 = randomIntBetween(2, 10); + indexDocs("metrics-foo", numDocs2); // 2nd segment + int numDocs3 = randomIntBetween(2, 10); + indexDocs("metrics-foo", numDocs3); // 3rd segment + int totalDocs = numDocs1 + numDocs2 + numDocs3; + + SearchSourceBuilder source = new SearchSourceBuilder(); + source.fetchField(new FieldAndFormat(DEFAULT_TIMESTAMP_FIELD, "epoch_millis")); + source.size(totalDocs); + SearchRequest searchRequest = new SearchRequest(new String[] { "metrics-foo" }, source); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(totalDocs, searchResponse.getHits().getTotalHits().value); + SearchHit[] hits = searchResponse.getHits().getHits(); + assertEquals(totalDocs, hits.length); + + // Test that when we read data, segments come in the reverse order with a segment with the latest date first + long timestamp1 = Long.valueOf(hits[0].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of 1st seg + long timestamp2 = Long.valueOf(hits[0 + numDocs3].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 2nd seg + long timestamp3 = Long.valueOf(hits[0 + numDocs2 + numDocs3].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 3rd seg + assertTrue(timestamp1 > timestamp2); + assertTrue(timestamp2 > timestamp3); + } + private static void verifyResolvability(String dataStream, ActionRequestBuilder requestBuilder, boolean 
fail) { verifyResolvability(dataStream, requestBuilder, fail, 0); } From 97b9f72c079311c46b789db18708f6cd24d52a3f Mon Sep 17 00:00:00 2001 From: Mayya Sharipova Date: Thu, 22 Jul 2021 07:54:57 -0400 Subject: [PATCH 02/11] Revert "Add default leaf sorter for data streams" This reverts commit 53609871addeb9a7918192bb58debcabc561730f. As we want to dynamically specify if an index belongs to a data stream. --- .../indices/IndexingMemoryControllerIT.java | 2 +- .../cluster/metadata/DataStream.java | 19 ------- .../cluster/metadata/IndexMetadata.java | 53 ++----------------- .../metadata/MetadataCreateIndexService.java | 6 +-- .../SystemIndexMetadataUpgradeService.java | 8 +-- .../index/engine/EngineConfig.java | 15 +----- .../index/engine/InternalEngine.java | 5 -- .../elasticsearch/index/shard/IndexShard.java | 14 ++--- .../reroute/ClusterRerouteResponseTests.java | 6 +-- .../cluster/ClusterStateTests.java | 12 ++--- .../cluster/metadata/IndexMetadataTests.java | 5 -- .../MetadataCreateIndexServiceTests.java | 2 +- .../metadata/ToAndFromJsonMetadataTests.java | 10 ++-- .../index/engine/InternalEngineTests.java | 5 +- .../index/shard/IndexShardTests.java | 2 +- .../index/shard/RefreshListenersTests.java | 3 +- .../IndexingMemoryControllerTests.java | 2 +- .../metadata/DataStreamTestHelper.java | 6 +-- .../index/engine/EngineTestCase.java | 11 ++-- .../index/engine/FollowingEngineTests.java | 3 +- .../datastreams/DataStreamIT.java | 39 -------------- 21 files changed, 37 insertions(+), 191 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index d8ee45cfabb0c..19dc61ef36049 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -57,7 +57,7 @@ EngineConfig 
engineConfigWithLargerIndexingMemory(EngineConfig config) { config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), null); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 248ccb5c73c75..d176f0cc4ef2f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -7,9 +7,6 @@ */ package org.elasticsearch.cluster.metadata; -import org.apache.lucene.document.LongPoint; -import org.apache.lucene.index.LeafReader; -import org.apache.lucene.index.PointValues; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; import org.elasticsearch.core.Nullable; @@ -28,7 +25,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -40,21 +36,6 @@ public final class DataStream extends AbstractDiffable implements To public static final String BACKING_INDEX_PREFIX = ".ds-"; public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd"); - // Datastreams' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations - public static Comparator DATASTREAM_LEAF_READERS_SORTER = - Comparator.comparingLong( - (LeafReader r) -> { - try { - PointValues points = r.getPointValues(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD); - if (points != 
null) { - byte[] sortValue = points.getMaxPackedValue(); - return LongPoint.decodeDimension(sortValue, 0); - } - } catch (IOException e) { - } - // this should not happen, as all data stream segments must contain timestamp field - return Long.MAX_VALUE; }) - .reversed(); private final LongSupplier timeProvider; private final String name; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index a6534dcde66a2..1dd640d0a8d35 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -339,13 +339,11 @@ public static APIBlock readFrom(StreamInput input) throws IOException { static final String KEY_ROLLOVER_INFOS = "rollover_info"; static final String KEY_SYSTEM = "system"; static final String KEY_TIMESTAMP_RANGE = "timestamp_range"; - static final String KEY_DATASTREAM_INDEX = "datastream_index"; public static final String KEY_PRIMARY_TERMS = "primary_terms"; public static final String INDEX_STATE_FILE_PREFIX = "state-"; static final Version SYSTEM_INDEX_FLAG_ADDED = Version.V_7_10_0; - static final Version DATASTREAM_INDEX_FLAG_ADDED = Version.CURRENT; //TODO: correct to 7.15 private final int routingNumShards; private final int routingFactor; @@ -389,7 +387,6 @@ public static APIBlock readFrom(StreamInput input) throws IOException { private final ActiveShardCount waitForActiveShards; private final ImmutableOpenMap rolloverInfos; private final boolean isSystem; - private final boolean isDataStreamIndex; private final IndexLongFieldRange timestampRange; @@ -418,8 +415,7 @@ private IndexMetadata( final ActiveShardCount waitForActiveShards, final ImmutableOpenMap rolloverInfos, final boolean isSystem, - final IndexLongFieldRange timestampRange, - final boolean isDataStreamIndex) { + final IndexLongFieldRange timestampRange) { this.index = index; 
this.version = version; @@ -452,7 +448,6 @@ private IndexMetadata( this.rolloverInfos = rolloverInfos; this.isSystem = isSystem; this.timestampRange = timestampRange; - this.isDataStreamIndex = isDataStreamIndex; assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards; } @@ -679,9 +674,6 @@ public boolean equals(Object o) { if (isSystem != that.isSystem) { return false; } - if (isDataStreamIndex != that.isDataStreamIndex) { - return false; - } return true; } @@ -700,7 +692,6 @@ public int hashCode() { result = 31 * result + inSyncAllocationIds.hashCode(); result = 31 * result + rolloverInfos.hashCode(); result = 31 * result + Boolean.hashCode(isSystem); - result = 31 * result + Boolean.hashCode(isDataStreamIndex); return result; } @@ -742,7 +733,6 @@ private static class IndexMetadataDiff implements Diff { private final Diff> rolloverInfos; private final boolean isSystem; private final IndexLongFieldRange timestampRange; - private final boolean isDataStreamIndex; IndexMetadataDiff(IndexMetadata before, IndexMetadata after) { index = after.index.getName(); @@ -761,7 +751,6 @@ private static class IndexMetadataDiff implements Diff { DiffableUtils.getVIntKeySerializer(), DiffableUtils.StringSetValueSerializer.getInstance()); rolloverInfos = DiffableUtils.diff(before.rolloverInfos, after.rolloverInfos, DiffableUtils.getStringKeySerializer()); isSystem = after.isSystem; - isDataStreamIndex = after.isDataStreamIndex; timestampRange = after.timestampRange; } @@ -801,11 +790,6 @@ private static class IndexMetadataDiff implements Diff { isSystem = false; } timestampRange = IndexLongFieldRange.readFrom(in); - if (in.getVersion().onOrAfter(DATASTREAM_INDEX_FLAG_ADDED)) { - isDataStreamIndex = in.readBoolean(); - } else { - isDataStreamIndex = false; - } } @Override @@ -830,9 +814,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(isSystem); } timestampRange.writeTo(out); - if 
(out.getVersion().onOrAfter(DATASTREAM_INDEX_FLAG_ADDED)) { - out.writeBoolean(isDataStreamIndex); - } } @Override @@ -852,7 +833,6 @@ public IndexMetadata apply(IndexMetadata part) { builder.inSyncAllocationIds.putAll(inSyncAllocationIds.apply(part.inSyncAllocationIds)); builder.rolloverInfos.putAll(rolloverInfos.apply(part.rolloverInfos)); builder.system(isSystem); - builder.dataStreamIndex(isDataStreamIndex); builder.timestampRange(timestampRange); return builder.build(); } @@ -900,9 +880,6 @@ public static IndexMetadata readFrom(StreamInput in) throws IOException { builder.system(in.readBoolean()); } builder.timestampRange(IndexLongFieldRange.readFrom(in)); - if (in.getVersion().onOrAfter(DATASTREAM_INDEX_FLAG_ADDED)) { - builder.dataStreamIndex(in.readBoolean()); - } return builder.build(); } @@ -945,19 +922,12 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(isSystem); } timestampRange.writeTo(out); - if (out.getVersion().onOrAfter(DATASTREAM_INDEX_FLAG_ADDED)) { - out.writeBoolean(isDataStreamIndex); - } } public boolean isSystem() { return isSystem; } - public boolean isDataStreamIndex() { - return isDataStreamIndex; - } - public static Builder builder(String index) { return new Builder(index); } @@ -984,7 +954,6 @@ public static class Builder { private Integer routingNumShards; private boolean isSystem; private IndexLongFieldRange timestampRange = IndexLongFieldRange.NO_SHARDS; - private boolean isDataStreamIndex; public Builder(String index) { this.index = index; @@ -994,7 +963,6 @@ public Builder(String index) { this.inSyncAllocationIds = ImmutableOpenIntMap.builder(); this.rolloverInfos = ImmutableOpenMap.builder(); this.isSystem = false; - this.isDataStreamIndex = false; } public Builder(IndexMetadata indexMetadata) { @@ -1014,7 +982,6 @@ public Builder(IndexMetadata indexMetadata) { this.rolloverInfos = ImmutableOpenMap.builder(indexMetadata.rolloverInfos); this.isSystem = indexMetadata.isSystem; this.timestampRange = 
indexMetadata.timestampRange; - this.isDataStreamIndex = indexMetadata.isDataStreamIndex; } public Builder index(String index) { @@ -1236,16 +1203,6 @@ public IndexLongFieldRange getTimestampRange() { return timestampRange; } - // Sets if this index is a part of a datastream - public Builder dataStreamIndex(boolean isDataStreamIndex) { - this.isDataStreamIndex = isDataStreamIndex; - return this; - } - - public boolean isDataStreamIndex() { - return isDataStreamIndex; - } - public IndexMetadata build() { ImmutableOpenMap.Builder tmpAliases = aliases; Settings tmpSettings = settings; @@ -1350,8 +1307,7 @@ public IndexMetadata build() { waitForActiveShards, rolloverInfos.build(), isSystem, - timestampRange, - isDataStreamIndex); + timestampRange); } public static void toXContent(IndexMetadata indexMetadata, XContentBuilder builder, ToXContent.Params params) throws IOException { @@ -1452,10 +1408,11 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build } builder.endObject(); builder.field(KEY_SYSTEM, indexMetadata.isSystem); + builder.startObject(KEY_TIMESTAMP_RANGE); indexMetadata.timestampRange.toXContent(builder, params); builder.endObject(); - builder.field(KEY_DATASTREAM_INDEX, indexMetadata.isDataStreamIndex); + builder.endObject(); } @@ -1579,8 +1536,6 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti builder.setRoutingNumShards(parser.intValue()); } else if (KEY_SYSTEM.equals(currentFieldName)) { builder.system(parser.booleanValue()); - } else if (KEY_DATASTREAM_INDEX.equals(currentFieldName)) { - builder.dataStreamIndex(parser.booleanValue()); } else { throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]"); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index e8aaeea81feb3..66abae64a7a6d 100644 --- 
a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -422,7 +422,7 @@ private ClusterState applyCreateIndexWithTemporaryService(final ClusterState cur try { indexMetadata = buildIndexMetadata(request.index(), aliases, indexService.mapperService()::documentMapper, temporaryIndexMeta.getSettings(), temporaryIndexMeta.getRoutingNumShards(), sourceMetadata, - temporaryIndexMeta.isSystem(), temporaryIndexMeta.isDataStreamIndex()); + temporaryIndexMeta.isSystem()); } catch (Exception e) { logger.info("failed to build index metadata [{}]", request.index()); throw e; @@ -457,7 +457,6 @@ private IndexMetadata buildAndValidateTemporaryIndexMetadata(final Settings aggr tmpImdBuilder.setRoutingNumShards(routingNumShards); tmpImdBuilder.settings(indexSettings); tmpImdBuilder.system(isSystem); - tmpImdBuilder.dataStreamIndex(request.dataStreamName() != null); // Set up everything, now locally create the index to see that things are ok, and apply IndexMetadata tempMetadata = tmpImdBuilder.build(); @@ -915,10 +914,9 @@ static ClusterState clusterStateCreateIndex(ClusterState currentState, Set aliases, Supplier documentMapperSupplier, Settings indexSettings, int routingNumShards, - @Nullable IndexMetadata sourceMetadata, boolean isSystem, boolean isDataStreamIndex) { + @Nullable IndexMetadata sourceMetadata, boolean isSystem) { IndexMetadata.Builder indexMetadataBuilder = createIndexMetadataBuilder(indexName, sourceMetadata, indexSettings, routingNumShards); indexMetadataBuilder.system(isSystem); - indexMetadataBuilder.dataStreamIndex(isDataStreamIndex); // now, update the mappings with the actual source Map mappingsMetadata = new HashMap<>(); DocumentMapper mapper = documentMapperSupplier.get(); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java 
b/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java index 8d8995bb518b1..4fd926218f1c3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java @@ -82,8 +82,8 @@ public ClusterState execute(ClusterState currentState) throws Exception { final List updatedMetadata = new ArrayList<>(); for (ObjectObjectCursor cursor : indexMetadataMap) { if (cursor.value != lastIndexMetadataMap.get(cursor.key)) { - final boolean isSystemDataStream = systemIndices.isSystemIndexBackingDataStream(cursor.value.getIndex().getName()); - final boolean isSystem = systemIndices.isSystemIndex(cursor.value.getIndex()) || isSystemDataStream; + final boolean isSystem = systemIndices.isSystemIndex(cursor.value.getIndex()) || + systemIndices.isSystemIndexBackingDataStream(cursor.value.getIndex().getName()); IndexMetadata.Builder builder = IndexMetadata.builder(cursor.value); boolean updated = false; if (isSystem != cursor.value.isSystem()) { @@ -96,10 +96,6 @@ public ClusterState execute(ClusterState currentState) throws Exception { .put(IndexMetadata.SETTING_INDEX_HIDDEN, false)); updated = true; } - if (isSystemDataStream != cursor.value.isDataStreamIndex()) { - builder.dataStreamIndex(isSystemDataStream); - updated = true; - } if (updated) { updatedMetadata.add(builder.build()); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 1714dad668e2c..9b7ba7986167b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -9,7 +9,6 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; -import org.apache.lucene.index.LeafReader; import 
org.apache.lucene.index.MergePolicy; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; @@ -33,7 +32,6 @@ import org.elasticsearch.plugins.IndexStorePlugin; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.function.LongSupplier; @@ -72,7 +70,6 @@ public final class EngineConfig { private final CircuitBreakerService circuitBreakerService; private final LongSupplier globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; - private final Comparator leafSorter; /** * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been @@ -134,8 +131,7 @@ public EngineConfig( LongSupplier globalCheckpointSupplier, Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, - IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier, - Comparator leafSorter) { + IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) { this.shardId = shardId; this.indexSettings = indexSettings; this.threadPool = threadPool; @@ -173,7 +169,6 @@ public EngineConfig( this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier); this.primaryTermSupplier = primaryTermSupplier; this.snapshotCommitSupplier = snapshotCommitSupplier; - this.leafSorter = leafSorter; } /** @@ -358,12 +353,4 @@ public LongSupplier getPrimaryTermSupplier() { public IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier() { return snapshotCommitSupplier; } - - /** - * Returns how segments should be sorted for reading or @null if no sorting should be applied. 
- */ - @Nullable - public Comparator getLeafSorter() { - return leafSorter; - } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2c11305fe279e..3e2ca3f905e88 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2182,11 +2182,6 @@ private IndexWriterConfig getIndexWriterConfig() { if (config().getIndexSort() != null) { iwc.setIndexSort(config().getIndexSort()); } - // Provide a custom leaf sorter, so that index readers opened from this writer - // will have its leaves sorted according the given leaf sorter. - if (engineConfig.getLeafSorter() != null) { - iwc.setLeafSorter(engineConfig.getLeafSorter()); - } return iwc; } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 60d0054141a58..71ea3f1b103d4 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -185,7 +185,6 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static org.elasticsearch.cluster.metadata.DataStream.DATASTREAM_LEAF_READERS_SORTER; import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -441,14 +440,6 @@ public boolean isSystem() { return indexSettings.getIndexMetadata().isSystem(); } - /** - * Returns if this index shard for an index that is part of a datastream - * @return if this index part of datastream or not. - */ - public boolean isDataStreamIndex() { - return indexSettings.getIndexMetadata().isDataStreamIndex(); - } - /** * USE THIS METHOD WITH CARE! * Returns the primary term the index shard is supposed to be on. 
In case of primary promotion or when a replica learns about @@ -1785,6 +1776,8 @@ public ShardLongFieldRange getTimestampRange() { if (mappedFieldType instanceof DateFieldMapper.DateFieldType == false) { return ShardLongFieldRange.UNKNOWN; // field missing or not a date } + final DateFieldMapper.DateFieldType dateFieldType = (DateFieldMapper.DateFieldType) mappedFieldType; + final ShardLongFieldRange rawTimestampFieldRange; try { rawTimestampFieldRange = getEngine().getRawFieldRange(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD); @@ -2919,8 +2912,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { globalCheckpointSupplier, replicationTracker::getRetentionLeases, this::getOperationPrimaryTerm, - snapshotCommitSupplier, - isDataStreamIndex() ? DATASTREAM_LEAF_READERS_SORTER : null); + snapshotCommitSupplier); } /** diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java index 1c98761a24632..a5a90501a0cb3 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java @@ -128,8 +128,7 @@ public void testToXContent() throws IOException { " \"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " },\n" + - " \"datastream_index\" : false\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -230,8 +229,7 @@ public void testToXContent() throws IOException { " \"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " },\n" + - " \"datastream_index\" : false\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java 
b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java index 18ffede686252..13184246d0da3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java @@ -249,8 +249,7 @@ public void testToXContent() throws IOException { " \"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " },\n" + - " \"datastream_index\" : false\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -449,8 +448,7 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti " \"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " },\n" + - " \"datastream_index\" : false\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -656,8 +654,7 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti " \"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " },\n" + - " \"datastream_index\" : false\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -787,8 +784,7 @@ public void testToXContentSameTypeName() throws IOException { " \"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " },\n" + - " \"datastream_index\" : false\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java index cf3010c9b309d..727b43e4cbdce 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java @@ -66,7 +66,6 @@ public void testIndexMetadataSerialization() throws IOException { Integer numShard = randomFrom(1, 2, 4, 8, 16); int numberOfReplicas = randomIntBetween(0, 10); final boolean system = randomBoolean(); 
- final boolean dataStreamIndex = randomBoolean(); Map customMap = new HashMap<>(); customMap.put(randomAlphaOfLength(5), randomAlphaOfLength(10)); customMap.put(randomAlphaOfLength(10), randomAlphaOfLength(15)); @@ -80,7 +79,6 @@ public void testIndexMetadataSerialization() throws IOException { .primaryTerm(0, 2) .setRoutingNumShards(32) .system(system) - .dataStreamIndex(dataStreamIndex) .putCustom("my_custom", customMap) .putRolloverInfo( new RolloverInfo(randomAlphaOfLength(5), @@ -92,7 +90,6 @@ public void testIndexMetadataSerialization() throws IOException { ), randomNonNegativeLong())).build(); assertEquals(system, metadata.isSystem()); - assertEquals(dataStreamIndex, metadata.isDataStreamIndex()); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -112,7 +109,6 @@ public void testIndexMetadataSerialization() throws IOException { assertEquals(metadata.getRoutingFactor(), fromXContentMeta.getRoutingFactor()); assertEquals(metadata.primaryTerm(0), fromXContentMeta.primaryTerm(0)); assertEquals(metadata.isSystem(), fromXContentMeta.isSystem()); - assertEquals(metadata.isDataStreamIndex(), fromXContentMeta.isDataStreamIndex()); ImmutableOpenMap.Builder expectedCustomBuilder = ImmutableOpenMap.builder(); expectedCustomBuilder.put("my_custom", new DiffableStringMap(customMap)); ImmutableOpenMap expectedCustom = expectedCustomBuilder.build(); @@ -137,7 +133,6 @@ public void testIndexMetadataSerialization() throws IOException { assertEquals(deserialized.getCustomData(), expectedCustom); assertEquals(metadata.getCustomData(), deserialized.getCustomData()); assertEquals(metadata.isSystem(), deserialized.isSystem()); - assertEquals(metadata.isDataStreamIndex(), deserialized.isDataStreamIndex()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java index 9f3656f3de887..5da822595a037 
100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -874,7 +874,7 @@ public void testBuildIndexMetadata() { .put(SETTING_NUMBER_OF_SHARDS, 1) .build(); List aliases = List.of(AliasMetadata.builder("alias1").build()); - IndexMetadata indexMetadata = buildIndexMetadata("test", aliases, () -> null, indexSettings, 4, sourceIndexMetadata, false, false); + IndexMetadata indexMetadata = buildIndexMetadata("test", aliases, () -> null, indexSettings, 4, sourceIndexMetadata, false); assertThat(indexMetadata.getAliases().size(), is(1)); assertThat(indexMetadata.getAliases().keys().iterator().next().value, is("alias1")); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java index 19f2565f64ebf..505c77a1217eb 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java @@ -152,6 +152,7 @@ public void testSimpleJsonFromAndTo() throws IOException { } private static final String MAPPING_SOURCE1 = "{\"mapping1\":{\"text1\":{\"type\":\"string\"}}}"; + private static final String MAPPING_SOURCE2 = "{\"mapping2\":{\"text2\":{\"type\":\"string\"}}}"; private static final String ALIAS_FILTER1 = "{\"field1\":\"value1\"}"; private static final String ALIAS_FILTER2 = "{\"field2\":\"value2\"}"; @@ -288,8 +289,7 @@ public void testToXContentAPI_SameTypeName() throws IOException { " \"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " },\n" + - " \"datastream_index\" : false\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -449,8 +449,7 @@ public void testToXContentAPI_FlatSettingTrue_ReduceMappingFalse() throws IOExce " 
\"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " },\n" + - " \"datastream_index\" : false\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -555,8 +554,7 @@ public void testToXContentAPI_FlatSettingFalse_ReduceMappingTrue() throws IOExce " \"system\" : false,\n" + " \"timestamp_range\" : {\n" + " \"shards\" : [ ]\n" + - " },\n" + - " \"datastream_index\" : false\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 135cb6522058d..7d65ca3d0d16c 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2946,8 +2946,7 @@ public void testRecoverFromForeignTranslog() throws IOException { () -> UNASSIGNED_SEQ_NO, () -> RetentionLeases.EMPTY, primaryTerm::get, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, - null); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); engine = createEngine(store, primaryTranslogDir); // and recover again! 
@@ -6021,7 +6020,7 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.getQueryCachingPolicy(), translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), null); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); try (InternalEngine engine = createEngine(configWithWarmer)) { assertThat(warmedUpReaders, empty()); assertThat(expectThrows(Throwable.class, () -> engine.acquireSearcher("test")).getMessage(), diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 8c570db4a5ac1..d39a7261378ce 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -4093,7 +4093,7 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, null); + config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); return new InternalEngine(configWithWarmer); }); Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard))); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 
bb022ddad0a41..a59e2defec763 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -139,8 +139,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { () -> SequenceNumbers.NO_OPS_PERFORMED, () -> RetentionLeases.EMPTY, () -> primaryTerm, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, - null); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); engine = new InternalEngine(config); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index 9ca05f4aaa4d6..de7073d3cc15b 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -371,7 +371,7 @@ EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.Ref config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), internalRefreshListener, config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), null); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); } ThreadPoolStats.Stats getRefreshThreadPoolStats() { diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index 47c5caaebd0f0..3ea4671316f79 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ 
b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -79,16 +79,14 @@ public static IndexMetadata.Builder createBackingIndex(String dataStreamName, in return IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, generation, epochMillis)) .settings(SETTINGS) .numberOfShards(NUMBER_OF_SHARDS) - .numberOfReplicas(NUMBER_OF_REPLICAS) - .dataStreamIndex(true); + .numberOfReplicas(NUMBER_OF_REPLICAS); } public static IndexMetadata.Builder getIndexMetadataBuilderForIndex(Index index) { return IndexMetadata.builder(index.getName()) .settings(Settings.builder().put(SETTINGS.build()).put(SETTING_INDEX_UUID, index.getUUID())) .numberOfShards(NUMBER_OF_SHARDS) - .numberOfReplicas(NUMBER_OF_REPLICAS) - .dataStreamIndex(true); + .numberOfReplicas(NUMBER_OF_REPLICAS); } public static DataStream.TimestampField createTimestampField(String fieldName) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index edb9547e73a23..f061b12e790d3 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -239,7 +239,7 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSuppl config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), globalCheckpointSupplier, config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); } public EngineConfig copy(EngineConfig config, Analyzer analyzer) { @@ -249,7 +249,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.getTranslogConfig(), 
config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); } public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { @@ -259,7 +259,7 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); } @Override @@ -672,8 +672,7 @@ public EngineConfig config( globalCheckpointSupplier, retentionLeasesSupplier, primaryTerm, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, - null); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); } protected EngineConfig config(EngineConfig config, Store store, Path translogPath) { @@ -687,7 +686,7 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), null); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); } protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) { diff --git 
a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 0bff20277e67b..e0d43cab06cbb 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -270,8 +270,7 @@ public void onFailedEngine(String reason, Exception e) { globalCheckpoint::longValue, () -> RetentionLeases.EMPTY, () -> primaryTerm.get(), - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, - null); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); } private static Store createStore( diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index 0fcf027329aac..0189ff997b32a 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -62,9 +62,6 @@ import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.search.fetch.subphase.FieldAndFormat; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.core.action.CreateDataStreamAction; import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; @@ -1370,42 +1367,6 @@ public void testMultiThreadedRollover() throws Exception { ); } - // Test that datastream's segments by default are sorted on @timestamp desc - public void testReaderOnTimestampDesc() throws Exception { - Settings settings 
= Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .build(); - putComposableIndexTemplate("template_for_foo", null, List.of("metrics-foo*"), settings, null); - CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo"); - client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get(); - - // We index data in the increasing order of @timestamp field - int numDocs1 = randomIntBetween(2, 10); - indexDocs("metrics-foo", numDocs1); // 1st segment - int numDocs2 = randomIntBetween(2, 10); - indexDocs("metrics-foo", numDocs2); // 2nd segment - int numDocs3 = randomIntBetween(2, 10); - indexDocs("metrics-foo", numDocs3); // 3rd segment - int totalDocs = numDocs1 + numDocs2 + numDocs3; - - SearchSourceBuilder source = new SearchSourceBuilder(); - source.fetchField(new FieldAndFormat(DEFAULT_TIMESTAMP_FIELD, "epoch_millis")); - source.size(totalDocs); - SearchRequest searchRequest = new SearchRequest(new String[] { "metrics-foo" }, source); - SearchResponse searchResponse = client().search(searchRequest).actionGet(); - assertEquals(totalDocs, searchResponse.getHits().getTotalHits().value); - SearchHit[] hits = searchResponse.getHits().getHits(); - assertEquals(totalDocs, hits.length); - - // Test that when we read data, segments come in the reverse order with a segment with the latest date first - long timestamp1 = Long.valueOf(hits[0].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of 1st seg - long timestamp2 = Long.valueOf(hits[0 + numDocs3].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 2nd seg - long timestamp3 = Long.valueOf(hits[0 + numDocs2 + numDocs3].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 3rd seg - assertTrue(timestamp1 > timestamp2); - assertTrue(timestamp2 > timestamp3); - } - private static void verifyResolvability(String dataStream, ActionRequestBuilder requestBuilder, boolean 
fail) { verifyResolvability(dataStream, requestBuilder, fail, 0); } From a9d44d97d793246644ecc955b8889aef5cf86738 Mon Sep 17 00:00:00 2001 From: Mayya Sharipova Date: Mon, 26 Jul 2021 15:26:27 -0400 Subject: [PATCH 03/11] Add segment sorter for data streams It is beneficial to sort segments within a data stream's index in descending order of their max timestamp field, so that the most recent (in terms of timestamp) segments will be first. This speeds up sort queries on the @timestamp field in descending order, which is the most common type of query for data streams, as we are mostly concerned with the recent data. This patch addresses this for writable indices. A segment sorter is different from index sorting. An index sorter by itself is only concerned with the order of docs within an individual segment (and not how the segments are organized), while the segment sorter is only used during search and allows docs collection to start with the "right" segment, so we can terminate the collection faster. This PR adds a property `isDataStreamIndex` to IndexShard that shows whether a shard is part of a data stream.
--- .../index/shard/IndexShardIT.java | 3 +- .../indices/IndexingMemoryControllerIT.java | 2 +- .../cluster/metadata/DataStream.java | 21 +++++++++ .../org/elasticsearch/index/IndexService.java | 6 ++- .../index/engine/EngineConfig.java | 15 ++++++- .../index/engine/InternalEngine.java | 5 +++ .../elasticsearch/index/shard/IndexShard.java | 9 +++- .../elasticsearch/indices/IndicesService.java | 5 ++- .../cluster/IndicesClusterStateService.java | 10 ++++- .../index/engine/InternalEngineTests.java | 5 ++- .../index/shard/IndexShardTests.java | 2 +- .../index/shard/RefreshListenersTests.java | 3 +- .../IndexingMemoryControllerTests.java | 2 +- ...dicesLifecycleListenerSingleNodeTests.java | 2 +- ...actIndicesClusterStateServiceTestCase.java | 3 +- .../index/engine/EngineTestCase.java | 11 ++--- .../index/shard/IndexShardTestCase.java | 3 +- .../index/engine/FollowingEngineTests.java | 3 +- .../datastreams/DataStreamIT.java | 43 +++++++++++++++++-- .../xpack/MigrateToDataTiersIT.java | 2 - 20 files changed, 124 insertions(+), 31 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 961fa7cfd4fbf..2c9b2c82f41c3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -650,7 +650,8 @@ public static final IndexShard newIndexShard( () -> {}, RetentionLeaseSyncer.EMPTY, cbs, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + false); } private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index 
19dc61ef36049..cb4481b42d1d8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -57,7 +57,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index d176f0cc4ef2f..5fe331ce31980 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -7,6 +7,9 @@ */ package org.elasticsearch.cluster.metadata; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.PointValues; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; import org.elasticsearch.core.Nullable; @@ -25,6 +28,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -36,6 +40,23 @@ public final class DataStream extends AbstractDiffable implements To public static final String BACKING_INDEX_PREFIX = ".ds-"; public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd"); + // Datastreams' leaf readers should be sorted by desc order of 
their timestamp field, as it allows search time optimizations + public static Comparator DATASTREAM_LEAF_READERS_SORTER = + Comparator.comparingLong( + (LeafReader r) -> { + try { + PointValues points = r.getPointValues(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD); + if (points != null) { + byte[] sortValue = points.getMaxPackedValue(); + return LongPoint.decodeDimension(sortValue, 0); + } + } catch (IOException e) { + assert false : "Datastream index segment doesn't contain an expected " + + DataStream.TimestampField.FIXED_TIMESTAMP_FIELD + " field!"; + } + // this should not happen, as all data stream segments must contain @timestamp field + return Long.MAX_VALUE; }) + .reversed(); private final LongSupplier timeProvider; private final String name; diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index f19941cf6e378..b92eeb1528a16 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -386,7 +386,8 @@ private long getAvgShardSizeInBytes() throws IOException { public synchronized IndexShard createShard( final ShardRouting routing, final Consumer globalCheckpointSyncer, - final RetentionLeaseSyncer retentionLeaseSyncer) throws IOException { + final RetentionLeaseSyncer retentionLeaseSyncer, + final boolean isDataStreamIndex) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* * TODO: we execute this in parallel but it's a synced method. 
Yet, we might @@ -478,7 +479,8 @@ public synchronized IndexShard createShard( () -> globalCheckpointSyncer.accept(shardId), retentionLeaseSyncer, circuitBreakerService, - snapshotCommitSupplier); + snapshotCommitSupplier, + isDataStreamIndex); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); shards = Maps.copyMapWithAddedEntry(shards, shardId.id(), indexShard); diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 9b7ba7986167b..1714dad668e2c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -9,6 +9,7 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; +import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; @@ -32,6 +33,7 @@ import org.elasticsearch.plugins.IndexStorePlugin; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.function.LongSupplier; @@ -70,6 +72,7 @@ public final class EngineConfig { private final CircuitBreakerService circuitBreakerService; private final LongSupplier globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; + private final Comparator leafSorter; /** * A supplier of the outstanding retention leases. 
This is used during merged operations to determine which operations that have been @@ -131,7 +134,8 @@ public EngineConfig( LongSupplier globalCheckpointSupplier, Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, - IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) { + IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier, + Comparator leafSorter) { this.shardId = shardId; this.indexSettings = indexSettings; this.threadPool = threadPool; @@ -169,6 +173,7 @@ public EngineConfig( this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier); this.primaryTermSupplier = primaryTermSupplier; this.snapshotCommitSupplier = snapshotCommitSupplier; + this.leafSorter = leafSorter; } /** @@ -353,4 +358,12 @@ public LongSupplier getPrimaryTermSupplier() { public IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier() { return snapshotCommitSupplier; } + + /** + * Returns how segments should be sorted for reading or @null if no sorting should be applied. + */ + @Nullable + public Comparator getLeafSorter() { + return leafSorter; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3e2ca3f905e88..2c11305fe279e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2182,6 +2182,11 @@ private IndexWriterConfig getIndexWriterConfig() { if (config().getIndexSort() != null) { iwc.setIndexSort(config().getIndexSort()); } + // Provide a custom leaf sorter, so that index readers opened from this writer + // will have its leaves sorted according the given leaf sorter. 
+ if (engineConfig.getLeafSorter() != null) { + iwc.setLeafSorter(engineConfig.getLeafSorter()); + } return iwc; } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 71ea3f1b103d4..b973e064b5832 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -185,6 +185,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static org.elasticsearch.cluster.metadata.DataStream.DATASTREAM_LEAF_READERS_SORTER; import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -283,6 +284,7 @@ Runnable getGlobalCheckpointSyncer() { private final AtomicReference pendingRefreshLocation = new AtomicReference<>(); private final RefreshPendingLocationListener refreshPendingLocationListener; private volatile boolean useRetentionLeasesInPeerRecovery; + private final boolean isDataStreamIndex; // if a shard is a part of data stream public IndexShard( final ShardRouting shardRouting, @@ -304,7 +306,8 @@ public IndexShard( final Runnable globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final CircuitBreakerService circuitBreakerService, - final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) throws IOException { + final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier, + boolean isDataStreamIndex) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); this.shardRouting = shardRouting; @@ -387,6 +390,7 @@ public boolean shouldCache(Query query) { persistMetadata(path, indexSettings, shardRouting, null, logger); this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); this.refreshPendingLocationListener = new 
RefreshPendingLocationListener(); + this.isDataStreamIndex = isDataStreamIndex; } public ThreadPool getThreadPool() { @@ -2912,7 +2916,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { globalCheckpointSupplier, replicationTracker::getRetentionLeases, this::getOperationPrimaryTerm, - snapshotCommitSupplier); + snapshotCommitSupplier, + isDataStreamIndex ? DATASTREAM_LEAF_READERS_SORTER : null); } /** diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 395793d7e99e9..7074b0680c913 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -767,13 +767,14 @@ public IndexShard createShard( final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, - final DiscoveryNode sourceNode) throws IOException { + final DiscoveryNode sourceNode, + final boolean isDataStreamIndex) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); ensureChangesAllowed(); IndexService indexService = indexService(shardRouting.index()); assert indexService != null; RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); - IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer); + IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, isDataStreamIndex); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 22465754fa4ee..d4fbfc3dbe678 100644 
--- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -576,6 +577,8 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR try { final long primaryTerm = state.metadata().index(shardRouting.index()).primaryTerm(shardRouting.id()); logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm); + IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(shardRouting.getIndexName()); + final boolean isDataStreamIndex = (indexAbstraction != null) && (indexAbstraction.getParentDataStream() != null); indicesService.createShard( shardRouting, recoveryTargetService, @@ -585,7 +588,8 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR this::updateGlobalCheckpointForShard, retentionLeaseSyncer, nodes.getLocalNode(), - sourceNode); + sourceNode, + isDataStreamIndex); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); } @@ -900,6 +904,7 @@ U createIndex(IndexMetadata indexMetadata, * @param retentionLeaseSyncer a callback when this shard syncs retention leases * @param targetNode the node where this shard will be recovered * @param sourceNode the source node to recover this shard from (it might be null) + * @param isDataStreamIndex true if an shard belongs to an index that is a part of a data stream. 
* @return a new shard * @throws IOException if an I/O exception occurs when creating the shard */ @@ -912,7 +917,8 @@ T createShard( Consumer globalCheckpointSyncer, RetentionLeaseSyncer retentionLeaseSyncer, DiscoveryNode targetNode, - @Nullable DiscoveryNode sourceNode) throws IOException; + @Nullable DiscoveryNode sourceNode, + boolean isDataStreamIndex) throws IOException; /** * Returns shard for the specified id if it exists otherwise returns null. diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 7d65ca3d0d16c..72240b1e3d3df 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2946,7 +2946,8 @@ public void testRecoverFromForeignTranslog() throws IOException { () -> UNASSIGNED_SEQ_NO, () -> RetentionLeases.EMPTY, primaryTerm::get, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); engine = createEngine(store, primaryTranslogDir); // and recover again! 
@@ -6020,7 +6021,7 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.getQueryCachingPolicy(), translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); try (InternalEngine engine = createEngine(configWithWarmer)) { assertThat(warmedUpReaders, empty()); assertThat(expectThrows(Throwable.class, () -> engine.acquireSearcher("test")).getMessage(), diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index d39a7261378ce..43d91a2bc706d 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -4093,7 +4093,7 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, config.getLeafSorter()); return new InternalEngine(configWithWarmer); }); Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard))); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java 
b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index a59e2defec763..bb022ddad0a41 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -139,7 +139,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { () -> SequenceNumbers.NO_OPS_PERFORMED, () -> RetentionLeases.EMPTY, () -> primaryTerm, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); engine = new InternalEngine(config); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index de7073d3cc15b..8aafddd59df43 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -371,7 +371,7 @@ EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.Ref config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), internalRefreshListener, config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } ThreadPoolStats.Stats getRefreshThreadPoolStats() { diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 72129f29a3e9f..89a4b645b2c8b 100644 --- 
a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -120,7 +120,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting = newRouting.moveToUnassigned(unassignedInfo) .updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE); newRouting = ShardRoutingHelper.initialize(newRouting, nodeId); - IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY); + IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY, false); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 6bfef1300718d..8874882022329 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -228,7 +228,8 @@ public MockIndexShard createShard( final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, - final DiscoveryNode sourceNode) throws IOException { + final DiscoveryNode sourceNode, + final boolean isDataStreamIndex) throws IOException { failRandomly(); RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode); MockIndexService indexService = indexService(recoveryState.getShardId().getIndex()); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java 
b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index f061b12e790d3..1a405dfd1b071 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -239,7 +239,7 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSuppl config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), globalCheckpointSupplier, config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } public EngineConfig copy(EngineConfig config, Analyzer analyzer) { @@ -249,7 +249,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { @@ -259,7 +259,7 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } @Override @@ -672,7 +672,8 @@ 
public EngineConfig config( globalCheckpointSupplier, retentionLeasesSupplier, primaryTerm, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); } protected EngineConfig config(EngineConfig config, Store store, Path translogPath) { @@ -686,7 +687,7 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier()); + config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter()); } protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index fe7552862f35b..d7467d8d7a469 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -395,7 +395,8 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe globalCheckpointSyncer, retentionLeaseSyncer, breakerService, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + false); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true; } finally { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index e0d43cab06cbb..0bff20277e67b 100644 --- 
a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -270,7 +270,8 @@ public void onFailedEngine(String reason, Exception e) { globalCheckpoint::longValue, () -> RetentionLeases.EMPTY, () -> primaryTerm.get(), - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + null); } private static Store createStore( diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index 0189ff997b32a..62cbe6500993a 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -62,6 +62,9 @@ import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.subphase.FieldAndFormat; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.core.action.CreateDataStreamAction; import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; @@ -1099,10 +1102,6 @@ public void testGetDataStream() throws Exception { assertThat(metricsFooDataStream.getIlmPolicy(), is(nullValue())); } - private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping) { - assertBackingIndex(backingIndex, timestampFieldPathInMapping, Map.of("type", "date")); - } - private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping, Map expectedMapping) { GetIndexResponse getIndexResponse = 
client().admin().indices().getIndex(new GetIndexRequest().indices(backingIndex)).actionGet(); assertThat(getIndexResponse.getSettings().get(backingIndex), notNullValue()); @@ -1367,6 +1366,42 @@ public void testMultiThreadedRollover() throws Exception { ); } + // Test that datastream's segments by default are sorted on @timestamp desc + public void testSegmentsSortedOnTimestampDesc() throws Exception { + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build(); + putComposableIndexTemplate("template_for_foo", null, List.of("metrics-foo*"), settings, null); + CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo"); + client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get(); + + // We index data in the increasing order of @timestamp field + int numDocs1 = randomIntBetween(2, 10); + indexDocs("metrics-foo", numDocs1); // 1st segment + int numDocs2 = randomIntBetween(2, 10); + indexDocs("metrics-foo", numDocs2); // 2nd segment + int numDocs3 = randomIntBetween(2, 10); + indexDocs("metrics-foo", numDocs3); // 3rd segment + int totalDocs = numDocs1 + numDocs2 + numDocs3; + + SearchSourceBuilder source = new SearchSourceBuilder(); + source.fetchField(new FieldAndFormat(DEFAULT_TIMESTAMP_FIELD, "epoch_millis")); + source.size(totalDocs); + SearchRequest searchRequest = new SearchRequest(new String[] { "metrics-foo" }, source); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(totalDocs, searchResponse.getHits().getTotalHits().value); + SearchHit[] hits = searchResponse.getHits().getHits(); + assertEquals(totalDocs, hits.length); + + // Test that when we read data, segments come in the reverse order with a segment with the latest date first + long timestamp1 = Long.valueOf(hits[0].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of 1st seg + long timestamp2 = 
Long.valueOf(hits[0 + numDocs3].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 2nd seg + long timestamp3 = Long.valueOf(hits[0 + numDocs3 + numDocs2].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 3rd seg + assertTrue(timestamp1 > timestamp2); + assertTrue(timestamp2 > timestamp3); + } + private static void verifyResolvability(String dataStream, ActionRequestBuilder requestBuilder, boolean fail) { verifyResolvability(dataStream, requestBuilder, fail, 0); } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java index 9f934097959b2..5ecf3a1087b73 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java @@ -60,8 +60,6 @@ import static org.hamcrest.Matchers.nullValue; public class MigrateToDataTiersIT extends ESRestTestCase { - private static final Logger logger = LogManager.getLogger(MigrateToDataTiersIT.class); - private String index; private String policy; private String alias; From e73bb4c2ec33f310d4b3bad93dce880614c96212 Mon Sep 17 00:00:00 2001 From: Mayya Sharipova Date: Wed, 4 Aug 2021 13:51:06 -0400 Subject: [PATCH 04/11] IndexShard to use mappingLookup().isDataStreamTimestampFieldEnabled() after merging of PR#75906 --- .../index/shard/IndexShardIT.java | 3 +- .../org/elasticsearch/index/IndexService.java | 6 +- .../index/engine/ReadOnlyEngine.java | 2 + .../elasticsearch/index/shard/IndexShard.java | 5 +- .../elasticsearch/indices/IndicesService.java | 5 +- .../cluster/IndicesClusterStateService.java | 10 +- ...dicesLifecycleListenerSingleNodeTests.java | 2 +- ...actIndicesClusterStateServiceTestCase.java | 3 +- .../index/shard/IndexShardTestCase.java | 3 +- 
...1_sort_segments_migrate_to_data_stream.yml | 107 ++++++++++++++++++ 10 files changed, 121 insertions(+), 25 deletions(-) create mode 100644 x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/131_sort_segments_migrate_to_data_stream.yml diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 534f18a1dff4f..2f90c28d509b7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -578,8 +578,7 @@ public static final IndexShard newIndexShard( () -> {}, RetentionLeaseSyncer.EMPTY, cbs, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, - false); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); } private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) { diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 958d182815567..cac67a18ef04a 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -385,8 +385,7 @@ private long getAvgShardSizeInBytes() throws IOException { public synchronized IndexShard createShard( final ShardRouting routing, final Consumer globalCheckpointSyncer, - final RetentionLeaseSyncer retentionLeaseSyncer, - final boolean isDataStreamIndex) throws IOException { + final RetentionLeaseSyncer retentionLeaseSyncer) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* * TODO: we execute this in parallel but it's a synced method. 
Yet, we might @@ -463,8 +462,7 @@ public synchronized IndexShard createShard( () -> globalCheckpointSyncer.accept(shardId), retentionLeaseSyncer, circuitBreakerService, - snapshotCommitSupplier, - isDataStreamIndex); + snapshotCommitSupplier); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); shards = Maps.copyMapWithAddedEntry(shards, shardId.id(), indexShard); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 54c10dbef9449..0f2441dcbc621 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -199,6 +199,8 @@ protected final ElasticsearchDirectoryReader wrapReader(DirectoryReader reader, } protected DirectoryReader open(IndexCommit commit) throws IOException { + // TODO: provide engineConfig.getLeafSorter() when opening a DirectoryReader from a commit + // should be available from Lucene v 8.10 assert Transports.assertNotTransportThread("opening index commit of a read-only engine"); if (lazilyLoadSoftDeletes) { return new LazySoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit), Lucene.SOFT_DELETES_FIELD); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index b973e064b5832..74dcb6057b86e 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -306,8 +306,7 @@ public IndexShard( final Runnable globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final CircuitBreakerService circuitBreakerService, - final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier, - boolean isDataStreamIndex) throws IOException { + final 
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); this.shardRouting = shardRouting; @@ -390,7 +389,7 @@ public boolean shouldCache(Query query) { persistMetadata(path, indexSettings, shardRouting, null, logger); this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); this.refreshPendingLocationListener = new RefreshPendingLocationListener(); - this.isDataStreamIndex = isDataStreamIndex; + this.isDataStreamIndex = mapperService.mappingLookup().isDataStreamTimestampFieldEnabled(); } public ThreadPool getThreadPool() { diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index fdbffa8f3b76b..bd602415568af 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -771,14 +771,13 @@ public IndexShard createShard( final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, - final DiscoveryNode sourceNode, - final boolean isDataStreamIndex) throws IOException { + final DiscoveryNode sourceNode) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); ensureChangesAllowed(); IndexService indexService = indexService(shardRouting.index()); assert indexService != null; RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); - IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, isDataStreamIndex); + IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, 
mapping -> { diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index f0b558ce36bc1..7e3750ec40251 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -20,7 +20,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -577,8 +576,6 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR try { final long primaryTerm = state.metadata().index(shardRouting.index()).primaryTerm(shardRouting.id()); logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm); - IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(shardRouting.getIndexName()); - final boolean isDataStreamIndex = (indexAbstraction != null) && (indexAbstraction.getParentDataStream() != null); indicesService.createShard( shardRouting, recoveryTargetService, @@ -588,8 +585,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR this::updateGlobalCheckpointForShard, retentionLeaseSyncer, nodes.getLocalNode(), - sourceNode, - isDataStreamIndex); + sourceNode); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); } @@ -904,7 +900,6 @@ U createIndex(IndexMetadata indexMetadata, * @param retentionLeaseSyncer a callback when this shard syncs retention leases * @param targetNode the node where this shard will be recovered * @param sourceNode the 
source node to recover this shard from (it might be null) - * @param isDataStreamIndex true if an shard belongs to an index that is a part of a data stream. * @return a new shard * @throws IOException if an I/O exception occurs when creating the shard */ @@ -917,8 +912,7 @@ T createShard( Consumer globalCheckpointSyncer, RetentionLeaseSyncer retentionLeaseSyncer, DiscoveryNode targetNode, - @Nullable DiscoveryNode sourceNode, - boolean isDataStreamIndex) throws IOException; + @Nullable DiscoveryNode sourceNode) throws IOException; /** * Returns shard for the specified id if it exists otherwise returns null. diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 89a4b645b2c8b..72129f29a3e9f 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -120,7 +120,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting = newRouting.moveToUnassigned(unassignedInfo) .updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE); newRouting = ShardRoutingHelper.initialize(newRouting, nodeId); - IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY, false); + IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 8874882022329..6bfef1300718d 100644 --- 
a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -228,8 +228,7 @@ public MockIndexShard createShard( final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, - final DiscoveryNode sourceNode, - final boolean isDataStreamIndex) throws IOException { + final DiscoveryNode sourceNode) throws IOException { failRandomly(); RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode); MockIndexService indexService = indexService(recoveryState.getShardId().getIndex()); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 0241bd7158b57..73316ce5ed52a 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -396,8 +396,7 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe globalCheckpointSyncer, retentionLeaseSyncer, breakerService, - IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, - false); + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true; } finally { diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/131_sort_segments_migrate_to_data_stream.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/131_sort_segments_migrate_to_data_stream.yml new file mode 100644 index 0000000000000..a3e121abdddfe --- /dev/null +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/131_sort_segments_migrate_to_data_stream.yml @@ -0,0 +1,107 @@ +--- +"Test that datastream index segments are 
sorted on timestamp field desc after data stream migration": + - skip: + #TODO: adjust version after backport + version: " - 7.99.99" + reason: "sorting segments was added in 7.15" + features: allowed_warnings + + - do: + indices.put_index_template: + name: my-template + body: + index_patterns: [ my_ds ] + data_stream: { } + template: + settings: + number_of_shards: 1 + number_of_replicas: 0 + + - do: + indices.create: + index: test_index1 + body: + aliases: + my_ds: + is_write_index: true + + # 1st segment + - do: + index: + index: my_ds + body: { "foo": "bar1", "@timestamp": "2021-08-01" } + refresh: true + + # 2nd segment + - do: + index: + index: my_ds + body: { "foo": "bar2", "@timestamp": "2021-08-02" } + refresh: true + + # test that segments are sorted as indexed by @timestamp ASC + - do: + search: + index: my_ds + body: + fields: [{ "field":"@timestamp", "format":"yyyy-MM-dd" }] + - match: { hits.total.value: 2 } + - match: { hits.hits.0.fields.@timestamp: ["2021-08-01"] } + - match: { hits.hits.1.fields.@timestamp: ["2021-08-02"] } + + # migrate to data-stream + - do: + indices.migrate_to_data_stream: + name: my_ds + - is_true: acknowledged + + # test that segments are still sorted as indexed by @timestamp ASC + # as we don't reopen existing shards and index readers after migration + - do: + search: + index: my_ds + body: + fields: [{ "field":"@timestamp", "format":"yyyy-MM-dd" }] + - match: { hits.total.value: 2 } + - match: { hits.hits.0.fields.@timestamp: ["2021-08-01"] } + - match: { hits.hits.1.fields.@timestamp: ["2021-08-02"] } + + # rollover data stream to create new backing index + - do: + indices.rollover: + alias: "my_ds" + - match: { rolled_over: true } + # save the new backing index names for later use + - set: { new_index: idx0name } + + # 1st segment in the new backing index + - do: + index: + index: my_ds + body: { "foo": "bar3", "@timestamp": "2021-08-03" } + refresh: true + + # 2nd segment in the new backing index + - do: + index: + 
index: my_ds + body: { "foo": "bar4", "@timestamp": "2021-08-04" } + refresh: true + + + # test that segments are sorted by @timestamp DESC in the new backing index, + # as the newly created index and shard pick up the index leaf sorter + - do: + search: + index: $idx0name + body: + fields: [{ "field":"@timestamp", "format":"yyyy-MM-dd" }] + - match: { hits.total.value: 2 } + - match: { hits.hits.0.fields.@timestamp: ["2021-08-04"] } + - match: { hits.hits.1.fields.@timestamp: ["2021-08-03"] } + + + - do: + indices.delete_data_stream: + name: my_ds + - is_true: acknowledged From 4365ed05c001fc7011a4fe5b950c55ec5f3d7116 Mon Sep 17 00:00:00 2001 From: Mayya Sharipova Date: Mon, 9 Aug 2021 14:57:54 -0400 Subject: [PATCH 05/11] Check for datastream index only when MapperService is enabled --- .../src/main/java/org/elasticsearch/index/shard/IndexShard.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 74dcb6057b86e..7af1ced5cedfa 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -389,7 +389,7 @@ public boolean shouldCache(Query query) { persistMetadata(path, indexSettings, shardRouting, null, logger); this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); this.refreshPendingLocationListener = new RefreshPendingLocationListener(); - this.isDataStreamIndex = mapperService.mappingLookup().isDataStreamTimestampFieldEnabled(); + this.isDataStreamIndex = mapperService == null ? 
false : mapperService.mappingLookup().isDataStreamTimestampFieldEnabled(); } public ThreadPool getThreadPool() { From 770a179c23a5d533266494510a4e4bc43a4732f7 Mon Sep 17 00:00:00 2001 From: Mayya Sharipova Date: Mon, 9 Aug 2021 16:21:36 -0400 Subject: [PATCH 06/11] Temp --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7af1ced5cedfa..050a17d543338 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -9,6 +9,8 @@ package org.elasticsearch.index.shard; import com.carrotsearch.hppc.ObjectLongMap; + +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.analysis.Analyzer; @@ -389,7 +391,8 @@ public boolean shouldCache(Query query) { persistMetadata(path, indexSettings, shardRouting, null, logger); this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); this.refreshPendingLocationListener = new RefreshPendingLocationListener(); - this.isDataStreamIndex = mapperService == null ? false : mapperService.mappingLookup().isDataStreamTimestampFieldEnabled(); + //this.isDataStreamIndex = mapperService == null ? 
false : mapperService.mappingLookup().isDataStreamTimestampFieldEnabled(); + this.isDataStreamIndex = true; } public ThreadPool getThreadPool() { @@ -2893,6 +2896,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { this.warmer.warm(reader); } }; + logger.log(Level.INFO, "*** dataStreamIndex[" + isDataStreamIndex + "]"); return new EngineConfig( shardId, threadPool, From 6a04a5729b68d27dd8ca27fb1dee57825c875e5f Mon Sep 17 00:00:00 2001 From: Mayya Sharipova Date: Thu, 2 Sep 2021 10:56:23 -0400 Subject: [PATCH 07/11] Throw an exception when datastream doesn't contain @timestamp field. --- .../org/elasticsearch/cluster/metadata/DataStream.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 5fe331ce31980..8d6a2dc3b4c16 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -51,11 +51,10 @@ public final class DataStream extends AbstractDiffable implements To return LongPoint.decodeDimension(sortValue, 0); } } catch (IOException e) { - assert false : "Datastream index segment doesn't contain an expected " + - DataStream.TimestampField.FIXED_TIMESTAMP_FIELD + " field!"; } - // this should not happen, as all data stream segments must contain @timestamp field - return Long.MAX_VALUE; }) + throw new IllegalArgumentException("Datastream index segment doesn't contain an expected " + + DataStream.TimestampField.FIXED_TIMESTAMP_FIELD + " field!"); + }) .reversed(); private final LongSupplier timeProvider; From 8724cdec24680bb50d53b9626f02782dc36d7fd5 Mon Sep 17 00:00:00 2001 From: Mayya Sharipova Date: Thu, 2 Sep 2021 11:09:16 -0400 Subject: [PATCH 08/11] Remove temp debugging --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 5 +---- 1 file changed, 
1 insertion(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 050a17d543338..dc82261235d20 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -10,7 +10,6 @@ import com.carrotsearch.hppc.ObjectLongMap; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.analysis.Analyzer; @@ -391,8 +390,7 @@ public boolean shouldCache(Query query) { persistMetadata(path, indexSettings, shardRouting, null, logger); this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); this.refreshPendingLocationListener = new RefreshPendingLocationListener(); - //this.isDataStreamIndex = mapperService == null ? false : mapperService.mappingLookup().isDataStreamTimestampFieldEnabled(); - this.isDataStreamIndex = true; + this.isDataStreamIndex = mapperService == null ? 
false : mapperService.mappingLookup().isDataStreamTimestampFieldEnabled(); } public ThreadPool getThreadPool() { @@ -2896,7 +2894,6 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { this.warmer.warm(reader); } }; - logger.log(Level.INFO, "*** dataStreamIndex[" + isDataStreamIndex + "]"); return new EngineConfig( shardId, threadPool, From fbb937b2d67096b33f59c1527979dfd327603d88 Mon Sep 17 00:00:00 2001 From: Mayya Sharipova Date: Thu, 2 Sep 2021 11:12:09 -0400 Subject: [PATCH 09/11] Change IllegalArgument to IllegalState exception --- .../java/org/elasticsearch/cluster/metadata/DataStream.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 8d6a2dc3b4c16..1f87b4d7ed295 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -52,8 +52,8 @@ public final class DataStream extends AbstractDiffable implements To } } catch (IOException e) { } - throw new IllegalArgumentException("Datastream index segment doesn't contain an expected " + - DataStream.TimestampField.FIXED_TIMESTAMP_FIELD + " field!"); + throw new IllegalStateException("Datastream index segment doesn't contain an expected [" + + DataStream.TimestampField.FIXED_TIMESTAMP_FIELD + "] field!"); }) .reversed(); From 77941bcbe11d8126baad7dd16dd88cca0376007d Mon Sep 17 00:00:00 2001 From: Mayya Sharipova Date: Thu, 2 Sep 2021 13:40:40 -0400 Subject: [PATCH 10/11] Return Long.MIN_VALUE for datastream sorter if the segment doesn't contain documents.
--- .../org/elasticsearch/cluster/metadata/DataStream.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 1f87b4d7ed295..fabb502f76898 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -51,9 +51,12 @@ public final class DataStream extends AbstractDiffable implements To return LongPoint.decodeDimension(sortValue, 0); } } catch (IOException e) { + throw new IllegalStateException("Failed reading [" + + DataStream.TimestampField.FIXED_TIMESTAMP_FIELD + "] field for the data stream!"); } - throw new IllegalStateException("Datastream index segment doesn't contain an expected [" + - DataStream.TimestampField.FIXED_TIMESTAMP_FIELD + "] field!"); + // this may happen if the segment contains only deleted documents + assert(r.numDocs() == 0); + return Long.MIN_VALUE; }) .reversed(); From 4c5bddc72400e1a4af9da6292754ea7cf8c8ace4 Mon Sep 17 00:00:00 2001 From: Mayya Sharipova Date: Fri, 3 Sep 2021 07:21:49 -0400 Subject: [PATCH 11/11] Change assertion to if statement --- .../org/elasticsearch/cluster/metadata/DataStream.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index fabb502f76898..70557878772e6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -49,14 +49,14 @@ public final class DataStream extends AbstractDiffable implements To if (points != null) { byte[] sortValue = points.getMaxPackedValue(); return LongPoint.decodeDimension(sortValue, 0); + } else if (r.numDocs() == 0) { + // points can 
be null if the segment contains only deleted documents + return Long.MIN_VALUE; } } catch (IOException e) { - throw new IllegalStateException("Failed reading [" + - DataStream.TimestampField.FIXED_TIMESTAMP_FIELD + "] field for the data stream!"); } - // this may happen if the segment contains only deleted documents - assert(r.numDocs() == 0); - return Long.MIN_VALUE; + throw new IllegalStateException("Can't access [" + + DataStream.TimestampField.FIXED_TIMESTAMP_FIELD + "] field for the data stream!"); }) .reversed();