Skip to content

Commit

Permalink
Use enum field for HotThreads report type (#77812)
Browse files Browse the repository at this point in the history
Backport of #77462.

HotThreads report request type was declared as String, but used as an
enum where only few values were allowed. This PR changes the type to
enum allowing malformed requests to correctly report the error.

This PR also introduces a small refactor to make the report getter
function part of the accumulator class.
  • Loading branch information
grcevski authored Sep 16, 2021
1 parent 669a4fb commit 4176187
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.monitor.jvm.HotThreads;
import org.elasticsearch.test.ESIntegTestCase;
import org.hamcrest.Matcher;

Expand Down Expand Up @@ -66,7 +67,7 @@ public void testHotThreadsDontFail() throws ExecutionException, InterruptedExcep
break;
}
assertThat(type, notNullValue());
nodesHotThreadsRequestBuilder.setType(type);
nodesHotThreadsRequestBuilder.setType(HotThreads.ReportType.of(type));
} else {
type = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.monitor.jvm.HotThreads;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class NodesHotThreadsRequest extends BaseNodesRequest<NodesHotThreadsRequest> {

int threads = 3;
String type = "cpu";
HotThreads.ReportType type = HotThreads.ReportType.CPU;
TimeValue interval = new TimeValue(500, TimeUnit.MILLISECONDS);
int snapshots = 10;
boolean ignoreIdleThreads = true;
Expand All @@ -29,7 +30,7 @@ public NodesHotThreadsRequest(StreamInput in) throws IOException {
super(in);
threads = in.readInt();
ignoreIdleThreads = in.readBoolean();
type = in.readString();
type = HotThreads.ReportType.of(in.readString());
interval = in.readTimeValue();
snapshots = in.readInt();
}
Expand Down Expand Up @@ -60,12 +61,12 @@ public NodesHotThreadsRequest ignoreIdleThreads(boolean ignoreIdleThreads) {
return this;
}

public NodesHotThreadsRequest type(String type) {
public NodesHotThreadsRequest type(HotThreads.ReportType type) {
this.type = type;
return this;
}

public String type() {
public HotThreads.ReportType type() {
return this.type;
}

Expand All @@ -92,7 +93,7 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeInt(threads);
out.writeBoolean(ignoreIdleThreads);
out.writeString(type);
out.writeString(type.getTypeValue());
out.writeTimeValue(interval);
out.writeInt(snapshots);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.support.nodes.NodesOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.monitor.jvm.HotThreads;

public class NodesHotThreadsRequestBuilder
extends NodesOperationRequestBuilder<NodesHotThreadsRequest, NodesHotThreadsResponse, NodesHotThreadsRequestBuilder> {
Expand All @@ -29,7 +30,7 @@ public NodesHotThreadsRequestBuilder setIgnoreIdleThreads(boolean ignoreIdleThre
return this;
}

public NodesHotThreadsRequestBuilder setType(String type) {
public NodesHotThreadsRequestBuilder setType(HotThreads.ReportType type) {
request.type(type);
return this;
}
Expand Down
105 changes: 66 additions & 39 deletions server/src/main/java/org/elasticsearch/monitor/jvm/HotThreads.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,42 @@ public class HotThreads {

private static final Object mutex = new Object();

private static final StackTraceElement[] EMPTY = new StackTraceElement[0];
private static final DateFormatter DATE_TIME_FORMATTER = DateFormatter.forPattern("date_optional_time");

private int busiestThreads = 3;
private TimeValue interval = new TimeValue(500, TimeUnit.MILLISECONDS);
private TimeValue threadElementsSnapshotDelay = new TimeValue(10);
private TimeValue threadElementsSnapshotDelay = new TimeValue(10, TimeUnit.MILLISECONDS);
private int threadElementsSnapshotCount = 10;
private String type = "cpu";
private ReportType type = ReportType.CPU;
private boolean ignoreIdleThreads = true;

public enum ReportType {

CPU("cpu"),
WAIT("wait"),
BLOCK("block");

private final String type;

ReportType(String type) {
this.type = type;
}

public String getTypeValue() {
return type;
}

public static ReportType of(String type) {
for (ReportType report : values()) {
if (report.type.equals(type)) {
return report;
}
}
throw new IllegalArgumentException("type not supported [" + type + "]");
}
}

public HotThreads interval(TimeValue interval) {
this.interval = interval;
return this;
Expand All @@ -65,12 +92,8 @@ public HotThreads threadElementsSnapshotCount(int threadElementsSnapshotCount) {
return this;
}

public HotThreads type(String type) {
if ("cpu".equals(type) || "wait".equals(type) || "block".equals(type)) {
this.type = type;
} else {
throw new IllegalArgumentException("type not supported [" + type + "]");
}
public HotThreads type(ReportType type) {
this.type = type;
return this;
}

Expand Down Expand Up @@ -134,7 +157,7 @@ String innerDetect(ThreadMXBean threadBean, long currentThreadId) throws Excepti
sb.append(ignoreIdleThreads);
sb.append(":\n");

Map<Long, MyThreadInfo> threadInfos = new HashMap<>();
Map<Long, ThreadTimeAccumulator> threadInfos = new HashMap<>();
for (long threadId : threadBean.getAllThreadIds()) {
// ignore our own thread...
if (currentThreadId == threadId) {
Expand All @@ -148,7 +171,7 @@ String innerDetect(ThreadMXBean threadBean, long currentThreadId) throws Excepti
if (info == null) {
continue;
}
threadInfos.put(threadId, new MyThreadInfo(cpu, info));
threadInfos.put(threadId, new ThreadTimeAccumulator(info, cpu));
}
Thread.sleep(interval.millis());
for (long threadId : threadBean.getAllThreadIds()) {
Expand All @@ -166,43 +189,25 @@ String innerDetect(ThreadMXBean threadBean, long currentThreadId) throws Excepti
threadInfos.remove(threadId);
continue;
}
MyThreadInfo data = threadInfos.get(threadId);
ThreadTimeAccumulator data = threadInfos.get(threadId);
if (data != null) {
data.setDelta(cpu, info);
data.setDelta(info, cpu);
} else {
threadInfos.remove(threadId);
}
}
// sort by delta CPU time on thread.
List<MyThreadInfo> hotties = new ArrayList<>(threadInfos.values());
List<ThreadTimeAccumulator> hotties = new ArrayList<>(threadInfos.values());
final int busiestThreads = Math.min(this.busiestThreads, hotties.size());
// skip that for now
final ToLongFunction<MyThreadInfo> getter;
if ("cpu".equals(type)) {
getter = o -> {
assert o.cpuTime >= -1 : "cpu time should not be negative, but was " + o.cpuTime + ", thread info: " + o.info;
return o.cpuTime;
};
} else if ("wait".equals(type)) {
getter = o -> {
assert o.waitedTime >= -1 : "waited time should not be negative, but was " + o.waitedTime + ", thread info: " + o.info;
return o.waitedTime;
};
} else if ("block".equals(type)) {
getter = o -> {
assert o.blockedTime >= -1 : "blocked time should not be negative, but was " + o.blockedTime + ", thread info: " + o.info;
return o.blockedTime;
};
} else {
throw new IllegalArgumentException("expected thread type to be either 'cpu', 'wait', or 'block', but was " + type);
}
final ToLongFunction<ThreadTimeAccumulator> getter = ThreadTimeAccumulator.valueGetterForReportType(type);

CollectionUtil.introSort(hotties, Comparator.comparingLong(getter).reversed());

// analyse N stack traces for M busiest threads
long[] ids = new long[busiestThreads];
for (int i = 0; i < busiestThreads; i++) {
MyThreadInfo info = hotties.get(i);
ThreadTimeAccumulator info = hotties.get(i);
ids[i] = info.info.getThreadId();
}
ThreadInfo[][] allInfos = new ThreadInfo[threadElementsSnapshotCount][];
Expand Down Expand Up @@ -231,7 +236,7 @@ String innerDetect(ThreadMXBean threadBean, long currentThreadId) throws Excepti
}
double percent = (((double) time) / interval.nanos()) * 100;
sb.append(String.format(Locale.ROOT, "%n%4.1f%% (%s out of %s) %s usage by thread '%s'%n",
percent, TimeValue.timeValueNanos(time), interval, type, threadName));
percent, TimeValue.timeValueNanos(time), interval, type.getTypeValue(), threadName));
// for each snapshot (2nd array index) find later snapshot for same thread with max number of
// identical StackTraceElements (starting from end of each)
boolean[] done = new boolean[threadElementsSnapshotCount];
Expand Down Expand Up @@ -276,8 +281,6 @@ String innerDetect(ThreadMXBean threadBean, long currentThreadId) throws Excepti
return sb.toString();
}

private static final StackTraceElement[] EMPTY = new StackTraceElement[0];

int similarity(ThreadInfo threadInfo, ThreadInfo threadInfo0) {
StackTraceElement[] s1 = threadInfo == null ? EMPTY : threadInfo.getStackTrace();
StackTraceElement[] s2 = threadInfo0 == null ? EMPTY : threadInfo0.getStackTrace();
Expand All @@ -293,7 +296,7 @@ int similarity(ThreadInfo threadInfo, ThreadInfo threadInfo0) {
}


class MyThreadInfo {
static class ThreadTimeAccumulator {
long cpuTime;
long blockedCount;
long blockedTime;
Expand All @@ -302,7 +305,7 @@ class MyThreadInfo {
boolean deltaDone;
ThreadInfo info;

MyThreadInfo(long cpuTime, ThreadInfo info) {
ThreadTimeAccumulator(ThreadInfo info, long cpuTime) {
blockedCount = info.getBlockedCount();
blockedTime = info.getBlockedTime();
waitedCount = info.getWaitedCount();
Expand All @@ -311,7 +314,7 @@ class MyThreadInfo {
this.info = info;
}

void setDelta(long cpuTime, ThreadInfo info) {
void setDelta(ThreadInfo info, long cpuTime) {
if (deltaDone) throw new IllegalStateException("setDelta already called once");
blockedCount = info.getBlockedCount() - blockedCount;
blockedTime = info.getBlockedTime() - blockedTime;
Expand All @@ -321,5 +324,29 @@ void setDelta(long cpuTime, ThreadInfo info) {
deltaDone = true;
this.info = info;
}

static ToLongFunction<ThreadTimeAccumulator> valueGetterForReportType(ReportType type) {
switch (type) {
case CPU:
return o -> {
assert o.cpuTime >= -1 :
"cpu time should not be negative, but was " + o.cpuTime + ", thread info: " + o.info;
return o.cpuTime;
};
case WAIT:
return o -> {
assert o.waitedTime >= -1 :
"waited time should not be negative, but was " + o.waitedTime + ", thread info: " + o.info;
return o.waitedTime;
};
case BLOCK:
return o -> {
assert o.blockedTime >= -1 :
"blocked time should not be negative, but was " + o.blockedTime + ", thread info: " + o.info;
return o.blockedTime;
};
}
throw new IllegalArgumentException("expected thread type to be either 'cpu', 'wait', or 'block', but was " + type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.monitor.jvm.HotThreads;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestRequest;
Expand Down Expand Up @@ -93,7 +94,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
NodesHotThreadsRequest nodesHotThreadsRequest = new NodesHotThreadsRequest(nodesIds);
nodesHotThreadsRequest.threads(request.paramAsInt("threads", nodesHotThreadsRequest.threads()));
nodesHotThreadsRequest.ignoreIdleThreads(request.paramAsBoolean("ignore_idle_threads", nodesHotThreadsRequest.ignoreIdleThreads()));
nodesHotThreadsRequest.type(request.param("type", nodesHotThreadsRequest.type()));
nodesHotThreadsRequest.type(HotThreads.ReportType.of(request.param("type", nodesHotThreadsRequest.type().getTypeValue())));
nodesHotThreadsRequest.interval(TimeValue.parseTimeValue(request.param("interval"), nodesHotThreadsRequest.interval(), "interval"));
nodesHotThreadsRequest.snapshots(request.paramAsInt("snapshots", nodesHotThreadsRequest.snapshots()));
nodesHotThreadsRequest.timeout(request.param("timeout"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.hotthreads;

import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.monitor.jvm.HotThreads;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class NodesHotThreadsRequestTests extends ESTestCase {

/** Simple override of BaseNodesRequest to ensure we read the
* common fields of the nodes request.
*/
static class NodesHotThreadsRequestHelper extends BaseNodesRequest<NodesHotThreadsRequestHelper> {
NodesHotThreadsRequestHelper(StreamInput in) throws IOException {
super(in);
}

NodesHotThreadsRequestHelper(String... nodesIds) {
super(nodesIds);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}

public void testBWCSerialization() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();

TimeValue sampleInterval = new TimeValue(50, TimeUnit.MINUTES);

NodesHotThreadsRequestHelper outHelper = new NodesHotThreadsRequestHelper("123");

outHelper.writeTo(out);
// Write manually some values that differ from the defaults
// in NodesHotThreadsRequest
out.writeInt(4); // threads
out.writeBoolean(false); // ignoreIdleThreads
out.writeString("block"); // type
out.writeTimeValue(sampleInterval); // interval
out.writeInt(3); // snapshots

NodesHotThreadsRequest inRequest = new NodesHotThreadsRequest(out.bytes().streamInput());

assertEquals(4, inRequest.threads());
assertFalse(inRequest.ignoreIdleThreads());
assertEquals(HotThreads.ReportType.BLOCK, inRequest.type());
assertEquals(sampleInterval, inRequest.interval());
assertEquals(3, inRequest.snapshots());

// Change the report type enum
inRequest.type(HotThreads.ReportType.WAIT);

BytesStreamOutput writeOut = new BytesStreamOutput();
inRequest.writeTo(writeOut);

StreamInput whatWeWrote = writeOut.bytes().streamInput();

// We construct the helper to read the common serialized fields from the in.
new NodesHotThreadsRequestHelper(whatWeWrote);
// Make sure we serialized in the following format
assertEquals(4, whatWeWrote.readInt());
assertFalse(whatWeWrote.readBoolean());
assertEquals("wait", whatWeWrote.readString()); // lowercase enum value, not label
assertEquals(sampleInterval, whatWeWrote.readTimeValue());
assertEquals(3, whatWeWrote.readInt());
}
}
Loading

0 comments on commit 4176187

Please sign in to comment.