Skip to content

Commit

Permalink
PathProgressBodyHandler does not return immediately (#9222)
Browse files Browse the repository at this point in the history
Fixes downloading of files. It used to fail because it returned almost immediately. Also fixes progress reporting when fetching a String.

# Important Notes
Tested by removing appropriate engine and runtime from `$HOME/.local/share/enso` and with manually running:
```
java -jar launcher.jar --launcher-log-level trace install engine 2024.1.1-nightly.2024.2.29
```
  • Loading branch information
Akirathan authored Feb 29, 2024
1 parent c44b7f2 commit 53c1b3f
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import org.enso.cli.task.TaskProgressImplementation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;

/** A {@link HttpResponse} body handler for {@link Path} that keeps track of the progress. */
class PathProgressBodyHandler implements HttpResponse.BodyHandler<Path> {
private static final Logger LOGGER = LoggerFactory.getLogger(PathProgressBodyHandler.class);

private final Path destination;
private final TaskProgressImplementation<Path> progress;
private Long total;
Expand All @@ -41,13 +45,15 @@ public BodySubscriber<Path> apply(ResponseInfo responseInfo) {
var reportedLenOpt = responseInfo.headers().firstValueAsLong("Content-Length");
if (reportedLenOpt.isPresent()) {
total = reportedLenOpt.getAsLong();
LOGGER.debug("Content-Length: {}", total);
}
}
if (total != null) {
progress.reportProgress(0, Some.apply(total));
} else {
progress.reportProgress(0, Option.empty());
}
LOGGER.debug("Initializing download into {}, estimated total is {}", destination, total);
WritableByteChannel destChannel;
try {
destChannel =
Expand All @@ -62,6 +68,7 @@ private class ProgressSubscriber implements BodySubscriber<Path> {
private Flow.Subscription subscription;
private final WritableByteChannel destChannel;
private final CompletableFuture<Path> result = new CompletableFuture<>();
private long downloaded;

ProgressSubscriber(WritableByteChannel destChannel) {
this.destChannel = destChannel;
Expand All @@ -78,7 +85,8 @@ public void onNext(List<ByteBuffer> items) {
try {
for (ByteBuffer item : items) {
var len = item.remaining();
progress.reportProgress(len, total == null ? Option.empty() : Some.apply(total));
downloaded += len;
progress.reportProgress(downloaded, Option.apply(total));
destChannel.write(item);
}
subscription.request(1);
Expand All @@ -90,6 +98,12 @@ public void onNext(List<ByteBuffer> items) {

@Override
public void onError(Throwable throwable) {
LOGGER.warn(
"Error while downloading into {}. Download progress {}/{}",
destination,
downloaded,
total,
throwable);
try {
destChannel.close();
} catch (IOException e) {
Expand All @@ -100,6 +114,7 @@ public void onError(Throwable throwable) {

@Override
public void onComplete() {
LOGGER.debug("Downloaded complete into {}", destination);
try {
destChannel.close();
} catch (IOException e) {
Expand All @@ -110,7 +125,7 @@ public void onComplete() {

@Override
public CompletionStage<Path> getBody() {
return CompletableFuture.completedFuture(destination);
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.net.http.HttpResponse.BodySubscriber;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand All @@ -15,12 +14,13 @@
import scala.Option;
import scala.Some;

/** A {@link HttpResponse} body handler for {@link Path} that keeps track of the progress. */
/** A {@link HttpResponse} body handler for {@link String} that keeps track of the progress. */
public class StringProgressBodyHandler implements HttpResponse.BodyHandler<String> {
private final ByteArrayOutputStream destination = new ByteArrayOutputStream();
private final TaskProgressImplementation<?> progress;
private final Charset encoding;
private Long total;
private long downloaded;

private StringProgressBodyHandler(
TaskProgressImplementation<?> progress, Charset encoding, Long total) {
Expand Down Expand Up @@ -64,7 +64,8 @@ public void onSubscribe(Flow.Subscription subscription) {
public void onNext(List<ByteBuffer> items) {
for (ByteBuffer item : items) {
var len = item.remaining();
progress.reportProgress(len, total == null ? Option.empty() : Some.apply(total));
downloaded += len;
progress.reportProgress(downloaded, Option.apply(total));
byte[] bytes = new byte[len];
item.get(bytes);
destination.write(bytes, 0, bytes.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertTrue;

import com.sun.net.httpserver.HttpExchange;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -97,25 +101,25 @@ public void handleResourceNotFound() throws Exception {

@Test
public void downloadBigFileWithProgress() throws Exception {
var uriStr = "http://localhost:" + port + "/files/big.tgz";
var uriStr = "http://localhost:" + port + "/files/big.txt";
var uri = new URI(uriStr);
Path dest = Files.createTempFile("enso-downloader-test", ".tgz");
final int[] progressUpdateCalls = {0};
Path dest = Files.createTempFile("enso-downloader-test", ".txt");
List<Long> progressCalls = new ArrayList<>();
final int[] doneCalls = {0};
var task = HTTPDownload.download(uri, dest);
var progressListener =
new ProgressListener<Path>() {
@Override
public void progressUpdate(long done, Option<Object> total) {
progressUpdateCalls[0]++;
progressCalls.add(done);
assertThat(total.isDefined(), is(true));
assertThat(total.get(), instanceOf(Long.class));
long reportedTotal = (Long) total.get();
assertThat(reportedTotal, is(BigFileHandler.BIG_FILE_SIZE));
assertThat(
"Should send and report just chunk, not the whole file",
"Reported progress should not exceed total size",
done,
lessThan(reportedTotal));
lessThanOrEqualTo(reportedTotal));
}

@Override
Expand All @@ -129,8 +133,13 @@ public void done(Try<Path> result) {
var resp = task.force();
assertThat(resp.toFile().exists(), is(true));
assertThat("Done was called exactly once", doneCalls[0], is(1));
assertThat(
"Progress reported was called at least once", progressUpdateCalls[0], greaterThan(0));
assertThat("Progress reported was called at least twice", progressCalls.size(), greaterThan(1));
for (int i = 0; i < progressCalls.size() - 1; i++) {
var progress = progressCalls.get(i);
var nextProgress = progressCalls.get(i + 1);
assertThat("Progress should be increasing", progress, lessThan(nextProgress));
}
BigFileHandler.assertSameFileContent(dest);
Files.deleteIfExists(dest);
}

Expand Down Expand Up @@ -191,33 +200,41 @@ protected void doHandle(HttpExchange exchange) throws IOException {
}

private static class BigFileHandler extends SimpleHttpHandler {
private static final int CHUNK_SIZE = 1024;
private static final int CHUNK_SIZE = 4096;
private static final long BIG_FILE_SIZE = 10 * CHUNK_SIZE;
private static final byte[] BIG_FILE_BYTES = new byte[Math.toIntExact(BIG_FILE_SIZE)];
private static final byte[] BIG_FILE_CONTENT;

static {
var rnd = new Random(42);
rnd.nextBytes(BIG_FILE_BYTES);
var sb = new StringBuilder();
for (int i = 0; i < BIG_FILE_SIZE; i++) {
char c = (char) (rnd.nextInt(26) + 'a');
sb.append(c);
}
BIG_FILE_CONTENT = sb.toString().getBytes(StandardCharsets.UTF_8);
}

@Override
protected void doHandle(HttpExchange exchange) throws IOException {
if (exchange.getRequestURI().toString().equals("/files/big.tgz")) {
if (exchange.getRequestURI().toString().equals("/files/big.txt")) {
long chunks = BIG_FILE_SIZE / CHUNK_SIZE;
exchange.getResponseHeaders().set("Content-Length", Long.toString(BIG_FILE_SIZE));
exchange.getResponseHeaders().set("Content-Type", "application/x-gzip");
// Set responseLength to 0 to indicate that the response length is unknown
// and force chunking the response.
exchange.sendResponseHeaders(200, 0);
exchange.getResponseHeaders().set("Content-Type", "text/plain; charset=utf-8");
exchange.getResponseHeaders().set("Transfer-Encoding", "chunked");
exchange.sendResponseHeaders(200, BIG_FILE_SIZE);
try (var os = exchange.getResponseBody()) {
for (int i = 0; i < chunks; i++) {
os.write(BIG_FILE_BYTES, i * CHUNK_SIZE, CHUNK_SIZE);
os.flush();
os.write(BIG_FILE_CONTENT, i * CHUNK_SIZE, CHUNK_SIZE);
}
}
} else {
exchange.sendResponseHeaders(404, -1);
}
}

static void assertSameFileContent(Path file) throws IOException {
byte[] readbytes = Files.readAllBytes(file);
assertThat(readbytes, is(BIG_FILE_CONTENT));
}
}
}

0 comments on commit 53c1b3f

Please sign in to comment.