Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve agent rpc retry logic with exponential backoff #2205

Merged
merged 2 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 62 additions & 11 deletions agent/rpc/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -30,8 +31,6 @@ import (
"github.com/woodpecker-ci/woodpecker/pipeline/rpc/proto"
)

var backoff = time.Second

// ClientGrpcVersion is the gRPC version set at compile time, used to compare against the server's version response.
const ClientGrpcVersion int32 = proto.Version

Expand All @@ -52,6 +51,13 @@ func (c *client) Close() error {
return c.conn.Close()
}

// newBackOff returns a fresh exponential backoff policy for a single retry
// loop. Delays start at 10ms and are capped at 10s.
//
// MaxElapsedTime is set to 0 to disable the library's elapsed-time limit
// (15 minutes by default). Once that limit has passed, NextBackOff returns
// backoff.Stop, a negative duration. The retry loops in this file pass the
// result of NextBackOff directly to time.After, which fires immediately for
// non-positive durations, so without this setting a long outage would turn
// every retry loop into a busy loop.
func (c *client) newBackOff() backoff.BackOff {
	b := backoff.NewExponentialBackOff()
	b.MaxElapsedTime = 0 // never give up; callers stop via ctx cancellation
	b.MaxInterval = 10 * time.Second
	b.InitialInterval = 10 * time.Millisecond
	return b
}

// Version returns the server- & grpc-version
func (c *client) Version(ctx context.Context) (*rpc.Version, error) {
res, err := c.client.Version(ctx, &proto.Empty{})
Expand All @@ -68,6 +74,7 @@ func (c *client) Version(ctx context.Context) (*rpc.Version, error) {
func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error) {
var res *proto.NextResponse
var err error
retry := c.newBackOff()
req := new(proto.NextRequest)
req.Filter = new(proto.Filter)
req.Filter.Labels = f.Labels
Expand Down Expand Up @@ -96,10 +103,12 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error)
default:
return nil, err
}
if ctx.Err() != nil {

select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return nil, ctx.Err()
}
<-time.After(backoff)
}

if res.GetPipeline() == nil {
Expand All @@ -118,6 +127,7 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error)

// Wait blocks until the pipeline is complete.
func (c *client) Wait(ctx context.Context, id string) (err error) {
retry := c.newBackOff()
req := new(proto.WaitRequest)
req.Id = id
for {
Expand All @@ -139,13 +149,19 @@ func (c *client) Wait(ctx context.Context, id string) (err error) {
default:
return err
}
<-time.After(backoff)

select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

// Init signals the pipeline is initialized.
func (c *client) Init(ctx context.Context, id string, state rpc.State) (err error) {
retry := c.newBackOff()
req := new(proto.InitRequest)
req.Id = id
req.State = new(proto.State)
Expand Down Expand Up @@ -174,13 +190,19 @@ func (c *client) Init(ctx context.Context, id string, state rpc.State) (err erro
default:
return err
}
<-time.After(backoff)

select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

// Done signals the pipeline is complete.
func (c *client) Done(ctx context.Context, id string, state rpc.State) (err error) {
retry := c.newBackOff()
req := new(proto.DoneRequest)
req.Id = id
req.State = new(proto.State)
Expand Down Expand Up @@ -209,13 +231,19 @@ func (c *client) Done(ctx context.Context, id string, state rpc.State) (err erro
default:
return err
}
<-time.After(backoff)

select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

// Extend extends the pipeline deadline
func (c *client) Extend(ctx context.Context, id string) (err error) {
retry := c.newBackOff()
req := new(proto.ExtendRequest)
req.Id = id
for {
Expand All @@ -237,13 +265,19 @@ func (c *client) Extend(ctx context.Context, id string) (err error) {
default:
return err
}
<-time.After(backoff)

select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

// Update updates the pipeline state.
func (c *client) Update(ctx context.Context, id string, state rpc.State) (err error) {
retry := c.newBackOff()
req := new(proto.UpdateRequest)
req.Id = id
req.State = new(proto.State)
Expand Down Expand Up @@ -272,13 +306,19 @@ func (c *client) Update(ctx context.Context, id string, state rpc.State) (err er
default:
return err
}
<-time.After(backoff)

select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

// Log writes the pipeline log entry.
func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
retry := c.newBackOff()
req := new(proto.LogRequest)
req.LogEntry = new(proto.LogEntry)
req.LogEntry.StepUuid = logEntry.StepUUID
Expand All @@ -305,7 +345,12 @@ func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
default:
return err
}
<-time.After(backoff)

select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
Expand All @@ -322,6 +367,7 @@ func (c *client) RegisterAgent(ctx context.Context, platform, backend, version s
}

func (c *client) ReportHealth(ctx context.Context) (err error) {
retry := c.newBackOff()
req := new(proto.ReportHealthRequest)
req.Status = "I am alive!"

Expand All @@ -341,6 +387,11 @@ func (c *client) ReportHealth(ctx context.Context) (err error) {
default:
return err
}
<-time.After(backoff)

select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
github.com/caddyserver/certmagic v0.17.2 h1:o30seC1T/dBqBCNNGNHWwj2i5/I/FMjBbTAhjADP3nE=
github.com/caddyserver/certmagic v0.17.2/go.mod h1:ouWUuC490GOLJzkyN35eXfV8bSbwMwSf4bdhkIxtdQE=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down