Skip to content

Commit

Permalink
Merge pull request #1 from FabianMeiswinkel/users/fabianm/RntbdMetric…
Browse files Browse the repository at this point in the history
…sToDiagnostics

Users/fabianm/rntbd metrics to diagnostics
  • Loading branch information
FabianMeiswinkel authored Sep 14, 2020
2 parents 1757a02 + d3d1992 commit 0887948
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -362,13 +362,9 @@ public Future<Channel> acquire(final Promise<Channel> promise) {
if (this.executor.inEventLoop()) {
this.acquireChannel(promise);
} else {

// TODO: moderakh ensure/validate in all conditions we run the tasks
// if the replica endpoint is not reachable don't queue up on executor
if (pendingAcquisitions.size() > 1000) {
addTaskToPendingAcquisitionQueue(promise);
} else {
// TODO: moderakh if the endpoint is bad the timer, health check has to cancel all the pending requests
this.executor.execute(() -> this.acquireChannel(promise));
}
}
Expand Down Expand Up @@ -531,6 +527,8 @@ private void acquireChannel(final Promise<Channel> promise) {
return;
}

// make sure to retrieve the actual channel count to avoid establishing more
// TCP connections than allowed.
final int channelCount = this.channels(false);

if (channelCount < this.maxChannels) {
Expand Down Expand Up @@ -563,9 +561,12 @@ private void acquireChannel(final Promise<Channel> promise) {
final RntbdRequestManager manager = channel.pipeline().get(RntbdRequestManager.class);

if (manager == null) {
logger.warn("Channel({}) closed", channel);
logger.warn("Channel({} --> {}) closed", channel, this.remoteAddress());
} else {
final long pendingRequestCount = manager.pendingRequestCount();

// we accept the risk of reusing the channel even if more than maxPendingRequests are queued - by picking
// the channel with the least number of outstanding requests we load balance reasonably
if (isChannelServiceable(channel, true) && pendingRequestCount < pendingRequestCountMin) {
pendingRequestCountMin = pendingRequestCount;
candidate = channel;
Expand All @@ -579,6 +580,9 @@ private void acquireChannel(final Promise<Channel> promise) {
}
} else {
for (Channel channel : this.availableChannels) {

// we pick the first available channel to avoid the additional cost of laod balancing
// as long as the load is lower than the load factor threshold above.
if (isChannelServiceable(channel, true)) {
if (this.availableChannels.remove(channel)) {
this.doAcquireChannel(promise, channel);
Expand Down Expand Up @@ -607,9 +611,6 @@ private void acquireChannel(final Promise<Channel> promise) {
* @see #runTasksInPendingAcquisitionQueue
*/
private void addTaskToPendingAcquisitionQueue(Promise<Channel> promise) {

// this.ensureInEventLoop();

if (logger.isDebugEnabled()) {
logger.debug("{}, {}, {}, {}, {}, {}",
Instant.now(),
Expand All @@ -621,11 +622,8 @@ private void addTaskToPendingAcquisitionQueue(Promise<Channel> promise) {
}

if (this.pendingAcquisitions.size() >= this.maxPendingAcquisitions) {

promise.setFailure(TOO_MANY_PENDING_ACQUISITIONS);

} else {

final AcquireTask acquireTask = new AcquireTask(this, promise);

if (this.pendingAcquisitions.offer(acquireTask)) {
Expand Down Expand Up @@ -707,6 +705,7 @@ private double computeLoadFactor() {
}

private void doAcquireChannel(final Promise<Channel> promise, final Channel candidate) {
this.ensureInEventLoop();
acquiredChannels.put(candidate, candidate);

final Promise<Channel> anotherPromise = this.newChannelPromise(promise);
Expand Down Expand Up @@ -931,9 +930,8 @@ private void notifyChannelConnect(final ChannelFuture future, final Promise<Chan
if (logger.isDebugEnabled()) {
logger.debug("established a channel local {}, remote {}", channel.localAddress(), channel.remoteAddress());
}
this.acquiredChannels.compute(channel, (ignored, acquiredChannel) -> {
// reportIssueUnless(logger, v == null, this, "expected null channel, not {}", v);

this.acquiredChannels.compute(channel, (ignored, acquiredChannel) -> {
reportIssueUnless(logger, acquiredChannel == null, this,
"Channel({}) to be acquired has already been acquired",
channel);
Expand All @@ -947,6 +945,7 @@ private void notifyChannelConnect(final ChannelFuture future, final Promise<Chan
if (logger.isDebugEnabled()) {
logger.debug("notifyChannelConnect promise.trySuccess(channel)=false");
}

// Promise was completed in the meantime (like cancelled), just close the channel
this.closeChannel(channel);
}
Expand Down Expand Up @@ -975,7 +974,6 @@ private void notifyChannelHealthCheck(
channel.attr(POOL_KEY).set(this);
this.poolHandler.channelAcquired(channel);
promise.setSuccess(channel);

} catch (Throwable cause) {
if (this.executor.inEventLoop()) {
this.closeChannelAndFail(channel, cause, promise);
Expand Down Expand Up @@ -1038,6 +1036,8 @@ private Channel pollChannel() {
return first; // because this.close -> this.close0 -> this.pollChannel
}

// Only return channels as servicable here if less than maxPendingRequests
// are queued on them
if (this.isChannelServiceable(first, false)) {
return first;
}
Expand All @@ -1048,6 +1048,9 @@ private Channel pollChannel() {
assert next != null : "impossible";

if (next.isActive()) {

// Only return channels as servicable here if less than maxPendingRequests
// are queued on them
if (this.isChannelServiceable(next, false)) {
return next;
}
Expand All @@ -1069,8 +1072,6 @@ private Channel pollChannel() {
private void releaseAndOfferChannel(final Channel channel, final Promise<Void> promise) {
this.ensureInEventLoop();
try {

// TODO: moderakh is this right?!!!!!
this.acquiredChannels.remove(channel);
if (this.offerChannel(channel)) {
this.poolHandler.channelReleased(channel);
Expand Down Expand Up @@ -1138,7 +1139,6 @@ private void releaseAndOfferChannelIfHealthy(
private void releaseChannel(final Channel channel, final Promise<Void> promise) {
checkState(channel.eventLoop().inEventLoop());

// TODO: moderakh what is ChannelPool
final ChannelPool pool = channel.attr(POOL_KEY).getAndSet(null);
final boolean acquired = this.acquiredChannels.get(channel) != null;

Expand Down Expand Up @@ -1180,8 +1180,6 @@ private void releaseChannel(final Channel channel, final Promise<Void> promise)
* {@link #acquire}.
*/
private void runTasksInPendingAcquisitionQueue() {
ensureInEventLoop();

this.ensureInEventLoop();
int channelsAvailable = this.availableChannels.size();

Expand All @@ -1206,7 +1204,7 @@ private void runTasksInPendingAcquisitionQueue() {

task.acquired(true);
this.acquire(task.promise);
} while (--channelsAvailable> 0);
} while (--channelsAvailable > 0);
}

private void throwIfClosed() {
Expand Down Expand Up @@ -1437,8 +1435,8 @@ private ScheduledFuture<?> startMonitoring() {
return monitoringRntbdChannelPool.scheduleAtFixedRate(() -> {
int i = getTaskCount();
if (isInterestingEndpoint()) {
logger.debug("{} total number of tasks on the executor [{}], connecting [{}], acquiredChannel [{}], availableChannel [{}], pending acquisition [{}]",
this.hashCode(), i, connecting.get(), acquiredChannels.size(), availableChannels.size(), pendingAcquisitions.size());
logger.debug("{} total number of tasks on the executor [{}], remote address: [{}], connecting [{}], acquiredChannel [{}], availableChannel [{}], pending acquisition [{}]",
this.hashCode(), i, this.remoteAddress(), connecting.get(), acquiredChannels.size(), availableChannels.size(), pendingAcquisitions.size());
}
}, 0, 60, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@ public interface RntbdEndpoint extends AutoCloseable {

// region Accessors

int channelsAcquired();

int channelsAvailable();
/**
* @return approximate number of acquired channels.
*/
int channelsAcquiredMetric();

/**
* @return approximate number of available channels.
*/
int channelsAvailableMetric();

int concurrentRequests();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ public RntbdMetrics(RntbdTransportClient client, RntbdEndpoint endpoint) {
.tags(this.tags)
.register(registry);

Gauge.builder(nameOf("channelsAcquired"), endpoint, RntbdEndpoint::channelsAcquired)
Gauge.builder(nameOf("channelsAcquired"), endpoint, RntbdEndpoint::channelsAcquiredMetric)
.description("acquired channel count")
.tags(this.tags)
.register(registry);

Gauge.builder(nameOf("channelsAvailable"), endpoint, RntbdEndpoint::channelsAvailable)
Gauge.builder(nameOf("channelsAvailable"), endpoint, RntbdEndpoint::channelsAvailableMetric)
.description("available channel count")
.tags(this.tags)
.register(registry);
Expand Down Expand Up @@ -144,12 +144,12 @@ public static void add(MeterRegistry registry) {

@JsonProperty
public int channelsAcquired() {
return this.endpoint.channelsAcquired();
return this.endpoint.channelsAcquiredMetric();
}

@JsonProperty
public int channelsAvailable() {
return this.endpoint.channelsAvailable();
return this.endpoint.channelsAvailableMetric();
}

/***
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public final class RntbdRequestManager implements ChannelHandler, ChannelInbound
private static final ClosedChannelException ON_DEREGISTER =
ThrowableUtil.unknownStackTrace(new ClosedChannelException(), RntbdRequestManager.class, "deregister");

private static final EventExecutor requestExpirator = new DefaultEventExecutor(new RntbdThreadFactory(
private static final EventExecutor requestExpirationExecutor = new DefaultEventExecutor(new RntbdThreadFactory(
"request-expirator",
true,
Thread.NORM_PRIORITY));
Expand Down Expand Up @@ -575,7 +575,7 @@ private RntbdRequestRecord addPendingRequestRecord(final ChannelHandlerContext c
final Timeout pendingRequestTimeout = record.newTimeout(timeout -> {

// We don't wish to complete on the timeout thread, but rather on a thread doled out by our executor
requestExpirator.execute(record::expire);
requestExpirationExecutor.execute(record::expire);
});

record.whenComplete((response, error) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,18 @@ private RntbdServiceEndpoint(
// region Accessors

@Override
public int channelsAcquired() {
/**
* @return approximate number of acquired channels.
*/
public int channelsAcquiredMetric() {
return this.channelPool.channelsAcquiredMetrics();
}

/**
* @return approximate number of available channels.
*/
@Override
public int channelsAvailable() {
public int channelsAvailableMetric() {
return this.channelPool.channelsAvailableMetrics();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -841,12 +841,12 @@ private FakeEndpoint(
// region Accessors

@Override
public int channelsAcquired() {
public int channelsAcquiredMetric() {
return 0;
}

@Override
public int channelsAvailable() {
public int channelsAvailableMetric() {
return 0;
}

Expand Down

0 comments on commit 0887948

Please sign in to comment.