diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts index 353d3ba2d..72d3911af 100644 --- a/src/nodes/NodeConnectionManager.ts +++ b/src/nodes/NodeConnectionManager.ts @@ -26,8 +26,6 @@ import * as nodesErrors from './errors'; import GRPCClientAgent from '../agent/GRPCClientAgent'; import * as validationUtils from '../validation/utils'; import * as networkUtils from '../network/utils'; -import * as agentErrors from '../agent/errors'; -import * as grpcErrors from '../grpc/errors'; import * as nodesPB from '../proto/js/polykey/v1/nodes/nodes_pb'; import { timerStart } from '../utils'; @@ -190,11 +188,7 @@ class NodeConnectionManager { return [ async (e) => { await release(); - if ( - e instanceof nodesErrors.ErrorNodeConnectionDestroyed || - e instanceof grpcErrors.ErrorGRPC || - e instanceof agentErrors.ErrorAgentClientDestroyed - ) { + if (nodesUtils.isConnectionError(e)) { // Error with connection, shutting connection down await this.destroyConnection(targetNodeId); } @@ -467,9 +461,6 @@ class NodeConnectionManager { // Let foundTarget: boolean = false; let foundAddress: NodeAddress | undefined = undefined; // Get the closest alpha nodes to the target node (set as shortlist) - // FIXME? this is an array. Shouldn't it be a set? - // It's possible for this to grow faster than we can consume it, - // doubly so if we allow duplicates const shortlist = await this.nodeGraph.getClosestNodes( targetNodeId, this.initialClosestNodes, @@ -484,11 +475,10 @@ class NodeConnectionManager { // Not sufficient to simply check if there's already a pre-existing connection // in nodeConnections - what if there's been more than 1 invocation of // getClosestGlobalNodes()? - const contacted: { [nodeId: string]: boolean } = {}; + const contacted: Record = {}; // Iterate until we've found and contacted k nodes while (Object.keys(contacted).length <= this.nodeGraph.nodeBucketLimit) { - if (signal?.aborted) throw new nodesErrors.ErrorNodeAborted(); - // While (!foundTarget) { + if (signal?.aborted) throw signal.reason; // Remove the node from the front of the array const nextNode = shortlist.shift(); // If we have no nodes left in the shortlist, then stop @@ -522,6 +512,7 @@ class NodeConnectionManager { targetNodeId, timer, ); + if (foundClosest.length === 0) continue; // Check to see if any of these are the target node. At the same time, add // them to the shortlist for (const [nodeId, nodeData] of foundClosest) { @@ -585,36 +576,43 @@ class NodeConnectionManager { // Construct the message const nodeIdMessage = new nodesPB.Node(); nodeIdMessage.setNodeId(nodesUtils.encodeNodeId(targetNodeId)); - // Send through client - return this.withConnF( - nodeId, - async (connection) => { - const client = connection.getClient(); - const response = await client.nodesClosestLocalNodesGet(nodeIdMessage); - const nodes: Array<[NodeId, NodeData]> = []; - // Loop over each map element (from the returned response) and populate nodes - response.getNodeTableMap().forEach((address, nodeIdString: string) => { - const nodeId = nodesUtils.decodeNodeId(nodeIdString); - // If the nodeId is not valid we don't add it to the list of nodes - if (nodeId != null) { - nodes.push([ - nodeId, - { - address: { - host: address.getHost() as Host | Hostname, - port: address.getPort() as Port, - }, - // Not really needed - // But if it's needed then we need to add the information to the proto definition - lastUpdated: 0, + try { + // Send through client + const response = await this.withConnF( + nodeId, + async (connection) => { + const client = connection.getClient(); + return await client.nodesClosestLocalNodesGet(nodeIdMessage); + }, + timer, + ); + const nodes: Array<[NodeId, NodeData]> = []; + // Loop over each map element (from the returned response) and populate nodes + response.getNodeTableMap().forEach((address, nodeIdString: string) => { + const nodeId = nodesUtils.decodeNodeId(nodeIdString); + // If the nodeId is not valid we don't add it to the list of nodes + if (nodeId != null) { + nodes.push([ + nodeId, + { + address: { + host: address.getHost() as Host | Hostname, + port: address.getPort() as Port, }, - ]); - } - }); - return nodes; - }, - timer, - ); + // Not really needed + // But if it's needed then we need to add the information to the proto definition + lastUpdated: 0, + }, + ]); + } + }); + return nodes; + } catch (e) { + if (nodesUtils.isConnectionError(e)) { + return []; + } + throw e; + } } /** @@ -625,7 +623,10 @@ class NodeConnectionManager { * non-blocking */ @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) - public async syncNodeGraph(block: boolean = true, timer?: Timer) { + public async syncNodeGraph( + block: boolean = true, + timer?: Timer, + ): Promise { this.logger.info('Syncing nodeGraph'); for (const seedNodeId of this.getSeedNodes()) { // Check if the connection is viable @@ -640,8 +641,9 @@ class NodeConnectionManager { this.keyManager.getNodeId(), timer, ); + const localNodeId = this.keyManager.getNodeId(); for (const [nodeId, nodeData] of closestNodes) { - if (!nodeId.equals(this.keyManager.getNodeId())) { + if (!localNodeId.equals(nodeId)) { const pingAndSetTask = await this.taskManager.scheduleTask({ delay: 0, handlerId: this.pingAndSetNodeHandlerId, @@ -798,7 +800,6 @@ class NodeConnectionManager { const signature = await this.keyManager.signWithRootKeyPair( Buffer.from(proxyAddress), ); - if (signal?.aborted) throw signal.reason; // FIXME: this needs to handle aborting const holePunchPromises = Array.from(this.getSeedNodes(), (seedNodeId) => { return this.sendHolePunchMessage( @@ -817,6 +818,7 @@ class NodeConnectionManager { ); const abortPromise = new Promise((_resolve, reject) => { + if (signal?.aborted) throw signal.reason; signal?.addEventListener('abort', () => reject(signal.reason)); }); diff --git a/src/nodes/utils.ts b/src/nodes/utils.ts index 1fe3c799d..544b7bc55 100644 --- a/src/nodes/utils.ts +++ b/src/nodes/utils.ts @@ -8,8 +8,11 @@ import type { KeyPath } from '@matrixai/db'; import { IdInternal } from '@matrixai/id'; import lexi from 'lexicographic-integer'; import { utils as dbUtils } from '@matrixai/db'; +import * as nodesErrors from './errors'; import { bytes2BigInt } from '../utils'; import * as keysUtils from '../keys/utils'; +import * as grpcErrors from '../grpc/errors'; +import * as agentErrors from '../agent/errors'; const sepBuffer = dbUtils.sep; @@ -310,6 +313,14 @@ function generateRandomNodeIdForBucket( return xOrNodeId(nodeId, randomDistanceForBucket); } +function isConnectionError(e): boolean { + return ( + e instanceof nodesErrors.ErrorNodeConnectionDestroyed || + e instanceof grpcErrors.ErrorGRPC || + e instanceof agentErrors.ErrorAgentClientDestroyed + ); +} + export { sepBuffer, encodeNodeId, @@ -330,4 +341,5 @@ export { generateRandomDistanceForBucket, xOrNodeId, generateRandomNodeIdForBucket, + isConnectionError, };