Skip to content

Commit

Permalink
sync -- handle fatal error (#1874)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dan Laine authored Aug 18, 2023
1 parent cf14d79 commit 7d970c3
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 26 deletions.
6 changes: 3 additions & 3 deletions x/sync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (c *client) GetRangeProof(
// [parseFn] parses the raw response.
// If the request is unsuccessful or the response can't be parsed,
// retries the request to a different peer until [ctx] expires.
// Returns [errAppRequestSendFailed] if we fail to send an AppRequest.
// Returns [errAppSendFailed] if we fail to send an AppRequest/AppResponse.
// This should be treated as a fatal error.
func getAndParse[T any](
ctx context.Context,
Expand All @@ -301,7 +301,7 @@ func getAndParse[T any](
}
}

if errors.Is(err, errAppRequestSendFailed) {
if errors.Is(err, errAppSendFailed) {
// Failing to send an AppRequest is a fatal error.
return nil, err
}
Expand Down Expand Up @@ -340,7 +340,7 @@ func getAndParse[T any](
// until the node receives a response, failure notification
// or [ctx] is canceled.
// Returns the peer's NodeID and response.
// Returns [errAppRequestSendFailed] if we failed to send an AppRequest.
// Returns [errAppSendFailed] if we failed to send an AppRequest/AppResponse.
// This should be treated as fatal.
// It's safe to call this method multiple times concurrently.
func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) {
Expand Down
6 changes: 3 additions & 3 deletions x/sync/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,18 +794,18 @@ func TestAppRequestSendFailed(t *testing.T) {
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(ids.NodeID{}, nil, errAppRequestSendFailed).Times(2)
).Return(ids.NodeID{}, nil, errAppSendFailed).Times(2)

_, err := client.GetChangeProof(
context.Background(),
&pb.SyncGetChangeProofRequest{},
nil, // database is unused
)
require.ErrorIs(err, errAppRequestSendFailed)
require.ErrorIs(err, errAppSendFailed)

_, err = client.GetRangeProof(
context.Background(),
&pb.SyncGetRangeProofRequest{},
)
require.ErrorIs(err, errAppRequestSendFailed)
require.ErrorIs(err, errAppSendFailed)
}
17 changes: 13 additions & 4 deletions x/sync/network_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ const minRequestHandlingDuration = 100 * time.Millisecond
var (
_ NetworkClient = (*networkClient)(nil)

errAcquiringSemaphore = errors.New("error acquiring semaphore")
errRequestFailed = errors.New("request failed")
errAppRequestSendFailed = errors.New("failed to send AppRequest")
errAcquiringSemaphore = errors.New("error acquiring semaphore")
errRequestFailed = errors.New("request failed")
errAppSendFailed = errors.New("failed to send app message")
)

// NetworkClient defines ability to send request / response through the Network
Expand Down Expand Up @@ -189,6 +189,7 @@ func (c *networkClient) getRequestHandler(requestID uint32) (ResponseHandler, bo
return handler, true
}

// If [errAppSendFailed] is returned this should be considered fatal.
func (c *networkClient) RequestAny(
ctx context.Context,
minVersion *version.Application,
Expand All @@ -212,6 +213,7 @@ func (c *networkClient) RequestAny(
return nodeID, response, err
}

// If [errAppSendFailed] is returned this should be considered fatal.
func (c *networkClient) Request(
ctx context.Context,
nodeID ids.NodeID,
Expand All @@ -229,6 +231,7 @@ func (c *networkClient) Request(

// Sends [request] to [nodeID] and returns the response.
// Returns an error if the request failed or [ctx] is canceled.
// If [errAppSendFailed] is returned this should be considered fatal.
// Blocks until a response is received or [ctx] is canceled.
// Releases active requests semaphore if there was an error in sending the request.
// Assumes [nodeID] is never [c.myNodeID] since we guarantee
Expand All @@ -254,7 +257,13 @@ func (c *networkClient) request(
// Send an app request to the peer.
if err := c.appSender.SendAppRequest(ctx, nodeIDs, requestID, request); err != nil {
c.lock.Unlock()
return nil, fmt.Errorf("%w: %s", errAppRequestSendFailed, err)
c.log.Fatal(
"failed to send app request",
zap.Stringer("nodeID", nodeID),
zap.Int("requestLen", len(request)),
zap.Error(err),
)
return nil, fmt.Errorf("%w: %s", errAppSendFailed, err)
}

handler := newResponseHandler()
Expand Down
66 changes: 50 additions & 16 deletions x/sync/network_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewNetworkServer(appSender common.AppSender, db DB, log logging.Logger) *Ne
}

// AppRequest is called by avalanchego -> VM when there is an incoming AppRequest from a peer.
// Never returns errors as they are considered fatal.
// Returns a non-nil error iff we fail to send an app message. This is a fatal error.
// Sends a response back to the sender if length of response returned by the handler > 0.
func (s *NetworkServer) AppRequest(
ctx context.Context,
Expand Down Expand Up @@ -121,14 +121,20 @@ func (s *NetworkServer) AppRequest(
return nil
}

if err != nil && !isTimeout(err) {
// log unexpected errors instead of returning them, since they are fatal.
s.log.Warn(
"unexpected error handling AppRequest",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Error(err),
)
if err != nil {
if errors.Is(err, errAppSendFailed) {
return err
}

if !isTimeout(err) {
// log unexpected errors instead of returning them, since they are fatal.
s.log.Warn(
"unexpected error handling AppRequest",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Error(err),
)
}
}
return nil
}
Expand All @@ -141,6 +147,7 @@ func maybeBytesToMaybe(mb *pb.MaybeBytes) maybe.Maybe[[]byte] {
}

// Generates a change proof and sends it to [nodeID].
// If [errAppSendFailed] is returned, this should be considered fatal.
func (s *NetworkServer) HandleChangeProofRequest(
ctx context.Context,
nodeID ids.NodeID,
Expand Down Expand Up @@ -212,8 +219,17 @@ func (s *NetworkServer) HandleChangeProofRequest(
return err
}

// TODO handle this fatal error
return s.appSender.SendAppResponse(ctx, nodeID, requestID, proofBytes)
if err := s.appSender.SendAppResponse(ctx, nodeID, requestID, proofBytes); err != nil {
s.log.Fatal(
"failed to send app response",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Int("responseLen", len(proofBytes)),
zap.Error(err),
)
return fmt.Errorf("%w: %s", errAppSendFailed, err)
}
return nil
}

// We generated a change proof. See if it's small enough.
Expand All @@ -227,8 +243,17 @@ func (s *NetworkServer) HandleChangeProofRequest(
}

if len(proofBytes) < bytesLimit {
// TODO handle this fatal error
return s.appSender.SendAppResponse(ctx, nodeID, requestID, proofBytes)
if err := s.appSender.SendAppResponse(ctx, nodeID, requestID, proofBytes); err != nil {
s.log.Fatal(
"failed to send app response",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Int("responseLen", len(proofBytes)),
zap.Error(err),
)
return fmt.Errorf("%w: %s", errAppSendFailed, err)
}
return nil
}

// The proof was too large. Try to shrink it.
Expand All @@ -238,7 +263,7 @@ func (s *NetworkServer) HandleChangeProofRequest(
}

// Generates a range proof and sends it to [nodeID].
// TODO danlaine how should we handle context cancellation?
// If [errAppSendFailed] is returned, this should be considered fatal.
func (s *NetworkServer) HandleRangeProofRequest(
ctx context.Context,
nodeID ids.NodeID,
Expand Down Expand Up @@ -276,8 +301,17 @@ func (s *NetworkServer) HandleRangeProofRequest(
if err != nil {
return err
}
// TODO handle this fatal error
return s.appSender.SendAppResponse(ctx, nodeID, requestID, proofBytes)
if err := s.appSender.SendAppResponse(ctx, nodeID, requestID, proofBytes); err != nil {
s.log.Fatal(
"failed to send app response",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Int("responseLen", len(proofBytes)),
zap.Error(err),
)
return fmt.Errorf("%w: %s", errAppSendFailed, err)
}
return nil
}

// Get the range proof specified by [req].
Expand Down
109 changes: 109 additions & 0 deletions x/sync/network_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,112 @@ func Test_Server_GetChangeProof(t *testing.T) {
})
}
}

// TestAppRequestErrAppSendFailed checks that AppRequest returns an error
// matching [errAppSendFailed] when sending the AppResponse for a change proof
// request or a range proof request fails.
func TestAppRequestErrAppSendFailed(t *testing.T) {
	startRootID := ids.GenerateTestID()
	endRootID := ids.GenerateTestID()

	// newFailingSender returns a mocked sender whose SendAppResponse always
	// fails. It deliberately fails with an error other than [errAppSendFailed]
	// (context.Canceled is used only because it is an unrelated error that is
	// already in scope): if the mock returned the sentinel itself, the test
	// would pass even if the server forwarded the raw send error without
	// wrapping it.
	newFailingSender := func(ctrl *gomock.Controller) common.AppSender {
		sender := common.NewMockSender(ctrl)
		sender.EXPECT().SendAppResponse(
			gomock.Any(),
			gomock.Any(),
			gomock.Any(),
			gomock.Any(),
		).Return(context.Canceled).AnyTimes()
		return sender
	}

	type test struct {
		name        string
		request     *pb.Request
		handlerFunc func(*gomock.Controller) *NetworkServer
		expectedErr error
	}

	tests := []test{
		{
			name: "GetChangeProof",
			request: &pb.Request{
				Message: &pb.Request_ChangeProofRequest{
					ChangeProofRequest: &pb.SyncGetChangeProofRequest{
						StartRootHash: startRootID[:],
						EndRootHash:   endRootID[:],
						StartKey:      []byte{1},
						EndKey:        &pb.MaybeBytes{Value: []byte{2}},
						KeyLimit:      100,
						BytesLimit:    100,
					},
				},
			},
			handlerFunc: func(ctrl *gomock.Controller) *NetworkServer {
				// The database hands back an empty change proof so that the
				// server proceeds to the (failing) send.
				db := merkledb.NewMockMerkleDB(ctrl)
				db.EXPECT().GetChangeProof(
					gomock.Any(),
					gomock.Any(),
					gomock.Any(),
					gomock.Any(),
					gomock.Any(),
					gomock.Any(),
				).Return(&merkledb.ChangeProof{}, nil).Times(1)

				return NewNetworkServer(newFailingSender(ctrl), db, logging.NoLog{})
			},
			expectedErr: errAppSendFailed,
		},
		{
			name: "GetRangeProof",
			request: &pb.Request{
				Message: &pb.Request_RangeProofRequest{
					RangeProofRequest: &pb.SyncGetRangeProofRequest{
						RootHash:   endRootID[:],
						StartKey:   []byte{1},
						EndKey:     &pb.MaybeBytes{Value: []byte{2}},
						KeyLimit:   100,
						BytesLimit: 100,
					},
				},
			},
			handlerFunc: func(ctrl *gomock.Controller) *NetworkServer {
				// The database hands back an empty range proof so that the
				// server proceeds to the (failing) send.
				db := merkledb.NewMockMerkleDB(ctrl)
				db.EXPECT().GetRangeProofAtRoot(
					gomock.Any(),
					gomock.Any(),
					gomock.Any(),
					gomock.Any(),
					gomock.Any(),
				).Return(&merkledb.RangeProof{}, nil).Times(1)

				return NewNetworkServer(newFailingSender(ctrl), db, logging.NoLog{})
			},
			expectedErr: errAppSendFailed,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			require := require.New(t)
			ctrl := gomock.NewController(t)

			handler := tt.handlerFunc(ctrl)
			requestBytes, err := proto.Marshal(tt.request)
			require.NoError(err)

			err = handler.AppRequest(
				context.Background(),
				ids.EmptyNodeID,
				0,
				time.Now().Add(10*time.Second),
				requestBytes,
			)
			// The send failure must surface as [errAppSendFailed] even though
			// the sender reported a different underlying error.
			require.ErrorIs(err, tt.expectedErr)
		})
	}
}

0 comments on commit 7d970c3

Please sign in to comment.