Skip to content

Commit

Permalink
[8.1.0] Use Single.using instead of onError/onSuccess for async…
Browse files Browse the repository at this point in the history
… uploads (#25251)

Users have reported hangs in Bazel's asynchronous remote cache uploads
that may be happening because neither `onSuccess` nor `onError` is
called on the observer.

Work towards #25232

Closes #25231.

PiperOrigin-RevId: 725235495
Change-Id: I20c3aaa2ee57a52041dea0b3c17356445f2bbc34

Commit
d4c9b92

Co-authored-by: Fabian Meumertzheim <[email protected]>
  • Loading branch information
bazel-io and fmeum authored Feb 11, 2025
1 parent 3b0d75a commit cf3ebdf
Showing 1 changed file with 28 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,8 @@
import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.Message;
import io.grpc.Status.Code;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.io.FileNotFoundException;
import java.io.IOException;
Expand All @@ -160,6 +157,7 @@
import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -1794,45 +1792,33 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult, Runnable

if (remoteOptions.remoteCacheAsync
&& !action.getSpawn().getResourceOwner().mayModifySpawnOutputsAfterExecution()) {
Single.using(
combinedCache::retain,
combinedCache ->
buildUploadManifestAsync(action, spawnResult)
.flatMap(
manifest ->
manifest.uploadAsync(
action.getRemoteActionExecutionContext(),
combinedCache,
reporter)),
CombinedCache::release)
.subscribeOn(scheduler)
.subscribe(
new SingleObserver<ActionResult>() {
long startTime = 0;

@Override
public void onSubscribe(@NonNull Disposable d) {
backgroundTaskPhaser.register();
startTime = Profiler.nanoTimeMaybe();
}

@Override
public void onSuccess(@NonNull ActionResult actionResult) {
Profiler.instance()
.completeTask(startTime, ProfilerTask.UPLOAD_TIME, "upload outputs");
backgroundTaskPhaser.arriveAndDeregister();
onUploadComplete.run();
}

@Override
public void onError(@NonNull Throwable e) {
Profiler.instance()
.completeTask(startTime, ProfilerTask.UPLOAD_TIME, "upload outputs");
backgroundTaskPhaser.arriveAndDeregister();
reportUploadError(e);
onUploadComplete.run();
}
});
AtomicLong startTime = new AtomicLong();
var unused =
Single.using(
() -> {
backgroundTaskPhaser.register();
CombinedCache cache = combinedCache.retain();
startTime.set(Profiler.nanoTimeMaybe());
return cache;
},
combinedCache ->
buildUploadManifestAsync(action, spawnResult)
.flatMap(
manifest ->
manifest.uploadAsync(
action.getRemoteActionExecutionContext(),
combinedCache,
reporter)),
cacheResource -> {
Profiler.instance()
.completeTask(startTime.get(), ProfilerTask.UPLOAD_TIME, "upload outputs");
backgroundTaskPhaser.arriveAndDeregister();
onUploadComplete.run();
cacheResource.release();
},
/* eager= */ false)
.subscribeOn(scheduler)
.subscribe(result -> {}, this::reportUploadError);
} else {
try (SilentCloseable c =
Profiler.instance().profile(ProfilerTask.UPLOAD_TIME, "upload outputs")) {
Expand Down

0 comments on commit cf3ebdf

Please sign in to comment.