Skip to content

Commit

Permalink
Improving comment and adding tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-buttner committed May 3, 2024
1 parent 9ec77ac commit 8e7ff2b
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public int hashCode() {
}

public static class Response extends ActionResponse implements ToXContentObject {
private static final String CONNECTION_POOL_STATS_FIELD_NAME = "connection_pool_stats";
static final String CONNECTION_POOL_STATS_FIELD_NAME = "connection_pool_stats";

private final ConnectionPoolStats connectionPoolStats;

Expand Down Expand Up @@ -92,69 +92,89 @@ public int hashCode() {
return Objects.hash(connectionPoolStats);
}

private static class ConnectionPoolStats implements ToXContentObject, Writeable {
private static final String AVAILABLE_CONNECTIONS = "available_connections";
private static final String LEASED_CONNECTIONS = "leased_connections";
private static final String MAX_CONNECTIONS = "max_connections";
private static final String PENDING_CONNECTIONS = "pending_connections";
ConnectionPoolStats getConnectionPoolStats() {
return connectionPoolStats;
}

static class ConnectionPoolStats implements ToXContentObject, Writeable {
static final String LEASED_CONNECTIONS = "leased_connections";
static final String PENDING_CONNECTIONS = "pending_connections";
static final String AVAILABLE_CONNECTIONS = "available_connections";
static final String MAX_CONNECTIONS = "max_connections";

public static ConnectionPoolStats of(PoolStats poolStats) {
return new ConnectionPoolStats(poolStats.getAvailable(), poolStats.getLeased(), poolStats.getMax(), poolStats.getPending());
static ConnectionPoolStats of(PoolStats poolStats) {
return new ConnectionPoolStats(poolStats.getLeased(), poolStats.getPending(), poolStats.getAvailable(), poolStats.getMax());
}

private final int availableConnections;
private final int leasedConnections;
private final int maxConnections;
private final int pendingConnections;
private final int availableConnections;
private final int maxConnections;

ConnectionPoolStats(int availableConnections, int leasedConnections, int maxConnections, int pendingConnections) {
this.availableConnections = availableConnections;
ConnectionPoolStats(int leasedConnections, int pendingConnections, int availableConnections, int maxConnections) {
this.leasedConnections = leasedConnections;
this.maxConnections = maxConnections;
this.pendingConnections = pendingConnections;
this.availableConnections = availableConnections;
this.maxConnections = maxConnections;
}

ConnectionPoolStats(StreamInput in) throws IOException {
this.availableConnections = in.readVInt();
this.leasedConnections = in.readVInt();
this.maxConnections = in.readVInt();
this.pendingConnections = in.readVInt();
this.availableConnections = in.readVInt();
this.maxConnections = in.readVInt();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(AVAILABLE_CONNECTIONS, availableConnections);
builder.field(LEASED_CONNECTIONS, leasedConnections);
builder.field(MAX_CONNECTIONS, maxConnections);
builder.field(PENDING_CONNECTIONS, pendingConnections);
builder.field(AVAILABLE_CONNECTIONS, availableConnections);
builder.field(MAX_CONNECTIONS, maxConnections);
builder.endObject();

return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(availableConnections);
out.writeVInt(leasedConnections);
out.writeVInt(maxConnections);
out.writeVInt(pendingConnections);
out.writeVInt(availableConnections);
out.writeVInt(maxConnections);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConnectionPoolStats that = (ConnectionPoolStats) o;
return availableConnections == that.availableConnections
&& leasedConnections == that.leasedConnections
&& maxConnections == that.maxConnections
&& pendingConnections == that.pendingConnections;
return leasedConnections == that.leasedConnections
&& pendingConnections == that.pendingConnections
&& availableConnections == that.availableConnections
&& maxConnections == that.maxConnections;
}

@Override
public int hashCode() {
return Objects.hash(availableConnections, leasedConnections, maxConnections, pendingConnections);
return Objects.hash(leasedConnections, pendingConnections, availableConnections, maxConnections);
}

int getLeasedConnections() {
return leasedConnections;
}

int getPendingConnections() {
return pendingConnections;
}

int getAvailableConnections() {
return availableConnections;
}

int getMaxConnections() {
return maxConnections;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.inference.action;

import org.apache.http.pool.PoolStats;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase;
import org.hamcrest.CoreMatchers;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

public class GetInferenceStatsActionResponseTests extends AbstractBWCWireSerializationTestCase<GetInferenceStatsAction.Response> {

public static GetInferenceStatsAction.Response createRandom() {
return new GetInferenceStatsAction.Response(new PoolStats(randomInt(), randomInt(), randomInt(), randomInt()));
}

public void testToXContent() throws IOException {
var entity = new GetInferenceStatsAction.Response(new PoolStats(1, 2, 3, 4));
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
entity.toXContent(builder, null);
String xContentResult = org.elasticsearch.common.Strings.toString(builder);

assertThat(xContentResult, CoreMatchers.is("""
{"connection_pool_stats":{"leased_connections":1,"pending_connections":2,"available_connections":3,"max_connections":4}}"""));
}

@Override
protected Writeable.Reader<GetInferenceStatsAction.Response> instanceReader() {
return GetInferenceStatsAction.Response::new;
}

@Override
protected GetInferenceStatsAction.Response createTestInstance() {
return createRandom();
}

@Override
protected GetInferenceStatsAction.Response mutateInstance(GetInferenceStatsAction.Response instance) throws IOException {
var select = randomIntBetween(0, 3);
var connPoolStats = instance.getConnectionPoolStats();

return switch (select) {
case 0 -> new GetInferenceStatsAction.Response(
new PoolStats(
randomInt(),
connPoolStats.getPendingConnections(),
connPoolStats.getAvailableConnections(),
connPoolStats.getMaxConnections()
)
);
case 1 -> new GetInferenceStatsAction.Response(
new PoolStats(
connPoolStats.getLeasedConnections(),
randomInt(),
connPoolStats.getAvailableConnections(),
connPoolStats.getMaxConnections()
)
);
case 2 -> new GetInferenceStatsAction.Response(
new PoolStats(
connPoolStats.getLeasedConnections(),
connPoolStats.getPendingConnections(),
randomInt(),
connPoolStats.getMaxConnections()
)
);
case 3 -> new GetInferenceStatsAction.Response(
new PoolStats(
connPoolStats.getLeasedConnections(),
connPoolStats.getPendingConnections(),
connPoolStats.getAvailableConnections(),
randomInt()
)
);
default -> throw new UnsupportedEncodingException(Strings.format("Encountered unsupported case %s", select));
};
}

@Override
protected GetInferenceStatsAction.Response mutateInstanceForVersion(
GetInferenceStatsAction.Response instance,
TransportVersion version
) {
return instance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ private static CloseableHttpAsyncClient createAsyncClient(PoolingNHttpClientConn
connections that are idle past the max idle time will be closed with the eviction thread executes. If that functionality proves
not to be sufficient we can add a keep-alive strategy to the builder below.
In my testing, setting a keep-alive didn't actually influence when the connection would be removed from the pool. Setting a low
keep alive forced later requests that occurred after the duration to recreate the connection. The stale connections would not be
removed from the pool until the eviction thread closes expired connections.
My understanding is that a connection marked as ready to be closed because of an elapsed keep-alive time will only be put into
expiry status when another request is made.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public class HttpSettings {

public HttpSettings(Settings settings, ClusterService clusterService) {
Objects.requireNonNull(clusterService);
this.maxResponseSize = MAX_HTTP_RESPONSE_SIZE.get(Objects.requireNonNull(settings));
Objects.requireNonNull(settings);
maxResponseSize = MAX_HTTP_RESPONSE_SIZE.get(settings);

clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_HTTP_RESPONSE_SIZE, this::setMaxResponseSize);
}
Expand Down

0 comments on commit 8e7ff2b

Please sign in to comment.