Track histogram of transport handling times (#80581)
Adds to the transport node stats a histogram recording the distribution
of the time spent handling each message on a transport thread.

Closes #80428
DaveCTurner authored Nov 29, 2021
1 parent 3d0c9ef commit 54e0370
Showing 15 changed files with 393 additions and 19 deletions.
48 changes: 48 additions & 0 deletions docs/reference/cluster/nodes-stats.asciidoc
@@ -1899,6 +1899,54 @@ Size of TX packets sent by the node during internal cluster communication.
(integer)
Size, in bytes, of TX packets sent by the node during internal cluster
communication.
`inbound_handling_time_histogram`::
(array)
The distribution of the time spent handling each inbound message on a transport
thread, represented as a histogram.
+
.Properties of `inbound_handling_time_histogram`
[%collapsible]
=======
`ge_millis`::
(integer)
The inclusive lower bound of the bucket in milliseconds. Omitted on the first
bucket since this bucket has no lower bound.

`lt_millis`::
(integer)
The exclusive upper bound of the bucket in milliseconds. Omitted on the last
bucket since this bucket has no upper bound.

`count`::
(integer)
The number of inbound messages whose handling time fell within the bounds of
this bucket.
=======
`outbound_handling_time_histogram`::
(array)
The distribution of the time spent sending each outbound transport message on a
transport thread, represented as a histogram.
+
.Properties of `outbound_handling_time_histogram`
[%collapsible]
=======
`ge_millis`::
(integer)
The inclusive lower bound of the bucket in milliseconds. Omitted on the first
bucket since this bucket has no lower bound.

`lt_millis`::
(integer)
The exclusive upper bound of the bucket in milliseconds. Omitted on the last
bucket since this bucket has no upper bound.

`count`::
(integer)
The number of outbound messages whose sending time fell within the bounds of
this bucket.
=======
======

[[cluster-nodes-stats-api-response-body-http]]
@@ -20,3 +20,48 @@
- gte: { nodes.$node_id.transport.tx_count: 0 }
- gte: { nodes.$node_id.transport.rx_size_in_bytes: 0 }
- gte: { nodes.$node_id.transport.tx_size_in_bytes: 0 }

---
"Transport handling time histogram":
- skip:
version: " - 8.0.99"
reason: "handling_time_histograms were added in 8.1"
features: [arbitrary_key]

- do:
nodes.info: {}
- set:
nodes._arbitrary_key_: node_id

- do:
nodes.stats:
metric: [ transport ]

- length: { nodes.$node_id.transport.inbound_handling_time_histogram: 18 }

- gte: { nodes.$node_id.transport.inbound_handling_time_histogram.0.count: 0 }
- is_false: nodes.$node_id.transport.inbound_handling_time_histogram.0.ge_millis
- match: { nodes.$node_id.transport.inbound_handling_time_histogram.0.lt_millis: 1 }

- gte: { nodes.$node_id.transport.inbound_handling_time_histogram.1.count: 0 }
- match: { nodes.$node_id.transport.inbound_handling_time_histogram.1.ge_millis: 1 }
- match: { nodes.$node_id.transport.inbound_handling_time_histogram.1.lt_millis: 2 }

- gte: { nodes.$node_id.transport.inbound_handling_time_histogram.17.count: 0 }
- match: { nodes.$node_id.transport.inbound_handling_time_histogram.17.ge_millis: 65536 }
- is_false: nodes.$node_id.transport.inbound_handling_time_histogram.17.lt_millis


- length: { nodes.$node_id.transport.outbound_handling_time_histogram: 18 }

- gte: { nodes.$node_id.transport.outbound_handling_time_histogram.0.count: 0 }
- is_false: nodes.$node_id.transport.outbound_handling_time_histogram.0.ge_millis
- match: { nodes.$node_id.transport.outbound_handling_time_histogram.0.lt_millis: 1 }

- gte: { nodes.$node_id.transport.outbound_handling_time_histogram.1.count: 0 }
- match: { nodes.$node_id.transport.outbound_handling_time_histogram.1.ge_millis: 1 }
- match: { nodes.$node_id.transport.outbound_handling_time_histogram.1.lt_millis: 2 }

- gte: { nodes.$node_id.transport.outbound_handling_time_histogram.17.count: 0 }
- match: { nodes.$node_id.transport.outbound_handling_time_histogram.17.ge_millis: 65536 }
- is_false: nodes.$node_id.transport.outbound_handling_time_histogram.17.lt_millis
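The assertions above pin down the bucket layout: 18 buckets whose boundaries are the powers of two from 1ms to 65536ms, with the first bucket unbounded below and the last unbounded above. A minimal sketch (not part of the commit) that prints the layout these assertions expect, using the same bounds as HandlingTimeTracker.getBucketUpperBounds() below:

public class BucketLayoutSketch {
    public static void main(String[] args) {
        int[] bounds = new int[17];
        for (int i = 0; i < bounds.length; i++) {
            bounds[i] = 1 << i; // 1, 2, 4, ..., 65536
        }
        // Bucket 0 has no ge_millis; bucket 17 has no lt_millis.
        System.out.println("bucket 0: lt_millis=" + bounds[0]);
        for (int i = 1; i < bounds.length; i++) {
            System.out.println("bucket " + i + ": ge_millis=" + bounds[i - 1] + " lt_millis=" + bounds[i]);
        }
        System.out.println("bucket 17: ge_millis=" + bounds[16]);
    }
}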
server/src/main/java/org/elasticsearch/common/network/HandlingTimeTracker.java
@@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.network;

import java.util.concurrent.atomic.LongAdder;

/**
* Tracks how long message handling takes on a transport thread as a histogram with fixed buckets.
*/
public class HandlingTimeTracker {

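/**
 * @return the exclusive upper bounds of the first 17 buckets, i.e. the lt_millis values reported in the
 * stats: powers of two from 1 to 65536. The final bucket has no upper bound.
 */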
public static int[] getBucketUpperBounds() {
int[] bounds = new int[17];
for (int i = 0; i < bounds.length; i++) {
bounds[i] = 1 << i;
}
return bounds;
}

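// Maps a handling time to a bucket index: times of 0ms or less go to bucket 0, times of 65536ms or more
// go to the last bucket, and otherwise the index is floor(log2(handlingTimeMillis)) + 1, so for example
// getBucket(3) == 2, the [2ms, 4ms) bucket.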
private static int getBucket(long handlingTimeMillis) {
if (handlingTimeMillis <= 0) {
return 0;
} else if (LAST_BUCKET_LOWER_BOUND <= handlingTimeMillis) {
return BUCKET_COUNT - 1;
} else {
return Long.SIZE - Long.numberOfLeadingZeros(handlingTimeMillis);
}
}

public static final int BUCKET_COUNT = getBucketUpperBounds().length + 1;

private static final long LAST_BUCKET_LOWER_BOUND = getBucketUpperBounds()[BUCKET_COUNT - 2];

private final LongAdder[] buckets;

public HandlingTimeTracker() {
buckets = new LongAdder[BUCKET_COUNT];
for (int i = 0; i < BUCKET_COUNT; i++) {
buckets[i] = new LongAdder();
}
}

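/**
 * Records that handling a message took {@code handlingTimeMillis} milliseconds by incrementing the count
 * in the matching bucket.
 */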
public void addHandlingTime(long handlingTimeMillis) {
buckets[getBucket(handlingTimeMillis)].increment();
}

/**
* @return An array of frequencies of handling times in buckets with upper bounds as returned by {@link #getBucketUpperBounds()}, plus
* an extra bucket for handling times longer than the longest upper bound.
*/
public long[] getHistogram() {
final long[] histogram = new long[BUCKET_COUNT];
for (int i = 0; i < BUCKET_COUNT; i++) {
histogram[i] = buckets[i].longValue();
}
return histogram;
}

}
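A minimal usage sketch of the class above (illustrative only, not part of the commit):

HandlingTimeTracker tracker = new HandlingTimeTracker();
tracker.addHandlingTime(0L);       // bucket 0: less than 1ms
tracker.addHandlingTime(3L);       // bucket 2: [2ms, 4ms)
tracker.addHandlingTime(100_000L); // bucket 17: 65536ms or more
long[] histogram = tracker.getHistogram(); // 18 counts, one per bucket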
server/src/main/java/org/elasticsearch/common/network/NetworkService.java
@@ -90,11 +90,16 @@ public interface CustomNameResolver {
}

private final List<CustomNameResolver> customNameResolvers;
+private final HandlingTimeTracker handlingTimeTracker = new HandlingTimeTracker();

public NetworkService(List<CustomNameResolver> customNameResolvers) {
this.customNameResolvers = Objects.requireNonNull(customNameResolvers, "customNameResolvers must be non null");
}

+public HandlingTimeTracker getHandlingTimeTracker() {
+    return handlingTimeTracker;
+}

/**
* Resolves {@code bindHosts} to a list of internet addresses. The list will
* not contain duplicate addresses.
server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java
@@ -355,11 +355,12 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) {
*/
public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) {
httpClientStatsTracker.updateClientStats(httpRequest, httpChannel);
-final long startTime = threadPool.relativeTimeInMillis();
+final long startTime = threadPool.rawRelativeTimeInMillis();
try {
handleIncomingRequest(httpRequest, httpChannel, httpRequest.getInboundException());
} finally {
-final long took = threadPool.relativeTimeInMillis() - startTime;
+final long took = threadPool.rawRelativeTimeInMillis() - startTime;
+networkService.getHandlingTimeTracker().addHandlingTime(took);
final long logThreshold = slowLogThresholdMs;
if (logThreshold > 0 && took > logThreshold) {
logger.warn(
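The switch from relativeTimeInMillis() to rawRelativeTimeInMillis() matters here: the former returns ThreadPool's cached timestamp, which is only refreshed periodically (roughly every 200ms by default), so it is too coarse for the millisecond-level buckets above; the raw variant reads the underlying clock on every call. A hedged sketch of the difference (compareClocks is a hypothetical helper, and the cache refresh interval is an assumption about the default configuration):

static void compareClocks(org.elasticsearch.threadpool.ThreadPool threadPool) throws InterruptedException {
    long cachedBefore = threadPool.relativeTimeInMillis();
    Thread.sleep(5);
    long cachedAfter = threadPool.relativeTimeInMillis(); // often unchanged: 5ms is below the cache resolution
    long rawBefore = threadPool.rawRelativeTimeInMillis();
    Thread.sleep(5);
    long rawAfter = threadPool.rawRelativeTimeInMillis(); // reflects the ~5ms that actually elapsed
    System.out.println((cachedAfter - cachedBefore) + "ms cached vs " + (rawAfter - rawBefore) + "ms raw");
}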
server/src/main/java/org/elasticsearch/transport/InboundHandler.java
@@ -17,6 +17,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.network.HandlingTimeTracker;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.TimeValue;
@@ -40,6 +41,7 @@ public class InboundHandler {
private final TransportHandshaker handshaker;
private final TransportKeepAlive keepAlive;
private final Transport.ResponseHandlers responseHandlers;
+private final HandlingTimeTracker handlingTimeTracker;
private final Transport.RequestHandlers requestHandlers;

private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
@@ -53,7 +55,8 @@ public class InboundHandler {
TransportHandshaker handshaker,
TransportKeepAlive keepAlive,
Transport.RequestHandlers requestHandlers,
-Transport.ResponseHandlers responseHandlers
+Transport.ResponseHandlers responseHandlers,
+HandlingTimeTracker handlingTimeTracker
) {
this.threadPool = threadPool;
this.outboundHandler = outboundHandler;
@@ -62,6 +65,7 @@
this.keepAlive = keepAlive;
this.requestHandlers = requestHandlers;
this.responseHandlers = responseHandlers;
+this.handlingTimeTracker = handlingTimeTracker;
}

void setMessageListener(TransportMessageListener listener) {
Expand All @@ -77,7 +81,7 @@ void setSlowLogThreshold(TimeValue slowLogThreshold) {
}

void inboundMessage(TcpChannel channel, InboundMessage message) throws Exception {
-final long startTime = threadPool.relativeTimeInMillis();
+final long startTime = threadPool.rawRelativeTimeInMillis();
channel.getChannelStats().markAccessed(startTime);
TransportLogger.logInboundMessage(channel, message);

@@ -155,7 +159,8 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long startTime) {
}
}
} finally {
-final long took = threadPool.relativeTimeInMillis() - startTime;
+final long took = threadPool.rawRelativeTimeInMillis() - startTime;
+handlingTimeTracker.addHandlingTime(took);
final long logThreshold = slowLogThresholdMs;
if (logThreshold > 0 && took > logThreshold) {
if (isRequest) {
server/src/main/java/org/elasticsearch/transport/OutboundHandler.java
@@ -19,6 +19,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.network.CloseableChannel;
+import org.elasticsearch.common.network.HandlingTimeTracker;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -37,17 +38,26 @@ final class OutboundHandler {
private final StatsTracker statsTracker;
private final ThreadPool threadPool;
private final Recycler<BytesRef> recycler;
+private final HandlingTimeTracker handlingTimeTracker;

private volatile long slowLogThresholdMs = Long.MAX_VALUE;

private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;

-OutboundHandler(String nodeName, Version version, StatsTracker statsTracker, ThreadPool threadPool, Recycler<BytesRef> recycler) {
+OutboundHandler(
+    String nodeName,
+    Version version,
+    StatsTracker statsTracker,
+    ThreadPool threadPool,
+    Recycler<BytesRef> recycler,
+    HandlingTimeTracker handlingTimeTracker
+) {
this.nodeName = nodeName;
this.version = version;
this.statsTracker = statsTracker;
this.threadPool = threadPool;
this.recycler = recycler;
+this.handlingTimeTracker = handlingTimeTracker;
}

void setSlowLogThreshold(TimeValue slowLogThreshold) {
@@ -168,7 +178,7 @@ private void internalSend(
@Nullable OutboundMessage message,
ActionListener<Void> listener
) {
-final long startTime = threadPool.relativeTimeInMillis();
+final long startTime = threadPool.rawRelativeTimeInMillis();
channel.getChannelStats().markAccessed(startTime);
final long messageSize = reference.length();
TransportLogger.logOutboundMessage(channel, reference);
@@ -196,7 +206,8 @@ public void onFailure(Exception e) {
private void maybeLogSlowMessage(boolean success) {
final long logThreshold = slowLogThresholdMs;
if (logThreshold > 0) {
-final long took = threadPool.relativeTimeInMillis() - startTime;
+final long took = threadPool.rawRelativeTimeInMillis() - startTime;
+handlingTimeTracker.addHandlingTime(took);
if (took > logThreshold) {
logger.warn(
"sending transport message [{}] of size [{}] on [{}] took [{}ms] which is above the warn "
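One subtlety in the hunk above: the histogram update sits inside the if (logThreshold > 0) guard. Since slowLogThresholdMs defaults to Long.MAX_VALUE and is normally configured to a positive threshold, the guard passes in practice, but outbound times would stop being recorded if the slow-log threshold were set to zero or a negative value. A condensed sketch of that control flow (recordAndMaybeLog is a hypothetical stand-in for maybeLogSlowMessage):

static void recordAndMaybeLog(long tookMillis, long logThresholdMillis, HandlingTimeTracker tracker) {
    if (logThresholdMillis > 0) {
        tracker.addHandlingTime(tookMillis); // recorded whenever slow logging is not disabled
        if (tookMillis > logThresholdMillis) {
            // emit the slow-send warning here
        }
    }
}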
server/src/main/java/org/elasticsearch/transport/TcpTransport.java
@@ -29,6 +29,7 @@
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.network.CloseableChannel;
+import org.elasticsearch.common.network.HandlingTimeTracker;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
@@ -116,6 +117,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements

private final TransportHandshaker handshaker;
private final TransportKeepAlive keepAlive;
+private final HandlingTimeTracker outboundHandlingTimeTracker = new HandlingTimeTracker();
private final OutboundHandler outboundHandler;
private final InboundHandler inboundHandler;
private final ResponseHandlers responseHandlers = new ResponseHandlers();
@@ -141,7 +143,7 @@ public TcpTransport(
String nodeName = Node.NODE_NAME_SETTING.get(settings);

this.recycler = createRecycler(settings, pageCacheRecycler);
-this.outboundHandler = new OutboundHandler(nodeName, version, statsTracker, threadPool, recycler);
+this.outboundHandler = new OutboundHandler(nodeName, version, statsTracker, threadPool, recycler, outboundHandlingTimeTracker);
this.handshaker = new TransportHandshaker(
version,
threadPool,
@@ -165,7 +167,8 @@
handshaker,
keepAlive,
requestHandlers,
-responseHandlers
+responseHandlers,
+networkService.getHandlingTimeTracker()
);
}

@@ -918,7 +921,9 @@ public final TransportStats getStats() {
messagesReceived,
bytesRead,
messagesSent,
-bytesWritten
+bytesWritten,
+networkService.getHandlingTimeTracker().getHistogram(),
+outboundHandlingTimeTracker.getHistogram()
);
}

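Tying the wiring together, as visible in the hunks above: the inbound histogram reported in the transport stats comes from the NetworkService-level tracker, which the HTTP layer also feeds in AbstractHttpServerTransport, while the outbound histogram comes from a tracker private to each TcpTransport instance. A sketch of the two sources inside getStats(), with comments marking that sharing:

long[] inboundHistogram = networkService.getHandlingTimeTracker().getHistogram();  // shared tracker, also fed by HTTP request handling
long[] outboundHistogram = outboundHandlingTimeTracker.getHistogram();             // private to this transport's outbound path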