Skip to content

Commit

Permalink
WIP commit for Metrics (issue ReactiveX#98)
Browse files Browse the repository at this point in the history
Did a few changes in the design as discussed in the github issue.

This code change suffices for the current Pool State change events. I will be adding the remaining events and migrate the tests to use the new model.
  • Loading branch information
Nitesh Kant committed Jun 9, 2014
1 parent 80498bc commit 5951dfb
Show file tree
Hide file tree
Showing 40 changed files with 676 additions and 245 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.metrics.MetricEventsListener;
import io.reactivex.netty.metrics.MetricEventsListenerFactory;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfigurators;

Expand All @@ -53,10 +54,13 @@ public abstract class AbstractClientBuilder<I, O, B extends AbstractClientBuilde
protected RxClient.ClientConfig clientConfig;
protected LogLevel wireLogginLevel;
protected MetricEventsListenerFactory eventListenersFactory;
protected MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject;

protected AbstractClientBuilder(Bootstrap bootstrap, String host, int port,
ClientConnectionFactory<O, I, ? extends ObservableConnection<O, I>> connectionFactory,
ClientChannelFactory<O, I> factory) {
eventsSubject = new MetricEventsSubject<ClientMetricsEvent<?>>();
factory.useMetricEventsSubject(eventsSubject);
this.bootstrap = bootstrap;
serverInfo = new RxClientImpl.ServerInfo(host, port);
clientConfig = RxClient.ClientConfig.Builder.newDefaultConfig();
Expand All @@ -67,6 +71,7 @@ protected AbstractClientBuilder(Bootstrap bootstrap, String host, int port,
}

protected AbstractClientBuilder(Bootstrap bootstrap, String host, int port, ConnectionPoolBuilder<O, I> poolBuilder) {
eventsSubject = new MetricEventsSubject<ClientMetricsEvent<?>>();
this.bootstrap = bootstrap;
this.poolBuilder = poolBuilder;
serverInfo = new RxClientImpl.ServerInfo(host, port);
Expand Down Expand Up @@ -215,6 +220,10 @@ public RxClientImpl.ServerInfo getServerInfo() {
return serverInfo;
}

public MetricEventsSubject<ClientMetricsEvent<?>> getEventsSubject() {
return eventsSubject;
}

public C build() {
if (null == socketChannel) {
socketChannel = NioSocketChannel.class;
Expand All @@ -240,7 +249,7 @@ public C build() {
}
C client = createClient();
if (null != eventListenersFactory) {
MetricEventsListener<? extends ClientMetricsEvent> listener =
MetricEventsListener<? extends ClientMetricsEvent<?>> listener =
newMetricsListener(eventListenersFactory, client);
client.subscribe(listener);
}
Expand All @@ -262,7 +271,7 @@ protected ConnectionPoolBuilder<O, I> getPoolBuilder(boolean createNew) {
* This works well as someone who wants to override the connection factory should either start with a
* pool builder or don't choose a pooled connection later.
*/
poolBuilder = new ConnectionPoolBuilder<O, I>(serverInfo, channelFactory); // Overrides the connection factory
poolBuilder = new ConnectionPoolBuilder<O, I>(serverInfo, channelFactory, eventsSubject); // Overrides the connection factory
}
return poolBuilder;
}
Expand All @@ -280,6 +289,6 @@ protected String generatedNamePrefix() {
return "RxClient-";
}

protected abstract MetricEventsListener<? extends ClientMetricsEvent>
protected abstract MetricEventsListener<? extends ClientMetricsEvent<? extends Enum>>
newMetricsListener(MetricEventsListenerFactory factory, C client);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@
/**
* @author Nitesh Kant
*/
public abstract class AbstractMetricsEvent implements MetricsEvent {
@SuppressWarnings("rawtypes")
public abstract class AbstractMetricsEvent<T extends Enum> implements MetricsEvent<T> {

protected final String name;
protected final T name;
protected final boolean isTimed;
protected final boolean isError;

protected AbstractMetricsEvent(String name, boolean isTimed, boolean isError) {
protected AbstractMetricsEvent(T name, boolean isTimed, boolean isError) {
this.isTimed = isTimed;
this.name = name;
this.isError = isError;
}

@Override
public String name() {
public T getType() {
return name;
}

Expand Down Expand Up @@ -64,7 +65,7 @@ public boolean equals(Object o) {
if (isTimed != that.isTimed) {
return false;
}
if (!name.equals(that.name)) {
if (name != that.name) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ public ClientBuilder(Bootstrap bootstrap, String host, int port,
protected RxClient<I, O> createClient() {
if (null == poolBuilder) {
return new RxClientImpl<I, O>(getOrCreateName(), serverInfo, bootstrap, pipelineConfigurator, clientConfig,
channelFactory, connectionFactory);
channelFactory, connectionFactory, eventsSubject);
} else {
return new RxClientImpl<I, O>(getOrCreateName(), serverInfo, bootstrap, pipelineConfigurator, clientConfig,
poolBuilder);
poolBuilder, eventsSubject);
}
}

Expand All @@ -65,8 +65,8 @@ protected String generatedNamePrefix() {
}

@Override
protected MetricEventsListener<? extends ClientMetricsEvent> newMetricsListener(MetricEventsListenerFactory factory,
RxClient<I, O> client) {
protected MetricEventsListener<? extends ClientMetricsEvent<ClientMetricsEvent.EventType>>
newMetricsListener(MetricEventsListenerFactory factory, RxClient<I, O> client) {
return factory.forClient(client);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.netty.channel.ChannelFuture;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.metrics.MetricEventsSubject;
import rx.Subscriber;

/**
Expand All @@ -33,4 +34,6 @@ ChannelFuture connect(Subscriber<? super ObservableConnection<I, O>> subscriber,

void onNewConnection(ObservableConnection<I, O> newConnection,
Subscriber<? super ObservableConnection<I, O>> subscriber);

void useMetricEventsSubject(MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.pipeline.RxRequiredConfigurator;
import rx.Subscriber;
import rx.functions.Action0;
Expand All @@ -38,15 +40,23 @@
public class ClientChannelFactoryImpl<I, O> implements ClientChannelFactory<I,O> {

protected final Bootstrap clientBootstrap;
private MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject;

public ClientChannelFactoryImpl(Bootstrap clientBootstrap) {
public ClientChannelFactoryImpl(Bootstrap clientBootstrap, MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject) {
this.clientBootstrap = clientBootstrap;
this.eventsSubject = eventsSubject;
}

public ClientChannelFactoryImpl(Bootstrap clientBootstrap) {
this(clientBootstrap, new MetricEventsSubject<ClientMetricsEvent<?>>());
}

@Override
public ChannelFuture connect(final Subscriber<? super ObservableConnection<I, O>> subscriber,
RxClient.ServerInfo serverInfo,
final ClientConnectionFactory<I, O,? extends ObservableConnection<I, O>> connectionFactory) {
final Clock clock = new Clock();
eventsSubject.onEvent(ClientMetricsEvent.CONNECT_START);
final ChannelFuture connectFuture = clientBootstrap.connect(serverInfo.getHost(), serverInfo.getPort());

subscriber.add(Subscriptions.create(new Action0() {
Expand All @@ -62,8 +72,10 @@ public void call() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
eventsSubject.onEvent(ClientMetricsEvent.CONNECT_FAILED, clock.stop(), future.cause());
subscriber.onError(future.cause());
} else {
eventsSubject.onEvent(ClientMetricsEvent.CONNECT_SUCCESS, clock.stop());
ChannelPipeline pipeline = future.channel().pipeline();
ChannelHandlerContext ctx = pipeline.firstContext();
ObservableConnection<I, O> newConnection = connectionFactory.newConnection(ctx);
Expand All @@ -86,4 +98,9 @@ public void onNewConnection(ObservableConnection<I, O> newConnection,
subscriber.onNext(newConnection);
subscriber.onCompleted(); // The observer is no longer looking for any more connections.
}

@Override
public void useMetricEventsSubject(MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject) {
this.eventsSubject = eventsSubject;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,66 @@
/**
* @author Nitesh Kant
*/
public class ClientMetricsEvent extends AbstractMetricsEvent {
@SuppressWarnings("rawtypes")
public class ClientMetricsEvent<T extends Enum> extends AbstractMetricsEvent<T> {

/*Always refer to as constants*/protected ClientMetricsEvent(String name, boolean isTimed, boolean isError) {
public enum EventType {
ConnectStart(false, false),
ConnectSuccess(true, false),
ConnectFailed(true, true),
ConnectionCloseStart(false, false),
ConnectionCloseSuccess(true, false),
ConnectionCloseFailed(true, true),

PoolAcquireStart(false, false),
PoolAcquireSuccess(true, false),
PooledConnectionReuse(true, false),
PooledConnectionEviction(false, false),
PoolAcquireFailed(true, true),
PoolReleaseStart(false, false),
PoolReleaseSuccess(true, false),
PoolReleaseFailed(true, true);

private final boolean isTimed;
private final boolean isError;

EventType(boolean isTimed, boolean isError) {
this.isTimed = isTimed;
this.isError = isError;
}

public boolean isTimed() {
return isTimed;
}

public boolean isError() {
return isError;
}
}

public static final ClientMetricsEvent<EventType> CONNECT_START = from(EventType.ConnectStart);
public static final ClientMetricsEvent<EventType> CONNECT_SUCCESS = from(EventType.ConnectSuccess);
public static final ClientMetricsEvent<EventType> CONNECT_FAILED = from(EventType.ConnectFailed);

public static final ClientMetricsEvent<EventType> CONNECTION_CLOSE_START = from(EventType.ConnectionCloseStart);
public static final ClientMetricsEvent<EventType> CONNECTION_CLOSE_SUCCESS = from(EventType.ConnectionCloseSuccess);
public static final ClientMetricsEvent<EventType> CONNECTION_CLOSE_FAILED = from(EventType.ConnectionCloseFailed);

public static final ClientMetricsEvent<EventType> POOL_ACQUIRE_START = from(EventType.PoolAcquireStart);
public static final ClientMetricsEvent<EventType> POOL_ACQUIRE_SUCCESS = from(EventType.PoolAcquireSuccess);
public static final ClientMetricsEvent<EventType> POOL_ACQUIRE_FAILED = from(EventType.PoolAcquireFailed);
public static final ClientMetricsEvent<EventType> POOL_RELEASE_START = from(EventType.PoolReleaseStart);
public static final ClientMetricsEvent<EventType> POOL_RELEASE_SUCCESS = from(EventType.PoolReleaseSuccess);
public static final ClientMetricsEvent<EventType> POOL_RELEASE_FAILED = from(EventType.PoolReleaseFailed);
public static final ClientMetricsEvent<EventType> POOLED_CONNECTION_REUSE = from(EventType.PooledConnectionReuse);
public static final ClientMetricsEvent<EventType> POOLED_CONNECTION_EVICTION = from(EventType.PooledConnectionEviction);


/*Always refer to as constants*/protected ClientMetricsEvent(T name, boolean isTimed, boolean isError) {
super(name, isTimed, isError);
}

private static ClientMetricsEvent<EventType> from(EventType type) {
return new ClientMetricsEvent<EventType>(type, type.isTimed(), type.isError());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.reactivex.netty.client;

import java.util.concurrent.TimeUnit;

/**
* @author Nitesh Kant
*/
Expand All @@ -35,13 +37,22 @@ public CompositePoolLimitDeterminationStrategy(PoolLimitDeterminationStrategy...
}

@Override
@Deprecated
public boolean acquireCreationPermit() {
return acquireCreationPermit(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

@Override
public boolean acquireCreationPermit(long acquireStartTime, TimeUnit timeUnit) {
for (int i = 0; i < strategies.length; i++) {
PoolLimitDeterminationStrategy strategy = strategies[i];
if (!strategy.acquireCreationPermit()) {
if (!strategy.acquireCreationPermit(acquireStartTime, timeUnit)) {
if (i > 0) {
long now = timeUnit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
for (int j = i - 1; j >= 0; j--) {
strategies[j].onNext(PoolInsightProvider.PoolStateChangeEvent.ConnectFailed); // release all permits acquired before this failure.
strategies[j].onEvent(ClientMetricsEvent.CONNECT_FAILED, now - acquireStartTime,
timeUnit, ConnectionPoolImpl.POOL_EXHAUSTED_EXCEPTION,
null); // release all permits acquired before this failure.
}
}
return false;
Expand All @@ -65,6 +76,14 @@ public int getAvailablePermits() {
return minPermits; // If will atleast be one strategy (invariant in constructor) and hence this should be the value provided by that strategy.
}

@Override
public void onEvent(ClientMetricsEvent<ClientMetricsEvent.EventType> event, long duration, TimeUnit timeUnit,
Throwable throwable, Object value) {
for (PoolLimitDeterminationStrategy strategy : strategies) {
strategy.onEvent(event, duration, timeUnit, throwable, value);
}
}

@Override
public void onCompleted() {
for (PoolLimitDeterminationStrategy strategy : strategies) {
Expand All @@ -80,6 +99,7 @@ public void onError(Throwable e) {
}

@Override
@Deprecated
public void onNext(PoolInsightProvider.PoolStateChangeEvent stateChangeEvent) {
for (PoolLimitDeterminationStrategy strategy : strategies) {
strategy.onNext(stateChangeEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
package io.reactivex.netty.client;

import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.metrics.MetricEventsPublisher;
import rx.Observable;

/**
* A pool of {@link PooledConnection}
*
* @author Nitesh Kant
*/
public interface ConnectionPool<I, O> extends PoolInsightProvider {
@SuppressWarnings("deprecation")
public interface ConnectionPool<I, O> extends PoolInsightProvider,
MetricEventsPublisher<ClientMetricsEvent<ClientMetricsEvent.EventType>> {

Observable<ObservableConnection<I, O>> acquire();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.reactivex.netty.client;

import io.reactivex.netty.channel.RxDefaultThreadFactory;
import io.reactivex.netty.metrics.MetricEventsSubject;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -31,19 +32,22 @@ public class ConnectionPoolBuilder<I, O> {
Executors.newScheduledThreadPool(1, new RxDefaultThreadFactory("global-client-idle-conn-cleanup-scheduler"));

private final RxClient.ServerInfo serverInfo;
private MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject;
private ClientConnectionFactory<I, O, PooledConnection<I, O>> connectionFactory;
private ClientChannelFactory<I, O> channelFactory; // Nullable
private PoolLimitDeterminationStrategy limitDeterminationStrategy = new MaxConnectionsBasedStrategy();
private ScheduledExecutorService poolIdleCleanupScheduler = SHARED_IDLE_CLEANUP_SCHEDULER;
private long idleConnectionsTimeoutMillis = PoolConfig.DEFAULT_CONFIG.getMaxIdleTimeMillis();
private PoolStatsProvider statsProvider = new PoolStatsImpl();

public ConnectionPoolBuilder(RxClient.ServerInfo serverInfo, ClientChannelFactory<I, O> channelFactory) {
this(serverInfo, channelFactory, new PooledConnectionFactory<I, O>(PoolConfig.DEFAULT_CONFIG));
public ConnectionPoolBuilder(RxClient.ServerInfo serverInfo, ClientChannelFactory<I, O> channelFactory,
MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject) {
this(serverInfo, channelFactory, new PooledConnectionFactory<I, O>(PoolConfig.DEFAULT_CONFIG), eventsSubject);
}

public ConnectionPoolBuilder(RxClient.ServerInfo serverInfo, ClientChannelFactory<I, O> channelFactory,
ClientConnectionFactory<I, O, PooledConnection<I, O>> connectionFactory) {
ClientConnectionFactory<I, O, PooledConnection<I, O>> connectionFactory,
MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject) {
if (null == serverInfo) {
throw new NullPointerException("Server info can not be null.");
}
Expand All @@ -53,6 +57,7 @@ public ConnectionPoolBuilder(RxClient.ServerInfo serverInfo, ClientChannelFactor
if (null == connectionFactory) {
throw new NullPointerException("Connection factory can not be null.");
}
this.eventsSubject = eventsSubject;
this.serverInfo = serverInfo;
this.connectionFactory = connectionFactory;
this.channelFactory = channelFactory;
Expand Down Expand Up @@ -115,11 +120,12 @@ public ConnectionPool<I, O> build() {
PoolConfig poolConfig = new PoolConfig(idleConnectionsTimeoutMillis);

return new ConnectionPoolImpl<I, O>(serverInfo, poolConfig, limitDeterminationStrategy, poolIdleCleanupScheduler,
statsProvider, connectionFactory, channelFactory);
statsProvider, connectionFactory, channelFactory, eventsSubject);
}

public ConnectionPoolBuilder<I, O> copy(RxClient.ServerInfo serverInfo) {
ConnectionPoolBuilder<I, O> copy = new ConnectionPoolBuilder<I, O>(serverInfo, channelFactory, connectionFactory);
ConnectionPoolBuilder<I, O> copy = new ConnectionPoolBuilder<I, O>(serverInfo, channelFactory, connectionFactory,
eventsSubject);
copy.withIdleConnectionsTimeoutMillis(idleConnectionsTimeoutMillis)
.withPoolStatsProvider(statsProvider)
.withPoolIdleCleanupScheduler(poolIdleCleanupScheduler)
Expand Down
Loading

0 comments on commit 5951dfb

Please sign in to comment.