diff --git a/x/sync/client.go b/x/sync/client.go index a9a2d4a1b695..c2d0a2f66e66 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -70,14 +70,13 @@ type ClientConfig struct { } func NewClient(config *ClientConfig) Client { - c := &client{ + return &client{ networkClient: config.NetworkClient, stateSyncNodes: config.StateSyncNodeIDs, stateSyncMinVersion: config.StateSyncMinVersion, log: config.Log, metrics: config.Metrics, } - return c } // GetChangeProof synchronously retrieves the change proof given by [req]. @@ -196,6 +195,8 @@ func (c *client) GetRangeProof(ctx context.Context, req *pb.SyncGetRangeProofReq // [parseFn] parses the raw response. // If the request is unsuccessful or the response can't be parsed, // retries the request to a different peer until [ctx] expires. +// Returns [errAppRequestSendFailed] if we fail to send an AppRequest. +// This should be treated as a fatal error. func getAndParse[T any]( ctx context.Context, client *client, @@ -215,6 +216,11 @@ func getAndParse[T any]( } } + if errors.Is(err, errAppRequestSendFailed) { + // Failing to send an AppRequest is a fatal error. + return nil, err + } + client.log.Debug("request failed, retrying", zap.Stringer("nodeID", nodeID), zap.Int("attempt", attempt), @@ -249,6 +255,8 @@ func getAndParse[T any]( // until the node receives a response, failure notification // or [ctx] is canceled. // Returns the peer's NodeID and response. +// Returns [errAppRequestSendFailed] if we failed to send an AppRequest. +// This should be treated as fatal. // It's safe to call this method multiple times concurrently. func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) { var ( diff --git a/x/sync/client_test.go b/x/sync/client_test.go index 159654b339ee..b67f10373212 100644 --- a/x/sync/client_test.go +++ b/x/sync/client_test.go @@ -6,7 +6,6 @@ package sync import ( "context" "math/rand" - "sync" "testing" "time" @@ -22,7 +21,6 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/maybe" - "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/version" "github.com/ava-labs/avalanchego/x/merkledb" @@ -39,59 +37,88 @@ func newDefaultDBConfig() merkledb.Config { } } -func sendRangeRequest( +// Create a client and send a range proof request to a server +// whose underlying database is [serverDB]. +// The server's response is modified with [modifyResponse] before +// being returned to the server. +// The client makes at most [maxAttempts] attempts to fulfill +// the request before returning an error. +func sendRangeProofRequest( t *testing.T, - db DB, + serverDB DB, request *pb.SyncGetRangeProofRequest, - maxAttempts uint32, + maxAttempts int, modifyResponse func(*merkledb.RangeProof), ) (*merkledb.RangeProof, error) { t.Helper() - var wg sync.WaitGroup - defer wg.Wait() // wait for goroutines spawned - require := require.New(t) ctrl := gomock.NewController(t) - sender := common.NewMockSender(ctrl) - handler := NewNetworkServer(sender, db, logging.NoLog{}) - clientNodeID, serverNodeID := ids.GenerateTestNodeID(), ids.GenerateTestNodeID() - networkClient, err := NewNetworkClient(sender, clientNodeID, 1, logging.NoLog{}, "", prometheus.NewRegistry()) - require.NoError(err) - require.NoError(networkClient.Connected(context.Background(), serverNodeID, version.CurrentApp)) - client := NewClient(&ClientConfig{ - NetworkClient: networkClient, - Metrics: &mockMetrics{}, - Log: logging.NoLog{}, - }) - - ctx, cancel := context.WithCancel(context.Background()) - deadline := time.Now().Add(1 * time.Hour) // enough time to complete a request - defer cancel() // avoid leaking a goroutine - - expectedSendNodeIDs := set.Of(serverNodeID) - sender.EXPECT().SendAppRequest( - gomock.Any(), // ctx - expectedSendNodeIDs, // {serverNodeID} - gomock.Any(), // requestID - gomock.Any(), // requestBytes + var ( + // Number of calls from the client to the server so far. + numAttempts int + + // Sends messages from server to client. + sender = common.NewMockSender(ctrl) + + // Serves the range proof. + server = NewNetworkServer(sender, serverDB, logging.NoLog{}) + + clientNodeID, serverNodeID = ids.GenerateTestNodeID(), ids.GenerateTestNodeID() + + // "Sends" the request from the client to the server and + // "receives" the response from the server. In reality, + // it just invokes the server's method and receives + // the response on [serverResponseChan]. + networkClient = NewMockNetworkClient(ctrl) + + serverResponseChan = make(chan []byte, 1) + + // The client fetching a range proof. + client = NewClient(&ClientConfig{ + NetworkClient: networkClient, + Metrics: &mockMetrics{}, + Log: logging.NoLog{}, + }) + + // The context used in client.GetRangeProof. + // Canceled after the first response is received because + // the client will keep sending requests until its context + // expires or it succeeds. + ctx, cancel = context.WithCancel(context.Background()) + ) + + defer cancel() + + networkClient.EXPECT().RequestAny( + gomock.Any(), // ctx + gomock.Any(), // min version + gomock.Any(), // request ).DoAndReturn( - func(ctx context.Context, _ set.Set[ids.NodeID], requestID uint32, requestBytes []byte) error { - // limit the number of attempts to [maxAttempts] by cancelling the context if needed. - if requestID >= maxAttempts { - cancel() - return ctx.Err() + func(_ context.Context, _ *version.Application, request []byte) (ids.NodeID, []byte, error) { + go func() { + // Get response from server + require.NoError(server.AppRequest(context.Background(), clientNodeID, 0, time.Now().Add(time.Hour), request)) + }() + + // Wait for response from server + serverResponse := <-serverResponseChan + + numAttempts++ + + if numAttempts >= maxAttempts { + defer cancel() } - wg.Add(1) - go func() { - defer wg.Done() - require.NoError(handler.AppRequest(ctx, clientNodeID, requestID, deadline, requestBytes)) - }() // should be on a goroutine so the test can make progress. - return nil + return serverNodeID, serverResponse, nil }, ).AnyTimes() + + // Handle bandwidth tracking calls from client. + networkClient.EXPECT().TrackBandwidth(gomock.Any(), gomock.Any()).AnyTimes() + + // The server should expect to "send" a response to the client. sender.EXPECT().SendAppResponse( gomock.Any(), // ctx clientNodeID, @@ -114,7 +141,9 @@ func sendRangeRequest( // reserialize the response and pass it to the client to complete the handling. responseBytes, err := proto.Marshal(response.ToProto()) require.NoError(err) - require.NoError(networkClient.AppResponse(context.Background(), serverNodeID, requestID, responseBytes)) + + serverResponseChan <- responseBytes + return nil }, ).AnyTimes() @@ -282,7 +311,7 @@ func TestGetRangeProof(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { require := require.New(t) - proof, err := sendRangeRequest(t, test.db, test.request, 1, test.modifyResponse) + proof, err := sendRangeProofRequest(t, test.db, test.request, 1, test.modifyResponse) require.ErrorIs(err, test.expectedErr) if test.expectedErr != nil { return @@ -297,60 +326,83 @@ func TestGetRangeProof(t *testing.T) { } } -func sendChangeRequest( +func sendChangeProofRequest( t *testing.T, db DB, verificationDB DB, request *pb.SyncGetChangeProofRequest, - maxAttempts uint32, + maxAttempts int, modifyResponse func(*merkledb.ChangeProof), ) (*merkledb.ChangeProof, error) { t.Helper() - var wg sync.WaitGroup - defer wg.Wait() // wait for goroutines spawned - require := require.New(t) ctrl := gomock.NewController(t) - sender := common.NewMockSender(ctrl) - handler := NewNetworkServer(sender, db, logging.NoLog{}) - clientNodeID, serverNodeID := ids.GenerateTestNodeID(), ids.GenerateTestNodeID() - networkClient, err := NewNetworkClient(sender, clientNodeID, 1, logging.NoLog{}, "", prometheus.NewRegistry()) - require.NoError(err) - require.NoError(networkClient.Connected(context.Background(), serverNodeID, version.CurrentApp)) - client := NewClient(&ClientConfig{ - NetworkClient: networkClient, - Metrics: &mockMetrics{}, - Log: logging.NoLog{}, - }) - - ctx, cancel := context.WithCancel(context.Background()) - deadline := time.Now().Add(1 * time.Hour) // enough time to complete a request - defer cancel() // avoid leaking a goroutine - - expectedSendNodeIDs := set.Of(serverNodeID) - sender.EXPECT().SendAppRequest( - gomock.Any(), // ctx - expectedSendNodeIDs, // {serverNodeID} - gomock.Any(), // requestID - gomock.Any(), // requestBytes + var ( + // Number of calls from the client to the server so far. + numAttempts int + + // Sends messages from server to client. + sender = common.NewMockSender(ctrl) + + // Serves the change proof. + server = NewNetworkServer(sender, db, logging.NoLog{}) + + clientNodeID, serverNodeID = ids.GenerateTestNodeID(), ids.GenerateTestNodeID() + + // "Sends" the request from the client to the server and + // "receives" the response from the server. In reality, + // it just invokes the server's method and receives + // the response on [serverResponseChan]. + networkClient = NewMockNetworkClient(ctrl) + + serverResponseChan = make(chan []byte, 1) + + // The client fetching a change proof. + client = NewClient(&ClientConfig{ + NetworkClient: networkClient, + Metrics: &mockMetrics{}, + Log: logging.NoLog{}, + }) + + // The context used in client.GetChangeProof. + // Canceled after the first response is received because + // the client will keep sending requests until its context + // expires or it succeeds. + ctx, cancel = context.WithCancel(context.Background()) + ) + + defer cancel() // avoid leaking a goroutine + + networkClient.EXPECT().RequestAny( + gomock.Any(), // ctx + gomock.Any(), // min version + gomock.Any(), // request ).DoAndReturn( - func(ctx context.Context, _ set.Set[ids.NodeID], requestID uint32, requestBytes []byte) error { - // limit the number of attempts to [maxAttempts] by cancelling the context if needed. - if requestID >= maxAttempts { - cancel() - return ctx.Err() + func(_ context.Context, _ *version.Application, request []byte) (ids.NodeID, []byte, error) { + go func() { + // Get response from server + require.NoError(server.AppRequest(context.Background(), clientNodeID, 0, time.Now().Add(time.Hour), request)) + }() + + // Wait for response from server + serverResponse := <-serverResponseChan + + numAttempts++ + + if numAttempts >= maxAttempts { + defer cancel() } - wg.Add(1) - go func() { - defer wg.Done() - require.NoError(handler.AppRequest(ctx, clientNodeID, requestID, deadline, requestBytes)) - }() // should be on a goroutine so the test can make progress. - return nil + return serverNodeID, serverResponse, nil }, ).AnyTimes() + + // Handle bandwidth tracking calls from client. + networkClient.EXPECT().TrackBandwidth(gomock.Any(), gomock.Any()).AnyTimes() + + // The server should expect to "send" a response to the client. sender.EXPECT().SendAppResponse( gomock.Any(), // ctx clientNodeID, @@ -363,8 +415,6 @@ func sendChangeRequest( require.NoError(proto.Unmarshal(responseBytes, &responseProto)) var changeProof merkledb.ChangeProof - // TODO when the client/server support including range proofs in the response, - // this will need to be updated. require.NoError(changeProof.UnmarshalProto(responseProto.GetChangeProof())) // modify if needed @@ -379,7 +429,9 @@ func sendChangeRequest( }, }) require.NoError(err) - require.NoError(networkClient.AppResponse(context.Background(), serverNodeID, requestID, responseBytes)) + + serverResponseChan <- responseBytes + return nil }, ).AnyTimes() @@ -548,7 +600,7 @@ func TestGetChangeProof(t *testing.T) { t.Run(name, func(t *testing.T) { require := require.New(t) - proof, err := sendChangeRequest(t, trieDB, verificationDB, test.request, 1, test.modifyResponse) + proof, err := sendChangeProofRequest(t, trieDB, verificationDB, test.request, 1, test.modifyResponse) require.ErrorIs(err, test.expectedErr) if test.expectedErr != nil { return @@ -597,9 +649,47 @@ func TestRangeProofRetries(t *testing.T) { response.KeyValues = nil } } - proof, err := sendRangeRequest(t, db, request, uint32(maxRequests), modifyResponse) + proof, err := sendRangeProofRequest(t, db, request, maxRequests, modifyResponse) require.NoError(err) require.Len(proof.KeyValues, keyCount) require.Equal(responseCount, maxRequests) // check the client performed retries. } + +// Test that a failure to send an AppRequest is propagated +// and returned by GetRangeProof and GetChangeProof. +func TestAppRequestSendFailed(t *testing.T) { + require := require.New(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + networkClient := NewMockNetworkClient(ctrl) + + client := NewClient( + &ClientConfig{ + NetworkClient: networkClient, + Log: logging.NoLog{}, + Metrics: &mockMetrics{}, + }, + ) + + // Mock failure to send app request + networkClient.EXPECT().RequestAny( + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return(ids.NodeID{}, nil, errAppRequestSendFailed).Times(2) + + _, err := client.GetChangeProof( + context.Background(), + &pb.SyncGetChangeProofRequest{}, + nil, // database is unused + ) + require.ErrorIs(err, errAppRequestSendFailed) + + _, err = client.GetRangeProof( + context.Background(), + &pb.SyncGetRangeProofRequest{}, + ) + require.ErrorIs(err, errAppRequestSendFailed) +} diff --git a/x/sync/mock_network_client.go b/x/sync/mock_network_client.go new file mode 100644 index 000000000000..913eb2163e7f --- /dev/null +++ b/x/sync/mock_network_client.go @@ -0,0 +1,136 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: x/sync/network_client.go + +// Package mock_sync is a generated GoMock package. +package sync + +import ( + context "context" + reflect "reflect" + + ids "github.com/ava-labs/avalanchego/ids" + version "github.com/ava-labs/avalanchego/version" + gomock "github.com/golang/mock/gomock" +) + +// MockNetworkClient is a mock of NetworkClient interface. +type MockNetworkClient struct { + ctrl *gomock.Controller + recorder *MockNetworkClientMockRecorder +} + +// MockNetworkClientMockRecorder is the mock recorder for MockNetworkClient. +type MockNetworkClientMockRecorder struct { + mock *MockNetworkClient +} + +// NewMockNetworkClient creates a new mock instance. +func NewMockNetworkClient(ctrl *gomock.Controller) *MockNetworkClient { + mock := &MockNetworkClient{ctrl: ctrl} + mock.recorder = &MockNetworkClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockNetworkClient) EXPECT() *MockNetworkClientMockRecorder { + return m.recorder +} + +// AppRequestFailed mocks base method. +func (m *MockNetworkClient) AppRequestFailed(arg0 context.Context, arg1 ids.NodeID, arg2 uint32) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AppRequestFailed", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// AppRequestFailed indicates an expected call of AppRequestFailed. +func (mr *MockNetworkClientMockRecorder) AppRequestFailed(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AppRequestFailed", reflect.TypeOf((*MockNetworkClient)(nil).AppRequestFailed), arg0, arg1, arg2) +} + +// AppResponse mocks base method. +func (m *MockNetworkClient) AppResponse(arg0 context.Context, arg1 ids.NodeID, arg2 uint32, arg3 []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AppResponse", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// AppResponse indicates an expected call of AppResponse. +func (mr *MockNetworkClientMockRecorder) AppResponse(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AppResponse", reflect.TypeOf((*MockNetworkClient)(nil).AppResponse), arg0, arg1, arg2, arg3) +} + +// Connected mocks base method. +func (m *MockNetworkClient) Connected(arg0 context.Context, arg1 ids.NodeID, arg2 *version.Application) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Connected", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// Connected indicates an expected call of Connected. +func (mr *MockNetworkClientMockRecorder) Connected(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Connected", reflect.TypeOf((*MockNetworkClient)(nil).Connected), arg0, arg1, arg2) +} + +// Disconnected mocks base method. +func (m *MockNetworkClient) Disconnected(arg0 context.Context, arg1 ids.NodeID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Disconnected", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Disconnected indicates an expected call of Disconnected. +func (mr *MockNetworkClientMockRecorder) Disconnected(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Disconnected", reflect.TypeOf((*MockNetworkClient)(nil).Disconnected), arg0, arg1) +} + +// Request mocks base method. +func (m *MockNetworkClient) Request(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Request", ctx, nodeID, request) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Request indicates an expected call of Request. +func (mr *MockNetworkClientMockRecorder) Request(ctx, nodeID, request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Request", reflect.TypeOf((*MockNetworkClient)(nil).Request), ctx, nodeID, request) +} + +// RequestAny mocks base method. +func (m *MockNetworkClient) RequestAny(ctx context.Context, minVersion *version.Application, request []byte) (ids.NodeID, []byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RequestAny", ctx, minVersion, request) + ret0, _ := ret[0].(ids.NodeID) + ret1, _ := ret[1].([]byte) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// RequestAny indicates an expected call of RequestAny. +func (mr *MockNetworkClientMockRecorder) RequestAny(ctx, minVersion, request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestAny", reflect.TypeOf((*MockNetworkClient)(nil).RequestAny), ctx, minVersion, request) +} + +// TrackBandwidth mocks base method. +func (m *MockNetworkClient) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "TrackBandwidth", nodeID, bandwidth) +} + +// TrackBandwidth indicates an expected call of TrackBandwidth. +func (mr *MockNetworkClientMockRecorder) TrackBandwidth(nodeID, bandwidth interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TrackBandwidth", reflect.TypeOf((*MockNetworkClient)(nil).TrackBandwidth), nodeID, bandwidth) +} diff --git a/x/sync/network_client.go b/x/sync/network_client.go index 52643bc83048..d9a1098cc94a 100644 --- a/x/sync/network_client.go +++ b/x/sync/network_client.go @@ -29,8 +29,9 @@ const minRequestHandlingDuration = 100 * time.Millisecond var ( _ NetworkClient = (*networkClient)(nil) - ErrAcquiringSemaphore = errors.New("error acquiring semaphore") - ErrRequestFailed = errors.New("request failed") + errAcquiringSemaphore = errors.New("error acquiring semaphore") + errRequestFailed = errors.New("request failed") + errAppRequestSendFailed = errors.New("failed to send AppRequest") ) // NetworkClient defines ability to send request / response through the Network @@ -39,17 +40,38 @@ type NetworkClient interface { // node version greater than or equal to minVersion. // Returns response bytes, the ID of the chosen peer, and ErrRequestFailed if // the request should be retried. - RequestAny(ctx context.Context, minVersion *version.Application, request []byte) (ids.NodeID, []byte, error) - - // Request synchronously sends request to the selected nodeID. - // Returns response bytes, and ErrRequestFailed if the request should be retried. - Request(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error) + RequestAny( + ctx context.Context, + minVersion *version.Application, + request []byte, + ) (ids.NodeID, []byte, error) + + // Sends [request] to [nodeID] and returns the response. + // Blocks until the number of outstanding requests is + // below the limit before sending the request. + Request( + ctx context.Context, + nodeID ids.NodeID, + request []byte, + ) ([]byte, error) // The following declarations allow this interface to be embedded in the VM // to handle incoming responses from peers. + + // Always returns nil because the engine considers errors + // returned from this function as fatal. AppResponse(context.Context, ids.NodeID, uint32, []byte) error + + // Always returns nil because the engine considers errors + // returned from this function as fatal. AppRequestFailed(context.Context, ids.NodeID, uint32) error + + // Adds the given [nodeID] to the peer + // list so that it can receive messages. + // If [nodeID] is this node's ID, this is a no-op. Connected(context.Context, ids.NodeID, *version.Application) error + + // Removes given [nodeID] from the peer list. Disconnected(context.Context, ids.NodeID) error } @@ -93,8 +115,6 @@ func NewNetworkClient( }, nil } -// Always returns nil because the engine considers errors -// returned from this function as fatal. func (c *networkClient) AppResponse( _ context.Context, nodeID ids.NodeID, @@ -127,8 +147,6 @@ func (c *networkClient) AppResponse( return nil } -// Always returns nil because the engine considers errors -// returned from this function as fatal. func (c *networkClient) AppRequestFailed( _ context.Context, nodeID ids.NodeID, @@ -171,11 +189,6 @@ func (c *networkClient) getRequestHandler(requestID uint32) (ResponseHandler, bo return handler, true } -// RequestAny synchronously sends [request] to a randomly chosen peer with a -// version greater than or equal to [minVersion]. If [minVersion] is nil, -// the request is sent to any peer regardless of their version. -// May block until the number of outstanding requests decreases. -// Returns the node's response and the ID of the node. func (c *networkClient) RequestAny( ctx context.Context, minVersion *version.Application, @@ -183,7 +196,7 @@ func (c *networkClient) RequestAny( ) (ids.NodeID, []byte, error) { // Take a slot from total [activeRequests] and block until a slot becomes available. if err := c.activeRequests.Acquire(ctx, 1); err != nil { - return ids.EmptyNodeID, nil, ErrAcquiringSemaphore + return ids.EmptyNodeID, nil, errAcquiringSemaphore } defer c.activeRequests.Release(1) @@ -199,9 +212,6 @@ func (c *networkClient) RequestAny( return nodeID, response, err } -// Sends [request] to [nodeID] and returns the response. -// Blocks until the number of outstanding requests is -// below the limit before sending the request. func (c *networkClient) Request( ctx context.Context, nodeID ids.NodeID, @@ -210,7 +220,7 @@ func (c *networkClient) Request( // Take a slot from total [activeRequests] // and block until a slot becomes available. if err := c.activeRequests.Acquire(ctx, 1); err != nil { - return nil, ErrAcquiringSemaphore + return nil, errAcquiringSemaphore } defer c.activeRequests.Release(1) @@ -244,7 +254,7 @@ func (c *networkClient) request( // Send an app request to the peer. if err := c.appSender.SendAppRequest(ctx, nodeIDs, requestID, request); err != nil { c.lock.Unlock() - return nil, err + return nil, fmt.Errorf("%w: %s", errAppRequestSendFailed, err) } handler := newResponseHandler() @@ -268,7 +278,7 @@ func (c *networkClient) request( } if handler.failed { c.peers.TrackBandwidth(nodeID, 0) - return nil, ErrRequestFailed + return nil, errRequestFailed } c.log.Debug("received response from peer", @@ -279,9 +289,6 @@ func (c *networkClient) request( return response, nil } -// Connected adds the given [nodeID] to the peer -// list so that it can receive messages. -// If [nodeID] is [c.myNodeID], this is a no-op. func (c *networkClient) Connected( _ context.Context, nodeID ids.NodeID, @@ -297,7 +304,6 @@ func (c *networkClient) Connected( return nil } -// Disconnected removes given [nodeID] from the peer list. func (c *networkClient) Disconnected(_ context.Context, nodeID ids.NodeID) error { if nodeID == c.myNodeID { c.log.Debug("skipping deregistering self as peer")