diff --git a/docs/reference/cluster/nodes-stats.asciidoc b/docs/reference/cluster/nodes-stats.asciidoc index 253890cd2a175..bd2b8ffad7590 100644 --- a/docs/reference/cluster/nodes-stats.asciidoc +++ b/docs/reference/cluster/nodes-stats.asciidoc @@ -1899,6 +1899,30 @@ Size of TX packets sent by the node during internal cluster communication. (integer) Size, in bytes, of TX packets sent by the node during internal cluster communication. + +`handling_time_histogram`:: +(array) +The distribution of the time spent handling a message on a transport thread, +represented as a histogram. ++ +.Properties of `handling_time_histogram` +[%collapsible] +======= +`ge_millis`:: +(integer) +The inclusive lower bound of the bucket in milliseconds. Omitted on the first +bucket since this bucket has no lower bound. + +`lt_millis`:: +(integer) +The exclusive upper bound of the bucket in milliseconds. Omitted on the last +bucket since this bucket has no upper bound. + +`count`:: +(integer) +The number of times a transport thread took a period of time within the bounds +of this bucket to handle a message. +======= ====== [[cluster-nodes-stats-api-response-body-http]] diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/60_transport_stats.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/60_transport_stats.yml index 4f4b97bbcd521..bbfad5e60cec6 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/60_transport_stats.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/60_transport_stats.yml @@ -20,3 +20,33 @@ - gte: { nodes.$node_id.transport.tx_count: 0 } - gte: { nodes.$node_id.transport.rx_size_in_bytes: 0 } - gte: { nodes.$node_id.transport.tx_size_in_bytes: 0 } + +--- +"Transport handling time histogram": + - skip: + version: " - 8.0.99" + reason: "handling_time_histogram was added in 8.1" + features: [arbitrary_key] + + - do: + nodes.info: {} + - set: + nodes._arbitrary_key_: node_id + + - do: + nodes.stats: + metric: [ transport ] + + - length: { nodes.$node_id.transport.handling_time_histogram: 11 } + + - gte: { nodes.$node_id.transport.handling_time_histogram.0.count: 0 } + - is_false: nodes.$node_id.transport.handling_time_histogram.0.ge_millis + - match: { nodes.$node_id.transport.handling_time_histogram.0.lt_millis: 100 } + + - gte: { nodes.$node_id.transport.handling_time_histogram.1.count: 0 } + - match: { nodes.$node_id.transport.handling_time_histogram.1.ge_millis: 100 } + - match: { nodes.$node_id.transport.handling_time_histogram.1.lt_millis: 300 } + + - gte: { nodes.$node_id.transport.handling_time_histogram.10.count: 0 } + - match: { nodes.$node_id.transport.handling_time_histogram.10.ge_millis: 65536 } + - is_false: nodes.$node_id.transport.handling_time_histogram.10.lt_millis diff --git a/server/src/main/java/org/elasticsearch/common/network/HandlingTimeTracker.java b/server/src/main/java/org/elasticsearch/common/network/HandlingTimeTracker.java new file mode 100644 index 0000000000000..96050b31a967a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/network/HandlingTimeTracker.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.network; + +import java.util.concurrent.atomic.LongAdder; + +/** + * Tracks how long message handling takes on a transport thread as a histogram with fixed buckets. + */ +public class HandlingTimeTracker { + + public static int[] getBucketUpperBounds() { + // Default clock resolution is 200ms so we have buckets for the 0-tick and 1-tick cases, then go up in powers of two + return new int[] { 100, 300, 1 << 9, 1 << 10, 1 << 11, 1 << 12, 1 << 13, 1 << 14, 1 << 15, 1 << 16 }; + } + + private static int getBucket(long handlingTimeMillis) { + if (handlingTimeMillis < 100L) { + return 0; + } else if (handlingTimeMillis < 300L) { + return 1; + } else if (1L << 16 <= handlingTimeMillis) { + return BUCKET_COUNT - 1; + } else { + return Long.SIZE - 7 - Long.numberOfLeadingZeros(handlingTimeMillis); + } + } + + public static final int BUCKET_COUNT = getBucketUpperBounds().length + 1; + + private final LongAdder[] buckets; + + public HandlingTimeTracker() { + buckets = new LongAdder[BUCKET_COUNT]; + for (int i = 0; i < BUCKET_COUNT; i++) { + buckets[i] = new LongAdder(); + } + } + + public void addHandlingTime(long handlingTimeMillis) { + buckets[getBucket(handlingTimeMillis)].increment(); + } + + /** + * @return An array of frequencies of handling times in buckets with upper bounds as returned by {@link #getBucketUpperBounds()}, plus + * an extra bucket for handling times longer than the longest upper bound. + */ + public long[] getHistogram() { + final long[] histogram = new long[BUCKET_COUNT]; + for (int i = 0; i < BUCKET_COUNT; i++) { + histogram[i] = buckets[i].longValue(); + } + return histogram; + } + +} diff --git a/server/src/main/java/org/elasticsearch/common/network/NetworkService.java b/server/src/main/java/org/elasticsearch/common/network/NetworkService.java index f816d9446ae9b..25c6aeea4e2db 100644 --- a/server/src/main/java/org/elasticsearch/common/network/NetworkService.java +++ b/server/src/main/java/org/elasticsearch/common/network/NetworkService.java @@ -90,11 +90,16 @@ public interface CustomNameResolver { } private final List customNameResolvers; + private final HandlingTimeTracker handlingTimeTracker = new HandlingTimeTracker(); public NetworkService(List customNameResolvers) { this.customNameResolvers = Objects.requireNonNull(customNameResolvers, "customNameResolvers must be non null"); } + public HandlingTimeTracker getHandlingTimeTracker() { + return handlingTimeTracker; + } + /** * Resolves {@code bindHosts} to a list of internet addresses. The list will * not contain duplicate addresses. diff --git a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java index 5db7dbdf4f34c..43a167258c792 100644 --- a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java @@ -360,6 +360,7 @@ public void incomingRequest(final HttpRequest httpRequest, final HttpChannel htt handleIncomingRequest(httpRequest, httpChannel, httpRequest.getInboundException()); } finally { final long took = threadPool.relativeTimeInMillis() - startTime; + networkService.getHandlingTimeTracker().addHandlingTime(took); final long logThreshold = slowLogThresholdMs; if (logThreshold > 0 && took > logThreshold) { logger.warn( diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index e5e27b059426f..9f8e88970d24a 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.network.HandlingTimeTracker; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -41,6 +42,7 @@ public class InboundHandler { private final TransportHandshaker handshaker; private final TransportKeepAlive keepAlive; private final Transport.ResponseHandlers responseHandlers; + private final HandlingTimeTracker handlingTimeTracker; private final Transport.RequestHandlers requestHandlers; private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER; @@ -54,7 +56,8 @@ public class InboundHandler { TransportHandshaker handshaker, TransportKeepAlive keepAlive, Transport.RequestHandlers requestHandlers, - Transport.ResponseHandlers responseHandlers + Transport.ResponseHandlers responseHandlers, + HandlingTimeTracker handlingTimeTracker ) { this.threadPool = threadPool; this.outboundHandler = outboundHandler; @@ -63,6 +66,7 @@ public class InboundHandler { this.keepAlive = keepAlive; this.requestHandlers = requestHandlers; this.responseHandlers = responseHandlers; + this.handlingTimeTracker = handlingTimeTracker; } void setMessageListener(TransportMessageListener listener) { @@ -157,6 +161,7 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st } } finally { final long took = threadPool.relativeTimeInMillis() - startTime; + handlingTimeTracker.addHandlingTime(took); final long logThreshold = slowLogThresholdMs; if (logThreshold > 0 && took > logThreshold) { if (isRequest) { diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index 6597a8462e57c..1e390310917a0 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.network.CloseableChannel; +import org.elasticsearch.common.network.HandlingTimeTracker; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.transport.TransportAddress; @@ -38,17 +39,26 @@ final class OutboundHandler { private final StatsTracker statsTracker; private final ThreadPool threadPool; private final Recycler recycler; + private final HandlingTimeTracker handlingTimeTracker; private volatile long slowLogThresholdMs = Long.MAX_VALUE; private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER; - OutboundHandler(String nodeName, Version version, StatsTracker statsTracker, ThreadPool threadPool, Recycler recycler) { + OutboundHandler( + String nodeName, + Version version, + StatsTracker statsTracker, + ThreadPool threadPool, + Recycler recycler, + HandlingTimeTracker handlingTimeTracker + ) { this.nodeName = nodeName; this.version = version; this.statsTracker = statsTracker; this.threadPool = threadPool; this.recycler = recycler; + this.handlingTimeTracker = handlingTimeTracker; } void setSlowLogThreshold(TimeValue slowLogThreshold) { @@ -199,6 +209,7 @@ private void maybeLogSlowMessage(boolean success) { final long logThreshold = slowLogThresholdMs; if (logThreshold > 0) { final long took = threadPool.relativeTimeInMillis() - startTime; + handlingTimeTracker.addHandlingTime(took); if (took > logThreshold) { logger.warn( "sending transport message [{}] of size [{}] on [{}] took [{}ms] which is above the warn " diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 66a631604d8df..d3366213d5962 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.CloseableChannel; +import org.elasticsearch.common.network.HandlingTimeTracker; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; @@ -140,7 +141,14 @@ public TcpTransport( String nodeName = Node.NODE_NAME_SETTING.get(settings); this.recycler = createRecycler(settings, pageCacheRecycler); - this.outboundHandler = new OutboundHandler(nodeName, version, statsTracker, threadPool, recycler); + this.outboundHandler = new OutboundHandler( + nodeName, + version, + statsTracker, + threadPool, + recycler, + networkService.getHandlingTimeTracker() + ); this.handshaker = new TransportHandshaker( version, threadPool, @@ -164,7 +172,8 @@ public TcpTransport( handshaker, keepAlive, requestHandlers, - responseHandlers + responseHandlers, + networkService.getHandlingTimeTracker() ); } @@ -912,7 +921,9 @@ public final TransportStats getStats() { messagesReceived, bytesRead, messagesSent, - bytesWritten + bytesWritten, + networkService.getHandlingTimeTracker().getHistogram(), + HandlingTimeTracker.getBucketUpperBounds() ); } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportStats.java b/server/src/main/java/org/elasticsearch/transport/TransportStats.java index 7caf3c241615c..b00acf1f40e90 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportStats.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportStats.java @@ -8,6 +8,7 @@ package org.elasticsearch.transport; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -25,14 +26,28 @@ public class TransportStats implements Writeable, ToXContentFragment { private final long rxSize; private final long txCount; private final long txSize; - - public TransportStats(long serverOpen, long totalOutboundConnections, long rxCount, long rxSize, long txCount, long txSize) { + private final long[] handlingTimeBucketFrequencies; + private final int[] handlingTimeBucketBounds; + + public TransportStats( + long serverOpen, + long totalOutboundConnections, + long rxCount, + long rxSize, + long txCount, + long txSize, + long[] handlingTimeBucketFrequencies, + int[] handlingTimeBucketBounds + ) { this.serverOpen = serverOpen; this.totalOutboundConnections = totalOutboundConnections; this.rxCount = rxCount; this.rxSize = rxSize; this.txCount = txCount; this.txSize = txSize; + this.handlingTimeBucketFrequencies = handlingTimeBucketFrequencies; + this.handlingTimeBucketBounds = handlingTimeBucketBounds; + assert assertHistogramConsistent(); } public TransportStats(StreamInput in) throws IOException { @@ -42,6 +57,14 @@ public TransportStats(StreamInput in) throws IOException { rxSize = in.readVLong(); txCount = in.readVLong(); txSize = in.readVLong(); + if (in.getVersion().onOrAfter(Version.V_8_1_0)) { + handlingTimeBucketFrequencies = in.readLongArray(); + handlingTimeBucketBounds = in.readIntArray(); + } else { + handlingTimeBucketFrequencies = new long[0]; + handlingTimeBucketBounds = new int[0]; + } + assert assertHistogramConsistent(); } @Override @@ -52,6 +75,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(rxSize); out.writeVLong(txCount); out.writeVLong(txSize); + if (out.getVersion().onOrAfter(Version.V_8_1_0)) { + out.writeLongArray(handlingTimeBucketFrequencies); + out.writeIntArray(handlingTimeBucketBounds); + } } public long serverOpen() { @@ -94,6 +121,29 @@ public ByteSizeValue getTxSize() { return txSize(); } + public long[] getHandlingTimeBucketFrequencies() { + final long[] histogram = new long[handlingTimeBucketFrequencies.length]; + System.arraycopy(handlingTimeBucketFrequencies, 0, histogram, 0, handlingTimeBucketFrequencies.length); + return histogram; + } + + public int[] getHandlingTimeBucketBounds() { + final int[] bounds = new int[handlingTimeBucketBounds.length]; + System.arraycopy(handlingTimeBucketBounds, 0, bounds, 0, handlingTimeBucketBounds.length); + return bounds; + } + + private boolean assertHistogramConsistent() { + if (handlingTimeBucketFrequencies.length == 0) { + // Stats came from before v8.1 + assert Version.CURRENT.major == Version.V_8_0_0.major; + assert handlingTimeBucketBounds.length == 0; + } else { + assert handlingTimeBucketFrequencies.length == handlingTimeBucketBounds.length + 1; + } + return true; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.TRANSPORT); @@ -103,6 +153,24 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.humanReadableField(Fields.RX_SIZE_IN_BYTES, Fields.RX_SIZE, new ByteSizeValue(rxSize)); builder.field(Fields.TX_COUNT, txCount); builder.humanReadableField(Fields.TX_SIZE_IN_BYTES, Fields.TX_SIZE, new ByteSizeValue(txSize)); + if (handlingTimeBucketFrequencies.length > 0) { + builder.startArray(Fields.HANDLING_TIME_HISTOGRAM); + for (int i = 0; i < handlingTimeBucketFrequencies.length; i++) { + builder.startObject(); + if (i > 0 && i <= handlingTimeBucketBounds.length) { + builder.field("ge_millis", handlingTimeBucketBounds[i - 1]); + } + if (i < handlingTimeBucketBounds.length) { + builder.field("lt_millis", handlingTimeBucketBounds[i]); + } + builder.field("count", handlingTimeBucketFrequencies[i]); + builder.endObject(); + } + builder.endArray(); + } else { + // Stats came from before v8.1 + assert Version.CURRENT.major == Version.V_8_0_0.major; + } builder.endObject(); return builder; } @@ -117,5 +185,6 @@ static final class Fields { static final String TX_COUNT = "tx_count"; static final String TX_SIZE = "tx_size"; static final String TX_SIZE_IN_BYTES = "tx_size_in_bytes"; + static final String HANDLING_TIME_HISTOGRAM = "handling_time_histogram"; } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 144c9f0843441..6889117c92cd5 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterStateUpdateStats; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.network.HandlingTimeTracker; import org.elasticsearch.core.Tuple; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.http.HttpStats; @@ -47,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -238,6 +240,14 @@ public void testSerialization() throws IOException { assertEquals(nodeStats.getTransport().getServerOpen(), deserializedNodeStats.getTransport().getServerOpen()); assertEquals(nodeStats.getTransport().getTxCount(), deserializedNodeStats.getTransport().getTxCount()); assertEquals(nodeStats.getTransport().getTxSize(), deserializedNodeStats.getTransport().getTxSize()); + assertArrayEquals( + nodeStats.getTransport().getHandlingTimeBucketFrequencies(), + deserializedNodeStats.getTransport().getHandlingTimeBucketFrequencies() + ); + assertArrayEquals( + nodeStats.getTransport().getHandlingTimeBucketBounds(), + deserializedNodeStats.getTransport().getHandlingTimeBucketBounds() + ); } if (nodeStats.getHttp() == null) { assertNull(deserializedNodeStats.getHttp()); @@ -672,7 +682,9 @@ public static NodeStats createNodeStats() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - randomNonNegativeLong() + randomNonNegativeLong(), + IntStream.range(0, HandlingTimeTracker.BUCKET_COUNT).mapToLong(i -> randomNonNegativeLong()).toArray(), + IntStream.range(0, HandlingTimeTracker.BUCKET_COUNT - 1).map(i -> between(0, Integer.MAX_VALUE)).toArray() ) : null; HttpStats httpStats = null; diff --git a/server/src/test/java/org/elasticsearch/common/network/HandlingTimeTrackerTests.java b/server/src/test/java/org/elasticsearch/common/network/HandlingTimeTrackerTests.java new file mode 100644 index 0000000000000..d0f59912adc9e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/network/HandlingTimeTrackerTests.java @@ -0,0 +1,77 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.network; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.greaterThan; + +public class HandlingTimeTrackerTests extends ESTestCase { + + public void testHistogram() { + final HandlingTimeTracker handlingTimeTracker = new HandlingTimeTracker(); + + assertArrayEquals(new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, handlingTimeTracker.getHistogram()); + + handlingTimeTracker.addHandlingTime(0L); + assertArrayEquals(new long[] { 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, handlingTimeTracker.getHistogram()); + + handlingTimeTracker.addHandlingTime(randomLongBetween(0L, 99L)); + assertArrayEquals(new long[] { 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, handlingTimeTracker.getHistogram()); + + handlingTimeTracker.addHandlingTime(100L); + assertArrayEquals(new long[] { 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, handlingTimeTracker.getHistogram()); + + handlingTimeTracker.addHandlingTime(299L); + assertArrayEquals(new long[] { 2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, handlingTimeTracker.getHistogram()); + + handlingTimeTracker.addHandlingTime(300L); + assertArrayEquals(new long[] { 2, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0 }, handlingTimeTracker.getHistogram()); + + handlingTimeTracker.addHandlingTime(65535L); + assertArrayEquals(new long[] { 2, 2, 1, 0, 0, 0, 0, 0, 0, 1, 0 }, handlingTimeTracker.getHistogram()); + + handlingTimeTracker.addHandlingTime(65536L); + assertArrayEquals(new long[] { 2, 2, 1, 0, 0, 0, 0, 0, 0, 1, 1 }, handlingTimeTracker.getHistogram()); + + handlingTimeTracker.addHandlingTime(Long.MAX_VALUE); + assertArrayEquals(new long[] { 2, 2, 1, 0, 0, 0, 0, 0, 0, 1, 2 }, handlingTimeTracker.getHistogram()); + + handlingTimeTracker.addHandlingTime(randomLongBetween(65536L, Long.MAX_VALUE)); + assertArrayEquals(new long[] { 2, 2, 1, 0, 0, 0, 0, 0, 0, 1, 3 }, handlingTimeTracker.getHistogram()); + + handlingTimeTracker.addHandlingTime(randomLongBetween(Long.MIN_VALUE, 99L)); + assertArrayEquals(new long[] { 3, 2, 1, 0, 0, 0, 0, 0, 0, 1, 3 }, handlingTimeTracker.getHistogram()); + } + + public void testHistogramRandom() { + final int[] upperBounds = HandlingTimeTracker.getBucketUpperBounds(); + final long[] expectedCounts = new long[upperBounds.length + 1]; + final HandlingTimeTracker handlingTimeTracker = new HandlingTimeTracker(); + for (int i = between(0, 1000); i > 0; i--) { + final int bucket = between(0, expectedCounts.length - 1); + expectedCounts[bucket] += 1; + + final int lowerBound = bucket == 0 ? 0 : upperBounds[bucket - 1]; + final int upperBound = bucket == upperBounds.length ? randomBoolean() ? 100000 : Integer.MAX_VALUE : upperBounds[bucket] - 1; + handlingTimeTracker.addHandlingTime(between(lowerBound, upperBound)); + } + + assertArrayEquals(expectedCounts, handlingTimeTracker.getHistogram()); + } + + public void testBoundsConsistency() { + final int[] upperBounds = HandlingTimeTracker.getBucketUpperBounds(); + assertThat(upperBounds[0], greaterThan(0)); + for (int i = 1; i < upperBounds.length; i++) { + assertThat(upperBounds[i], greaterThan(upperBounds[i - 1])); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java index a482a6bd713eb..fda0090125ab8 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.network.HandlingTimeTracker; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.core.TimeValue; @@ -69,7 +70,8 @@ public void setUp() throws Exception { version, new StatsTracker(), threadPool, - new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE) + new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE), + new HandlingTimeTracker() ); requestHandlers = new Transport.RequestHandlers(); responseHandlers = new Transport.ResponseHandlers(); @@ -80,7 +82,8 @@ public void setUp() throws Exception { handshaker, keepAlive, requestHandlers, - responseHandlers + responseHandlers, + new HandlingTimeTracker() ); } diff --git a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java index 0100af921a247..183379cc50c8d 100644 --- a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.network.HandlingTimeTracker; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -68,7 +69,7 @@ public void setUp() throws Exception { node = new DiscoveryNode("", transportAddress, Version.CURRENT); StatsTracker statsTracker = new StatsTracker(); compressionScheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4); - handler = new OutboundHandler("node", Version.CURRENT, statsTracker, threadPool, recycler); + handler = new OutboundHandler("node", Version.CURRENT, statsTracker, threadPool, recycler, new HandlingTimeTracker()); final LongSupplier millisSupplier = () -> TimeValue.nsecToMSec(System.nanoTime()); final InboundDecoder decoder = new InboundDecoder(Version.CURRENT, this.recycler); diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 47252ceb8a124..814afba514217 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.network.HandlingTimeTracker; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; @@ -541,7 +542,8 @@ private void testExceptionHandling( Version.CURRENT, new StatsTracker(), testThreadPool, - new BytesRefRecycler(new MockPageCacheRecycler(Settings.EMPTY)) + new BytesRefRecycler(new MockPageCacheRecycler(Settings.EMPTY)), + new HandlingTimeTracker() ) ); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannels.java b/test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannels.java index 85a45cd7a691c..15b24b0a77e6f 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannels.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannels.java @@ -9,6 +9,7 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; +import org.elasticsearch.common.network.HandlingTimeTracker; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.threadpool.ThreadPool; @@ -24,7 +25,7 @@ public static TcpTransportChannel newFakeTcpTransportChannel( ) { BytesRefRecycler recycler = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE); return new TcpTransportChannel( - new OutboundHandler(nodeName, version, new StatsTracker(), threadPool, recycler), + new OutboundHandler(nodeName, version, new StatsTracker(), threadPool, recycler, new HandlingTimeTracker()), channel, action, requestId,