Skip to content

Commit

Permalink
Fix false positive query timeouts due to using cached time (#3454) (#…
Browse files Browse the repository at this point in the history
…3624)

* Fix false positive query timeouts due to using cached time

Signed-off-by: Ahmad AbuKhalil <[email protected]>

* delegate nanoTime call to SearchContext

Signed-off-by: Ahmad AbuKhalil <[email protected]>

* add overload of SearchContext getRelativeTimeInMillis to force non-cached time

Signed-off-by: Ahmad AbuKhalil <[email protected]>
(cherry picked from commit b5f137b)

Co-authored-by: Ahmad AbuKhalil <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and aabukhalil authored Jun 17, 2022
1 parent 138c602 commit 61cecff
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.benchmark.time;

import org.openjdk.jmh.annotations.*;

import java.util.concurrent.TimeUnit;

@Fork(3)
@Warmup(iterations = 10)
@Measurement(iterations = 20)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
@SuppressWarnings("unused") // invoked by benchmarking framework
/**
 * Micro-benchmark comparing the average cost of the JDK clock calls against a plain access to a volatile long field.
 */
public class NanoTimeVsCurrentTimeMillisBenchmark {
    /** Shared counter that is read and incremented by {@link #accessLongVar()}. */
    private volatile long counter = 0;

    /** Measures a call to {@link System#currentTimeMillis()}. */
    @Benchmark
    public long currentTimeMillis() {
        return System.currentTimeMillis();
    }

    /** Measures a call to {@link System#nanoTime()}. */
    @Benchmark
    public long nanoTime() {
        return System.nanoTime();
    }

    /**
     * Measures a read-and-increment of a volatile long field. The result serves as a bound for how time is
     * cached in org.opensearch.threadpool.ThreadPool.
     */
    @Benchmark
    public long accessLongVar() {
        return counter++;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,24 @@ public final boolean hasOnlySuggest() {

/**
* Returns time in milliseconds that can be used for relative time calculations.
* WARN: This is not the epoch time and can be a cached time.
*
* @return a time in milliseconds, meaningful only relative to other values returned by this method
*/
public abstract long getRelativeTimeInMillis();

/**
* Returns a time in milliseconds suitable for relative time calculations, optionally bypassing the time cache.
* When {@code useCache} is {@code true} this delegates to {@link SearchContext#getRelativeTimeInMillis()}, whose value
* might be a cached time; otherwise it reads {@link System#nanoTime()} and converts the result to milliseconds.
* @param useCache {@code true} to allow a possibly cached time, {@code false} to force a call to {@link System#nanoTime()}
* @return time in milliseconds that can be used for relative time calculations
*/
public long getRelativeTimeInMillis(boolean useCache) {
    return useCache ? getRelativeTimeInMillis() : TimeValue.nsecToMSec(System.nanoTime());
}

/** Return a view of the additional query collector managers that should be run for this context. */
public abstract Map<Class<?>, CollectorManager<? extends Collector, ReduceableSearchResult>> queryCollectorManagers();

Expand Down
32 changes: 23 additions & 9 deletions server/src/main/java/org/opensearch/search/query/QueryPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,7 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q

final Runnable timeoutRunnable;
if (timeoutSet) {
final long startTime = searchContext.getRelativeTimeInMillis();
final long timeout = searchContext.timeout().millis();
final long maxTime = startTime + timeout;
timeoutRunnable = searcher.addQueryCancellation(() -> {
final long time = searchContext.getRelativeTimeInMillis();
if (time > maxTime) {
throw new TimeExceededException();
}
});
timeoutRunnable = searcher.addQueryCancellation(createQueryTimeoutChecker(searchContext));
} else {
timeoutRunnable = null;
}
Expand Down Expand Up @@ -309,6 +301,28 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
}
}

/**
* Builds a runnable that throws {@link TimeExceededException} when it is run after the relative time reported by the
* search context has moved past the runnable's creation time plus the configured timeout.
* @param searchContext supplies both the timeout and the relative time readings
* @return the timeout-checking runnable
*/
static Runnable createQueryTimeoutChecker(final SearchContext searchContext) {
    /* The deadline must be anchored on a precise, non-cached reading. Anchoring it on a cached reading produces false
     * positive timeouts whenever (start + timeout) lands in a later cache slot while the cache lifespan is longer than
     * the requested timeout. */
    final long deadline = searchContext.getRelativeTimeInMillis(false) + searchContext.timeout().millis();
    return () -> {
        /* With a precise deadline, a cached reading here can at worst delay detection (a false negative) by up to one
         * cache lifespan, which is acceptable. */
        if (searchContext.getRelativeTimeInMillis() > deadline) {
            throw new TimeExceededException();
        }
    };
}

private static boolean searchWithCollector(
SearchContext searchContext,
ContextIndexSearcher searcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,10 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery();
context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(false);
assertEquals(context3.query(), context3.buildFilteredQuery(parsedQuery.query()));
// make sure getRelativeTimeInMillis(false) matches System.nanoTime() converted to milliseconds (within tolerance)
long timeToleranceInMs = 10;
long currTime = TimeValue.nsecToMSec(System.nanoTime());
assertTrue(Math.abs(context3.getRelativeTimeInMillis(false) - currTime) <= timeToleranceInMs);

when(queryShardContext.getIndexSettings()).thenReturn(indexSettings);
when(queryShardContext.fieldMapper(anyString())).thenReturn(mock(MappedFieldType.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
Expand All @@ -105,6 +106,7 @@
import org.opensearch.search.sort.SortAndFormats;
import org.opensearch.tasks.TaskCancelledException;
import org.opensearch.test.TestSearchContext;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -117,9 +119,14 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.opensearch.search.query.TopDocsCollectorContext.hasInfMaxScore;

public class QueryPhaseTests extends IndexShardTestCase {
Expand Down Expand Up @@ -1079,6 +1086,58 @@ public void testCancellationDuringPreprocess() throws IOException {
}
}

/**
 * Exercises the query timeout checker around the boundaries of the thread pool's cached-time refresh interval.
 */
public void testQueryTimeoutChecker() throws Exception {
    final long cacheInterval = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(Settings.EMPTY).millis();
    final long tolerance = cacheInterval / 20;
    final long halfInterval = cacheInterval / 2;
    final long quarterInterval = cacheInterval / 4;

    // After two full cache intervals plus tolerance, a fresh cached time past the deadline is visible: the checker must throw.
    final long twoIntervalsAndTolerance = cacheInterval * 2 + tolerance;
    assertThrows(
        QueryPhase.TimeExceededException.class,
        () -> createTimeoutCheckerThenWaitThenRun(cacheInterval, twoIntervalsAndTolerance, true, false)
    );

    // After one cache interval plus tolerance the cached time has moved, but (new cached time - init time) < timeout: no throw.
    createTimeoutCheckerThenWaitThenRun(cacheInterval, cacheInterval + tolerance, true, false);

    // With a timeout shorter than the cache interval and an unchanged cached time, the checker must not throw.
    createTimeoutCheckerThenWaitThenRun(halfInterval, halfInterval + tolerance, false, true);
    createTimeoutCheckerThenWaitThenRun(quarterInterval, halfInterval + tolerance, false, true);
}

/**
 * Creates a query timeout checker against a mocked {@link SearchContext}, sleeps, optionally asserts whether the thread
 * pool's cached relative time moved during the sleep, and then runs the checker once.
 *
 * @param timeout                      the timeout in milliseconds reported by the mocked search context
 * @param sleepAfterCreation           how long to sleep, in milliseconds, between creating and running the checker
 * @param checkCachedTimeChanged       if true, assert that the cached relative time differs after the sleep
 * @param checkCachedTimeHasNotChanged if true, assert that the cached relative time is unchanged after the sleep
 * @throws Exception whatever the checker throws (e.g. a timeout exception) or an interruption of the sleeps
 */
private void createTimeoutCheckerThenWaitThenRun(
    long timeout,
    long sleepAfterCreation,
    boolean checkCachedTimeChanged,
    boolean checkCachedTimeHasNotChanged
) throws Exception {
    long timeCacheLifespan = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(Settings.EMPTY).millis();
    long timeTolerance = timeCacheLifespan / 20;
    long currentTimeDiffWithCachedTime = TimeValue.nsecToMSec(System.nanoTime()) - threadPool.relativeTimeInMillis();
    // need to run this test approximately at the start of cached time window
    long timeToAlignTimeWithCachedTimeOffset = timeCacheLifespan - currentTimeDiffWithCachedTime + timeTolerance;
    // Thread.sleep rejects negative durations; clamp to zero in case the cached time lags the precise clock by more
    // than one cache lifespan plus tolerance.
    Thread.sleep(Math.max(0L, timeToAlignTimeWithCachedTimeOffset));

    long initialRelativeCachedTime = threadPool.relativeTimeInMillis();
    SearchContext mockedSearchContext = mock(SearchContext.class);
    when(mockedSearchContext.timeout()).thenReturn(TimeValue.timeValueMillis(timeout));
    // the cached reading is served by the thread pool; the non-cached overload runs the real implementation
    when(mockedSearchContext.getRelativeTimeInMillis()).thenAnswer(invocation -> threadPool.relativeTimeInMillis());
    when(mockedSearchContext.getRelativeTimeInMillis(eq(false))).thenCallRealMethod();
    Runnable queryTimeoutChecker = QueryPhase.createQueryTimeoutChecker(mockedSearchContext);
    // make sure next time slot become available
    Thread.sleep(sleepAfterCreation);
    if (checkCachedTimeChanged) {
        assertNotEquals(initialRelativeCachedTime, threadPool.relativeTimeInMillis());
    }
    if (checkCachedTimeHasNotChanged) {
        assertEquals(initialRelativeCachedTime, threadPool.relativeTimeInMillis());
    }
    queryTimeoutChecker.run();
    // the checker must read the timeout and the precise start time exactly once, and otherwise only the cached time
    verify(mockedSearchContext, times(1)).timeout();
    verify(mockedSearchContext, times(1)).getRelativeTimeInMillis(eq(false));
    verify(mockedSearchContext, atLeastOnce()).getRelativeTimeInMillis();
    verifyNoMoreInteractions(mockedSearchContext);
}

private static class TestSearchContextWithRewriteAndCancellation extends TestSearchContext {

private TestSearchContextWithRewriteAndCancellation(
Expand Down

0 comments on commit 61cecff

Please sign in to comment.