Skip to content

Commit

Permalink
Merge pull request #846 from MatrixAI/feature-eng-432-review-and-refa…
Browse files Browse the repository at this point in the history
…ctor-rpc-handlers-to-handle-cancellation

Add cancellation to RPC handlers for the `vaults` domain
  • Loading branch information
aryanjassal authored Jan 30, 2025
2 parents 906c32e + 2972ad3 commit 23176b7
Show file tree
Hide file tree
Showing 78 changed files with 2,741 additions and 1,392 deletions.
10 changes: 3 additions & 7 deletions src/acl/ACL.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,7 @@ class ACL {
if (permId in permIds) {
nodePerm = permIds[permId];
// Get the first existing perm object
let perm: Permission;
for (const nodeId_ in nodePerm) {
perm = nodePerm[nodeId_];
break;
}
const perm = Object.values(nodePerm)[0];
// All perm objects are shared
nodePerm[nodeId] = perm!;
} else {
Expand Down Expand Up @@ -614,8 +610,8 @@ class ACL {
[...this.aclNodesDbPath, nodeId.toBuffer()],
true,
);
// Skip if the nodeId doesn't exist
// this means that it previously been removed
// Skip if the nodeId doesn't exist. This means that it has previously
// been removed.
if (permId == null) {
continue;
}
Expand Down
4 changes: 2 additions & 2 deletions src/client/callers/vaultsSecretsGet.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import type { HandlerTypes } from '@matrixai/rpc';
import type VaultsSecretsGet from '../handlers/VaultsSecretsGet';
import { ServerCaller } from '@matrixai/rpc';
import { UnaryCaller } from '@matrixai/rpc';

type CallerTypes = HandlerTypes<VaultsSecretsGet>;

const vaultsSecretsGet = new ServerCaller<
const vaultsSecretsGet = new UnaryCaller<
CallerTypes['input'],
CallerTypes['output']
>();
Expand Down
2 changes: 1 addition & 1 deletion src/client/handlers/AgentStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import type {
} from '../types';
import type PolykeyAgent from '../../PolykeyAgent';
import { UnaryHandler } from '@matrixai/rpc';
import * as nodesUtils from '../../nodes/utils';
import config from '../../config';
import * as nodesUtils from '../../nodes/utils';

class AgentStatus extends UnaryHandler<
{
Expand Down
5 changes: 3 additions & 2 deletions src/client/handlers/AuditEventsGet.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { ContextTimed } from '@matrixai/contexts';
import type { JSONValue } from '@matrixai/rpc';
import type { ClientRPCRequestParams, ClientRPCResponseResult } from '../types';
import type {
AuditEvent,
Expand Down Expand Up @@ -42,8 +43,8 @@ class AuditEventsGet extends ServerHandler<
}> & {
paths: Array<TopicSubPath>;
},
_cancel,
_meta,
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue>,
ctx: ContextTimed,
): AsyncGenerator<ClientRPCResponseResult<AuditEventSerialized>> {
const { audit }: { audit: Audit } = this.container;
Expand Down
2 changes: 1 addition & 1 deletion src/client/handlers/GestaltsActionsGetByIdentity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import type GestaltGraph from '../../gestalts/GestaltGraph';
import type { GestaltAction } from '../../gestalts/types';
import type { IdentityId, ProviderId } from '../../ids';
import { UnaryHandler } from '@matrixai/rpc';
import * as ids from '../../ids';
import { validateSync } from '../../validation';
import { matchSync } from '../../utils';
import * as ids from '../../ids';

class GestaltsActionsGetByIdentity extends UnaryHandler<
{
Expand Down
13 changes: 7 additions & 6 deletions src/client/handlers/IdentitiesAuthenticate.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { ContextTimed } from '@matrixai/contexts';
import type { JSONValue } from '@matrixai/rpc';
import type {
AuthProcessMessage,
ClientRPCRequestParams,
Expand All @@ -21,11 +23,10 @@ class IdentitiesAuthenticate extends ServerHandler<
public timeout = 120000; // 2 Minutes
public handle = async function* (
input: ClientRPCRequestParams<{ providerId: string }>,
_cancel,
_meta,
ctx,
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue>,
ctx: ContextTimed,
): AsyncGenerator<ClientRPCResponseResult<AuthProcessMessage>> {
if (ctx.signal.aborted) throw ctx.signal.reason;
const { identitiesManager }: { identitiesManager: IdentitiesManager } =
this.container;
const {
Expand All @@ -52,7 +53,7 @@ class IdentitiesAuthenticate extends ServerHandler<
if (authFlowResult.done) {
never('authFlow signalled done too soon');
}
if (ctx.signal.aborted) throw ctx.signal.reason;
ctx.signal.throwIfAborted();
yield {
request: {
url: authFlowResult.value.url,
Expand All @@ -63,7 +64,7 @@ class IdentitiesAuthenticate extends ServerHandler<
if (!authFlowResult.done) {
never('authFlow did not signal done when expected');
}
if (ctx.signal.aborted) throw ctx.signal.reason;
ctx.signal.throwIfAborted();
yield {
response: {
identityId: authFlowResult.value,
Expand Down
15 changes: 7 additions & 8 deletions src/client/handlers/IdentitiesAuthenticatedGet.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { ContextTimed } from '@matrixai/contexts';
import type { JSONValue } from '@matrixai/rpc';
import type {
ClientRPCRequestParams,
ClientRPCResponseResult,
Expand All @@ -19,11 +21,10 @@ class IdentitiesAuthenticatedGet extends ServerHandler<
> {
public handle = async function* (
input: ClientRPCRequestParams<{ providerId?: string }>,
_cancel,
_meta,
ctx,
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue>,
ctx: ContextTimed,
): AsyncGenerator<ClientRPCResponseResult<IdentityMessage>> {
if (ctx.signal.aborted) throw ctx.signal.reason;
const { identitiesManager }: { identitiesManager: IdentitiesManager } =
this.container;
let providerId: ProviderId | undefined;
Expand All @@ -46,12 +47,10 @@ class IdentitiesAuthenticatedGet extends ServerHandler<
: [providerId];
for (const providerId of providerIds) {
const provider = identitiesManager.getProvider(providerId);
if (provider == null) {
continue;
}
if (provider == null) continue;
const identities = await provider.getAuthIdentityIds();
for (const identityId of identities) {
if (ctx.signal.aborted) throw ctx.signal.reason;
ctx.signal.throwIfAborted();
yield {
providerId: provider.id,
identityId: identityId,
Expand Down
12 changes: 7 additions & 5 deletions src/client/handlers/IdentitiesInfoConnectedGet.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { ContextTimed } from '@matrixai/contexts';
import type { JSONValue } from '@matrixai/rpc';
import type {
ClientRPCRequestParams,
ClientRPCResponseResult,
Expand All @@ -22,11 +24,10 @@ class IdentitiesInfoConnectedGet extends ServerHandler<
> {
public handle = async function* (
input: ClientRPCRequestParams<ProviderSearchMessage>,
_cancel,
_meta,
ctx,
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue>,
ctx: ContextTimed,
): AsyncGenerator<ClientRPCResponseResult<IdentityInfoMessage>> {
if (ctx.signal.aborted) throw ctx.signal.reason;
const { identitiesManager }: { identitiesManager: IdentitiesManager } =
this.container;
const {
Expand Down Expand Up @@ -71,6 +72,7 @@ class IdentitiesInfoConnectedGet extends ServerHandler<
}
const identities: Array<AsyncGenerator<IdentityData>> = [];
for (const providerId of providerIds) {
ctx.signal.throwIfAborted();
// Get provider from id
const provider = identitiesManager.getProvider(providerId);
if (provider === undefined) {
Expand All @@ -94,7 +96,7 @@ class IdentitiesInfoConnectedGet extends ServerHandler<
let count = 0;
for (const gen of identities) {
for await (const identity of gen) {
if (ctx.signal.aborted) throw ctx.signal.reason;
ctx.signal.throwIfAborted();
if (input.limit !== undefined && count >= input.limit) break;
yield {
providerId: identity.providerId,
Expand Down
14 changes: 8 additions & 6 deletions src/client/handlers/IdentitiesInfoGet.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { ContextTimed } from '@matrixai/contexts';
import type { JSONValue } from '@matrixai/rpc';
import type {
ClientRPCRequestParams,
ClientRPCResponseResult,
Expand All @@ -8,11 +10,11 @@ import type { IdentityId, ProviderId } from '../../ids';
import type IdentitiesManager from '../../identities/IdentitiesManager';
import type { IdentityData } from '../../identities/types';
import { ServerHandler } from '@matrixai/rpc';
import { validateSync } from '../../validation';
import { matchSync } from '../../utils';
import * as ids from '../../ids';
import * as identitiesErrors from '../../identities/errors';
import * as identitiesUtils from '../../identities/utils';
import { validateSync } from '../../validation';
import { matchSync } from '../../utils';

class IdentitiesInfoGet extends ServerHandler<
{
Expand All @@ -23,9 +25,9 @@ class IdentitiesInfoGet extends ServerHandler<
> {
public handle = async function* (
input: ClientRPCRequestParams<ProviderSearchMessage>,
_cancel,
_meta,
ctx,
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue>,
ctx: ContextTimed,
): AsyncGenerator<ClientRPCResponseResult<IdentityInfoMessage>> {
if (ctx.signal.aborted) throw ctx.signal.reason;
const { identitiesManager }: { identitiesManager: IdentitiesManager } =
Expand Down Expand Up @@ -86,7 +88,7 @@ class IdentitiesInfoGet extends ServerHandler<
input.limit = identities.length;
}
for (let i = 0; i < input.limit; i++) {
if (ctx.signal.aborted) throw ctx.signal.reason;
ctx.signal.throwIfAborted();
const identity = identities[i];
if (identity !== undefined) {
if (identitiesUtils.matchIdentityData(identity, searchTerms)) {
Expand Down
16 changes: 8 additions & 8 deletions src/client/handlers/KeysCertsChainGet.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { ContextTimed } from '@matrixai/contexts';
import type { JSONValue } from '@matrixai/rpc';
import type {
CertMessage,
ClientRPCRequestParams,
Expand All @@ -14,17 +16,15 @@ class KeysCertsChainGet extends ServerHandler<
ClientRPCResponseResult<CertMessage>
> {
public handle = async function* (
_input,
_cancel,
_meta,
ctx,
_input: ClientRPCRequestParams,
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue>,
ctx: ContextTimed,
): AsyncGenerator<ClientRPCResponseResult<CertMessage>> {
const { certManager }: { certManager: CertManager } = this.container;
for (const certPEM of await certManager.getCertPEMsChain()) {
if (ctx.signal.aborted) throw ctx.signal.reason;
yield {
cert: certPEM,
};
ctx.signal.throwIfAborted();
yield { cert: certPEM };
}
};
}
Expand Down
4 changes: 1 addition & 3 deletions src/client/handlers/KeysCertsGet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ class KeysCertsGet extends UnaryHandler<
public handle = async (): Promise<ClientRPCResponseResult<CertMessage>> => {
const { certManager }: { certManager: CertManager } = this.container;
const cert = await certManager.getCurrentCertPEM();
return {
cert,
};
return { cert };
};
}

Expand Down
2 changes: 0 additions & 2 deletions src/client/handlers/KeysKeyPairRenew.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ class KeysKeyPairRenew extends UnaryHandler<
input: ClientRPCRequestParams<PasswordMessage>,
): Promise<ClientRPCResponseResult> => {
const { certManager }: { certManager: CertManager } = this.container;

// Other domains will be updated accordingly via the `EventBus` so we
// only need to modify the KeyManager
await certManager.renewCertWithNewKeyPair(input.password);

return {};
};
}
Expand Down
2 changes: 1 addition & 1 deletion src/client/handlers/KeysVerify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class KeysVerify extends UnaryHandler<
Buffer.from(input.data, 'binary'),
Buffer.from(input.signature, 'binary') as Signature,
);
return { type: 'success', success: success };
return { success: success };
};
}

Expand Down
11 changes: 8 additions & 3 deletions src/client/handlers/NodesAdd.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import type { ContextTimed } from '@matrixai/contexts';
import type { DB } from '@matrixai/db';
import type { JSONValue } from '@matrixai/rpc';
import type {
ClientRPCRequestParams,
ClientRPCResponseResult,
Expand All @@ -8,11 +10,11 @@ import type { NodeId } from '../../ids';
import type { Host, Port } from '../../network/types';
import type NodeManager from '../../nodes/NodeManager';
import { UnaryHandler } from '@matrixai/rpc';
import { matchSync } from '../../utils';
import { validateSync } from '../../validation';
import * as ids from '../../ids';
import * as networkUtils from '../../network/utils';
import * as nodeErrors from '../../nodes/errors';
import { matchSync } from '../../utils';
import { validateSync } from '../../validation';

class NodesAdd extends UnaryHandler<
{
Expand All @@ -24,6 +26,9 @@ class NodesAdd extends UnaryHandler<
> {
public handle = async (
input: ClientRPCRequestParams<NodesAddMessage>,
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue>,
ctx: ContextTimed,
): Promise<ClientRPCResponseResult> => {
const { db, nodeManager }: { db: DB; nodeManager: NodeManager } =
this.container;
Expand Down Expand Up @@ -72,8 +77,8 @@ class NodesAdd extends UnaryHandler<
true,
input.force ?? false,
1500,
undefined,
tran,
ctx,
),
);
return {};
Expand Down
6 changes: 3 additions & 3 deletions src/client/handlers/NodesClaim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ class NodesClaim extends UnaryHandler<
},
);
await db.withTransactionF(async (tran) => {
// Attempt to claim the node,
// if there is no permission then we get an error
// Attempt to claim the node. If there is no permission then we get an
// error.
await nodeManager.claimNode(nodeId, tran);
});
return { type: 'success', success: true };
return { success: true };
};
}

Expand Down
18 changes: 7 additions & 11 deletions src/client/handlers/NodesFind.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { JSONValue } from '@matrixai/rpc';
import type { ContextTimed } from '@matrixai/contexts';
import type {
ClientRPCRequestParams,
ClientRPCResponseResult,
Expand All @@ -6,12 +8,11 @@ import type {
} from '../types';
import type { NodeId } from '../../ids';
import type NodeManager from '../../nodes/NodeManager';
import type { ContextTimed } from '@matrixai/contexts';
import { UnaryHandler } from '@matrixai/rpc';
import * as ids from '../../ids';
import * as nodesErrors from '../../nodes/errors';
import { validateSync } from '../../validation';
import { matchSync } from '../../utils';
import * as ids from '../../ids';
import * as nodesErrors from '../../nodes/errors';

class NodesFind extends UnaryHandler<
{
Expand All @@ -22,8 +23,8 @@ class NodesFind extends UnaryHandler<
> {
public handle = async (
input: ClientRPCRequestParams<NodeIdMessage>,
_cancel,
_meta,
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue>,
ctx: ContextTimed,
): Promise<ClientRPCResponseResult<NodesFindMessage>> => {
const { nodeManager }: { nodeManager: NodeManager } = this.container;
Expand All @@ -42,12 +43,7 @@ class NodesFind extends UnaryHandler<
nodeId: input.nodeIdEncoded,
},
);
const result = await nodeManager.findNode(
{
nodeId: nodeId,
},
ctx,
);
const result = await nodeManager.findNode({ nodeId: nodeId }, ctx);
if (result == null) {
throw new nodesErrors.ErrorNodeGraphNodeIdNotFound();
}
Expand Down
Loading

0 comments on commit 23176b7

Please sign in to comment.