diff --git a/sdk/cosmos/microsoft-azure-cosmos-benchmark/src/main/java/com/azure/data/cosmos/benchmark/Main.java b/sdk/cosmos/microsoft-azure-cosmos-benchmark/src/main/java/com/azure/data/cosmos/benchmark/Main.java index 130ead717577a..8dc20f7b64bc8 100644 --- a/sdk/cosmos/microsoft-azure-cosmos-benchmark/src/main/java/com/azure/data/cosmos/benchmark/Main.java +++ b/sdk/cosmos/microsoft-azure-cosmos-benchmark/src/main/java/com/azure/data/cosmos/benchmark/Main.java @@ -14,7 +14,7 @@ public class Main { public static void main(String[] args) throws Exception { org.apache.log4j.Logger.getLogger("io.netty").setLevel(org.apache.log4j.Level.OFF); - + AsyncBenchmark benchmark = null; try { LOGGER.debug("Parsing the arguments ..."); Configuration cfg = new Configuration(); @@ -27,7 +27,6 @@ public static void main(String[] args) throws Exception { return; } - AsyncBenchmark benchmark; switch (cfg.getOperationType()) { case WriteThroughput: case WriteLatency: @@ -67,13 +66,16 @@ public static void main(String[] args) throws Exception { LOGGER.info("Starting {}", cfg.getOperationType()); benchmark.run(); - benchmark.shutdown(); } catch (ParameterException e) { // if any error in parsing the cmd-line options print out the usage help System.err.println("INVALID Usage: " + e.getMessage()); System.err.println("Try '-help' for more information."); throw e; + } finally { + if (benchmark != null) { + benchmark.shutdown(); + } } } } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/RntbdTransportClient.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/RntbdTransportClient.java index 2b27fb9b14a3e..930a371e6a79a 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/RntbdTransportClient.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/RntbdTransportClient.java @@ -256,17 +256,17 @@ public Duration shutdownTimeout() { return this.shutdownTimeout; } - @Override - public String toString() { - return RntbdObjectMapper.toJson(this); + public UserAgentContainer userAgent() { + return this.userAgent; } // endregion // region Methods - public UserAgentContainer userAgent() { - return this.userAgent; + @Override + public String toString() { + return RntbdObjectMapper.toJson(this); } // endregion diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHandler.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHandler.java index 9b7fadc09241e..bd5bcc38d3f7e 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHandler.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHandler.java @@ -96,9 +96,9 @@ protected void initChannel(final Channel channel) { checkNotNull(channel); final RntbdRequestManager requestManager = new RntbdRequestManager(this.healthChecker, this.config.maxRequestsPerChannel()); - final long readerIdleTime = this.config.receiveHangDetectionTime(); - final long writerIdleTime = this.config.sendHangDetectionTime(); - final long allIdleTime = this.config.idleConnectionTimeout(); + final long readerIdleTime = this.config.receiveHangDetectionTimeInNanos(); + final long writerIdleTime = this.config.sendHangDetectionTimeInNanos(); + final long allIdleTime = this.config.idleConnectionTimeoutInNanos(); final ChannelPipeline pipeline = channel.pipeline(); pipeline.addFirst( diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java index 80cd5ee2b3069..39d09c241a0fc 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java @@ -69,12 +69,12 @@ public RntbdClientChannelHealthChecker(final Config config) { checkNotNull(config, "config: null"); - this.idleConnectionTimeout = config.idleConnectionTimeout(); + this.idleConnectionTimeout = config.idleConnectionTimeoutInNanos(); - this.readDelayLimit = config.receiveHangDetectionTime(); + this.readDelayLimit = config.receiveHangDetectionTimeInNanos(); checkArgument(this.readDelayLimit > readHangGracePeriod, "config.receiveHangDetectionTime: %s", this.readDelayLimit); - this.writeDelayLimit = config.sendHangDetectionTime(); + this.writeDelayLimit = config.sendHangDetectionTimeInNanos(); checkArgument(this.writeDelayLimit > writeHangGracePeriod, "config.sendHangDetectionTime: %s", this.writeDelayLimit); } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelPool.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelPool.java index 8e834d6b75bf2..96aa1053f6591 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelPool.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelPool.java @@ -146,7 +146,7 @@ public void onTimeout(AcquireTask task) { } } - final long idleEndpointTimeout = config.idleEndpointTimeout(); + final long idleEndpointTimeout = config.idleEndpointTimeoutInNanos(); this.idleStateDetectionScheduledFuture = this.executor.scheduleAtFixedRate( () -> { diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdEndpoint.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdEndpoint.java index 33ebc20ec1242..6efe322a7aeb8 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdEndpoint.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdEndpoint.java @@ -105,19 +105,19 @@ public int bufferPageSize() { } @JsonProperty - public int connectionTimeout() { + public int connectionTimeoutInMillis() { final long value = this.options.connectionTimeout().toMillis(); assert value <= Integer.MAX_VALUE; return (int)value; } @JsonProperty - public long idleConnectionTimeout() { + public long idleConnectionTimeoutInNanos() { return this.options.idleChannelTimeout().toNanos(); } @JsonProperty - public long idleEndpointTimeout() { + public long idleEndpointTimeoutInNanos() { return this.options.idleEndpointTimeout().toNanos(); } @@ -137,22 +137,22 @@ public int maxRequestsPerChannel() { } @JsonProperty - public long receiveHangDetectionTime() { + public long receiveHangDetectionTimeInNanos() { return this.options.receiveHangDetectionTime().toNanos(); } @JsonProperty - public long requestTimeout() { + public long requestTimeoutInNanos() { return this.options.requestTimeout().toNanos(); } @JsonProperty - public long sendHangDetectionTime() { + public long sendHangDetectionTimeInNanos() { return this.options.sendHangDetectionTime().toNanos(); } @JsonProperty - public long shutdownTimeout() { + public long shutdownTimeoutInNanos() { return this.options.shutdownTimeout().toNanos(); } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdObjectMapper.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdObjectMapper.java index ea9ca019879b2..57f776164b9a6 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdObjectMapper.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdObjectMapper.java @@ -11,7 +11,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.ser.PropertyFilter; import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider; -import com.google.common.base.Strings; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.handler.codec.CorruptedFrameException; diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdRequestTimer.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdRequestTimer.java index a1e61b1817184..0af684d857b72 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdRequestTimer.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdRequestTimer.java @@ -13,6 +13,8 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import static com.google.common.base.Strings.lenientFormat; + public final class RntbdRequestTimer implements AutoCloseable { private static final long FIVE_MILLISECONDS = 5000000L; @@ -22,11 +24,9 @@ public final class RntbdRequestTimer implements AutoCloseable { private final Timer timer; public RntbdRequestTimer(final long requestTimeout) { - // Inspection of the HashWheelTimer code indicates that our choice of a 5 millisecond timer resolution ensures // a request will expire within 10 milliseconds of the specified requestTimeout interval. This is because // cancellation of a timeout takes two timer resolution units to complete. - this.timer = new HashedWheelTimer(FIVE_MILLISECONDS, TimeUnit.NANOSECONDS); this.requestTimeout = requestTimeout; } @@ -37,13 +37,28 @@ public long getRequestTimeout(final TimeUnit unit) { @Override public void close() { + final Set timeouts = this.timer.stop(); - if (logger.isDebugEnabled()) { - final int count = timeouts.size(); - if (count > 0) { - logger.debug("request expiration tasks cancelled: {}", count); + final int count = timeouts.size(); + + if (count == 0) { + logger.debug("no outstanding request timeout tasks"); + return; + } + + logger.debug("stopping {} request timeout tasks", count); + + for (final Timeout timeout : timeouts) { + if (!timeout.isExpired()) { + try { + timeout.task().run(timeout); + } catch (Throwable error) { + logger.warn(lenientFormat("request timeout task failed due to ", error)); + } } } + + logger.debug("{} request timeout tasks stopped", count); } public Timeout newTimeout(final TimerTask task) { diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdServiceEndpoint.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdServiceEndpoint.java index 193ad69de503b..e29d654685186 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdServiceEndpoint.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdServiceEndpoint.java @@ -48,7 +48,7 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint { // region Fields private static final String TAG_NAME = RntbdServiceEndpoint.class.getSimpleName(); - private static final long QUIET_PERIOD = 2L * 1_000_000_000L; // 2 seconds + private static final long QUIET_PERIOD = 2_000_000_000L; // 2 seconds private static final AtomicLong instanceCount = new AtomicLong(); private static final Logger logger = LoggerFactory.getLogger(RntbdServiceEndpoint.class); @@ -79,7 +79,7 @@ private RntbdServiceEndpoint( .group(group) .option(ChannelOption.ALLOCATOR, config.allocator()) .option(ChannelOption.AUTO_READ, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectionTimeout()) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectionTimeoutInMillis()) .option(ChannelOption.RCVBUF_ALLOCATOR, receiveBufferAllocator) .option(ChannelOption.SO_KEEPALIVE, true) .remoteAddress(physicalAddress.getHost(), physicalAddress.getPort()); @@ -330,7 +330,7 @@ public Provider(final RntbdTransportClient transportClient, final Options option this.transportClient = transportClient; this.config = new Config(options, sslContext, wireLogLevel); - this.requestTimer = new RntbdRequestTimer(config.requestTimeout()); + this.requestTimer = new RntbdRequestTimer(config.requestTimeoutInNanos()); this.eventLoopGroup = new NioEventLoopGroup(threadCount, threadFactory); this.endpoints = new ConcurrentHashMap<>(); @@ -349,7 +349,7 @@ public void close() { endpoint.close(); } - this.eventLoopGroup.shutdownGracefully(QUIET_PERIOD, this.config.shutdownTimeout(), NANOSECONDS) + this.eventLoopGroup.shutdownGracefully(QUIET_PERIOD, this.config.shutdownTimeoutInNanos(), NANOSECONDS) .addListener(future -> { if (future.isSuccess()) { logger.debug("\n [{}]\n closed endpoints", this); @@ -357,6 +357,7 @@ public void close() { } logger.error("\n [{}]\n failed to close endpoints due to ", this, future.cause()); }); + return; } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/RntbdTransportClientTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/RntbdTransportClientTest.java index be84e94dfcb06..b6201b086f23e 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/RntbdTransportClientTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/RntbdTransportClientTest.java @@ -914,7 +914,7 @@ static class Provider implements RntbdEndpoint.Provider { Provider(RntbdTransportClient.Options options, SslContext sslContext, RntbdResponse expected) { this.config = new Config(options, sslContext, LogLevel.WARN); - this.timer = new RntbdRequestTimer(config.requestTimeout()); + this.timer = new RntbdRequestTimer(config.requestTimeoutInNanos()); this.expected = expected; }