Skip to content

Commit

Permalink
kafka(ticdc): fix sarama admin client reset by error. (#9481) (#9507)
Browse files Browse the repository at this point in the history
close #9480
  • Loading branch information
ti-chi-bot authored Aug 14, 2023
1 parent fe96b21 commit db5fb4d
Showing 1 changed file with 3 additions and 10 deletions.
13 changes: 3 additions & 10 deletions pkg/sink/kafka/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,10 @@ func (a *saramaAdminClient) queryClusterWithRetry(ctx context.Context, query fun
zap.String("changefeed", a.changefeed.ID),
zap.Error(err))

if !errors.Is(err, syscall.EPIPE) {
return err
}
if !errors.Is(err, net.ErrClosed) {
return err
if errors.Is(err, syscall.EPIPE) || errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
return a.reset()
}
if !errors.Is(err, io.EOF) {
return err
}

return a.reset()
return err
}, retry.WithBackoffBaseDelay(defaultRetryBackoff), retry.WithMaxTries(defaultRetryMaxTries))
return err
}
Expand Down

0 comments on commit db5fb4d

Please sign in to comment.