Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add threadpool wait time metric #9681

Merged
merged 1 commit into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681))

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand All @@ -94,4 +95,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x
Original file line number Diff line number Diff line change
@@ -1,3 +1,30 @@
"Test cat thread_pool total_wait_time output":
- skip:
version: " - 3.0.0"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was supposed to be 2.99.99 as well....will fix this in the backport/bwc PRs 🤦‍♂️

reason: thread_pool total_wait_time stats were introduced in V_3.0.0

- do:
cat.thread_pool: {}

- match:
$body: |
/ #node_name name active queue rejected
^ (\S+ \s+ \S+ \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
- do:
cat.thread_pool:
thread_pool_patterns: search,search_throttled,index_searcher,generic
h: name,total_wait_time,twt
v: true

- match:
$body: |
/^ id \s+ name \s+ total_wait_time \s+ twt \n
(\S+ \s+ search \s+ \d+s \s+ \d+ \n
\S+ \s+ search_throttled \s+ \d+s \s+ \d+ \n
\S+ \s+ index_searcher \s+ \d+s \s+ \d+ \n
\S+ \s+ generic \s+ -1 \s+ -1 \n)+ $/
---
"Test cat thread_pool output":
- skip:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.search.stats;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
Expand All @@ -26,6 +28,8 @@
import org.opensearch.search.SearchService;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPoolStats;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -35,6 +39,7 @@
import java.util.function.Function;

import static org.opensearch.index.query.QueryBuilders.scriptQuery;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;

Expand Down Expand Up @@ -307,6 +312,54 @@ public void testAvgConcurrencyIndexLevel() throws InterruptedException {
assertEquals(expectedConcurrency, stats.getTotal().getSearch().getTotal().getConcurrentAvgSliceCount(), 0);
}

public void testThreadPoolWaitTime() throws Exception {
int NUM_SHARDS = 1;
String INDEX = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
createIndex(
INDEX,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);

ensureGreen();

for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX).setId(Integer.toString(i)).setSource("field", "value" + i).get();
refresh();
}
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 10))
.execute()
.actionGet();

client().prepareSearch(INDEX).execute().actionGet();

NodesStatsRequestBuilder nodesStatsRequestBuilder = new NodesStatsRequestBuilder(
client().admin().cluster(),
NodesStatsAction.INSTANCE
).setNodesIds().all();
NodesStatsResponse nodesStatsResponse = nodesStatsRequestBuilder.execute().actionGet();
ThreadPoolStats threadPoolStats = nodesStatsResponse.getNodes().get(0).getThreadPool();

for (ThreadPoolStats.Stats stats : threadPoolStats) {
if (stats.getName().equals(ThreadPool.Names.INDEX_SEARCHER)) {
assertThat(stats.getWaitTime().micros(), greaterThan(0L));
}
}

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 2))
.execute()
.actionGet();
}

public static class ScriptedDelayedPlugin extends MockScriptPlugin {
static final String SCRIPT_NAME = "search_timeout";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,15 @@ protected Runnable wrapRunnable(Runnable command) {
protected Runnable unwrap(Runnable runnable) {
return contextHolder.unwrap(runnable);
}

/**
* Returns the cumulative wait time of the ThreadPool. If the ThreadPool does not support tracking the cumulative pool wait time
* then this should return -1 which will prevent the value from showing up in {@link org.opensearch.threadpool.ThreadPoolStats}.
* ThreadPools that do support this metric should override this method. For example, {@link QueueResizingOpenSearchThreadPoolExecutor}
* does so using the {@link TimedRunnable} to get the difference between Runnable creation and execution.
*
*/
public long getPoolWaitTimeNanos() {
return -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.common.util.concurrent;

import org.opensearch.common.ExponentiallyWeightedMovingAverage;
import org.opensearch.common.metrics.CounterMetric;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,6 +28,7 @@ public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearch
private final ResizableBlockingQueue<Runnable> workQueue;
private final Function<Runnable, WrappedRunnable> runnableWrapper;
private final ExponentiallyWeightedMovingAverage executionEWMA;
private final CounterMetric poolWaitTime;

/**
* Create new resizable at runtime thread pool executor
Expand Down Expand Up @@ -101,6 +103,7 @@ public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearch
this.workQueue = workQueue;
this.runnableWrapper = runnableWrapper;
this.executionEWMA = new ExponentiallyWeightedMovingAverage(ewmaAlpha, 0);
this.poolWaitTime = new CounterMetric();
}

@Override
Expand Down Expand Up @@ -156,6 +159,7 @@ protected void afterExecute(Runnable r, Throwable t) {
// taskExecutionNanos may be -1 if the task threw an exception
executionEWMA.addValue(taskExecutionNanos);
}
poolWaitTime.inc(timedRunnable.getWaitTimeNanos());
}

/**
Expand All @@ -173,4 +177,9 @@ public synchronized int resize(int capacity) {
capacity
);
}

@Override
public long getPoolWaitTimeNanos() {
return poolWaitTime.count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.ExponentiallyWeightedMovingAverage;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.unit.TimeValue;

import java.util.Locale;
Expand Down Expand Up @@ -66,6 +67,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT
private final int maxQueueSize;
private final long targetedResponseTimeNanos;
private final ExponentiallyWeightedMovingAverage executionEWMA;
private final CounterMetric poolWaitTime;

private final AtomicLong totalTaskNanos = new AtomicLong(0);
private final AtomicInteger taskCount = new AtomicInteger(0);
Expand Down Expand Up @@ -97,6 +99,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT
this.maxQueueSize = maxQueueSize;
this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0);
this.poolWaitTime = new CounterMetric();
logger.debug(
"thread pool [{}] will adjust queue by [{}] when determining automatic queue size",
getName(),
Expand Down Expand Up @@ -190,6 +193,7 @@ protected void afterExecute(Runnable r, Throwable t) {
// taskExecutionNanos may be -1 if the task threw an exception
executionEWMA.addValue(taskExecutionNanos);
}
poolWaitTime.inc(timedRunnable.getWaitTimeNanos());

if (taskCount.incrementAndGet() == this.tasksPerFrame) {
final long endTimeNs = System.nanoTime();
Expand Down Expand Up @@ -290,4 +294,8 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) {
sb.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", ");
}

@Override
public long getPoolWaitTimeNanos() {
return poolWaitTime.count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ long getTotalExecutionNanos() {
return Math.max(finishTimeNanos - startTimeNanos, 1);
}

long getWaitTimeNanos() {
jed326 marked this conversation as resolved.
Show resolved Hide resolved
if (startTimeNanos == -1) {
// There must have been an exception thrown, the total time is unknown (-1)
return -1;
}
return Math.max(startTimeNanos - creationTimeNanos, 1);
jed326 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* If the task was failed or rejected, return true.
* Otherwise, false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ protected Table getTableWithHeader(final RestRequest request) {
table.addCell("rejected", "alias:r;default:true;text-align:right;desc:number of rejected tasks");
table.addCell("largest", "alias:l;default:false;text-align:right;desc:highest number of seen active threads");
table.addCell("completed", "alias:c;default:false;text-align:right;desc:number of completed tasks");
table.addCell(
"total_wait_time",
"alias:twt;default:false;text-align:right;desc:total time tasks spent waiting in thread_pool queue"
);
table.addCell("core", "alias:cr;default:false;text-align:right;desc:core number of threads in a scaling thread pool");
table.addCell("max", "alias:mx;default:false;text-align:right;desc:maximum number of threads in a scaling thread pool");
table.addCell("size", "alias:sz;default:false;text-align:right;desc:number of threads in a fixed thread pool");
Expand Down Expand Up @@ -267,6 +271,7 @@ private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoR
table.addCell(poolStats == null ? null : poolStats.getRejected());
table.addCell(poolStats == null ? null : poolStats.getLargest());
table.addCell(poolStats == null ? null : poolStats.getCompleted());
table.addCell(poolStats == null ? null : poolStats.getWaitTime());
table.addCell(core);
table.addCell(max);
table.addCell(size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,19 +383,21 @@ public ThreadPoolStats stats() {
long rejected = -1;
int largest = -1;
long completed = -1;
if (holder.executor() instanceof ThreadPoolExecutor) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) holder.executor();
long waitTimeNanos = -1;
if (holder.executor() instanceof OpenSearchThreadPoolExecutor) {
jed326 marked this conversation as resolved.
Show resolved Hide resolved
OpenSearchThreadPoolExecutor threadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor();
threads = threadPoolExecutor.getPoolSize();
queue = threadPoolExecutor.getQueue().size();
active = threadPoolExecutor.getActiveCount();
largest = threadPoolExecutor.getLargestPoolSize();
completed = threadPoolExecutor.getCompletedTaskCount();
waitTimeNanos = threadPoolExecutor.getPoolWaitTimeNanos();
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) {
rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected();
}
}
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed));
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed, waitTimeNanos));
}
return new ThreadPoolStats(stats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.threadpool;

import org.opensearch.Version;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand Down Expand Up @@ -65,15 +67,17 @@ public static class Stats implements Writeable, ToXContentFragment, Comparable<S
private final long rejected;
private final int largest;
private final long completed;
private final long waitTimeNanos;

public Stats(String name, int threads, int queue, int active, long rejected, int largest, long completed) {
public Stats(String name, int threads, int queue, int active, long rejected, int largest, long completed, long waitTimeNanos) {
this.name = name;
this.threads = threads;
this.queue = queue;
this.active = active;
this.rejected = rejected;
this.largest = largest;
this.completed = completed;
this.waitTimeNanos = waitTimeNanos;
}

public Stats(StreamInput in) throws IOException {
Expand All @@ -84,6 +88,7 @@ public Stats(StreamInput in) throws IOException {
rejected = in.readLong();
largest = in.readInt();
completed = in.readLong();
waitTimeNanos = in.getVersion().onOrAfter(Version.V_3_0_0) ? in.readLong() : -1;
}

@Override
Expand All @@ -95,6 +100,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(rejected);
out.writeInt(largest);
out.writeLong(completed);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeLong(waitTimeNanos);
}
}

public String getName() {
Expand Down Expand Up @@ -125,6 +133,14 @@ public long getCompleted() {
return this.completed;
}

public TimeValue getWaitTime() {
return TimeValue.timeValueNanos(waitTimeNanos);
}

public long getWaitTimeNanos() {
return waitTimeNanos;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(name);
Expand All @@ -146,6 +162,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (completed != -1) {
builder.field(Fields.COMPLETED, completed);
}
if (waitTimeNanos != -1) {
if (builder.humanReadable()) {
builder.field(Fields.WAIT_TIME, getWaitTime());
}
builder.field(Fields.WAIT_TIME_NANOS, getWaitTimeNanos());
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -197,6 +219,8 @@ static final class Fields {
static final String REJECTED = "rejected";
static final String LARGEST = "largest";
static final String COMPLETED = "completed";
static final String WAIT_TIME = "total_wait_time";
jed326 marked this conversation as resolved.
Show resolved Hide resolved
static final String WAIT_TIME_NANOS = "total_wait_time_in_nanos";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,8 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
randomIntBetween(1, 1000),
randomNonNegativeLong(),
randomIntBetween(1, 1000),
randomIntBetween(1, 1000)
randomIntBetween(1, 1000),
randomIntBetween(-1, 10)
)
);
}
Expand Down
Loading