diff --git a/sdk/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/RxDocumentClientImpl.java b/sdk/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/RxDocumentClientImpl.java index 47a123d2607b5..ec85aac59eb26 100644 --- a/sdk/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/RxDocumentClientImpl.java +++ b/sdk/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/RxDocumentClientImpl.java @@ -115,8 +115,15 @@ import io.reactivex.netty.channel.SingleNioLoopProvider; import io.reactivex.netty.client.RxClient; import io.reactivex.netty.pipeline.ssl.SSLEngineFactory; +import io.reactivex.netty.pipeline.PipelineConfigurator; +import io.reactivex.netty.pipeline.PipelineConfiguratorComposite; +import io.reactivex.netty.protocol.http.HttpObjectAggregationConfigurator; import io.reactivex.netty.protocol.http.client.CompositeHttpClient; import io.reactivex.netty.protocol.http.client.CompositeHttpClientBuilder; +import io.reactivex.netty.protocol.http.client.HttpClientPipelineConfigurator; +import io.reactivex.netty.protocol.http.client.HttpClientRequest; +import io.reactivex.netty.protocol.http.client.HttpClientResponse; + import rx.Observable; import rx.Single; import rx.functions.Func1; @@ -231,13 +238,22 @@ public SSLEngine createSSLEngine(ByteBufAllocator allocator) { CompositeHttpClientBuilder builder = new CompositeHttpClientBuilder() .withSslEngineFactory(new DefaultSSLEngineFactory()) .withMaxConnections(connectionPolicy.getMaxPoolSize()) - .withIdleConnectionsTimeoutMillis(this.connectionPolicy.getIdleConnectionTimeoutInMillis()); + .withIdleConnectionsTimeoutMillis(this.connectionPolicy.getIdleConnectionTimeoutInMillis()) + .pipelineConfigurator(createClientPipelineConfigurator()); RxClient.ClientConfig config = new RxClient.ClientConfig.Builder() .readTimeout(connectionPolicy.getRequestTimeoutInMillis(), TimeUnit.MILLISECONDS).build(); return builder.config(config); } + private PipelineConfigurator createClientPipelineConfigurator() { + PipelineConfigurator clientPipelineConfigurator = new PipelineConfiguratorComposite, + HttpClientRequest>(new HttpClientPipelineConfigurator + (8192, 32768, 8182, true ), + new HttpObjectAggregationConfigurator()); + return clientPipelineConfigurator; + } + @Override public URI getServiceEndpoint() { return this.serviceEndpoint;