Skip to content

Commit

Permalink
Merge pull request Azure#211 from daschult/NonStringRequestResponseBo…
Browse files Browse the repository at this point in the history
…dies

Make HttpRequest take InputStream instead of String body
  • Loading branch information
Dan Schulte authored Sep 8, 2017
2 parents e22d112 + e7f680e commit 05d7a41
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for
* license information.
*/

package com.microsoft.rest.v2.http;

import java.io.ByteArrayInputStream;
import java.io.InputStream;

/**
* A HTTP request body that contains a byte[].
*/
public class ByteArrayHttpRequestBody implements HttpRequestBody {
private final byte[] contents;

/**
* Create a new ByteArrayHttpRequestBody with the provided byte[].
* @param contents The byte[] to store in this ByteArrayHttpRequestBody.
*/
public ByteArrayHttpRequestBody(byte[] contents) {
this.contents = contents;
}

@Override
public int contentLength() {
return contents.length;
}

@Override
public InputStream createInputStream() {
return new ByteArrayInputStream(contents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class HttpRequest {
private final String httpMethod;
private final String url;
private final HttpHeaders headers = new HttpHeaders();
private String body;
private HttpRequestBody body;
private String mimeType;

/**
Expand Down Expand Up @@ -80,16 +80,37 @@ public HttpHeaders headers() {
* @return This HttpRequest so that multiple operations can be chained together.
*/
public HttpRequest withBody(String body, String mimeType) {
final byte[] bodyBytes = body.getBytes();
return withBody(bodyBytes, mimeType);
}

/**
* Set the body of this HTTP request.
* @param body The body of this HTTP request.
* @param mimeType The MIME type of the body's contents.
* @return This HttpRequest so that multiple operations can be chained together.
*/
public HttpRequest withBody(byte[] body, String mimeType) {
return withBody(new ByteArrayHttpRequestBody(body), mimeType);
}

/**
* Set the body of this HTTP request.
* @param body The body of this HTTP request.
* @param mimeType The MIME type of the body's contents.
* @return This HttpRequest so that multiple operations can be chained together.
*/
public HttpRequest withBody(HttpRequestBody body, String mimeType) {
this.body = body;
this.mimeType = mimeType;
return this;
return withHeader("Content-Length", String.valueOf(body.contentLength()));
}

/**
* Get the body for this HttpRequest.
* @return The body for this HttpRequest.
*/
public String body() {
public HttpRequestBody body() {
return body;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for
* license information.
*/

package com.microsoft.rest.v2.http;

import java.io.InputStream;

/**
* The body of an HTTP request.
*/
public interface HttpRequestBody {
/**
* The length of this request body in bytes.
* @return The length of this request body in bytes.
*/
int contentLength();

/**
* Create an InputStream that contains the contents of this request body.
* @return An InputStream that contains the contents of this request body.
*/
InputStream createInputStream();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package com.microsoft.rest.v2.http;

import com.google.common.io.ByteStreams;
import okhttp3.Call;
import okhttp3.MediaType;
import okhttp3.Request;
Expand All @@ -14,6 +15,7 @@
import rx.Single;

import java.io.IOException;
import java.io.InputStream;

/**
* A HttpClient that is implemented using OkHttp.
Expand All @@ -37,25 +39,29 @@ public OkHttpAdapter(okhttp3.OkHttpClient client) {
@Override
public Single<? extends HttpResponse> sendRequestAsync(HttpRequest request) {
RequestBody requestBody = null;
final String requestBodyString = request.body();
if (requestBodyString != null && !requestBodyString.isEmpty()) {
final MediaType mediaType = MediaType.parse(request.mimeType());
requestBody = RequestBody.create(mediaType, requestBodyString);
}

final Request.Builder requestBuilder = new Request.Builder()
.method(request.httpMethod(), requestBody)
.url(request.url());
Single<? extends HttpResponse> result;

for (HttpHeader header : request.headers()) {
requestBuilder.addHeader(header.name(), header.value());
}
final HttpRequestBody body = request.body();
try {
if (body != null) {
final MediaType mediaType = MediaType.parse(request.mimeType());
try (final InputStream bodyStream = body.createInputStream()) {
requestBody = RequestBody.create(mediaType, ByteStreams.toByteArray(bodyStream));
}
}

final Request okhttpRequest = requestBuilder.build();
final Call call = client.newCall(okhttpRequest);
final Request.Builder requestBuilder = new Request.Builder()
.method(request.httpMethod(), requestBody)
.url(request.url());

for (HttpHeader header : request.headers()) {
requestBuilder.addHeader(header.name(), header.value());
}

final Request okhttpRequest = requestBuilder.build();
final Call call = client.newCall(okhttpRequest);

Single<? extends HttpResponse> result;
try {
final Response response = call.execute();
result = Single.just(new OkHttpResponse(response));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import rx.Observable;
import rx.Observer;
import rx.Single;
import rx.functions.Func1;
import rx.observables.SyncOnSubscribe;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -42,52 +47,94 @@ private SSLEngine getSSLEngine(String host) {

@Override
public Single<? extends HttpResponse> sendRequestAsync(HttpRequest request) {
URI uri;
Single<? extends HttpResponse> result;

try {
uri = new URI(request.url());
} catch (URISyntaxException e) {
return Single.error(e);
}
final URI uri = new URI(request.url());

Map<String, Set<Object>> rxnHeaders = new HashMap<>();
for (HttpHeader header : request.headers()) {
rxnHeaders.put(header.name(), Collections.<Object>singleton(header.value()));
}
Map<String, Set<Object>> rxnHeaders = new HashMap<>();
for (HttpHeader header : request.headers()) {
rxnHeaders.put(header.name(), Collections.<Object>singleton(header.value()));
}

String mimeType = request.mimeType();
if (mimeType != null) {
rxnHeaders.put("Content-Type", Collections.<Object>singleton(mimeType));
}
String mimeType = request.mimeType();
if (mimeType != null) {
rxnHeaders.put("Content-Type", Collections.<Object>singleton(mimeType));
}

String body = request.body();
if (body != null) {
rxnHeaders.put("Content-Length", Collections.<Object>singleton(String.valueOf(body.length())));
}
boolean isSecure = "https".equalsIgnoreCase(uri.getScheme());
io.reactivex.netty.protocol.http.client.HttpClient<ByteBuf, ByteBuf> rxnClient =
io.reactivex.netty.protocol.http.client.HttpClient.newClient(uri.getHost(), isSecure ? 443 : 80);

if (isSecure) {
rxnClient = rxnClient.secure(getSSLEngine(uri.getHost()));
}

HttpClientRequest<ByteBuf, ByteBuf> rxnReq = rxnClient
.createRequest(HttpMethod.valueOf(request.httpMethod()), uri.toASCIIString())
.addHeaders(rxnHeaders);

Observable<HttpClientResponse<ByteBuf>> obsResponse = rxnReq;

boolean isSecure = "https".equalsIgnoreCase(uri.getScheme());
io.reactivex.netty.protocol.http.client.HttpClient<ByteBuf, ByteBuf> rxnClient =
io.reactivex.netty.protocol.http.client.HttpClient.newClient(uri.getHost(), isSecure ? 443 : 80);
final HttpRequestBody body = request.body();
if (body != null) {
try (final InputStream bodyStream = body.createInputStream()) {
obsResponse = rxnReq.writeBytesContent(toByteArrayObservable(bodyStream));
}
}

if (isSecure) {
rxnClient = rxnClient.secure(getSSLEngine(uri.getHost()));
result = obsResponse
.map(new Func1<HttpClientResponse<ByteBuf>, HttpResponse>() {
@Override
public HttpResponse call(HttpClientResponse<ByteBuf> rxnRes) {
return new RxNettyResponse(rxnRes);
}
})
.toSingle();
} catch (URISyntaxException | IOException e) {
result = Single.error(e);
}

HttpClientRequest<ByteBuf, ByteBuf> rxnReq = rxnClient
.createRequest(HttpMethod.valueOf(request.httpMethod()), uri.toASCIIString())
.addHeaders(rxnHeaders);
return result;
}

// This InputStream to Observable<byte[]> conversion comes from rxjava-string
// (https://github.com/ReactiveX/RxJavaString). We can't just take a dependency on
// rxjava-string, however, because they require an older version of rxjava (1.1.1).
private static Observable<byte[]> toByteArrayObservable(InputStream inputStream) {
return Observable.create(new OnSubscribeInputStream(inputStream, 8 * 1024));
}

private static final class OnSubscribeInputStream extends SyncOnSubscribe<InputStream, byte[]> {
private final InputStream is;
private final int size;

Observable<HttpClientResponse<ByteBuf>> obsResponse = rxnReq;
if (body != null) {
obsResponse = rxnReq.writeStringContent(Observable.just(body));
OnSubscribeInputStream(InputStream is, int size) {
this.is = is;
this.size = size;
}

return obsResponse
.map(new Func1<HttpClientResponse<ByteBuf>, HttpResponse>() {
@Override
public HttpResponse call(HttpClientResponse<ByteBuf> rxnRes) {
return new RxNettyResponse(rxnRes);
}
})
.toSingle();
@Override
protected InputStream generateState() {
return this.is;
}

@Override
protected InputStream next(InputStream state, Observer<? super byte[]> observer) {
byte[] buffer = new byte[size];
try {
int count = state.read(buffer);
if (count == -1) {
observer.onCompleted();
} else if (count < size) {
observer.onNext(Arrays.copyOf(buffer, count));
} else {
observer.onNext(buffer);
}
} catch (IOException e) {
observer.onError(e);
}
return state;
}
}
}
Loading

0 comments on commit 05d7a41

Please sign in to comment.