Skip to content

Commit

Permalink
Cosmos Issue #6253: Port RntbdRequestTimer memory leak fix from V4 (#…
Browse files Browse the repository at this point in the history
…6254)

* Port from v4

* Corrected package misspelling in log4j.properties and removed System.exit from Main.java

* Port from V4

* Responded to code review comments

* Reverting tests.yml file back to master
  • Loading branch information
David Noble authored and xseeseesee committed Dec 10, 2019
1 parent 3ca8508 commit a957a48
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class Main {

public static void main(String[] args) throws Exception {
org.apache.log4j.Logger.getLogger("io.netty").setLevel(org.apache.log4j.Level.OFF);

AsyncBenchmark benchmark = null;
try {
LOGGER.debug("Parsing the arguments ...");
Configuration cfg = new Configuration();
Expand All @@ -27,7 +27,6 @@ public static void main(String[] args) throws Exception {
return;
}

AsyncBenchmark benchmark;
switch (cfg.getOperationType()) {
case WriteThroughput:
case WriteLatency:
Expand Down Expand Up @@ -67,13 +66,16 @@ public static void main(String[] args) throws Exception {

LOGGER.info("Starting {}", cfg.getOperationType());
benchmark.run();
benchmark.shutdown();

} catch (ParameterException e) {
// if any error in parsing the cmd-line options print out the usage help
System.err.println("INVALID Usage: " + e.getMessage());
System.err.println("Try '-help' for more information.");
throw e;
} finally {
if (benchmark != null) {
benchmark.shutdown();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,17 +256,17 @@ public Duration shutdownTimeout() {
return this.shutdownTimeout;
}

@Override
public String toString() {
return RntbdObjectMapper.toJson(this);
public UserAgentContainer userAgent() {
return this.userAgent;
}

// endregion

// region Methods

public UserAgentContainer userAgent() {
return this.userAgent;
@Override
public String toString() {
return RntbdObjectMapper.toJson(this);
}

// endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ protected void initChannel(final Channel channel) {
checkNotNull(channel);

final RntbdRequestManager requestManager = new RntbdRequestManager(this.healthChecker, this.config.maxRequestsPerChannel());
final long readerIdleTime = this.config.receiveHangDetectionTime();
final long writerIdleTime = this.config.sendHangDetectionTime();
final long allIdleTime = this.config.idleConnectionTimeout();
final long readerIdleTime = this.config.receiveHangDetectionTimeInNanos();
final long writerIdleTime = this.config.sendHangDetectionTimeInNanos();
final long allIdleTime = this.config.idleConnectionTimeoutInNanos();
final ChannelPipeline pipeline = channel.pipeline();

pipeline.addFirst(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ public RntbdClientChannelHealthChecker(final Config config) {

checkNotNull(config, "config: null");

this.idleConnectionTimeout = config.idleConnectionTimeout();
this.idleConnectionTimeout = config.idleConnectionTimeoutInNanos();

this.readDelayLimit = config.receiveHangDetectionTime();
this.readDelayLimit = config.receiveHangDetectionTimeInNanos();
checkArgument(this.readDelayLimit > readHangGracePeriod, "config.receiveHangDetectionTime: %s", this.readDelayLimit);

this.writeDelayLimit = config.sendHangDetectionTime();
this.writeDelayLimit = config.sendHangDetectionTimeInNanos();
checkArgument(this.writeDelayLimit > writeHangGracePeriod, "config.sendHangDetectionTime: %s", this.writeDelayLimit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void onTimeout(AcquireTask task) {
}
}

final long idleEndpointTimeout = config.idleEndpointTimeout();
final long idleEndpointTimeout = config.idleEndpointTimeoutInNanos();

this.idleStateDetectionScheduledFuture = this.executor.scheduleAtFixedRate(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,19 @@ public int bufferPageSize() {
}

@JsonProperty
public int connectionTimeout() {
public int connectionTimeoutInMillis() {
final long value = this.options.connectionTimeout().toMillis();
assert value <= Integer.MAX_VALUE;
return (int)value;
}

@JsonProperty
public long idleConnectionTimeout() {
public long idleConnectionTimeoutInNanos() {
return this.options.idleChannelTimeout().toNanos();
}

@JsonProperty
public long idleEndpointTimeout() {
public long idleEndpointTimeoutInNanos() {
return this.options.idleEndpointTimeout().toNanos();
}

Expand All @@ -137,22 +137,22 @@ public int maxRequestsPerChannel() {
}

@JsonProperty
public long receiveHangDetectionTime() {
public long receiveHangDetectionTimeInNanos() {
return this.options.receiveHangDetectionTime().toNanos();
}

@JsonProperty
public long requestTimeout() {
public long requestTimeoutInNanos() {
return this.options.requestTimeout().toNanos();
}

@JsonProperty
public long sendHangDetectionTime() {
public long sendHangDetectionTimeInNanos() {
return this.options.sendHangDetectionTime().toNanos();
}

@JsonProperty
public long shutdownTimeout() {
public long shutdownTimeoutInNanos() {
return this.options.shutdownTimeout().toNanos();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.ser.PropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.handler.codec.CorruptedFrameException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Strings.lenientFormat;

public final class RntbdRequestTimer implements AutoCloseable {

private static final long FIVE_MILLISECONDS = 5000000L;
Expand All @@ -22,11 +24,9 @@ public final class RntbdRequestTimer implements AutoCloseable {
private final Timer timer;

public RntbdRequestTimer(final long requestTimeout) {

// Inspection of the HashWheelTimer code indicates that our choice of a 5 millisecond timer resolution ensures
// a request will expire within 10 milliseconds of the specified requestTimeout interval. This is because
// cancellation of a timeout takes two timer resolution units to complete.

this.timer = new HashedWheelTimer(FIVE_MILLISECONDS, TimeUnit.NANOSECONDS);
this.requestTimeout = requestTimeout;
}
Expand All @@ -37,13 +37,28 @@ public long getRequestTimeout(final TimeUnit unit) {

@Override
public void close() {

final Set<Timeout> timeouts = this.timer.stop();
if (logger.isDebugEnabled()) {
final int count = timeouts.size();
if (count > 0) {
logger.debug("request expiration tasks cancelled: {}", count);
final int count = timeouts.size();

if (count == 0) {
logger.debug("no outstanding request timeout tasks");
return;
}

logger.debug("stopping {} request timeout tasks", count);

for (final Timeout timeout : timeouts) {
if (!timeout.isExpired()) {
try {
timeout.task().run(timeout);
} catch (Throwable error) {
logger.warn(lenientFormat("request timeout task failed due to ", error));
}
}
}

logger.debug("{} request timeout tasks stopped", count);
}

public Timeout newTimeout(final TimerTask task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {
// region Fields

private static final String TAG_NAME = RntbdServiceEndpoint.class.getSimpleName();
private static final long QUIET_PERIOD = 2L * 1_000_000_000L; // 2 seconds
private static final long QUIET_PERIOD = 2_000_000_000L; // 2 seconds

private static final AtomicLong instanceCount = new AtomicLong();
private static final Logger logger = LoggerFactory.getLogger(RntbdServiceEndpoint.class);
Expand Down Expand Up @@ -79,7 +79,7 @@ private RntbdServiceEndpoint(
.group(group)
.option(ChannelOption.ALLOCATOR, config.allocator())
.option(ChannelOption.AUTO_READ, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectionTimeout())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectionTimeoutInMillis())
.option(ChannelOption.RCVBUF_ALLOCATOR, receiveBufferAllocator)
.option(ChannelOption.SO_KEEPALIVE, true)
.remoteAddress(physicalAddress.getHost(), physicalAddress.getPort());
Expand Down Expand Up @@ -330,7 +330,7 @@ public Provider(final RntbdTransportClient transportClient, final Options option

this.transportClient = transportClient;
this.config = new Config(options, sslContext, wireLogLevel);
this.requestTimer = new RntbdRequestTimer(config.requestTimeout());
this.requestTimer = new RntbdRequestTimer(config.requestTimeoutInNanos());
this.eventLoopGroup = new NioEventLoopGroup(threadCount, threadFactory);

this.endpoints = new ConcurrentHashMap<>();
Expand All @@ -349,14 +349,15 @@ public void close() {
endpoint.close();
}

this.eventLoopGroup.shutdownGracefully(QUIET_PERIOD, this.config.shutdownTimeout(), NANOSECONDS)
this.eventLoopGroup.shutdownGracefully(QUIET_PERIOD, this.config.shutdownTimeoutInNanos(), NANOSECONDS)
.addListener(future -> {
if (future.isSuccess()) {
logger.debug("\n [{}]\n closed endpoints", this);
return;
}
logger.error("\n [{}]\n failed to close endpoints due to ", this, future.cause());
});

return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ static class Provider implements RntbdEndpoint.Provider {

Provider(RntbdTransportClient.Options options, SslContext sslContext, RntbdResponse expected) {
this.config = new Config(options, sslContext, LogLevel.WARN);
this.timer = new RntbdRequestTimer(config.requestTimeout());
this.timer = new RntbdRequestTimer(config.requestTimeoutInNanos());
this.expected = expected;
}

Expand Down

0 comments on commit a957a48

Please sign in to comment.