Skip to content

Commit

Permalink
Allow rewinding to re-create lost inputs
Browse files Browse the repository at this point in the history
This remedies the following sequence of events:
1. Build build_tool (e.g. the go builder) from source with remote
   execution and `--remote_download_minimal`.
2. Use build_tool to build some_binary with remote execution.
3. Evict `build_tool` from the remote execution system.
4. Edit the sources to some_binary and attempt to build it again with
   remote execution.

Before this change, Bazel would give an FileNotFoundException
complaining that build_tool couldn't be found (and so couldn't be
uploaded).

After this change, Bazel will notice that it knows how to regenerate the
missing file, and so rewind the graph and re-perform the actions it
needs to be able to build some_binary.

Co-authored-by: Ilya Polyakovskiy <[email protected]>
  • Loading branch information
2 people authored and k1nkreet committed Oct 13, 2022
1 parent 1bae565 commit c745cac
Show file tree
Hide file tree
Showing 38 changed files with 337 additions and 67 deletions.
3 changes: 3 additions & 0 deletions src/main/cpp/blaze.cc
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,9 @@ static vector<string> GetServerExeArgs(const blaze_util::Path &jvm_path,
if (startup_options.autodetect_server_javabase) {
result.push_back("--default_system_javabase=" + GetSystemJavabase());
}
if (startup_options.experimental_rewind_missing_files) {
result.push_back("--experimental_rewind_missing_files");
}

if (!startup_options.server_jvm_out.IsEmpty()) {
result.push_back("--server_jvm_out=" +
Expand Down
3 changes: 3 additions & 0 deletions src/main/cpp/startup_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ StartupOptions::StartupOptions(const string &product_name,
block_for_lock(true),
host_jvm_debug(false),
autodetect_server_javabase(true),
experimental_rewind_missing_files(false),
batch(false),
batch_cpu_scheduling(false),
io_nice_level(-1),
Expand Down Expand Up @@ -139,6 +140,8 @@ StartupOptions::StartupOptions(const string &product_name,
RegisterNullaryStartupFlag("host_jvm_debug", &host_jvm_debug);
RegisterNullaryStartupFlag("autodetect_server_javabase",
&autodetect_server_javabase);
RegisterNullaryStartupFlag("experimental_rewind_missing_files",
&experimental_rewind_missing_files);
RegisterNullaryStartupFlag("idle_server_tasks", &idle_server_tasks);
RegisterNullaryStartupFlag("shutdown_on_low_sys_mem",
&shutdown_on_low_sys_mem);
Expand Down
2 changes: 2 additions & 0 deletions src/main/cpp/startup_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ class StartupOptions {

bool autodetect_server_javabase;

bool experimental_rewind_missing_files;

std::string host_jvm_profile;

std::vector<std::string> host_jvm_args;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,26 @@ protected FailureDetail getFailureDetail(String message) {
.build();
}

public void combineAndThrow(LostInputsExecException other) throws LostInputsExecException {
public static LostInputsExecException combine(LostInputsExecException... es) {
// This uses a HashMap when merging the two lostInputs maps because key collisions are expected.
// In contrast, ImmutableMap.Builder treats collisions as errors. Collisions will happen when
// the two sources of the original exceptions shared knowledge of what was lost. For example,
// a SpawnRunner may discover a lost input and look it up in an action filesystem in which it's
// also lost. The SpawnRunner and the filesystem may then each throw a LostInputsExecException
// with the same information.
Map<String, ActionInput> map = new HashMap<>();
map.putAll(lostInputs);
map.putAll(other.lostInputs);
LostInputsExecException combined =
new LostInputsExecException(
ImmutableMap.copyOf(map), new MergedActionInputDepOwners(owners, other.owners), this);
ActionInputDepOwners mergedOwners = new ActionInputDepOwnerMap(ImmutableSet.of());

for (LostInputsExecException other : es) {
map.putAll(other.lostInputs);
mergedOwners = new MergedActionInputDepOwners(mergedOwners, other.owners);
}
return new LostInputsExecException(ImmutableMap.copyOf(map), mergedOwners);
}

public void combineAndThrow(LostInputsExecException other) throws LostInputsExecException {
LostInputsExecException combined = combine(this, other);
combined.addSuppressed(this);
combined.addSuppressed(other);
throw combined;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ java_library(
deps = [
"//src/main/java/com/google/devtools/build/docgen/annot",
"//src/main/java/com/google/devtools/build/lib:runtime",
"//src/main/java/com/google/devtools/build/lib/actions",
"//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
"//src/main/java/com/google/devtools/build/lib/analysis:analysis_cluster",
"//src/main/java/com/google/devtools/build/lib/analysis:blaze_directories",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Maps;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.FileValue;
import com.google.devtools.build.lib.bazel.debug.WorkspaceRuleEvent;
import com.google.devtools.build.lib.bazel.repository.DecompressorDescriptor;
Expand Down Expand Up @@ -1048,7 +1049,7 @@ private StarlarkExecutionResult executeRemote(
}

return new StarlarkExecutionResult(result.exitCode(), stdout, stderr);
} catch (IOException e) {
} catch (ExecException | IOException e) {
throw Starlark.errorf("remote_execute failed: %s", e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ public ImmutableList<SpawnResult> exec(
}
} catch (InterruptedIOException e) {
throw new InterruptedException(e.getMessage());
} catch (LostInputsExecException e) {
throw e;
} catch (IOException e) {
throw new EnvironmentalExecException(
e,
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ java_library(
],
deps = [
":ExecutionStatusException",
"//src/main/java/com/google/devtools/build/lib/actions",
"//src/main/java/com/google/devtools/build/lib/remote/options",
"//third_party:guava",
"//third_party:jsr305",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import build.bazel.remote.execution.v2.RequestMetadata;
import build.bazel.remote.execution.v2.WaitExecutionRequest;
import com.google.common.base.Preconditions;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
Expand Down Expand Up @@ -114,7 +115,7 @@ private static class Execution {
this.waitExecutionFunction = waitExecutionFunction;
}

ExecuteResponse start() throws IOException, InterruptedException {
ExecuteResponse start() throws ExecException, IOException, InterruptedException {
// Execute has two components: the Execute call and (optionally) the WaitExecution call.
// This is the simple flow without any errors:
//
Expand Down Expand Up @@ -321,7 +322,7 @@ static ExecuteResponse extractResponseOrThrowIfError(Operation operation) throws
@Override
public ExecuteResponse executeRemotely(
RemoteActionExecutionContext context, ExecuteRequest request, OperationObserver observer)
throws IOException, InterruptedException {
throws ExecException, IOException, InterruptedException {
Execution execution =
new Execution(
request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,21 @@
import build.bazel.remote.execution.v2.Directory;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputDepOwnerMap;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.Artifact.DerivedArtifact;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.FileArtifactValue.RemoteFileArtifactValue;
import com.google.devtools.build.lib.actions.LostInputsExecException;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
Expand All @@ -51,6 +61,7 @@
import io.reactivex.rxjava3.core.SingleEmitter;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.AsyncSubject;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -83,20 +94,20 @@ public void ensureInputsPresent(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
String actionId,
boolean force)
throws IOException, InterruptedException {
throws ExecException, IOException, InterruptedException {
Iterable<Digest> merkleTreeAllDigests;
try (SilentCloseable s = Profiler.instance().profile("merkleTree.getAllDigests()")) {
merkleTreeAllDigests = merkleTree.getAllDigests();
}
Iterable<Digest> allDigests = Iterables.concat(merkleTreeAllDigests, additionalInputs.keySet());

if (Iterables.isEmpty(allDigests)) {
return;
}

Flowable<TransferResult> uploads =
createUploadTasks(context, merkleTree, additionalInputs, allDigests, force)
createUploadTasks(context, merkleTree, additionalInputs, allDigests, actionId, force)
.flatMapPublisher(
result ->
Flowable.using(
Expand All @@ -119,6 +130,7 @@ public void ensureInputsPresent(
Throwable cause = e.getCause();
if (cause != null) {
Throwables.throwIfInstanceOf(cause, InterruptedException.class);
Throwables.throwIfInstanceOf(cause, LostInputsExecException.class);
Throwables.throwIfInstanceOf(cause, IOException.class);
}
throw e;
Expand All @@ -129,7 +141,8 @@ private ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context,
Digest digest,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs) {
Map<Digest, Message> additionalInputs,
String actionId) {
Directory node = merkleTree.getDirectoryByDigest(digest);
if (node != null) {
return cacheProtocol.uploadBlob(context, digest, node.toByteString());
Expand All @@ -140,7 +153,28 @@ private ListenableFuture<Void> uploadBlob(
if (file.getBytes() != null) {
return cacheProtocol.uploadBlob(context, digest, file.getBytes());
}
return cacheProtocol.uploadFile(context, digest, file.getPath());
return Futures.catchingAsync(cacheProtocol.uploadFile(context, digest, file.getPath()),
// When we avoid downloads (e.g. with --remote_download_minimal), we end up not populating
// paths which may be read from when doing uploads.
// If this happens, the remote is missing a file we expect it to have, and we can identify
// which action produced the file, report that we lost inputs but that rewinding may be
// able to regenerate them.
FileNotFoundException.class, e -> {
if (file.getArtifact() != null) {
ActionInputDepOwnerMap owners = new ActionInputDepOwnerMap(
ImmutableList.of(file.getArtifact()));
RemoteFileArtifactValue artifactValue = RemoteFileArtifactValue.create(
DigestUtil.toBinaryDigest(digest),
digest.getSizeBytes(),
/*locationIndex=*/ 1,
actionId);
owners.put(file.getArtifact(), artifactValue, file.getArtifact());
return Futures.immediateFailedFuture(new LostInputsExecException(
ImmutableMap.of(digest.getHash(), file.getArtifact()),
owners));
}
return Futures.immediateFailedFuture(e);
}, MoreExecutors.directExecutor());
}

Message message = additionalInputs.get(digest);
Expand All @@ -160,21 +194,24 @@ static class UploadTask {
AtomicReference<Disposable> disposable;
SingleEmitter<Boolean> continuation;
Completable completion;
PathOrBytes file;
String actionId;
}

private Single<List<UploadTask>> createUploadTasks(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
Iterable<Digest> allDigests,
String actionId,
boolean force) {
return Single.using(
() -> Profiler.instance().profile("collect digests"),
ignored ->
Flowable.fromIterable(allDigests)
.flatMapMaybe(
digest ->
maybeCreateUploadTask(context, merkleTree, additionalInputs, digest, force))
maybeCreateUploadTask(context, merkleTree, additionalInputs, digest, actionId, force))
.collect(toImmutableList()),
SilentCloseable::close);
}
Expand All @@ -184,6 +221,7 @@ private Maybe<UploadTask> maybeCreateUploadTask(
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
Digest digest,
String actionId,
boolean force) {
return Maybe.create(
emitter -> {
Expand All @@ -192,6 +230,9 @@ private Maybe<UploadTask> maybeCreateUploadTask(
uploadTask.digest = digest;
uploadTask.disposable = new AtomicReference<>();
uploadTask.completion = Completable.fromObservable(completion);
uploadTask.file = merkleTree.getFileByDigest(digest);
uploadTask.actionId = actionId;

Completable upload =
casUploadCache.execute(
digest,
Expand All @@ -209,7 +250,7 @@ private Maybe<UploadTask> maybeCreateUploadTask(
return toCompletable(
() ->
uploadBlob(
context, uploadTask.digest, merkleTree, additionalInputs),
context, uploadTask.digest, merkleTree, additionalInputs, actionId),
directExecutor());
}),
/* onAlreadyRunning= */ () -> emitter.onSuccess(uploadTask),
Expand Down Expand Up @@ -282,7 +323,27 @@ private Flowable<TransferResult> waitForUploadTasks(List<UploadTask> uploadTasks
() -> Profiler.instance().profile("upload"),
ignored ->
Flowable.fromIterable(uploadTasks)
.flatMapSingle(uploadTask -> toTransferResult(uploadTask.completion)),
.flatMapSingle(uploadTask -> toTransferResult(uploadTask.completion)
.onErrorResumeNext(e -> {
if (e instanceof FileNotFoundException) {
PathOrBytes file = uploadTask.file;
Digest digest = uploadTask.digest;
if (file.getArtifact() != null) {
ActionInputDepOwnerMap owners = new ActionInputDepOwnerMap(
ImmutableList.of(file.getArtifact()));
RemoteFileArtifactValue artifactValue = RemoteFileArtifactValue.create(
DigestUtil.toBinaryDigest(digest),
uploadTask.digest.getSizeBytes(),
/*locationIndex=*/ 1,
uploadTask.actionId);
owners.put(file.getArtifact(), artifactValue, file.getArtifact());
return Single.error(new LostInputsExecException(
ImmutableMap.of(digest.getHash(), file.getArtifact()),
owners));
}
}
return Single.error(e);
})),
SilentCloseable::close);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1373,7 +1373,7 @@ private void reportUploadError(Throwable error) {
* <p>Must be called before calling {@link #executeRemotely}.
*/
public void uploadInputsIfNotPresent(RemoteAction action, boolean force)
throws IOException, InterruptedException {
throws ExecException, IOException, InterruptedException {
checkState(!shutdown.get(), "shutdown");
checkState(mayBeExecutedRemotely(action.getSpawn()), "spawn can't be executed remotely");

Expand All @@ -1388,6 +1388,7 @@ public void uploadInputsIfNotPresent(RemoteAction action, boolean force)
.withWriteCachePolicy(CachePolicy.REMOTE_CACHE_ONLY), // Only upload to remote cache
action.getMerkleTree(),
additionalInputs,
action.getActionKey().toString(),
force);
}

Expand All @@ -1399,7 +1400,7 @@ public void uploadInputsIfNotPresent(RemoteAction action, boolean force)
*/
public RemoteActionResult executeRemotely(
RemoteAction action, boolean acceptCachedResult, OperationObserver observer)
throws IOException, InterruptedException {
throws ExecException, IOException, InterruptedException {
checkState(!shutdown.get(), "shutdown");
checkState(mayBeExecutedRemotely(action.getSpawn()), "spawn can't be executed remotely");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Maps;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.analysis.platform.PlatformUtils;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
Expand Down Expand Up @@ -108,7 +109,7 @@ public ExecutionResult execute(
ImmutableMap<String, String> environment,
String workingDirectory,
Duration timeout)
throws IOException, InterruptedException {
throws ExecException, IOException, InterruptedException {
RequestMetadata metadata =
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "repository_rule", null);
RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata);
Expand Down Expand Up @@ -158,7 +159,7 @@ public ExecutionResult execute(
additionalInputs.put(actionDigest, action);
additionalInputs.put(commandHash, command);

remoteCache.ensureInputsPresent(context, merkleTree, additionalInputs, /*force=*/ true);
remoteCache.ensureInputsPresent(context, merkleTree, additionalInputs, "", /*force=*/ true);
}

try (SilentCloseable c =
Expand Down
Loading

0 comments on commit c745cac

Please sign in to comment.