WIP: Content-length optimization (#773)
* Completed content-length optimization. Added a new flag to DataChunk that marks a chunk as immutable (read-only). Updated existing tests and added new ones.

Signed-off-by: Santiago Pericas-Geertsen <[email protected]>

* Renamed DataChunk property.

Signed-off-by: Santiago Pericas-Geertsen <[email protected]>

* Renamed DataChunk property.

Signed-off-by: Santiago Pericas-Geertsen <[email protected]>
spericas authored Jun 12, 2019
1 parent 9f3cdf9 commit 4c3d6ce
Showing 15 changed files with 347 additions and 37 deletions.
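In short: for a 200 response with no explicit Content-Length header, the WebServer now defers writing the status line and headers, caches the first data chunk, and, if that single chunk turns out to be the whole entity, sets Content-Length from its size instead of falling back to Transfer-Encoding: chunked. A minimal sketch of the observable effect, assuming a plain Helidon SE 1.x setup (the routing code below is illustrative and not part of this diff):

import io.helidon.webserver.Routing;
import io.helidon.webserver.WebServer;

public final class HelloServer {
    public static void main(String[] args) {
        // "hello" is encoded as a single read-only DataChunk, so after this commit
        // the response carries "Content-Length: 5" instead of "Transfer-Encoding: chunked".
        Routing routing = Routing.builder()
                .get("/hello", (req, res) -> res.send("hello"))
                .build();
        WebServer.create(routing).start();
    }
}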
59 changes: 57 additions & 2 deletions common/http/src/main/java/io/helidon/common/http/DataChunk.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -66,7 +66,19 @@ static DataChunk create(byte[] bytes) {
* @return a reusable data chunk with no release callback
*/
static DataChunk create(boolean flush, ByteBuffer data) {
return create(flush, data, Utils.EMPTY_RUNNABLE);
return create(flush, data, Utils.EMPTY_RUNNABLE, false);
}

/**
* Creates a reusable data chunk.
*
* @param flush a signal that chunk should be written and flushed from any cache if possible
* @param data a data chunk. Should not be reused until {@code releaseCallback} is used
* @param readOnly indicates underlying buffer is not reused
* @return a reusable data chunk with no release callback
*/
static DataChunk create(boolean flush, ByteBuffer data, boolean readOnly) {
return create(flush, data, Utils.EMPTY_RUNNABLE, readOnly);
}

/**
@@ -78,6 +90,19 @@ static DataChunk create(boolean flush, ByteBuffer data) {
* @return a reusable data chunk with a release callback
*/
static DataChunk create(boolean flush, ByteBuffer data, Runnable releaseCallback) {
return create(flush, data, releaseCallback, false);
}

/**
* Creates a reusable data chunk.
*
* @param flush a signal that chunk should be written and flushed from any cache if possible
* @param data a data chunk. Should not be reused until {@code releaseCallback} is used
* @param releaseCallback a callback which is called when this chunk is completely processed and instance is free for reuse
* @param readOnly indicates underlying buffer is not reused
* @return a reusable data chunk with a release callback
*/
static DataChunk create(boolean flush, ByteBuffer data, Runnable releaseCallback, boolean readOnly) {
return new DataChunk() {
private boolean isReleased = false;

@@ -101,6 +126,11 @@ public void release() {
public boolean isReleased() {
return isReleased;
}

@Override
public boolean isReadOnly() {
return readOnly;
}
};
}

@@ -194,4 +224,29 @@ default void release() {
default boolean flush() {
return false;
}

/**
* Makes a copy of this data chunk including its underlying {@link ByteBuffer}. This
* may be necessary for caching in case {@link ByteBuffer#rewind()} is called to
* reuse a byte buffer. Note that only the actual bytes used in the data chunk are
* copied, the resulting data chunk's capacity may be less than the original.
*
* @return A copy of this data chunk.
*/
default DataChunk duplicate() {
byte[] bytes = new byte[data().limit()];
DataChunk dup = DataChunk.create(data().get(bytes));
dup.data().position(0);
return dup;
}

/**
* Returns {@code true} if the underlying byte buffer of this chunk is read
* only or {@code false} otherwise.
*
* @return Immutability outcome.
*/
default boolean isReadOnly() {
return false;
}
}
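The new readOnly flag and the duplicate() default above express one contract: a consumer may cache a read-only chunk as-is, and must copy any other chunk before the producer rewinds its buffer. A minimal sketch of that contract (the ReadOnlyChunkExample class and the sample bytes are illustrative only):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import io.helidon.common.http.DataChunk;

public class ReadOnlyChunkExample {
    public static void main(String[] args) {
        // The wrapped buffer is freshly allocated and never rewound, so the chunk
        // can safely be created with the new readOnly parameter set to true.
        ByteBuffer buf = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        DataChunk chunk = DataChunk.create(false, buf, true);

        // A caller that wants to keep a chunk copies it unless it is read-only,
        // which is exactly what BareResponseImpl does later in this commit.
        DataChunk cached = chunk.isReadOnly() ? chunk : chunk.duplicate();
        System.out.println(cached.data().remaining() + " bytes cached");
    }
}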
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -64,11 +64,12 @@ public void request(long n) {
return;
}
if (bytes > 0) {
buffer.rewind();
LOGGER.info(buffer.toString());
buffer.flip();
s.onNext(DataChunk.create(buffer));
n--;
}
buffer.rewind();
}
} catch (IOException e) {
s.onError(e);
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -59,6 +59,8 @@ private void upload(ServerRequest request, ServerResponse response) {

private void download(ServerRequest request, ServerResponse response) {
LOGGER.info("Entering download ..." + Thread.currentThread());
long length = filePath.toFile().length();
response.headers().add("Content-Length", String.valueOf(length));
response.send(new ServerFileReader(filePath));
LOGGER.info("Exiting download ...");
}
@@ -158,7 +158,7 @@ public Flow.Publisher<DataChunk> apply(byte[] bytes) {
} else {
bs = bytes;
}
DataChunk chunk = DataChunk.create(false, ByteBuffer.wrap(bs));
DataChunk chunk = DataChunk.create(false, ByteBuffer.wrap(bs), true);
return ReactiveStreamsAdapter.publisherToFlow(Mono.just(chunk));
}
}
@@ -195,7 +195,7 @@ public Flow.Publisher<DataChunk> apply(CharSequence s) {
if (s == null || s.length() == 0) {
return ReactiveStreamsAdapter.publisherToFlow(Mono.empty());
}
DataChunk chunk = DataChunk.create(false, charset.encode(s.toString()));
DataChunk chunk = DataChunk.create(false, charset.encode(s.toString()), true);
return ReactiveStreamsAdapter.publisherToFlow(Mono.just(chunk));
}
}
@@ -220,7 +220,7 @@ public Flow.Publisher<DataChunk> apply(CharBuffer buffer) {
if (buffer == null || buffer.size() == 0) {
return ReactiveStreamsAdapter.publisherToFlow(Mono.empty());
}
final DataChunk chunk = DataChunk.create(false, buffer.encode(charset));
final DataChunk chunk = DataChunk.create(false, buffer.encode(charset), true);
return ReactiveStreamsAdapter.publisherToFlow(Mono.just(chunk));
}
}
@@ -33,6 +33,7 @@
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

class BookResourceTest {

@@ -53,27 +54,31 @@ void testBooks() {
.request()
.post(Entity.json(getBookAsJson()));
assertEquals(Response.Status.OK.getStatusCode(), res.getStatus());
assertNotNull(res.getHeaderString("content-length"));

assertBookStoreSize(1);

res = client.target(getConnectionString("/books/123456"))
.request()
.get();
assertEquals(Response.Status.OK.getStatusCode(), res.getStatus());
assertNotNull(res.getHeaderString("content-length"));

assertBookStoreSize(1);

res = client.target(getConnectionString("/books/123456"))
.request()
.put(Entity.json(getBookAsJson()));
assertEquals(Response.Status.OK.getStatusCode(), res.getStatus());
assertNotNull(res.getHeaderString("content-length"));

assertBookStoreSize(1);

res = client.target(getConnectionString("/books/123456"))
.request()
.delete();
assertEquals(Response.Status.OK.getStatusCode(), res.getStatus());
assertNotNull(res.getHeaderString("content-length"));

assertBookStoreSize(0);
}
@@ -35,6 +35,8 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertNotNull;

public class MainTest {

private static WebServer webServer;
@@ -70,6 +72,7 @@ public void testHelloWorld() throws Exception {

conn = getURLConnection("GET","/books");
Assertions.assertEquals(200, conn.getResponseCode(), "HTTP response1");
Assertions.assertNotNull(conn.getHeaderField("content-length"));

conn = getURLConnection("POST","/books");
writeJsonContent(conn, json);
@@ -81,16 +84,17 @@
JsonObject jsonObject = jsonReader.readObject();
Assertions.assertEquals("123456", jsonObject.getString("isbn"),
"Checking if correct ISBN");
Assertions.assertNotNull(conn.getHeaderField("content-length"));

conn = getURLConnection("GET","/books/0000");
Assertions.assertEquals(404, conn.getResponseCode(), "HTTP response GET bad ISBN");

conn = getURLConnection("GET","/books");
Assertions.assertEquals(200, conn.getResponseCode(), "HTTP response list books");
Assertions.assertNotNull(conn.getHeaderField("content-length"));

conn = getURLConnection("DELETE","/books/123456");
Assertions.assertEquals(200, conn.getResponseCode(), "HTTP response delete book");

}

private HttpURLConnection getURLConnection(String method, String path) throws Exception {
@@ -56,3 +56,4 @@ public void remove(String isbn) {
public boolean contains(String isbn) { return store.containsKey(isbn); }

}

@@ -106,7 +106,6 @@ void basicTestJsonMP() throws Exception {
runJsonFunctionalTest("mp", "");
}


@Test
void basicTestMetricsHealthSE() throws Exception {
runMetricsAndHealthTest("se", "jsonp");
@@ -128,8 +128,6 @@ public void write(int b) throws IOException {

if (contentLength >= 0) {
res.headers().put(Http.Header.CONTENT_LENGTH, String.valueOf(contentLength));
} else {
res.headers().put(Http.Header.TRANSFER_ENCODING, "chunked");
}

for (Map.Entry<String, List<String>> entry : context.getStringHeaders().entrySet()) {
@@ -75,6 +75,9 @@ class BareResponseImpl implements BareResponse {
private final GenericFutureListener<? extends Future<? super Void>> channelClosedListener;

private volatile Flow.Subscription subscription;
private volatile DataChunk firstChunk;
private volatile DefaultHttpResponse response;
private volatile boolean lengthOptimization;

/**
* @param ctx the channel handler context
@@ -130,7 +133,7 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map<String, List<S
throw new IllegalStateException("Status and headers were already sent");
}

DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, valueOf(status.code()));
response = new DefaultHttpResponse(HTTP_1_1, valueOf(status.code()));
for (Map.Entry<String, List<String>> headerEntry : headers.entrySet()) {
response.headers().add(headerEntry.getKey(), headerEntry.getValue());
}
@@ -140,24 +143,25 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map<String, List<S
.filter(header -> header.startsWith(HTTP_2_HEADER_PREFIX))
.forEach(header -> response.headers().add(header, requestHeaders.get(header)));

// Set chunked if length not set, may switch to length later
boolean lengthSet = HttpUtil.isContentLengthSet(response);
if (!lengthSet) {
lengthOptimization = status.code() == Http.Status.OK_200.code()
&& !HttpUtil.isTransferEncodingChunked(response);
HttpUtil.setTransferEncodingChunked(response, true);
}

// Add keep alive header as per:
// http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
if (keepAlive) {
if (status.code() != Http.Status.NO_CONTENT_204.code()) {
HttpUtil.setTransferEncodingChunked(response, true);
}
// Add keep alive header as per:
// - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}

LOGGER.finest(() -> log("Writing headers: " + status));
ctx.writeAndFlush(response)
.addListener(future -> {
if (future.isSuccess()) {
headersFuture.complete(this);
}
})
.addListener(completeOnFailureListener("An exception occurred when writing headers."))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
// Content length optimization attempt
if (!lengthOptimization) {
LOGGER.finest(() -> log("Writing headers: " + status));
initWriteResponse();
}
}

/**
@@ -209,7 +213,21 @@ private void completeInternal(Throwable throwable) {
}
}

/**
* Write last HTTP content. If length optimization is active and a first chunk is cached,
* switch content encoding and write response.
*
* @param throwable A throwable.
* @param closeAction Close action listener.
*/
private void writeLastContent(final Throwable throwable, final ChannelFutureListener closeAction) {
if (lengthOptimization) {
if (firstChunk != null) {
HttpUtil.setTransferEncodingChunked(response, false);
HttpUtil.setContentLength(response, firstChunk.data().limit());
}
initWriteResponse();
}
ctx.writeAndFlush(LAST_HTTP_CONTENT)
.addListener(completeOnFailureListener("An exception occurred when writing last http content."))
.addListener(completeOnSuccessListener(throwable))
@@ -245,7 +263,42 @@ public void onNext(DataChunk data) {
throw new IllegalStateException("Response is already closed!");
}
if (data != null) {
if (lengthOptimization) {
if (firstChunk == null) {
firstChunk = data.isReadOnly() ? data : data.duplicate(); // cache first chunk
return;
}
initWriteResponse();
}
sendData(data);
}
}

/**
* Initiates write of response and sends first chunk if available.
*
* @return Future of response or first chunk.
*/
private ChannelFuture initWriteResponse() {
ChannelFuture cf = ctx.write(response)
.addListener(future -> {
if (future.isSuccess()) {
headersFuture.complete(this);
}
})
.addListener(completeOnFailureListener("An exception occurred when writing headers."))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
response = null;
if (firstChunk != null) {
cf = sendData(firstChunk);
firstChunk.release();
firstChunk = null;
}
lengthOptimization = false;
return cf;
}

private ChannelFuture sendData(DataChunk data) {
LOGGER.finest(() -> log("Sending data chunk"));

DefaultHttpContent httpContent = new DefaultHttpContent(Unpooled.wrappedBuffer(data.data()));
@@ -259,15 +312,13 @@ public void onNext(DataChunk data) {
channelFuture = ctx.write(httpContent);
}

channelFuture
return channelFuture
.addListener(future -> {
data.release();
LOGGER.finest(() -> log("Data chunk sent with result: " + future.isSuccess()));
})
.addListener(completeOnFailureListener("Failure when sending a content!"))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

}
}

private String log(String s) {
(Diffs for the remaining changed files were not loaded on this page.)
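Taken together, the BareResponseImpl changes implement the optimization roughly as follows. This is a dependency-free sketch of the control flow shown in the diff above, not the actual Netty-based code; the class and its collections are illustrative, while lengthOptimization and firstChunk mirror the new fields:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Dependency-free model of the single-chunk Content-Length optimization.
 *  Names are illustrative; the real BareResponseImpl writes Netty HTTP objects. */
final class LengthOptimizingResponse {
    final Map<String, String> headers = new LinkedHashMap<>();
    final List<ByteBuffer> body = new ArrayList<>();
    private boolean lengthOptimization;
    private ByteBuffer firstChunk;

    void writeStatusAndHeaders(int status, Map<String, String> userHeaders) {
        headers.putAll(userHeaders);
        // Defer the header write only for 200 responses without an explicit length.
        lengthOptimization = status == 200 && !headers.containsKey("Content-Length");
    }

    void onNext(ByteBuffer chunk) {
        if (lengthOptimization && firstChunk == null) {
            firstChunk = chunk;              // cache the first chunk and wait
            return;
        }
        fallBackToChunked();                 // a second chunk arrived
        body.add(chunk);
    }

    void complete() {
        if (lengthOptimization && firstChunk != null) {
            // Exactly one chunk: its size is the content length, no chunked encoding.
            headers.put("Content-Length", String.valueOf(firstChunk.remaining()));
            body.add(firstChunk);
            firstChunk = null;
            lengthOptimization = false;
        } else {
            fallBackToChunked();
        }
    }

    private void fallBackToChunked() {
        if (lengthOptimization) {
            headers.put("Transfer-Encoding", "chunked");
            if (firstChunk != null) {
                body.add(firstChunk);
                firstChunk = null;
            }
            lengthOptimization = false;
        }
    }
}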