diff --git a/pkg/skaffold/build/gcb/cloud_build.go b/pkg/skaffold/build/gcb/cloud_build.go
index 37fab52b249..983c9017b9f 100644
--- a/pkg/skaffold/build/gcb/cloud_build.go
+++ b/pkg/skaffold/build/gcb/cloud_build.go
@@ -23,7 +23,6 @@ import (
 	"fmt"
 	"io"
 	"net/http"
-	"strings"
 	"time"
 
 	cstorage "cloud.google.com/go/storage"
@@ -32,7 +31,6 @@ import (
 	"google.golang.org/api/cloudbuild/v1"
 	"google.golang.org/api/googleapi"
 	"google.golang.org/api/iterator"
-	"k8s.io/apimachinery/pkg/util/wait"
 
 	"github.com/GoogleContainerTools/skaffold/pkg/skaffold/build"
 	"github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
@@ -144,60 +142,45 @@ func (b *Builder) buildArtifactWithCloudBuild(ctx context.Context, out io.Writer
 	var digest string
 	offset := int64(0)
+	var buildComplete bool
+	delay := time.NewTicker(PollDelay)
+	defer delay.Stop()
+	buildResult := b.reporter.getStatus(ctx, projectID, remoteID)
 watch:
 	for {
-		var cb *cloudbuild.Build
-		var err error
-		logrus.Debugf("current offset %d", offset)
-		backoff := NewStatusBackoff()
-		if waitErr := wait.Poll(backoff.Duration, RetryTimeout, func() (bool, error) {
-			backoff.Step()
-			cb, err = cbclient.Projects.Builds.Get(projectID, remoteID).Do()
-			if err == nil {
-				return true, nil
+		select {
+		case <-ctx.Done():
+			return "", ctx.Err()
+		case r := <-buildResult:
+			buildComplete = true
+			if r.err != nil {
+				return "", r.err
 			}
-			if strings.Contains(err.Error(), "Error 429: Quota exceeded for quota metric 'cloudbuild.googleapis.com/get_requests'") {
-				// if we hit the rate limit, continue to retry
-				return false, nil
-			}
-			return false, err
-		}); waitErr != nil {
-			return "", fmt.Errorf("getting build status: %w", waitErr)
-		}
-		if err != nil {
-			return "", fmt.Errorf("getting build status: %w", err)
-		}
-		if cb == nil {
-			return "", errors.New("getting build status")
-		}
-
-		r, err := b.getLogs(ctx, c, offset, cbBucket, logsObject)
-		if err != nil {
-			return "", fmt.Errorf("getting logs: %w", err)
-		}
-		if r != nil {
-			written, err := io.Copy(out, r)
+			digest, err = b.getDigest(r.status, tag)
 			if err != nil {
-				return "", fmt.Errorf("copying logs to stdout: %w", err)
+				return "", fmt.Errorf("getting image id from finished build: %w", err)
 			}
-			offset += written
-			r.Close()
-		}
-		switch cb.Status {
-		case StatusQueued, StatusWorking, StatusUnknown:
-		case StatusSuccess:
-			digest, err = b.getDigest(cb, tag)
+		case <-delay.C:
+			r, err := b.getLogs(ctx, c, offset, cbBucket, logsObject)
 			if err != nil {
-				return "", fmt.Errorf("getting image id from finished build: %w", err)
+				return "", fmt.Errorf("getting logs: %w", err)
+			}
+			if r != nil {
+				written, err := io.Copy(out, r)
+				if err != nil {
+					return "", fmt.Errorf("copying logs to stdout: %w", err)
+				}
+				r.Close()
+
+				offset += written
+				if written != 0 {
+					continue watch
+				}
+			}
+			if buildComplete {
+				break watch
 			}
-			break watch
-		case StatusFailure, StatusInternalError, StatusTimeout, StatusCancelled:
-			return "", fmt.Errorf("cloud build failed: %s", cb.Status)
-		default:
-			return "", fmt.Errorf("unknown status: %s", cb.Status)
 		}
-
-		time.Sleep(RetryDelay)
 	}
 
 	if err := c.Bucket(cbBucket).Object(buildObject).Delete(ctx); err != nil {
diff --git a/pkg/skaffold/build/gcb/status.go b/pkg/skaffold/build/gcb/status.go
new file mode 100644
index 00000000000..5c1b4273408
--- /dev/null
+++ b/pkg/skaffold/build/gcb/status.go
@@ -0,0 +1,284 @@
+/*
+Copyright 2021 The Skaffold Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package gcb
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+
+	"google.golang.org/api/cloudbuild/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+
+	"github.com/GoogleContainerTools/skaffold/pkg/skaffold/gcp"
+)
+
+var (
+	// maintain a single instance of `statusManagerImpl` per skaffold process
+	reporter     *statusManagerImpl
+	reporterOnce sync.Once
+	// maintain a single instance of the GCB client per skaffold process
+	client     *cloudbuild.Service
+	clientOnce sync.Once
+
+	// for testing
+	newClientFunc = func() (*cloudbuild.Service, error) {
+		return cloudbuild.NewService(context.Background(), gcp.ClientOptions()...)
+	}
+
+	statusFunc = func(projectID string, filter string) (*cloudbuild.ListBuildsResponse, error) {
+		return client.Projects.Builds.List(projectID).Filter(filter).Do()
+	}
+)
+
+// statusManager provides an interface for getting the status of GCB jobs.
+// It reduces the number of requests made to the GCB API by using a single `List` call for
+// all concurrently running builds in a project instead of multiple per-job `Get` calls.
+// The statuses are polled with an independent exponential backoff strategy per project until success, cancellation or failure.
+// The backoff duration for each `projectID` status query is reset with every new build request.
+type statusManager interface {
+	getStatus(ctx context.Context, projectID string, buildID string) <-chan result
+}
+
+// statusManagerImpl implements the `statusManager` interface
+type statusManagerImpl struct {
+	results     map[jobID]chan result
+	resultMutex sync.RWMutex
+	requests    chan jobRequest
+}
+
+// jobID represents a single build job
+type jobID struct {
+	projectID string
+	buildID   string
+}
+
+// jobRequest encapsulates a single build job and its context.
+type jobRequest struct {
+	jobID
+	ctx context.Context
+}
+
+// result represents a single build job status
+type result struct {
+	jobID
+	status *cloudbuild.Build
+	err    error
+}
+
+func getStatusManager() statusManager {
+	reporterOnce.Do(func() {
+		reporter = &statusManagerImpl{
+			results:  make(map[jobID]chan result),
+			requests: make(chan jobRequest, 1),
+		}
+		// start processing all job requests in a new goroutine.
+		go reporter.run()
+	})
+	return reporter
+}
+
+// poller sends the `projectID` on channel `C` with an exponentially increasing period for each project.
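+// A `Timer` and a `Backoff` are kept per projectID: `reset` restarts the backoff whenever a new
+// build is submitted for that project, and `remove` tears the schedule down once the project has
+// no more running jobs. Timer callbacks feed `trans`, which publishes the projectID on `C` and
+// arms the next timer with the grown backoff step.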
+type poller struct {
+	timers   map[string]*time.Timer   // timers keeps the next `Timer` keyed on the projectID
+	backoffs map[string]*wait.Backoff // backoffs keeps the current `Backoff` keyed on the projectID
+	reset    chan string              // reset channel receives any new `projectID` and resets the timer and backoff for that projectID
+	remove   chan string              // remove channel receives a `projectID` that has no more running jobs and deletes its timer and backoff
+	trans    chan string              // trans channel pushes to channel `C` and sets up the next trigger
+	C        chan string              // C channel triggers with the `projectID`
+}
+
+func newPoller() *poller {
+	p := &poller{
+		timers:   make(map[string]*time.Timer),
+		backoffs: make(map[string]*wait.Backoff),
+		reset:    make(chan string),
+		remove:   make(chan string),
+		trans:    make(chan string),
+		C:        make(chan string),
+	}
+
+	go func() {
+		for {
+			select {
+			case projectID := <-p.remove:
+				if b, found := p.timers[projectID]; found {
+					b.Stop()
+				}
+				delete(p.timers, projectID)
+				delete(p.backoffs, projectID)
+			case projectID := <-p.reset:
+				if b, found := p.timers[projectID]; found {
+					b.Stop()
+				}
+				p.backoffs[projectID] = NewStatusBackoff()
+				p.timers[projectID] = time.AfterFunc(p.backoffs[projectID].Step(), func() {
+					p.trans <- projectID
+				})
+			case projectID := <-p.trans:
+				go func() {
+					p.C <- projectID
+				}()
+				b, found := p.backoffs[projectID]
+				if !found {
+					continue
+				}
+				p.timers[projectID] = time.AfterFunc(b.Step(), func() {
+					p.trans <- projectID
+				})
+			}
+		}
+	}()
+	return p
+}
+
+func (p *poller) resetTimer(projectID string) {
+	p.reset <- projectID
+}
+
+func (p *poller) removeTimer(projectID string) {
+	p.remove <- projectID
+}
+
+// run manages populating the `results` channels for each GCB build job status.
+// A `poller` is set up to do independent exponential backoff per project until success, cancellation or failure.
+// The backoff duration for each `projectID` status query is reset with every new build request.
+func (r *statusManagerImpl) run() {
+	poll := newPoller()
+	jobsByProjectID := make(map[string]map[jobID]jobRequest)
+	jobCancelled := make(chan jobRequest)
+	retryCount := make(map[jobID]int)
+	for {
+		select {
+		case req := <-r.requests:
+			// set up tracking for each new build status request
+			if _, found := jobsByProjectID[req.projectID]; !found {
+				jobsByProjectID[req.projectID] = make(map[jobID]jobRequest)
+			}
+			jobsByProjectID[req.projectID][req.jobID] = req
+			go poll.resetTimer(req.projectID)
+			go func() {
+				// set up cancellation for each job
+				r := req
+				<-r.ctx.Done()
+				jobCancelled <- r
+			}()
+		case projectID := <-poll.C:
+			// get the status of all active jobs for `projectID`
+			jobs := jobsByProjectID[projectID]
+			if len(jobs) == 0 {
+				poll.removeTimer(projectID)
+				continue
+			}
+			statuses, err := getStatuses(projectID, jobs)
+			if err != nil {
+				for id := range jobs {
+					// if the GCB API is throttling, ignore the error and retry up to `MaxRetryCount` times.
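+					// any other error, or a 429 that has exhausted its retries, fails the job: the error is
+					// sent on the job's result channel and the job is removed from the active set.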
+					if strings.Contains(err.Error(), "Error 429: Quota exceeded for quota metric 'cloudbuild.googleapis.com/get_requests'") {
+						retryCount[id]++
+						if retryCount[id] < MaxRetryCount {
+							continue
+						}
+					}
+					r.setResult(result{jobID: id, err: err})
+					delete(jobs, id)
+				}
+				continue
+			}
+			for id := range jobs {
+				cb := statuses[id]
+				switch cb.Status {
+				case StatusQueued, StatusWorking, StatusUnknown:
+				case StatusSuccess:
+					r.setResult(result{jobID: id, status: cb})
+					delete(jobs, id)
+				case StatusFailure, StatusInternalError, StatusTimeout, StatusCancelled:
+					r.setResult(result{jobID: id, err: fmt.Errorf("cloud build failed: %s", cb.Status)})
+					delete(jobs, id)
+				default:
+					r.setResult(result{jobID: id, err: fmt.Errorf("unhandled status: %s", cb.Status)})
+					delete(jobs, id)
+				}
+			}
+		case job := <-jobCancelled:
+			r.setResult(result{jobID: job.jobID, err: job.ctx.Err()})
+			delete(jobsByProjectID[job.projectID], job.jobID)
+		}
+	}
+}
+
+func getStatuses(projectID string, jobs map[jobID]jobRequest) (map[jobID]*cloudbuild.Build, error) {
+	var err error
+	clientOnce.Do(func() {
+		client, err = newClientFunc()
+	})
+	if err != nil {
+		clientOnce = sync.Once{} // reset on failure
+		return nil, fmt.Errorf("getting cloudbuild client: %w", err)
+	}
+	cb, err := statusFunc(projectID, getFilterQuery(jobs))
+	if err != nil {
+		return nil, fmt.Errorf("getting build status: %w", err)
+	}
+	if cb == nil {
+		return nil, errors.New("getting build status")
+	}
+
+	m := make(map[jobID]*cloudbuild.Build)
+	for _, job := range jobs {
+		found := false
+		for _, b := range cb.Builds {
+			if b.Id != job.buildID {
+				continue
+			}
+			found = true
+			m[job.jobID] = b
+			break
+		}
+		if !found {
+			return nil, errors.New("getting build status")
+		}
+	}
+	return m, nil
+}
+
+func getFilterQuery(jobs map[jobID]jobRequest) string {
+	var sl []string
+	for job := range jobs {
+		sl = append(sl, fmt.Sprintf("build_id=%s", job.buildID))
+	}
+	return strings.Join(sl, " OR ")
+}
+
+func (r *statusManagerImpl) setResult(result result) {
+	r.resultMutex.RLock()
+	r.results[result.jobID] <- result
+	r.resultMutex.RUnlock()
+}
+
+func (r *statusManagerImpl) getStatus(ctx context.Context, projectID string, buildID string) <-chan result {
+	id := jobID{projectID, buildID}
+	res := make(chan result, 1)
+	r.resultMutex.Lock()
+	r.results[id] = res
+	r.resultMutex.Unlock()
+	r.requests <- jobRequest{id, ctx}
+	return res
+}
diff --git a/pkg/skaffold/build/gcb/types.go b/pkg/skaffold/build/gcb/types.go
index 1f7387d62c3..281d3b6410a 100644
--- a/pkg/skaffold/build/gcb/types.go
+++ b/pkg/skaffold/build/gcb/types.go
@@ -55,8 +55,8 @@ const (
 	// StatusCancelled "CANCELLED" - Build was canceled by a user.
 	StatusCancelled = "CANCELLED"
 
-	// RetryDelay is the time to wait in between polling the status of the cloud build
-	RetryDelay = 1 * time.Second
+	// PollDelay is the time to wait in between writing out the logs of the cloud build
+	PollDelay = 1 * time.Second
 
 	// BackoffFactor is the exponent for exponential backoff during build status polling
 	BackoffFactor = 1.5
@@ -64,16 +64,16 @@ const (
 	// BackoffSteps is the number of times we increase the backoff time during exponential backoff
 	BackoffSteps = 10
 
-	// RetryTimeout is the max amount of time to retry getting the status of the build before erroring
-	RetryTimeout = 3 * time.Minute
+	// MaxRetryCount is the max number of times we retry a throttled GCB API request
+	MaxRetryCount = 10
 )
 
 func NewStatusBackoff() *wait.Backoff {
 	return &wait.Backoff{
-		Duration: RetryDelay,
+		Duration: PollDelay,
 		Factor:   float64(BackoffFactor),
 		Steps:    BackoffSteps,
-		Cap:      60 * time.Second,
+		Cap:      10 * time.Second,
 	}
 }
 
@@ -86,6 +86,7 @@ type Builder struct {
 	muted              build.Muted
 	artifactStore      build.ArtifactStore
 	sourceDependencies graph.SourceDependenciesCache
+	reporter           statusManager
 }
 
 type Config interface {
@@ -110,6 +111,7 @@ func NewBuilder(bCtx BuilderContext, buildCfg *latestV1.GoogleCloudBuild) *Build
 		muted:              bCtx.Muted(),
 		artifactStore:      bCtx.ArtifactStore(),
 		sourceDependencies: bCtx.SourceDependenciesResolver(),
+		reporter:           getStatusManager(),
 	}
 }
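Note: for reference, a minimal sketch of the consumer side of the new statusManager API, condensed from the select loop in cloud_build.go above (log streaming and the final log flush are elided here); names such as ctx, projectID, buildID, b and tag are taken from the surrounding diff:

	// wait for the build result while ticking for new log output
	buildResult := getStatusManager().getStatus(ctx, projectID, buildID)
	ticker := time.NewTicker(PollDelay)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case r := <-buildResult:
			if r.err != nil {
				return "", r.err
			}
			// r.status holds the finished *cloudbuild.Build; the image digest is read from it.
			return b.getDigest(r.status, tag)
		case <-ticker.C:
			// copy any newly written GCS log output to `out` before checking again.
		}
	}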