Skip to content

Commit

Permalink
fix: getRemoteNodeClosestNodes shouldn't throw connection errors
Browse files Browse the repository at this point in the history
`getRemoteNodeClosestNodes` was throwing an connection error in certain conditions. If it failed to connect to a node it should've just skipped that node.

#418
  • Loading branch information
tegefaulkes committed Sep 21, 2022
1 parent f1ab40b commit a4470ad
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 45 deletions.
92 changes: 47 additions & 45 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import * as nodesErrors from './errors';
import GRPCClientAgent from '../agent/GRPCClientAgent';
import * as validationUtils from '../validation/utils';
import * as networkUtils from '../network/utils';
import * as agentErrors from '../agent/errors';
import * as grpcErrors from '../grpc/errors';
import * as nodesPB from '../proto/js/polykey/v1/nodes/nodes_pb';
import { timerStart } from '../utils';

Expand Down Expand Up @@ -190,11 +188,7 @@ class NodeConnectionManager {
return [
async (e) => {
await release();
if (
e instanceof nodesErrors.ErrorNodeConnectionDestroyed ||
e instanceof grpcErrors.ErrorGRPC ||
e instanceof agentErrors.ErrorAgentClientDestroyed
) {
if (nodesUtils.isConnectionError(e)) {
// Error with connection, shutting connection down
await this.destroyConnection(targetNodeId);
}
Expand Down Expand Up @@ -467,9 +461,6 @@ class NodeConnectionManager {
// Let foundTarget: boolean = false;
let foundAddress: NodeAddress | undefined = undefined;
// Get the closest alpha nodes to the target node (set as shortlist)
// FIXME? this is an array. Shouldn't it be a set?
// It's possible for this to grow faster than we can consume it,
// doubly so if we allow duplicates
const shortlist = await this.nodeGraph.getClosestNodes(
targetNodeId,
this.initialClosestNodes,
Expand All @@ -484,11 +475,10 @@ class NodeConnectionManager {
// Not sufficient to simply check if there's already a pre-existing connection
// in nodeConnections - what if there's been more than 1 invocation of
// getClosestGlobalNodes()?
const contacted: { [nodeId: string]: boolean } = {};
const contacted: Record<string, boolean> = {};
// Iterate until we've found and contacted k nodes
while (Object.keys(contacted).length <= this.nodeGraph.nodeBucketLimit) {
if (signal?.aborted) throw new nodesErrors.ErrorNodeAborted();
// While (!foundTarget) {
if (signal?.aborted) throw signal.reason;
// Remove the node from the front of the array
const nextNode = shortlist.shift();
// If we have no nodes left in the shortlist, then stop
Expand Down Expand Up @@ -522,6 +512,7 @@ class NodeConnectionManager {
targetNodeId,
timer,
);
if (foundClosest.length === 0) continue;
// Check to see if any of these are the target node. At the same time, add
// them to the shortlist
for (const [nodeId, nodeData] of foundClosest) {
Expand Down Expand Up @@ -585,36 +576,43 @@ class NodeConnectionManager {
// Construct the message
const nodeIdMessage = new nodesPB.Node();
nodeIdMessage.setNodeId(nodesUtils.encodeNodeId(targetNodeId));
// Send through client
return this.withConnF(
nodeId,
async (connection) => {
const client = connection.getClient();
const response = await client.nodesClosestLocalNodesGet(nodeIdMessage);
const nodes: Array<[NodeId, NodeData]> = [];
// Loop over each map element (from the returned response) and populate nodes
response.getNodeTableMap().forEach((address, nodeIdString: string) => {
const nodeId = nodesUtils.decodeNodeId(nodeIdString);
// If the nodeId is not valid we don't add it to the list of nodes
if (nodeId != null) {
nodes.push([
nodeId,
{
address: {
host: address.getHost() as Host | Hostname,
port: address.getPort() as Port,
},
// Not really needed
// But if it's needed then we need to add the information to the proto definition
lastUpdated: 0,
try {
// Send through client
const response = await this.withConnF(
nodeId,
async (connection) => {
const client = connection.getClient();
return await client.nodesClosestLocalNodesGet(nodeIdMessage);
},
timer,
);
const nodes: Array<[NodeId, NodeData]> = [];
// Loop over each map element (from the returned response) and populate nodes
response.getNodeTableMap().forEach((address, nodeIdString: string) => {
const nodeId = nodesUtils.decodeNodeId(nodeIdString);
// If the nodeId is not valid we don't add it to the list of nodes
if (nodeId != null) {
nodes.push([
nodeId,
{
address: {
host: address.getHost() as Host | Hostname,
port: address.getPort() as Port,
},
]);
}
});
return nodes;
},
timer,
);
// Not really needed
// But if it's needed then we need to add the information to the proto definition
lastUpdated: 0,
},
]);
}
});
return nodes;
} catch (e) {
if (nodesUtils.isConnectionError(e)) {
return [];
}
throw e;
}
}

/**
Expand All @@ -625,7 +623,10 @@ class NodeConnectionManager {
* non-blocking
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async syncNodeGraph(block: boolean = true, timer?: Timer) {
public async syncNodeGraph(
block: boolean = true,
timer?: Timer,
): Promise<void> {
this.logger.info('Syncing nodeGraph');
for (const seedNodeId of this.getSeedNodes()) {
// Check if the connection is viable
Expand All @@ -640,8 +641,9 @@ class NodeConnectionManager {
this.keyManager.getNodeId(),
timer,
);
const localNodeId = this.keyManager.getNodeId();
for (const [nodeId, nodeData] of closestNodes) {
if (!nodeId.equals(this.keyManager.getNodeId())) {
if (!localNodeId.equals(nodeId)) {
const pingAndSetTask = await this.taskManager.scheduleTask({
delay: 0,
handlerId: this.pingAndSetNodeHandlerId,
Expand Down Expand Up @@ -798,7 +800,6 @@ class NodeConnectionManager {
const signature = await this.keyManager.signWithRootKeyPair(
Buffer.from(proxyAddress),
);
if (signal?.aborted) throw signal.reason;
// FIXME: this needs to handle aborting
const holePunchPromises = Array.from(this.getSeedNodes(), (seedNodeId) => {
return this.sendHolePunchMessage(
Expand All @@ -817,6 +818,7 @@ class NodeConnectionManager {
);

const abortPromise = new Promise((_resolve, reject) => {
if (signal?.aborted) throw signal.reason;
signal?.addEventListener('abort', () => reject(signal.reason));
});

Expand Down
12 changes: 12 additions & 0 deletions src/nodes/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import type { KeyPath } from '@matrixai/db';
import { IdInternal } from '@matrixai/id';
import lexi from 'lexicographic-integer';
import { utils as dbUtils } from '@matrixai/db';
import * as nodesErrors from './errors';
import { bytes2BigInt } from '../utils';
import * as keysUtils from '../keys/utils';
import * as grpcErrors from '../grpc/errors';
import * as agentErrors from '../agent/errors';

const sepBuffer = dbUtils.sep;

Expand Down Expand Up @@ -310,6 +313,14 @@ function generateRandomNodeIdForBucket(
return xOrNodeId(nodeId, randomDistanceForBucket);
}

function isConnectionError(e): boolean {
return (
e instanceof nodesErrors.ErrorNodeConnectionDestroyed ||
e instanceof grpcErrors.ErrorGRPC ||
e instanceof agentErrors.ErrorAgentClientDestroyed
);
}

export {
sepBuffer,
encodeNodeId,
Expand All @@ -330,4 +341,5 @@ export {
generateRandomDistanceForBucket,
xOrNodeId,
generateRandomNodeIdForBucket,
isConnectionError,
};

0 comments on commit a4470ad

Please sign in to comment.