Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add retries to GetWorkItems stream connection #72

Merged
merged 3 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type ActivityExecutor interface {

func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker {
processor := newActivityProcessor(be, executor)
return NewTaskWorker(be, processor, logger, opts...)
return NewTaskWorker(processor, logger, opts...)
}

func newActivityProcessor(be Backend, executor ActivityExecutor) TaskProcessor {
Expand Down
2 changes: 1 addition & 1 deletion backend/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewOrchestrationWorker(be Backend, executor OrchestratorExecutor, logger Lo
executor: executor,
logger: logger,
}
return NewTaskWorker(be, processor, logger, opts...)
return NewTaskWorker(processor, logger, opts...)
}

// Name implements TaskProcessor
Expand Down
4 changes: 1 addition & 3 deletions backend/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ type TaskProcessor interface {
}

type worker struct {
backend Backend
options *WorkerOptions
logger Logger
// dispatchSemaphore is for throttling orchestration concurrency.
Expand Down Expand Up @@ -66,13 +65,12 @@ func WithMaxParallelism(n int32) NewTaskWorkerOptions {
}
}

func NewTaskWorker(be Backend, p TaskProcessor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker {
func NewTaskWorker(p TaskProcessor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker {
options := &WorkerOptions{MaxParallelWorkItems: 1}
for _, configure := range opts {
configure(options)
}
return &worker{
backend: be,
processor: p,
logger: logger,
dispatchSemaphore: semaphore.New(int(options.MaxParallelWorkItems)),
Expand Down
104 changes: 91 additions & 13 deletions client/worker_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,46 +6,114 @@ import (
"io"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/microsoft/durabletask-go/api"
"github.com/microsoft/durabletask-go/backend"
"github.com/microsoft/durabletask-go/internal/helpers"
"github.com/microsoft/durabletask-go/internal/protos"
"github.com/microsoft/durabletask-go/task"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

type workItemsStream interface {
Recv() (*protos.WorkItem, error)
}

func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.TaskRegistry) error {
executor := task.NewTaskExecutor(r)

if _, err := c.client.Hello(ctx, &emptypb.Empty{}); err != nil {
return fmt.Errorf("failed to connect to task hub service: %w", err)
var stream workItemsStream

initStream := func() error {
_, err := c.client.Hello(ctx, &emptypb.Empty{})
if err != nil {
return fmt.Errorf("failed to connect to task hub service: %w", err)
}

req := protos.GetWorkItemsRequest{}
stream, err = c.client.GetWorkItems(ctx, &req)
if err != nil {
return fmt.Errorf("failed to get work item stream: %w", err)
}
return nil
}

req := protos.GetWorkItemsRequest{}
stream, err := c.client.GetWorkItems(ctx, &req)
c.logger.Infof("connecting work item listener stream")
err := initStream()
if err != nil {
return fmt.Errorf("failed to get work item stream: %w", err)
return err
}

go func() {
c.logger.Info("starting background processor")
for {
// TODO: Manage concurrency
workItem, err := stream.Recv()
if err == io.EOF || stream.Context().Err() != nil {
// shutdown
break
} else if err != nil {
c.logger.Warnf("failed to establish work item stream: %v", err)
time.Sleep(5 * time.Second)

if err != nil {
// user wants to stop the listener
if ctx.Err() != nil {
c.logger.Infof("stopping background processor: %v", err)
return
}

retriable := false

c.logger.Errorf("background processor received stream error: %v", err)

if err == io.EOF {
retriable = true
} else if grpcStatus, ok := status.FromError(err); ok {
c.logger.Warnf("received grpc error code %v", grpcStatus.Code().String())
switch grpcStatus.Code() {
case codes.Unavailable:
fallthrough
case codes.Canceled:
fallthrough
default:
retriable = true
}
}

if !retriable {
c.logger.Infof("stopping background processor, non retriable error: %v", err)
return
}

err = backoff.Retry(
func() error {
// user wants to stop the listener
if ctx.Err() != nil {
return backoff.Permanent(ctx.Err())
}

c.logger.Infof("reconnecting work item listener stream")
streamErr := initStream()
if streamErr != nil {
c.logger.Errorf("error initializing work item listener stream %v", streamErr)
return streamErr
}
return nil
},
// retry forever since we don't have a way of asynchronously return errors to the user
newInfiniteRetries(),
)
if err != nil {
c.logger.Infof("stopping background processor, unable to reconnect stream: %v", err)
return
}
c.logger.Infof("successfully reconnected work item listener stream...")
// continue iterating
continue
}

if orchReq := workItem.GetOrchestratorRequest(); orchReq != nil {
go c.processOrchestrationWorkItem(stream.Context(), executor, orchReq)
go c.processOrchestrationWorkItem(ctx, executor, orchReq)
} else if actReq := workItem.GetActivityRequest(); actReq != nil {
go c.processActivityWorkItem(stream.Context(), executor, actReq)
go c.processActivityWorkItem(ctx, executor, actReq)
} else {
c.logger.Warnf("received unknown work item type: %v", workItem)
}
Expand Down Expand Up @@ -125,3 +193,13 @@ func (c *TaskHubGrpcClient) processActivityWorkItem(
}
}
}

func newInfiniteRetries() *backoff.ExponentialBackOff {
b := backoff.NewExponentialBackOff()
// max wait of 15 seconds between retries
b.MaxInterval = 15 * time.Second
// retry forever
b.MaxElapsedTime = 0
b.Reset()
return b
}
Loading