Skip to content

Commit

Permalink
Revert "Revert "Time series based workload desc order optimization th…
Browse files Browse the repository at this point in the history
…rough reverse segment read (opensearch-project#7244)" (opensearch-project#7892)"

This reverts commit bb26536.

Signed-off-by: gashutos <[email protected]>
  • Loading branch information
gashutos committed Jun 8, 2023
1 parent e6348c5 commit 59285c0
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 6 deletions.
1 change: 1 addition & 0 deletions release-notes/opensearch.release-notes-2.8.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- [Search Pipelines] Add RenameFieldResponseProcessor for Search Pipelines ([#7377](https://github.com/opensearch-project/OpenSearch/pull/7377))
- [Search Pipelines] Split search pipeline processor factories by type ([#7597](https://github.com/opensearch-project/OpenSearch/pull/7597))
- [Search Pipelines] Add script processor ([#7607](https://github.com/opensearch-project/OpenSearch/pull/7607))
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237))
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))
- Add task cancellation timestamp in task API ([#7455](https://github.com/opensearch-project/OpenSearch/pull/7455))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@

package org.opensearch.cluster.metadata;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.PointValues;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.AbstractDiffable;
import org.opensearch.cluster.Diff;
import org.opensearch.core.ParseField;
Expand All @@ -46,6 +50,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -59,6 +64,24 @@
public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {

public static final String BACKING_INDEX_PREFIX = ".ds-";
public static final String TIMESERIES_FIELDNAME = "@timestamp";
public static final Comparator<LeafReader> TIMESERIES_LEAF_SORTER = Comparator.comparingLong((LeafReader r) -> {
try {
PointValues points = r.getPointValues(TIMESERIES_FIELDNAME);
if (points != null) {
// could be a multipoint (probably not) but get the maximum time value anyway
byte[] sortValue = points.getMaxPackedValue();
// decode the first dimension because this should not be a multi dimension field
// it's a bug in the date field if it is
return LongPoint.decodeDimension(sortValue, 0);
} else {
// segment does not have a timestamp field, just return the minimum value
return Long.MIN_VALUE;
}
} catch (IOException e) {
throw new OpenSearchException("Not a timeseries Index! Field [{}] not found!", TIMESERIES_FIELDNAME);
}
}).reversed();

private final String name;
private final TimestampField timeStampField;
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
private volatile long mappingTotalFieldsLimit;
private volatile long mappingDepthLimit;
private volatile long mappingFieldNameLengthLimit;
private volatile boolean searchSegmentOrderReversed;

/**
* The maximum number of refresh listeners allows on this shard.
Expand Down Expand Up @@ -905,6 +906,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(DEFAULT_SEARCH_PIPELINE, this::setDefaultSearchPipeline);
}

private void setSearchSegmentOrderReversed(boolean reversed) {
this.searchSegmentOrderReversed = reversed;
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
this.searchIdleAfter = searchIdleAfter;
}
Expand Down Expand Up @@ -1084,6 +1089,13 @@ public Settings getNodeSettings() {
return nodeSettings;
}

/**
* Returns true if index level setting for leaf reverse order search optimization is enabled
*/
public boolean getSearchSegmentOrderReversed() {
return this.searchSegmentOrderReversed;
}

/**
* Updates the settings and index metadata and notifies all registered settings consumers with the new settings iff at least one
* setting has changed.
Expand Down
19 changes: 19 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/EngineConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
Expand All @@ -59,6 +60,7 @@
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.threadpool.ThreadPool;

import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.BooleanSupplier;
Expand Down Expand Up @@ -102,6 +104,7 @@ public final class EngineConfig {
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private final boolean isReadOnlyReplica;
private final BooleanSupplier primaryModeSupplier;
private final Comparator<LeafReader> leafSorter;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
Expand Down Expand Up @@ -204,6 +207,7 @@ private EngineConfig(Builder builder) {
this.isReadOnlyReplica = builder.isReadOnlyReplica;
this.primaryModeSupplier = builder.primaryModeSupplier;
this.translogFactory = builder.translogFactory;
this.leafSorter = builder.leafSorter;
}

/**
Expand Down Expand Up @@ -451,6 +455,15 @@ public TranslogDeletionPolicyFactory getCustomTranslogDeletionPolicyFactory() {
return translogDeletionPolicyFactory;
}

/**
* Returns subReaderSorter for org.apache.lucene.index.BaseCompositeReader.
* This gets used in lucene IndexReader and decides order of segment read.
* @return comparator
*/
public Comparator<LeafReader> getLeafSorter() {
return this.leafSorter;
}

/**
* Builder for EngineConfig class
*
Expand Down Expand Up @@ -483,6 +496,7 @@ public static class Builder {
private boolean isReadOnlyReplica;
private BooleanSupplier primaryModeSupplier;
private TranslogFactory translogFactory = new InternalTranslogFactory();
Comparator<LeafReader> leafSorter;

public Builder shardId(ShardId shardId) {
this.shardId = shardId;
Expand Down Expand Up @@ -614,6 +628,11 @@ public Builder translogFactory(TranslogFactory translogFactory) {
return this;
}

public Builder leafSorter(Comparator<LeafReader> leafSorter) {
this.leafSorter = leafSorter;
return this;
}

public EngineConfig build() {
return new EngineConfig(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
Expand All @@ -36,6 +37,7 @@

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.BooleanSupplier;
Expand Down Expand Up @@ -151,7 +153,8 @@ public EngineConfig newEngineConfig(
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica,
BooleanSupplier primaryModeSupplier,
TranslogFactory translogFactory
TranslogFactory translogFactory,
Comparator<LeafReader> leafSorter
) {
CodecService codecServiceToUse = codecService;
if (codecService == null && this.codecServiceFactory != null) {
Expand Down Expand Up @@ -184,6 +187,7 @@ public EngineConfig newEngineConfig(
.readOnlyReplica(isReadOnlyReplica)
.primaryModeSupplier(primaryModeSupplier)
.translogFactory(translogFactory)
.leafSorter(leafSorter)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2322,6 +2322,9 @@ private IndexWriterConfig getIndexWriterConfig() {
if (config().getIndexSort() != null) {
iwc.setIndexSort(config().getIndexSort());
}
if (config().getLeafSorter() != null) {
iwc.setLeafSorter(config().getLeafSorter()); // The default segment search order
}
return iwc;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.index.mapper;

import org.apache.lucene.analysis.Analyzer;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.analysis.FieldNameAnalyzer;

Expand Down Expand Up @@ -261,6 +262,15 @@ public String getNestedScope(String path) {
return null;
}

/**
* If this index contains @timestamp field with Date type, it will return true
* @return true or false based on above condition
*/
public boolean containsTimeStampField() {
MappedFieldType timeSeriesFieldType = this.fieldTypeLookup.get(DataStream.TIMESERIES_FIELDNAME);
return timeSeriesFieldType != null && timeSeriesFieldType instanceof DateFieldMapper.DateFieldType; // has to be Date field type
}

private static String parentObject(String field) {
int lastDot = field.lastIndexOf('.');
if (lastDot == -1) {
Expand Down
16 changes: 15 additions & 1 deletion server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.core.Assertions;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
Expand Down Expand Up @@ -333,6 +334,7 @@ Runnable getGlobalCheckpointSyncer() {
private volatile boolean useRetentionLeasesInPeerRecovery;
private final Store remoteStore;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final boolean isTimeSeriesIndex;
private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;

public IndexShard(
Expand Down Expand Up @@ -451,6 +453,9 @@ public boolean shouldCache(Query query) {
this.checkpointPublisher = checkpointPublisher;
this.remoteStore = remoteStore;
this.translogFactorySupplier = translogFactorySupplier;
this.isTimeSeriesIndex = (mapperService == null || mapperService.documentMapper() == null)
? false
: mapperService.documentMapper().mappers().containsTimeStampField();
this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService;
}

Expand Down Expand Up @@ -3627,7 +3632,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
tombstoneDocSupplier(),
isReadOnlyReplica,
replicationTracker::isPrimaryMode,
translogFactorySupplier.apply(indexSettings, shardRouting)
translogFactorySupplier.apply(indexSettings, shardRouting),
isTimeSeriesIndex ? DataStream.TIMESERIES_LEAF_SORTER : null // DESC @timestamp default order for timeseries
);
}

Expand Down Expand Up @@ -4670,4 +4676,12 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() {
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
return getEngine().getSegmentInfosSnapshot();
}

/**
* If index is time series (if it contains @timestamp field)
* @return true or false based on above condition
*/
public boolean isTimeSeriesIndex() {
return this.isTimeSeriesIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.MinAndMax;
import org.opensearch.search.sort.SortOrder;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -282,8 +283,17 @@ public void search(

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
for (LeafReaderContext ctx : leaves) { // search each subreader
searchLeaf(ctx, weight, collector);
if (shouldReverseLeafReaderContexts()) {
// reverse the segment search order if this flag is true.
// Certain queries can benefit if we reverse the segment read order,
// for example time series based queries if searched for desc sort order.
for (int i = leaves.size() - 1; i >= 0; i--) {
searchLeaf(leaves.get(i), weight, collector);
}
} else {
for (int i = 0; i < leaves.size(); i++) {
searchLeaf(leaves.get(i), weight, collector);
}
}
}

Expand Down Expand Up @@ -496,4 +506,22 @@ private boolean canMatchSearchAfter(LeafReaderContext ctx) throws IOException {
}
return true;
}

private boolean shouldReverseLeafReaderContexts() {
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext != null && searchContext.indexShard().isTimeSeriesIndex()) {
// Only reverse order for asc order sort queries
if (searchContext.request() != null
&& searchContext.request().source() != null
&& searchContext.request().source().sorts() != null
&& searchContext.request().source().sorts().size() > 0
&& searchContext.request().source().sorts().get(0).order() == SortOrder.ASC) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public void testCreateEngineConfigFromFactory() {
null,
false,
() -> Boolean.TRUE,
new InternalTranslogFactory()
new InternalTranslogFactory(),
null
);

assertNotNull(config.getCodec());
Expand Down Expand Up @@ -148,7 +149,8 @@ public void testCreateCodecServiceFromFactory() {
null,
false,
() -> Boolean.TRUE,
new InternalTranslogFactory()
new InternalTranslogFactory(),
null
);
assertNotNull(config.getCodec());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ public Settings indexSettings() {
).getStringRep()
);
}

return builder.build();
}

Expand Down

0 comments on commit 59285c0

Please sign in to comment.