Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PathProgressBodyHandler does not return immediately #9222

Merged
merged 4 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import org.enso.cli.task.TaskProgressImplementation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;

/** A {@link HttpResponse} body handler for {@link Path} that keeps track of the progress. */
class PathProgressBodyHandler implements HttpResponse.BodyHandler<Path> {
private static final Logger LOGGER = LoggerFactory.getLogger(PathProgressBodyHandler.class);

private final Path destination;
private final TaskProgressImplementation<Path> progress;
private Long total;
Expand All @@ -41,13 +45,15 @@ public BodySubscriber<Path> apply(ResponseInfo responseInfo) {
var reportedLenOpt = responseInfo.headers().firstValueAsLong("Content-Length");
if (reportedLenOpt.isPresent()) {
total = reportedLenOpt.getAsLong();
LOGGER.debug("Content-Length: {}", total);
}
}
if (total != null) {
progress.reportProgress(0, Some.apply(total));
} else {
progress.reportProgress(0, Option.empty());
}
LOGGER.debug("Initializing download into {}, estimated total is {}", destination, total);
WritableByteChannel destChannel;
try {
destChannel =
Expand All @@ -62,6 +68,7 @@ private class ProgressSubscriber implements BodySubscriber<Path> {
private Flow.Subscription subscription;
private final WritableByteChannel destChannel;
private final CompletableFuture<Path> result = new CompletableFuture<>();
private long downloaded;

ProgressSubscriber(WritableByteChannel destChannel) {
this.destChannel = destChannel;
Expand All @@ -78,7 +85,8 @@ public void onNext(List<ByteBuffer> items) {
try {
for (ByteBuffer item : items) {
var len = item.remaining();
progress.reportProgress(len, total == null ? Option.empty() : Some.apply(total));
downloaded += len;
progress.reportProgress(downloaded, total == null ? Option.empty() : Some.apply(total));
destChannel.write(item);
}
subscription.request(1);
Expand All @@ -90,6 +98,12 @@ public void onNext(List<ByteBuffer> items) {

@Override
public void onError(Throwable throwable) {
LOGGER.warn(
"Error while downloading into {}. Download progress {}/{}",
destination,
downloaded,
total,
throwable);
try {
destChannel.close();
} catch (IOException e) {
Expand All @@ -100,6 +114,7 @@ public void onError(Throwable throwable) {

@Override
public void onComplete() {
LOGGER.debug("Downloaded complete into {}", destination);
try {
destChannel.close();
} catch (IOException e) {
Expand All @@ -110,7 +125,7 @@ public void onComplete() {

@Override
public CompletionStage<Path> getBody() {
return CompletableFuture.completedFuture(destination);
return result;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the gist of the problem. This handler claimed that the download was finished (returned CompletableFuture.completedFuture almost immediately.

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.net.http.HttpResponse.BodySubscriber;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand All @@ -15,12 +14,13 @@
import scala.Option;
import scala.Some;

/** A {@link HttpResponse} body handler for {@link Path} that keeps track of the progress. */
/** A {@link HttpResponse} body handler for {@link String} that keeps track of the progress. */
public class StringProgressBodyHandler implements HttpResponse.BodyHandler<String> {
private final ByteArrayOutputStream destination = new ByteArrayOutputStream();
private final TaskProgressImplementation<?> progress;
private final Charset encoding;
private Long total;
private long downloaded;

private StringProgressBodyHandler(
TaskProgressImplementation<?> progress, Charset encoding, Long total) {
Expand Down Expand Up @@ -64,7 +64,8 @@ public void onSubscribe(Flow.Subscription subscription) {
public void onNext(List<ByteBuffer> items) {
for (ByteBuffer item : items) {
var len = item.remaining();
progress.reportProgress(len, total == null ? Option.empty() : Some.apply(total));
downloaded += len;
progress.reportProgress(downloaded, total == null ? Option.empty() : Some.apply(total));
Akirathan marked this conversation as resolved.
Show resolved Hide resolved
byte[] bytes = new byte[len];
item.get(bytes);
destination.write(bytes, 0, bytes.length);
Expand Down
Loading