From d384fbd873467ccf542d23d01f4b4a3cbf8a9fdb Mon Sep 17 00:00:00 2001 From: Erick Fejta Date: Tue, 20 Jul 2021 16:22:50 -0700 Subject: [PATCH] Check preconditions when reading. The official storage client does this incorrectly: it uses the XML api for reads but sends ifGenerationMatch parameters (for the json api) rather than the required x-goog-if-generation-match headers. So instead check this manually when the response comes back. Also fix some related issues (cannot use the testgroup's conditions to read objects, similar thing for reading the config). --- cmd/updater/main.go | 2 +- config/queue.go | 6 +++--- pkg/updater/updater.go | 29 ++++++++++++----------------- pkg/updater/updater_test.go | 4 ++-- util/gcs/BUILD.bazel | 1 + util/gcs/real_gcs.go | 21 +++++++++++++++++++++ 6 files changed, 40 insertions(+), 23 deletions(-) diff --git a/cmd/updater/main.go b/cmd/updater/main.go index d6a13352f..167207581 100644 --- a/cmd/updater/main.go +++ b/cmd/updater/main.go @@ -155,7 +155,7 @@ func main() { "build": opt.buildConcurrency, }).Info("Configured concurrency") - groupUpdater := updater.GCS(opt.groupTimeout, opt.buildTimeout, opt.buildConcurrency, opt.confirm, updater.SortStarted) + groupUpdater := updater.GCS(client, opt.groupTimeout, opt.buildTimeout, opt.buildConcurrency, opt.confirm, updater.SortStarted) mets := setupMetrics(ctx) diff --git a/config/queue.go b/config/queue.go index 7c11cb91c..4adfcfe0b 100644 --- a/config/queue.go +++ b/config/queue.go @@ -111,8 +111,8 @@ func (q *TestGroupQueue) FixAll(whens map[string]time.Time) error { "group": name, "when": when, }).Info("Fixing groups") + it.when = when } - it.when = when } heap.Init(&q.queue) if len(missing) > 0 { @@ -136,9 +136,9 @@ func (q *TestGroupQueue) Fix(name string, when time.Time) error { "group": name, "when": when, }).Info("Fixed group") + it.when = when + heap.Fix(&q.queue, it.index) } - it.when = when - heap.Fix(&q.queue, it.index) return nil } diff --git a/pkg/updater/updater.go b/pkg/updater/updater.go index e851fea8f..63e008ccf 100644 --- a/pkg/updater/updater.go +++ b/pkg/updater/updater.go @@ -114,7 +114,7 @@ func (mets *Metrics) delay(dur time.Duration) { type GroupUpdater func(parent context.Context, log logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path) (bool, error) // GCS returns a GCS-based GroupUpdater, which knows how to process result data stored in GCS. -func GCS(groupTimeout, buildTimeout time.Duration, concurrency int, write bool, sortCols ColumnSorter) GroupUpdater { +func GCS(colClient gcs.Client, groupTimeout, buildTimeout time.Duration, concurrency int, write bool, sortCols ColumnSorter) GroupUpdater { return func(parent context.Context, log logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path) (bool, error) { if !tg.UseKubernetesClient { log.Debug("Skipping non-kubernetes client group") @@ -122,7 +122,7 @@ func GCS(groupTimeout, buildTimeout time.Duration, concurrency int, write bool, } ctx, cancel := context.WithTimeout(parent, groupTimeout) defer cancel() - gcsColReader := gcsColumnReader(client, buildTimeout, concurrency) + gcsColReader := gcsColumnReader(colClient, buildTimeout, concurrency) reprocess := 20 * time.Minute // allow 20m for prow to finish uploading artifacts return InflateDropAppend(ctx, log, client, tg, gridPath, write, gcsColReader, sortCols, reprocess) } @@ -195,13 +195,8 @@ func update(ctx context.Context, client gcs.ConditionalClient, log logrus.FieldL return more, nil } -type testGroupClient interface { - gcs.Opener - gcs.Stater -} - -func updateTestGroups(ctx context.Context, client testGroupClient, q *config.TestGroupQueue, configPath gcs.Path, gridPrefix string, groupNames []string, freq time.Duration) (int64, map[string]int64, error) { - r, attrs, err := client.Open(ctx, configPath) +func updateTestGroups(ctx context.Context, opener gcs.Opener, stater gcs.Stater, q *config.TestGroupQueue, configPath gcs.Path, gridPrefix string, groupNames []string, freq time.Duration) (int64, map[string]int64, error) { + r, attrs, err := opener.Open(ctx, configPath) if err != nil { if !isPreconditionFailed(err) { err = fmt.Errorf("read: %v", err) @@ -239,9 +234,8 @@ func updateTestGroups(ctx context.Context, client testGroupClient, q *config.Tes if err != nil { return configGen, nil, err } - attrs := gcs.Stat(ctx, client, 20, paths...) + attrs := gcs.Stat(ctx, stater, 20, paths...) updates := make(map[string]time.Time, len(attrs)) - now := time.Now() for i, attrs := range attrs { name := groups[i].Name switch { @@ -249,7 +243,6 @@ func updateTestGroups(ctx context.Context, client testGroupClient, q *config.Tes updates[name] = attrs.Attrs.Updated.Add(freq) generations[name] = attrs.Attrs.Generation case attrs.Err == storage.ErrObjectNotExist: - updates[name] = now generations[name] = 0 default: // no change @@ -274,7 +267,7 @@ func Update(parent context.Context, client gcs.ConditionalClient, mets *Metrics, var q config.TestGroupQueue log.Debug("Fetching testgroup metadata state...") - gen, generations, err := updateTestGroups(ctx, client, &q, configPath, gridPrefix, groupNames, freq) + gen, generations, err := updateTestGroups(ctx, client, client, &q, configPath, gridPrefix, groupNames, freq) if err != nil { return err } @@ -323,7 +316,7 @@ func Update(parent context.Context, client gcs.ConditionalClient, mets *Metrics, go func() { cond := storage.Conditions{GenerationNotMatch: gen} - client := client.If(&cond, nil) + opener := client.If(&cond, &cond) ticker := time.NewTicker(time.Minute) for { depth, next, when := q.Status() @@ -344,10 +337,12 @@ func Update(parent context.Context, client gcs.ConditionalClient, mets *Metrics, ticker.Stop() return case <-ticker.C: - if gen, _, err := updateTestGroups(ctx, client, &q, configPath, gridPrefix, groupNames, freq); err != nil { - log.WithError(err).Error("Failed to update configuration") - } else { + gen, _, err := updateTestGroups(ctx, opener, client, &q, configPath, gridPrefix, groupNames, freq) + switch { + case err == nil: cond.GenerationNotMatch = gen + case !isPreconditionFailed(err): + log.WithError(err).Error("Failed to update configuration") } } } diff --git a/pkg/updater/updater_test.go b/pkg/updater/updater_test.go index afc755518..4f918ae6d 100644 --- a/pkg/updater/updater_test.go +++ b/pkg/updater/updater_test.go @@ -76,7 +76,7 @@ func TestGCS(t *testing.T) { // either because the context is canceled or things like client are unset) ctx, cancel := context.WithCancel(context.Background()) cancel() - updater := GCS(0, 0, 0, false, SortStarted) + updater := GCS(nil, 0, 0, 0, false, SortStarted) defer func() { if r := recover(); r != nil { if !tc.fail { @@ -316,7 +316,7 @@ func TestUpdate(t *testing.T) { client.Lister[buildsPath] = fi } - groupUpdater := GCS(*tc.groupTimeout, *tc.buildTimeout, tc.buildConcurrency, !tc.skipConfirm, SortStarted) + groupUpdater := GCS(client, *tc.groupTimeout, *tc.buildTimeout, tc.buildConcurrency, !tc.skipConfirm, SortStarted) mets := &Metrics{ Successes: &fakeCounter{}, Errors: &fakeCounter{}, diff --git a/util/gcs/BUILD.bazel b/util/gcs/BUILD.bazel index f12118e0e..26a11bc48 100644 --- a/util/gcs/BUILD.bazel +++ b/util/gcs/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "@com_github_sirupsen_logrus//:go_default_library", "@com_google_cloud_go_storage//:go_default_library", "@io_k8s_api//core/v1:go_default_library", + "@org_golang_google_api//googleapi:go_default_library", "@org_golang_google_api//iterator:go_default_library", "@org_golang_google_api//option:go_default_library", ], diff --git a/util/gcs/real_gcs.go b/util/gcs/real_gcs.go index 94737cf2b..113b12ecc 100644 --- a/util/gcs/real_gcs.go +++ b/util/gcs/real_gcs.go @@ -19,9 +19,11 @@ package gcs import ( "context" "io" + "net/http" "strings" "cloud.google.com/go/storage" + "google.golang.org/api/googleapi" ) var ( @@ -65,9 +67,28 @@ func (rgc realGCSClient) Open(ctx context.Context, path Path) (io.ReadCloser, *s if r == nil { return nil, nil, err } + if err == nil && rgc.readCond != nil { + err = checkPreconditions(r.Attrs, rgc.readCond) + } return r, &r.Attrs, err } +var ( + errPreconditions = googleapi.Error{ + Code: http.StatusPreconditionFailed, + } +) + +func checkPreconditions(attrs storage.ReaderObjectAttrs, cond *storage.Conditions) error { + if g := cond.GenerationMatch; g > 0 && g != attrs.Generation { + return &errPreconditions + } + if g := cond.GenerationNotMatch; g > 0 && g == attrs.Generation { + return &errPreconditions + } + return nil +} + func (rgc realGCSClient) Objects(ctx context.Context, path Path, delimiter, startOffset string) Iterator { p := path.Object() if !strings.HasSuffix(p, "/") {