Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added idle tcp connection timeout #12974

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -668,8 +668,10 @@ private void buildConnectionPolicy() {
// Check if the user passed additional gateway connection configuration
if (this.gatewayConnectionConfig != null) {
this.connectionPolicy.setMaxConnectionPoolSize(this.gatewayConnectionConfig.getMaxConnectionPoolSize());
// TODO(kuthapar): potential bug - when we expose requestTimeout from direct and gateway connection config,
// as gateway connection config will overwrite direct connection config settings
this.connectionPolicy.setRequestTimeout(this.gatewayConnectionConfig.getRequestTimeout());
this.connectionPolicy.setIdleConnectionTimeout(this.gatewayConnectionConfig.getIdleConnectionTimeout());
this.connectionPolicy.setIdleHttpConnectionTimeout(this.gatewayConnectionConfig.getIdleConnectionTimeout());
}
} else if (gatewayConnectionConfig != null) {
this.connectionPolicy = new ConnectionPolicy(gatewayConnectionConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public final class ConnectionPolicy {

private ConnectionMode connectionMode;
private boolean endpointDiscoveryEnabled;
private Duration idleConnectionTimeout;
private boolean multipleWriteRegionsEnabled;
private List<String> preferredRegions;
private boolean readRequestsFallbackEnabled;
Expand All @@ -38,19 +37,21 @@ public final class ConnectionPolicy {
private int maxConnectionPoolSize;
private Duration requestTimeout;
private ProxyOptions proxy;
private Duration idleHttpConnectionTimeout;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thumbs up. thanks.


// Direct connection config properties
private Duration connectTimeout;
private Duration idleEndpointTimeout;
private int maxConnectionsPerEndpoint;
private int maxRequestsPerConnection;
private Duration idleTcpConnectionTimeout;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we discussed. thanks.


/**
* Constructor.
*/
public ConnectionPolicy(GatewayConnectionConfig gatewayConnectionConfig) {
this(ConnectionMode.GATEWAY);
this.idleConnectionTimeout = gatewayConnectionConfig.getIdleConnectionTimeout();
this.idleHttpConnectionTimeout = gatewayConnectionConfig.getIdleConnectionTimeout();
this.maxConnectionPoolSize = gatewayConnectionConfig.getMaxConnectionPoolSize();
this.requestTimeout = BridgeInternal.getRequestTimeoutFromGatewayConnectionConfig(gatewayConnectionConfig);
this.proxy = gatewayConnectionConfig.getProxy();
Expand All @@ -59,7 +60,7 @@ public ConnectionPolicy(GatewayConnectionConfig gatewayConnectionConfig) {
public ConnectionPolicy(DirectConnectionConfig directConnectionConfig) {
this(ConnectionMode.DIRECT);
this.connectTimeout = directConnectionConfig.getConnectTimeout();
this.idleConnectionTimeout = directConnectionConfig.getIdleConnectionTimeout();
this.idleTcpConnectionTimeout = directConnectionConfig.getIdleConnectionTimeout();
this.idleEndpointTimeout = directConnectionConfig.getIdleEndpointTimeout();
this.maxConnectionsPerEndpoint = directConnectionConfig.getMaxConnectionsPerEndpoint();
this.maxRequestsPerConnection = directConnectionConfig.getMaxRequestsPerConnection();
Expand Down Expand Up @@ -149,24 +150,54 @@ public ConnectionPolicy setMaxConnectionPoolSize(int maxConnectionPoolSize) {
}

/**
* Gets the value of the timeout for an idle connection, the default is 60
* Gets the value of the timeout for an idle http connection, the default is 60
* seconds.
*
* @return Idle connection timeout duration.
*/
public Duration getIdleConnectionTimeout() {
return this.idleConnectionTimeout;
public Duration getIdleHttpConnectionTimeout() {
return this.idleHttpConnectionTimeout;
}

/**
* sets the value of the timeout for an idle connection. After that time,
* sets the value of the timeout for an idle http connection. After that time,
* the connection will be automatically closed.
*
* @param idleConnectionTimeout the duration for an idle connection.
* @param idleHttpConnectionTimeout the duration for an idle connection.
* @return the ConnectionPolicy.
*/
public ConnectionPolicy setIdleConnectionTimeout(Duration idleConnectionTimeout) {
this.idleConnectionTimeout = idleConnectionTimeout;
public ConnectionPolicy setIdleHttpConnectionTimeout(Duration idleHttpConnectionTimeout) {
this.idleHttpConnectionTimeout = idleHttpConnectionTimeout;
return this;
}

/**
* Gets the idle tcp connection timeout for direct client
*
* Default value is {@link Duration#ZERO}
*
* Direct client doesn't close a single connection to an endpoint
* by default unless specified.
*
* @return idle tcp connection timeout
*/
public Duration getIdleTcpConnectionTimeout() {
return idleTcpConnectionTimeout;
}

/**
* Sets the idle tcp connection timeout
*
* Default value is {@link Duration#ZERO}
*
* Direct client doesn't close a single connection to an endpoint
* by default unless specified.
*
* @param idleTcpConnectionTimeout idle connection timeout
* @return the {@link ConnectionPolicy}
*/
public ConnectionPolicy setIdleTcpConnectionTimeout(Duration idleTcpConnectionTimeout) {
this.idleTcpConnectionTimeout = idleTcpConnectionTimeout;
return this;
}

Expand Down Expand Up @@ -451,7 +482,8 @@ public String toString() {
"requestTimeout=" + requestTimeout +
", connectionMode=" + connectionMode +
", maxConnectionPoolSize=" + maxConnectionPoolSize +
", idleConnectionTimeout=" + idleConnectionTimeout +
", idleHttpConnectionTimeout=" + idleHttpConnectionTimeout +
", idleTcpConnectionTimeout=" + idleTcpConnectionTimeout +
", userAgentSuffix='" + userAgentSuffix + '\'' +
", throttlingRetryOptions=" + throttlingRetryOptions +
", endpointDiscoveryEnabled=" + endpointDiscoveryEnabled +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ RxGatewayStoreModel createRxGatewayProxy(ISessionContainer sessionContainer,
private HttpClient httpClient() {

HttpClientConfig httpClientConfig = new HttpClientConfig(this.configs)
.withMaxIdleConnectionTimeout(this.connectionPolicy.getIdleConnectionTimeout())
.withMaxIdleConnectionTimeout(this.connectionPolicy.getIdleHttpConnectionTimeout())
.withPoolSize(this.connectionPolicy.getMaxConnectionPoolSize())
.withProxy(this.connectionPolicy.getProxy())
.withRequestTimeout(this.connectionPolicy.getRequestTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ private Options(final ConnectionPolicy connectionPolicy) {
this.bufferPageSize = 8192;
this.connectionAcquisitionTimeout = Duration.ZERO;
this.connectTimeout = connectionPolicy.getConnectTimeout();
this.idleChannelTimeout = connectionPolicy.getIdleConnectionTimeout();
this.idleChannelTimeout = connectionPolicy.getIdleTcpConnectionTimeout();
this.idleEndpointTimeout = Duration.ofSeconds(70L);
this.maxBufferCapacity = 8192 << 10;
this.maxChannelsPerEndpoint = connectionPolicy.getMaxConnectionsPerEndpoint();
Expand Down Expand Up @@ -484,7 +484,7 @@ public Builder(ConnectionPolicy connectionPolicy) {
this.bufferPageSize = DEFAULT_OPTIONS.bufferPageSize;
this.connectionAcquisitionTimeout = DEFAULT_OPTIONS.connectionAcquisitionTimeout;
this.connectTimeout = connectionPolicy.getConnectTimeout();
this.idleChannelTimeout = connectionPolicy.getIdleConnectionTimeout();
this.idleChannelTimeout = connectionPolicy.getIdleTcpConnectionTimeout();
this.idleEndpointTimeout = DEFAULT_OPTIONS.idleEndpointTimeout;
this.maxBufferCapacity = DEFAULT_OPTIONS.maxBufferCapacity;
this.maxChannelsPerEndpoint = connectionPolicy.getMaxConnectionsPerEndpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -37,7 +36,7 @@ public void buildClient_withDefaultGatewayConnectionConfig() {
AsyncDocumentClient asyncDocumentClient =
CosmosBridgeInternal.getAsyncDocumentClient(cosmosClient);
ConnectionPolicy connectionPolicy = asyncDocumentClient.getConnectionPolicy();
assertThat(connectionPolicy.getConnectionMode().equals(ConnectionMode.GATEWAY));
assertThat(connectionPolicy.getConnectionMode()).isEqualTo(ConnectionMode.GATEWAY);
validateGatewayConnectionConfig(connectionPolicy, cosmosClientBuilder, gatewayConnectionConfig);
safeCloseSyncClient(cosmosClient);
}
Expand Down Expand Up @@ -66,7 +65,7 @@ public void buildClient_withCustomGatewayConnectionConfig() {
AsyncDocumentClient asyncDocumentClient =
CosmosBridgeInternal.getAsyncDocumentClient(cosmosClient);
ConnectionPolicy connectionPolicy = asyncDocumentClient.getConnectionPolicy();
assertThat(connectionPolicy.getConnectionMode().equals(ConnectionMode.GATEWAY));
assertThat(connectionPolicy.getConnectionMode()).isEqualTo(ConnectionMode.GATEWAY);
validateGatewayConnectionConfig(connectionPolicy, cosmosClientBuilder, gatewayConnectionConfig);
safeCloseSyncClient(cosmosClient);
}
Expand All @@ -84,7 +83,7 @@ public void buildClient_withDefaultDirectConnectionConfig() {
AsyncDocumentClient asyncDocumentClient =
CosmosBridgeInternal.getAsyncDocumentClient(cosmosClient);
ConnectionPolicy connectionPolicy = asyncDocumentClient.getConnectionPolicy();
assertThat(connectionPolicy.getConnectionMode().equals(ConnectionMode.DIRECT));
assertThat(connectionPolicy.getConnectionMode()).isEqualTo(ConnectionMode.DIRECT);
validateDirectConnectionConfig(connectionPolicy, cosmosClientBuilder, directConnectionConfig);
safeCloseSyncClient(cosmosClient);
}
Expand Down Expand Up @@ -113,7 +112,7 @@ public void buildClient_withCustomDirectConnectionConfig() {
AsyncDocumentClient asyncDocumentClient =
CosmosBridgeInternal.getAsyncDocumentClient(cosmosClient);
ConnectionPolicy connectionPolicy = asyncDocumentClient.getConnectionPolicy();
assertThat(connectionPolicy.getConnectionMode().equals(ConnectionMode.DIRECT));
assertThat(connectionPolicy.getConnectionMode()).isEqualTo(ConnectionMode.DIRECT);
validateDirectConnectionConfig(connectionPolicy, cosmosClientBuilder, directConnectionConfig);
safeCloseSyncClient(cosmosClient);
}
Expand All @@ -133,7 +132,7 @@ public void buildClient_withDirectAndGatewayConnectionConfig() {
AsyncDocumentClient asyncDocumentClient =
CosmosBridgeInternal.getAsyncDocumentClient(cosmosClient);
ConnectionPolicy connectionPolicy = asyncDocumentClient.getConnectionPolicy();
assertThat(connectionPolicy.getConnectionMode().equals(ConnectionMode.DIRECT));
assertThat(connectionPolicy.getConnectionMode()).isEqualTo(ConnectionMode.DIRECT);
validateDirectAndGatewayConnectionConfig(connectionPolicy, cosmosClientBuilder, directConnectionConfig, gatewayConnectionConfig);
safeCloseSyncClient(cosmosClient);
}
Expand All @@ -149,52 +148,52 @@ public void buildClient_withNoConnectionConfig() {
AsyncDocumentClient asyncDocumentClient =
CosmosBridgeInternal.getAsyncDocumentClient(cosmosClient);
ConnectionPolicy connectionPolicy = asyncDocumentClient.getConnectionPolicy();
assertThat(connectionPolicy.getConnectionMode().equals(ConnectionMode.DIRECT));
assertThat(connectionPolicy.getConnectionMode()).isEqualTo(ConnectionMode.DIRECT);
validateDirectConnectionConfig(connectionPolicy, cosmosClientBuilder, DirectConnectionConfig.getDefaultConfig());
safeCloseSyncClient(cosmosClient);
}

private void validateDirectAndGatewayConnectionConfig(ConnectionPolicy connectionPolicy, CosmosClientBuilder cosmosClientBuilder,
DirectConnectionConfig directConnectionConfig, GatewayConnectionConfig gatewayConnectionConfig) {
validateCommonConnectionConfig(connectionPolicy, cosmosClientBuilder);
assertThat(Objects.equals(connectionPolicy.getConnectionMode(), ConnectionMode.DIRECT));
assertThat(connectionPolicy.getConnectionMode()).isEqualTo(ConnectionMode.DIRECT);
validateDirectConfig(connectionPolicy, directConnectionConfig);
validateGatewayConfig(connectionPolicy, gatewayConnectionConfig);
}

private void validateGatewayConnectionConfig(ConnectionPolicy connectionPolicy, CosmosClientBuilder cosmosClientBuilder, GatewayConnectionConfig gatewayConnectionConfig) {
validateCommonConnectionConfig(connectionPolicy, cosmosClientBuilder);
assertThat(Objects.equals(connectionPolicy.getConnectionMode(), ConnectionMode.GATEWAY));
assertThat(connectionPolicy.getConnectionMode()).isEqualTo(ConnectionMode.GATEWAY);
validateGatewayConfig(connectionPolicy, gatewayConnectionConfig);
}

private void validateDirectConnectionConfig(ConnectionPolicy connectionPolicy, CosmosClientBuilder cosmosClientBuilder, DirectConnectionConfig directConnectionConfig) {
validateCommonConnectionConfig(connectionPolicy, cosmosClientBuilder);
assertThat(Objects.equals(connectionPolicy.getConnectionMode(), ConnectionMode.DIRECT));
assertThat(connectionPolicy.getConnectionMode()).isEqualTo(ConnectionMode.DIRECT);
validateDirectConfig(connectionPolicy, directConnectionConfig);
}

private void validateCommonConnectionConfig(ConnectionPolicy connectionPolicy, CosmosClientBuilder cosmosClientBuilder) {
assertThat(Objects.equals(connectionPolicy.isMultipleWriteRegionsEnabled(), cosmosClientBuilder.isMultipleWriteRegionsEnabled()));
assertThat(Objects.equals(connectionPolicy.isEndpointDiscoveryEnabled(), cosmosClientBuilder.isEndpointDiscoveryEnabled()));
assertThat(Objects.equals(connectionPolicy.isReadRequestsFallbackEnabled(), cosmosClientBuilder.isReadRequestsFallbackEnabled()));
assertThat(Objects.equals(connectionPolicy.getPreferredRegions(), cosmosClientBuilder.getPreferredRegions()));
assertThat(Objects.equals(connectionPolicy.getThrottlingRetryOptions(), cosmosClientBuilder.getThrottlingRetryOptions()));
assertThat(Objects.equals(connectionPolicy.getUserAgentSuffix(), cosmosClientBuilder.getUserAgentSuffix()));
assertThat(connectionPolicy.isMultipleWriteRegionsEnabled()).isEqualTo(cosmosClientBuilder.isMultipleWriteRegionsEnabled());
assertThat(connectionPolicy.isEndpointDiscoveryEnabled()).isEqualTo(cosmosClientBuilder.isEndpointDiscoveryEnabled());
assertThat(connectionPolicy.isReadRequestsFallbackEnabled()).isEqualTo(cosmosClientBuilder.isReadRequestsFallbackEnabled());
assertThat(connectionPolicy.getPreferredRegions()).isEqualTo(cosmosClientBuilder.getPreferredRegions());
assertThat(connectionPolicy.getThrottlingRetryOptions()).isEqualTo(cosmosClientBuilder.getThrottlingRetryOptions());
assertThat(connectionPolicy.getUserAgentSuffix()).isEqualTo(cosmosClientBuilder.getUserAgentSuffix());
}

private void validateGatewayConfig(ConnectionPolicy connectionPolicy, GatewayConnectionConfig gatewayConnectionConfig) {
assertThat(Objects.equals(connectionPolicy.getIdleConnectionTimeout(), gatewayConnectionConfig.getIdleConnectionTimeout()));
assertThat(Objects.equals(connectionPolicy.getMaxConnectionPoolSize(), gatewayConnectionConfig.getMaxConnectionPoolSize()));
assertThat(Objects.equals(connectionPolicy.getRequestTimeout(), gatewayConnectionConfig.getRequestTimeout()));
assertThat(Objects.equals(connectionPolicy.getProxy(), gatewayConnectionConfig.getProxy()));
assertThat(connectionPolicy.getIdleHttpConnectionTimeout()).isEqualTo(gatewayConnectionConfig.getIdleConnectionTimeout());
assertThat(connectionPolicy.getMaxConnectionPoolSize()).isEqualTo(gatewayConnectionConfig.getMaxConnectionPoolSize());
assertThat(connectionPolicy.getRequestTimeout()).isEqualTo(gatewayConnectionConfig.getRequestTimeout());
assertThat(connectionPolicy.getProxy()).isEqualTo(gatewayConnectionConfig.getProxy());
}

private void validateDirectConfig(ConnectionPolicy connectionPolicy, DirectConnectionConfig directConnectionConfig) {
assertThat(Objects.equals(connectionPolicy.getConnectTimeout(), directConnectionConfig.getConnectTimeout()));
assertThat(Objects.equals(connectionPolicy.getIdleConnectionTimeout(), directConnectionConfig.getIdleConnectionTimeout()));
assertThat(Objects.equals(connectionPolicy.getIdleEndpointTimeout(), directConnectionConfig.getIdleEndpointTimeout()));
assertThat(Objects.equals(connectionPolicy.getMaxConnectionsPerEndpoint(), directConnectionConfig.getMaxConnectionsPerEndpoint()));
assertThat(Objects.equals(connectionPolicy.getMaxRequestsPerConnection(), directConnectionConfig.getMaxRequestsPerConnection()));
assertThat(connectionPolicy.getConnectTimeout()).isEqualTo(directConnectionConfig.getConnectTimeout());
assertThat(connectionPolicy.getIdleTcpConnectionTimeout()).isEqualTo(directConnectionConfig.getIdleConnectionTimeout());
assertThat(connectionPolicy.getIdleEndpointTimeout()).isEqualTo(directConnectionConfig.getIdleEndpointTimeout());
assertThat(connectionPolicy.getMaxConnectionsPerEndpoint()).isEqualTo(directConnectionConfig.getMaxConnectionsPerEndpoint());
assertThat(connectionPolicy.getMaxRequestsPerConnection()).isEqualTo(directConnectionConfig.getMaxRequestsPerConnection());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void afterClass() {
private void createContainerWithoutPk() throws URISyntaxException, IOException {
ConnectionPolicy connectionPolicy = new ConnectionPolicy(DirectConnectionConfig.getDefaultConfig());
HttpClientConfig httpClientConfig = new HttpClientConfig(new Configs())
.withMaxIdleConnectionTimeout(connectionPolicy.getIdleConnectionTimeout())
.withMaxIdleConnectionTimeout(connectionPolicy.getIdleHttpConnectionTimeout())
.withPoolSize(connectionPolicy.getMaxConnectionPoolSize())
.withProxy(connectionPolicy.getProxy())
.withRequestTimeout(connectionPolicy.getRequestTimeout());
Expand Down