remote: re-implement merkle tree building
The TreeNodeRepository served us well, but it has become dated over
time and increasingly hard to understand and to fix bugs in. Additionally,
it was written with the initial intent of incrementally updating the
merkle tree between actions; however, internal performance analysis has
shown that doing so 1) is hard to implement reliably and 2) would
increase memory consumption severalfold.

This is a rewrite of the code that also fixes bugs like #4663. MerkleTree
implements a visitor pattern over an InputTree and is responsible for
serializing the InputTree to the protobuf merkle tree that's used by
remote caching / execution.
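
A rough caller-side sketch of the new API, pieced together from the call
sites in this change (variable names are illustrative; the MerkleTree
methods are the ones used in the diff below):

  // Build the merkle tree once per action from the spawn's input mapping.
  SortedMap<PathFragment, ActionInput> inputMap = context.getInputMapping(true);
  MerkleTree merkleTree =
      MerkleTree.build(inputMap, context.getMetadataProvider(), execRoot, digestUtil);

  // The root digest becomes the Action's input root digest.
  Digest inputRootDigest = merkleTree.getRootDigest();

  // During upload, each missing digest resolves to either a Directory proto
  // or a file-backed ActionInput.
  Directory dir = merkleTree.getDirectoryByDigest(missingDigest);
  ActionInput file = merkleTree.getInputByDigest(missingDigest);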

Benchmarks on my local machine have shown this implementation to
consume between 2x and 10x less wall time than the current
implementation. Tests by users have shown equally encouraging
speedups [1].

[1] https://groups.google.com/d/msg/bazel-discuss/wPHrqm2z8lU/mzoRF236GQAJ

Closes #7583.

PiperOrigin-RevId: 237421145
buchgr authored and copybara-github committed Mar 8, 2019
1 parent 04f28cb commit 7f72544
Showing 10 changed files with 379 additions and 866 deletions.
@@ -152,7 +152,6 @@ public void onFailure(Throwable t) {
* @throws IOException in case of a cache miss or if the remote cache is unavailable.
* @throws ExecException in case clean up after a failed download failed.
*/
// TODO(olaola): will need to amend to include the TreeNodeRepository for updating.
public void download(ActionResult result, Path execRoot, FileOutErr outErr)
throws ExecException, IOException, InterruptedException {
Context ctx = Context.current();
@@ -14,6 +14,8 @@

package com.google.devtools.build.lib.remote;

import static java.lang.String.format;

import build.bazel.remote.execution.v2.Action;
import build.bazel.remote.execution.v2.ActionCacheGrpc;
import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheBlockingStub;
@@ -44,12 +46,13 @@
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.Message;
import io.grpc.CallCredentials;
import io.grpc.Context;
import io.grpc.Status;
@@ -59,8 +62,6 @@
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -199,49 +200,41 @@ private ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests)
* end-point from the executor itself, so the functionality lives here.
*/
public void ensureInputsPresent(
TreeNodeRepository repository, Path execRoot, TreeNode root, Action action, Command command)
MerkleTree merkleTree, Map<Digest, Message> additionalInputs, Path execRoot)
throws IOException, InterruptedException {
repository.computeMerkleDigests(root);
Digest actionDigest = digestUtil.compute(action);
Digest commandDigest = digestUtil.compute(command);
// TODO(olaola): avoid querying all the digests, only ask for novel subtrees.
ImmutableSet<Digest> missingDigests =
getMissingDigests(
Iterables.concat(
repository.getAllDigests(root), ImmutableList.of(actionDigest, commandDigest)));

List<Chunker> toUpload = new ArrayList<>();
// Only upload data that was missing from the cache.
Map<Digest, ActionInput> missingActionInputs = new HashMap<>();
Map<Digest, Directory> missingTreeNodes = new HashMap<>();
HashSet<Digest> missingTreeDigests = new HashSet<>(missingDigests);
missingTreeDigests.remove(commandDigest);
missingTreeDigests.remove(actionDigest);
repository.getDataFromDigests(missingTreeDigests, missingActionInputs, missingTreeNodes);

if (missingDigests.contains(actionDigest)) {
toUpload.add(
Chunker.builder(digestUtil).setInput(actionDigest, action.toByteArray()).build());
}
if (missingDigests.contains(commandDigest)) {
toUpload.add(
Chunker.builder(digestUtil).setInput(commandDigest, command.toByteArray()).build());
}
if (!missingTreeNodes.isEmpty()) {
for (Map.Entry<Digest, Directory> entry : missingTreeNodes.entrySet()) {
Digest digest = entry.getKey();
Directory d = entry.getValue();
toUpload.add(Chunker.builder(digestUtil).setInput(digest, d.toByteArray()).build());
getMissingDigests(Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet()));
List<Chunker> inputsToUpload = new ArrayList<>(missingDigests.size());
for (Digest missingDigest : missingDigests) {
Directory node = merkleTree.getDirectoryByDigest(missingDigest);
final Chunker c;
if (node != null) {
c = Chunker.builder(digestUtil).setInput(missingDigest, node.toByteArray()).build();
inputsToUpload.add(c);
continue;
}
}
if (!missingActionInputs.isEmpty()) {
for (Map.Entry<Digest, ActionInput> entry : missingActionInputs.entrySet()) {
Digest digest = entry.getKey();
ActionInput actionInput = entry.getValue();
toUpload.add(Chunker.builder(digestUtil).setInput(digest, actionInput, execRoot).build());

ActionInput file = merkleTree.getInputByDigest(missingDigest);
if (file != null) {
c = Chunker.builder(digestUtil).setInput(missingDigest, file, execRoot).build();
inputsToUpload.add(c);
continue;
}

Message message = additionalInputs.get(missingDigest);
if (message != null) {
c = Chunker.builder(digestUtil).setInput(missingDigest, message.toByteArray()).build();
inputsToUpload.add(c);
continue;
}

throw new IOException(
format(
"getMissingDigests returned a missing digest that has not been requested: %s",
missingDigest));
}
uploader.uploadBlobs(toUpload, true);

uploader.uploadBlobs(inputsToUpload, /* forceUpload= */ true);
}

@Override
@@ -18,6 +18,7 @@
import build.bazel.remote.execution.v2.Action;
import build.bazel.remote.execution.v2.ActionResult;
import build.bazel.remote.execution.v2.Command;
import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.Platform;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecException;
@@ -36,7 +37,7 @@
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
@@ -100,14 +101,9 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
}

SortedMap<PathFragment, ActionInput> inputMap = context.getInputMapping(true);
// Temporary hack: the TreeNodeRepository should be created and maintained upstream!
TreeNodeRepository repository =
new TreeNodeRepository(execRoot, context.getMetadataProvider(), digestUtil);
TreeNode inputRoot;
try (SilentCloseable c = Profiler.instance().profile("RemoteCache.computeMerkleDigests")) {
inputRoot = repository.buildFromActionInputs(inputMap);
repository.computeMerkleDigests(inputRoot);
}
MerkleTree merkleTree =
MerkleTree.build(inputMap, context.getMetadataProvider(), execRoot, digestUtil);
Digest merkleTreeRoot = merkleTree.getRootDigest();

// Get the remote platform properties.
Platform platform =
@@ -122,10 +118,7 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
try (SilentCloseable c = Profiler.instance().profile("RemoteCache.buildAction")) {
action =
RemoteSpawnRunner.buildAction(
digestUtil.compute(command),
repository.getMerkleDigest(inputRoot),
context.getTimeout(),
true);
digestUtil.compute(command), merkleTreeRoot, context.getTimeout(), true);
// Look up action cache, and reuse the action output if it is found.
actionKey = digestUtil.computeActionKey(action);
}
@@ -144,8 +137,6 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
}
if (result != null) {
// We don't cache failed actions, so we know the outputs exist.
// For now, download all outputs locally; in the future, we can reuse the digests to
// just update the TreeNodeRepository and continue the build.
try (SilentCloseable c = Profiler.instance().profile("RemoteCache.download")) {
remoteCache.download(result, execRoot, context.getFileOutErr());
}
@@ -30,13 +30,13 @@
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.CommandLines.ParamFileActionInput;
import com.google.devtools.build.lib.actions.EnvironmentalExecException;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.MetadataProvider;
import com.google.devtools.build.lib.actions.Spawn;
import com.google.devtools.build.lib.actions.SpawnResult;
import com.google.devtools.build.lib.actions.SpawnResult.Status;
@@ -52,14 +52,15 @@
import com.google.devtools.build.lib.exec.SpawnRunner;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.protobuf.Message;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
import io.grpc.Context;
@@ -146,33 +147,31 @@ public SpawnResult exec(Spawn spawn, SpawnExecutionContext context)
boolean spawnCachable = Spawns.mayBeCached(spawn);

context.report(ProgressStatus.EXECUTING, getName());
// Temporary hack: the TreeNodeRepository should be created and maintained upstream!
MetadataProvider inputFileCache = context.getMetadataProvider();
TreeNodeRepository repository = new TreeNodeRepository(execRoot, inputFileCache, digestUtil);

SortedMap<PathFragment, ActionInput> inputMap = context.getInputMapping(true);
TreeNode inputRoot;
try (SilentCloseable c = Profiler.instance().profile("Remote.computeMerkleDigests")) {
inputRoot = repository.buildFromActionInputs(inputMap);
repository.computeMerkleDigests(inputRoot);
}
final MerkleTree merkleTree =
MerkleTree.build(inputMap, context.getMetadataProvider(), execRoot, digestUtil);
maybeWriteParamFilesLocally(spawn);

// Get the remote platform properties.
Platform platform =
parsePlatform(spawn.getExecutionPlatform(), remoteOptions.remoteDefaultPlatformProperties);

Command command =
buildCommand(
spawn.getOutputFiles(), spawn.getArguments(), spawn.getEnvironment(), platform);
Action action;
ActionKey actionKey;
final Command command;
final Digest commandHash;
final Action action;
final ActionKey actionKey;
try (SilentCloseable c = Profiler.instance().profile("Remote.buildAction")) {
command =
buildCommand(
spawn.getOutputFiles(), spawn.getArguments(), spawn.getEnvironment(), platform);
commandHash = digestUtil.compute(command);
action =
buildAction(
digestUtil.compute(command),
repository.getMerkleDigest(inputRoot),
commandHash,
merkleTree.getRootDigest(),
context.getTimeout(),
spawnCachable);
Spawns.mayBeCached(spawn));
actionKey = digestUtil.computeActionKey(action);
}

@@ -241,7 +240,10 @@ public SpawnResult exec(Spawn spawn, SpawnExecutionContext context)
() -> {
// Upload the command and all the inputs into the remote cache.
try (SilentCloseable c = Profiler.instance().profile("Remote.uploadInputs")) {
remoteCache.ensureInputsPresent(repository, execRoot, inputRoot, action, command);
Map<Digest, Message> additionalInputs = Maps.newHashMapWithExpectedSize(2);
additionalInputs.put(actionKey.getDigest(), action);
additionalInputs.put(commandHash, command);
remoteCache.ensureInputsPresent(merkleTree, additionalInputs, execRoot);
}
ExecuteResponse reply;
try (SilentCloseable c = Profiler.instance().profile("Remote.executeRemotely")) {
