Skip to content

Commit

Permalink
Add option to disable chunked transfer-encoding (#3864)
Browse files Browse the repository at this point in the history
* fix for bug: #3640

Signed-off-by: Jitendra Kumar <[email protected]>

* fix for bug: #3640

Signed-off-by: Jitendra Kumar <[email protected]>

* fix for bug: #3640

Signed-off-by: Jitendra Kumar <[email protected]>

* fix for bug: #3640

Signed-off-by: Jitendra Kumar <[email protected]>

* adding chunk support for non-compressed request

Signed-off-by: Jitendra Kumar <[email protected]>

* Diable chunked transfer encoding in integration tests.

Signed-off-by: dblock <[email protected]>

* Replace chunkedTransferEncodingEnabled with chunkedEnabled.

Signed-off-by: Daniel (dB.) Doubrovkine <[email protected]>

* Make chunkedEnabled optional.

Signed-off-by: Daniel (dB.) Doubrovkine <[email protected]>

* Remove Optional<Boolean> constructor.

Signed-off-by: Daniel (dB.) Doubrovkine <[email protected]>

* Remove optionals from constructors.

Signed-off-by: Daniel (dB.) Doubrovkine <[email protected]>

* Use Optional.empty()

Signed-off-by: Daniel (dB.) Doubrovkine <[email protected]>

* Use a mode idiomatic syntax in isChunked.

Signed-off-by: Daniel (dB.) Doubrovkine <[email protected]>

Co-authored-by: Jitendra Kumar <[email protected]>
  • Loading branch information
dblock and kumjiten authored Jul 13, 2022
1 parent 2763e80 commit f7894f7
Show file tree
Hide file tree
Showing 4 changed files with 337 additions and 22 deletions.
142 changes: 130 additions & 12 deletions client/rest/src/main/java/org/opensearch/client/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.http.ConnectionClosedException;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.entity.HttpEntityWrapper;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
Expand Down Expand Up @@ -131,6 +132,29 @@ public class RestClient implements Closeable {
private volatile NodeTuple<List<Node>> nodeTuple;
private final WarningsHandler warningsHandler;
private final boolean compressionEnabled;
private final Optional<Boolean> chunkedEnabled;

RestClient(
CloseableHttpAsyncClient client,
Header[] defaultHeaders,
List<Node> nodes,
String pathPrefix,
FailureListener failureListener,
NodeSelector nodeSelector,
boolean strictDeprecationMode,
boolean compressionEnabled,
boolean chunkedEnabled
) {
this.client = client;
this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders));
this.failureListener = failureListener;
this.pathPrefix = pathPrefix;
this.nodeSelector = nodeSelector;
this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE;
this.compressionEnabled = compressionEnabled;
this.chunkedEnabled = Optional.of(chunkedEnabled);
setNodes(nodes);
}

RestClient(
CloseableHttpAsyncClient client,
Expand All @@ -149,6 +173,7 @@ public class RestClient implements Closeable {
this.nodeSelector = nodeSelector;
this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE;
this.compressionEnabled = compressionEnabled;
this.chunkedEnabled = Optional.empty();
setNodes(nodes);
}

Expand Down Expand Up @@ -583,36 +608,42 @@ private static void addSuppressedException(Exception suppressedException, Except
}
}

private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity, boolean compressionEnabled) {
private HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) {
switch (method.toUpperCase(Locale.ROOT)) {
case HttpDeleteWithEntity.METHOD_NAME:
return addRequestBody(new HttpDeleteWithEntity(uri), entity, compressionEnabled);
return addRequestBody(new HttpDeleteWithEntity(uri), entity);
case HttpGetWithEntity.METHOD_NAME:
return addRequestBody(new HttpGetWithEntity(uri), entity, compressionEnabled);
return addRequestBody(new HttpGetWithEntity(uri), entity);
case HttpHead.METHOD_NAME:
return addRequestBody(new HttpHead(uri), entity, compressionEnabled);
return addRequestBody(new HttpHead(uri), entity);
case HttpOptions.METHOD_NAME:
return addRequestBody(new HttpOptions(uri), entity, compressionEnabled);
return addRequestBody(new HttpOptions(uri), entity);
case HttpPatch.METHOD_NAME:
return addRequestBody(new HttpPatch(uri), entity, compressionEnabled);
return addRequestBody(new HttpPatch(uri), entity);
case HttpPost.METHOD_NAME:
HttpPost httpPost = new HttpPost(uri);
addRequestBody(httpPost, entity, compressionEnabled);
addRequestBody(httpPost, entity);
return httpPost;
case HttpPut.METHOD_NAME:
return addRequestBody(new HttpPut(uri), entity, compressionEnabled);
return addRequestBody(new HttpPut(uri), entity);
case HttpTrace.METHOD_NAME:
return addRequestBody(new HttpTrace(uri), entity, compressionEnabled);
return addRequestBody(new HttpTrace(uri), entity);
default:
throw new UnsupportedOperationException("http method not supported: " + method);
}
}

private static HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity, boolean compressionEnabled) {
private HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity) {
if (entity != null) {
if (httpRequest instanceof HttpEntityEnclosingRequestBase) {
if (compressionEnabled) {
entity = new ContentCompressingEntity(entity);
if (chunkedEnabled.isPresent()) {
entity = new ContentCompressingEntity(entity, chunkedEnabled.get());
} else {
entity = new ContentCompressingEntity(entity);
}
} else if (chunkedEnabled.isPresent()) {
entity = new ContentHttpEntity(entity, chunkedEnabled.get());
}
((HttpEntityEnclosingRequestBase) httpRequest).setEntity(entity);
} else {
Expand Down Expand Up @@ -782,7 +813,7 @@ private class InternalRequest {
String ignoreString = params.remove("ignore");
this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod());
URI uri = buildUri(pathPrefix, request.getEndpoint(), params);
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity(), compressionEnabled);
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity());
this.cancellable = Cancellable.fromRequest(httpRequest);
setHeaders(httpRequest, request.getOptions().getHeaders());
setRequestConfig(httpRequest, request.getOptions().getRequestConfig());
Expand Down Expand Up @@ -936,6 +967,7 @@ private static Exception extractAndWrapCause(Exception exception) {
* A gzip compressing entity that also implements {@code getContent()}.
*/
public static class ContentCompressingEntity extends GzipCompressingEntity {
private Optional<Boolean> chunkedEnabled;

/**
* Creates a {@link ContentCompressingEntity} instance with the provided HTTP entity.
Expand All @@ -944,6 +976,18 @@ public static class ContentCompressingEntity extends GzipCompressingEntity {
*/
public ContentCompressingEntity(HttpEntity entity) {
super(entity);
this.chunkedEnabled = Optional.empty();
}

/**
* Creates a {@link ContentCompressingEntity} instance with the provided HTTP entity.
*
* @param entity the HTTP entity.
* @param chunkedEnabled force enable/disable chunked transfer-encoding.
*/
public ContentCompressingEntity(HttpEntity entity, boolean chunkedEnabled) {
super(entity);
this.chunkedEnabled = Optional.of(chunkedEnabled);
}

@Override
Expand All @@ -954,6 +998,80 @@ public InputStream getContent() throws IOException {
}
return out.asInput();
}

/**
* A gzip compressing entity doesn't work with chunked encoding with sigv4
*
* @return false
*/
@Override
public boolean isChunked() {
return chunkedEnabled.orElseGet(super::isChunked);
}

/**
* A gzip entity requires content length in http headers.
*
* @return content length of gzip entity
*/
@Override
public long getContentLength() {
if (chunkedEnabled.isPresent()) {
if (chunkedEnabled.get()) {
return -1L;
} else {
long size;
try (InputStream is = getContent()) {
size = is.readAllBytes().length;
} catch (IOException ex) {
size = -1L;
}

return size;
}
} else {
return super.getContentLength();
}
}
}

/**
* An entity that lets the caller specify the return value of {@code isChunked()}.
*/
public static class ContentHttpEntity extends HttpEntityWrapper {
private Optional<Boolean> chunkedEnabled;

/**
* Creates a {@link ContentHttpEntity} instance with the provided HTTP entity.
*
* @param entity the HTTP entity.
*/
public ContentHttpEntity(HttpEntity entity) {
super(entity);
this.chunkedEnabled = Optional.empty();
}

/**
* Creates a {@link ContentHttpEntity} instance with the provided HTTP entity.
*
* @param entity the HTTP entity.
* @param chunkedEnabled force enable/disable chunked transfer-encoding.
*/
public ContentHttpEntity(HttpEntity entity, boolean chunkedEnabled) {
super(entity);
this.chunkedEnabled = Optional.of(chunkedEnabled);
}

/**
* A chunked entity requires transfer-encoding:chunked in http headers
* which requires isChunked to be true
*
* @return true
*/
@Override
public boolean isChunked() {
return chunkedEnabled.orElseGet(super::isChunked);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.security.PrivilegedAction;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
* Helps creating a new {@link RestClient}. Allows to set the most common http client configuration options when internally
Expand Down Expand Up @@ -84,6 +85,7 @@ public final class RestClientBuilder {
private NodeSelector nodeSelector = NodeSelector.ANY;
private boolean strictDeprecationMode = false;
private boolean compressionEnabled = false;
private Optional<Boolean> chunkedEnabled;

/**
* Creates a new builder instance and sets the hosts that the client will send requests to.
Expand All @@ -100,6 +102,7 @@ public final class RestClientBuilder {
}
}
this.nodes = nodes;
this.chunkedEnabled = Optional.empty();
}

/**
Expand Down Expand Up @@ -238,6 +241,16 @@ public RestClientBuilder setCompressionEnabled(boolean compressionEnabled) {
return this;
}

/**
* Whether the REST client should use Transfer-Encoding: chunked for requests or not"
*
* @param chunkedEnabled force enable/disable chunked transfer-encoding.
*/
public RestClientBuilder setChunkedEnabled(boolean chunkedEnabled) {
this.chunkedEnabled = Optional.of(chunkedEnabled);
return this;
}

/**
* Creates a new {@link RestClient} based on the provided configuration.
*/
Expand All @@ -248,16 +261,34 @@ public RestClient build() {
CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(
(PrivilegedAction<CloseableHttpAsyncClient>) this::createHttpClient
);
RestClient restClient = new RestClient(
httpClient,
defaultHeaders,
nodes,
pathPrefix,
failureListener,
nodeSelector,
strictDeprecationMode,
compressionEnabled
);

RestClient restClient = null;

if (chunkedEnabled.isPresent()) {
restClient = new RestClient(
httpClient,
defaultHeaders,
nodes,
pathPrefix,
failureListener,
nodeSelector,
strictDeprecationMode,
compressionEnabled,
chunkedEnabled.get()
);
} else {
restClient = new RestClient(
httpClient,
defaultHeaders,
nodes,
pathPrefix,
failureListener,
nodeSelector,
strictDeprecationMode,
compressionEnabled
);
}

httpClient.start();
return restClient;
}
Expand Down
Loading

0 comments on commit f7894f7

Please sign in to comment.