diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java index 55d031bf82372..92c1208cba8d6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java @@ -362,13 +362,9 @@ public Future acquire(final Promise promise) { if (this.executor.inEventLoop()) { this.acquireChannel(promise); } else { - - // TODO: moderakh ensure/validate in all conditions we run the tasks - // if the replica endpoint is not reachable don't queue up on executor if (pendingAcquisitions.size() > 1000) { addTaskToPendingAcquisitionQueue(promise); } else { - // TODO: moderakh if the endpoint is bad the timer, health check has to cancel all the pending requests this.executor.execute(() -> this.acquireChannel(promise)); } } @@ -531,6 +527,8 @@ private void acquireChannel(final Promise promise) { return; } + // make sure to retrieve the actual channel count to avoid establishing more + // TCP connections than allowed. 
final int channelCount = this.channels(false); if (channelCount < this.maxChannels) { @@ -563,9 +561,12 @@ private void acquireChannel(final Promise promise) { final RntbdRequestManager manager = channel.pipeline().get(RntbdRequestManager.class); if (manager == null) { - logger.warn("Channel({}) closed", channel); + logger.warn("Channel({} --> {}) closed", channel, this.remoteAddress()); } else { final long pendingRequestCount = manager.pendingRequestCount(); + + // we accept the risk of reusing the channel even if more than maxPendingRequests are queued - by picking + // the channel with the least number of outstanding requests we load balance reasonably if (isChannelServiceable(channel, true) && pendingRequestCount < pendingRequestCountMin) { pendingRequestCountMin = pendingRequestCount; candidate = channel; @@ -579,6 +580,9 @@ private void acquireChannel(final Promise promise) { } } else { for (Channel channel : this.availableChannels) { + + // we pick the first available channel to avoid the additional cost of load balancing + // as long as the load is lower than the load factor threshold above. 
if (isChannelServiceable(channel, true)) { if (this.availableChannels.remove(channel)) { this.doAcquireChannel(promise, channel); @@ -607,9 +611,6 @@ private void acquireChannel(final Promise promise) { * @see #runTasksInPendingAcquisitionQueue */ private void addTaskToPendingAcquisitionQueue(Promise promise) { - -// this.ensureInEventLoop(); - if (logger.isDebugEnabled()) { logger.debug("{}, {}, {}, {}, {}, {}", Instant.now(), @@ -621,11 +622,8 @@ private void addTaskToPendingAcquisitionQueue(Promise promise) { } if (this.pendingAcquisitions.size() >= this.maxPendingAcquisitions) { - promise.setFailure(TOO_MANY_PENDING_ACQUISITIONS); - } else { - final AcquireTask acquireTask = new AcquireTask(this, promise); if (this.pendingAcquisitions.offer(acquireTask)) { @@ -707,6 +705,7 @@ private double computeLoadFactor() { } private void doAcquireChannel(final Promise promise, final Channel candidate) { + this.ensureInEventLoop(); acquiredChannels.put(candidate, candidate); final Promise anotherPromise = this.newChannelPromise(promise); @@ -931,9 +930,8 @@ private void notifyChannelConnect(final ChannelFuture future, final Promise { - // reportIssueUnless(logger, v == null, this, "expected null channel, not {}", v); + this.acquiredChannels.compute(channel, (ignored, acquiredChannel) -> { reportIssueUnless(logger, acquiredChannel == null, this, "Channel({}) to be acquired has already been acquired", channel); @@ -947,6 +945,7 @@ private void notifyChannelConnect(final ChannelFuture future, final Promise this.close0 -> this.pollChannel } + // Only return channels as serviceable here if less than maxPendingRequests + // are queued on them if (this.isChannelServiceable(first, false)) { return first; } @@ -1048,6 +1048,9 @@ private Channel pollChannel() { assert next != null : "impossible"; if (next.isActive()) { + + // Only return channels as serviceable here if less than maxPendingRequests + // are queued on them if (this.isChannelServiceable(next, false)) { return next; } @@ 
-1069,8 +1072,6 @@ private Channel pollChannel() { private void releaseAndOfferChannel(final Channel channel, final Promise promise) { this.ensureInEventLoop(); try { - - // TODO: moderakh is this right?!!!!! this.acquiredChannels.remove(channel); if (this.offerChannel(channel)) { this.poolHandler.channelReleased(channel); @@ -1138,7 +1139,6 @@ private void releaseAndOfferChannelIfHealthy( private void releaseChannel(final Channel channel, final Promise promise) { checkState(channel.eventLoop().inEventLoop()); - // TODO: moderakh what is ChannelPool final ChannelPool pool = channel.attr(POOL_KEY).getAndSet(null); final boolean acquired = this.acquiredChannels.get(channel) != null; @@ -1180,8 +1180,6 @@ private void releaseChannel(final Channel channel, final Promise promise) * {@link #acquire}. */ private void runTasksInPendingAcquisitionQueue() { - ensureInEventLoop(); - this.ensureInEventLoop(); int channelsAvailable = this.availableChannels.size(); @@ -1206,7 +1204,7 @@ private void runTasksInPendingAcquisitionQueue() { task.acquired(true); this.acquire(task.promise); - } while (--channelsAvailable> 0); + } while (--channelsAvailable > 0); } private void throwIfClosed() { @@ -1437,8 +1435,8 @@ private ScheduledFuture startMonitoring() { return monitoringRntbdChannelPool.scheduleAtFixedRate(() -> { int i = getTaskCount(); if (isInterestingEndpoint()) { - logger.debug("{} total number of tasks on the executor [{}], connecting [{}], acquiredChannel [{}], availableChannel [{}], pending acquisition [{}]", - this.hashCode(), i, connecting.get(), acquiredChannels.size(), availableChannels.size(), pendingAcquisitions.size()); + logger.debug("{} total number of tasks on the executor [{}], remote address: [{}], connecting [{}], acquiredChannel [{}], availableChannel [{}], pending acquisition [{}]", + this.hashCode(), i, this.remoteAddress(), connecting.get(), acquiredChannels.size(), availableChannels.size(), pendingAcquisitions.size()); } }, 0, 60, TimeUnit.SECONDS); } 
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java index caffd39f11401..88a057c71be50 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java @@ -22,9 +22,15 @@ public interface RntbdEndpoint extends AutoCloseable { // region Accessors - int channelsAcquired(); - - int channelsAvailable(); + /** + * @return approximate number of acquired channels. + */ + int channelsAcquiredMetric(); + + /** + * @return approximate number of available channels. + */ + int channelsAvailableMetric(); int concurrentRequests(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdMetrics.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdMetrics.java index 2231e5a16a670..b68fb1002bd24 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdMetrics.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdMetrics.java @@ -99,12 +99,12 @@ public RntbdMetrics(RntbdTransportClient client, RntbdEndpoint endpoint) { .tags(this.tags) .register(registry); - Gauge.builder(nameOf("channelsAcquired"), endpoint, RntbdEndpoint::channelsAcquired) + Gauge.builder(nameOf("channelsAcquired"), endpoint, RntbdEndpoint::channelsAcquiredMetric) .description("acquired channel count") .tags(this.tags) .register(registry); - Gauge.builder(nameOf("channelsAvailable"), endpoint, RntbdEndpoint::channelsAvailable) + Gauge.builder(nameOf("channelsAvailable"), endpoint, RntbdEndpoint::channelsAvailableMetric) 
.description("available channel count") .tags(this.tags) .register(registry); @@ -144,12 +144,12 @@ public static void add(MeterRegistry registry) { @JsonProperty public int channelsAcquired() { - return this.endpoint.channelsAcquired(); + return this.endpoint.channelsAcquiredMetric(); } @JsonProperty public int channelsAvailable() { - return this.endpoint.channelsAvailable(); + return this.endpoint.channelsAvailableMetric(); } /*** diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java index 1611352669921..000c669861291 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java @@ -84,7 +84,7 @@ public final class RntbdRequestManager implements ChannelHandler, ChannelInbound private static final ClosedChannelException ON_DEREGISTER = ThrowableUtil.unknownStackTrace(new ClosedChannelException(), RntbdRequestManager.class, "deregister"); - private static final EventExecutor requestExpirator = new DefaultEventExecutor(new RntbdThreadFactory( + private static final EventExecutor requestExpirationExecutor = new DefaultEventExecutor(new RntbdThreadFactory( "request-expirator", true, Thread.NORM_PRIORITY)); @@ -575,7 +575,7 @@ private RntbdRequestRecord addPendingRequestRecord(final ChannelHandlerContext c final Timeout pendingRequestTimeout = record.newTimeout(timeout -> { // We don't wish to complete on the timeout thread, but rather on a thread doled out by our executor - requestExpirator.execute(record::expire); + requestExpirationExecutor.execute(record::expire); }); record.whenComplete((response, error) -> { diff --git 
a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java index 3b261330638d1..3647d8b9b07a8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java @@ -110,12 +110,18 @@ private RntbdServiceEndpoint( // region Accessors @Override - public int channelsAcquired() { + /** + * @return approximate number of acquired channels. + */ + public int channelsAcquiredMetric() { return this.channelPool.channelsAcquiredMetrics(); } + /** + * @return approximate number of available channels. + */ @Override - public int channelsAvailable() { + public int channelsAvailableMetric() { return this.channelPool.channelsAvailableMetrics(); } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java index c7b2478242776..1271f1c41e645 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java @@ -841,12 +841,12 @@ private FakeEndpoint( // region Accessors @Override - public int channelsAcquired() { + public int channelsAcquiredMetric() { return 0; } @Override - public int channelsAvailable() { + public int channelsAvailableMetric() { return 0; }