Skip to content

Commit

Permalink
Add system properties for enabling SamplingAllocationStrategy (#1454)
Browse files Browse the repository at this point in the history
"reactor.netty.pool.getPermitsSamplingRate" and
"reactor.netty.pool.returnPermitsSamplingRate" are exposed
in order to be able to enable Reactor Pool's SamplingAllocationStrategy
  • Loading branch information
violetagg authored Jan 7, 2021
1 parent ee7d317 commit 6c3ab1c
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 1 deletion.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ configure(rootProject) { project ->
systemProperty("reactor.trace.nocapacity", "true")
systemProperty("testGroups", project.properties.get("testGroups"))
systemProperty("io.netty.leakDetection.level", "paranoid")
systemProperty("reactor.netty.pool.getPermitsSamplingRate", "0.5")
systemProperty("reactor.netty.pool.returnPermitsSamplingRate", "0.5")
scanForTestClasses = false
include '**/*Tests.*'
include '**/*Test.*'
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/reactor/netty/ReactorNetty.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.pool.AllocationStrategy;
import reactor.pool.PoolBuilder;
import reactor.pool.introspection.SamplingAllocationStrategy;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;
Expand Down Expand Up @@ -132,6 +135,22 @@ public final class ReactorNetty {
* </ul>
*/
public static final String POOL_LEASING_STRATEGY = "reactor.netty.pool.leasingStrategy";
/**
* Default {@code getPermitsSamplingRate} (between 0d and 1d (percentage))
* to be used with a {@link SamplingAllocationStrategy}.
* This strategy wraps a {@link PoolBuilder#sizeBetween(int, int) sizeBetween} {@link AllocationStrategy}
* and samples calls to {@link AllocationStrategy#getPermits(int)}.
* Fallback - sampling is not enabled.
*/
public static final String POOL_GET_PERMITS_SAMPLING_RATE = "reactor.netty.pool.getPermitsSamplingRate";
/**
* Default {@code returnPermitsSamplingRate} (between 0d and 1d (percentage))
* to be used with a {@link SamplingAllocationStrategy}.
* This strategy wraps a {@link PoolBuilder#sizeBetween(int, int) sizeBetween} {@link AllocationStrategy}
* and samples calls to {@link AllocationStrategy#returnPermits(int)}.
* Fallback - sampling is not enabled.
*/
public static final String POOL_RETURN_PERMITS_SAMPLING_RATE = "reactor.netty.pool.returnPermitsSamplingRate";


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.FutureMono;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.channel.ChannelOperations;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;
import reactor.pool.PoolConfig;
import reactor.pool.PooledRef;
import reactor.pool.PooledRefMetadata;
import reactor.pool.introspection.SamplingAllocationStrategy;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.NonNull;
Expand Down Expand Up @@ -710,6 +712,34 @@ public int hashCode() {


final static class PoolFactory {
static final double DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE;
static {
double getPermitsSamplingRate =
Double.parseDouble(System.getProperty(ReactorNetty.POOL_GET_PERMITS_SAMPLING_RATE, "0"));
if (getPermitsSamplingRate > 1d) {
DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE = 0;
log.warn("Invalid configuration [" + ReactorNetty.POOL_GET_PERMITS_SAMPLING_RATE + "=" + getPermitsSamplingRate +
"], the value must be between 0d and 1d (percentage). SamplingAllocationStrategy in not enabled.");
}
else {
DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE = getPermitsSamplingRate;
}
}

static final double DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE;
static {
double returnPermitsSamplingRate =
Double.parseDouble(System.getProperty(ReactorNetty.POOL_RETURN_PERMITS_SAMPLING_RATE, "0"));
if (returnPermitsSamplingRate > 1d) {
DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE = 0;
log.warn("Invalid configuration [" + ReactorNetty.POOL_RETURN_PERMITS_SAMPLING_RATE + "=" + returnPermitsSamplingRate +
"], the value must be between 0d and 1d (percentage). SamplingAllocationStrategy is enabled.");
}
else {
DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE = returnPermitsSamplingRate;
}
}

final Duration evictionInterval;
final int maxConnections;
final int pendingAcquireMaxCount;
Expand Down Expand Up @@ -741,12 +771,25 @@ InstrumentedPool<PooledConnection> newPool(Publisher<PooledConnection> allocator
.or((poolable, meta) -> (maxIdleTime != -1 && meta.idleTime() >= maxIdleTime)
|| (maxLifeTime != -1 && meta.lifeTime() >= maxLifeTime)))
.maxPendingAcquire(pendingAcquireMaxCount)
.sizeBetween(0, maxConnections)
.evictInBackground(evictionInterval);

if (DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE > 0d && DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE <= 1d
&& DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE > 0d && DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE <= 1d) {
poolBuilder = poolBuilder.allocationStrategy(SamplingAllocationStrategy.sizeBetweenWithSampling(
0,
maxConnections,
DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE,
DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE));
}
else {
poolBuilder = poolBuilder.sizeBetween(0, maxConnections);
}

if (LEASING_STRATEGY_FIFO.equals(leasingStrategy)) {
return poolBuilder.idleResourceReuseLruOrder()
.buildPool();
}

return poolBuilder.idleResourceReuseMruOrder()
.buildPool();
}
Expand Down

0 comments on commit 6c3ab1c

Please sign in to comment.