Skip to content

Commit

Permalink
Merge pull request #91 from cdapio/feature/ISSUE-99-chunk-flush
Browse files Browse the repository at this point in the history
(ISSUE-99) Support flushing chunks response
  • Loading branch information
chtyim authored Jun 24, 2020
2 parents dd17fad + 0e7716a commit 84be17f
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 9 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.8</source>
<target>1.8</target>
<compilerArgs>
<arg>-Xlint</arg>
</compilerArgs>
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/io/cdap/http/ChunkResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
import io.netty.buffer.ByteBuf;

import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.nio.ByteBuffer;

/**
* A responder for sending chunk-encoded response
*/
public interface ChunkResponder extends Closeable {
public interface ChunkResponder extends Closeable, Flushable {

/**
* Adds a chunk of data to the response. The content will be sent to the client asynchronously.
Expand All @@ -43,6 +44,14 @@ public interface ChunkResponder extends Closeable {
*/
void sendChunk(ByteBuf chunk) throws IOException;

/**
* Flushes all the chunks writen so far to the client asynchronously.
*/
@Override
default void flush() {
// no-op
}

/**
* Closes this responder which signals the end of the chunk response.
*/
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/cdap/http/internal/BasicHttpResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,13 @@ final class BasicHttpResponder extends AbstractHttpResponder {
private final Channel channel;
private final AtomicBoolean responded;
private final boolean sslEnabled;
private final int chunkMemoryLimit;

BasicHttpResponder(Channel channel, boolean sslEnabled) {
BasicHttpResponder(Channel channel, boolean sslEnabled, int chunkMemoryLimit) {
this.channel = channel;
this.responded = new AtomicBoolean(false);
this.sslEnabled = sslEnabled;
this.chunkMemoryLimit = chunkMemoryLimit;
}

@Override
Expand All @@ -90,7 +92,7 @@ public ChunkResponder sendChunkStart(HttpResponseStatus status, HttpHeaders head

checkNotResponded();
channel.write(response);
return new ChannelChunkResponder(channel);
return new ChannelChunkResponder(channel, chunkMemoryLimit);
}

@Override
Expand Down
32 changes: 29 additions & 3 deletions src/main/java/io/cdap/http/internal/ChannelChunkResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
* A {@link ChunkResponder} that writes chunks to a {@link Channel}.
Expand All @@ -35,10 +36,14 @@ final class ChannelChunkResponder implements ChunkResponder {

private final Channel channel;
private final AtomicBoolean closed;
private final AtomicLong bufferedSize;
private final int chunkMemoryLimit;

ChannelChunkResponder(Channel channel) {
ChannelChunkResponder(Channel channel, int chunkMemoryLimit) {
this.channel = channel;
this.closed = new AtomicBoolean(false);
this.closed = new AtomicBoolean();
this.bufferedSize = new AtomicLong();
this.chunkMemoryLimit = chunkMemoryLimit;
}

@Override
Expand All @@ -54,14 +59,35 @@ public void sendChunk(ByteBuf chunk) throws IOException {
if (!channel.isActive()) {
throw new IOException("Connection already closed.");
}
int chunkSize = chunk.readableBytes();
channel.write(new DefaultHttpContent(chunk));
tryFlush(chunkSize);
}

@Override
public void close() throws IOException {
public void flush() {
// Use the limit as the size to force a flush
tryFlush(chunkMemoryLimit);
}

@Override
public void close() {
if (!closed.compareAndSet(false, true)) {
return;
}
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
}

private void tryFlush(int size) {
long newSize = bufferedSize.addAndGet(size);
if (newSize >= chunkMemoryLimit) {
channel.flush();
// Subtract what were flushed.
// This is correct for single thread.
// For concurrent calls, this provides a lower bound,
// meaning more data might get flushed then being subtracted.
// This make sure we won't go over the memory limit, but might flush more often than needed.
bufferedSize.addAndGet(-1 * newSize);
}
}
}
2 changes: 1 addition & 1 deletion src/main/java/io/cdap/http/internal/RequestRouter.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
return;
}
HttpRequest request = (HttpRequest) msg;
BasicHttpResponder responder = new BasicHttpResponder(ctx.channel(), sslEnabled);
BasicHttpResponder responder = new BasicHttpResponder(ctx.channel(), sslEnabled, chunkMemoryLimit);

// Reset the methodInfo for the incoming request error handling
methodInfo = null;
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/cdap/http/internal/WrappedHttpResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public void sendChunk(ByteBuf chunk) throws IOException {
chunkResponder.sendChunk(chunk);
}

@Override
public void flush() {
chunkResponder.flush();
}

@Override
public void close() throws IOException {
chunkResponder.close();
Expand Down
13 changes: 13 additions & 0 deletions src/test/java/io/cdap/http/HttpServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,19 @@ public void testChunkResponse() throws IOException {
}
}

@Test
public void testLargeChunkResponse() throws IOException {
// Chunk limit for test is 75K, so we request for 150 chunks, each is 1K in length
HttpURLConnection urlConn = request("/test/v1/largeChunk?s=1024&n=150", HttpMethod.GET);
try {
String response = getContent(urlConn);
String expected = String.join("", Collections.nCopies(150 * 1024, "0"));
Assert.assertEquals(expected, response);
} finally {
urlConn.disconnect();
}
}

@Test
public void testStringQueryParam() throws IOException {
// First send without query, for String type, should get defaulted to null.
Expand Down
14 changes: 14 additions & 0 deletions src/test/java/io/cdap/http/TestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -424,6 +425,19 @@ public void chunk(FullHttpRequest request, HttpResponder responder) throws IOExc
chunker.close();
}

@Path("/largeChunk")
@GET
public void largeChunk(HttpRequest request, HttpResponder responder,
@QueryParam("s") int chunkSize,
@QueryParam("n") int count) throws IOException {
String msg = String.join("", Collections.nCopies(chunkSize, "0"));
try (ChunkResponder chunker = responder.sendChunkStart(HttpResponseStatus.OK)) {
for (int i = 0; i < count; i++) {
chunker.sendChunk(StandardCharsets.UTF_8.encode(msg));
}
}
}

@Path("/produceBody")
@GET
public void produceBody(HttpRequest request, HttpResponder responder,
Expand Down

0 comments on commit 84be17f

Please sign in to comment.