Skip to content

Commit

Permalink
feat: consolidating client service logic into ClientService
Browse files Browse the repository at this point in the history
[ci skip]
  • Loading branch information
tegefaulkes committed Sep 14, 2023
1 parent 5cd6e49 commit 1ebcd20
Show file tree
Hide file tree
Showing 23 changed files with 1,268 additions and 1,194 deletions.
121 changes: 63 additions & 58 deletions src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import path from 'path';
import process from 'process';
import Logger from '@matrixai/logger';
import { DB } from '@matrixai/db';
import { CreateDestroyStartStop } from '@matrixai/async-init/dist/CreateDestroyStartStop';
import {
CreateDestroyStartStop,
ready,
} from '@matrixai/async-init/dist/CreateDestroyStartStop';
import RPCServer from './rpc/RPCServer';
import WebSocketServer from './websockets/WebSocketServer';
import * as rpcUtilsMiddleware from './rpc/utils/middleware';
import * as clientUtilsMiddleware from './client/utils/middleware';
import { WorkerManager } from './workers';
Expand Down Expand Up @@ -42,6 +44,7 @@ import * as workersUtils from './workers/utils';
import TaskManager from './tasks/TaskManager';
import { serverManifest as clientServerManifest } from './client/handlers';
import { serverManifest as agentServerManifest } from './agent/handlers';
import ClientService from './client/ClientService';

/**
* Optional configuration for `PolykeyAgent`.
Expand Down Expand Up @@ -105,7 +108,7 @@ class PolykeyAgent {
* All optional configuration is deep-merged with defaults.
*
* If any of the optional dependencies is injected, their lifecycle will not
* be managed by `PolykeyAgent`. Furthermore if you inject an optional
* be managed by `PolykeyAgent`. Furthermore, if you inject an optional
* dependency, make sure you are injecting all upstream transitive
* dependencies at the same time. For example if you inject `acl`, you must
* also inject `db`.
Expand Down Expand Up @@ -134,8 +137,7 @@ class PolykeyAgent {
vaultManager,
notificationsManager,
sessionManager,
rpcServerClient,
webSocketServerClient,
clientService,
rpcServerAgent,
fs = require('fs'),
logger = new Logger(this.name),
Expand All @@ -159,8 +161,7 @@ class PolykeyAgent {
vaultManager?: VaultManager;
notificationsManager?: NotificationsManager;
sessionManager?: SessionManager;
rpcServerClient?: RPCServer;
webSocketServerClient?: WebSocketServer;
clientService?: ClientService;
rpcServerAgent?: RPCServer;
fs?: FileSystem;
logger?: Logger;
Expand Down Expand Up @@ -447,9 +448,9 @@ class PolykeyAgent {
if (optionsDefaulted.keys.recoveryCode != null) {
await sessionManager.resetKey();
}
if (rpcServerClient == null) {
if (clientService == null) {
pkAgentProm = utils.promise();
rpcServerClient = await RPCServer.createRPCServer({
clientService = await ClientService.createClientService({
manifest: clientServerManifest({
acl: acl,
certManager: certManager,
Expand All @@ -468,30 +469,23 @@ class PolykeyAgent {
sessionManager: sessionManager,
vaultManager: vaultManager,
}),
middlewareFactory: rpcUtilsMiddleware.defaultServerMiddlewareWrapper(
clientUtilsMiddleware.middlewareServer(sessionManager, keyRing),
optionsDefaulted.rpc.parserBufferSize,
),
sensitive: false,
handlerTimeoutTime: optionsDefaulted.rpc.callTimeoutTime,
handlerTimeoutGraceTime: optionsDefaulted.rpc.callTimeoutTime + 2000,
logger: logger.getChild(RPCServer.name + 'Client'),
tlsConfig,
options: {
middlewareFactory: clientUtilsMiddleware.middlewareServer(
sessionManager,
keyRing,
),
host: optionsDefaulted.clientServiceHost,
port: optionsDefaulted.clientServicePort,
keepAliveTimeoutTime: optionsDefaulted.client.keepAliveTimeoutTime,
keepAliveIntervalTime:
optionsDefaulted.client.keepAliveIntervalTime,
rpcCallTimeoutTime: optionsDefaulted.rpc.callTimeoutTime,
rpcParserBufferSize: optionsDefaulted.rpc.parserBufferSize,
},
logger: logger.getChild(ClientService.name),
});
}
webSocketServerClient =
webSocketServerClient ??
(await WebSocketServer.createWebSocketServer({
connectionCallback: (rpcStream) =>
rpcServerClient!.handleStream(rpcStream),
host: optionsDefaulted.clientServiceHost,
port: optionsDefaulted.clientServicePort,
tlsConfig,
// FIXME: Not sure about this, maxIdleTimeout doesn't seem to be used?
maxIdleTimeout: optionsDefaulted.client.keepAliveTimeoutTime,
pingIntervalTime: optionsDefaulted.client.keepAliveIntervalTime,
pingTimeoutTimeTime: optionsDefaulted.client.keepAliveTimeoutTime,
logger: logger.getChild('WebSocketServer'),
}));
if (rpcServerAgent == null) {
rpcServerAgent = await RPCServer.createRPCServer({
manifest: agentServerManifest({
Expand Down Expand Up @@ -519,8 +513,7 @@ class PolykeyAgent {
} catch (e) {
logger.warn(`Failed Creating ${this.name}`);
await rpcServerAgent?.destroy(true);
await rpcServerClient?.destroy();
await webSocketServerClient?.stop(true);
await clientService?.stop({ force: true });
await sessionManager?.stop();
await notificationsManager?.stop();
await vaultManager?.stop();
Expand Down Expand Up @@ -556,8 +549,7 @@ class PolykeyAgent {
vaultManager,
notificationsManager,
sessionManager,
rpcServerClient,
webSocketServerClient,
clientService,
rpcServerAgent,
fs,
logger,
Expand Down Expand Up @@ -600,8 +592,7 @@ class PolykeyAgent {
public readonly sessionManager: SessionManager;
public readonly fs: FileSystem;
public readonly logger: Logger;
public readonly rpcServerClient: RPCServer;
public readonly webSocketServerClient: WebSocketServer;
public readonly clientService: ClientService;
public readonly rpcServerAgent: RPCServer;
protected workerManager: PolykeyWorkerManagerInterface | undefined;

Expand All @@ -625,7 +616,7 @@ class PolykeyAgent {
keyPrivatePem: keysUtils.privateKeyToPEM(data.keyPair.privateKey),
certChainPem: await this.certManager.getCertPEMsChainPEM(),
};
this.webSocketServerClient.setTlsConfig(tlsConfig);
this.clientService.setTlsConfig(tlsConfig);
this.nodeConnectionManager.updateTlsConfig(tlsConfig);
this.logger.info(`${KeyRing.name} change propagated`);
};
Expand All @@ -649,8 +640,7 @@ class PolykeyAgent {
vaultManager,
notificationsManager,
sessionManager,
rpcServerClient,
webSocketServerClient,
clientService,
rpcServerAgent,
fs,
logger,
Expand All @@ -673,8 +663,7 @@ class PolykeyAgent {
vaultManager: VaultManager;
notificationsManager: NotificationsManager;
sessionManager: SessionManager;
rpcServerClient: RPCServer;
webSocketServerClient: WebSocketServer;
clientService: ClientService;
rpcServerAgent: RPCServer;
fs: FileSystem;
logger: Logger;
Expand All @@ -698,13 +687,30 @@ class PolykeyAgent {
this.vaultManager = vaultManager;
this.notificationsManager = notificationsManager;
this.sessionManager = sessionManager;
this.rpcServerClient = rpcServerClient;
this.webSocketServerClient = webSocketServerClient;
this.clientService = clientService;
this.rpcServerAgent = rpcServerAgent;
this.fs = fs;
}

// TODO: add getters for runtime service information?
@ready(new errors.ErrorPolykeyAgentNotRunning())
get clientServiceHost() {
return this.clientService.host;
}

@ready(new errors.ErrorPolykeyAgentNotRunning())
get clientServicePort() {
return this.clientService.port;
}

@ready(new errors.ErrorPolykeyAgentNotRunning())
get agentServiceHost() {
return this.nodeConnectionManager.host as string;
}

@ready(new errors.ErrorPolykeyAgentNotRunning())
get agentServicePort() {
return this.nodeConnectionManager.port as number;
}

public async start({
password,
Expand Down Expand Up @@ -775,13 +781,12 @@ class PolykeyAgent {
),
certChainPem: await this.certManager.getCertPEMsChainPEM(),
};
// Client server
await this.webSocketServerClient.start({
await this.clientService.start({
tlsConfig,
host: optionsDefaulted.clientServiceHost,
port: optionsDefaulted.clientServicePort,
connectionCallback: (streamPair) =>
this.rpcServerClient.handleStream(streamPair),
options: {
host: optionsDefaulted.clientServiceHost,
port: optionsDefaulted.clientServicePort,
},
});
await this.nodeManager.start();
this.nodeConnectionManager.addEventListener(
Expand Down Expand Up @@ -812,10 +817,10 @@ class PolykeyAgent {
await this.status.finishStart({
pid: process.pid,
nodeId: this.keyRing.getNodeId(),
clientHost: this.webSocketServerClient.getHost(),
clientPort: this.webSocketServerClient.getPort(),
agentHost: this.nodeConnectionManager.host,
agentPort: this.nodeConnectionManager.port,
clientHost: this.clientServiceHost,
clientPort: this.clientServicePort,
agentHost: this.agentServiceHost,
agentPort: this.agentServicePort,
});
this.logger.info(`Started ${this.constructor.name}`);
} catch (e) {
Expand All @@ -840,7 +845,7 @@ class PolykeyAgent {
this.handleEventNodeStream,
);
await this.nodeManager?.stop();
await this.webSocketServerClient.stop(true);
await this.clientService.stop({ force: true });
await this.identitiesManager?.stop();
await this.gestaltGraph?.stop();
await this.acl?.stop();
Expand Down Expand Up @@ -881,7 +886,7 @@ class PolykeyAgent {
);
await this.nodeGraph.stop();
await this.nodeManager.stop();
await this.webSocketServerClient.stop(true);
await this.clientService.stop({ force: true });
await this.identitiesManager.stop();
await this.gestaltGraph.stop();
await this.acl.stop();
Expand Down Expand Up @@ -930,7 +935,7 @@ class PolykeyAgent {
await this.discovery.destroy();
await this.nodeGraph.destroy();
await this.rpcServerAgent.destroy();
await this.rpcServerClient.destroy();
await this.clientService.destroy({ force: true });
await this.identitiesManager.destroy();
await this.gestaltGraph.destroy();
await this.acl.destroy();
Expand Down
Loading

0 comments on commit 1ebcd20

Please sign in to comment.