Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing an issue with Direct Channels acquired/close metrics for idle endpoints #33969

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

#### Bugs Fixed
* Fixed `readMany` API to take in hierarchical partition keys - See [32501](https://github.com/Azure/azure-sdk-for-java/pull/32501)

* Fixed and issue in the Direct Transport metrics for acquired/closed channels which would be triggered when endpoint get closed/evicted due to exceeding idle timeouts. This would surface as stale metrics for these endpoints. - See [33969](https://github.com/Azure/azure-sdk-for-java/pull/33969)
#### Other Changes
* Added fault injection support - See [PR 33329](https://github.com/Azure/azure-sdk-for-java/pull/33329).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient;
import com.azure.cosmos.implementation.directconnectivity.StoreResponseDiagnostics;
import com.azure.cosmos.implementation.directconnectivity.StoreResultDiagnostics;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdDurableEndpointMetrics;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdMetricsCompletionRecorder;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestRecord;
import com.azure.cosmos.implementation.guava25.net.PercentEscaper;
import com.azure.cosmos.implementation.query.QueryInfo;
import com.azure.cosmos.models.CosmosMetricName;
import com.azure.cosmos.models.CosmosMicrometerMeterOptions;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.FunctionCounter;
Expand Down Expand Up @@ -1072,8 +1072,8 @@ private RntbdMetricsV2(MeterRegistry registry, RntbdTransportClient client, Rntb
if (options.isEnabled()) {
FunctionCounter.builder(
options.getMeterName().toString(),
endpoint,
RntbdEndpoint::totalChannelsAcquiredMetric)
endpoint.durableEndpointMetrics(),
RntbdDurableEndpointMetrics::totalChannelsAcquiredMetric)
.description("RNTBD acquired channel count")
.tags(getEffectiveTags(tags, options))
.register(registry);
Expand All @@ -1084,8 +1084,8 @@ private RntbdMetricsV2(MeterRegistry registry, RntbdTransportClient client, Rntb
if (options.isEnabled()) {
FunctionCounter.builder(
options.getMeterName().toString(),
endpoint,
RntbdEndpoint::totalChannelsClosedMetric)
endpoint.durableEndpointMetrics(),
RntbdDurableEndpointMetrics::totalChannelsClosedMetric)
.description("RNTBD closed channel count")
.tags(getEffectiveTags(tags, options))
.register(registry);
Expand All @@ -1094,7 +1094,10 @@ private RntbdMetricsV2(MeterRegistry registry, RntbdTransportClient client, Rntb
options = client
.getMeterOptions(CosmosMetricName.DIRECT_CHANNELS_AVAILABLE_COUNT);
if (options.isEnabled()) {
Gauge.builder(options.getMeterName().toString(), endpoint, RntbdEndpoint::channelsAvailableMetric)
Gauge.builder(
options.getMeterName().toString(),
endpoint.durableEndpointMetrics(),
RntbdDurableEndpointMetrics::channelsAvailableMetric)
.description("RNTBD available channel count")
.tags(getEffectiveTags(tags, options))
.register(registry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -166,7 +165,6 @@ public final class RntbdClientChannelPool implements ChannelPool {
private final Runnable acquisitionTimeoutTask;
private final PooledByteBufAllocatorMetric allocatorMetric;
private final Bootstrap bootstrap;
private final RntbdServiceEndpoint endpoint;
private final EventExecutor executor;
private final ChannelHealthChecker healthChecker;
// private final ScheduledFuture<?> idleStateDetectionScheduledFuture;
Expand All @@ -175,8 +173,7 @@ public final class RntbdClientChannelPool implements ChannelPool {
private final int maxRequestsPerChannel;
private final ChannelPoolHandler poolHandler;
private final boolean releaseHealthCheck;
private final AtomicInteger totalAcquiredChannels = new AtomicInteger(0);
private final AtomicInteger totalClosedChannels = new AtomicInteger(0);
private final RntbdDurableEndpointMetrics durableEndpointMetrics;

// Because state from these fields can be requested on any thread...

Expand All @@ -202,22 +199,26 @@ public final class RntbdClientChannelPool implements ChannelPool {
* @param config the {@link Config} that is used for the channel pool instance created.
* @param clientTelemetry the {@link ClientTelemetry} that is used to track client telemetry related metrics.
* @param connectionStateListener the {@link RntbdConnectionStateListener}.
* @param durableEndpointMetrics a holder for the metric state (which should be
* durable for endpoints with the same address)
*/
RntbdClientChannelPool(
final RntbdServiceEndpoint endpoint,
final Bootstrap bootstrap,
final Config config,
final ClientTelemetry clientTelemetry,
final RntbdConnectionStateListener connectionStateListener,
final RntbdServerErrorInjector faultInjectionInterceptors) {
final RntbdServerErrorInjector faultInjectionInterceptors,
final RntbdDurableEndpointMetrics durableEndpointMetrics) {
this(
endpoint,
bootstrap,
config,
new RntbdClientChannelHealthChecker(config),
clientTelemetry,
connectionStateListener,
faultInjectionInterceptors);
faultInjectionInterceptors,
durableEndpointMetrics);
}

private RntbdClientChannelPool(
Expand All @@ -227,18 +228,20 @@ private RntbdClientChannelPool(
final RntbdClientChannelHealthChecker healthChecker,
final ClientTelemetry clientTelemetry,
final RntbdConnectionStateListener connectionStateListener,
final RntbdServerErrorInjector serverErrorInjector) {
final RntbdServerErrorInjector serverErrorInjector,
final RntbdDurableEndpointMetrics durableEndpointMetrics) {

checkNotNull(endpoint, "expected non-null endpoint");
checkNotNull(bootstrap, "expected non-null bootstrap");
checkNotNull(config, "expected non-null config");
checkNotNull(healthChecker, "expected non-null healthChecker");
checkNotNull(durableEndpointMetrics, "expected non-null durableEndpointMetrics");

this.endpoint = endpoint;
this.poolHandler = new RntbdClientChannelHandler(config, healthChecker, connectionStateListener, serverErrorInjector);
this.executor = bootstrap.config().group().next();
this.healthChecker = healthChecker;
this.serverErrorInjector = serverErrorInjector;
this.durableEndpointMetrics = durableEndpointMetrics;

this.bootstrap = bootstrap.clone().handler(new ChannelInitializer<Channel>() {
@Override
Expand Down Expand Up @@ -290,26 +293,6 @@ public void onTimeout(AcquireListener task) {
this.pendingAcquisitionExpirationFuture = null;
}
this.clientTelemetry = clientTelemetry;

// this.idleStateDetectionScheduledFuture = this.executor.scheduleAtFixedRate(
// () -> {
// final long elapsedTimeInNanos = System.nanoTime() - endpoint.lastRequestNanoTime();
//
// if (idleEndpointTimeoutInNanos - elapsedTimeInNanos <= 0) {
// if (logger.isDebugEnabled()) {
// logger.debug(
// "{} closing endpoint due to inactivity (elapsedTime: {} > idleEndpointTimeout: {})",
// endpoint,
// Duration.ofNanos(elapsedTimeInNanos),
// Duration.ofNanos(idleEndpointTimeoutInNanos));
// }
// endpoint.close();
// return;
// }
//
// this.runTasksInPendingAcquisitionQueue();
//
// }, requestTimerResolutionInNanos, requestTimerResolutionInNanos, TimeUnit.NANOSECONDS);
}

// region Accessors
Expand Down Expand Up @@ -341,24 +324,6 @@ public int channelsAcquiredMetrics() {
return this.acquiredChannels.size();
}

/**
* Gets the total acquired channel count.
*
* @return the total acquired channel count.
*/
public int totalChannelsAcquiredMetrics() {
return this.totalAcquiredChannels.get();
}

/**
* Gets the total closed channel count.
*
* @return the total closed channel count.
*/
public int totalChannelsClosedMetrics() {
return this.totalClosedChannels.get();
}

/**
* Gets the current available channel count.
*
Expand Down Expand Up @@ -899,7 +864,7 @@ private void addTaskToPendingAcquisitionQueue(ChannelPromiseWithExpiryTime promi
*/
private void closeChannel(final Channel channel) {
this.ensureInEventLoop();
totalClosedChannels.incrementAndGet();
this.durableEndpointMetrics.incrementClosedChannels();
this.acquiredChannels.remove(channel);
this.availableChannels.remove(channel);
channel.attr(POOL_KEY).set(null);
Expand Down Expand Up @@ -1234,7 +1199,7 @@ private void notifyChannelConnect(final ChannelFuture future, final Promise<Chan
logger.debug("established a channel local {}, remote {}", channel.localAddress(), channel.remoteAddress());
}

totalAcquiredChannels.incrementAndGet();
durableEndpointMetrics.incrementAcquiredChannels();

this.acquiredChannels.compute(channel, (ignored, acquiredChannel) -> {
reportIssueUnless(logger, acquiredChannel == null, this,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.directconnectivity.rntbd;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public final class RntbdDurableEndpointMetrics {
private final AtomicInteger totalAcquiredChannels;
private final AtomicInteger totalClosedChannels;
private final AtomicReference<RntbdEndpoint> latestEndpoint;

public RntbdDurableEndpointMetrics() {
this.totalAcquiredChannels = new AtomicInteger(0);
this.totalClosedChannels = new AtomicInteger(0);
this.latestEndpoint = new AtomicReference<>();
}

public void setEndpoint(RntbdEndpoint endpoint) {
this.latestEndpoint.set(endpoint);
}

public void incrementAcquiredChannels() {
totalAcquiredChannels.incrementAndGet();
}

public void incrementClosedChannels() {
totalClosedChannels.incrementAndGet();
}

public int channelsAvailableMetric() {
RntbdEndpoint snapshot = latestEndpoint.get();
if (snapshot != null) {
return snapshot.channelsAvailableMetric();
}

return 0;
}

public int totalChannelsClosedMetric() {
return totalClosedChannels.get();
}

public int totalChannelsAcquiredMetric() {
return totalAcquiredChannels.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,9 @@ public interface RntbdEndpoint extends AutoCloseable {
int channelsAcquiredMetric();

/**
* @return total number of acquired channels.
* @return durable monotonic counters for total acquired/closed channels.
*/
int totalChannelsAcquiredMetric();

/**
* @return total number of closed channels.
*/
int totalChannelsClosedMetric();
RntbdDurableEndpointMetrics durableEndpointMetrics();

/**
* @return approximate number of available channels.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {

private final RntbdConnectionStateListener connectionStateListener;
private final URI serviceEndpoint;
private final RntbdDurableEndpointMetrics durableMetrics;
private String lastFaultInjectionRuleId;
private Instant lastFaultInjectionTimestamp;


// endregion

// region Constructors
Expand All @@ -106,8 +108,10 @@ private RntbdServiceEndpoint(
final URI physicalAddress,
final ClientTelemetry clientTelemetry,
final RntbdServerErrorInjector faultInjectionInterceptors,
final URI serviceEndpoint) {
final URI serviceEndpoint,
final RntbdDurableEndpointMetrics durableMetrics) {

this.durableMetrics = durableMetrics;
this.serverKey = RntbdUtils.getServerKey(physicalAddress);
this.serviceEndpoint = serviceEndpoint;

Expand Down Expand Up @@ -145,7 +149,8 @@ private RntbdServiceEndpoint(
config,
clientTelemetry,
this.connectionStateListener,
faultInjectionInterceptors);
faultInjectionInterceptors,
durableMetrics);

if (clientTelemetry != null &&
clientTelemetry.isClientMetricsEnabled() &&
Expand Down Expand Up @@ -216,20 +221,9 @@ public int channelsAcquiredMetric() {
return this.channelPool.channelsAcquiredMetrics();
}

/**
* @return approximate number of acquired channels.
*/
@Override
public int totalChannelsAcquiredMetric() {
return this.channelPool.totalChannelsAcquiredMetrics();
}

/**
* @return approximate number of closed channels.
*/
@Override
public int totalChannelsClosedMetric() {
return this.channelPool.totalChannelsClosedMetrics();
public RntbdDurableEndpointMetrics durableEndpointMetrics() {
return this.durableMetrics;
}

/**
Expand Down Expand Up @@ -622,6 +616,7 @@ public static final class Provider implements RntbdEndpoint.Provider {
private final AtomicBoolean closed;
private final Config config;
private final ConcurrentHashMap<String, RntbdEndpoint> endpoints;
private final ConcurrentHashMap<String, RntbdDurableEndpointMetrics> durableMetrics;
private final EventLoopGroup eventLoopGroup;
private final AtomicInteger evictions;
private final RntbdEndpointMonitoringProvider monitoring;
Expand Down Expand Up @@ -661,6 +656,7 @@ public Provider(

this.eventLoopGroup = this.getEventLoopGroup(options);
this.endpoints = new ConcurrentHashMap<>();
this.durableMetrics = new ConcurrentHashMap<>();
this.evictions = new AtomicInteger();
this.closed = new AtomicBoolean();
this.clientTelemetry = clientTelemetry;
Expand Down Expand Up @@ -728,15 +724,30 @@ public int evictions() {

@Override
public RntbdEndpoint createIfAbsent(final URI serviceEndpoint, final URI physicalAddress) {
return endpoints.computeIfAbsent(physicalAddress.getAuthority(), authority -> new RntbdServiceEndpoint(
this,
this.config,
this.eventLoopGroup,
this.requestTimer,
physicalAddress,
this.clientTelemetry,
this.serverErrorInjector,
serviceEndpoint));
return endpoints.computeIfAbsent(
physicalAddress.getAuthority(),
authority -> {

RntbdDurableEndpointMetrics durableEndpointMetrics = durableMetrics.computeIfAbsent(
physicalAddress.getAuthority(),
ignoreMe -> new RntbdDurableEndpointMetrics()
);

RntbdServiceEndpoint endpoint = new RntbdServiceEndpoint(
this,
this.config,
this.eventLoopGroup,
this.requestTimer,
physicalAddress,
this.clientTelemetry,
this.serverErrorInjector,
serviceEndpoint,
durableEndpointMetrics);

durableEndpointMetrics.setEndpoint(endpoint);

return endpoint;
});
}

@Override
Expand All @@ -754,7 +765,7 @@ public Stream<RntbdEndpoint> list() {
return this.endpoints.values().stream();
}

private void evict(final RntbdEndpoint endpoint) {
public void evict(final RntbdEndpoint endpoint) {
if (this.endpoints.remove(endpoint.serverKey().getAuthority()) != null) {
this.evictions.incrementAndGet();
}
Expand Down
Loading