Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/Netflix/RxNetty into metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Nitesh Kant committed Jun 4, 2014
2 parents 3abdab8 + 56588db commit 80498bc
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,30 @@ public final class RxContexts {

public static final ThreadLocalRequestCorrelator DEFAULT_CORRELATOR = new ThreadLocalRequestCorrelator();

private static String defaultRequestIdContextKeyName = "X-RXNETTY-REQUEST-ID";

private RxContexts() {
}

public static <I, O> HttpServerBuilder<I, O> newHttpServerBuilder(int port, RequestHandler<I, O> requestHandler,
RequestCorrelator correlator) {
HttpRequestIdProvider provider = new HttpRequestIdProvider(defaultRequestIdContextKeyName, correlator);
return newHttpServerBuilder(port, requestHandler, provider, correlator);
}

public static <I, O> HttpServerBuilder<I, O> newHttpServerBuilder(int port, RequestHandler<I, O> requestHandler,
String requestIdHeaderName,
RequestCorrelator correlator) {
HttpRequestIdProvider provider = new HttpRequestIdProvider(requestIdHeaderName, correlator);
return newHttpServerBuilder(port, requestHandler, provider, correlator);
}

public static <I, O> HttpClientBuilder<I, O> newHttpClientBuilder(String host, int port,
RequestCorrelator correlator) {
HttpRequestIdProvider provider = new HttpRequestIdProvider(defaultRequestIdContextKeyName, correlator);
return newHttpClientBuilder(host, port, provider, correlator);
}

public static <I, O> HttpClientBuilder<I, O> newHttpClientBuilder(String host, int port, String requestIdHeaderName,
RequestCorrelator correlator) {
HttpRequestIdProvider provider = new HttpRequestIdProvider(requestIdHeaderName, correlator);
Expand All @@ -71,21 +85,40 @@ public static <I, O> HttpClientBuilder<I, O> newHttpClientBuilder(String host, i
RequestIdProvider provider,
RequestCorrelator correlator) {
HttpClientBuilder<I, O> builder = RxNetty.newHttpClientBuilder(host, port);
return builder.pipelineConfigurator(ContextPipelineConfigurators.<I, O>httpClientConfigurator(provider, correlator))
return builder.pipelineConfigurator(ContextPipelineConfigurators.<I, O>httpClientConfigurator(provider,
correlator))
.withChannelFactory(new HttpContextClientChannelFactory<I, O>(builder.getBootstrap(),
correlator));
}

public static HttpServer<ByteBuf, ByteBuf> createHttpServer(int port, RequestHandler<ByteBuf, ByteBuf> requestHandler) {
return newHttpServerBuilder(port, requestHandler, defaultRequestIdContextKeyName,
DEFAULT_CORRELATOR).build();
}

public static HttpServer<ByteBuf, ByteBuf> createHttpServer(int port, RequestHandler<ByteBuf, ByteBuf> requestHandler,
String requestIdHeaderName) {
return newHttpServerBuilder(port, requestHandler, requestIdHeaderName, DEFAULT_CORRELATOR).build();
}

public static HttpClient<ByteBuf, ByteBuf> createHttpClient(String host, int port) {
return RxContexts.<ByteBuf, ByteBuf>newHttpClientBuilder(host, port, defaultRequestIdContextKeyName,
DEFAULT_CORRELATOR).build();
}

public static HttpClient<ByteBuf, ByteBuf> createHttpClient(String host, int port, String requestIdHeaderName) {
return RxContexts.<ByteBuf, ByteBuf>newHttpClientBuilder(host, port, requestIdHeaderName,
DEFAULT_CORRELATOR).build();
}

public static <I, O> HttpServer<I, O> createHttpServer(int port, RequestHandler<I, O> requestHandler,
PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>> configurator) {
HttpRequestIdProvider provider = new HttpRequestIdProvider(defaultRequestIdContextKeyName, DEFAULT_CORRELATOR);
return newHttpServerBuilder(port, requestHandler, defaultRequestIdContextKeyName, DEFAULT_CORRELATOR)
.pipelineConfigurator(ContextPipelineConfigurators
.httpServerConfigurator(provider, DEFAULT_CORRELATOR, configurator)).build();
}

public static <I, O> HttpServer<I, O> createHttpServer(int port, RequestHandler<I, O> requestHandler,
String requestIdHeaderName,
PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>> configurator) {
Expand All @@ -95,6 +128,17 @@ public static <I, O> HttpServer<I, O> createHttpServer(int port, RequestHandler<
.httpServerConfigurator(provider, DEFAULT_CORRELATOR, configurator)).build();
}

public static <I, O> HttpClient<I, O> createHttpClient(String host, int port,
PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> configurator) {
HttpRequestIdProvider provider = new HttpRequestIdProvider(defaultRequestIdContextKeyName, DEFAULT_CORRELATOR);
return RxContexts.<I, O>newHttpClientBuilder(host, port, defaultRequestIdContextKeyName,
DEFAULT_CORRELATOR)
.pipelineConfigurator(ContextPipelineConfigurators.httpClientConfigurator(provider,
DEFAULT_CORRELATOR,
configurator))
.build();
}

public static <I, O> HttpClient<I, O> createHttpClient(String host, int port, String requestIdHeaderName,
PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> configurator) {
HttpRequestIdProvider provider = new HttpRequestIdProvider(requestIdHeaderName, DEFAULT_CORRELATOR);
Expand All @@ -104,4 +148,15 @@ public static <I, O> HttpClient<I, O> createHttpClient(String host, int port, St
configurator))
.build();
}

/**
* Default Context key name used for extracting the request Id. This is the default and will be useful to set a
* system wide requestId key name so that it is consistent between all clients and the server created through
* this factory class.
*
* @param name The name of the context key to be used as default.
*/
public static void useRequestIdContextKey(String name) {
defaultRequestIdContextKeyName = name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.reactivex.netty.serialization.ByteTransformer;
import io.reactivex.netty.serialization.ContentTransformer;

import java.nio.charset.Charset;

Expand Down Expand Up @@ -119,6 +120,10 @@ public HttpClientRequest<T> withRawContentSource(final RawContentSource<?> rawCo
setRawContentFactory(new SimpleContentSourceFactory(rawContentSource));
return this;
}

public HttpClientRequest<T> withRawContent(T content, ContentTransformer<T> transformer) {
return withRawContentSource(new SingletonRawSource<T>(content, transformer));
}

public HttpClientRequest<T> withContent(T content) {
setContentFactory(new SimpleContentSourceFactory<T, ContentSource<T>>(new ContentSource.SingletonSource<T>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public void tearDown() throws Exception {
clientBootstrap.group().shutdownGracefully();
}
if (null != server) {
serverConnHandler.closeNewConnectionsOnReceive(false); // reset state after test. Close New should be explicit.
try {
serverConnHandler.closeAllClientConnections();
} catch (IllegalStateException e) {
Expand Down Expand Up @@ -342,15 +343,19 @@ public void testPoolExhaustion() throws Exception {

@Test
public void testIdleCleanupThread() throws Exception {
serverConnHandler.closeNewConnectionsOnReceive(false);
pool.shutdown();
pool = new ConnectionPoolImpl<String, String>(serverInfo, PoolConfig.DEFAULT_CONFIG, strategy, Executors.newScheduledThreadPool(1),
pool = new ConnectionPoolImpl<String, String>(serverInfo, PoolConfig.DEFAULT_CONFIG, strategy,
Executors.newScheduledThreadPool(1),
new PoolStatsImpl(), factory);

stats = pool.getStats();

ObservableConnection<String, String> connection = acquireAndTestStats();
connection.close();

serverConnHandler.closeAllClientConnections();

waitForClose();

assertAllConnectionsReturned();
Expand Down

0 comments on commit 80498bc

Please sign in to comment.