Skip to content

Commit

Permalink
Exit the for-loop after 5 gRPC communication errors (#218)
Browse files Browse the repository at this point in the history
* Exit the for-loop after 5 gRPC communication errors

* Reset counter and ignore unknown errors

* Return an error if we exit the for-loop

* Search for the error message we care to stop the for-loop
  • Loading branch information
shackra authored Sep 3, 2024
1 parent bba2ba8 commit 4a45c87
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion pkg/connectorrunner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"os/signal"
"path/filepath"
"strings"
"time"

"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -128,8 +129,9 @@ func (c *connectorRunner) run(ctx context.Context) error {

waitDuration := time.Second * 0
errCount := 0
stopForLoop := false
var err error
for {
for !stopForLoop {
select {
case <-ctx.Done():
return c.handleContextCancel(ctx)
Expand Down Expand Up @@ -194,14 +196,24 @@ func (c *connectorRunner) run(ctx context.Context) error {
defer sem.Release(1)
err := c.processTask(ctx, t)
if err != nil {
if strings.Contains(err.Error(), "grpc: the client connection is closing") {
stopForLoop = true
}
l.Error("runner: error processing task", zap.Error(err), zap.String("task_id", t.Id), zap.String("task_type", tasks.GetType(t).String()))
return
}
l.Debug("runner: task processed", zap.String("task_id", t.Id), zap.String("task_type", tasks.GetType(t).String()))
}(nextTask)

l.Debug("runner: dispatched task, waiting for next task", zap.Duration("wait_duration", waitDuration))
}
}

if stopForLoop {
return fmt.Errorf("Unable to communicate with gRPC server")
}

return nil
}

func (c *connectorRunner) Close(ctx context.Context) error {
Expand Down

0 comments on commit 4a45c87

Please sign in to comment.