diff --git a/lib/scala/downloader/src/main/java/org/enso/downloader/http/PathProgressBodyHandler.java b/lib/scala/downloader/src/main/java/org/enso/downloader/http/PathProgressBodyHandler.java index 02f94048e40c..55a7a3a71e99 100644 --- a/lib/scala/downloader/src/main/java/org/enso/downloader/http/PathProgressBodyHandler.java +++ b/lib/scala/downloader/src/main/java/org/enso/downloader/http/PathProgressBodyHandler.java @@ -14,11 +14,15 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; import org.enso.cli.task.TaskProgressImplementation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Option; import scala.Some; /** A {@link HttpResponse} body handler for {@link Path} that keeps track of the progress. */ class PathProgressBodyHandler implements HttpResponse.BodyHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(PathProgressBodyHandler.class); + private final Path destination; private final TaskProgressImplementation progress; private Long total; @@ -41,6 +45,7 @@ public BodySubscriber apply(ResponseInfo responseInfo) { var reportedLenOpt = responseInfo.headers().firstValueAsLong("Content-Length"); if (reportedLenOpt.isPresent()) { total = reportedLenOpt.getAsLong(); + LOGGER.debug("Content-Length: {}", total); } } if (total != null) { @@ -48,6 +53,7 @@ public BodySubscriber apply(ResponseInfo responseInfo) { } else { progress.reportProgress(0, Option.empty()); } + LOGGER.debug("Initializing download into {}, estimated total is {}", destination, total); WritableByteChannel destChannel; try { destChannel = @@ -62,6 +68,7 @@ private class ProgressSubscriber implements BodySubscriber { private Flow.Subscription subscription; private final WritableByteChannel destChannel; private final CompletableFuture result = new CompletableFuture<>(); + private long downloaded; ProgressSubscriber(WritableByteChannel destChannel) { this.destChannel = destChannel; @@ -78,7 +85,8 @@ public void onNext(List items) { try { for (ByteBuffer item : items) { var len = item.remaining(); - progress.reportProgress(len, total == null ? Option.empty() : Some.apply(total)); + downloaded += len; + progress.reportProgress(downloaded, Option.apply(total)); destChannel.write(item); } subscription.request(1); @@ -90,6 +98,12 @@ public void onNext(List items) { @Override public void onError(Throwable throwable) { + LOGGER.warn( + "Error while downloading into {}. Download progress {}/{}", + destination, + downloaded, + total, + throwable); try { destChannel.close(); } catch (IOException e) { @@ -100,6 +114,7 @@ public void onError(Throwable throwable) { @Override public void onComplete() { + LOGGER.debug("Downloaded complete into {}", destination); try { destChannel.close(); } catch (IOException e) { @@ -110,7 +125,7 @@ public void onComplete() { @Override public CompletionStage getBody() { - return CompletableFuture.completedFuture(destination); + return result; } } } diff --git a/lib/scala/downloader/src/main/java/org/enso/downloader/http/StringProgressBodyHandler.java b/lib/scala/downloader/src/main/java/org/enso/downloader/http/StringProgressBodyHandler.java index ba725b9d4923..eabff17c4a96 100644 --- a/lib/scala/downloader/src/main/java/org/enso/downloader/http/StringProgressBodyHandler.java +++ b/lib/scala/downloader/src/main/java/org/enso/downloader/http/StringProgressBodyHandler.java @@ -6,7 +6,6 @@ import java.net.http.HttpResponse.BodySubscriber; import java.nio.ByteBuffer; import java.nio.charset.Charset; -import java.nio.file.Path; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -15,12 +14,13 @@ import scala.Option; import scala.Some; -/** A {@link HttpResponse} body handler for {@link Path} that keeps track of the progress. */ +/** A {@link HttpResponse} body handler for {@link String} that keeps track of the progress. */ public class StringProgressBodyHandler implements HttpResponse.BodyHandler { private final ByteArrayOutputStream destination = new ByteArrayOutputStream(); private final TaskProgressImplementation progress; private final Charset encoding; private Long total; + private long downloaded; private StringProgressBodyHandler( TaskProgressImplementation progress, Charset encoding, Long total) { @@ -64,7 +64,8 @@ public void onSubscribe(Flow.Subscription subscription) { public void onNext(List items) { for (ByteBuffer item : items) { var len = item.remaining(); - progress.reportProgress(len, total == null ? Option.empty() : Some.apply(total)); + downloaded += len; + progress.reportProgress(downloaded, Option.apply(total)); byte[] bytes = new byte[len]; item.get(bytes); destination.write(bytes, 0, bytes.length); diff --git a/lib/scala/downloader/src/test/java/org/enso/downloader/http/HttpDownloaderTest.java b/lib/scala/downloader/src/test/java/org/enso/downloader/http/HttpDownloaderTest.java index 8bf5a21df4f7..37d40d3643c9 100644 --- a/lib/scala/downloader/src/test/java/org/enso/downloader/http/HttpDownloaderTest.java +++ b/lib/scala/downloader/src/test/java/org/enso/downloader/http/HttpDownloaderTest.java @@ -6,6 +6,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertTrue; @@ -13,8 +14,11 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -97,25 +101,25 @@ public void handleResourceNotFound() throws Exception { @Test public void downloadBigFileWithProgress() throws Exception { - var uriStr = "http://localhost:" + port + "/files/big.tgz"; + var uriStr = "http://localhost:" + port + "/files/big.txt"; var uri = new URI(uriStr); - Path dest = Files.createTempFile("enso-downloader-test", ".tgz"); - final int[] progressUpdateCalls = {0}; + Path dest = Files.createTempFile("enso-downloader-test", ".txt"); + List progressCalls = new ArrayList<>(); final int[] doneCalls = {0}; var task = HTTPDownload.download(uri, dest); var progressListener = new ProgressListener() { @Override public void progressUpdate(long done, Option total) { - progressUpdateCalls[0]++; + progressCalls.add(done); assertThat(total.isDefined(), is(true)); assertThat(total.get(), instanceOf(Long.class)); long reportedTotal = (Long) total.get(); assertThat(reportedTotal, is(BigFileHandler.BIG_FILE_SIZE)); assertThat( - "Should send and report just chunk, not the whole file", + "Reported progress should not exceed total size", done, - lessThan(reportedTotal)); + lessThanOrEqualTo(reportedTotal)); } @Override @@ -129,8 +133,13 @@ public void done(Try result) { var resp = task.force(); assertThat(resp.toFile().exists(), is(true)); assertThat("Done was called exactly once", doneCalls[0], is(1)); - assertThat( - "Progress reported was called at least once", progressUpdateCalls[0], greaterThan(0)); + assertThat("Progress reported was called at least twice", progressCalls.size(), greaterThan(1)); + for (int i = 0; i < progressCalls.size() - 1; i++) { + var progress = progressCalls.get(i); + var nextProgress = progressCalls.get(i + 1); + assertThat("Progress should be increasing", progress, lessThan(nextProgress)); + } + BigFileHandler.assertSameFileContent(dest); Files.deleteIfExists(dest); } @@ -191,33 +200,41 @@ protected void doHandle(HttpExchange exchange) throws IOException { } private static class BigFileHandler extends SimpleHttpHandler { - private static final int CHUNK_SIZE = 1024; + private static final int CHUNK_SIZE = 4096; private static final long BIG_FILE_SIZE = 10 * CHUNK_SIZE; - private static final byte[] BIG_FILE_BYTES = new byte[Math.toIntExact(BIG_FILE_SIZE)]; + private static final byte[] BIG_FILE_CONTENT; static { var rnd = new Random(42); - rnd.nextBytes(BIG_FILE_BYTES); + var sb = new StringBuilder(); + for (int i = 0; i < BIG_FILE_SIZE; i++) { + char c = (char) (rnd.nextInt(26) + 'a'); + sb.append(c); + } + BIG_FILE_CONTENT = sb.toString().getBytes(StandardCharsets.UTF_8); } @Override protected void doHandle(HttpExchange exchange) throws IOException { - if (exchange.getRequestURI().toString().equals("/files/big.tgz")) { + if (exchange.getRequestURI().toString().equals("/files/big.txt")) { long chunks = BIG_FILE_SIZE / CHUNK_SIZE; exchange.getResponseHeaders().set("Content-Length", Long.toString(BIG_FILE_SIZE)); - exchange.getResponseHeaders().set("Content-Type", "application/x-gzip"); - // Set responseLength to 0 to indicate that the response length is unknown - // and force chunking the response. - exchange.sendResponseHeaders(200, 0); + exchange.getResponseHeaders().set("Content-Type", "text/plain; charset=utf-8"); + exchange.getResponseHeaders().set("Transfer-Encoding", "chunked"); + exchange.sendResponseHeaders(200, BIG_FILE_SIZE); try (var os = exchange.getResponseBody()) { for (int i = 0; i < chunks; i++) { - os.write(BIG_FILE_BYTES, i * CHUNK_SIZE, CHUNK_SIZE); - os.flush(); + os.write(BIG_FILE_CONTENT, i * CHUNK_SIZE, CHUNK_SIZE); } } } else { exchange.sendResponseHeaders(404, -1); } } + + static void assertSameFileContent(Path file) throws IOException { + byte[] readbytes = Files.readAllBytes(file); + assertThat(readbytes, is(BIG_FILE_CONTENT)); + } } }