From 4a45c87869875ffbf6aafa9bd53e8c55749318a4 Mon Sep 17 00:00:00 2001 From: Jorge Araya Navarro Date: Tue, 3 Sep 2024 10:25:39 -0600 Subject: [PATCH] Exit the for-loop after 5 gRPC communication errors (#218) * Exit the for-loop after 5 gRPC communication errors * Reset counter and ignore unknown errors * Return an error if we exit the for-loop * Search for the error message we care to stop the for-loop --- pkg/connectorrunner/runner.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/pkg/connectorrunner/runner.go b/pkg/connectorrunner/runner.go index ded338af..f57889fd 100644 --- a/pkg/connectorrunner/runner.go +++ b/pkg/connectorrunner/runner.go @@ -7,6 +7,7 @@ import ( "os" "os/signal" "path/filepath" + "strings" "time" "golang.org/x/sync/semaphore" @@ -128,8 +129,9 @@ func (c *connectorRunner) run(ctx context.Context) error { waitDuration := time.Second * 0 errCount := 0 + stopForLoop := false var err error - for { + for !stopForLoop { select { case <-ctx.Done(): return c.handleContextCancel(ctx) @@ -194,7 +196,11 @@ func (c *connectorRunner) run(ctx context.Context) error { defer sem.Release(1) err := c.processTask(ctx, t) if err != nil { + if strings.Contains(err.Error(), "grpc: the client connection is closing") { + stopForLoop = true + } l.Error("runner: error processing task", zap.Error(err), zap.String("task_id", t.Id), zap.String("task_type", tasks.GetType(t).String())) + return } l.Debug("runner: task processed", zap.String("task_id", t.Id), zap.String("task_type", tasks.GetType(t).String())) }(nextTask) @@ -202,6 +208,12 @@ func (c *connectorRunner) run(ctx context.Context) error { l.Debug("runner: dispatched task, waiting for next task", zap.Duration("wait_duration", waitDuration)) } } + + if stopForLoop { + return fmt.Errorf("Unable to communicate with gRPC server") + } + + return nil } func (c *connectorRunner) Close(ctx context.Context) error {