Skip to content

Commit

Permalink
Check preconditions when reading.
Browse files Browse the repository at this point in the history
The official storage client does this incorrectly: it uses the XML api for reads but sends ifGenerationMatch
parameters (for the json api) rather than the required x-goog-if-generation-match headers. So instead check
this manually when the response comes back.

Also fix some related issues (cannot use the testgroup's conditions to read objects, similar thing for reading
the config).
  • Loading branch information
fejta committed Jul 20, 2021
1 parent 1bd8237 commit d384fbd
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 23 deletions.
2 changes: 1 addition & 1 deletion cmd/updater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func main() {
"build": opt.buildConcurrency,
}).Info("Configured concurrency")

groupUpdater := updater.GCS(opt.groupTimeout, opt.buildTimeout, opt.buildConcurrency, opt.confirm, updater.SortStarted)
groupUpdater := updater.GCS(client, opt.groupTimeout, opt.buildTimeout, opt.buildConcurrency, opt.confirm, updater.SortStarted)

mets := setupMetrics(ctx)

Expand Down
6 changes: 3 additions & 3 deletions config/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func (q *TestGroupQueue) FixAll(whens map[string]time.Time) error {
"group": name,
"when": when,
}).Info("Fixing groups")
it.when = when
}
it.when = when
}
heap.Init(&q.queue)
if len(missing) > 0 {
Expand All @@ -136,9 +136,9 @@ func (q *TestGroupQueue) Fix(name string, when time.Time) error {
"group": name,
"when": when,
}).Info("Fixed group")
it.when = when
heap.Fix(&q.queue, it.index)
}
it.when = when
heap.Fix(&q.queue, it.index)
return nil
}

Expand Down
29 changes: 12 additions & 17 deletions pkg/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,15 @@ func (mets *Metrics) delay(dur time.Duration) {
type GroupUpdater func(parent context.Context, log logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path) (bool, error)

// GCS returns a GCS-based GroupUpdater, which knows how to process result data stored in GCS.
func GCS(groupTimeout, buildTimeout time.Duration, concurrency int, write bool, sortCols ColumnSorter) GroupUpdater {
func GCS(colClient gcs.Client, groupTimeout, buildTimeout time.Duration, concurrency int, write bool, sortCols ColumnSorter) GroupUpdater {
return func(parent context.Context, log logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path) (bool, error) {
if !tg.UseKubernetesClient {
log.Debug("Skipping non-kubernetes client group")
return false, nil
}
ctx, cancel := context.WithTimeout(parent, groupTimeout)
defer cancel()
gcsColReader := gcsColumnReader(client, buildTimeout, concurrency)
gcsColReader := gcsColumnReader(colClient, buildTimeout, concurrency)
reprocess := 20 * time.Minute // allow 20m for prow to finish uploading artifacts
return InflateDropAppend(ctx, log, client, tg, gridPath, write, gcsColReader, sortCols, reprocess)
}
Expand Down Expand Up @@ -195,13 +195,8 @@ func update(ctx context.Context, client gcs.ConditionalClient, log logrus.FieldL
return more, nil
}

type testGroupClient interface {
gcs.Opener
gcs.Stater
}

func updateTestGroups(ctx context.Context, client testGroupClient, q *config.TestGroupQueue, configPath gcs.Path, gridPrefix string, groupNames []string, freq time.Duration) (int64, map[string]int64, error) {
r, attrs, err := client.Open(ctx, configPath)
func updateTestGroups(ctx context.Context, opener gcs.Opener, stater gcs.Stater, q *config.TestGroupQueue, configPath gcs.Path, gridPrefix string, groupNames []string, freq time.Duration) (int64, map[string]int64, error) {
r, attrs, err := opener.Open(ctx, configPath)
if err != nil {
if !isPreconditionFailed(err) {
err = fmt.Errorf("read: %v", err)
Expand Down Expand Up @@ -239,17 +234,15 @@ func updateTestGroups(ctx context.Context, client testGroupClient, q *config.Tes
if err != nil {
return configGen, nil, err
}
attrs := gcs.Stat(ctx, client, 20, paths...)
attrs := gcs.Stat(ctx, stater, 20, paths...)
updates := make(map[string]time.Time, len(attrs))
now := time.Now()
for i, attrs := range attrs {
name := groups[i].Name
switch {
case attrs.Attrs != nil:
updates[name] = attrs.Attrs.Updated.Add(freq)
generations[name] = attrs.Attrs.Generation
case attrs.Err == storage.ErrObjectNotExist:
updates[name] = now
generations[name] = 0
default:
// no change
Expand All @@ -274,7 +267,7 @@ func Update(parent context.Context, client gcs.ConditionalClient, mets *Metrics,
var q config.TestGroupQueue

log.Debug("Fetching testgroup metadata state...")
gen, generations, err := updateTestGroups(ctx, client, &q, configPath, gridPrefix, groupNames, freq)
gen, generations, err := updateTestGroups(ctx, client, client, &q, configPath, gridPrefix, groupNames, freq)
if err != nil {
return err
}
Expand Down Expand Up @@ -323,7 +316,7 @@ func Update(parent context.Context, client gcs.ConditionalClient, mets *Metrics,

go func() {
cond := storage.Conditions{GenerationNotMatch: gen}
client := client.If(&cond, nil)
opener := client.If(&cond, &cond)
ticker := time.NewTicker(time.Minute)
for {
depth, next, when := q.Status()
Expand All @@ -344,10 +337,12 @@ func Update(parent context.Context, client gcs.ConditionalClient, mets *Metrics,
ticker.Stop()
return
case <-ticker.C:
if gen, _, err := updateTestGroups(ctx, client, &q, configPath, gridPrefix, groupNames, freq); err != nil {
log.WithError(err).Error("Failed to update configuration")
} else {
gen, _, err := updateTestGroups(ctx, opener, client, &q, configPath, gridPrefix, groupNames, freq)
switch {
case err == nil:
cond.GenerationNotMatch = gen
case !isPreconditionFailed(err):
log.WithError(err).Error("Failed to update configuration")
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/updater/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestGCS(t *testing.T) {
// either because the context is canceled or things like client are unset)
ctx, cancel := context.WithCancel(context.Background())
cancel()
updater := GCS(0, 0, 0, false, SortStarted)
updater := GCS(nil, 0, 0, 0, false, SortStarted)
defer func() {
if r := recover(); r != nil {
if !tc.fail {
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestUpdate(t *testing.T) {
client.Lister[buildsPath] = fi
}

groupUpdater := GCS(*tc.groupTimeout, *tc.buildTimeout, tc.buildConcurrency, !tc.skipConfirm, SortStarted)
groupUpdater := GCS(client, *tc.groupTimeout, *tc.buildTimeout, tc.buildConcurrency, !tc.skipConfirm, SortStarted)
mets := &Metrics{
Successes: &fakeCounter{},
Errors: &fakeCounter{},
Expand Down
1 change: 1 addition & 0 deletions util/gcs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"@com_github_sirupsen_logrus//:go_default_library",
"@com_google_cloud_go_storage//:go_default_library",
"@io_k8s_api//core/v1:go_default_library",
"@org_golang_google_api//googleapi:go_default_library",
"@org_golang_google_api//iterator:go_default_library",
"@org_golang_google_api//option:go_default_library",
],
Expand Down
21 changes: 21 additions & 0 deletions util/gcs/real_gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package gcs
import (
"context"
"io"
"net/http"
"strings"

"cloud.google.com/go/storage"
"google.golang.org/api/googleapi"
)

var (
Expand Down Expand Up @@ -65,9 +67,28 @@ func (rgc realGCSClient) Open(ctx context.Context, path Path) (io.ReadCloser, *s
if r == nil {
return nil, nil, err
}
if err == nil && rgc.readCond != nil {
err = checkPreconditions(r.Attrs, rgc.readCond)
}
return r, &r.Attrs, err
}

var (
errPreconditions = googleapi.Error{
Code: http.StatusPreconditionFailed,
}
)

func checkPreconditions(attrs storage.ReaderObjectAttrs, cond *storage.Conditions) error {
if g := cond.GenerationMatch; g > 0 && g != attrs.Generation {
return &errPreconditions
}
if g := cond.GenerationNotMatch; g > 0 && g == attrs.Generation {
return &errPreconditions
}
return nil
}

func (rgc realGCSClient) Objects(ctx context.Context, path Path, delimiter, startOffset string) Iterator {
p := path.Object()
if !strings.HasSuffix(p, "/") {
Expand Down

0 comments on commit d384fbd

Please sign in to comment.