Skip to content

Commit

Permalink
Improve isBlocked logic in DeduplicatingDirectExchangeBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
linzebing authored and arhimondr committed Mar 15, 2022
1 parent 48e0bfa commit 349b3ce
Showing 1 changed file with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.io.Closer;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.log.Logger;
Expand Down Expand Up @@ -141,7 +142,14 @@ public synchronized ListenableFuture<Void> isBlocked()
}

if (!outputReady.isDone()) {
return nonCancellationPropagating(outputReady);
return nonCancellationPropagating(Futures.transformAsync(outputReady, ignored -> {
synchronized (this) {
if (outputSource != null) {
return outputSource.isBlocked();
}
return immediateVoidFuture();
}
}, directExecutor()));
}

checkState(outputSource != null, "outputSource is expected to be set");
Expand Down Expand Up @@ -821,7 +829,10 @@ public ListenableFuture<Void> isBlocked()
return immediateVoidFuture();
}
if (!exchangeSourceFuture.isDone()) {
return nonCancellationPropagating(asVoid(exchangeSourceFuture));
return nonCancellationPropagating(asVoid(Futures.transformAsync(
exchangeSourceFuture,
exchangeSource -> toListenableFuture(exchangeSource.isBlocked()),
directExecutor())));
}
if (exchangeSource != null) {
CompletableFuture<Void> blocked = exchangeSource.isBlocked();
Expand Down

0 comments on commit 349b3ce

Please sign in to comment.