Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate TaskManager into NodeGraph and Discovery #445

Merged
merged 40 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
a69f513
feat: `PolykeyAgent.ts` using `TaskManager`
tegefaulkes Sep 12, 2022
66ee630
feat: updated `NodeManager` to use `TaskManager`
tegefaulkes Sep 12, 2022
41d3a00
feat: updated `NodeConnectionManager` to use `TaskManager`
tegefaulkes Sep 12, 2022
eb1b557
feat: adding cancellability to `NodeManager` handlers
tegefaulkes Sep 13, 2022
6e4322b
fix: small bug with `pingNode`
tegefaulkes Sep 14, 2022
27316f9
fix: bugs with `nodeConnectionManager.syncNodeGraph`
tegefaulkes Sep 14, 2022
f1ab40b
tests: cleaning up dependencies in tests
tegefaulkes Sep 14, 2022
a4470ad
fix: `getRemoteNodeClosestNodes` shouldn't throw connection errors
tegefaulkes Sep 14, 2022
ca5e675
fix: excessive connections from `refreshBuckets`
tegefaulkes Sep 15, 2022
206dceb
feat: `nodeConnectionManager.getClosestGlobalNodes` can optionally sk…
tegefaulkes Sep 16, 2022
c65e758
feat: added handler to detect promise deadlocks to `ExitHandlers.ts`
tegefaulkes Sep 16, 2022
f9b1dbe
fix: handerIds are derived from class and handler function names
tegefaulkes Sep 16, 2022
20ea395
fix: fixing up `setNode` garbage collection.
tegefaulkes Sep 16, 2022
26695b5
fix: refactored `nodeManager.setNode` garbage collection
tegefaulkes Sep 19, 2022
ca9af18
tests: proper stopping of `taskManager` in tests
tegefaulkes Sep 19, 2022
3be12d7
fix: updating task paths
tegefaulkes Sep 19, 2022
19bce20
feat: task handlers are now `timedCancellable`
tegefaulkes Sep 19, 2022
ad7c751
fix: depending on return from `updateTask` to check existence
tegefaulkes Sep 19, 2022
81dbac6
fix: replacing `private` with `protected`
tegefaulkes Sep 19, 2022
66181b4
fix: removing `Queue.ts`
tegefaulkes Sep 19, 2022
cd47c74
fix: removing `@ready` from `TaskManager.cancelTask`
tegefaulkes Sep 19, 2022
34c658e
docs: adding description to `isConnectionError`
tegefaulkes Sep 19, 2022
b9d248b
fix: added cancellability and blocking to `nodeManager.setNode`
tegefaulkes Sep 20, 2022
fe817a2
fix: extracted `refreshBucketsDelayJitter` into nodes utils
tegefaulkes Sep 20, 2022
ca2c966
fix: fixes to errors and adding un-recoverable error handlers
tegefaulkes Sep 20, 2022
97ce1d8
fix: moved 'syncNodeGraph` from `NodeConnectionManager` to `NodeManager`
tegefaulkes Sep 20, 2022
1229695
fix: `pingNode`s inside of `garbageCollectBucket` now have timeouts s…
tegefaulkes Sep 20, 2022
48298f8
fix: cleaning up ephemeral tasks when stopping `NodeManager`
tegefaulkes Sep 21, 2022
9f3d2c0
fix: cleaning up errors
tegefaulkes Sep 21, 2022
45360d4
fix: small fix to `updateRefreshBucketDelay`
tegefaulkes Sep 21, 2022
81e5532
fix: rollback of proxy changes, was out of scope for this PR
tegefaulkes Sep 21, 2022
a33ea26
fix: small fix to `garbageCollectBucket` concurrent pinging
tegefaulkes Sep 21, 2022
f11197c
fix: updated default timeout for `NodeConnectionManager.pingNode`
tegefaulkes Sep 21, 2022
0f729b1
fix: using Symbols for cancelling tasks
tegefaulkes Sep 21, 2022
607095b
fix: `TaskManager` should extend the `CreateDestroyStartStop` interface
tegefaulkes Sep 21, 2022
43027e7
fix: test wasn't overriding key-pair generation
tegefaulkes Sep 21, 2022
0267d3b
fix: `TaskManager`'s `stopProcessing` and `stopTasks` are now properl…
tegefaulkes Sep 21, 2022
ccc61a8
tests: slightly increasing timeouts for two tests
tegefaulkes Sep 21, 2022
b29ba9e
tests: general fixes for tests failing in CI
tegefaulkes Sep 21, 2022
23f470b
syntax: formatting change for `ExitHandlers.ts`
tegefaulkes Sep 21, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
"@grpc/grpc-js": "1.6.7",
"@matrixai/async-cancellable": "^1.0.2",
"@matrixai/async-init": "^1.8.2",
"@matrixai/async-locks": "^3.1.2",
"@matrixai/async-locks": "^3.2.0",
"@matrixai/db": "^5.0.3",
"@matrixai/errors": "^1.1.3",
"@matrixai/id": "^3.3.3",
Expand Down
60 changes: 33 additions & 27 deletions src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import process from 'process';
import Logger from '@matrixai/logger';
import { DB } from '@matrixai/db';
import { CreateDestroyStartStop } from '@matrixai/async-init/dist/CreateDestroyStartStop';
import Queue from './nodes/Queue';
import * as networkUtils from './network/utils';
import KeyManager from './keys/KeyManager';
import Status from './status/Status';
Expand All @@ -35,6 +34,7 @@ import * as errors from './errors';
import * as utils from './utils';
import * as keysUtils from './keys/utils';
import * as nodesUtils from './nodes/utils';
import TaskManager from './tasks/TaskManager';

type NetworkConfig = {
forwardHost?: Host;
Expand Down Expand Up @@ -87,8 +87,8 @@ class PolykeyAgent {
acl,
gestaltGraph,
proxy,
taskManager,
nodeGraph,
queue,
nodeConnectionManager,
nodeManager,
discovery,
Expand Down Expand Up @@ -134,8 +134,8 @@ class PolykeyAgent {
acl?: ACL;
gestaltGraph?: GestaltGraph;
proxy?: Proxy;
taskManager?: TaskManager;
nodeGraph?: NodeGraph;
queue?: Queue;
nodeConnectionManager?: NodeConnectionManager;
nodeManager?: NodeManager;
discovery?: Discovery;
Expand Down Expand Up @@ -285,18 +285,21 @@ class PolykeyAgent {
keyManager,
logger: logger.getChild(NodeGraph.name),
}));
queue =
queue ??
new Queue({
logger: logger.getChild(Queue.name),
});
taskManager =
taskManager ??
(await TaskManager.createTaskManager({
db,
fresh,
lazy: true,
logger,
}));
nodeConnectionManager =
nodeConnectionManager ??
new NodeConnectionManager({
keyManager,
nodeGraph,
proxy,
queue,
taskManager,
seedNodes,
...nodeConnectionManagerConfig_,
logger: logger.getChild(NodeConnectionManager.name),
Expand All @@ -309,7 +312,7 @@ class PolykeyAgent {
keyManager,
nodeGraph,
nodeConnectionManager,
queue,
taskManager,
logger: logger.getChild(NodeManager.name),
});
await nodeManager.start();
Expand Down Expand Up @@ -373,6 +376,7 @@ class PolykeyAgent {
await notificationsManager?.stop();
await vaultManager?.stop();
await discovery?.stop();
await taskManager?.stop();
await proxy?.stop();
await gestaltGraph?.stop();
await acl?.stop();
Expand All @@ -396,7 +400,7 @@ class PolykeyAgent {
gestaltGraph,
proxy,
nodeGraph,
queue,
taskManager,
nodeConnectionManager,
nodeManager,
discovery,
Expand Down Expand Up @@ -429,7 +433,7 @@ class PolykeyAgent {
public readonly gestaltGraph: GestaltGraph;
public readonly proxy: Proxy;
public readonly nodeGraph: NodeGraph;
public readonly queue: Queue;
public readonly taskManager: TaskManager;
public readonly nodeConnectionManager: NodeConnectionManager;
public readonly nodeManager: NodeManager;
public readonly discovery: Discovery;
Expand All @@ -454,7 +458,7 @@ class PolykeyAgent {
gestaltGraph,
proxy,
nodeGraph,
queue,
taskManager,
nodeConnectionManager,
nodeManager,
discovery,
Expand All @@ -478,7 +482,7 @@ class PolykeyAgent {
gestaltGraph: GestaltGraph;
proxy: Proxy;
nodeGraph: NodeGraph;
queue: Queue;
taskManager: TaskManager;
nodeConnectionManager: NodeConnectionManager;
nodeManager: NodeManager;
discovery: Discovery;
Expand All @@ -504,7 +508,7 @@ class PolykeyAgent {
this.proxy = proxy;
this.discovery = discovery;
this.nodeGraph = nodeGraph;
this.queue = queue;
this.taskManager = taskManager;
this.nodeConnectionManager = nodeConnectionManager;
this.nodeManager = nodeManager;
this.vaultManager = vaultManager;
Expand Down Expand Up @@ -578,14 +582,10 @@ class PolykeyAgent {
);
// Reverse connection was established and authenticated,
// add it to the node graph
await this.nodeManager.setNode(
data.remoteNodeId,
{
host: data.remoteHost,
port: data.remotePort,
},
false,
);
await this.nodeManager.setNode(data.remoteNodeId, {
host: data.remoteHost,
port: data.remotePort,
});
}
},
);
Expand Down Expand Up @@ -667,15 +667,16 @@ class PolykeyAgent {
proxyPort: networkConfig_.proxyPort,
tlsConfig,
});
await this.queue.start();
await this.taskManager.start({ fresh, lazy: true });
await this.nodeManager.start();
await this.nodeConnectionManager.start({ nodeManager: this.nodeManager });
await this.nodeGraph.start({ fresh });
await this.nodeConnectionManager.syncNodeGraph(false);
await this.nodeManager.syncNodeGraph(false);
await this.discovery.start({ fresh });
await this.vaultManager.start({ fresh });
await this.notificationsManager.start({ fresh });
await this.sessionManager.start({ fresh });
await this.taskManager.startProcessing();
await this.status.finishStart({
pid: process.pid,
nodeId: this.keyManager.getNodeId(),
Expand All @@ -693,14 +694,16 @@ class PolykeyAgent {
this.logger.warn(`Failed Starting ${this.constructor.name}`);
this.events.removeAllListeners();
await this.status?.beginStop({ pid: process.pid });
await this.taskManager?.stopProcessing();
await this.taskManager?.stopTasks();
await this.sessionManager?.stop();
await this.notificationsManager?.stop();
await this.vaultManager?.stop();
await this.discovery?.stop();
await this.queue?.stop();
await this.nodeGraph?.stop();
await this.nodeConnectionManager?.stop();
await this.nodeManager?.stop();
await this.taskManager?.stop();
await this.proxy?.stop();
await this.grpcServerAgent?.stop();
await this.grpcServerClient?.stop();
Expand All @@ -723,14 +726,16 @@ class PolykeyAgent {
this.logger.info(`Stopping ${this.constructor.name}`);
this.events.removeAllListeners();
await this.status.beginStop({ pid: process.pid });
await this.taskManager.stopProcessing();
await this.taskManager.stopTasks();
await this.sessionManager.stop();
await this.notificationsManager.stop();
await this.vaultManager.stop();
await this.discovery.stop();
await this.nodeConnectionManager.stop();
await this.nodeGraph.stop();
await this.nodeManager.stop();
await this.queue.stop();
await this.taskManager.stop();
await this.proxy.stop();
await this.grpcServerAgent.stop();
await this.grpcServerClient.stop();
Expand All @@ -755,6 +760,7 @@ class PolykeyAgent {
await this.discovery.destroy();
await this.nodeGraph.destroy();
await this.gestaltGraph.destroy();
await this.taskManager.destroy();
await this.acl.destroy();
await this.sigchain.destroy();
await this.identitiesManager.destroy();
Expand Down
32 changes: 27 additions & 5 deletions src/bin/errors.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,25 @@
import ErrorPolykey from '../ErrorPolykey';
import sysexits from '../utils/sysexits';

class ErrorCLI<T> extends ErrorPolykey<T> {}
class ErrorBin<T> extends ErrorPolykey<T> {}

class ErrorBinUncaughtException<T> extends ErrorBin<T> {
static description = '';
exitCode = sysexits.SOFTWARE;
}

class ErrorBinUnhandledRejection<T> extends ErrorBin<T> {
static description = '';
exitCode = sysexits.SOFTWARE;
}

class ErrorBinAsynchronousDeadlock<T> extends ErrorBin<T> {
static description =
'PolykeyAgent process exited unexpectedly, likely due to promise deadlock';
exitCode = sysexits.SOFTWARE;
}

class ErrorCLI<T> extends ErrorBin<T> {}

class ErrorCLINodePath<T> extends ErrorCLI<T> {
static description = 'Cannot derive default node path from unknown platform';
Expand Down Expand Up @@ -49,17 +67,21 @@ class ErrorCLIPolykeyAgentProcess<T> extends ErrorCLI<T> {
exitCode = sysexits.OSERR;
}

class ErrorNodeFindFailed<T> extends ErrorCLI<T> {
class ErrorCLINodeFindFailed<T> extends ErrorCLI<T> {
static description = 'Failed to find the node in the DHT';
exitCode = 1;
}

class ErrorNodePingFailed<T> extends ErrorCLI<T> {
class ErrorCLINodePingFailed<T> extends ErrorCLI<T> {
static description = 'Node was not online or not found.';
exitCode = 1;
}

export {
ErrorBin,
ErrorBinUncaughtException,
ErrorBinUnhandledRejection,
ErrorBinAsynchronousDeadlock,
ErrorCLI,
ErrorCLINodePath,
ErrorCLIClientOptions,
Expand All @@ -70,6 +92,6 @@ export {
ErrorCLIFileRead,
ErrorCLIPolykeyAgentStatus,
ErrorCLIPolykeyAgentProcess,
ErrorNodeFindFailed,
ErrorNodePingFailed,
ErrorCLINodeFindFailed,
ErrorCLINodePingFailed,
};
2 changes: 1 addition & 1 deletion src/bin/nodes/CommandFind.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class CommandFind extends CommandPolykey {
);
// Like ping it should error when failing to find node for automation reasons.
if (!result.success) {
throw new binErrors.ErrorNodeFindFailed(result.message);
throw new binErrors.ErrorCLINodeFindFailed(result.message);
}
} finally {
if (pkClient! != null) await pkClient.stop();
Expand Down
4 changes: 2 additions & 2 deletions src/bin/nodes/CommandPing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class CommandPing extends CommandPolykey {
);
} catch (err) {
if (err.cause instanceof nodesErrors.ErrorNodeGraphNodeIdNotFound) {
error = new binErrors.ErrorNodePingFailed(
error = new binErrors.ErrorCLINodePingFailed(
`Failed to resolve node ID ${nodesUtils.encodeNodeId(
nodeId,
)} to an address.`,
Expand All @@ -69,7 +69,7 @@ class CommandPing extends CommandPolykey {
const status = { success: false, message: '' };
status.success = statusMessage ? statusMessage.getSuccess() : false;
if (!status.success && !error) {
error = new binErrors.ErrorNodePingFailed('No response received');
error = new binErrors.ErrorCLINodePingFailed('No response received');
}
if (status.success) status.message = 'Node is Active.';
else status.message = error.message;
Expand Down
Loading