From ef4146521641e7a1766a587ecb7ec70956210b8d Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Fri, 1 Sep 2023 10:31:07 -0400 Subject: [PATCH] Add threadpool wait time metric Signed-off-by: Jay Deng --- CHANGELOG.md | 1 + .../search/stats/ConcurrentSearchStatsIT.java | 56 ++++++++++++++++++- .../OpenSearchThreadPoolExecutor.java | 10 ++++ ...ResizableOpenSearchThreadPoolExecutor.java | 9 +++ ...eResizingOpenSearchThreadPoolExecutor.java | 8 +++ .../common/util/concurrent/TimedRunnable.java | 8 +++ .../org/opensearch/threadpool/ThreadPool.java | 9 ++- .../threadpool/ThreadPoolStats.java | 21 ++++++- .../cluster/node/stats/NodeStatsTests.java | 1 + .../threadpool/ThreadPoolStatsTests.java | 24 ++++---- 10 files changed, 130 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d4a060cc16504..7c00e1be89fa5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -97,6 +97,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add concurrent segment search related metrics to node and index stats ([#9622](https://github.com/opensearch-project/OpenSearch/issues/9622)) - Decouple replication lag from logic to fail stale replicas ([#9507](https://github.com/opensearch-project/OpenSearch/pull/9507)) - Expose DelimitedTermFrequencyTokenFilter to allow providing term frequencies along with terms ([#9479](https://github.com/opensearch-project/OpenSearch/pull/9479)) +- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307)) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java index 352fe78b2680d..e6185a4394c9c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java @@ -8,6 +8,9 @@ package org.opensearch.search.stats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsRequestBuilder; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.cluster.metadata.IndexMetadata; @@ -23,18 +26,21 @@ import org.opensearch.script.ScriptType; import org.opensearch.test.InternalSettingsPlugin; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.threadpool.ThreadPoolStats; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Locale; import java.util.Map; import java.util.function.Function; import static org.opensearch.index.query.QueryBuilders.scriptQuery; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2, numClientNodes = 0) +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false) public class ConcurrentSearchStatsIT extends OpenSearchIntegTestCase { @Override @@ -127,6 +133,54 @@ public void testConcurrentQueryCount() throws Exception { ); } + public void testThreadPoolWaitTime() throws Exception { + int NUM_SHARDS = 1; + String INDEX = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + createIndex( + INDEX, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + + ensureGreen(); + + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX).setId(Integer.toString(i)).setSource("field", "value" + i).get(); + refresh(); + } + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 10)) + .execute() + .actionGet(); + + client().prepareSearch(INDEX).execute().actionGet(); + + NodesStatsRequestBuilder nodesStatsRequestBuilder = new NodesStatsRequestBuilder( + client().admin().cluster(), + NodesStatsAction.INSTANCE + ).setNodesIds().all(); + NodesStatsResponse nodesStatsResponse = nodesStatsRequestBuilder.execute().actionGet(); + ThreadPoolStats threadPoolStats = nodesStatsResponse.getNodes().get(0).getThreadPool(); + + for (ThreadPoolStats.Stats stats : threadPoolStats) { + if (stats.getName().equals("index_searcher")) { + assertThat(stats.getPoolWaitTime().millis(), greaterThan(0L)); + } + } + + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 2)) + .execute() + .actionGet(); + } + public static class ScriptedDelayedPlugin extends MockScriptPlugin { static final String SCRIPT_NAME = "search_timeout"; diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java index d967b7423ca80..6bb33421a9ea5 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java @@ -205,4 +205,14 @@ protected Runnable wrapRunnable(Runnable command) { protected Runnable unwrap(Runnable runnable) { return contextHolder.unwrap(runnable); } + + /** + * Thread pool wait time must be implemented by the Executor. For example, {@link QueueResizingOpenSearchThreadPoolExecutor} does so + * using the {@link TimedRunnable} to get the difference between Runnable creation and execution. + * Default implementation of -1 for unsupported executors will prevent the value from showing up in {@link org.opensearch.threadpool.ThreadPoolStats} + * + */ + public long getPoolWaitTime() { + return -1; + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java index 7a0ce8244efe4..0dd781c6d8a2b 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java @@ -9,6 +9,7 @@ package org.opensearch.common.util.concurrent; import org.opensearch.common.ExponentiallyWeightedMovingAverage; +import org.opensearch.common.metrics.MeanMetric; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -27,6 +28,7 @@ public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearch private final ResizableBlockingQueue workQueue; private final Function runnableWrapper; private final ExponentiallyWeightedMovingAverage executionEWMA; + private final MeanMetric poolWaitTime; /** * Create new resizable at runtime thread pool executor @@ -101,6 +103,7 @@ public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearch this.workQueue = workQueue; this.runnableWrapper = runnableWrapper; this.executionEWMA = new ExponentiallyWeightedMovingAverage(ewmaAlpha, 0); + this.poolWaitTime = new MeanMetric(); } @Override @@ -156,6 +159,7 @@ protected void afterExecute(Runnable r, Throwable t) { // taskExecutionNanos may be -1 if the task threw an exception executionEWMA.addValue(taskExecutionNanos); } + poolWaitTime.inc(timedRunnable.getWaitTimeNanos()); } /** @@ -173,4 +177,9 @@ public synchronized int resize(int capacity) { capacity ); } + + @Override + public long getPoolWaitTime() { + return (long) poolWaitTime.mean(); + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java index 684dd7c9d8de5..62024a100dba6 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.common.ExponentiallyWeightedMovingAverage; +import org.opensearch.common.metrics.MeanMetric; import org.opensearch.common.unit.TimeValue; import java.util.Locale; @@ -66,6 +67,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT private final int maxQueueSize; private final long targetedResponseTimeNanos; private final ExponentiallyWeightedMovingAverage executionEWMA; + private final MeanMetric poolWaitTime; private final AtomicLong totalTaskNanos = new AtomicLong(0); private final AtomicInteger taskCount = new AtomicInteger(0); @@ -97,6 +99,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT this.maxQueueSize = maxQueueSize; this.targetedResponseTimeNanos = targetedResponseTime.getNanos(); this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0); + this.poolWaitTime = new MeanMetric(); logger.debug( "thread pool [{}] will adjust queue by [{}] when determining automatic queue size", getName(), @@ -190,6 +193,7 @@ protected void afterExecute(Runnable r, Throwable t) { // taskExecutionNanos may be -1 if the task threw an exception executionEWMA.addValue(taskExecutionNanos); } + poolWaitTime.inc(timedRunnable.getWaitTimeNanos()); if (taskCount.incrementAndGet() == this.tasksPerFrame) { final long endTimeNs = System.nanoTime(); @@ -290,4 +294,8 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) { sb.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", "); } + @Override + public long getPoolWaitTime() { + return (long) poolWaitTime.mean(); + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java b/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java index f3bc50a33453b..2eb6657898008 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java @@ -107,6 +107,14 @@ long getTotalExecutionNanos() { return Math.max(finishTimeNanos - startTimeNanos, 1); } + long getWaitTimeNanos() { + if (startTimeNanos == -1) { + // There must have been an exception thrown, the total time is unknown (-1) + return -1; + } + return Math.max(startTimeNanos - creationTimeNanos, 1); + } + /** * If the task was failed or rejected, return true. * Otherwise, false. diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 6ddf3ff6b2f6a..446844aa020a5 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -383,8 +383,9 @@ public ThreadPoolStats stats() { long rejected = -1; int largest = -1; long completed = -1; - if (holder.executor() instanceof ThreadPoolExecutor) { - ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) holder.executor(); + long waitTime = -1; + if (holder.executor() instanceof OpenSearchThreadPoolExecutor) { + OpenSearchThreadPoolExecutor threadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor(); threads = threadPoolExecutor.getPoolSize(); queue = threadPoolExecutor.getQueue().size(); active = threadPoolExecutor.getActiveCount(); @@ -393,9 +394,11 @@ public ThreadPoolStats stats() { RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler(); if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) { rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected(); + + waitTime = threadPoolExecutor.getPoolWaitTime(); } } - stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed)); + stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed, waitTime)); } return new ThreadPoolStats(stats); } diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java b/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java index b4d7e4a3fbf7a..a310903505867 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java @@ -32,6 +32,8 @@ package org.opensearch.threadpool; +import org.opensearch.Version; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -43,6 +45,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.TimeUnit; /** * Stats for a threadpool @@ -65,8 +68,9 @@ public static class Stats implements Writeable, ToXContentFragment, Comparable stats = new ArrayList<>(); - stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L)); + stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L, 0L)); List copy = new ArrayList<>(stats); Collections.sort(copy); @@ -79,11 +79,11 @@ public void testThreadPoolStatsToXContent() throws IOException { try (BytesStreamOutput os = new BytesStreamOutput()) { List stats = new ArrayList<>(); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L, 0L)); ThreadPoolStats threadPoolStats = new ThreadPoolStats(stats); try (XContentBuilder builder = new XContentBuilder(MediaTypeRegistry.JSON.xContent(), os)) {