From c90b86a86e5c07bcf549a4944e51ce9603d321ba Mon Sep 17 00:00:00 2001 From: Josh Smith Date: Thu, 13 Aug 2020 19:17:08 +0100 Subject: [PATCH] Add CompletenessChecking Fetcher --- cmd/bb_asset_hub/main.go | 2 +- go_dependencies.bzl | 4 + .../expose_find_missing_queue.diff | 301 ++++++++++++++++++ pkg/configuration/new_fetcher.go | 18 +- pkg/fetch/BUILD.bazel | 3 + pkg/fetch/completeness_checking_fetcher.go | 92 ++++++ .../bb_asset_hub/fetch/fetcher.proto | 13 + 7 files changed, 430 insertions(+), 3 deletions(-) create mode 100644 patches/com_github_buildbarn_bb_storage/expose_find_missing_queue.diff create mode 100644 pkg/fetch/completeness_checking_fetcher.go diff --git a/cmd/bb_asset_hub/main.go b/cmd/bb_asset_hub/main.go index 32e12de..6c4abdc 100644 --- a/cmd/bb_asset_hub/main.go +++ b/cmd/bb_asset_hub/main.go @@ -71,7 +71,7 @@ func main() { allowUpdatesForInstances[instanceName] = true } - fetchServer, err := configuration.NewFetcherFromConfiguration(config.Fetcher, assetStore, casBlobAccessCreator) + fetchServer, err := configuration.NewFetcherFromConfiguration(config.Fetcher, assetStore, casBlobAccessCreator, int(config.MaximumMessageSizeBytes)) if err != nil { log.Fatal("Failed to initialize fetch server from configuration: ", err) } diff --git a/go_dependencies.bzl b/go_dependencies.bzl index 8859707..52da09b 100644 --- a/go_dependencies.bzl +++ b/go_dependencies.bzl @@ -6,6 +6,10 @@ def go_dependencies(): importpath = "github.com/buildbarn/bb-storage", sum = "h1:XkZNtqFiqfDzn2SYfObEKeGs5JqOIcMniOu/mIXyFYY=", version = "v0.0.0-20200802055539-f0281e269c07", + # Temporary patch until patch lands upstream + patches = [ + "//:patches/com_github_buildbarn_bb_storage/expose_find_missing_queue.diff", + ] ) go_repository( name = "com_github_bazelbuild_remote_apis", diff --git a/patches/com_github_buildbarn_bb_storage/expose_find_missing_queue.diff b/patches/com_github_buildbarn_bb_storage/expose_find_missing_queue.diff new file mode 100644 index 0000000..4fa9aec 
--- /dev/null +++ b/patches/com_github_buildbarn_bb_storage/expose_find_missing_queue.diff @@ -0,0 +1,301 @@ +diff --git pkg/blobstore/completenesschecking/BUILD.bazel pkg/blobstore/completenesschecking/BUILD.bazel +index 0c0e849..3b46bf7 100644 +--- pkg/blobstore/completenesschecking/BUILD.bazel ++++ pkg/blobstore/completenesschecking/BUILD.bazel +@@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + + go_library( + name = "go_default_library", +- srcs = ["completeness_checking_blob_access.go"], ++ srcs = [ ++ "completeness_checking_blob_access.go", ++ "find_missing_queue.go" ++ ], + importpath = "github.com/buildbarn/bb-storage/pkg/blobstore/completenesschecking", + visibility = ["//visibility:public"], + deps = [ +diff --git pkg/blobstore/completenesschecking/completeness_checking_blob_access.go pkg/blobstore/completenesschecking/completeness_checking_blob_access.go +index 93db060..b437e58 100644 +--- pkg/blobstore/completenesschecking/completeness_checking_blob_access.go ++++ pkg/blobstore/completenesschecking/completeness_checking_blob_access.go +@@ -8,80 +8,8 @@ import ( + "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" + "github.com/buildbarn/bb-storage/pkg/digest" + "github.com/buildbarn/bb-storage/pkg/util" +- +- "google.golang.org/grpc/codes" +- "google.golang.org/grpc/status" + ) + +-// findMissingQueue is a helper for calling BlobAccess.FindMissing() in +-// batches, as opposed to calling it for individual digests. +-type findMissingQueue struct { +- context context.Context +- instanceName digest.InstanceName +- contentAddressableStorage blobstore.BlobAccess +- batchSize int +- +- pending digest.SetBuilder +-} +- +-// deriveDigest converts a digest embedded into an action result from +-// the wire format to an in-memory representation. If that fails, we +-// assume that some data corruption has occurred. In that case, we +-// should destroy the action result. 
+-func (q *findMissingQueue) deriveDigest(blobDigest *remoteexecution.Digest) (digest.Digest, error) { +- derivedDigest, err := q.instanceName.NewDigestFromProto(blobDigest) +- if err != nil { +- return digest.BadDigest, util.StatusWrapWithCode(err, codes.NotFound, "Action result contained malformed digest") +- } +- return derivedDigest, err +-} +- +-// Add a digest to the list of digests that are pending to be checked +-// for existence in the Content Addressable Storage. +-func (q *findMissingQueue) add(blobDigest *remoteexecution.Digest) error { +- if blobDigest != nil { +- derivedDigest, err := q.deriveDigest(blobDigest) +- if err != nil { +- return err +- } +- +- if q.pending.Length() >= q.batchSize { +- if err := q.finalize(); err != nil { +- return err +- } +- q.pending = digest.NewSetBuilder() +- } +- q.pending.Add(derivedDigest) +- } +- return nil +-} +- +-// AddDirectory adds all digests contained with a directory to the list +-// of digests pending to be checked for existence. +-func (q *findMissingQueue) addDirectory(directory *remoteexecution.Directory) error { +- if directory == nil { +- return nil +- } +- for _, child := range directory.Files { +- if err := q.add(child.Digest); err != nil { +- return err +- } +- } +- return nil +-} +- +-// Finalize by checking the last batch of digests for existence. 
+-func (q *findMissingQueue) finalize() error { +- missing, err := q.contentAddressableStorage.FindMissing(q.context, q.pending.Build()) +- if err != nil { +- return util.StatusWrap(err, "Failed to determine existence of child objects") +- } +- if digest, ok := missing.First(); ok { +- return status.Errorf(codes.NotFound, "Object %s referenced by the action result is not present in the Content Addressable Storage", digest) +- } +- return nil +-} +- + type completenessCheckingBlobAccess struct { + blobstore.BlobAccess + contentAddressableStorage blobstore.BlobAccess +@@ -113,13 +41,12 @@ func NewCompletenessCheckingBlobAccess(actionCache blobstore.BlobAccess, content + } + + func (ba *completenessCheckingBlobAccess) checkCompleteness(ctx context.Context, instanceName digest.InstanceName, actionResult *remoteexecution.ActionResult) error { +- findMissingQueue := findMissingQueue{ +- context: ctx, +- instanceName: instanceName, +- contentAddressableStorage: ba.contentAddressableStorage, +- batchSize: ba.batchSize, +- pending: digest.NewSetBuilder(), +- } ++ findMissingQueue := NewFindMissingQueue( ++ ctx, ++ instanceName, ++ ba.contentAddressableStorage, ++ ba.batchSize, ++ ) + + // Iterate over all remoteexecution.Digest fields contained + // within the ActionResult. Check the existence of output +@@ -127,19 +54,19 @@ func (ba *completenessCheckingBlobAccess) checkCompleteness(ctx context.Context, + // later on. GetTree() may not necessarily cause those objects + // to be touched. 
+ for _, outputFile := range actionResult.OutputFiles { +- if err := findMissingQueue.add(outputFile.Digest); err != nil { ++ if err := findMissingQueue.Add(outputFile.Digest); err != nil { + return err + } + } + for _, outputDirectory := range actionResult.OutputDirectories { +- if err := findMissingQueue.add(outputDirectory.TreeDigest); err != nil { ++ if err := findMissingQueue.Add(outputDirectory.TreeDigest); err != nil { + return err + } + } +- if err := findMissingQueue.add(actionResult.StdoutDigest); err != nil { ++ if err := findMissingQueue.Add(actionResult.StdoutDigest); err != nil { + return err + } +- if err := findMissingQueue.add(actionResult.StderrDigest); err != nil { ++ if err := findMissingQueue.Add(actionResult.StderrDigest); err != nil { + return err + } + +@@ -147,7 +74,7 @@ func (ba *completenessCheckingBlobAccess) checkCompleteness(ctx context.Context, + // within output directories (remoteexecution.Tree objects) + // referenced by the ActionResult. + for _, outputDirectory := range actionResult.OutputDirectories { +- treeDigest, err := findMissingQueue.deriveDigest(outputDirectory.TreeDigest) ++ treeDigest, err := findMissingQueue.DeriveDigest(outputDirectory.TreeDigest) + if err != nil { + return err + } +@@ -156,16 +83,16 @@ func (ba *completenessCheckingBlobAccess) checkCompleteness(ctx context.Context, + return util.StatusWrapf(err, "Failed to fetch output directory %#v", outputDirectory.Path) + } + tree := treeMessage.(*remoteexecution.Tree) +- if err := findMissingQueue.addDirectory(tree.Root); err != nil { ++ if err := findMissingQueue.AddDirectory(tree.Root); err != nil { + return err + } + for _, child := range tree.Children { +- if err := findMissingQueue.addDirectory(child); err != nil { ++ if err := findMissingQueue.AddDirectory(child); err != nil { + return err + } + } + } +- return findMissingQueue.finalize() ++ return findMissingQueue.Finalize() + } + + func (ba *completenessCheckingBlobAccess) Get(ctx context.Context, digest 
digest.Digest) buffer.Buffer { +diff --git pkg/blobstore/completenesschecking/completeness_checking_blob_access_test.go pkg/blobstore/completenesschecking/completeness_checking_blob_access_test.go +index 8f3d5fc..69f4938 100644 +--- pkg/blobstore/completenesschecking/completeness_checking_blob_access_test.go ++++ pkg/blobstore/completenesschecking/completeness_checking_blob_access_test.go +@@ -56,7 +56,7 @@ func TestCompletenessCheckingBlobAccess(t *testing.T) { + buffer.Reparable(actionDigest, repairFunc.Call))) + + _, err := completenessCheckingBlobAccess.Get(ctx, actionDigest).ToProto(&remoteexecution.ActionResult{}, 1000) +- require.Equal(t, err, status.Error(codes.NotFound, "Action result contained malformed digest: Unknown digest hash length: 24 characters")) ++ require.Equal(t, err, status.Error(codes.NotFound, "Malformed digest found during FindMissing process: Unknown digest hash length: 24 characters")) + }) + + t.Run("MissingInput", func(t *testing.T) { +@@ -92,7 +92,7 @@ func TestCompletenessCheckingBlobAccess(t *testing.T) { + nil) + + _, err := completenessCheckingBlobAccess.Get(ctx, actionDigest).ToProto(&remoteexecution.ActionResult{}, 1000) +- require.Equal(t, err, status.Error(codes.NotFound, "Object 8b1a9953c4611296a827abf8c47804d7-5-hello referenced by the action result is not present in the Content Addressable Storage")) ++ require.Equal(t, err, status.Error(codes.NotFound, "Referenced Object 8b1a9953c4611296a827abf8c47804d7-5-hello is not present in the Content Addressable Storage")) + }) + + t.Run("FindMissingError", func(t *testing.T) { +diff --git pkg/blobstore/completenesschecking/find_missing_queue.go pkg/blobstore/completenesschecking/find_missing_queue.go +new file mode 100644 +index 0000000..be884c2 +--- /dev/null ++++ pkg/blobstore/completenesschecking/find_missing_queue.go +@@ -0,0 +1,95 @@ ++package completenesschecking ++ ++import ( ++ "context" ++ ++ remoteexecution 
"github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" ++ "github.com/buildbarn/bb-storage/pkg/blobstore" ++ "github.com/buildbarn/bb-storage/pkg/digest" ++ "github.com/buildbarn/bb-storage/pkg/util" ++ ++ "google.golang.org/grpc/codes" ++ "google.golang.org/grpc/status" ++) ++ ++// findMissingQueue is a helper for calling BlobAccess.FindMissing() in ++// batches, as opposed to calling it for individual digests. ++type findMissingQueue struct { ++ context context.Context ++ instanceName digest.InstanceName ++ contentAddressableStorage blobstore.BlobAccess ++ batchSize int ++ ++ pending digest.SetBuilder ++} ++ ++func NewFindMissingQueue(context context.Context, instanceName digest.InstanceName, ++ contentAddressableStorage blobstore.BlobAccess, ++ batchSize int) findMissingQueue { ++ return findMissingQueue { ++ context: context, ++ instanceName: instanceName, ++ contentAddressableStorage: contentAddressableStorage, ++ batchSize: batchSize, ++ pending: digest.NewSetBuilder(), ++ } ++ ++} ++ ++// DeriveDigest converts a digest embedded into an action result from ++// the wire format to an in-memory representation. If that fails, we ++// assume that some data corruption has occurred. In that case, we ++// should destroy the action result. ++func (q *findMissingQueue) DeriveDigest(blobDigest *remoteexecution.Digest) (digest.Digest, error) { ++ derivedDigest, err := q.instanceName.NewDigestFromProto(blobDigest) ++ if err != nil { ++ return digest.BadDigest, util.StatusWrapWithCode(err, codes.NotFound, "Malformed digest found during FindMissing process") ++ } ++ return derivedDigest, err ++} ++ ++// Add a digest to the list of digests that are pending to be checked ++// for existence in the Content Addressable Storage. 
++func (q *findMissingQueue) Add(blobDigest *remoteexecution.Digest) error { ++ if blobDigest != nil { ++ derivedDigest, err := q.DeriveDigest(blobDigest) ++ if err != nil { ++ return err ++ } ++ ++ if q.pending.Length() >= q.batchSize { ++ if err := q.Finalize(); err != nil { ++ return err ++ } ++ q.pending = digest.NewSetBuilder() ++ } ++ q.pending.Add(derivedDigest) ++ } ++ return nil ++} ++ ++// AddDirectory adds all digests contained with a directory to the list ++// of digests pending to be checked for existence. ++func (q *findMissingQueue) AddDirectory(directory *remoteexecution.Directory) error { ++ if directory == nil { ++ return nil ++ } ++ for _, child := range directory.Files { ++ if err := q.Add(child.Digest); err != nil { ++ return err ++ } ++ } ++ return nil ++} ++ ++// Finalize by checking the last batch of digests for existence. ++func (q *findMissingQueue) Finalize() error { ++ missing, err := q.contentAddressableStorage.FindMissing(q.context, q.pending.Build()) ++ if err != nil { ++ return util.StatusWrap(err, "Failed to determine existence of child objects") ++ } ++ if digest, ok := missing.First(); ok { ++ return status.Errorf(codes.NotFound, "Referenced Object %s is not present in the Content Addressable Storage", digest) ++ } ++ return nil ++} +\ No newline at end of file +-- +2.23.0 + diff --git a/pkg/configuration/new_fetcher.go b/pkg/configuration/new_fetcher.go index 23f956e..e6e87b1 100644 --- a/pkg/configuration/new_fetcher.go +++ b/pkg/configuration/new_fetcher.go @@ -20,13 +20,13 @@ import ( // a jsonnet configuration. 
func NewFetcherFromConfiguration(configuration *pb.FetcherConfiguration, assetStore *storage.AssetStore, - casBlobAccessCreator blobstore_configuration.BlobAccessCreator) (remoteasset.FetchServer, error) { + casBlobAccessCreator blobstore_configuration.BlobAccessCreator, maximumMessageSizeBytes int) (remoteasset.FetchServer, error) { var fetcher remoteasset.FetchServer switch backend := configuration.Backend.(type) { case *pb.FetcherConfiguration_Caching: innerFetcher, err := NewFetcherFromConfiguration( backend.Caching, assetStore, - casBlobAccessCreator) + casBlobAccessCreator, maximumMessageSizeBytes) if err != nil { return nil, err } @@ -55,6 +55,24 @@ func NewFetcherFromConfiguration(configuration *pb.FetcherConfiguration, allowUpdatesForInstances) case *pb.FetcherConfiguration_Error: fetcher = fetch.NewErrorFetcher(backend.Error) + case *pb.FetcherConfiguration_CompletenessChecking: + cas, err := blobstore_configuration.NewBlobAccessFromConfiguration( + backend.CompletenessChecking.ContentAddressableStorage, + casBlobAccessCreator) + // Check this error now: err is reassigned by the call below. + if err != nil { + return nil, err + } + innerFetcher, err := NewFetcherFromConfiguration( + backend.CompletenessChecking.Fetcher, assetStore, + casBlobAccessCreator, maximumMessageSizeBytes) + if err != nil { + return nil, err + } + fetcher = fetch.NewCompletenessCheckingFetcher( + innerFetcher, cas, + int(backend.CompletenessChecking.BatchSize), maximumMessageSizeBytes) + default: return nil, status.Errorf(codes.InvalidArgument, "Fetcher configuration is invalid as no supported Fetchers are defined.") } diff --git a/pkg/fetch/BUILD.bazel b/pkg/fetch/BUILD.bazel index b5a53eb..51ce982 100644 --- a/pkg/fetch/BUILD.bazel +++ b/pkg/fetch/BUILD.bazel @@ -8,13 +8,16 @@ go_library( "http_fetcher.go", "logging_fetcher.go", "validating_fetcher.go", + "completeness_checking_fetcher.go", ], importpath = "github.com/buildbarn/bb-asset-hub/pkg/fetch", visibility = ["//visibility:public"], deps = [ "//pkg/storage:go_default_library",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/asset/v1:go_default_library", + "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:go_default_library", "@com_github_buildbarn_bb_storage//pkg/blobstore:go_default_library", + "@com_github_buildbarn_bb_storage//pkg/blobstore/completenesschecking:go_default_library", "@com_github_buildbarn_bb_storage//pkg/blobstore/buffer:go_default_library", "@com_github_buildbarn_bb_storage//pkg/digest:go_default_library", "@com_github_buildbarn_bb_storage//pkg/util:go_default_library", diff --git a/pkg/fetch/completeness_checking_fetcher.go b/pkg/fetch/completeness_checking_fetcher.go new file mode 100644 index 0000000..2785d17 --- /dev/null +++ b/pkg/fetch/completeness_checking_fetcher.go @@ -0,0 +1,113 @@ +package fetch + +import ( + "context" + + "github.com/buildbarn/bb-storage/pkg/blobstore" + "github.com/buildbarn/bb-storage/pkg/blobstore/completenesschecking" + bb_digest "github.com/buildbarn/bb-storage/pkg/digest" + "github.com/buildbarn/bb-storage/pkg/util" + + remoteasset "github.com/bazelbuild/remote-apis/build/bazel/remote/asset/v1" + remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" + // protostatus "google.golang.org/genproto/googleapis/rpc/status" + // "google.golang.org/grpc/status" +) + +// completenessCheckingFetcher decorates a FetchServer and rejects responses +// whose digests are not present in the Content Addressable Storage. +type completenessCheckingFetcher struct { + fetcher remoteasset.FetchServer + contentAddressableStorage blobstore.BlobAccess + batchSize int + maximumMessageSizeBytes int +} + +// NewCompletenessCheckingFetcher creates a Remote Asset API Fetch service that +// wraps fetcher and checks that the digests it returns exist in the CAS. +// batchSize caps each FindMissing batch; maximumMessageSizeBytes caps the Tree read. +func NewCompletenessCheckingFetcher(fetcher remoteasset.FetchServer, contentAddressableStorage blobstore.BlobAccess, + batchSize int, maximumMessageSizeBytes int) remoteasset.FetchServer { + return &completenessCheckingFetcher{ + fetcher: fetcher, + contentAddressableStorage: contentAddressableStorage, + batchSize: batchSize, + maximumMessageSizeBytes: maximumMessageSizeBytes, + } +} + +// FetchBlob forwards the request to the wrapped fetcher and only returns the +// response once its blob digest is confirmed present in the Content Addressable Storage. +func (cf
*completenessCheckingFetcher) FetchBlob(ctx context.Context, req *remoteasset.FetchBlobRequest) (*remoteasset.FetchBlobResponse, error) { + response, err := cf.fetcher.FetchBlob(ctx, req) + if err != nil { + return nil, err + } + instanceName, err := bb_digest.NewInstanceName(req.InstanceName) + if err != nil { + return nil, util.StatusWrapf(err, "Invalid instance name %#v", req.InstanceName) + } + findMissingQueue := completenesschecking.NewFindMissingQueue(ctx, instanceName, cf.contentAddressableStorage, cf.batchSize) + + if err := findMissingQueue.Add(response.BlobDigest); err != nil { + // TODO: Delete asset reference and retry cf.fetcher.FetchBlob() + return nil, util.StatusWrapf(err, "Failed completeness check whilst fetching blob %s", response.Uri) + } + if err := findMissingQueue.Finalize(); err != nil { + // TODO: Delete asset reference and retry cf.fetcher.FetchBlob() + return nil, util.StatusWrapf(err, "Failed completeness check whilst fetching blob %s", response.Uri) + } + + return response, nil +} + +// FetchDirectory forwards the request to the wrapped fetcher and only returns the +// response once the referenced directory passes checkDirectoryCompleteness. +func (cf *completenessCheckingFetcher) FetchDirectory(ctx context.Context, req *remoteasset.FetchDirectoryRequest) (*remoteasset.FetchDirectoryResponse, error) { + response, err := cf.fetcher.FetchDirectory(ctx, req) + if err != nil { + return nil, err + } + + instanceName, err := bb_digest.NewInstanceName(req.InstanceName) + if err != nil { + return nil, util.StatusWrapf(err, "Invalid instance name %#v", req.InstanceName) + } + + if err := cf.checkDirectoryCompleteness(ctx, instanceName, response.RootDirectoryDigest); err != nil { + // TODO: Handle failed completeness? + return nil, util.StatusWrapf(err, "Failed completeness check whilst fetching directory %s", response.Uri) + } + + return response, nil +} + +// checkDirectoryCompleteness loads the remoteexecution.Tree stored under +// rootDigest and verifies that every file digest in its root and child +// directories is present in the Content Addressable Storage. +// NOTE(review): the Remote Asset API describes root_directory_digest as a +// Directory digest for GetTree, not a serialized Tree; confirm what the +// wrapped fetcher stores under this digest. 
+func (cf *completenessCheckingFetcher) checkDirectoryCompleteness(ctx context.Context, instanceName bb_digest.InstanceName, + rootDigest *remoteexecution.Digest) error { + findMissingQueue := completenesschecking.NewFindMissingQueue(ctx, instanceName, cf.contentAddressableStorage, cf.batchSize) + + treeDigest, err := findMissingQueue.DeriveDigest(rootDigest) + if err != nil { + return err + } + treeMessage, err := cf.contentAddressableStorage.Get(ctx, treeDigest).ToProto(&remoteexecution.Tree{}, cf.maximumMessageSizeBytes) + if err != nil { + return util.StatusWrapf(err, "Referenced Directory Tree %s is not present in the Content Addressable Storage", treeDigest) + } + tree := treeMessage.(*remoteexecution.Tree) + if err := findMissingQueue.AddDirectory(tree.Root); err != nil { + return err + } + for _, child := range tree.Children { + if err := findMissingQueue.AddDirectory(child); err != nil { + return err + } + } + return findMissingQueue.Finalize() +} diff --git a/pkg/proto/configuration/bb_asset_hub/fetch/fetcher.proto b/pkg/proto/configuration/bb_asset_hub/fetch/fetcher.proto index 36b1600..92ee5eb 100644 --- a/pkg/proto/configuration/bb_asset_hub/fetch/fetcher.proto +++ b/pkg/proto/configuration/bb_asset_hub/fetch/fetcher.proto @@ -22,6 +22,10 @@ message FetcherConfiguration { // Note that in jsonnet configuration, 'error' will need to be in quotes to // avoid collision with a protected keyword google.rpc.Status error = 3; + + // Ensures consistency between digests in remote asset responses and + // the contents of the associated CAS. + CompletenessCheckingFetcherConfiguration completeness_checking = 4; } message HttpFetcherConfiguration { @@ -32,4 +36,13 @@ message FetcherConfiguration { // List of instances which can trigger an upload to the CAS repeated string allow_updates_for_instances = 2; } + + message CompletenessCheckingFetcherConfiguration { + FetcherConfiguration fetcher = 1; + // Configuration for blob storage. 
+ buildbarn.configuration.blobstore.BlobAccessConfiguration + content_addressable_storage = 2; + + uint32 batch_size = 3; + } }