Skip to content
This repository has been archived by the owner on Aug 28, 2020. It is now read-only.

Commit

Permalink
Add CompletenessChecking Fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Qinusty committed Aug 13, 2020
1 parent f67fecc commit c90b86a
Show file tree
Hide file tree
Showing 7 changed files with 430 additions and 3 deletions.
2 changes: 1 addition & 1 deletion cmd/bb_asset_hub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func main() {
allowUpdatesForInstances[instanceName] = true
}

fetchServer, err := configuration.NewFetcherFromConfiguration(config.Fetcher, assetStore, casBlobAccessCreator)
fetchServer, err := configuration.NewFetcherFromConfiguration(config.Fetcher, assetStore, casBlobAccessCreator, int(config.MaximumMessageSizeBytes))
if err != nil {
log.Fatal("Failed to initialize fetch server from configuration: ", err)
}
Expand Down
4 changes: 4 additions & 0 deletions go_dependencies.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ def go_dependencies():
importpath = "github.com/buildbarn/bb-storage",
sum = "h1:XkZNtqFiqfDzn2SYfObEKeGs5JqOIcMniOu/mIXyFYY=",
version = "v0.0.0-20200802055539-f0281e269c07",
# Temporary patch until patch lands upstream
patches = [
"//:patches/com_github_buildbarn_bb_storage/expose_find_missing_queue.diff",
]
)
go_repository(
name = "com_github_bazelbuild_remote_apis",
Expand Down
301 changes: 301 additions & 0 deletions patches/com_github_buildbarn_bb_storage/expose_find_missing_queue.diff
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
diff --git pkg/blobstore/completenesschecking/BUILD.bazel pkg/blobstore/completenesschecking/BUILD.bazel
index 0c0e849..3b46bf7 100644
--- pkg/blobstore/completenesschecking/BUILD.bazel
+++ pkg/blobstore/completenesschecking/BUILD.bazel
@@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
- srcs = ["completeness_checking_blob_access.go"],
+ srcs = [
+ "completeness_checking_blob_access.go",
+ "find_missing_queue.go"
+ ],
importpath = "github.com/buildbarn/bb-storage/pkg/blobstore/completenesschecking",
visibility = ["//visibility:public"],
deps = [
diff --git pkg/blobstore/completenesschecking/completeness_checking_blob_access.go pkg/blobstore/completenesschecking/completeness_checking_blob_access.go
index 93db060..b437e58 100644
--- pkg/blobstore/completenesschecking/completeness_checking_blob_access.go
+++ pkg/blobstore/completenesschecking/completeness_checking_blob_access.go
@@ -8,80 +8,8 @@ import (
"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
"github.com/buildbarn/bb-storage/pkg/digest"
"github.com/buildbarn/bb-storage/pkg/util"
-
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
)

-// findMissingQueue is a helper for calling BlobAccess.FindMissing() in
-// batches, as opposed to calling it for individual digests.
-type findMissingQueue struct {
- context context.Context
- instanceName digest.InstanceName
- contentAddressableStorage blobstore.BlobAccess
- batchSize int
-
- pending digest.SetBuilder
-}
-
-// deriveDigest converts a digest embedded into an action result from
-// the wire format to an in-memory representation. If that fails, we
-// assume that some data corruption has occurred. In that case, we
-// should destroy the action result.
-func (q *findMissingQueue) deriveDigest(blobDigest *remoteexecution.Digest) (digest.Digest, error) {
- derivedDigest, err := q.instanceName.NewDigestFromProto(blobDigest)
- if err != nil {
- return digest.BadDigest, util.StatusWrapWithCode(err, codes.NotFound, "Action result contained malformed digest")
- }
- return derivedDigest, err
-}
-
-// Add a digest to the list of digests that are pending to be checked
-// for existence in the Content Addressable Storage.
-func (q *findMissingQueue) add(blobDigest *remoteexecution.Digest) error {
- if blobDigest != nil {
- derivedDigest, err := q.deriveDigest(blobDigest)
- if err != nil {
- return err
- }
-
- if q.pending.Length() >= q.batchSize {
- if err := q.finalize(); err != nil {
- return err
- }
- q.pending = digest.NewSetBuilder()
- }
- q.pending.Add(derivedDigest)
- }
- return nil
-}
-
-// AddDirectory adds all digests contained with a directory to the list
-// of digests pending to be checked for existence.
-func (q *findMissingQueue) addDirectory(directory *remoteexecution.Directory) error {
- if directory == nil {
- return nil
- }
- for _, child := range directory.Files {
- if err := q.add(child.Digest); err != nil {
- return err
- }
- }
- return nil
-}
-
-// Finalize by checking the last batch of digests for existence.
-func (q *findMissingQueue) finalize() error {
- missing, err := q.contentAddressableStorage.FindMissing(q.context, q.pending.Build())
- if err != nil {
- return util.StatusWrap(err, "Failed to determine existence of child objects")
- }
- if digest, ok := missing.First(); ok {
- return status.Errorf(codes.NotFound, "Object %s referenced by the action result is not present in the Content Addressable Storage", digest)
- }
- return nil
-}
-
type completenessCheckingBlobAccess struct {
blobstore.BlobAccess
contentAddressableStorage blobstore.BlobAccess
@@ -113,13 +41,12 @@ func NewCompletenessCheckingBlobAccess(actionCache blobstore.BlobAccess, content
}

func (ba *completenessCheckingBlobAccess) checkCompleteness(ctx context.Context, instanceName digest.InstanceName, actionResult *remoteexecution.ActionResult) error {
- findMissingQueue := findMissingQueue{
- context: ctx,
- instanceName: instanceName,
- contentAddressableStorage: ba.contentAddressableStorage,
- batchSize: ba.batchSize,
- pending: digest.NewSetBuilder(),
- }
+ findMissingQueue := NewFindMissingQueue(
+ ctx,
+ instanceName,
+ ba.contentAddressableStorage,
+ ba.batchSize,
+ )

// Iterate over all remoteexecution.Digest fields contained
// within the ActionResult. Check the existence of output
@@ -127,19 +54,19 @@ func (ba *completenessCheckingBlobAccess) checkCompleteness(ctx context.Context,
// later on. GetTree() may not necessarily cause those objects
// to be touched.
for _, outputFile := range actionResult.OutputFiles {
- if err := findMissingQueue.add(outputFile.Digest); err != nil {
+ if err := findMissingQueue.Add(outputFile.Digest); err != nil {
return err
}
}
for _, outputDirectory := range actionResult.OutputDirectories {
- if err := findMissingQueue.add(outputDirectory.TreeDigest); err != nil {
+ if err := findMissingQueue.Add(outputDirectory.TreeDigest); err != nil {
return err
}
}
- if err := findMissingQueue.add(actionResult.StdoutDigest); err != nil {
+ if err := findMissingQueue.Add(actionResult.StdoutDigest); err != nil {
return err
}
- if err := findMissingQueue.add(actionResult.StderrDigest); err != nil {
+ if err := findMissingQueue.Add(actionResult.StderrDigest); err != nil {
return err
}

@@ -147,7 +74,7 @@ func (ba *completenessCheckingBlobAccess) checkCompleteness(ctx context.Context,
// within output directories (remoteexecution.Tree objects)
// referenced by the ActionResult.
for _, outputDirectory := range actionResult.OutputDirectories {
- treeDigest, err := findMissingQueue.deriveDigest(outputDirectory.TreeDigest)
+ treeDigest, err := findMissingQueue.DeriveDigest(outputDirectory.TreeDigest)
if err != nil {
return err
}
@@ -156,16 +83,16 @@ func (ba *completenessCheckingBlobAccess) checkCompleteness(ctx context.Context,
return util.StatusWrapf(err, "Failed to fetch output directory %#v", outputDirectory.Path)
}
tree := treeMessage.(*remoteexecution.Tree)
- if err := findMissingQueue.addDirectory(tree.Root); err != nil {
+ if err := findMissingQueue.AddDirectory(tree.Root); err != nil {
return err
}
for _, child := range tree.Children {
- if err := findMissingQueue.addDirectory(child); err != nil {
+ if err := findMissingQueue.AddDirectory(child); err != nil {
return err
}
}
}
- return findMissingQueue.finalize()
+ return findMissingQueue.Finalize()
}

func (ba *completenessCheckingBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer.Buffer {
diff --git pkg/blobstore/completenesschecking/completeness_checking_blob_access_test.go pkg/blobstore/completenesschecking/completeness_checking_blob_access_test.go
index 8f3d5fc..69f4938 100644
--- pkg/blobstore/completenesschecking/completeness_checking_blob_access_test.go
+++ pkg/blobstore/completenesschecking/completeness_checking_blob_access_test.go
@@ -56,7 +56,7 @@ func TestCompletenessCheckingBlobAccess(t *testing.T) {
buffer.Reparable(actionDigest, repairFunc.Call)))

_, err := completenessCheckingBlobAccess.Get(ctx, actionDigest).ToProto(&remoteexecution.ActionResult{}, 1000)
- require.Equal(t, err, status.Error(codes.NotFound, "Action result contained malformed digest: Unknown digest hash length: 24 characters"))
+ require.Equal(t, err, status.Error(codes.NotFound, "Malformed digest found during FindMissing process: Unknown digest hash length: 24 characters"))
})

t.Run("MissingInput", func(t *testing.T) {
@@ -92,7 +92,7 @@ func TestCompletenessCheckingBlobAccess(t *testing.T) {
nil)

_, err := completenessCheckingBlobAccess.Get(ctx, actionDigest).ToProto(&remoteexecution.ActionResult{}, 1000)
- require.Equal(t, err, status.Error(codes.NotFound, "Object 8b1a9953c4611296a827abf8c47804d7-5-hello referenced by the action result is not present in the Content Addressable Storage"))
+ require.Equal(t, err, status.Error(codes.NotFound, "Referenced Object 8b1a9953c4611296a827abf8c47804d7-5-hello is not present in the Content Addressable Storage"))
})

t.Run("FindMissingError", func(t *testing.T) {
diff --git pkg/blobstore/completenesschecking/find_missing_queue.go pkg/blobstore/completenesschecking/find_missing_queue.go
new file mode 100644
index 0000000..be884c2
--- /dev/null
+++ pkg/blobstore/completenesschecking/find_missing_queue.go
@@ -0,0 +1,95 @@
+package completenesschecking
+
+import (
+ "context"
+
+ remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
+ "github.com/buildbarn/bb-storage/pkg/blobstore"
+ "github.com/buildbarn/bb-storage/pkg/digest"
+ "github.com/buildbarn/bb-storage/pkg/util"
+
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// findMissingQueue is a helper for calling BlobAccess.FindMissing() in
+// batches, as opposed to calling it for individual digests.
+type findMissingQueue struct {
+ context context.Context
+ instanceName digest.InstanceName
+ contentAddressableStorage blobstore.BlobAccess
+ batchSize int
+
+ pending digest.SetBuilder
+}
+
+func NewFindMissingQueue(context context.Context, instanceName digest.InstanceName,
+ contentAddressableStorage blobstore.BlobAccess,
+ batchSize int) findMissingQueue {
+ return findMissingQueue {
+ context: context,
+ instanceName: instanceName,
+ contentAddressableStorage: contentAddressableStorage,
+ batchSize: batchSize,
+ pending: digest.NewSetBuilder(),
+ }
+
+}
+
+// DeriveDigest converts a digest embedded into an action result from
+// the wire format to an in-memory representation. If that fails, we
+// assume that some data corruption has occurred. In that case, we
+// should destroy the action result.
+func (q *findMissingQueue) DeriveDigest(blobDigest *remoteexecution.Digest) (digest.Digest, error) {
+ derivedDigest, err := q.instanceName.NewDigestFromProto(blobDigest)
+ if err != nil {
+ return digest.BadDigest, util.StatusWrapWithCode(err, codes.NotFound, "Malformed digest found during FindMissing process")
+ }
+ return derivedDigest, err
+}
+
+// Add a digest to the list of digests that are pending to be checked
+// for existence in the Content Addressable Storage.
+func (q *findMissingQueue) Add(blobDigest *remoteexecution.Digest) error {
+ if blobDigest != nil {
+ derivedDigest, err := q.DeriveDigest(blobDigest)
+ if err != nil {
+ return err
+ }
+
+ if q.pending.Length() >= q.batchSize {
+ if err := q.Finalize(); err != nil {
+ return err
+ }
+ q.pending = digest.NewSetBuilder()
+ }
+ q.pending.Add(derivedDigest)
+ }
+ return nil
+}
+
+// AddDirectory adds all digests contained with a directory to the list
+// of digests pending to be checked for existence.
+func (q *findMissingQueue) AddDirectory(directory *remoteexecution.Directory) error {
+ if directory == nil {
+ return nil
+ }
+ for _, child := range directory.Files {
+ if err := q.Add(child.Digest); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Finalize by checking the last batch of digests for existence.
+func (q *findMissingQueue) Finalize() error {
+ missing, err := q.contentAddressableStorage.FindMissing(q.context, q.pending.Build())
+ if err != nil {
+ return util.StatusWrap(err, "Failed to determine existence of child objects")
+ }
+ if digest, ok := missing.First(); ok {
+ return status.Errorf(codes.NotFound, "Referenced Object %s is not present in the Content Addressable Storage", digest)
+ }
+ return nil
+}
\ No newline at end of file
--
2.23.0

18 changes: 16 additions & 2 deletions pkg/configuration/new_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
// a jsonnet configuration.
func NewFetcherFromConfiguration(configuration *pb.FetcherConfiguration,
assetStore *storage.AssetStore,
casBlobAccessCreator blobstore_configuration.BlobAccessCreator) (remoteasset.FetchServer, error) {
casBlobAccessCreator blobstore_configuration.BlobAccessCreator, maximumMessageSizeBytes int) (remoteasset.FetchServer, error) {
var fetcher remoteasset.FetchServer
switch backend := configuration.Backend.(type) {
case *pb.FetcherConfiguration_Caching:
innerFetcher, err := NewFetcherFromConfiguration(
backend.Caching, assetStore,
casBlobAccessCreator)
casBlobAccessCreator, maximumMessageSizeBytes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -55,6 +55,20 @@ func NewFetcherFromConfiguration(configuration *pb.FetcherConfiguration,
allowUpdatesForInstances)
case *pb.FetcherConfiguration_Error:
fetcher = fetch.NewErrorFetcher(backend.Error)
case *pb.FetcherConfiguration_CompletenessChecking:
cas, err := blobstore_configuration.NewBlobAccessFromConfiguration(
backend.CompletenessChecking.ContentAddressableStorage,
casBlobAccessCreator)
innerFetcher, err := NewFetcherFromConfiguration(
backend.CompletenessChecking.Fetcher, assetStore,
casBlobAccessCreator, maximumMessageSizeBytes)
if err != nil {
return nil, err
}
fetcher = fetch.NewCompletenessCheckingFetcher(
innerFetcher, cas,
int(backend.CompletenessChecking.BatchSize), maximumMessageSizeBytes)

default:
return nil, status.Errorf(codes.InvalidArgument, "Fetcher configuration is invalid as no supported Fetchers are defined.")
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/fetch/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ go_library(
"http_fetcher.go",
"logging_fetcher.go",
"validating_fetcher.go",
"completeness_checking_fetcher.go"
],
importpath = "github.com/buildbarn/bb-asset-hub/pkg/fetch",
visibility = ["//visibility:public"],
deps = [
"//pkg/storage:go_default_library",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/asset/v1:go_default_library",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:go_default_library",
"@com_github_buildbarn_bb_storage//pkg/blobstore:go_default_library",
"@com_github_buildbarn_bb_storage//pkg/blobstore/completenesschecking:go_default_library",
"@com_github_buildbarn_bb_storage//pkg/blobstore/buffer:go_default_library",
"@com_github_buildbarn_bb_storage//pkg/digest:go_default_library",
"@com_github_buildbarn_bb_storage//pkg/util:go_default_library",
Expand Down
Loading

0 comments on commit c90b86a

Please sign in to comment.