Skip to content

Commit

Permalink
puller: fix retry logic when check store version failed (#11903) (#11929)
Browse files Browse the repository at this point in the history

close #11766
  • Loading branch information
ti-chi-bot authored Jan 19, 2025
1 parent 6e0c788 commit 5fda913
Showing 1 changed file with 20 additions and 2 deletions.
22 changes: 20 additions & 2 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ var (
metricFeedDuplicateRequestCounter = eventFeedErrorCounter.WithLabelValues("DuplicateRequest")
metricFeedUnknownErrorCounter = eventFeedErrorCounter.WithLabelValues("Unknown")
metricFeedRPCCtxUnavailable = eventFeedErrorCounter.WithLabelValues("RPCCtxUnavailable")
metricGetStoreErr = eventFeedErrorCounter.WithLabelValues("GetStoreErr")
metricStoreSendRequestErr = eventFeedErrorCounter.WithLabelValues("SendRequestToStore")
metricConnectToStoreErr = eventFeedErrorCounter.WithLabelValues("ConnectToStore")
)
Expand Down Expand Up @@ -685,8 +686,17 @@ func (s *eventFeedSession) requestRegionToStore(
time.Sleep(delay)
}
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err)
errInfo := newRegionErrorInfo(sri, &connectToStoreErr{})
var regionErr error
var scheduleReload bool
if cerror.Is(err, cerror.ErrGetAllStoresFailed) {
regionErr = &getStoreErr{}
scheduleReload = true
} else {
regionErr = &connectToStoreErr{}
scheduleReload = regionScheduleReload
}
s.client.regionCache.OnSendFail(bo, rpcCtx, scheduleReload, err)
errInfo := newRegionErrorInfo(sri, regionErr)
s.onRegionFail(ctx, errInfo)
continue
}
Expand Down Expand Up @@ -961,6 +971,10 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
metricConnectToStoreErr.Inc()
case *sendRequestToStoreErr:
metricStoreSendRequestErr.Inc()
case *getStoreErr:
metricGetStoreErr.Inc()
s.scheduleDivideRegionAndRequest(ctx, errInfo.span)
return nil
default:
//[TODO] Move all OnSendFail logic here
// We expect some unknown error to trigger RegionCache recheck its store state and change leader to peer to
Expand Down Expand Up @@ -1514,3 +1528,7 @@ func (e *connectToStoreErr) Error() string { return "connect to store error" }
// sendRequestToStoreErr is a field-less marker error type. The error-handling
// type switch in this file matches on *sendRequestToStoreErr and increments
// metricStoreSendRequestErr for it.
type sendRequestToStoreErr struct{}

// Error implements the error interface and always returns the same fixed
// message; the receiver is never dereferenced.
func (*sendRequestToStoreErr) Error() string {
	const msg = "send request to store error"
	return msg
}

// getStoreErr is a field-less marker error type. In this file it is produced
// when a store request fails with cerror.ErrGetAllStoresFailed, and the
// error-handling type switch reacts to *getStoreErr by incrementing
// metricGetStoreErr and calling scheduleDivideRegionAndRequest on the span.
type getStoreErr struct{}

// Error implements the error interface and always returns the same fixed
// message; the receiver is never dereferenced.
func (*getStoreErr) Error() string {
	const msg = "get store error"
	return msg
}

0 comments on commit 5fda913

Please sign in to comment.