Skip to content

Commit

Permalink
storage: filter Raft heartbeats in unreliableRaftHandler
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
nvanbenschoten committed Apr 24, 2019
1 parent 84e3255 commit af48c34
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 6 deletions.
35 changes: 31 additions & 4 deletions pkg/storage/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2979,27 +2979,54 @@ type unreliableRaftHandler struct {
rangeID roachpb.RangeID
storage.RaftMessageHandler
// If non-nil, can return false to avoid dropping a msg to rangeID
drop func(request *storage.RaftMessageRequest, response *storage.RaftMessageResponse) bool
dropReq func(*storage.RaftMessageRequest) bool
dropHB func(*storage.RaftHeartbeat) bool
dropResp func(*storage.RaftMessageResponse) bool
}

func (h *unreliableRaftHandler) HandleRaftRequest(
ctx context.Context,
req *storage.RaftMessageRequest,
respStream storage.RaftMessageResponseStream,
) *roachpb.Error {
if req.RangeID == h.rangeID {
if h.drop == nil || h.drop(req, nil) {
if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 {
reqCpy := *req
req = &reqCpy
req.Heartbeats = h.filterHeartbeats(req.Heartbeats)
req.HeartbeatResps = h.filterHeartbeats(req.HeartbeatResps)
if len(req.Heartbeats)+len(req.HeartbeatResps) == 0 {
// Entirely filtered.
return nil
}
} else if req.RangeID == h.rangeID {
if h.dropReq == nil || h.dropReq(req) {
return nil
}
}
return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
}

func (h *unreliableRaftHandler) filterHeartbeats(
hbs []storage.RaftHeartbeat,
) []storage.RaftHeartbeat {
if len(hbs) == 0 {
return hbs
}
var cpy []storage.RaftHeartbeat
for i := range hbs {
hb := &hbs[i]
if hb.RangeID != h.rangeID || (h.dropHB != nil && !h.dropHB(hb)) {
cpy = append(cpy, *hb)
}
}
return cpy
}

func (h *unreliableRaftHandler) HandleRaftResponse(
ctx context.Context, resp *storage.RaftMessageResponse,
) error {
if resp.RangeID == h.rangeID {
if h.drop == nil || h.drop(nil, resp) {
if h.dropResp == nil || h.dropResp(resp) {
return nil
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/replica_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,12 +603,14 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) {
mtc.transport.Listen(partitionStore.Ident.StoreID, &unreliableRaftHandler{
rangeID: rangeID,
RaftMessageHandler: partitionStore,
drop: func(req *storage.RaftMessageRequest, _ *storage.RaftMessageResponse) bool {
dropReq: func(req *storage.RaftMessageRequest) bool {
// Make sure that even going forward no MsgApp for what we just truncated can
// make it through. The Raft transport is asynchronous so this is necessary
// to make the test pass reliably.
return req != nil && req.Message.Type == raftpb.MsgApp && req.Message.Index <= index
return req.Message.Type == raftpb.MsgApp && req.Message.Index <= index
},
dropHB: func(*storage.RaftHeartbeat) bool { return false },
dropResp: func(*storage.RaftMessageResponse) bool { return false },
})

// Check the error.
Expand Down

0 comments on commit af48c34

Please sign in to comment.