
Commit 59285c0

Revert "Revert "Time series based workload desc order optimization through reverse segment read (#7244)" (#7892)"

This reverts commit bb26536.

Signed-off-by: gashutos <[email protected]>
gashutos committed Jun 8, 2023
1 parent e6348c5 commit 59285c0
Showing 11 changed files with 123 additions and 6 deletions.
1 change: 1 addition & 0 deletions release-notes/opensearch.release-notes-2.8.0.md
@@ -10,6 +10,7 @@
- [Search Pipelines] Add RenameFieldResponseProcessor for Search Pipelines ([#7377](https://github.com/opensearch-project/OpenSearch/pull/7377))
- [Search Pipelines] Split search pipeline processor factories by type ([#7597](https://github.com/opensearch-project/OpenSearch/pull/7597))
- [Search Pipelines] Add script processor ([#7607](https://github.com/opensearch-project/OpenSearch/pull/7607))
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237))
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))
- Add task cancellation timestamp in task API ([#7455](https://github.com/opensearch-project/OpenSearch/pull/7455))

server/src/main/java/org/opensearch/cluster/metadata/DataStream.java
@@ -31,6 +31,10 @@

package org.opensearch.cluster.metadata;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.PointValues;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.AbstractDiffable;
import org.opensearch.cluster.Diff;
import org.opensearch.core.ParseField;
@@ -46,6 +50,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -59,6 +64,24 @@
public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {

public static final String BACKING_INDEX_PREFIX = ".ds-";
public static final String TIMESERIES_FIELDNAME = "@timestamp";
public static final Comparator<LeafReader> TIMESERIES_LEAF_SORTER = Comparator.comparingLong((LeafReader r) -> {
try {
PointValues points = r.getPointValues(TIMESERIES_FIELDNAME);
if (points != null) {
// could be a multipoint (probably not) but get the maximum time value anyway
byte[] sortValue = points.getMaxPackedValue();
// decode the first dimension because this should not be a multi dimension field
// it's a bug in the date field if it is
return LongPoint.decodeDimension(sortValue, 0);
} else {
// segment does not have a timestamp field, just return the minimum value
return Long.MIN_VALUE;
}
} catch (IOException e) {
throw new OpenSearchException("Not a timeseries Index! Field [{}] not found!", TIMESERIES_FIELDNAME);
}
}).reversed();

private final String name;
private final TimestampField timeStampField;
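
The TIMESERIES_LEAF_SORTER above keys each segment by the maximum packed @timestamp value in its point index and reverses the comparison, so segments holding the newest data sort first and segments without the field sort last. A minimal sketch of the Lucene primitives it relies on (illustrative only, not part of this commit; the class name and timestamp value are made up):

import org.apache.lucene.document.LongPoint;

public class LeafSorterPrimitivesSketch {
    public static void main(String[] args) {
        // Date fields are indexed as single-dimension LongPoints (epoch millis),
        // so a segment's max packed value decodes back to its newest timestamp.
        byte[] packed = new byte[Long.BYTES];
        LongPoint.encodeDimension(1686182400000L, packed, 0); // 2023-06-08T00:00:00Z in epoch millis
        long newestTimestamp = LongPoint.decodeDimension(packed, 0);
        System.out.println(newestTimestamp); // prints 1686182400000
    }
}
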
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -665,6 +665,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
private volatile long mappingTotalFieldsLimit;
private volatile long mappingDepthLimit;
private volatile long mappingFieldNameLengthLimit;
private volatile boolean searchSegmentOrderReversed;

/**
* The maximum number of refresh listeners allows on this shard.
@@ -905,6 +906,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(DEFAULT_SEARCH_PIPELINE, this::setDefaultSearchPipeline);
}

private void setSearchSegmentOrderReversed(boolean reversed) {
this.searchSegmentOrderReversed = reversed;
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
this.searchIdleAfter = searchIdleAfter;
}
@@ -1084,6 +1089,13 @@ public Settings getNodeSettings() {
return nodeSettings;
}

/**
* Returns true if the index-level setting for the reverse leaf order search optimization is enabled
*/
public boolean getSearchSegmentOrderReversed() {
return this.searchSegmentOrderReversed;
}

/**
* Updates the settings and index metadata and notifies all registered settings consumers with the new settings iff at least one
* setting has changed.
19 changes: 19 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/EngineConfig.java
@@ -33,6 +33,7 @@

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
@@ -59,6 +60,7 @@
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.threadpool.ThreadPool;

import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.BooleanSupplier;
@@ -102,6 +104,7 @@ public final class EngineConfig {
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private final boolean isReadOnlyReplica;
private final BooleanSupplier primaryModeSupplier;
private final Comparator<LeafReader> leafSorter;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -204,6 +207,7 @@ private EngineConfig(Builder builder) {
this.isReadOnlyReplica = builder.isReadOnlyReplica;
this.primaryModeSupplier = builder.primaryModeSupplier;
this.translogFactory = builder.translogFactory;
this.leafSorter = builder.leafSorter;
}

/**
@@ -451,6 +455,15 @@ public TranslogDeletionPolicyFactory getCustomTranslogDeletionPolicyFactory() {
return translogDeletionPolicyFactory;
}

/**
* Returns the subReaderSorter for org.apache.lucene.index.BaseCompositeReader.
* Lucene's IndexReader uses it to decide the order in which segments are read.
* @return comparator
*/
public Comparator<LeafReader> getLeafSorter() {
return this.leafSorter;
}

/**
* Builder for EngineConfig class
*
@@ -483,6 +496,7 @@ public static class Builder {
private boolean isReadOnlyReplica;
private BooleanSupplier primaryModeSupplier;
private TranslogFactory translogFactory = new InternalTranslogFactory();
Comparator<LeafReader> leafSorter;

public Builder shardId(ShardId shardId) {
this.shardId = shardId;
@@ -614,6 +628,11 @@ public Builder translogFactory(TranslogFactory translogFactory) {
return this;
}

public Builder leafSorter(Comparator<LeafReader> leafSorter) {
this.leafSorter = leafSorter;
return this;
}

public EngineConfig build() {
return new EngineConfig(this);
}

server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java
@@ -10,6 +10,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
@@ -36,6 +37,7 @@

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.BooleanSupplier;
@@ -151,7 +153,8 @@ public EngineConfig newEngineConfig(
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica,
BooleanSupplier primaryModeSupplier,
TranslogFactory translogFactory
TranslogFactory translogFactory,
Comparator<LeafReader> leafSorter
) {
CodecService codecServiceToUse = codecService;
if (codecService == null && this.codecServiceFactory != null) {
@@ -184,6 +187,7 @@
.readOnlyReplica(isReadOnlyReplica)
.primaryModeSupplier(primaryModeSupplier)
.translogFactory(translogFactory)
.leafSorter(leafSorter)
.build();
}


server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -2322,6 +2322,9 @@ private IndexWriterConfig getIndexWriterConfig() {
if (config().getIndexSort() != null) {
iwc.setIndexSort(config().getIndexSort());
}
if (config().getLeafSorter() != null) {
iwc.setLeafSorter(config().getLeafSorter()); // The default segment search order
}
return iwc;
}

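
setLeafSorter is a stock Lucene IndexWriterConfig hook: readers opened from a writer configured this way return their leaves in the order the comparator defines, which is how the descending @timestamp order becomes the default segment read order. A standalone sketch of the same wiring against a plain Lucene directory (illustrative only, not part of this commit; the class name is made up):

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.opensearch.cluster.metadata.DataStream;

public class LeafSorterWiringSketch {
    public static void main(String[] args) throws Exception {
        IndexWriterConfig iwc = new IndexWriterConfig();
        // Leaves of readers opened from this writer are ordered newest-max-@timestamp first.
        iwc.setLeafSorter(DataStream.TIMESERIES_LEAF_SORTER);
        try (ByteBuffersDirectory dir = new ByteBuffersDirectory();
             IndexWriter writer = new IndexWriter(dir, iwc);
             DirectoryReader reader = DirectoryReader.open(writer)) {
            System.out.println("segments: " + reader.leaves().size()); // 0 until documents are indexed
        }
    }
}
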

server/src/main/java/org/opensearch/index/mapper/MappingLookup.java
@@ -33,6 +33,7 @@
package org.opensearch.index.mapper;

import org.apache.lucene.analysis.Analyzer;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.analysis.FieldNameAnalyzer;

@@ -261,6 +262,15 @@ public String getNestedScope(String path) {
return null;
}

/**
* Returns true if this index maps an @timestamp field of Date type.
* @return true if the @timestamp field exists and is a date field, false otherwise
*/
public boolean containsTimeStampField() {
MappedFieldType timeSeriesFieldType = this.fieldTypeLookup.get(DataStream.TIMESERIES_FIELDNAME);
return timeSeriesFieldType != null && timeSeriesFieldType instanceof DateFieldMapper.DateFieldType; // has to be Date field type
}

private static String parentObject(String field) {
int lastDot = field.lastIndexOf('.');
if (lastDot == -1) {
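
containsTimeStampField() only returns true when the index mapping defines @timestamp with a date field type, which is the mapping shape a data stream's timestamp field uses. A minimal sketch of such a mapping, shown as plain JSON inside a Java constant (illustrative only, not part of this commit; the class name is made up):

public class TimeSeriesMappingSketch {
    // A mapping of this shape makes MappingLookup#containsTimeStampField() return true:
    // the @timestamp field must exist and must be mapped as a "date" field.
    static final String TIMESERIES_MAPPING = "{\n"
        + "  \"properties\": {\n"
        + "    \"@timestamp\": { \"type\": \"date\" }\n"
        + "  }\n"
        + "}";

    public static void main(String[] args) {
        System.out.println(TIMESERIES_MAPPING);
    }
}
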
16 changes: 15 additions & 1 deletion server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -58,6 +58,7 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.core.Assertions;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
@@ -333,6 +334,7 @@ Runnable getGlobalCheckpointSyncer() {
private volatile boolean useRetentionLeasesInPeerRecovery;
private final Store remoteStore;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final boolean isTimeSeriesIndex;
private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;

public IndexShard(
@@ -451,6 +453,9 @@ public boolean shouldCache(Query query) {
this.checkpointPublisher = checkpointPublisher;
this.remoteStore = remoteStore;
this.translogFactorySupplier = translogFactorySupplier;
this.isTimeSeriesIndex = (mapperService == null || mapperService.documentMapper() == null)
? false
: mapperService.documentMapper().mappers().containsTimeStampField();
this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService;
}

@@ -3627,7 +3632,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
tombstoneDocSupplier(),
isReadOnlyReplica,
replicationTracker::isPrimaryMode,
translogFactorySupplier.apply(indexSettings, shardRouting)
translogFactorySupplier.apply(indexSettings, shardRouting),
isTimeSeriesIndex ? DataStream.TIMESERIES_LEAF_SORTER : null // DESC @timestamp default order for timeseries
);
}

@@ -4670,4 +4676,12 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() {
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
return getEngine().getSegmentInfosSnapshot();
}

/**
* Returns true if this is a time series index, i.e. its mapping contains an @timestamp date field.
* @return true if the index is a time series index, false otherwise
*/
public boolean isTimeSeriesIndex() {
return this.isTimeSeriesIndex;
}
}

server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java
@@ -75,6 +75,7 @@
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.MinAndMax;
import org.opensearch.search.sort.SortOrder;

import java.io.IOException;
import java.util.ArrayList;
@@ -282,8 +283,17 @@ public void search(

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
for (LeafReaderContext ctx : leaves) { // search each subreader
searchLeaf(ctx, weight, collector);
if (shouldReverseLeafReaderContexts()) {
// reverse the segment search order if this flag is true.
// Certain queries can benefit if we reverse the segment read order,
// for example time series based queries if searched for desc sort order.
for (int i = leaves.size() - 1; i >= 0; i--) {
searchLeaf(leaves.get(i), weight, collector);
}
} else {
for (int i = 0; i < leaves.size(); i++) {
searchLeaf(leaves.get(i), weight, collector);
}
}
}

@@ -496,4 +506,22 @@ private boolean canMatchSearchAfter(LeafReaderContext ctx) throws IOException {
}
return true;
}

private boolean shouldReverseLeafReaderContexts() {
// A time series workload traverses segments in descending order by default, i.e. latest to oldest.
// Starting on the latest segments first benefits most time series queries, but it can slow down
// queries that sort ascending on the timestamp, so we reverse the leaf reader order here.
if (searchContext != null && searchContext.indexShard().isTimeSeriesIndex()) {
// Only reverse order for asc order sort queries
if (searchContext.request() != null
&& searchContext.request().source() != null
&& searchContext.request().source().sorts() != null
&& searchContext.request().source().sorts().size() > 0
&& searchContext.request().source().sorts().get(0).order() == SortOrder.ASC) {
return true;
}
}
return false;
}
}
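
shouldReverseLeafReaderContexts() only reverses the traversal when the shard is a time series index and the request's primary sort is ascending; descending requests already benefit from the writer-level leaf sorter. A sketch of a request shape that would take the reversed path (illustrative only, not part of this commit; the class and index names are made up):

import org.opensearch.action.search.SearchRequest;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.SortOrder;

public class AscendingTimestampQuerySketch {
    public static SearchRequest buildRequest() {
        // The primary sort is ASC, so on a time series shard shouldReverseLeafReaderContexts()
        // returns true and segments are visited oldest-first despite the descending leaf sorter.
        SearchSourceBuilder source = new SearchSourceBuilder()
            .query(QueryBuilders.matchAllQuery())
            .sort("@timestamp", SortOrder.ASC)
            .size(100);
        return new SearchRequest("my-timeseries-index").source(source);
    }
}
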

server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java
@@ -69,7 +69,8 @@ public void testCreateEngineConfigFromFactory() {
null,
false,
() -> Boolean.TRUE,
new InternalTranslogFactory()
new InternalTranslogFactory(),
null
);

assertNotNull(config.getCodec());
@@ -148,7 +149,8 @@ public void testCreateCodecServiceFromFactory() {
null,
false,
() -> Boolean.TRUE,
new InternalTranslogFactory()
new InternalTranslogFactory(),
null
);
assertNotNull(config.getCodec());
}

@@ -761,6 +761,7 @@ public Settings indexSettings() {
).getStringRep()
);
}

return builder.build();
}

