Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Content-length optimization #773

Merged
merged 3 commits into from
Jun 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 57 additions & 2 deletions common/http/src/main/java/io/helidon/common/http/DataChunk.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -66,7 +66,19 @@ static DataChunk create(byte[] bytes) {
* @return a reusable data chunk with no release callback
*/
static DataChunk create(boolean flush, ByteBuffer data) {
return create(flush, data, Utils.EMPTY_RUNNABLE);
return create(flush, data, Utils.EMPTY_RUNNABLE, false);
}

/**
* Creates a reusable data chunk.
*
* @param flush a signal that chunk should be written and flushed from any cache if possible
* @param data a data chunk. Should not be reused until {@code releaseCallback} is used
* @param readOnly indicates underlying buffer is not reused
* @return a reusable data chunk with no release callback
*/
static DataChunk create(boolean flush, ByteBuffer data, boolean readOnly) {
return create(flush, data, Utils.EMPTY_RUNNABLE, readOnly);
}

/**
Expand All @@ -78,6 +90,19 @@ static DataChunk create(boolean flush, ByteBuffer data) {
* @return a reusable data chunk with a release callback
*/
static DataChunk create(boolean flush, ByteBuffer data, Runnable releaseCallback) {
return create(flush, data, releaseCallback, false);
}

/**
* Creates a reusable data chunk.
*
* @param flush a signal that chunk should be written and flushed from any cache if possible
* @param data a data chunk. Should not be reused until {@code releaseCallback} is used
* @param releaseCallback a callback which is called when this chunk is completely processed and instance is free for reuse
* @param readOnly indicates underlying buffer is not reused
* @return a reusable data chunk with a release callback
*/
static DataChunk create(boolean flush, ByteBuffer data, Runnable releaseCallback, boolean readOnly) {
return new DataChunk() {
private boolean isReleased = false;

Expand All @@ -101,6 +126,11 @@ public void release() {
public boolean isReleased() {
return isReleased;
}

@Override
public boolean isReadOnly() {
return readOnly;
}
};
}

Expand Down Expand Up @@ -194,4 +224,29 @@ default void release() {
default boolean flush() {
return false;
}

/**
* Makes a copy of this data chunk including its underlying {@link ByteBuffer}. This
* may be necessary for caching in case {@link ByteBuffer#rewind()} is called to
* reuse a byte buffer. Note that only the actual bytes used in the data chunk are
* copied, the resulting data chunk's capacity may be less than the original.
*
* @return A copy of this data chunk.
*/
default DataChunk duplicate() {
byte[] bytes = new byte[data().limit()];
DataChunk dup = DataChunk.create(data().get(bytes));
dup.data().position(0);
return dup;
}

/**
* Returns {@code true} if the underlying byte buffer of this chunk is read
* only or {@code false} otherwise.
*
* @return Immutability outcome.
*/
default boolean isReadOnly() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,11 +64,12 @@ public void request(long n) {
return;
}
if (bytes > 0) {
buffer.rewind();
LOGGER.info(buffer.toString());
buffer.flip();
s.onNext(DataChunk.create(buffer));
n--;
}
buffer.rewind();
}
} catch (IOException e) {
s.onError(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,6 +59,8 @@ private void upload(ServerRequest request, ServerResponse response) {

private void download(ServerRequest request, ServerResponse response) {
LOGGER.info("Entering download ..." + Thread.currentThread());
long length = filePath.toFile().length();
response.headers().add("Content-Length", String.valueOf(length));
response.send(new ServerFileReader(filePath));
LOGGER.info("Exiting download ...");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public Flow.Publisher<DataChunk> apply(byte[] bytes) {
} else {
bs = bytes;
}
DataChunk chunk = DataChunk.create(false, ByteBuffer.wrap(bs));
DataChunk chunk = DataChunk.create(false, ByteBuffer.wrap(bs), true);
return ReactiveStreamsAdapter.publisherToFlow(Mono.just(chunk));
}
}
Expand Down Expand Up @@ -195,7 +195,7 @@ public Flow.Publisher<DataChunk> apply(CharSequence s) {
if (s == null || s.length() == 0) {
return ReactiveStreamsAdapter.publisherToFlow(Mono.empty());
}
DataChunk chunk = DataChunk.create(false, charset.encode(s.toString()));
DataChunk chunk = DataChunk.create(false, charset.encode(s.toString()), true);
return ReactiveStreamsAdapter.publisherToFlow(Mono.just(chunk));
}
}
Expand All @@ -220,7 +220,7 @@ public Flow.Publisher<DataChunk> apply(CharBuffer buffer) {
if (buffer == null || buffer.size() == 0) {
return ReactiveStreamsAdapter.publisherToFlow(Mono.empty());
}
final DataChunk chunk = DataChunk.create(false, buffer.encode(charset));
final DataChunk chunk = DataChunk.create(false, buffer.encode(charset), true);
return ReactiveStreamsAdapter.publisherToFlow(Mono.just(chunk));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

class BookResourceTest {

Expand All @@ -53,27 +54,31 @@ void testBooks() {
.request()
.post(Entity.json(getBookAsJson()));
assertEquals(Response.Status.OK.getStatusCode(), res.getStatus());
assertNotNull(res.getHeaderString("content-length"));

assertBookStoreSize(1);

res = client.target(getConnectionString("/books/123456"))
.request()
.get();
assertEquals(Response.Status.OK.getStatusCode(), res.getStatus());
assertNotNull(res.getHeaderString("content-length"));

assertBookStoreSize(1);

res = client.target(getConnectionString("/books/123456"))
.request()
.put(Entity.json(getBookAsJson()));
assertEquals(Response.Status.OK.getStatusCode(), res.getStatus());
assertNotNull(res.getHeaderString("content-length"));

assertBookStoreSize(1);

res = client.target(getConnectionString("/books/123456"))
.request()
.delete();
assertEquals(Response.Status.OK.getStatusCode(), res.getStatus());
assertNotNull(res.getHeaderString("content-length"));

assertBookStoreSize(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertNotNull;

public class MainTest {

private static WebServer webServer;
Expand Down Expand Up @@ -70,6 +72,7 @@ public void testHelloWorld() throws Exception {

conn = getURLConnection("GET","/books");
Assertions.assertEquals(200, conn.getResponseCode(), "HTTP response1");
Assertions.assertNotNull(conn.getHeaderField("content-length"));

conn = getURLConnection("POST","/books");
writeJsonContent(conn, json);
Expand All @@ -81,16 +84,17 @@ public void testHelloWorld() throws Exception {
JsonObject jsonObject = jsonReader.readObject();
Assertions.assertEquals("123456", jsonObject.getString("isbn"),
"Checking if correct ISBN");
Assertions.assertNotNull(conn.getHeaderField("content-length"));

conn = getURLConnection("GET","/books/0000");
Assertions.assertEquals(404, conn.getResponseCode(), "HTTP response GET bad ISBN");

conn = getURLConnection("GET","/books");
Assertions.assertEquals(200, conn.getResponseCode(), "HTTP response list books");
Assertions.assertNotNull(conn.getHeaderField("content-length"));

conn = getURLConnection("DELETE","/books/123456");
Assertions.assertEquals(200, conn.getResponseCode(), "HTTP response delete book");

}

private HttpURLConnection getURLConnection(String method, String path) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,4 @@ public void remove(String isbn) {
public boolean contains(String isbn) { return store.containsKey(isbn); }

}

Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ void basicTestJsonMP() throws Exception {
runJsonFunctionalTest("mp", "");
}


@Test
void basicTestMetricsHealthSE() throws Exception {
runMetricsAndHealthTest("se", "jsonp");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,6 @@ public void write(int b) throws IOException {

if (contentLength >= 0) {
res.headers().put(Http.Header.CONTENT_LENGTH, String.valueOf(contentLength));
} else {
res.headers().put(Http.Header.TRANSFER_ENCODING, "chunked");
}

for (Map.Entry<String, List<String>> entry : context.getStringHeaders().entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ class BareResponseImpl implements BareResponse {
private final GenericFutureListener<? extends Future<? super Void>> channelClosedListener;

private volatile Flow.Subscription subscription;
private volatile DataChunk firstChunk;
private volatile DefaultHttpResponse response;
private volatile boolean lengthOptimization;

/**
* @param ctx the channel handler context
Expand Down Expand Up @@ -130,7 +133,7 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map<String, List<S
throw new IllegalStateException("Status and headers were already sent");
}

DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, valueOf(status.code()));
response = new DefaultHttpResponse(HTTP_1_1, valueOf(status.code()));
for (Map.Entry<String, List<String>> headerEntry : headers.entrySet()) {
response.headers().add(headerEntry.getKey(), headerEntry.getValue());
}
Expand All @@ -140,24 +143,25 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map<String, List<S
.filter(header -> header.startsWith(HTTP_2_HEADER_PREFIX))
.forEach(header -> response.headers().add(header, requestHeaders.get(header)));

// Set chunked if length not set, may switch to length later
boolean lengthSet = HttpUtil.isContentLengthSet(response);
if (!lengthSet) {
lengthOptimization = status.code() == Http.Status.OK_200.code()
&& !HttpUtil.isTransferEncodingChunked(response);
HttpUtil.setTransferEncodingChunked(response, true);
}

// Add keep alive header as per:
// http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
if (keepAlive) {
if (status.code() != Http.Status.NO_CONTENT_204.code()) {
HttpUtil.setTransferEncodingChunked(response, true);
}
// Add keep alive header as per:
// - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}

LOGGER.finest(() -> log("Writing headers: " + status));
ctx.writeAndFlush(response)
.addListener(future -> {
if (future.isSuccess()) {
headersFuture.complete(this);
}
})
.addListener(completeOnFailureListener("An exception occurred when writing headers."))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
// Content length optimization attempt
if (!lengthOptimization) {
LOGGER.finest(() -> log("Writing headers: " + status));
initWriteResponse();
}
}

/**
Expand Down Expand Up @@ -209,7 +213,21 @@ private void completeInternal(Throwable throwable) {
}
}

/**
* Write last HTTP content. If length optimization is active and a first chunk is cached,
* switch content encoding and write response.
*
* @param throwable A throwable.
* @param closeAction Close action listener.
*/
private void writeLastContent(final Throwable throwable, final ChannelFutureListener closeAction) {
if (lengthOptimization) {
if (firstChunk != null) {
HttpUtil.setTransferEncodingChunked(response, false);
HttpUtil.setContentLength(response, firstChunk.data().limit());
}
initWriteResponse();
}
ctx.writeAndFlush(LAST_HTTP_CONTENT)
.addListener(completeOnFailureListener("An exception occurred when writing last http content."))
.addListener(completeOnSuccessListener(throwable))
Expand Down Expand Up @@ -245,7 +263,42 @@ public void onNext(DataChunk data) {
throw new IllegalStateException("Response is already closed!");
}
if (data != null) {
if (lengthOptimization) {
if (firstChunk == null) {
firstChunk = data.isReadOnly() ? data : data.duplicate(); // cache first chunk
return;
}
initWriteResponse();
}
sendData(data);
}
}

/**
* Initiates write of response and sends first chunk if available.
*
* @return Future of response or first chunk.
*/
private ChannelFuture initWriteResponse() {
ChannelFuture cf = ctx.write(response)
.addListener(future -> {
if (future.isSuccess()) {
headersFuture.complete(this);
}
})
.addListener(completeOnFailureListener("An exception occurred when writing headers."))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
response = null;
if (firstChunk != null) {
cf = sendData(firstChunk);
firstChunk.release();
firstChunk = null;
}
lengthOptimization = false;
return cf;
}

private ChannelFuture sendData(DataChunk data) {
LOGGER.finest(() -> log("Sending data chunk"));

DefaultHttpContent httpContent = new DefaultHttpContent(Unpooled.wrappedBuffer(data.data()));
Expand All @@ -259,15 +312,13 @@ public void onNext(DataChunk data) {
channelFuture = ctx.write(httpContent);
}

channelFuture
return channelFuture
.addListener(future -> {
data.release();
LOGGER.finest(() -> log("Data chunk sent with result: " + future.isSuccess()));
})
.addListener(completeOnFailureListener("Failure when sending a content!"))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

}
}

private String log(String s) {
Expand Down
Loading