From a7ba1b49e262ff2049575bfbe57ca394826ae657 Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Fri, 14 Jun 2024 11:37:53 +0200 Subject: [PATCH 1/3] remove unused var --- backend/activity.go | 2 +- backend/orchestration.go | 2 +- backend/worker.go | 4 +--- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/backend/activity.go b/backend/activity.go index bf53871..8f3d8df 100644 --- a/backend/activity.go +++ b/backend/activity.go @@ -23,7 +23,7 @@ type ActivityExecutor interface { func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker { processor := newActivityProcessor(be, executor) - return NewTaskWorker(be, processor, logger, opts...) + return NewTaskWorker(processor, logger, opts...) } func newActivityProcessor(be Backend, executor ActivityExecutor) TaskProcessor { diff --git a/backend/orchestration.go b/backend/orchestration.go index 3892569..c416da2 100644 --- a/backend/orchestration.go +++ b/backend/orchestration.go @@ -35,7 +35,7 @@ func NewOrchestrationWorker(be Backend, executor OrchestratorExecutor, logger Lo executor: executor, logger: logger, } - return NewTaskWorker(be, processor, logger, opts...) + return NewTaskWorker(processor, logger, opts...) } // Name implements TaskProcessor diff --git a/backend/worker.go b/backend/worker.go index 9cb6f8c..c11bb4e 100644 --- a/backend/worker.go +++ b/backend/worker.go @@ -32,7 +32,6 @@ type TaskProcessor interface { } type worker struct { - backend Backend options *WorkerOptions logger Logger // dispatchSemaphore is for throttling orchestration concurrency. @@ -66,13 +65,12 @@ func WithMaxParallelism(n int32) NewTaskWorkerOptions { } } -func NewTaskWorker(be Backend, p TaskProcessor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker { +func NewTaskWorker(p TaskProcessor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker { options := &WorkerOptions{MaxParallelWorkItems: 1} for _, configure := range opts { configure(options) } return &worker{ - backend: be, processor: p, logger: logger, dispatchSemaphore: semaphore.New(int(options.MaxParallelWorkItems)), From 746e212ee01b2d1f17910554ed88436ac9933bd1 Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Fri, 14 Jun 2024 11:38:57 +0200 Subject: [PATCH 2/3] add infinite retries to worker GetWorkItems stream connection --- client/worker_grpc.go | 104 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 91 insertions(+), 13 deletions(-) diff --git a/client/worker_grpc.go b/client/worker_grpc.go index 6c5ed70..6532e8d 100644 --- a/client/worker_grpc.go +++ b/client/worker_grpc.go @@ -6,26 +6,45 @@ import ( "io" "time" + "github.com/cenkalti/backoff/v4" "github.com/microsoft/durabletask-go/api" "github.com/microsoft/durabletask-go/backend" "github.com/microsoft/durabletask-go/internal/helpers" "github.com/microsoft/durabletask-go/internal/protos" "github.com/microsoft/durabletask-go/task" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/wrapperspb" ) +type workItemsStream interface { + Recv() (*protos.WorkItem, error) +} + func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.TaskRegistry) error { executor := task.NewTaskExecutor(r) - if _, err := c.client.Hello(ctx, &emptypb.Empty{}); err != nil { - return fmt.Errorf("failed to connect to task hub service: %w", err) + var stream workItemsStream + + initStream := func() error { + _, err := c.client.Hello(ctx, &emptypb.Empty{}) + if err != nil { + return fmt.Errorf("failed to connect to task hub service: %w", err) + } + + req := protos.GetWorkItemsRequest{} + stream, err = c.client.GetWorkItems(ctx, &req) + if err != nil { + return fmt.Errorf("failed to get work item stream: %w", err) + } + return nil } - req := protos.GetWorkItemsRequest{} - stream, err := c.client.GetWorkItems(ctx, &req) + c.logger.Infof("connecting work item listener stream") + err := initStream() if err != nil { - return fmt.Errorf("failed to get work item stream: %w", err) + return err } go func() { @@ -33,19 +52,68 @@ func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.T for { // TODO: Manage concurrency workItem, err := stream.Recv() - if err == io.EOF || stream.Context().Err() != nil { - // shutdown - break - } else if err != nil { - c.logger.Warnf("failed to establish work item stream: %v", err) - time.Sleep(5 * time.Second) + + if err != nil { + // user wants to stop the listener + if ctx.Err() != nil { + c.logger.Infof("stopping background processor: %v", err) + return + } + + retriable := false + + c.logger.Errorf("background processor received stream error: %v", err) + + if err == io.EOF { + retriable = true + } else if grpcStatus, ok := status.FromError(err); ok { + c.logger.Warnf("received grpc error code %v", grpcStatus.Code().String()) + switch grpcStatus.Code() { + case codes.Unavailable: + fallthrough + case codes.Canceled: + fallthrough + default: + retriable = true + } + } + + if !retriable { + c.logger.Infof("stopping background processor, non retriable error: %v", err) + return + } + + err = backoff.Retry( + func() error { + // user wants to stop the listener + if ctx.Err() != nil { + return backoff.Permanent(ctx.Err()) + } + + c.logger.Infof("reconnecting work item listener stream") + streamErr := initStream() + if streamErr != nil { + c.logger.Errorf("error initializing work item listener stream %v", streamErr) + return streamErr + } + return nil + }, + // retry forever since we don't have a way of asynchronously return errors to the user + newInfiniteRetries(), + ) + if err != nil { + c.logger.Infof("stopping background processor, unable to reconnect stream: %v", err) + return + } + c.logger.Infof("successfully reconnected work item listener stream...") + // continue iterating continue } if orchReq := workItem.GetOrchestratorRequest(); orchReq != nil { - go c.processOrchestrationWorkItem(stream.Context(), executor, orchReq) + go c.processOrchestrationWorkItem(ctx, executor, orchReq) } else if actReq := workItem.GetActivityRequest(); actReq != nil { - go c.processActivityWorkItem(stream.Context(), executor, actReq) + go c.processActivityWorkItem(ctx, executor, actReq) } else { c.logger.Warnf("received unknown work item type: %v", workItem) } @@ -125,3 +193,13 @@ func (c *TaskHubGrpcClient) processActivityWorkItem( } } } + +func newInfiniteRetries() *backoff.ExponentialBackOff { + b := backoff.NewExponentialBackOff() + // max wait of 15 seconds between retries + b.MaxInterval = 15 * time.Second + // retry forever + b.MaxElapsedTime = 0 + b.Reset() + return b +} From 16aeb01f4c44b1d99b2229aed0f7082e80692b8f Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Fri, 21 Jun 2024 10:08:08 +0900 Subject: [PATCH 3/3] Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c74b48a..0a54005 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Bump google.golang.org/grpc from 1.53.0 to 1.56.3 ([#39](https://github.com/microsoft/durabletask-go/pull/39)) - Updated durabletask-protobuf submodule to [`4207e1d`](https://github.com/microsoft/durabletask-protobuf/commit/4207e1dbd14cedc268f69c3befee60fcaad19367) +- Add retries to GetWorkItems stream connection ([#72](https://github.com/microsoft/durabletask-go/pull/72)) - by [@famarting](https://github.com/famarting) ## [v0.4.0] - 2023-12-18