From 737d4d07bde086220ed23c5049b05656d7e9daf7 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 10:13:04 -0400 Subject: [PATCH 01/28] comment nits --- x/sync/network_client.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/x/sync/network_client.go b/x/sync/network_client.go index 4419138abfe6..05f80bbfe89f 100644 --- a/x/sync/network_client.go +++ b/x/sync/network_client.go @@ -82,8 +82,7 @@ func NewNetworkClient( } } -// AppResponse is called when this node receives a response from a peer. -// As the engine considers errors returned from this function as fatal, +// Always returns nil because the engine considers errors returned from this function as fatal, // this function always returns nil. func (c *networkClient) AppResponse(_ context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error { c.lock.Lock() @@ -142,7 +141,7 @@ func (c *networkClient) AppRequestFailed(_ context.Context, nodeID ids.NodeID, r } // Returns the handler for [requestID] and marks the request as fulfilled. -// This is called by either [AppResponse] or [AppRequestFailed]. +// Returns false if there's no outstanding request with [requestID]. // Assumes [c.lock] is held. func (c *networkClient) getRequestHandler(requestID uint32) (ResponseHandler, bool) { handler, exists := c.outstandingRequestHandlers[requestID] @@ -156,10 +155,9 @@ func (c *networkClient) getRequestHandler(requestID uint32) (ResponseHandler, bo } // RequestAny synchronously sends [request] to a randomly chosen peer with a -// node version greater than or equal to [minVersion]. If [minVersion] is nil, +// version greater than or equal to [minVersion]. If [minVersion] is nil, // the request is sent to any peer regardless of their version. -// If the limit on active requests is reached, this function blocks until -// a slot becomes available. +// May block until the number of outstanding requests decreases. // Returns the node's response and the ID of the node. func (c *networkClient) RequestAny( ctx context.Context, @@ -195,8 +193,7 @@ func (c *networkClient) Request(ctx context.Context, nodeID ids.NodeID, request return c.request(ctx, nodeID, request) } -// Sends [request] to [nodeID] and adds the response handler to [c.outstandingRequestHandlers] -// so that it can be invoked upon response/failure. +// Sends [request] to [nodeID] and returns the response. // Blocks until a response is received or the request fails. // Assumes [nodeID] is never [c.myNodeID] since we guarantee [c.myNodeID] will not be added to [c.peers]. // Releases active requests semaphore if there was an error in sending the request. From accde389a538a16efbc31b3f6b52bd2a40466ac1 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 10:34:53 -0400 Subject: [PATCH 02/28] comment nits --- x/sync/network_client.go | 117 ++++++++++++++++++++++++++------------- 1 file changed, 77 insertions(+), 40 deletions(-) diff --git a/x/sync/network_client.go b/x/sync/network_client.go index 05f80bbfe89f..e6e7964b1871 100644 --- a/x/sync/network_client.go +++ b/x/sync/network_client.go @@ -56,14 +56,20 @@ type NetworkClient interface { } type networkClient struct { - lock sync.Mutex // lock for mutating state of this Network struct - myNodeID ids.NodeID // NodeID of this node - requestID uint32 // requestID counter used to track outbound requests - outstandingRequestHandlers map[uint32]ResponseHandler // requestID => handler for the response/failure - activeRequests *semaphore.Weighted // controls maximum number of active outbound requests - peers *peerTracker // tracking of peers & bandwidth - appSender common.AppSender // AppSender for sending messages - log logging.Logger + lock sync.Mutex + log logging.Logger + // This node's ID + myNodeID ids.NodeID + // requestID counter used to track outbound requests + requestID uint32 + // requestID => handler for the response/failure + outstandingRequestHandlers map[uint32]ResponseHandler + // controls maximum number of active outbound requests + activeRequests *semaphore.Weighted + // tracking of peers & bandwidth usage + peers *peerTracker + // For sending messages to peers + appSender common.AppSender } func NewNetworkClient( @@ -82,9 +88,14 @@ func NewNetworkClient( } } -// Always returns nil because the engine considers errors returned from this function as fatal, -// this function always returns nil. -func (c *networkClient) AppResponse(_ context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error { +// Always returns nil because the engine considers errors +// returned from this function as fatal. +func (c *networkClient) AppResponse( + _ context.Context, + nodeID ids.NodeID, + requestID uint32, + response []byte, +) error { c.lock.Lock() defer c.lock.Unlock() @@ -97,7 +108,8 @@ func (c *networkClient) AppResponse(_ context.Context, nodeID ids.NodeID, reques handler, exists := c.getRequestHandler(requestID) if !exists { - // Should never happen since the engine should be managing outstanding requests + // Should never happen since the engine + // should be managing outstanding requests c.log.Error( "received response to unknown request", zap.Stringer("nodeID", nodeID), @@ -110,13 +122,13 @@ func (c *networkClient) AppResponse(_ context.Context, nodeID ids.NodeID, reques return nil } -// AppRequestFailed can be called by the avalanchego -> VM in following cases: -// - node is benched -// - failed to send message to [nodeID] due to a network issue -// - timeout -// As the engine considers errors returned from this function as fatal, -// this function always returns nil. -func (c *networkClient) AppRequestFailed(_ context.Context, nodeID ids.NodeID, requestID uint32) error { +// Always returns nil because the engine considers errors +// returned from this function as fatal. +func (c *networkClient) AppRequestFailed( + _ context.Context, + nodeID ids.NodeID, + requestID uint32, +) error { c.lock.Lock() defer c.lock.Unlock() @@ -128,7 +140,8 @@ func (c *networkClient) AppRequestFailed(_ context.Context, nodeID ids.NodeID, r handler, exists := c.getRequestHandler(requestID) if !exists { - // Should never happen since the engine should be managing outstanding requests + // Should never happen since the engine + // should be managing outstanding requests c.log.Error( "received request failed to unknown request", zap.Stringer("nodeID", nodeID), @@ -177,14 +190,22 @@ func (c *networkClient) RequestAny( c.lock.Unlock() c.activeRequests.Release(1) - return nil, ids.EmptyNodeID, fmt.Errorf("no peers found matching version %s out of %d peers", minVersion, c.peers.Size()) + return nil, ids.EmptyNodeID, fmt.Errorf( + "no peers found matching version %s out of %d peers", + minVersion, c.peers.Size(), + ) } -// Sends [request] to [nodeID] and registers a handler for the response/failure. -// If the limit on active requests is reached, this function blocks until -// a slot becomes available. -func (c *networkClient) Request(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error) { - // Take a slot from total [activeRequests] and block until a slot becomes available. +// Sends [request] to [nodeID] and returns the response. +// Blocks until the number of outstanding requests is +// below the limit before sending the request. +func (c *networkClient) Request( + ctx context.Context, + nodeID ids.NodeID, + request []byte, +) ([]byte, error) { + // Take a slot from total [activeRequests] + // and block until a slot becomes available. if err := c.activeRequests.Acquire(ctx, 1); err != nil { return nil, ErrAcquiringSemaphore } @@ -194,22 +215,26 @@ func (c *networkClient) Request(ctx context.Context, nodeID ids.NodeID, request } // Sends [request] to [nodeID] and returns the response. -// Blocks until a response is received or the request fails. -// Assumes [nodeID] is never [c.myNodeID] since we guarantee [c.myNodeID] will not be added to [c.peers]. +// Returns an error if the request failed or [ctx] is canceled. +// Blocks until a response is received or the [ctx] is canceled fails. // Releases active requests semaphore if there was an error in sending the request. -// Returns an error if [appSender] is unable to make the request. +// Assumes [nodeID] is never [c.myNodeID] since we guarantee +// [c.myNodeID] will not be added to [c.peers]. // Assumes [c.lock] is held and unlocks [c.lock] before returning. -func (c *networkClient) request(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error) { - c.log.Debug("sending request to peer", zap.Stringer("nodeID", nodeID), zap.Int("requestLen", len(request))) +func (c *networkClient) request( + ctx context.Context, + nodeID ids.NodeID, + request []byte, +) ([]byte, error) { + c.log.Debug("sending request to peer", + zap.Stringer("nodeID", nodeID), + zap.Int("requestLen", len(request)), + ) c.peers.TrackPeer(nodeID) - // generate requestID requestID := c.requestID c.requestID++ - handler := newResponseHandler() - c.outstandingRequestHandlers[requestID] = handler - nodeIDs := set.NewSet[ids.NodeID](1) nodeIDs.Add(nodeID) @@ -217,11 +242,13 @@ func (c *networkClient) request(ctx context.Context, nodeID ids.NodeID, request if err := c.appSender.SendAppRequest(ctx, nodeIDs, requestID, request); err != nil { // On failure, release the activeRequests slot and mark the message as processed. c.activeRequests.Release(1) - delete(c.outstandingRequestHandlers, requestID) c.lock.Unlock() return nil, err } + handler := newResponseHandler() + c.outstandingRequestHandlers[requestID] = handler + c.lock.Unlock() // unlock so response can be received var response []byte @@ -237,13 +264,19 @@ func (c *networkClient) request(ctx context.Context, nodeID ids.NodeID, request c.log.Debug("received response from peer", zap.Stringer("nodeID", nodeID), zap.Uint32("requestID", requestID), - zap.Int("responseLen", len(response))) + zap.Int("responseLen", len(response)), + ) return response, nil } -// Connected adds the given nodeID to the peer list so that it can receive messages. +// Connected adds the given [nodeID] to the peer +// list so that it can receive messages. // If [nodeID] is [c.myNodeID], this is a no-op. -func (c *networkClient) Connected(_ context.Context, nodeID ids.NodeID, nodeVersion *version.Application) error { +func (c *networkClient) Connected( + _ context.Context, + nodeID ids.NodeID, + nodeVersion *version.Application, +) error { c.lock.Lock() defer c.lock.Unlock() @@ -258,11 +291,15 @@ func (c *networkClient) Connected(_ context.Context, nodeID ids.NodeID, nodeVers } // Disconnected removes given [nodeID] from the peer list. -// TODO danlaine: should this be a no-op if [nodeID] is [c.myNodeID]? func (c *networkClient) Disconnected(_ context.Context, nodeID ids.NodeID) error { c.lock.Lock() defer c.lock.Unlock() + if nodeID == c.myNodeID { + c.log.Debug("skipping deregistering self as peer") + return nil + } + c.log.Debug("disconnecting peer", zap.Stringer("nodeID", nodeID)) c.peers.Disconnected(nodeID) return nil From 8beeb868455aa88d7a9c438d5bd4d7d49d4fa603 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 10:37:40 -0400 Subject: [PATCH 03/28] Error --> Warn log --- x/sync/network_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x/sync/network_client.go b/x/sync/network_client.go index e6e7964b1871..cc085ff25379 100644 --- a/x/sync/network_client.go +++ b/x/sync/network_client.go @@ -110,7 +110,7 @@ func (c *networkClient) AppResponse( if !exists { // Should never happen since the engine // should be managing outstanding requests - c.log.Error( + c.log.Warn( "received response to unknown request", zap.Stringer("nodeID", nodeID), zap.Uint32("requestID", requestID), @@ -142,7 +142,7 @@ func (c *networkClient) AppRequestFailed( if !exists { // Should never happen since the engine // should be managing outstanding requests - c.log.Error( + c.log.Warn( "received request failed to unknown request", zap.Stringer("nodeID", nodeID), zap.Uint32("requestID", requestID), From 2dd784dfce3b8d31dd9e79b6000c86017a2280d7 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 10:41:10 -0400 Subject: [PATCH 04/28] nits --- x/sync/client.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/x/sync/client.go b/x/sync/client.go index 3d6e0be6fe15..a2634775aa58 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -216,9 +216,10 @@ func getAndParse[T any](ctx context.Context, client *client, request []byte, par } // get sends [request] to an arbitrary peer and blocks until the node receives a response -// or [ctx] expires. Returns the raw response from the peer, the peer's NodeID, and an -// error if the request timed out. Thread safe. -func (c *client) get(ctx context.Context, requestBytes []byte) ([]byte, ids.NodeID, error) { +// or [ctx] expires. +// Returns the peer's NodeID and response. +// It's safe to call this method multiple times concurrently. +func (c *client) get(ctx context.Context, request []byte) ([]byte, ids.NodeID, error) { c.metrics.RequestMade() var ( response []byte @@ -227,13 +228,13 @@ func (c *client) get(ctx context.Context, requestBytes []byte) ([]byte, ids.Node startTime = time.Now() ) if len(c.stateSyncNodes) == 0 { - response, nodeID, err = c.networkClient.RequestAny(ctx, c.stateSyncMinVersion, requestBytes) + response, nodeID, err = c.networkClient.RequestAny(ctx, c.stateSyncMinVersion, request) } else { // get the next nodeID using the nodeIdx offset. If we're out of nodes, loop back to 0 // we do this every attempt to ensure we get a different node each time if possible. nodeIdx := atomic.AddUint32(&c.stateSyncNodeIdx, 1) nodeID = c.stateSyncNodes[nodeIdx%uint32(len(c.stateSyncNodes))] - response, err = c.networkClient.Request(ctx, nodeID, requestBytes) + response, err = c.networkClient.Request(ctx, nodeID, request) } if err != nil { c.metrics.RequestFailed() From c68b57c18cabadab512255102699c834cf809f7d Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 10:42:03 -0400 Subject: [PATCH 05/28] change order of return values in get --- x/sync/client.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x/sync/client.go b/x/sync/client.go index a2634775aa58..c197d3bf075b 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -195,7 +195,7 @@ func getAndParse[T any](ctx context.Context, client *client, request []byte, par } return nil, err } - responseBytes, nodeID, err := client.get(ctx, request) + nodeID, responseBytes, err := client.get(ctx, request) if err == nil { if response, err = parseFn(ctx, responseBytes); err == nil { return response, nil @@ -219,7 +219,7 @@ func getAndParse[T any](ctx context.Context, client *client, request []byte, par // or [ctx] expires. // Returns the peer's NodeID and response. // It's safe to call this method multiple times concurrently. -func (c *client) get(ctx context.Context, request []byte) ([]byte, ids.NodeID, error) { +func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) { c.metrics.RequestMade() var ( response []byte @@ -239,11 +239,11 @@ func (c *client) get(ctx context.Context, request []byte) ([]byte, ids.NodeID, e if err != nil { c.metrics.RequestFailed() c.networkClient.TrackBandwidth(nodeID, 0) - return response, nodeID, err + return nodeID, response, err } bandwidth := float64(len(response)) / (time.Since(startTime).Seconds() + epsilon) c.networkClient.TrackBandwidth(nodeID, bandwidth) c.metrics.RequestSucceeded() - return response, nodeID, nil + return nodeID, response, nil } From c6d57c465ae6595ac2cb6113ffd7b09f1fc78f95 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 11:06:40 -0400 Subject: [PATCH 06/28] remove sleep --- x/sync/client.go | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/x/sync/client.go b/x/sync/client.go index c197d3bf075b..88b8b9a3a21b 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -181,7 +181,12 @@ func (c *client) GetRangeProof(ctx context.Context, req *pb.SyncGetRangeProofReq // [parseFn] is called with the raw response. If [parseFn] returns an error or the request // times out, this function will retry the request to a different peer until [ctx] expires. // If [parseFn] returns a nil error, the result is returned from getAndParse. -func getAndParse[T any](ctx context.Context, client *client, request []byte, parseFn func(context.Context, []byte) (*T, error)) (*T, error) { +func getAndParse[T any]( + ctx context.Context, + client *client, + request []byte, + parseFn func(context.Context, []byte) (*T, error), +) (*T, error) { var ( lastErr error response *T @@ -191,10 +196,16 @@ func getAndParse[T any](ctx context.Context, client *client, request []byte, par // If the context has finished, return the context error early. if err := ctx.Err(); err != nil { if lastErr != nil { - return nil, fmt.Errorf("request failed after %d attempts with last error %w and ctx error %s", attempt, lastErr, err) + return nil, fmt.Errorf( + "request failed after %d attempts with last error %s and ctx error %w", + attempt, + lastErr, + err, + ) } return nil, err } + nodeID, responseBytes, err := client.get(ctx, request) if err == nil { if response, err = parseFn(ctx, responseBytes); err == nil { @@ -205,18 +216,26 @@ func getAndParse[T any](ctx context.Context, client *client, request []byte, par client.log.Debug("request failed, retrying", zap.Stringer("nodeID", nodeID), zap.Int("attempt", attempt), - zap.Error(err)) + zap.Error(err), + ) if err != ctx.Err() { - // if [err] is being propagated from [ctx], avoid overwriting [lastErr]. + // Don't overwrite [lastErr] if it's context cancelation. lastErr = err - time.Sleep(failedRequestSleepInterval) + + select { + case <-ctx.Done(): + // Return error at top of loop. + // Don't do it here to avoid duplicate code. + case <-time.After(failedRequestSleepInterval): + // TODO should we randomize this? + } } } } // get sends [request] to an arbitrary peer and blocks until the node receives a response -// or [ctx] expires. +// or [ctx] is canceled. // Returns the peer's NodeID and response. // It's safe to call this method multiple times concurrently. func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) { From 17235ad90cb8bd7242c54263515e59ebe191c7f6 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 11:09:16 -0400 Subject: [PATCH 07/28] comment nits --- x/sync/client.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/x/sync/client.go b/x/sync/client.go index 88b8b9a3a21b..2f6a21648e7d 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -188,6 +188,7 @@ func getAndParse[T any]( parseFn func(context.Context, []byte) (*T, error), ) (*T, error) { var ( + // TODO is reporting the last error worth the code complexity? lastErr error response *T ) @@ -198,9 +199,7 @@ func getAndParse[T any]( if lastErr != nil { return nil, fmt.Errorf( "request failed after %d attempts with last error %s and ctx error %w", - attempt, - lastErr, - err, + attempt, lastErr, err, ) } return nil, err @@ -223,6 +222,7 @@ func getAndParse[T any]( // Don't overwrite [lastErr] if it's context cancelation. lastErr = err + // Wait before retrying. select { case <-ctx.Done(): // Return error at top of loop. @@ -234,8 +234,8 @@ func getAndParse[T any]( } } -// get sends [request] to an arbitrary peer and blocks until the node receives a response -// or [ctx] is canceled. +// get sends [request] to an arbitrary peer and blocks +// until the node receives a response or [ctx] is canceled. // Returns the peer's NodeID and response. // It's safe to call this method multiple times concurrently. func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) { @@ -249,8 +249,9 @@ func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, e if len(c.stateSyncNodes) == 0 { response, nodeID, err = c.networkClient.RequestAny(ctx, c.stateSyncMinVersion, request) } else { - // get the next nodeID using the nodeIdx offset. If we're out of nodes, loop back to 0 - // we do this every attempt to ensure we get a different node each time if possible. + // Get the next nodeID to query using the [nodeIdx] offset. + // If we're out of nodes, loop back to 0. + // We do this try to query a different node each time if possible. nodeIdx := atomic.AddUint32(&c.stateSyncNodeIdx, 1) nodeID = c.stateSyncNodes[nodeIdx%uint32(len(c.stateSyncNodes))] response, err = c.networkClient.Request(ctx, nodeID, request) From 49a9f16ac783c61a4b8e22283d16193eb05a192c Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 11:20:20 -0400 Subject: [PATCH 08/28] comment nits --- x/sync/client.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/x/sync/client.go b/x/sync/client.go index 2f6a21648e7d..f017fa30b530 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -177,10 +177,11 @@ func (c *client) GetRangeProof(ctx context.Context, req *pb.SyncGetRangeProofReq return getAndParse(ctx, c, reqBytes, parseFn) } -// getAndParse uses [client] to send [request] to an arbitrary peer. If the peer responds, -// [parseFn] is called with the raw response. If [parseFn] returns an error or the request -// times out, this function will retry the request to a different peer until [ctx] expires. -// If [parseFn] returns a nil error, the result is returned from getAndParse. +// getAndParse uses [client] to send [request] to an arbitrary peer. +// Returns the response to the request. +// [parseFn] parses the raw response. +// If the request is unsuccessful or the response can't be parsed, +// retries the request to a different peer until [ctx] expires. func getAndParse[T any]( ctx context.Context, client *client, @@ -235,7 +236,8 @@ func getAndParse[T any]( } // get sends [request] to an arbitrary peer and blocks -// until the node receives a response or [ctx] is canceled. +// until the node receives a response, failure notification +// or [ctx] is canceled. // Returns the peer's NodeID and response. // It's safe to call this method multiple times concurrently. func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) { From 2b52ab32f38499126e3e720cc4ec1f7b728505ff Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 12:12:00 -0400 Subject: [PATCH 09/28] clean up tracking of outstanding requests --- x/sync/network_client.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/x/sync/network_client.go b/x/sync/network_client.go index cc085ff25379..db969d711b19 100644 --- a/x/sync/network_client.go +++ b/x/sync/network_client.go @@ -163,7 +163,6 @@ func (c *networkClient) getRequestHandler(requestID uint32) (ResponseHandler, bo } // mark message as processed, release activeRequests slot delete(c.outstandingRequestHandlers, requestID) - c.activeRequests.Release(1) return handler, true } @@ -181,6 +180,7 @@ func (c *networkClient) RequestAny( if err := c.activeRequests.Acquire(ctx, 1); err != nil { return nil, ids.EmptyNodeID, ErrAcquiringSemaphore } + defer c.activeRequests.Release(1) c.lock.Lock() if nodeID, ok := c.peers.GetAnyPeer(minVersion); ok { @@ -189,7 +189,6 @@ func (c *networkClient) RequestAny( } c.lock.Unlock() - c.activeRequests.Release(1) return nil, ids.EmptyNodeID, fmt.Errorf( "no peers found matching version %s out of %d peers", minVersion, c.peers.Size(), @@ -209,6 +208,7 @@ func (c *networkClient) Request( if err := c.activeRequests.Acquire(ctx, 1); err != nil { return nil, ErrAcquiringSemaphore } + defer c.activeRequests.Release(1) c.lock.Lock() return c.request(ctx, nodeID, request) @@ -240,8 +240,6 @@ func (c *networkClient) request( // Send an app request to the peer. if err := c.appSender.SendAppRequest(ctx, nodeIDs, requestID, request); err != nil { - // On failure, release the activeRequests slot and mark the message as processed. - c.activeRequests.Release(1) c.lock.Unlock() return nil, err } From a6ae3a5eec11071dcf02b102cd848df49891a291 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 12:23:49 -0400 Subject: [PATCH 10/28] comment --- x/sync/network_client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x/sync/network_client.go b/x/sync/network_client.go index db969d711b19..78a6515e9194 100644 --- a/x/sync/network_client.go +++ b/x/sync/network_client.go @@ -184,6 +184,7 @@ func (c *networkClient) RequestAny( c.lock.Lock() if nodeID, ok := c.peers.GetAnyPeer(minVersion); ok { + // Note [c.request] releases [c.lock]. response, err := c.request(ctx, nodeID, request) return response, nodeID, err } @@ -211,6 +212,7 @@ func (c *networkClient) Request( defer c.activeRequests.Release(1) c.lock.Lock() + // Note [c.request] releases [c.lock]. return c.request(ctx, nodeID, request) } From a852001015ae08de7daf160542d4a8b0cdef6002 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 12:39:35 -0400 Subject: [PATCH 11/28] revert error changes --- x/sync/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x/sync/client.go b/x/sync/client.go index f017fa30b530..3dfb44fa5150 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -199,7 +199,7 @@ func getAndParse[T any]( if err := ctx.Err(); err != nil { if lastErr != nil { return nil, fmt.Errorf( - "request failed after %d attempts with last error %s and ctx error %w", + "request failed after %d attempts with last error %w and ctx error %s", attempt, lastErr, err, ) } From b17c2fee4a8ff8bfa69d0184af6814a4001e7f93 Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Mon, 3 Jul 2023 11:02:42 -0700 Subject: [PATCH 12/28] simplify client get loop (#1681) --- x/sync/client.go | 35 +++++++++++++---------------------- 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/x/sync/client.go b/x/sync/client.go index 3dfb44fa5150..1b8d9668b6df 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -189,23 +189,11 @@ func getAndParse[T any]( parseFn func(context.Context, []byte) (*T, error), ) (*T, error) { var ( - // TODO is reporting the last error worth the code complexity? lastErr error response *T ) // Loop until the context is cancelled or we get a valid response. - for attempt := 0; ; attempt++ { - // If the context has finished, return the context error early. - if err := ctx.Err(); err != nil { - if lastErr != nil { - return nil, fmt.Errorf( - "request failed after %d attempts with last error %w and ctx error %s", - attempt, lastErr, err, - ) - } - return nil, err - } - + for attempt := 1; ; attempt++ { nodeID, responseBytes, err := client.get(ctx, request) if err == nil { if response, err = parseFn(ctx, responseBytes); err == nil { @@ -218,19 +206,22 @@ func getAndParse[T any]( zap.Int("attempt", attempt), zap.Error(err), ) - + // if [err] is being propagated from [ctx], avoid overwriting [lastErr]. if err != ctx.Err() { - // Don't overwrite [lastErr] if it's context cancelation. lastErr = err + } - // Wait before retrying. - select { - case <-ctx.Done(): - // Return error at top of loop. - // Don't do it here to avoid duplicate code. - case <-time.After(failedRequestSleepInterval): - // TODO should we randomize this? + select { + case <-ctx.Done(): + if lastErr != nil { + // prefer reporting [lastErr] if it's not nil. + return nil, fmt.Errorf( + "request failed after %d attempts with last error %w and ctx error %s", + attempt, lastErr, ctx.Err(), + ) } + return nil, ctx.Err() + case <-time.After(failedRequestSleepInterval): } } } From 81f9dbc83850aa0d062194f9cab20cb4bff73ec5 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 14:04:16 -0400 Subject: [PATCH 13/28] nit --- x/sync/client.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/x/sync/client.go b/x/sync/client.go index 1b8d9668b6df..02c9a24492f7 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -232,13 +232,15 @@ func getAndParse[T any]( // Returns the peer's NodeID and response. // It's safe to call this method multiple times concurrently. func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) { - c.metrics.RequestMade() var ( response []byte nodeID ids.NodeID err error startTime = time.Now() ) + + c.metrics.RequestMade() + if len(c.stateSyncNodes) == 0 { response, nodeID, err = c.networkClient.RequestAny(ctx, c.stateSyncMinVersion, request) } else { From 9b3b9556107d1c6db2b774660a2c420792963ef1 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 15:20:44 -0400 Subject: [PATCH 14/28] `sync` -- add exponential backoff (#1684) --- x/sync/client.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/x/sync/client.go b/x/sync/client.go index 02c9a24492f7..5e4ef5c3fc58 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "math" "sync/atomic" "time" @@ -23,7 +24,9 @@ import ( ) const ( - failedRequestSleepInterval = 10 * time.Millisecond + initialRetryWait = 10 * time.Millisecond + maxRetryWait = time.Second + retryWaitFactor = 1.5 // Larger --> timeout grows more quickly epsilon = 1e-6 // small amount to add to time to avoid division by 0 ) @@ -211,6 +214,11 @@ func getAndParse[T any]( lastErr = err } + retryWait := initialRetryWait * time.Duration(math.Pow(retryWaitFactor, float64(attempt))) + if retryWait > maxRetryWait || retryWait < 0 { // Handle overflows with negative check. + retryWait = maxRetryWait + } + select { case <-ctx.Done(): if lastErr != nil { @@ -221,7 +229,7 @@ func getAndParse[T any]( ) } return nil, ctx.Err() - case <-time.After(failedRequestSleepInterval): + case <-time.After(retryWait): } } } From b8e99ff89e00e80ce3f946f0b67b3580124183bb Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 15:24:34 -0400 Subject: [PATCH 15/28] unexport stuff --- x/sync/client.go | 35 ++++++++++----------- x/sync/client_test.go | 12 +++---- x/sync/manager.go | 2 +- x/sync/network_client.go | 68 ++++++++++++++++++++-------------------- x/sync/sync_test.go | 2 +- 5 files changed, 59 insertions(+), 60 deletions(-) diff --git a/x/sync/client.go b/x/sync/client.go index 5e4ef5c3fc58..3fb7ae7381ca 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -32,16 +32,16 @@ const ( ) var ( - _ Client = (*client)(nil) + _ client = (*clientImpl)(nil) errInvalidRangeProof = errors.New("failed to verify range proof") errTooManyKeys = errors.New("response contains more than requested keys") errTooManyBytes = errors.New("response contains more than requested bytes") ) -// Client synchronously fetches data from the network to fulfill state sync requests. +// client synchronously fetches data from the network to fulfill state sync requests. // Repeatedly retries failed requests until the context is canceled. -type Client interface { +type client interface { // GetRangeProof synchronously sends the given request, returning a parsed StateResponse or error // Note: this verifies the response including the range proof. GetRangeProof(ctx context.Context, request *pb.SyncGetRangeProofRequest) (*merkledb.RangeProof, error) @@ -51,8 +51,8 @@ type Client interface { GetChangeProof(ctx context.Context, request *pb.SyncGetChangeProofRequest, verificationDB DB) (*merkledb.ChangeProof, error) } -type client struct { - networkClient NetworkClient +type clientImpl struct { + networkClient networkClient stateSyncNodes []ids.NodeID stateSyncNodeIdx uint32 stateSyncMinVersion *version.Application @@ -60,29 +60,28 @@ type client struct { metrics SyncMetrics } -type ClientConfig struct { - NetworkClient NetworkClient +type clientConfig struct { + NetworkClient networkClient StateSyncNodeIDs []ids.NodeID StateSyncMinVersion *version.Application Log logging.Logger Metrics SyncMetrics } -func NewClient(config *ClientConfig) Client { - c := &client{ +func NewClient(config *clientConfig) client { + return &clientImpl{ networkClient: config.NetworkClient, stateSyncNodes: config.StateSyncNodeIDs, stateSyncMinVersion: config.StateSyncMinVersion, log: config.Log, metrics: config.Metrics, } - return c } // GetChangeProof synchronously retrieves the change proof given by [req]. // Upon failure, retries until the context is expired. // The returned change proof is verified. -func (c *client) GetChangeProof(ctx context.Context, req *pb.SyncGetChangeProofRequest, db DB) (*merkledb.ChangeProof, error) { +func (c *clientImpl) GetChangeProof(ctx context.Context, req *pb.SyncGetChangeProofRequest, db DB) (*merkledb.ChangeProof, error) { parseFn := func(ctx context.Context, responseBytes []byte) (*merkledb.ChangeProof, error) { if len(responseBytes) > int(req.BytesLimit) { return nil, fmt.Errorf("%w: (%d) > %d)", errTooManyBytes, len(responseBytes), req.BytesLimit) @@ -131,7 +130,7 @@ func (c *client) GetChangeProof(ctx context.Context, req *pb.SyncGetChangeProofR // GetRangeProof synchronously retrieves the range proof given by [req]. // Upon failure, retries until the context is expired. // The returned range proof is verified. -func (c *client) GetRangeProof(ctx context.Context, req *pb.SyncGetRangeProofRequest) (*merkledb.RangeProof, error) { +func (c *clientImpl) GetRangeProof(ctx context.Context, req *pb.SyncGetRangeProofRequest) (*merkledb.RangeProof, error) { parseFn := func(ctx context.Context, responseBytes []byte) (*merkledb.RangeProof, error) { if len(responseBytes) > int(req.BytesLimit) { return nil, fmt.Errorf("%w: (%d) > %d)", errTooManyBytes, len(responseBytes), req.BytesLimit) @@ -187,7 +186,7 @@ func (c *client) GetRangeProof(ctx context.Context, req *pb.SyncGetRangeProofReq // retries the request to a different peer until [ctx] expires. func getAndParse[T any]( ctx context.Context, - client *client, + client *clientImpl, request []byte, parseFn func(context.Context, []byte) (*T, error), ) (*T, error) { @@ -239,7 +238,7 @@ func getAndParse[T any]( // or [ctx] is canceled. // Returns the peer's NodeID and response. // It's safe to call this method multiple times concurrently. -func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) { +func (c *clientImpl) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) { var ( response []byte nodeID ids.NodeID @@ -250,23 +249,23 @@ func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, e c.metrics.RequestMade() if len(c.stateSyncNodes) == 0 { - response, nodeID, err = c.networkClient.RequestAny(ctx, c.stateSyncMinVersion, request) + response, nodeID, err = c.networkClient.requestAny(ctx, c.stateSyncMinVersion, request) } else { // Get the next nodeID to query using the [nodeIdx] offset. // If we're out of nodes, loop back to 0. // We do this try to query a different node each time if possible. nodeIdx := atomic.AddUint32(&c.stateSyncNodeIdx, 1) nodeID = c.stateSyncNodes[nodeIdx%uint32(len(c.stateSyncNodes))] - response, err = c.networkClient.Request(ctx, nodeID, request) + response, err = c.networkClient.request(ctx, nodeID, request) } if err != nil { c.metrics.RequestFailed() - c.networkClient.TrackBandwidth(nodeID, 0) + c.networkClient.trackBandwidth(nodeID, 0) return nodeID, response, err } bandwidth := float64(len(response)) / (time.Since(startTime).Seconds() + epsilon) - c.networkClient.TrackBandwidth(nodeID, bandwidth) + c.networkClient.trackBandwidth(nodeID, bandwidth) c.metrics.RequestSucceeded() return nodeID, response, nil } diff --git a/x/sync/client_test.go b/x/sync/client_test.go index 0b118c704e45..2612ee537d87 100644 --- a/x/sync/client_test.go +++ b/x/sync/client_test.go @@ -58,8 +58,8 @@ func sendRangeRequest( handler := NewNetworkServer(sender, db, logging.NoLog{}) clientNodeID, serverNodeID := ids.GenerateTestNodeID(), ids.GenerateTestNodeID() networkClient := NewNetworkClient(sender, clientNodeID, 1, logging.NoLog{}) - require.NoError(networkClient.Connected(context.Background(), serverNodeID, version.CurrentApp)) - client := NewClient(&ClientConfig{ + require.NoError(networkClient.connected(context.Background(), serverNodeID, version.CurrentApp)) + client := NewClient(&clientConfig{ NetworkClient: networkClient, Metrics: &mockMetrics{}, Log: logging.NoLog{}, @@ -114,7 +114,7 @@ func sendRangeRequest( // reserialize the response and pass it to the client to complete the handling. responseBytes, err := proto.Marshal(response.ToProto()) require.NoError(err) - require.NoError(networkClient.AppResponse(context.Background(), serverNodeID, requestID, responseBytes)) + require.NoError(networkClient.appResponse(context.Background(), serverNodeID, requestID, responseBytes)) return nil }, ).AnyTimes() @@ -318,8 +318,8 @@ func sendChangeRequest( handler := NewNetworkServer(sender, db, logging.NoLog{}) clientNodeID, serverNodeID := ids.GenerateTestNodeID(), ids.GenerateTestNodeID() networkClient := NewNetworkClient(sender, clientNodeID, 1, logging.NoLog{}) - require.NoError(networkClient.Connected(context.Background(), serverNodeID, version.CurrentApp)) - client := NewClient(&ClientConfig{ + require.NoError(networkClient.connected(context.Background(), serverNodeID, version.CurrentApp)) + client := NewClient(&clientConfig{ NetworkClient: networkClient, Metrics: &mockMetrics{}, Log: logging.NoLog{}, @@ -380,7 +380,7 @@ func sendChangeRequest( }, }) require.NoError(err) - require.NoError(networkClient.AppResponse(context.Background(), serverNodeID, requestID, responseBytes)) + require.NoError(networkClient.appResponse(context.Background(), serverNodeID, requestID, responseBytes)) return nil }, ).AnyTimes() diff --git a/x/sync/manager.go b/x/sync/manager.go index 992451312f0b..e039d98fd7aa 100644 --- a/x/sync/manager.go +++ b/x/sync/manager.go @@ -107,7 +107,7 @@ type Manager struct { type ManagerConfig struct { DB DB - Client Client + Client client SimultaneousWorkLimit int Log logging.Logger TargetRoot ids.ID diff --git a/x/sync/network_client.go b/x/sync/network_client.go index 78a6515e9194..d195e981333c 100644 --- a/x/sync/network_client.go +++ b/x/sync/network_client.go @@ -25,37 +25,37 @@ import ( const minRequestHandlingDuration = 100 * time.Millisecond var ( - _ NetworkClient = (*networkClient)(nil) + _ networkClient = (*networkClientImpl)(nil) ErrAcquiringSemaphore = errors.New("error acquiring semaphore") ErrRequestFailed = errors.New("request failed") ) -// NetworkClient defines ability to send request / response through the Network -type NetworkClient interface { - // RequestAny synchronously sends request to an arbitrary peer with a +// networkClient defines ability to send request / response through the Network +type networkClient interface { + // requestAny synchronously sends request to an arbitrary peer with a // node version greater than or equal to minVersion. // Returns response bytes, the ID of the chosen peer, and ErrRequestFailed if // the request should be retried. - RequestAny(ctx context.Context, minVersion *version.Application, request []byte) ([]byte, ids.NodeID, error) + requestAny(ctx context.Context, minVersion *version.Application, request []byte) ([]byte, ids.NodeID, error) - // Request synchronously sends request to the selected nodeID. + // request synchronously sends request to the selected nodeID. // Returns response bytes, and ErrRequestFailed if the request should be retried. - Request(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error) + request(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error) - // TrackBandwidth should be called for each valid response with the bandwidth + // trackBandwidth should be called for each valid response with the bandwidth // (length of response divided by request time), and with 0 if the response is invalid. - TrackBandwidth(nodeID ids.NodeID, bandwidth float64) + trackBandwidth(nodeID ids.NodeID, bandwidth float64) // The following declarations allow this interface to be embedded in the VM // to handle incoming responses from peers. - AppResponse(context.Context, ids.NodeID, uint32, []byte) error - AppRequestFailed(context.Context, ids.NodeID, uint32) error - Connected(context.Context, ids.NodeID, *version.Application) error - Disconnected(context.Context, ids.NodeID) error + appResponse(context.Context, ids.NodeID, uint32, []byte) error + appRequestFailed(context.Context, ids.NodeID, uint32) error + connected(context.Context, ids.NodeID, *version.Application) error + disconnected(context.Context, ids.NodeID) error } -type networkClient struct { +type networkClientImpl struct { lock sync.Mutex log logging.Logger // This node's ID @@ -77,8 +77,8 @@ func NewNetworkClient( myNodeID ids.NodeID, maxActiveRequests int64, log logging.Logger, -) NetworkClient { - return &networkClient{ +) networkClient { + return &networkClientImpl{ appSender: appSender, myNodeID: myNodeID, outstandingRequestHandlers: make(map[uint32]ResponseHandler), @@ -90,7 +90,7 @@ func NewNetworkClient( // Always returns nil because the engine considers errors // returned from this function as fatal. -func (c *networkClient) AppResponse( +func (c *networkClientImpl) appResponse( _ context.Context, nodeID ids.NodeID, requestID uint32, @@ -124,7 +124,7 @@ func (c *networkClient) AppResponse( // Always returns nil because the engine considers errors // returned from this function as fatal. -func (c *networkClient) AppRequestFailed( +func (c *networkClientImpl) appRequestFailed( _ context.Context, nodeID ids.NodeID, requestID uint32, @@ -156,7 +156,7 @@ func (c *networkClient) AppRequestFailed( // Returns the handler for [requestID] and marks the request as fulfilled. // Returns false if there's no outstanding request with [requestID]. // Assumes [c.lock] is held. -func (c *networkClient) getRequestHandler(requestID uint32) (ResponseHandler, bool) { +func (c *networkClientImpl) getRequestHandler(requestID uint32) (ResponseHandler, bool) { handler, exists := c.outstandingRequestHandlers[requestID] if !exists { return nil, false @@ -166,12 +166,12 @@ func (c *networkClient) getRequestHandler(requestID uint32) (ResponseHandler, bo return handler, true } -// RequestAny synchronously sends [request] to a randomly chosen peer with a +// requestAny synchronously sends [request] to a randomly chosen peer with a // version greater than or equal to [minVersion]. If [minVersion] is nil, // the request is sent to any peer regardless of their version. // May block until the number of outstanding requests decreases. // Returns the node's response and the ID of the node. -func (c *networkClient) RequestAny( +func (c *networkClientImpl) requestAny( ctx context.Context, minVersion *version.Application, request []byte, @@ -185,7 +185,7 @@ func (c *networkClient) RequestAny( c.lock.Lock() if nodeID, ok := c.peers.GetAnyPeer(minVersion); ok { // Note [c.request] releases [c.lock]. - response, err := c.request(ctx, nodeID, request) + response, err := c.get(ctx, nodeID, request) return response, nodeID, err } @@ -199,7 +199,7 @@ func (c *networkClient) RequestAny( // Sends [request] to [nodeID] and returns the response. // Blocks until the number of outstanding requests is // below the limit before sending the request. -func (c *networkClient) Request( +func (c *networkClientImpl) request( ctx context.Context, nodeID ids.NodeID, request []byte, @@ -213,17 +213,17 @@ func (c *networkClient) Request( c.lock.Lock() // Note [c.request] releases [c.lock]. - return c.request(ctx, nodeID, request) + return c.get(ctx, nodeID, request) } -// Sends [request] to [nodeID] and returns the response. -// Returns an error if the request failed or [ctx] is canceled. +// Sends [get] to [nodeID] and returns the response. +// Returns an error if the get failed or [ctx] is canceled. // Blocks until a response is received or the [ctx] is canceled fails. -// Releases active requests semaphore if there was an error in sending the request. +// Releases active requests semaphore if there was an error in sending the get. // Assumes [nodeID] is never [c.myNodeID] since we guarantee // [c.myNodeID] will not be added to [c.peers]. // Assumes [c.lock] is held and unlocks [c.lock] before returning. -func (c *networkClient) request( +func (c *networkClientImpl) get( ctx context.Context, nodeID ids.NodeID, request []byte, @@ -269,10 +269,10 @@ func (c *networkClient) request( return response, nil } -// Connected adds the given [nodeID] to the peer +// connected adds the given [nodeID] to the peer // list so that it can receive messages. // If [nodeID] is [c.myNodeID], this is a no-op. -func (c *networkClient) Connected( +func (c *networkClientImpl) connected( _ context.Context, nodeID ids.NodeID, nodeVersion *version.Application, @@ -290,8 +290,8 @@ func (c *networkClient) Connected( return nil } -// Disconnected removes given [nodeID] from the peer list. -func (c *networkClient) Disconnected(_ context.Context, nodeID ids.NodeID) error { +// disconnected removes given [nodeID] from the peer list. +func (c *networkClientImpl) disconnected(_ context.Context, nodeID ids.NodeID) error { c.lock.Lock() defer c.lock.Unlock() @@ -306,7 +306,7 @@ func (c *networkClient) Disconnected(_ context.Context, nodeID ids.NodeID) error } // Shutdown disconnects all peers -func (c *networkClient) Shutdown() { +func (c *networkClientImpl) Shutdown() { c.lock.Lock() defer c.lock.Unlock() @@ -315,7 +315,7 @@ func (c *networkClient) Shutdown() { c.peers = newPeerTracker(c.log) } -func (c *networkClient) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) { +func (c *networkClientImpl) trackBandwidth(nodeID ids.NodeID, bandwidth float64) { c.lock.Lock() defer c.lock.Unlock() diff --git a/x/sync/sync_test.go b/x/sync/sync_test.go index 10ad6ff20c0b..fb0ba07ad245 100644 --- a/x/sync/sync_test.go +++ b/x/sync/sync_test.go @@ -26,7 +26,7 @@ import ( pb "github.com/ava-labs/avalanchego/proto/pb/sync" ) -var _ Client = (*mockClient)(nil) +var _ client = (*mockClient)(nil) func newNoopTracer() trace.Tracer { tracer, _ := trace.New(trace.Config{Enabled: false}) From 32bb544b45ad37ea56891567fe372fb05084772d Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 15:25:11 -0400 Subject: [PATCH 16/28] unexport stuff --- x/sync/network_client.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/x/sync/network_client.go b/x/sync/network_client.go index d195e981333c..e83f03f69f29 100644 --- a/x/sync/network_client.go +++ b/x/sync/network_client.go @@ -27,20 +27,20 @@ const minRequestHandlingDuration = 100 * time.Millisecond var ( _ networkClient = (*networkClientImpl)(nil) - ErrAcquiringSemaphore = errors.New("error acquiring semaphore") - ErrRequestFailed = errors.New("request failed") + errAcquiringSemaphore = errors.New("error acquiring semaphore") + errRequestFailed = errors.New("request failed") ) // networkClient defines ability to send request / response through the Network type networkClient interface { // requestAny synchronously sends request to an arbitrary peer with a // node version greater than or equal to minVersion. - // Returns response bytes, the ID of the chosen peer, and ErrRequestFailed if + // Returns response bytes, the ID of the chosen peer, and errRequestFailed if // the request should be retried. requestAny(ctx context.Context, minVersion *version.Application, request []byte) ([]byte, ids.NodeID, error) // request synchronously sends request to the selected nodeID. - // Returns response bytes, and ErrRequestFailed if the request should be retried. + // Returns response bytes, and errRequestFailed if the request should be retried. request(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error) // trackBandwidth should be called for each valid response with the bandwidth @@ -178,7 +178,7 @@ func (c *networkClientImpl) requestAny( ) ([]byte, ids.NodeID, error) { // Take a slot from total [activeRequests] and block until a slot becomes available. if err := c.activeRequests.Acquire(ctx, 1); err != nil { - return nil, ids.EmptyNodeID, ErrAcquiringSemaphore + return nil, ids.EmptyNodeID, errAcquiringSemaphore } defer c.activeRequests.Release(1) @@ -207,7 +207,7 @@ func (c *networkClientImpl) request( // Take a slot from total [activeRequests] // and block until a slot becomes available. if err := c.activeRequests.Acquire(ctx, 1); err != nil { - return nil, ErrAcquiringSemaphore + return nil, errAcquiringSemaphore } defer c.activeRequests.Release(1) @@ -258,7 +258,7 @@ func (c *networkClientImpl) get( case response = <-handler.responseChan: } if handler.failed { - return nil, ErrRequestFailed + return nil, errRequestFailed } c.log.Debug("received response from peer", From 43912deb3af524e97d1226bef68adf7219e0a9ee Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 15:30:56 -0400 Subject: [PATCH 17/28] more export changes --- x/sync/client.go | 22 +++++++++++----------- x/sync/client_test.go | 4 ++-- x/sync/manager.go | 2 +- x/sync/sync_test.go | 2 +- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/x/sync/client.go b/x/sync/client.go index 3fb7ae7381ca..9bef593b2677 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -32,16 +32,16 @@ const ( ) var ( - _ client = (*clientImpl)(nil) + _ Client = (*client)(nil) errInvalidRangeProof = errors.New("failed to verify range proof") errTooManyKeys = errors.New("response contains more than requested keys") errTooManyBytes = errors.New("response contains more than requested bytes") ) -// client synchronously fetches data from the network to fulfill state sync requests. +// Client synchronously fetches data from the network to fulfill state sync requests. // Repeatedly retries failed requests until the context is canceled. -type client interface { +type Client interface { // GetRangeProof synchronously sends the given request, returning a parsed StateResponse or error // Note: this verifies the response including the range proof. GetRangeProof(ctx context.Context, request *pb.SyncGetRangeProofRequest) (*merkledb.RangeProof, error) @@ -51,7 +51,7 @@ type client interface { GetChangeProof(ctx context.Context, request *pb.SyncGetChangeProofRequest, verificationDB DB) (*merkledb.ChangeProof, error) } -type clientImpl struct { +type client struct { networkClient networkClient stateSyncNodes []ids.NodeID stateSyncNodeIdx uint32 @@ -60,7 +60,7 @@ type clientImpl struct { metrics SyncMetrics } -type clientConfig struct { +type ClientConfig struct { NetworkClient networkClient StateSyncNodeIDs []ids.NodeID StateSyncMinVersion *version.Application @@ -68,8 +68,8 @@ type clientConfig struct { Metrics SyncMetrics } -func NewClient(config *clientConfig) client { - return &clientImpl{ +func NewClient(config *ClientConfig) Client { + return &client{ networkClient: config.NetworkClient, stateSyncNodes: config.StateSyncNodeIDs, stateSyncMinVersion: config.StateSyncMinVersion, @@ -81,7 +81,7 @@ func NewClient(config *clientConfig) client { // GetChangeProof synchronously retrieves the change proof given by [req]. // Upon failure, retries until the context is expired. // The returned change proof is verified. -func (c *clientImpl) GetChangeProof(ctx context.Context, req *pb.SyncGetChangeProofRequest, db DB) (*merkledb.ChangeProof, error) { +func (c *client) GetChangeProof(ctx context.Context, req *pb.SyncGetChangeProofRequest, db DB) (*merkledb.ChangeProof, error) { parseFn := func(ctx context.Context, responseBytes []byte) (*merkledb.ChangeProof, error) { if len(responseBytes) > int(req.BytesLimit) { return nil, fmt.Errorf("%w: (%d) > %d)", errTooManyBytes, len(responseBytes), req.BytesLimit) @@ -130,7 +130,7 @@ func (c *clientImpl) GetChangeProof(ctx context.Context, req *pb.SyncGetChangePr // GetRangeProof synchronously retrieves the range proof given by [req]. // Upon failure, retries until the context is expired. // The returned range proof is verified. -func (c *clientImpl) GetRangeProof(ctx context.Context, req *pb.SyncGetRangeProofRequest) (*merkledb.RangeProof, error) { +func (c *client) GetRangeProof(ctx context.Context, req *pb.SyncGetRangeProofRequest) (*merkledb.RangeProof, error) { parseFn := func(ctx context.Context, responseBytes []byte) (*merkledb.RangeProof, error) { if len(responseBytes) > int(req.BytesLimit) { return nil, fmt.Errorf("%w: (%d) > %d)", errTooManyBytes, len(responseBytes), req.BytesLimit) @@ -186,7 +186,7 @@ func (c *clientImpl) GetRangeProof(ctx context.Context, req *pb.SyncGetRangeProo // retries the request to a different peer until [ctx] expires. func getAndParse[T any]( ctx context.Context, - client *clientImpl, + client *client, request []byte, parseFn func(context.Context, []byte) (*T, error), ) (*T, error) { @@ -238,7 +238,7 @@ func getAndParse[T any]( // or [ctx] is canceled. // Returns the peer's NodeID and response. // It's safe to call this method multiple times concurrently. -func (c *clientImpl) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) { +func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) { var ( response []byte nodeID ids.NodeID diff --git a/x/sync/client_test.go b/x/sync/client_test.go index 2612ee537d87..cf9c85e992bc 100644 --- a/x/sync/client_test.go +++ b/x/sync/client_test.go @@ -59,7 +59,7 @@ func sendRangeRequest( clientNodeID, serverNodeID := ids.GenerateTestNodeID(), ids.GenerateTestNodeID() networkClient := NewNetworkClient(sender, clientNodeID, 1, logging.NoLog{}) require.NoError(networkClient.connected(context.Background(), serverNodeID, version.CurrentApp)) - client := NewClient(&clientConfig{ + client := NewClient(&ClientConfig{ NetworkClient: networkClient, Metrics: &mockMetrics{}, Log: logging.NoLog{}, @@ -319,7 +319,7 @@ func sendChangeRequest( clientNodeID, serverNodeID := ids.GenerateTestNodeID(), ids.GenerateTestNodeID() networkClient := NewNetworkClient(sender, clientNodeID, 1, logging.NoLog{}) require.NoError(networkClient.connected(context.Background(), serverNodeID, version.CurrentApp)) - client := NewClient(&clientConfig{ + client := NewClient(&ClientConfig{ NetworkClient: networkClient, Metrics: &mockMetrics{}, Log: logging.NoLog{}, diff --git a/x/sync/manager.go b/x/sync/manager.go index e039d98fd7aa..992451312f0b 100644 --- a/x/sync/manager.go +++ b/x/sync/manager.go @@ -107,7 +107,7 @@ type Manager struct { type ManagerConfig struct { DB DB - Client client + Client Client SimultaneousWorkLimit int Log logging.Logger TargetRoot ids.ID diff --git a/x/sync/sync_test.go b/x/sync/sync_test.go index fb0ba07ad245..10ad6ff20c0b 100644 --- a/x/sync/sync_test.go +++ b/x/sync/sync_test.go @@ -26,7 +26,7 @@ import ( pb "github.com/ava-labs/avalanchego/proto/pb/sync" ) -var _ client = (*mockClient)(nil) +var _ Client = (*mockClient)(nil) func newNoopTracer() trace.Tracer { tracer, _ := trace.New(trace.Config{Enabled: false}) From 62996e358c28d6d153531e9dcf1b744eee223f9b Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 3 Jul 2023 15:33:11 -0400 Subject: [PATCH 18/28] unexport client --- x/sync/client.go | 20 ++++++++++---------- x/sync/manager.go | 2 +- x/sync/sync_test.go | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/x/sync/client.go b/x/sync/client.go index 9bef593b2677..b03e48d6d6cd 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -32,16 +32,16 @@ const ( ) var ( - _ Client = (*client)(nil) + _ client = (*clientImpl)(nil) errInvalidRangeProof = errors.New("failed to verify range proof") errTooManyKeys = errors.New("response contains more than requested keys") errTooManyBytes = errors.New("response contains more than requested bytes") ) -// Client synchronously fetches data from the network to fulfill state sync requests. +// client synchronously fetches data from the network to fulfill state sync requests. // Repeatedly retries failed requests until the context is canceled. -type Client interface { +type client interface { // GetRangeProof synchronously sends the given request, returning a parsed StateResponse or error // Note: this verifies the response including the range proof. GetRangeProof(ctx context.Context, request *pb.SyncGetRangeProofRequest) (*merkledb.RangeProof, error) @@ -51,7 +51,7 @@ type Client interface { GetChangeProof(ctx context.Context, request *pb.SyncGetChangeProofRequest, verificationDB DB) (*merkledb.ChangeProof, error) } -type client struct { +type clientImpl struct { networkClient networkClient stateSyncNodes []ids.NodeID stateSyncNodeIdx uint32 @@ -68,8 +68,8 @@ type ClientConfig struct { Metrics SyncMetrics } -func NewClient(config *ClientConfig) Client { - return &client{ +func NewClient(config *ClientConfig) client { + return &clientImpl{ networkClient: config.NetworkClient, stateSyncNodes: config.StateSyncNodeIDs, stateSyncMinVersion: config.StateSyncMinVersion, @@ -81,7 +81,7 @@ func NewClient(config *ClientConfig) Client { // GetChangeProof synchronously retrieves the change proof given by [req]. // Upon failure, retries until the context is expired. // The returned change proof is verified. -func (c *client) GetChangeProof(ctx context.Context, req *pb.SyncGetChangeProofRequest, db DB) (*merkledb.ChangeProof, error) { +func (c *clientImpl) GetChangeProof(ctx context.Context, req *pb.SyncGetChangeProofRequest, db DB) (*merkledb.ChangeProof, error) { parseFn := func(ctx context.Context, responseBytes []byte) (*merkledb.ChangeProof, error) { if len(responseBytes) > int(req.BytesLimit) { return nil, fmt.Errorf("%w: (%d) > %d)", errTooManyBytes, len(responseBytes), req.BytesLimit) @@ -130,7 +130,7 @@ func (c *client) GetChangeProof(ctx context.Context, req *pb.SyncGetChangeProofR // GetRangeProof synchronously retrieves the range proof given by [req]. // Upon failure, retries until the context is expired. // The returned range proof is verified. -func (c *client) GetRangeProof(ctx context.Context, req *pb.SyncGetRangeProofRequest) (*merkledb.RangeProof, error) { +func (c *clientImpl) GetRangeProof(ctx context.Context, req *pb.SyncGetRangeProofRequest) (*merkledb.RangeProof, error) { parseFn := func(ctx context.Context, responseBytes []byte) (*merkledb.RangeProof, error) { if len(responseBytes) > int(req.BytesLimit) { return nil, fmt.Errorf("%w: (%d) > %d)", errTooManyBytes, len(responseBytes), req.BytesLimit) @@ -186,7 +186,7 @@ func (c *client) GetRangeProof(ctx context.Context, req *pb.SyncGetRangeProofReq // retries the request to a different peer until [ctx] expires. func getAndParse[T any]( ctx context.Context, - client *client, + client *clientImpl, request []byte, parseFn func(context.Context, []byte) (*T, error), ) (*T, error) { @@ -238,7 +238,7 @@ func getAndParse[T any]( // or [ctx] is canceled. // Returns the peer's NodeID and response. // It's safe to call this method multiple times concurrently. -func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) { +func (c *clientImpl) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) { var ( response []byte nodeID ids.NodeID diff --git a/x/sync/manager.go b/x/sync/manager.go index 992451312f0b..e039d98fd7aa 100644 --- a/x/sync/manager.go +++ b/x/sync/manager.go @@ -107,7 +107,7 @@ type Manager struct { type ManagerConfig struct { DB DB - Client Client + Client client SimultaneousWorkLimit int Log logging.Logger TargetRoot ids.ID diff --git a/x/sync/sync_test.go b/x/sync/sync_test.go index 10ad6ff20c0b..fb0ba07ad245 100644 --- a/x/sync/sync_test.go +++ b/x/sync/sync_test.go @@ -26,7 +26,7 @@ import ( pb "github.com/ava-labs/avalanchego/proto/pb/sync" ) -var _ Client = (*mockClient)(nil) +var _ client = (*mockClient)(nil) func newNoopTracer() trace.Tracer { tracer, _ := trace.New(trace.Config{Enabled: false}) From 8202366269bdca88d3783acbdcb9a38a1989704d Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Wed, 5 Jul 2023 12:09:27 -0400 Subject: [PATCH 19/28] nit --- x/sync/network_client.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/x/sync/network_client.go b/x/sync/network_client.go index 78a6515e9194..4b627a6f1a9d 100644 --- a/x/sync/network_client.go +++ b/x/sync/network_client.go @@ -183,17 +183,18 @@ func (c *networkClient) RequestAny( defer c.activeRequests.Release(1) c.lock.Lock() - if nodeID, ok := c.peers.GetAnyPeer(minVersion); ok { - // Note [c.request] releases [c.lock]. - response, err := c.request(ctx, nodeID, request) - return response, nodeID, err + nodeID, ok := c.peers.GetAnyPeer(minVersion) + if !ok { + c.lock.Unlock() + return nil, ids.EmptyNodeID, fmt.Errorf( + "no peers found matching version %s out of %d peers", + minVersion, c.peers.Size(), + ) } - c.lock.Unlock() - return nil, ids.EmptyNodeID, fmt.Errorf( - "no peers found matching version %s out of %d peers", - minVersion, c.peers.Size(), - ) + // Note [c.request] releases [c.lock]. + response, err := c.request(ctx, nodeID, request) + return response, nodeID, err } // Sends [request] to [nodeID] and returns the response. From 82e242dfd6ef01d13333cca9b02a5ae2e228283d Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Wed, 5 Jul 2023 12:26:22 -0400 Subject: [PATCH 20/28] typo --- x/sync/network_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x/sync/network_client.go b/x/sync/network_client.go index 40bb19cb3819..53b0562d41ca 100644 --- a/x/sync/network_client.go +++ b/x/sync/network_client.go @@ -192,8 +192,8 @@ func (c *networkClientImpl) requestAny( ) } - // Note [c.request] releases [c.lock]. - response, err := c.request(ctx, nodeID, request) + // Note [c.get] releases [c.lock]. + response, err := c.get(ctx, nodeID, request) return response, nodeID, err } From 264f143f890f37a2e7fd71e7e15de0448ddb6019 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Wed, 5 Jul 2023 13:47:34 -0400 Subject: [PATCH 21/28] switch return value order for RequestAny --- x/sync/client.go | 2 +- x/sync/network_client.go | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/x/sync/client.go b/x/sync/client.go index 5e4ef5c3fc58..497c4f0bbf15 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -250,7 +250,7 @@ func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, e c.metrics.RequestMade() if len(c.stateSyncNodes) == 0 { - response, nodeID, err = c.networkClient.RequestAny(ctx, c.stateSyncMinVersion, request) + nodeID, response, err = c.networkClient.RequestAny(ctx, c.stateSyncMinVersion, request) } else { // Get the next nodeID to query using the [nodeIdx] offset. // If we're out of nodes, loop back to 0. diff --git a/x/sync/network_client.go b/x/sync/network_client.go index 4b627a6f1a9d..5b312882ff9c 100644 --- a/x/sync/network_client.go +++ b/x/sync/network_client.go @@ -37,7 +37,7 @@ type NetworkClient interface { // node version greater than or equal to minVersion. // Returns response bytes, the ID of the chosen peer, and ErrRequestFailed if // the request should be retried. - RequestAny(ctx context.Context, minVersion *version.Application, request []byte) ([]byte, ids.NodeID, error) + RequestAny(ctx context.Context, minVersion *version.Application, request []byte) (ids.NodeID, []byte, error) // Request synchronously sends request to the selected nodeID. // Returns response bytes, and ErrRequestFailed if the request should be retried. @@ -175,10 +175,10 @@ func (c *networkClient) RequestAny( ctx context.Context, minVersion *version.Application, request []byte, -) ([]byte, ids.NodeID, error) { +) (ids.NodeID, []byte, error) { // Take a slot from total [activeRequests] and block until a slot becomes available. if err := c.activeRequests.Acquire(ctx, 1); err != nil { - return nil, ids.EmptyNodeID, ErrAcquiringSemaphore + return ids.EmptyNodeID, nil, ErrAcquiringSemaphore } defer c.activeRequests.Release(1) @@ -186,7 +186,7 @@ func (c *networkClient) RequestAny( nodeID, ok := c.peers.GetAnyPeer(minVersion) if !ok { c.lock.Unlock() - return nil, ids.EmptyNodeID, fmt.Errorf( + return ids.EmptyNodeID, nil, fmt.Errorf( "no peers found matching version %s out of %d peers", minVersion, c.peers.Size(), ) @@ -194,7 +194,7 @@ func (c *networkClient) RequestAny( // Note [c.request] releases [c.lock]. response, err := c.request(ctx, nodeID, request) - return response, nodeID, err + return nodeID, response, err } // Sends [request] to [nodeID] and returns the response. From e55ae2169af1517d17f3c4182dfd349f39a2b567 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Wed, 5 Jul 2023 14:21:10 -0400 Subject: [PATCH 22/28] add handling for fatal error --- x/sync/client.go | 7 +++++++ x/sync/network_client.go | 37 +++++++++++++++++++------------------ 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/x/sync/client.go b/x/sync/client.go index cfac82815705..6f24737d2a9e 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -203,6 +203,11 @@ func getAndParse[T any]( } } + if errors.Is(err, errAppRequestSendFailed) { + // Failing to send an AppRequest is a fatal error. + return nil, err + } + client.log.Debug("request failed, retrying", zap.Stringer("nodeID", nodeID), zap.Int("attempt", attempt), @@ -237,6 +242,8 @@ func getAndParse[T any]( // until the node receives a response, failure notification // or [ctx] is canceled. // Returns the peer's NodeID and response. +// Returns [errAppRequestSendFailed] if we failed to send an AppRequest. +// This should be treated as fatal. // It's safe to call this method multiple times concurrently. func (c *clientImpl) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) { var ( diff --git a/x/sync/network_client.go b/x/sync/network_client.go index ca9074152895..25d5ca28fae3 100644 --- a/x/sync/network_client.go +++ b/x/sync/network_client.go @@ -27,20 +27,27 @@ const minRequestHandlingDuration = 100 * time.Millisecond var ( _ networkClient = (*networkClientImpl)(nil) - errAcquiringSemaphore = errors.New("error acquiring semaphore") - errRequestFailed = errors.New("request failed") + errAcquiringSemaphore = errors.New("error acquiring semaphore") + errRequestFailed = errors.New("request failed") + errAppRequestSendFailed = errors.New("failed to send AppRequest") ) // networkClient defines ability to send request / response through the Network type networkClient interface { - // requestAny synchronously sends request to an arbitrary peer with a - // node version greater than or equal to minVersion. - // Returns response bytes, the ID of the chosen peer, and errRequestFailed if - // the request should be retried. + // requestAny synchronously sends [request] to a randomly chosen peer with a + // version greater than or equal to [minVersion]. If [minVersion] is nil, + // the request is sent to any peer regardless of their version. + // May block until the number of outstanding requests decreases. + // Returns the node's response and the ID of the node. + // Returns [errAppRequestSendFailed] if we failed to send an AppRequest. + // This should be treated as fatal. requestAny(ctx context.Context, minVersion *version.Application, request []byte) (ids.NodeID, []byte, error) - // request synchronously sends request to the selected nodeID. - // Returns response bytes, and errRequestFailed if the request should be retried. + // Sends [request] to [nodeID] and returns the response. + // Returns [errAppRequestSendFailed] if we failed to send an AppRequest. + // This should be treated as fatal. + // Blocks until the number of outstanding requests is + // below the limit before sending the request. request(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error) // trackBandwidth should be called for each valid response with the bandwidth @@ -166,11 +173,6 @@ func (c *networkClientImpl) getRequestHandler(requestID uint32) (ResponseHandler return handler, true } -// requestAny synchronously sends [request] to a randomly chosen peer with a -// version greater than or equal to [minVersion]. If [minVersion] is nil, -// the request is sent to any peer regardless of their version. -// May block until the number of outstanding requests decreases. -// Returns the node's response and the ID of the node. func (c *networkClientImpl) requestAny( ctx context.Context, minVersion *version.Application, @@ -197,9 +199,6 @@ func (c *networkClientImpl) requestAny( return nodeID, response, err } -// Sends [request] to [nodeID] and returns the response. -// Blocks until the number of outstanding requests is -// below the limit before sending the request. func (c *networkClientImpl) request( ctx context.Context, nodeID ids.NodeID, @@ -218,7 +217,9 @@ func (c *networkClientImpl) request( } // Sends [get] to [nodeID] and returns the response. -// Returns an error if the get failed or [ctx] is canceled. +// Returns an error if the request failed or [ctx] is canceled. +// Returns [errAppRequestSendFailed] if we failed to send an AppRequest. +// This should be treated as fatal. // Blocks until a response is received or the [ctx] is canceled fails. // Releases active requests semaphore if there was an error in sending the get. // Assumes [nodeID] is never [c.myNodeID] since we guarantee @@ -244,7 +245,7 @@ func (c *networkClientImpl) get( // Send an app request to the peer. if err := c.appSender.SendAppRequest(ctx, nodeIDs, requestID, request); err != nil { c.lock.Unlock() - return nil, err + return nil, fmt.Errorf("%w: %s", errAppRequestSendFailed, err) } handler := newResponseHandler() From 9cb255edf6c860a145502123a27425d96fb301c8 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Wed, 5 Jul 2023 15:10:02 -0400 Subject: [PATCH 23/28] comment nits --- x/sync/network_client.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/x/sync/network_client.go b/x/sync/network_client.go index 25d5ca28fae3..6aa897096c94 100644 --- a/x/sync/network_client.go +++ b/x/sync/network_client.go @@ -34,7 +34,7 @@ var ( // networkClient defines ability to send request / response through the Network type networkClient interface { - // requestAny synchronously sends [request] to a randomly chosen peer with a + // Synchronously sends [request] to a randomly chosen peer with a // version greater than or equal to [minVersion]. If [minVersion] is nil, // the request is sent to any peer regardless of their version. // May block until the number of outstanding requests decreases. @@ -50,15 +50,24 @@ type networkClient interface { // below the limit before sending the request. request(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error) - // trackBandwidth should be called for each valid response with the bandwidth + // Should be called for each valid response with the bandwidth // (length of response divided by request time), and with 0 if the response is invalid. trackBandwidth(nodeID ids.NodeID, bandwidth float64) - // The following declarations allow this interface to be embedded in the VM - // to handle incoming responses from peers. + // Always returns nil because the engine considers errors + // returned from this function as fatal. appResponse(context.Context, ids.NodeID, uint32, []byte) error + + // Always returns nil because the engine considers errors + // returned from this function as fatal. appRequestFailed(context.Context, ids.NodeID, uint32) error + + // Adds the given [nodeID] to the peer + // list so that it can receive messages. + // If [nodeID] is this node's ID, this is a no-op. connected(context.Context, ids.NodeID, *version.Application) error + + // Removes given [nodeID] from the peer list. disconnected(context.Context, ids.NodeID) error } @@ -95,8 +104,6 @@ func NewNetworkClient( } } -// Always returns nil because the engine considers errors -// returned from this function as fatal. func (c *networkClientImpl) appResponse( _ context.Context, nodeID ids.NodeID, @@ -129,8 +136,6 @@ func (c *networkClientImpl) appResponse( return nil } -// Always returns nil because the engine considers errors -// returned from this function as fatal. func (c *networkClientImpl) appRequestFailed( _ context.Context, nodeID ids.NodeID, @@ -271,9 +276,6 @@ func (c *networkClientImpl) get( return response, nil } -// connected adds the given [nodeID] to the peer -// list so that it can receive messages. -// If [nodeID] is [c.myNodeID], this is a no-op. func (c *networkClientImpl) connected( _ context.Context, nodeID ids.NodeID, @@ -292,7 +294,6 @@ func (c *networkClientImpl) connected( return nil } -// disconnected removes given [nodeID] from the peer list. func (c *networkClientImpl) disconnected(_ context.Context, nodeID ids.NodeID) error { c.lock.Lock() defer c.lock.Unlock() From 1d75906d8456953dc58c6b51e710cb92574ca130 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Wed, 5 Jul 2023 17:05:47 -0400 Subject: [PATCH 24/28] fix client range proof tests --- x/sync/client_test.go | 109 +++++++++++++++++---------- x/sync/mock_network_client.go | 136 ++++++++++++++++++++++++++++++++++ 2 files changed, 205 insertions(+), 40 deletions(-) create mode 100644 x/sync/mock_network_client.go diff --git a/x/sync/client_test.go b/x/sync/client_test.go index cf9c85e992bc..735ed1ba7eb1 100644 --- a/x/sync/client_test.go +++ b/x/sync/client_test.go @@ -38,60 +38,87 @@ func newDefaultDBConfig() merkledb.Config { } } -func sendRangeRequest( +// Create a client and send a range proof request to a server +// whose underlying database is [serverDB]. +// The server's response is modified with [modifyResponse] before +// being returned to the server. +// The client makes at most [maxAttempts] attempts to fulfill +// the request before returning an error. +func sendRangeProofRequest( t *testing.T, - db DB, + serverDB DB, request *pb.SyncGetRangeProofRequest, - maxAttempts uint32, + maxAttempts int, modifyResponse func(*merkledb.RangeProof), ) (*merkledb.RangeProof, error) { t.Helper() - var wg sync.WaitGroup - defer wg.Wait() // wait for goroutines spawned - require := require.New(t) ctrl := gomock.NewController(t) defer ctrl.Finish() - sender := common.NewMockSender(ctrl) - handler := NewNetworkServer(sender, db, logging.NoLog{}) - clientNodeID, serverNodeID := ids.GenerateTestNodeID(), ids.GenerateTestNodeID() - networkClient := NewNetworkClient(sender, clientNodeID, 1, logging.NoLog{}) - require.NoError(networkClient.connected(context.Background(), serverNodeID, version.CurrentApp)) - client := NewClient(&ClientConfig{ - NetworkClient: networkClient, - Metrics: &mockMetrics{}, - Log: logging.NoLog{}, - }) + var ( + // Number of calls from the client to the server so far. + numAttempts int - ctx, cancel := context.WithCancel(context.Background()) - deadline := time.Now().Add(1 * time.Hour) // enough time to complete a request - defer cancel() // avoid leaking a goroutine + // Sends messages from server to client. + sender = common.NewMockSender(ctrl) - expectedSendNodeIDs := set.NewSet[ids.NodeID](1) - expectedSendNodeIDs.Add(serverNodeID) - sender.EXPECT().SendAppRequest( - gomock.Any(), // ctx - expectedSendNodeIDs, // {serverNodeID} - gomock.Any(), // requestID - gomock.Any(), // requestBytes + // Server the range proof. + server = NewNetworkServer(sender, serverDB, logging.NoLog{}) + + clientNodeID, serverNodeID = ids.GenerateTestNodeID(), ids.GenerateTestNodeID() + + // "Sends" the request from the client to the server and + // "receives" the response from the server. In reality, + // it just invokes the server's method and receives + // the response on [serverResponseChan]. + networkClient = NewMocknetworkClient(ctrl) + + serverResponseChan = make(chan []byte, 1) + + // The client fetching a range proof. + client = NewClient(&ClientConfig{ + NetworkClient: networkClient, + Metrics: &mockMetrics{}, + Log: logging.NoLog{}, + }) + + // The context used in client.GetRangeProof. + // Canceled after the first response is received because + // the client will keep sending requests until its context + // expires or it succeeds. + ctx, cancel = context.WithCancel(context.Background()) + ) + + networkClient.EXPECT().requestAny( + gomock.Any(), // ctx + gomock.Any(), // min version + gomock.Any(), // request ).DoAndReturn( - func(ctx context.Context, _ set.Set[ids.NodeID], requestID uint32, requestBytes []byte) error { - // limit the number of attempts to [maxAttempts] by cancelling the context if needed. - if requestID >= maxAttempts { - cancel() - return ctx.Err() + func(_ context.Context, _ *version.Application, request []byte) (ids.NodeID, []byte, error) { + go func() { + // Get response from server + require.NoError(server.AppRequest(context.Background(), clientNodeID, 0, time.Now().Add(time.Hour), request)) + }() + + // Wait for response from server + serverResponse := <-serverResponseChan + + numAttempts++ + + if numAttempts >= maxAttempts { + defer cancel() } - wg.Add(1) - go func() { - defer wg.Done() - require.NoError(handler.AppRequest(ctx, clientNodeID, requestID, deadline, requestBytes)) - }() // should be on a goroutine so the test can make progress. - return nil + return serverNodeID, serverResponse, nil }, ).AnyTimes() + + // Handle bandwidth tracking calls from client. + networkClient.EXPECT().trackBandwidth(gomock.Any(), gomock.Any()).AnyTimes() + + // The server should expect to "send" a response to the client. sender.EXPECT().SendAppResponse( gomock.Any(), // ctx clientNodeID, @@ -114,7 +141,9 @@ func sendRangeRequest( // reserialize the response and pass it to the client to complete the handling. responseBytes, err := proto.Marshal(response.ToProto()) require.NoError(err) - require.NoError(networkClient.appResponse(context.Background(), serverNodeID, requestID, responseBytes)) + + serverResponseChan <- responseBytes + return nil }, ).AnyTimes() @@ -282,7 +311,7 @@ func TestGetRangeProof(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { require := require.New(t) - proof, err := sendRangeRequest(t, test.db, test.request, 1, test.modifyResponse) + proof, err := sendRangeProofRequest(t, test.db, test.request, 1, test.modifyResponse) require.ErrorIs(err, test.expectedErr) if test.expectedErr != nil { return @@ -598,7 +627,7 @@ func TestRangeProofRetries(t *testing.T) { response.KeyValues = nil } } - proof, err := sendRangeRequest(t, db, request, uint32(maxRequests), modifyResponse) + proof, err := sendRangeProofRequest(t, db, request, maxRequests, modifyResponse) require.NoError(err) require.Len(proof.KeyValues, keyCount) diff --git a/x/sync/mock_network_client.go b/x/sync/mock_network_client.go new file mode 100644 index 000000000000..7f3e781acfbd --- /dev/null +++ b/x/sync/mock_network_client.go @@ -0,0 +1,136 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: x/sync/network_client.go + +// Package sync is a generated GoMock package. +package sync + +import ( + context "context" + reflect "reflect" + + ids "github.com/ava-labs/avalanchego/ids" + version "github.com/ava-labs/avalanchego/version" + gomock "github.com/golang/mock/gomock" +) + +// MocknetworkClient is a mock of networkClient interface. +type MocknetworkClient struct { + ctrl *gomock.Controller + recorder *MocknetworkClientMockRecorder +} + +// MocknetworkClientMockRecorder is the mock recorder for MocknetworkClient. +type MocknetworkClientMockRecorder struct { + mock *MocknetworkClient +} + +// NewMocknetworkClient creates a new mock instance. +func NewMocknetworkClient(ctrl *gomock.Controller) *MocknetworkClient { + mock := &MocknetworkClient{ctrl: ctrl} + mock.recorder = &MocknetworkClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MocknetworkClient) EXPECT() *MocknetworkClientMockRecorder { + return m.recorder +} + +// appRequestFailed mocks base method. +func (m *MocknetworkClient) appRequestFailed(arg0 context.Context, arg1 ids.NodeID, arg2 uint32) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "appRequestFailed", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// appRequestFailed indicates an expected call of appRequestFailed. +func (mr *MocknetworkClientMockRecorder) appRequestFailed(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "appRequestFailed", reflect.TypeOf((*MocknetworkClient)(nil).appRequestFailed), arg0, arg1, arg2) +} + +// appResponse mocks base method. +func (m *MocknetworkClient) appResponse(arg0 context.Context, arg1 ids.NodeID, arg2 uint32, arg3 []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "appResponse", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// appResponse indicates an expected call of appResponse. +func (mr *MocknetworkClientMockRecorder) appResponse(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "appResponse", reflect.TypeOf((*MocknetworkClient)(nil).appResponse), arg0, arg1, arg2, arg3) +} + +// connected mocks base method. +func (m *MocknetworkClient) connected(arg0 context.Context, arg1 ids.NodeID, arg2 *version.Application) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "connected", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// connected indicates an expected call of connected. +func (mr *MocknetworkClientMockRecorder) connected(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "connected", reflect.TypeOf((*MocknetworkClient)(nil).connected), arg0, arg1, arg2) +} + +// disconnected mocks base method. +func (m *MocknetworkClient) disconnected(arg0 context.Context, arg1 ids.NodeID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "disconnected", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// disconnected indicates an expected call of disconnected. +func (mr *MocknetworkClientMockRecorder) disconnected(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "disconnected", reflect.TypeOf((*MocknetworkClient)(nil).disconnected), arg0, arg1) +} + +// request mocks base method. +func (m *MocknetworkClient) request(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "request", ctx, nodeID, request) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// request indicates an expected call of request. +func (mr *MocknetworkClientMockRecorder) request(ctx, nodeID, request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "request", reflect.TypeOf((*MocknetworkClient)(nil).request), ctx, nodeID, request) +} + +// requestAny mocks base method. +func (m *MocknetworkClient) requestAny(ctx context.Context, minVersion *version.Application, request []byte) (ids.NodeID, []byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "requestAny", ctx, minVersion, request) + ret0, _ := ret[0].(ids.NodeID) + ret1, _ := ret[1].([]byte) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// requestAny indicates an expected call of requestAny. +func (mr *MocknetworkClientMockRecorder) requestAny(ctx, minVersion, request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "requestAny", reflect.TypeOf((*MocknetworkClient)(nil).requestAny), ctx, minVersion, request) +} + +// trackBandwidth mocks base method. +func (m *MocknetworkClient) trackBandwidth(nodeID ids.NodeID, bandwidth float64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "trackBandwidth", nodeID, bandwidth) +} + +// trackBandwidth indicates an expected call of trackBandwidth. +func (mr *MocknetworkClientMockRecorder) trackBandwidth(nodeID, bandwidth interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "trackBandwidth", reflect.TypeOf((*MocknetworkClient)(nil).trackBandwidth), nodeID, bandwidth) +} From 816426e1f21c20b230b76dc24d6f90032500b1e8 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Thu, 6 Jul 2023 10:07:26 -0400 Subject: [PATCH 25/28] fix change proof tests --- x/sync/client_test.go | 113 +++++++++++++++++++++++++----------------- 1 file changed, 68 insertions(+), 45 deletions(-) diff --git a/x/sync/client_test.go b/x/sync/client_test.go index 735ed1ba7eb1..a49a1fdf924d 100644 --- a/x/sync/client_test.go +++ b/x/sync/client_test.go @@ -6,7 +6,6 @@ package sync import ( "context" "math/rand" - "sync" "testing" "time" @@ -21,7 +20,6 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/version" "github.com/ava-labs/avalanchego/x/merkledb" @@ -64,7 +62,7 @@ func sendRangeProofRequest( // Sends messages from server to client. sender = common.NewMockSender(ctrl) - // Server the range proof. + // Serves the range proof. server = NewNetworkServer(sender, serverDB, logging.NoLog{}) clientNodeID, serverNodeID = ids.GenerateTestNodeID(), ids.GenerateTestNodeID() @@ -91,6 +89,8 @@ func sendRangeProofRequest( ctx, cancel = context.WithCancel(context.Background()) ) + defer cancel() + networkClient.EXPECT().requestAny( gomock.Any(), // ctx gomock.Any(), // min version @@ -326,61 +326,84 @@ func TestGetRangeProof(t *testing.T) { } } -func sendChangeRequest( +func sendChangeProofRequest( t *testing.T, db DB, verificationDB DB, request *pb.SyncGetChangeProofRequest, - maxAttempts uint32, + maxAttempts int, modifyResponse func(*merkledb.ChangeProof), ) (*merkledb.ChangeProof, error) { t.Helper() - var wg sync.WaitGroup - defer wg.Wait() // wait for goroutines spawned - require := require.New(t) ctrl := gomock.NewController(t) defer ctrl.Finish() - sender := common.NewMockSender(ctrl) - handler := NewNetworkServer(sender, db, logging.NoLog{}) - clientNodeID, serverNodeID := ids.GenerateTestNodeID(), ids.GenerateTestNodeID() - networkClient := NewNetworkClient(sender, clientNodeID, 1, logging.NoLog{}) - require.NoError(networkClient.connected(context.Background(), serverNodeID, version.CurrentApp)) - client := NewClient(&ClientConfig{ - NetworkClient: networkClient, - Metrics: &mockMetrics{}, - Log: logging.NoLog{}, - }) - - ctx, cancel := context.WithCancel(context.Background()) - deadline := time.Now().Add(1 * time.Hour) // enough time to complete a request - defer cancel() // avoid leaking a goroutine - - expectedSendNodeIDs := set.NewSet[ids.NodeID](1) - expectedSendNodeIDs.Add(serverNodeID) - sender.EXPECT().SendAppRequest( - gomock.Any(), // ctx - expectedSendNodeIDs, // {serverNodeID} - gomock.Any(), // requestID - gomock.Any(), // requestBytes + var ( + // Number of calls from the client to the server so far. + numAttempts int + + // Sends messages from server to client. + sender = common.NewMockSender(ctrl) + + // Serves the change proof. + server = NewNetworkServer(sender, db, logging.NoLog{}) + + clientNodeID, serverNodeID = ids.GenerateTestNodeID(), ids.GenerateTestNodeID() + + // "Sends" the request from the client to the server and + // "receives" the response from the server. In reality, + // it just invokes the server's method and receives + // the response on [serverResponseChan]. + networkClient = NewMocknetworkClient(ctrl) + + serverResponseChan = make(chan []byte, 1) + + // The client fetching a change proof. + client = NewClient(&ClientConfig{ + NetworkClient: networkClient, + Metrics: &mockMetrics{}, + Log: logging.NoLog{}, + }) + + // The context used in client.GetChangeProof. + // Canceled after the first response is received because + // the client will keep sending requests until its context + // expires or it succeeds. + ctx, cancel = context.WithCancel(context.Background()) + ) + + defer cancel() // avoid leaking a goroutine + + networkClient.EXPECT().requestAny( + gomock.Any(), // ctx + gomock.Any(), // min version + gomock.Any(), // request ).DoAndReturn( - func(ctx context.Context, _ set.Set[ids.NodeID], requestID uint32, requestBytes []byte) error { - // limit the number of attempts to [maxAttempts] by cancelling the context if needed. - if requestID >= maxAttempts { - cancel() - return ctx.Err() + func(_ context.Context, _ *version.Application, request []byte) (ids.NodeID, []byte, error) { + go func() { + // Get response from server + require.NoError(server.AppRequest(context.Background(), clientNodeID, 0, time.Now().Add(time.Hour), request)) + }() + + // Wait for response from server + serverResponse := <-serverResponseChan + + numAttempts++ + + if numAttempts >= maxAttempts { + defer cancel() } - wg.Add(1) - go func() { - defer wg.Done() - require.NoError(handler.AppRequest(ctx, clientNodeID, requestID, deadline, requestBytes)) - }() // should be on a goroutine so the test can make progress. - return nil + return serverNodeID, serverResponse, nil }, ).AnyTimes() + + // Handle bandwidth tracking calls from client. + networkClient.EXPECT().trackBandwidth(gomock.Any(), gomock.Any()).AnyTimes() + + // The server should expect to "send" a response to the client. sender.EXPECT().SendAppResponse( gomock.Any(), // ctx clientNodeID, @@ -393,8 +416,6 @@ func sendChangeRequest( require.NoError(proto.Unmarshal(responseBytes, &responseProto)) var changeProof merkledb.ChangeProof - // TODO when the client/server support including range proofs in the response, - // this will need to be updated. require.NoError(changeProof.UnmarshalProto(responseProto.GetChangeProof())) // modify if needed @@ -409,7 +430,9 @@ func sendChangeRequest( }, }) require.NoError(err) - require.NoError(networkClient.appResponse(context.Background(), serverNodeID, requestID, responseBytes)) + + serverResponseChan <- responseBytes + return nil }, ).AnyTimes() @@ -578,7 +601,7 @@ func TestGetChangeProof(t *testing.T) { t.Run(name, func(t *testing.T) { require := require.New(t) - proof, err := sendChangeRequest(t, trieDB, verificationDB, test.request, 1, test.modifyResponse) + proof, err := sendChangeProofRequest(t, trieDB, verificationDB, test.request, 1, test.modifyResponse) require.ErrorIs(err, test.expectedErr) if test.expectedErr != nil { return From 5ea4efc65942989d69ac9e6ef27031c0088c448e Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Thu, 6 Jul 2023 13:12:38 -0400 Subject: [PATCH 26/28] add test --- x/sync/client_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/x/sync/client_test.go b/x/sync/client_test.go index a49a1fdf924d..3f528568b07f 100644 --- a/x/sync/client_test.go +++ b/x/sync/client_test.go @@ -656,3 +656,46 @@ func TestRangeProofRetries(t *testing.T) { require.Equal(responseCount, maxRequests) // check the client performed retries. } + +// Test that a failure to send an AppRequest is propagated +// and returned by GetRangeProof and GetChangeProof. +func TestAppRequestSendFailed(t *testing.T) { + require := require.New(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + networkClient := NewMocknetworkClient(ctrl) + + client := NewClient( + &ClientConfig{ + NetworkClient: networkClient, + Log: logging.NoLog{}, + Metrics: &mockMetrics{}, + }, + ) + + // Mock failure to send app request + networkClient.EXPECT().requestAny( + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return(ids.NodeID{}, nil, errAppRequestSendFailed).Times(2) + + networkClient.EXPECT().trackBandwidth( + gomock.Any(), + gomock.Any(), + ).Times(2) + + _, err := client.GetChangeProof( + context.Background(), + &pb.SyncGetChangeProofRequest{}, + nil, // database is unused + ) + require.ErrorIs(err, errAppRequestSendFailed) + + _, err = client.GetRangeProof( + context.Background(), + &pb.SyncGetRangeProofRequest{}, + ) + require.ErrorIs(err, errAppRequestSendFailed) +} From e1dd22d75983c6738e9aec7ba2d1a43780ca5338 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Thu, 6 Jul 2023 13:17:33 -0400 Subject: [PATCH 27/28] comment --- x/sync/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x/sync/client.go b/x/sync/client.go index 6f24737d2a9e..11ea8ee28e57 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -184,6 +184,8 @@ func (c *clientImpl) GetRangeProof(ctx context.Context, req *pb.SyncGetRangeProo // [parseFn] parses the raw response. // If the request is unsuccessful or the response can't be parsed, // retries the request to a different peer until [ctx] expires. +// Returns [errAppRequestSendFailed] if we fail to send an AppRequest. +// This should be treated as a fatal error. func getAndParse[T any]( ctx context.Context, client *clientImpl, From 2d8d200edbc430400330ee6e8163e895b779ac95 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Tue, 1 Aug 2023 13:17:51 -0400 Subject: [PATCH 28/28] fix unit test --- x/sync/client_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/x/sync/client_test.go b/x/sync/client_test.go index 43038f34d31a..715c03601c68 100644 --- a/x/sync/client_test.go +++ b/x/sync/client_test.go @@ -681,11 +681,6 @@ func TestAppRequestSendFailed(t *testing.T) { gomock.Any(), ).Return(ids.NodeID{}, nil, errAppRequestSendFailed).Times(2) - networkClient.EXPECT().TrackBandwidth( - gomock.Any(), - gomock.Any(), - ).Times(2) - _, err := client.GetChangeProof( context.Background(), &pb.SyncGetChangeProofRequest{},