diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 6c09d3c32f286..5b4d122a5ef6d 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1157,7 +1157,8 @@ protected Node( searchModule.getFetchPhase(), responseCollectorService, circuitBreakerService, - searchModule.getIndexSearcherExecutor(threadPool) + searchModule.getIndexSearcherExecutor(threadPool), + tracer ); final List> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class) @@ -1791,7 +1792,8 @@ protected SearchService newSearchService( FetchPhase fetchPhase, ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService, - Executor indexSearcherExecutor + Executor indexSearcherExecutor, + Tracer tracer ) { return new SearchService( clusterService, @@ -1803,7 +1805,8 @@ protected SearchService newSearchService( fetchPhase, responseCollectorService, circuitBreakerService, - indexSearcherExecutor + indexSearcherExecutor, + tracer ); } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 2c85fcbb25f35..f821d65afa097 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -61,6 +61,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.concurrent.ConcurrentMapLong; import org.opensearch.common.util.io.IOUtils; @@ -84,6 +85,7 @@ import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.query.QueryShardContext; import org.opensearch.index.query.Rewriteable; +import org.opensearch.index.search.stats.ShardSearchStats; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.SearchOperationListener; @@ -135,6 +137,9 @@ import org.opensearch.search.sort.SortOrder; import org.opensearch.search.suggest.Suggest; import org.opensearch.search.suggest.completion.CompletionSuggestion; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanBuilder; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.Scheduler.Cancellable; import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool.Names; @@ -158,6 +163,7 @@ import static org.opensearch.common.unit.TimeValue.timeValueHours; import static org.opensearch.common.unit.TimeValue.timeValueMillis; import static org.opensearch.common.unit.TimeValue.timeValueMinutes; +import static org.opensearch.common.util.FeatureFlags.TELEMETRY; /** * The main search service @@ -318,6 +324,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final AtomicInteger openPitContexts = new AtomicInteger(); private final String sessionId = UUIDs.randomBase64UUID(); private final Executor indexSearcherExecutor; + private final Tracer tracer; public SearchService( ClusterService clusterService, @@ -329,7 +336,8 @@ public SearchService( FetchPhase fetchPhase, ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService, - Executor indexSearcherExecutor + Executor indexSearcherExecutor, + Tracer tracer ) { Settings settings = clusterService.getSettings(); this.threadPool = threadPool; @@ -346,6 +354,7 @@ public SearchService( circuitBreakerService.getBreaker(CircuitBreaker.REQUEST) ); this.indexSearcherExecutor = indexSearcherExecutor; + this.tracer = tracer; TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings); setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings)); setPitKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_PIT_KEEPALIVE_SETTING.get(settings)); @@ -606,7 +615,7 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh SearchContext context = createContext(readerContext, request, task, true) ) { final long afterQueryTime; - try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, tracer)) { loadOrExecuteQueryPhase(request, context); if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) { freeReaderContext(readerContext.id()); @@ -637,7 +646,7 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh } private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) { - try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)) { + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, tracer, true, afterQueryTime)) { shortcutDocIdsToLoad(context); fetchPhase.execute(context); if (reader.singleSession()) { @@ -666,7 +675,7 @@ public void executeQueryPhase( final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); try ( SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext) + SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, tracer) ) { searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); processScroll(request, readerContext, searchContext); @@ -690,7 +699,7 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, readerContext.setAggregatedDfs(request.dfs()); try ( SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext) + SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, tracer) ) { searchContext.searcher().setAggregatedDfs(request.dfs()); queryPhase.execute(searchContext); @@ -745,7 +754,7 @@ public void executeFetchPhase( final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); try ( SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext) + SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, tracer) ) { searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null)); searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); @@ -776,7 +785,12 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(request.getAggregatedDfs())); searchContext.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); try ( - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, true, System.nanoTime()) + SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor( + searchContext, + tracer, + true, + System.nanoTime() + ) ) { fetchPhase.execute(searchContext); if (readerContext.singleSession()) { @@ -1168,10 +1182,10 @@ private void checkKeepAliveLimit(long keepAlive) { if (keepAlive > maxKeepAlive) { throw new IllegalArgumentException( "Keep alive for request (" - + TimeValue.timeValueMillis(keepAlive) + + timeValueMillis(keepAlive) + ") is too large. " + "It must be less than (" - + TimeValue.timeValueMillis(maxKeepAlive) + + timeValueMillis(maxKeepAlive) + "). " + "This limit can be set by changing the [" + MAX_KEEPALIVE_SETTING.getKey() @@ -1187,10 +1201,10 @@ private void checkPitKeepAliveLimit(long keepAlive) { if (keepAlive > maxPitKeepAlive) { throw new IllegalArgumentException( "Keep alive for request (" - + TimeValue.timeValueMillis(keepAlive) + + timeValueMillis(keepAlive) + ") is too large. " + "It must be less than (" - + TimeValue.timeValueMillis(maxPitKeepAlive) + + timeValueMillis(maxPitKeepAlive) + "). " + "This limit can be set by changing the [" + MAX_PIT_KEEPALIVE_SETTING.getKey() @@ -1352,7 +1366,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc + "] index level setting." ); } - for (org.opensearch.search.builder.SearchSourceBuilder.ScriptField field : source.scriptFields()) { + for (SearchSourceBuilder.ScriptField field : source.scriptFields()) { FieldScript.Factory factory = scriptService.compile(field.script(), FieldScript.CONTEXT); SearchLookup lookup = context.getQueryShardContext().lookup(); FieldScript.LeafFactory searchScript = factory.newFactory(field.script().getParams(), lookup); @@ -1657,29 +1671,20 @@ public IndicesService getIndicesService() { } /** - * Returns a builder for {@link InternalAggregation.ReduceContext}. This + * Returns a builder for {@link ReduceContext}. This * builder retains a reference to the provided {@link SearchSourceBuilder}. */ public InternalAggregation.ReduceContextBuilder aggReduceContextBuilder(SearchSourceBuilder searchSourceBuilder) { return new InternalAggregation.ReduceContextBuilder() { @Override - public InternalAggregation.ReduceContext forPartialReduction() { - return InternalAggregation.ReduceContext.forPartialReduction( - bigArrays, - scriptService, - () -> requestToPipelineTree(searchSourceBuilder) - ); + public ReduceContext forPartialReduction() { + return ReduceContext.forPartialReduction(bigArrays, scriptService, () -> requestToPipelineTree(searchSourceBuilder)); } @Override public ReduceContext forFinalReduction() { PipelineTree pipelineTree = requestToPipelineTree(searchSourceBuilder); - return InternalAggregation.ReduceContext.forFinalReduction( - bigArrays, - scriptService, - multiBucketConsumerService.create(), - pipelineTree - ); + return ReduceContext.forFinalReduction(bigArrays, scriptService, multiBucketConsumerService.create(), pipelineTree); } }; } @@ -1728,7 +1733,7 @@ public MinAndMax estimatedMinAndMax() { /** * This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}. - * This is crucial for some implementations like {@link org.opensearch.index.search.stats.ShardSearchStats}. + * This is crucial for some implementations like {@link ShardSearchStats}. */ private static final class SearchOperationListenerExecutor implements AutoCloseable { private final SearchOperationListener listener; @@ -1737,20 +1742,28 @@ private static final class SearchOperationListenerExecutor implements AutoClosea private final boolean fetch; private long afterQueryTime = -1; private boolean closed = false; + private Span querySpan = null; + private Span fetchSpan = null; - SearchOperationListenerExecutor(SearchContext context) { - this(context, false, System.nanoTime()); + SearchOperationListenerExecutor(SearchContext context, Tracer tracer) { + this(context, tracer, false, System.nanoTime()); } - SearchOperationListenerExecutor(SearchContext context, boolean fetch, long startTime) { + SearchOperationListenerExecutor(SearchContext context, Tracer tracer, boolean fetch, long startTime) { this.listener = context.indexShard().getSearchOperationListener(); this.context = context; time = startTime; this.fetch = fetch; if (fetch) { listener.onPreFetchPhase(context); + if (FeatureFlags.isEnabled(TELEMETRY)) { + fetchSpan = tracer.startSpan(SpanBuilder.from("shardFetch", context)); + } } else { listener.onPreQueryPhase(context); + if (FeatureFlags.isEnabled(TELEMETRY)) { + querySpan = tracer.startSpan(SpanBuilder.from("shardQuery", context)); + } } } @@ -1766,14 +1779,26 @@ public void close() { if (afterQueryTime != -1) { if (fetch) { listener.onFetchPhase(context, afterQueryTime - time); + if (fetchSpan != null) { + fetchSpan.endSpan(); + } } else { listener.onQueryPhase(context, afterQueryTime - time); + if (querySpan != null) { + querySpan.endSpan(); + } } } else { if (fetch) { listener.onFailedFetchPhase(context); + if (fetchSpan != null) { + fetchSpan.endSpan(); + } } else { listener.onFailedQueryPhase(context); + if (querySpan != null) { + querySpan.endSpan(); + } } } } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index cce051087eafa..f1868facba0f6 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2279,7 +2279,8 @@ public void onFailure(final Exception e) { new FetchPhase(Collections.emptyList()), responseCollectorService, new NoneCircuitBreakerService(), - null + null, + NoopTracer.INSTANCE ); SearchPhaseController searchPhaseController = new SearchPhaseController( writableRegistry(), diff --git a/test/framework/src/main/java/org/opensearch/node/MockNode.java b/test/framework/src/main/java/org/opensearch/node/MockNode.java index e6c7e21d5b3ea..c3f36f44ed409 100644 --- a/test/framework/src/main/java/org/opensearch/node/MockNode.java +++ b/test/framework/src/main/java/org/opensearch/node/MockNode.java @@ -155,7 +155,8 @@ protected SearchService newSearchService( FetchPhase fetchPhase, ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService, - Executor indexSearcherExecutor + Executor indexSearcherExecutor, + Tracer tracer ) { if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) { return super.newSearchService( @@ -168,7 +169,8 @@ protected SearchService newSearchService( fetchPhase, responseCollectorService, circuitBreakerService, - indexSearcherExecutor + indexSearcherExecutor, + tracer ); } return new MockSearchService( @@ -180,7 +182,8 @@ protected SearchService newSearchService( queryPhase, fetchPhase, circuitBreakerService, - indexSearcherExecutor + indexSearcherExecutor, + tracer ); } diff --git a/test/framework/src/main/java/org/opensearch/search/MockSearchService.java b/test/framework/src/main/java/org/opensearch/search/MockSearchService.java index a0bbcb7be05f9..85a547239df10 100644 --- a/test/framework/src/main/java/org/opensearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/opensearch/search/MockSearchService.java @@ -42,6 +42,7 @@ import org.opensearch.search.fetch.FetchPhase; import org.opensearch.search.internal.ReaderContext; import org.opensearch.search.query.QueryPhase; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import java.util.HashMap; @@ -96,7 +97,8 @@ public MockSearchService( QueryPhase queryPhase, FetchPhase fetchPhase, CircuitBreakerService circuitBreakerService, - Executor indexSearcherExecutor + Executor indexSearcherExecutor, + Tracer tracer ) { super( clusterService, @@ -108,7 +110,8 @@ public MockSearchService( fetchPhase, null, circuitBreakerService, - indexSearcherExecutor + indexSearcherExecutor, + tracer ); }