From 8e7ff2b86b1e8edc1f9db11ca2238c593b968ac0 Mon Sep 17 00:00:00 2001 From: Jonathan Buttner Date: Fri, 3 May 2024 16:27:45 -0400 Subject: [PATCH] Improving comment and adding tests --- .../action/GetInferenceStatsAction.java | 68 ++++++++----- .../GetInferenceStatsActionResponseTests.java | 98 +++++++++++++++++++ .../inference/external/http/HttpClient.java | 4 + .../inference/external/http/HttpSettings.java | 3 +- 4 files changed, 148 insertions(+), 25 deletions(-) create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/inference/action/GetInferenceStatsActionResponseTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/inference/action/GetInferenceStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/inference/action/GetInferenceStatsAction.java index a497ed1198b62..95f479b2e8cc0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/inference/action/GetInferenceStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/inference/action/GetInferenceStatsAction.java @@ -52,7 +52,7 @@ public int hashCode() { } public static class Response extends ActionResponse implements ToXContentObject { - private static final String CONNECTION_POOL_STATS_FIELD_NAME = "connection_pool_stats"; + static final String CONNECTION_POOL_STATS_FIELD_NAME = "connection_pool_stats"; private final ConnectionPoolStats connectionPoolStats; @@ -92,42 +92,46 @@ public int hashCode() { return Objects.hash(connectionPoolStats); } - private static class ConnectionPoolStats implements ToXContentObject, Writeable { - private static final String AVAILABLE_CONNECTIONS = "available_connections"; - private static final String LEASED_CONNECTIONS = "leased_connections"; - private static final String MAX_CONNECTIONS = "max_connections"; - private static final String PENDING_CONNECTIONS = "pending_connections"; + ConnectionPoolStats getConnectionPoolStats() { + return connectionPoolStats; + } + + static class ConnectionPoolStats implements ToXContentObject, Writeable { + static final String LEASED_CONNECTIONS = "leased_connections"; + static final String PENDING_CONNECTIONS = "pending_connections"; + static final String AVAILABLE_CONNECTIONS = "available_connections"; + static final String MAX_CONNECTIONS = "max_connections"; - public static ConnectionPoolStats of(PoolStats poolStats) { - return new ConnectionPoolStats(poolStats.getAvailable(), poolStats.getLeased(), poolStats.getMax(), poolStats.getPending()); + static ConnectionPoolStats of(PoolStats poolStats) { + return new ConnectionPoolStats(poolStats.getLeased(), poolStats.getPending(), poolStats.getAvailable(), poolStats.getMax()); } - private final int availableConnections; private final int leasedConnections; - private final int maxConnections; private final int pendingConnections; + private final int availableConnections; + private final int maxConnections; - ConnectionPoolStats(int availableConnections, int leasedConnections, int maxConnections, int pendingConnections) { - this.availableConnections = availableConnections; + ConnectionPoolStats(int leasedConnections, int pendingConnections, int availableConnections, int maxConnections) { this.leasedConnections = leasedConnections; - this.maxConnections = maxConnections; this.pendingConnections = pendingConnections; + this.availableConnections = availableConnections; + this.maxConnections = maxConnections; } ConnectionPoolStats(StreamInput in) throws IOException { - this.availableConnections = in.readVInt(); this.leasedConnections = in.readVInt(); - this.maxConnections = in.readVInt(); this.pendingConnections = in.readVInt(); + this.availableConnections = in.readVInt(); + this.maxConnections = in.readVInt(); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(AVAILABLE_CONNECTIONS, availableConnections); builder.field(LEASED_CONNECTIONS, leasedConnections); - builder.field(MAX_CONNECTIONS, maxConnections); builder.field(PENDING_CONNECTIONS, pendingConnections); + builder.field(AVAILABLE_CONNECTIONS, availableConnections); + builder.field(MAX_CONNECTIONS, maxConnections); builder.endObject(); return builder; @@ -135,10 +139,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(availableConnections); out.writeVInt(leasedConnections); - out.writeVInt(maxConnections); out.writeVInt(pendingConnections); + out.writeVInt(availableConnections); + out.writeVInt(maxConnections); } @Override @@ -146,15 +150,31 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ConnectionPoolStats that = (ConnectionPoolStats) o; - return availableConnections == that.availableConnections - && leasedConnections == that.leasedConnections - && maxConnections == that.maxConnections - && pendingConnections == that.pendingConnections; + return leasedConnections == that.leasedConnections + && pendingConnections == that.pendingConnections + && availableConnections == that.availableConnections + && maxConnections == that.maxConnections; } @Override public int hashCode() { - return Objects.hash(availableConnections, leasedConnections, maxConnections, pendingConnections); + return Objects.hash(leasedConnections, pendingConnections, availableConnections, maxConnections); + } + + int getLeasedConnections() { + return leasedConnections; + } + + int getPendingConnections() { + return pendingConnections; + } + + int getAvailableConnections() { + return availableConnections; + } + + int getMaxConnections() { + return maxConnections; } } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/inference/action/GetInferenceStatsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/inference/action/GetInferenceStatsActionResponseTests.java new file mode 100644 index 0000000000000..ae884b9917cc8 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/inference/action/GetInferenceStatsActionResponseTests.java @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.inference.action; + +import org.apache.http.pool.PoolStats; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.Strings; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase; +import org.hamcrest.CoreMatchers; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; + +public class GetInferenceStatsActionResponseTests extends AbstractBWCWireSerializationTestCase { + + public static GetInferenceStatsAction.Response createRandom() { + return new GetInferenceStatsAction.Response(new PoolStats(randomInt(), randomInt(), randomInt(), randomInt())); + } + + public void testToXContent() throws IOException { + var entity = new GetInferenceStatsAction.Response(new PoolStats(1, 2, 3, 4)); + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + entity.toXContent(builder, null); + String xContentResult = org.elasticsearch.common.Strings.toString(builder); + + assertThat(xContentResult, CoreMatchers.is(""" + {"connection_pool_stats":{"leased_connections":1,"pending_connections":2,"available_connections":3,"max_connections":4}}""")); + } + + @Override + protected Writeable.Reader instanceReader() { + return GetInferenceStatsAction.Response::new; + } + + @Override + protected GetInferenceStatsAction.Response createTestInstance() { + return createRandom(); + } + + @Override + protected GetInferenceStatsAction.Response mutateInstance(GetInferenceStatsAction.Response instance) throws IOException { + var select = randomIntBetween(0, 3); + var connPoolStats = instance.getConnectionPoolStats(); + + return switch (select) { + case 0 -> new GetInferenceStatsAction.Response( + new PoolStats( + randomInt(), + connPoolStats.getPendingConnections(), + connPoolStats.getAvailableConnections(), + connPoolStats.getMaxConnections() + ) + ); + case 1 -> new GetInferenceStatsAction.Response( + new PoolStats( + connPoolStats.getLeasedConnections(), + randomInt(), + connPoolStats.getAvailableConnections(), + connPoolStats.getMaxConnections() + ) + ); + case 2 -> new GetInferenceStatsAction.Response( + new PoolStats( + connPoolStats.getLeasedConnections(), + connPoolStats.getPendingConnections(), + randomInt(), + connPoolStats.getMaxConnections() + ) + ); + case 3 -> new GetInferenceStatsAction.Response( + new PoolStats( + connPoolStats.getLeasedConnections(), + connPoolStats.getPendingConnections(), + connPoolStats.getAvailableConnections(), + randomInt() + ) + ); + default -> throw new UnsupportedEncodingException(Strings.format("Encountered unsupported case %s", select)); + }; + } + + @Override + protected GetInferenceStatsAction.Response mutateInstanceForVersion( + GetInferenceStatsAction.Response instance, + TransportVersion version + ) { + return instance; + } +} diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/HttpClient.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/HttpClient.java index 547e1616d6270..5ae137419b366 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/HttpClient.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/HttpClient.java @@ -72,6 +72,10 @@ private static CloseableHttpAsyncClient createAsyncClient(PoolingNHttpClientConn connections that are idle past the max idle time will be closed with the eviction thread executes. If that functionality proves not to be sufficient we can add a keep-alive strategy to the builder below. + In my testing, setting a keep-alive didn't actually influence when the connection would be removed from the pool. Setting a low + keep alive forced later requests that occurred after the duration to recreate the connection. The stale connections would not be + removed from the pool until the eviction thread closes expired connections. + My understanding is that a connection marked as ready to be closed because of an elapsed keep-alive time will only be put into expiry status when another request is made. diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/HttpSettings.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/HttpSettings.java index 4b9839386fbfb..ef5fec24c3d59 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/HttpSettings.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/HttpSettings.java @@ -31,7 +31,8 @@ public class HttpSettings { public HttpSettings(Settings settings, ClusterService clusterService) { Objects.requireNonNull(clusterService); - this.maxResponseSize = MAX_HTTP_RESPONSE_SIZE.get(Objects.requireNonNull(settings)); + Objects.requireNonNull(settings); + maxResponseSize = MAX_HTTP_RESPONSE_SIZE.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_HTTP_RESPONSE_SIZE, this::setMaxResponseSize); }