Skip to content

Commit

Permalink
Make bazel run works with minimal mode (#16821)
Browse files Browse the repository at this point in the history
Always use `ToplevelArtifactsDownloader` when building without the bytes.

It checks the combination of current command (e.g. `build`, `run`, etc.) and download mode (e.g. `toplevel`, `minimal`) to decide whether download outputs for the toplevel targets or not.

Also in the `RunCommand`, we wait for the background downloads before checking the local filesystem.

Fixes #11920.

Closes #16545.

PiperOrigin-RevId: 487181884
Change-Id: I6b1a78a0d032d2cac8093eecf9c4d2e76f90380f
  • Loading branch information
coeuvre authored Nov 22, 2022
1 parent 18e5e8c commit 845077f
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -564,4 +564,8 @@ public void finalizeAction(Action action, MetadataHandler metadataHandler) {
prefetchFiles(outputsToDownload, metadataHandler, Priority.LOW);
}
}

public void flushOutputTree() throws InterruptedException {
downloadCache.awaitInProgressTasks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -928,22 +928,21 @@ public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorB
remoteOutputService.setActionInputFetcher(actionInputFetcher);
actionContextProvider.setActionInputFetcher(actionInputFetcher);

if (remoteOutputsMode.downloadToplevelOutputsOnly()) {
toplevelArtifactsDownloader =
new ToplevelArtifactsDownloader(
env.getCommandName(),
env.getSkyframeExecutor().getEvaluator(),
actionInputFetcher,
(path) -> {
FileSystem fileSystem = path.getFileSystem();
Preconditions.checkState(
fileSystem instanceof RemoteActionFileSystem,
"fileSystem must be an instance of RemoteActionFileSystem");
return ((RemoteActionFileSystem) path.getFileSystem())
.getRemoteMetadata(path.asFragment());
});
env.getEventBus().register(toplevelArtifactsDownloader);
}
toplevelArtifactsDownloader =
new ToplevelArtifactsDownloader(
env.getCommandName(),
remoteOutputsMode.downloadToplevelOutputsOnly(),
env.getSkyframeExecutor().getEvaluator(),
actionInputFetcher,
(path) -> {
FileSystem fileSystem = path.getFileSystem();
Preconditions.checkState(
fileSystem instanceof RemoteActionFileSystem,
"fileSystem must be an instance of RemoteActionFileSystem");
return ((RemoteActionFileSystem) path.getFileSystem())
.getRemoteMetadata(path.asFragment());
});
env.getEventBus().register(toplevelArtifactsDownloader);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ public ModifiedFileSet startBuild(
return ModifiedFileSet.EVERYTHING_MODIFIED;
}

@Override
public void flushOutputTree() throws InterruptedException {
if (actionInputFetcher != null) {
actionInputFetcher.flushOutputTree();
}
}

@Override
public void finalizeBuild(boolean buildSuccessful) {
// Intentionally left empty.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ private enum CommandMode {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private final CommandMode commandMode;
private final boolean downloadToplevel;
private final MemoizingEvaluator memoizingEvaluator;
private final AbstractActionInputPrefetcher actionInputPrefetcher;
private final PathToMetadataConverter pathToMetadataConverter;

public ToplevelArtifactsDownloader(
String commandName,
boolean downloadToplevel,
MemoizingEvaluator memoizingEvaluator,
AbstractActionInputPrefetcher actionInputPrefetcher,
PathToMetadataConverter pathToMetadataConverter) {
Expand All @@ -84,6 +86,7 @@ public ToplevelArtifactsDownloader(
default:
this.commandMode = CommandMode.UNKNOWN;
}
this.downloadToplevel = downloadToplevel;
this.memoizingEvaluator = memoizingEvaluator;
this.actionInputPrefetcher = actionInputPrefetcher;
this.pathToMetadataConverter = pathToMetadataConverter;
Expand Down Expand Up @@ -133,6 +136,10 @@ public void onFailure(Throwable throwable) {
@Subscribe
@AllowConcurrentEvents
public void onAspectComplete(AspectCompleteEvent event) {
if (!shouldDownloadToplevelOutputs(event.getAspectKey().getBaseConfiguredTargetKey())) {
return;
}

if (event.failed()) {
return;
}
Expand All @@ -143,7 +150,7 @@ public void onAspectComplete(AspectCompleteEvent event) {
@Subscribe
@AllowConcurrentEvents
public void onTargetComplete(TargetCompleteEvent event) {
if (!shouldDownloadToplevelOutputsForTarget(event.getConfiguredTargetKey())) {
if (!shouldDownloadToplevelOutputs(event.getConfiguredTargetKey())) {
return;
}

Expand All @@ -156,28 +163,32 @@ public void onTargetComplete(TargetCompleteEvent event) {
event.getExecutableTargetData().getRunfiles());
}

private boolean shouldDownloadToplevelOutputsForTarget(ConfiguredTargetKey configuredTargetKey) {
if (commandMode != CommandMode.TEST) {
return true;
}

// Do not download test binary in test mode.
try {
var configuredTargetValue =
(ConfiguredTargetValue) memoizingEvaluator.getExistingValue(configuredTargetKey);
if (configuredTargetValue == null) {
return false;
}
ConfiguredTarget configuredTarget = configuredTargetValue.getConfiguredTarget();
if (configuredTarget instanceof RuleConfiguredTarget) {
var ruleConfiguredTarget = (RuleConfiguredTarget) configuredTarget;
var isTestRule = isTestRuleName(ruleConfiguredTarget.getRuleClassString());
return !isTestRule;
}
return true;
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
return false;
private boolean shouldDownloadToplevelOutputs(ConfiguredTargetKey configuredTargetKey) {
switch (commandMode) {
case RUN:
// Always download outputs of toplevel targets in RUN mode
return true;
case TEST:
// Do not download test binary in test mode.
try {
var configuredTargetValue =
(ConfiguredTargetValue) memoizingEvaluator.getExistingValue(configuredTargetKey);
if (configuredTargetValue == null) {
return false;
}
ConfiguredTarget configuredTarget = configuredTargetValue.getConfiguredTarget();
if (configuredTarget instanceof RuleConfiguredTarget) {
var ruleConfiguredTarget = (RuleConfiguredTarget) configuredTarget;
var isTestRule = isTestRuleName(ruleConfiguredTarget.getRuleClassString());
return !isTestRule;
}
return true;
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
return false;
}
default:
return downloadToplevel;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableEmitter;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.subjects.AsyncSubject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -131,6 +133,8 @@ class Execution extends Single<ValueT> implements SingleObserver<ValueT> {
@GuardedBy("lock")
private final List<SingleObserver<? super ValueT>> observers = new ArrayList<>();

private final AsyncSubject<ValueT> completion = AsyncSubject.create();

Execution(KeyT key, Single<ValueT> upstream) {
this.key = key;
this.upstream = upstream;
Expand Down Expand Up @@ -182,6 +186,9 @@ public void onSuccess(@NonNull ValueT value) {
observer.onSuccess(value);
}

completion.onNext(value);
completion.onComplete();

maybeNotifyTermination();
}
}
Expand All @@ -198,6 +205,8 @@ public void onError(@NonNull Throwable error) {
observer.onError(error);
}

completion.onError(error);

maybeNotifyTermination();
}
}
Expand Down Expand Up @@ -348,6 +357,39 @@ public void shutdown() {
}
}

/**
* Waits for the in-progress tasks to finish. Any tasks that are submitted after the call are not
* waited.
*/
public void awaitInProgressTasks() throws InterruptedException {
Completable completable =
Completable.defer(
() -> {
ImmutableList<Execution> executions;
synchronized (lock) {
executions = ImmutableList.copyOf(inProgress.values());
}

if (executions.isEmpty()) {
return Completable.complete();
}

return Completable.fromPublisher(
Flowable.fromIterable(executions)
.flatMapSingle(e -> Single.fromObservable(e.completion)));
});

try {
completable.blockingAwait();
} catch (RuntimeException e) {
Throwable cause = e.getCause();
if (cause != null) {
throwIfInstanceOf(cause, InterruptedException.class);
}
throw e;
}
}

/** Waits for the channel to become terminated. */
public void awaitTermination() throws InterruptedException {
Completable completable =
Expand Down Expand Up @@ -493,6 +535,14 @@ public void shutdown() {
cache.shutdown();
}

/**
* Waits for the in-progress tasks to finish. Any tasks that are submitted after the call are
* not waited.
*/
public void awaitInProgressTasks() throws InterruptedException {
cache.awaitInProgressTasks();
}

/** Waits for the cache to become terminated. */
public void awaitTermination() throws InterruptedException {
cache.awaitTermination();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,17 @@ public BlazeCommandResult exec(CommandEnvironment env, OptionsParsingResult opti
return BlazeCommandResult.detailedExitCode(result.getDetailedExitCode());
}

// If Bazel is using an output service (e.g. Build without the Bytes), the toplevel outputs
// might still be downloading in the background. Flush the output tree to wait for all the
// downloads complete.
if (env.getOutputService() != null) {
try {
env.getOutputService().flushOutputTree();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}

// Make sure that we have exactly 1 built target (excluding --run_under),
// and that it is executable.
// These checks should only fail if keepGoing is true, because we already did
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ public default boolean shouldTrustRemoteArtifacts() {
ModifiedFileSet startBuild(EventHandler eventHandler, UUID buildId, boolean finalizeActions)
throws BuildFailedException, AbruptExitException, InterruptedException;

/** Flush and wait for in-progress downloads. */
default void flushOutputTree() throws InterruptedException {}

/**
* Finish the build.
*
Expand Down
22 changes: 22 additions & 0 deletions src/test/shell/bazel/remote/build_without_the_bytes_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1319,4 +1319,26 @@ EOF
[[ ! -e "bazel-bin/a/liblib.jdeps" ]] || fail "bazel-bin/a/liblib.jdeps shouldn't exist"
}

function test_bazel_run_with_minimal() {
# Test that `bazel run` works in minimal mode.
mkdir -p a

cat > a/BUILD <<'EOF'
genrule(
name = 'bin',
srcs = [],
outs = ['bin.out'],
cmd = "echo 'echo bin-message' > $@",
executable = True,
)
EOF

bazel run \
--remote_executor=grpc://localhost:${worker_port} \
--remote_download_minimal \
//a:bin >& $TEST_log || fail "Failed to run //a:bin"

expect_log "bin-message"
}

run_suite "Build without the Bytes tests"

0 comments on commit 845077f

Please sign in to comment.