Skip to content

Commit

Permalink
chore: lintfix
Browse files Browse the repository at this point in the history
[ci-skip]
  • Loading branch information
amydevs committed Oct 25, 2023
1 parent c34cede commit 08c579e
Show file tree
Hide file tree
Showing 23 changed files with 299 additions and 151 deletions.
10 changes: 5 additions & 5 deletions src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class PolykeyAgent {
mdns: {
groups: config.defaultsSystem.mdnsGroups,
port: config.defaultsSystem.mdnsPort,
}
},
});
// This can only happen if the caller didn't specify the node path and the
// automatic detection failed
Expand Down Expand Up @@ -327,14 +327,14 @@ class PolykeyAgent {
logger: logger.getChild(NodeGraph.name),
});
mdns = new MDNS({
logger: logger.getChild(MDNS.name)
logger: logger.getChild(MDNS.name),
});
await mdns.start({
id: keyRing.getNodeId().toBuffer().readUint16BE(),
hostname: nodesUtils.encodeNodeId(keyRing.getNodeId()),
groups: optionsDefaulted.mdns.groups,
port: optionsDefaulted.mdns.port,
})
});
// Remove your own node ID if provided as a seed node
const nodeIdOwnEncoded = nodesUtils.encodeNodeId(keyRing.getNodeId());
delete optionsDefaulted.seedNodes[nodeIdOwnEncoded];
Expand Down Expand Up @@ -670,7 +670,7 @@ class PolykeyAgent {
mdns: {
groups: config.defaultsSystem.mdnsGroups,
port: config.defaultsSystem.mdnsPort,
}
},
});
// Register event handlers
this.certManager.addEventListener(
Expand Down Expand Up @@ -745,7 +745,7 @@ class PolykeyAgent {
hostname: nodesUtils.encodeNodeId(this.keyRing.getNodeId()),
groups: optionsDefaulted.mdns.groups,
port: optionsDefaulted.mdns.port,
})
});
await this.nodeManager.start();
await this.nodeConnectionManager.start({
host: optionsDefaulted.agentServiceHost,
Expand Down
4 changes: 3 additions & 1 deletion src/client/handlers/NodesAdd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ class NodesAdd extends UnaryHandler<
// Pinging to authenticate the node
if (
(input.ping ?? false) &&
!(await nodeManager.pingNode(nodeId, [{ host, port, scopes: ['external'] }]))
!(await nodeManager.pingNode(nodeId, [
{ host, port, scopes: ['external'] },
]))
) {
throw new nodeErrors.ErrorNodePingFailed(
'Failed to authenticate target node',
Expand Down
4 changes: 3 additions & 1 deletion src/client/handlers/NodesFind.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ class NodesFind extends UnaryHandler<
},
);
const addresses = await nodeConnectionManager.findNodeAll(nodeId);
if (addresses.length === 0) throw new nodesErrors.ErrorNodeGraphNodeIdNotFound();
if (addresses.length === 0) {
throw new nodesErrors.ErrorNodeGraphNodeIdNotFound();
}

return { addresses };
};
Expand Down
2 changes: 1 addition & 1 deletion src/client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ type NodesFindMessage = {
addresses: Array<AddressMessage>;
};

type NodesGetMessage = NodeAddressMessage & { bucketIndex: number };
type NodesGetMessage = NodeAddressMessage & { bucketIndex: number };

type NodesAddMessage = NodeAddressMessage & {
force?: boolean;
Expand Down
4 changes: 2 additions & 2 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ const testnet: Record<string, NodeAddress> = {
v7v9ptvcdbdf8p4upok3prpmu3938ns8v4g45dib7sm5hqvvehv70: {
host: 'testnet.polykey.com' as Host,
port: 1314 as Port,
scopes: ['external']
scopes: ['external'],
},
v270ktdd3cs3mp1r3q3dkmick92bn927mii9or4sgroeogd1peqb0: {
host: 'testnet.polykey.com' as Host,
port: 1314 as Port,
scopes: ['external']
scopes: ['external'],
},
};

Expand Down
18 changes: 15 additions & 3 deletions src/network/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -448,17 +448,29 @@ async function resolveHostnames(
addresses: Array<NodeAddress>,
existingAddresses: Set<string> = new Set(),
): Promise<Array<{ host: Host; port: Port; scopes: Array<NodeAddressScope> }>> {
const final: Array<{ host: Host; port: Port; scopes: Array<NodeAddressScope> }> = [];
const final: Array<{
host: Host;
port: Port;
scopes: Array<NodeAddressScope>;
}> = [];
for (const address of addresses) {
if (isHost(address.host)) {
if (existingAddresses.has(`${address.host}|${address.port}`)) continue;
final.push({ host: address.host, port: address.port, scopes: address.scopes });
final.push({
host: address.host,
port: address.port,
scopes: address.scopes,
});
existingAddresses.add(`${address.host}|${address.port}`);
continue;
}
const resolvedAddresses = await resolveHostname(address.host);
for (const resolvedHost of resolvedAddresses) {
const newAddress = { host: resolvedHost, port: address.port, scopes: address.scopes };
const newAddress = {
host: resolvedHost,
port: address.port,
scopes: address.scopes,
};
if (!Validator.isValidIPv4String(resolvedHost)[0]) continue;
if (existingAddresses.has(`${resolvedHost}|${address.port}`)) continue;
final.push(newAddress);
Expand Down
127 changes: 79 additions & 48 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import type {
TLSConfig,
} from '../network/types';
import type { ServerManifest } from '@matrixai/rpc';
import type { MDNS, ServicePOJO } from '@matrixai/mdns';
import Logger from '@matrixai/logger';
import { withF } from '@matrixai/resources';
import { ready, StartStop } from '@matrixai/async-init/dist/StartStop';
Expand All @@ -38,7 +39,7 @@ import {
} from '@matrixai/quic';
import { running, status } from '@matrixai/async-init';
import { RPCServer, middleware as rpcUtilsMiddleware } from '@matrixai/rpc';
import { MDNS, ServicePOJO, events as mdnsEvents, utils as mdnsUtils } from '@matrixai/mdns';
import { events as mdnsEvents, utils as mdnsUtils } from '@matrixai/mdns';
import NodeConnection from './NodeConnection';
import * as nodesUtils from './utils';
import * as nodesErrors from './errors';
Expand Down Expand Up @@ -711,7 +712,9 @@ class NodeConnectionManager {
if (addresses == null || addresses.length === 0) {
// Find the node
addresses = await this.findNodeAll(targetNodeId, undefined, ctx);
if (addresses.length === 0) throw new nodesErrors.ErrorNodeGraphNodeIdNotFound();
if (addresses.length === 0) {
throw new nodesErrors.ErrorNodeGraphNodeIdNotFound();
}
}
// Then we just get the connection, it should already exist.
return this.getConnectionWithAddresses(targetNodeId, addresses, ctx);
Expand Down Expand Up @@ -772,9 +775,12 @@ class NodeConnectionManager {
if (existingConnection != null) return existingConnection;
const targetNodeIdEncoded = nodesUtils.encodeNodeId(targetNodeId);
let timeoutDivisions = 0;
const addressGroups: { local: Array<NodeAddress>, external: Array<NodeAddress> } = { local: [], external: [] };
const addressGroups: {
local: Array<NodeAddress>;
external: Array<NodeAddress>;
} = { local: [], external: [] };
for (const address of addresses) {
let scope = address.scopes.includes('local') ? 'local' : 'external';
const scope = address.scopes.includes('local') ? 'local' : 'external';
// If this is the first time an addressGroup has had an address added, the timeout divisions must be incremented.
if (addressGroups[scope].length === 0) {
timeoutDivisions++;
Expand All @@ -794,7 +800,7 @@ class NodeConnectionManager {
addressGroups.local,
{
signal: ctx.signal,
timer: timeout
timer: timeout,
},
);
}
Expand All @@ -805,7 +811,7 @@ class NodeConnectionManager {
addressGroups.external,
{
signal: ctx.signal,
timer: timeout
timer: timeout,
},
);
}
Expand Down Expand Up @@ -950,7 +956,7 @@ class NodeConnectionManager {
address: {
host: Host;
port: Port;
scopes: Array<NodeAddressScope>
scopes: Array<NodeAddressScope>;
},
connectionsResults: Map<NodeIdString, ConnectionAndTimer>,
ctx: ContextTimed,
Expand All @@ -961,7 +967,9 @@ class NodeConnectionManager {
this.logger.debug(
`establishing single connection for address ${address.host}:${address.port}`,
);
const iceProm = !address.scopes?.includes('local') ? this.initiateHolePunch(nodeIds, ctx) : undefined;
const iceProm = !address.scopes?.includes('local')
? this.initiateHolePunch(nodeIds, ctx)
: undefined;
const connection =
await NodeConnection.createNodeConnection<ManifestClientAgent>(
{
Expand Down Expand Up @@ -1280,55 +1288,67 @@ class NodeConnectionManager {
@context ctx: ContextTimed,
): Promise<Array<NodeAddress>> {
const encodedNodeId = nodesUtils.encodeNodeId(targetNodeId);
this.logger.debug(
`Finding local addresses for ${encodedNodeId}`,
);
let addresses: Array<NodeAddress> = [];
this.logger.debug(`Finding local addresses for ${encodedNodeId}`);
const addresses: Array<NodeAddress> = [];
if (this.mdns == null) {
return addresses;
}
// First check if we already have an existing MDNS Service
const mdnsOptions = { type: "polykey", protocol: "udp" } as const;
let service = this.mdns.networkServices.get(mdnsUtils.toFqdn({ name: encodedNodeId, ...mdnsOptions }));
const mdnsOptions = { type: 'polykey', protocol: 'udp' } as const;
let service = this.mdns.networkServices.get(
mdnsUtils.toFqdn({ name: encodedNodeId, ...mdnsOptions }),
);
if (service == null) {
// setup promises
// Setup promises
ctx.signal.throwIfAborted();
const { p: abortP, rejectP: rejectAbortP } = utils.promise<never>();
const abortHandler = () => {
rejectAbortP(ctx.signal.reason);
};
ctx.signal.addEventListener('abort', abortHandler, { once: true });
const { p: serviceP, resolveP: resolveServiceP } = utils.promise<ServicePOJO>();
const { p: serviceP, resolveP: resolveServiceP } =
utils.promise<ServicePOJO>();
const handleEventMDNSService = (evt: mdnsEvents.EventMDNSService) => {
if (evt.detail.name === encodedNodeId) {
resolveServiceP(evt.detail);
}
};
this.mdns.addEventListener(mdnsEvents.EventMDNSService.name, handleEventMDNSService, { once: true });
// abort and restart query in case already running
this.mdns.addEventListener(
mdnsEvents.EventMDNSService.name,
handleEventMDNSService,
{ once: true },
);
// Abort and restart query in case already running
this.mdns.stopQuery(mdnsOptions);
this.mdns.startQuery(mdnsOptions);
// race promises to find node or timeout
// Race promises to find node or timeout
try {
service = await Promise.race([serviceP, abortP]);
} catch {
this.mdns.removeEventListener(mdnsEvents.EventMDNSService.name, handleEventMDNSService);
this.mdns.removeEventListener(
mdnsEvents.EventMDNSService.name,
handleEventMDNSService,
);
} finally {
this.mdns.stopQuery(mdnsOptions);
ctx.signal.removeEventListener('abort', abortHandler);
}
}
// if the service is not found, just return no addresses
// If the service is not found, just return no addresses
if (service == null) {
return addresses;
}
for (const host_ of service.hosts) {
let host: string;
switch (this.quicSocket.type) {
case 'ipv4':
if (quicUtils.isIPv4(host_)) host = host_;
else if (quicUtils.isIPv4MappedIPv6(host_)) host = quicUtils.fromIPv4MappedIPv6(host_);
else continue;
if (quicUtils.isIPv4(host_)) {
host = host_;
} else if (quicUtils.isIPv4MappedIPv6(host_)) {
host = quicUtils.fromIPv4MappedIPv6(host_);
} else {
continue;
}
break;
case 'ipv6':
if (quicUtils.isIPv6(host_)) host = host_;
Expand All @@ -1343,12 +1363,12 @@ class NodeConnectionManager {
addresses.push({
host: host as Host,
port: service.port as Port,
scopes: ['local']
scopes: ['local'],
});
this.logger.debug(
`found address for ${nodesUtils.encodeNodeId(targetNodeId)} at ${
host
}:${service.port}`,
`found address for ${nodesUtils.encodeNodeId(
targetNodeId,
)} at ${host}:${service.port}`,
);
}
return addresses;
Expand All @@ -1367,15 +1387,26 @@ class NodeConnectionManager {
ctx?: Partial<ContextTimed>,
): PromiseCancellable<Array<NodeAddress>>;
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
@timedCancellable(true, (nodeConnectionManager: NodeConnectionManager) => nodeConnectionManager.connectionConnectTimeoutTime)
@timedCancellable(
true,
(nodeConnectionManager: NodeConnectionManager) =>
nodeConnectionManager.connectionConnectTimeoutTime,
)
public async findNodeAll(
targetNodeId: NodeId,
pingTimeoutTime: number | undefined,
@context ctx: ContextTimed,
): Promise<Array<NodeAddress>> {
const [localAddresses, kademliaAddress] = await Promise.allSettled([this.findNodeLocal(targetNodeId, ctx), this.findNode(targetNodeId, pingTimeoutTime, ctx)])
const addresses = localAddresses.status === 'fulfilled' ? localAddresses.value : [];
if (kademliaAddress.status === 'fulfilled' && kademliaAddress.value != null) {
const [localAddresses, kademliaAddress] = await Promise.allSettled([
this.findNodeLocal(targetNodeId, ctx),
this.findNode(targetNodeId, pingTimeoutTime, ctx),
]);
const addresses =
localAddresses.status === 'fulfilled' ? localAddresses.value : [];
if (
kademliaAddress.status === 'fulfilled' &&
kademliaAddress.value != null
) {
addresses.push(kademliaAddress.value);
}
return addresses;
Expand Down Expand Up @@ -1450,11 +1481,13 @@ class NodeConnectionManager {
if (
!(await this.pingNode(
nextNodeId,
[{
host: nextNodeAddress.address.host,
port: nextNodeAddress.address.port,
scopes: ['external']
}],
[
{
host: nextNodeAddress.address.host,
port: nextNodeAddress.address.port,
scopes: ['external'],
},
],
{
signal: ctx.signal,
timer: pingTimeoutTime ?? this.connectionConnectTimeoutTime,
Expand Down Expand Up @@ -1489,11 +1522,13 @@ class NodeConnectionManager {
nodeId.equals(targetNodeId) &&
(await this.pingNode(
nodeId,
[{
host: nextNodeAddress.address.host,
port: nextNodeAddress.address.port,
scopes: ['external']
}],
[
{
host: nextNodeAddress.address.host,
port: nextNodeAddress.address.port,
scopes: ['external'],
},
],
{
signal: ctx.signal,
timer: pingTimeoutTime ?? this.connectionConnectTimeoutTime,
Expand Down Expand Up @@ -1573,7 +1608,7 @@ class NodeConnectionManager {
address: {
host: result.host as Host | Hostname,
port: result.port as Port,
scopes: ['external']
scopes: ['external'],
},
// Not really needed
// But if it's needed then we need to add the information to the proto definition
Expand Down Expand Up @@ -1790,11 +1825,7 @@ class NodeConnectionManager {
@context ctx: ContextTimed,
): Promise<boolean> {
try {
await this.getConnectionWithAddresses(
nodeId,
addresses,
ctx,
);
await this.getConnectionWithAddresses(nodeId, addresses, ctx);
return true;
} catch {
return false;
Expand Down
Loading

0 comments on commit 08c579e

Please sign in to comment.