[WIP] use list api to query multiple Cloud Build statuses together #6005

Closed
wants to merge 4 commits into from
79 changes: 31 additions & 48 deletions pkg/skaffold/build/gcb/cloud_build.go
@@ -23,7 +23,6 @@ import (
"fmt"
"io"
"net/http"
"strings"
"time"

cstorage "cloud.google.com/go/storage"
@@ -32,7 +31,6 @@ import (
"google.golang.org/api/cloudbuild/v1"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/build"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
@@ -144,60 +142,45 @@ func (b *Builder) buildArtifactWithCloudBuild(ctx context.Context, out io.Writer

var digest string
offset := int64(0)
var buildComplete bool
delay := time.NewTicker(PollDelay)
Member:
Suggested change
delay := time.NewTicker(PollDelay)
delay := time.NewTicker(PollDelay) // fixed schedule for log polling

defer delay.Stop()
buildResult := b.reporter.getStatus(ctx, projectID, remoteID)
Member:
WDYT of s/getStatus/trackStatus/ and s/reporter/monitor/?

  • getStatus implies that we wait until we've gotten the result
  • this reporter is monitoring for build status, and doesn't itself report.

watch:
Member:
Loop labels can be hard to reason about, especially when used with both goto and break, and this loop relies on the buildResult channel not being closed (which seems unusual?).

Can we instead track the number of logging bytes last received, and turn the for {} into for !buildComplete || lastLog > 0 {}.
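
Not part of this PR, but a minimal, self-contained sketch of the loop shape being suggested here; readLogs stands in for the real getLogs/io.Copy step, and the result type and durations are placeholders:

package main

import (
	"context"
	"fmt"
	"time"
)

type result struct{ err error }

// pollUntilDone loops without a label: it keeps going until the build has
// completed AND the most recent log poll copied zero new bytes.
func pollUntilDone(ctx context.Context, buildResult <-chan result, readLogs func() int64) error {
	delay := time.NewTicker(100 * time.Millisecond)
	defer delay.Stop()

	buildComplete := false
	lastLog := int64(1) // treat logs as not yet drained
	for !buildComplete || lastLog > 0 {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case r := <-buildResult:
			buildComplete = true
			if r.err != nil {
				return r.err
			}
		case <-delay.C:
			lastLog = readLogs() // bytes copied on this poll
		}
	}
	return nil
}

func main() {
	done := make(chan result, 1)
	done <- result{} // the build "finishes" immediately
	polls := 0
	err := pollUntilDone(context.Background(), done, func() int64 {
		polls++
		if polls < 3 {
			return 42 // pretend new log bytes arrived
		}
		return 0 // logs drained
	})
	fmt.Println("err:", err)
}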

for {
var cb *cloudbuild.Build
var err error
logrus.Debugf("current offset %d", offset)
backoff := NewStatusBackoff()
if waitErr := wait.Poll(backoff.Duration, RetryTimeout, func() (bool, error) {
backoff.Step()
cb, err = cbclient.Projects.Builds.Get(projectID, remoteID).Do()
if err == nil {
return true, nil
select {
case <-ctx.Done():
return "", ctx.Err()
case r := <-buildResult:
buildComplete = true
if r.err != nil {
return "", r.err
}
if strings.Contains(err.Error(), "Error 429: Quota exceeded for quota metric 'cloudbuild.googleapis.com/get_requests'") {
// if we hit the rate limit, continue to retry
return false, nil
}
return false, err
}); waitErr != nil {
return "", fmt.Errorf("getting build status: %w", waitErr)
}
if err != nil {
return "", fmt.Errorf("getting build status: %w", err)
}
if cb == nil {
return "", errors.New("getting build status")
}

r, err := b.getLogs(ctx, c, offset, cbBucket, logsObject)
if err != nil {
return "", fmt.Errorf("getting logs: %w", err)
}
if r != nil {
written, err := io.Copy(out, r)
digest, err = b.getDigest(r.status, tag)
if err != nil {
return "", fmt.Errorf("copying logs to stdout: %w", err)
return "", fmt.Errorf("getting image id from finished build: %w", err)
}
offset += written
r.Close()
}
switch cb.Status {
case StatusQueued, StatusWorking, StatusUnknown:
case StatusSuccess:
digest, err = b.getDigest(cb, tag)
case <-delay.C:
r, err := b.getLogs(ctx, c, offset, cbBucket, logsObject)
if err != nil {
return "", fmt.Errorf("getting image id from finished build: %w", err)
return "", fmt.Errorf("getting logs: %w", err)
}
if r != nil {
written, err := io.Copy(out, r)
if err != nil {
return "", fmt.Errorf("copying logs to stdout: %w", err)
}
r.Close()

offset += written
if written != 0 {
continue watch
}
}
if buildComplete {
break watch
}
break watch
case StatusFailure, StatusInternalError, StatusTimeout, StatusCancelled:
return "", fmt.Errorf("cloud build failed: %s", cb.Status)
default:
return "", fmt.Errorf("unknown status: %s", cb.Status)
}

time.Sleep(RetryDelay)
}

if err := c.Bucket(cbBucket).Object(buildObject).Delete(ctx); err != nil {
284 changes: 284 additions & 0 deletions pkg/skaffold/build/gcb/status.go
@@ -0,0 +1,284 @@
/*
Copyright 2021 The Skaffold Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gcb

import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

"google.golang.org/api/cloudbuild/v1"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/gcp"
)

var (
// maintain a single instance of `statusManagerImpl` per skaffold process
reporter *statusManagerImpl
reporterOnce sync.Once
// maintain single instance of the GCB client per skaffold process
client *cloudbuild.Service
clientOnce sync.Once
Member, on lines +37 to +39:

Singletons are bad: they're a global dependency and they break modularity. We should strive to avoid them (look at the kubeContext/kubeConfig mess). At the very least, have this instance be managed by the statusManagerImpl.

You should be able to write tests for the statusManagerImpl and have different backoff values (nobody wants the test to wait a minute to verify timeout, for example).
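
One possible shape for that (purely a sketch, not this PR's code: statusManagerOpts, the opts field, and newStatusManager are hypothetical), letting tests plug in a fake List call and a fast backoff:

// Hypothetical: dependencies owned by the manager instead of package-level singletons.
type statusManagerOpts struct {
	listBuilds func(projectID, filter string) (*cloudbuild.ListBuildsResponse, error)
	newBackoff func() *wait.Backoff
}

// newStatusManager assumes statusManagerImpl grows an `opts statusManagerOpts` field.
func newStatusManager(opts statusManagerOpts) *statusManagerImpl {
	m := &statusManagerImpl{
		results:  make(map[jobID]chan result),
		requests: make(chan jobRequest, 1),
		opts:     opts,
	}
	go m.run()
	return m
}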


// for testing
newClientFunc = func() (*cloudbuild.Service, error) {
return cloudbuild.NewService(context.Background(), gcp.ClientOptions()...)
}

statusFunc = func(projectID string, filter string) (*cloudbuild.ListBuildsResponse, error) {
return client.Projects.Builds.List(projectID).Filter(filter).Do()
}
)

// statusManager provides an interface for getting the status of GCB jobs.
// It reduces the number of requests made to the GCB API by using a single `List` call for
// all concurrently running builds for a project instead of multiple per-job `Get` calls.
// The statuses are polled with an independent exponential backoff strategy per project until success, cancellation or failure.
// The backoff duration for each `projectID` status query is reset with every new build request.
type statusManager interface {
getStatus(ctx context.Context, projectID string, buildID string) <-chan result
}

// statusManagerImpl implements the `statusManager` interface
type statusManagerImpl struct {
results map[jobID]chan result
resultMutex sync.RWMutex
requests chan jobRequest
}

// jobID represents a single build job
type jobID struct {
projectID string
buildID string
}
Member, on lines +67 to +71:

WDYT of creating types for buildID and projectID for use in the signatures, to get some type checking into place:

type projectID string
type buildID string

I think the poller would benefit.
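
A standalone, runnable illustration of the type safety this suggestion buys (example code, not from this PR):

package main

import "fmt"

type projectID string
type buildID string

// With distinct types, swapping the two IDs no longer compiles.
type job struct {
	project projectID
	build   buildID
}

func describe(p projectID, b buildID) string {
	return fmt.Sprintf("project %s, build %s", p, b)
}

func main() {
	j := job{project: "my-gcp-project", build: "1234-abcd"}
	fmt.Println(describe(j.project, j.build))
	// describe(j.build, j.project) // compile error: mismatched types
}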


// jobRequest encapsulates a single build job and its context.
type jobRequest struct {
jobID
ctx context.Context
}

// result represents a single build job status
type result struct {
jobID
status *cloudbuild.Build
err error
}

func getStatusManager() statusManager {
reporterOnce.Do(func() {
reporter = &statusManagerImpl{
results: make(map[jobID]chan result),
requests: make(chan jobRequest, 1),
}
// start processing all job requests in new goroutine.
go reporter.run()
})
return reporter
}

// poller sends the `projectID` on channel `C` with an exponentially increasing period for each project.
Member:

WDYT about breaking poller out into a new file? It should have a stop() method.

Though it feels like you could simplify this and avoid the goroutine and internal channels with a sync.Map.

Member:
(I'm not sure that such a "simplification" would actually reduce the complexity though.)
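
For reference, a rough sketch of what the sync.Map direction could look like (illustrative only; backoffMap is not part of this PR, and callers would still need to serialize Step() calls per project, since wait.Backoff is not goroutine-safe):

// backoffMap keeps per-project backoff state in a concurrent map instead of
// routing everything through a dedicated goroutine and internal channels.
type backoffMap struct {
	m sync.Map // projectID -> *wait.Backoff
}

// next returns the delay before the next poll for projectID, creating a fresh
// backoff on first use.
func (b *backoffMap) next(projectID string) time.Duration {
	v, _ := b.m.LoadOrStore(projectID, NewStatusBackoff())
	return v.(*wait.Backoff).Step()
}

// reset starts a new backoff schedule for projectID, e.g. on a new build request.
func (b *backoffMap) reset(projectID string) {
	b.m.Store(projectID, NewStatusBackoff())
}

// remove drops the state once a project has no more running jobs.
func (b *backoffMap) remove(projectID string) {
	b.m.Delete(projectID)
}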

type poller struct {
timers map[string]*time.Timer // timers keep the next `Timer` keyed on the projectID
backoffs map[string]*wait.Backoff // backoffs keeps the current `Backoff` keyed on the projectID
reset chan string // reset channel receives any new `projectID` and resets the timer and backoff for that projectID
remove chan string // remove channel receives a `projectID` that has no more running jobs and deletes its timer and backoff
trans chan string // trans channel pushes to channel `C` and sets up the next trigger
C chan string // C channel triggers with the `projectID`
}

func newPoller() *poller {
p := &poller{
timers: make(map[string]*time.Timer),
backoffs: make(map[string]*wait.Backoff),
reset: make(chan string),
remove: make(chan string),
trans: make(chan string),
C: make(chan string),
}

go func() {
for {
select {
case projectID := <-p.remove:
if b, found := p.timers[projectID]; found {
Member:
b is a non-obvious name. t?

b.Stop()
}
delete(p.timers, projectID)
delete(p.backoffs, projectID)
case projectID := <-p.reset:
if b, found := p.timers[projectID]; found {
Member:
s/b/t/

b.Stop()
}
p.backoffs[projectID] = NewStatusBackoff()
p.timers[projectID] = time.AfterFunc(p.backoffs[projectID].Step(), func() {
p.trans <- projectID
})
case projectID := <-p.trans:
go func() {
p.C <- projectID
Contributor:
if we already blocked on the projectID := <-p.trans call in the case statement, does this need to be in a go func()?

}()
b, found := p.backoffs[projectID]
if !found {
continue
}
p.timers[projectID] = time.AfterFunc(b.Step(), func() {
p.trans <- projectID
})
}
}
}()
return p
}

func (p *poller) resetTimer(projectID string) {
p.reset <- projectID
}

func (p *poller) removeTimer(projectID string) {
p.remove <- projectID
}

// run manages populating the `results` channels for each GCB build job status.
// A `poller` is set up to do independent exponential backoff per project until success, cancellation or failure.
// The backoff duration for each `projectID` status query is reset with every new build request.
func (r *statusManagerImpl) run() {
Contributor:

This function could use a two- or three-sentence high-level description of what it does; there's a lot going on here :)

Something like:

run() manages the status of all GCB builds started by Skaffold, by using the `list` API to retrieve all jobs from a build ID. for each project ID encountered, a `poller` is set up to repeatedly poll and report status of all jobs, with a backoff mechanism in place to handle throttling from the API server.... etc.

poll := newPoller()
jobsByProjectID := make(map[string]map[jobID]jobRequest)
jobCancelled := make(chan jobRequest)
retryCount := make(map[jobID]int)
for {
select {
case req := <-r.requests:
// setup for each new build status request
if _, found := jobsByProjectID[req.projectID]; !found {
jobsByProjectID[req.projectID] = make(map[jobID]jobRequest)
}
jobsByProjectID[req.projectID][req.jobID] = req
go poll.resetTimer(req.projectID)
go func() {
// setup cancellation for each job
r := req
<-r.ctx.Done()
jobCancelled <- r
}()
case projectID := <-poll.C:
// get status of all active jobs for `projectID`
jobs := jobsByProjectID[projectID]
if len(jobs) == 0 {
poll.removeTimer(projectID)
continue
}
statuses, err := getStatuses(projectID, jobs)
if err != nil {
for id := range jobs {
// if the GCB API is throttling, ignore error and retry `MaxRetryCount` number of times.
if strings.Contains(err.Error(), "Error 429: Quota exceeded for quota metric 'cloudbuild.googleapis.com/get_requests'") {
retryCount[id]++
if retryCount[id] < MaxRetryCount {
continue
}
}
r.setResult(result{jobID: id, err: err})
delete(jobs, id)
}
continue
}
for id := range jobs {
cb := statuses[id]
switch cb.Status {
case StatusQueued, StatusWorking, StatusUnknown:
case StatusSuccess:
r.setResult(result{jobID: id, status: cb})
delete(jobs, id)
case StatusFailure, StatusInternalError, StatusTimeout, StatusCancelled:
r.setResult(result{jobID: id, err: fmt.Errorf("cloud build failed: %s", cb.Status)})
delete(jobs, id)
default:
r.setResult(result{jobID: id, err: fmt.Errorf("unhandled status: %s", cb.Status)})
delete(jobs, id)
}
}
case job := <-jobCancelled:
r.setResult(result{jobID: job.jobID, err: job.ctx.Err()})
delete(jobsByProjectID[job.projectID], job.jobID)
}
}
}

func getStatuses(projectID string, jobs map[jobID]jobRequest) (map[jobID]*cloudbuild.Build, error) {
var err error
clientOnce.Do(func() {
client, err = newClientFunc()
})
if err != nil {
clientOnce = sync.Once{} // reset on failure
return nil, fmt.Errorf("getting cloudbuild client: %w", err)
}
cb, err := statusFunc(projectID, getFilterQuery(jobs))
if err != nil {
return nil, fmt.Errorf("getting build status: %w", err)
}
if cb == nil {
return nil, errors.New("getting build status")
}

m := make(map[jobID]*cloudbuild.Build)
for _, job := range jobs {
found := false
for _, b := range cb.Builds {
if b.Id != job.buildID {
continue
}
found = true
m[job.jobID] = b
break
}
if !found {
return nil, errors.New("getting build status")
}
}
return m, nil
}

func getFilterQuery(jobs map[jobID]jobRequest) string {
var sl []string
for job := range jobs {
sl = append(sl, fmt.Sprintf("build_id=%s", job.buildID))
}
return strings.Join(sl, " OR ")
}

func (r *statusManagerImpl) setResult(result result) {
r.resultMutex.RLock()
r.results[result.jobID] <- result
Member:
Should we delete the result from r.results? If so, add a test case to ensure we don't see .results grow without end. If not, add a comment.
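
If the answer is to delete, a minimal sketch of that cleanup (hypothetical name, not in this PR; note the delete needs the write lock rather than the read lock used above):

// setResultAndForget delivers the single result for a job and then drops its
// entry so r.results cannot grow without bound.
func (r *statusManagerImpl) setResultAndForget(res result) {
	r.resultMutex.Lock()
	defer r.resultMutex.Unlock()
	if ch, ok := r.results[res.jobID]; ok {
		ch <- res // the channel is created with capacity 1 in getStatus, so this send never blocks
		delete(r.results, res.jobID)
	}
}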

r.resultMutex.RUnlock()
}

func (r *statusManagerImpl) getStatus(ctx context.Context, projectID string, buildID string) <-chan result {
id := jobID{projectID, buildID}
res := make(chan result, 1)
r.resultMutex.Lock()
r.results[id] = res
r.resultMutex.Unlock()
r.requests <- jobRequest{id, ctx}
return res
}