Skip to content

Commit

Permalink
Close stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Shintaro Onuma committed Feb 15, 2024
1 parent a0b30bb commit af0d7fc
Showing 1 changed file with 11 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -693,35 +693,20 @@ private CompletableFuture<Boolean> closeStream(
return CompletableFuture.completedFuture(false);
}

//System.out.println("CLOSING STREAM");

// if the amount of data remaining in the current request is greater
// than the readahead value: abort.
long remaining = remainingInCurrentRequest();
LOG.debug("Closing stream {}: {}", reason,
forceAbort ? "abort" : "soft");
boolean shouldAbort = forceAbort || remaining > readahead;
CompletableFuture<Boolean> operation;
SDKStreamDrainer drainer = new SDKStreamDrainer(
uri,
wrappedStream,
shouldAbort,
(int) remaining,
streamStatistics,
reason);

if (blocking || shouldAbort || remaining <= asyncDrainThreshold) {
// don't bother with async IO if the caller plans to wait for
// the result, there's an abort (which is fast), or
// there is not much data to read.
operation = CompletableFuture.completedFuture(drainer.apply());
System.out.println("CLOSING STREAM (NOT ABORTING)");

} else {
LOG.debug("initiating asynchronous drain of {} bytes", remaining);
// schedule an async drain/abort
operation = client.submit(drainer);
CompletableFuture<Boolean> operation;
try {
wrappedStream.close();
} catch (Exception e) {
// exception escalates to an abort
LOG.debug("When closing {} stream for {}, will abort the stream",
uri, reason, e);
System.out.println(e.getLocalizedMessage());
}

operation = CompletableFuture.completedFuture(true);

// either the stream is closed in the blocking call or the async call is
// submitted with its own copy of the references
wrappedStream = null;
Expand Down

0 comments on commit af0d7fc

Please sign in to comment.