Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow vault efs resource acquisition to operate on multiple vaults in parallel #847

Merged
merged 6 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
"@matrixai/mdns": "^1.2.6",
"@matrixai/quic": "^1.3.1",
"@matrixai/resources": "^1.1.5",
"@matrixai/rpc": "^0.6.0",
"@matrixai/rpc": "^0.6.2",
"@matrixai/timer": "^1.1.3",
"@matrixai/workers": "^1.3.7",
"@matrixai/ws": "^1.1.7",
Expand Down
12 changes: 12 additions & 0 deletions src/client/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ class ErrorClientAuthDenied<T> extends ErrorClient<T> {
exitCode = sysexits.NOPERM;
}

class ErrorClientInvalidHeader<T> extends ErrorClient<T> {
static description = 'The header message does not match the expected type';
exitCode = sysexits.USAGE;
}

class ErrorClientProtocolError<T> extends ErrorClient<T> {
static description = 'Data does not match the protocol requirements';
exitCode = sysexits.USAGE;
}

class ErrorClientService<T> extends ErrorClient<T> {}

class ErrorClientServiceRunning<T> extends ErrorClientService<T> {
Expand Down Expand Up @@ -45,6 +55,8 @@ export {
ErrorClientAuthMissing,
ErrorClientAuthFormat,
ErrorClientAuthDenied,
ErrorClientInvalidHeader,
ErrorClientProtocolError,
ErrorClientService,
ErrorClientServiceRunning,
ErrorClientServiceNotRunning,
Expand Down
4 changes: 3 additions & 1 deletion src/client/handlers/VaultsSecretsCat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ class VaultsSecretsCat extends DuplexHandler<
ClientRPCResponseResult<ContentOrErrorMessage>
> {
public handle = async function* (
input: AsyncIterable<ClientRPCRequestParams<SecretIdentifierMessage>>,
input: AsyncIterableIterator<
ClientRPCRequestParams<SecretIdentifierMessage>
>,
): AsyncGenerator<ClientRPCResponseResult<ContentOrErrorMessage>> {
const { db, vaultManager }: { db: DB; vaultManager: VaultManager } =
this.container;
Expand Down
4 changes: 2 additions & 2 deletions src/client/handlers/VaultsSecretsEnv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { DuplexHandler } from '@matrixai/rpc';
import * as vaultsUtils from '../../vaults/utils';
import * as vaultsErrors from '../../vaults/errors';

class VaultsSecretsList extends DuplexHandler<
class VaultsSecretsEnv extends DuplexHandler<
{
db: DB;
vaultManager: VaultManager;
Expand Down Expand Up @@ -86,4 +86,4 @@ class VaultsSecretsList extends DuplexHandler<
};
}

export default VaultsSecretsList;
export default VaultsSecretsEnv;
2 changes: 1 addition & 1 deletion src/client/handlers/VaultsSecretsMkdir.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class VaultsSecretsMkdir extends DuplexHandler<
ClientRPCResponseResult<SuccessOrErrorMessage>
> {
public handle = async function* (
input: AsyncIterable<ClientRPCRequestParams<SecretDirMessage>>,
input: AsyncIterableIterator<ClientRPCRequestParams<SecretDirMessage>>,
): AsyncGenerator<ClientRPCResponseResult<SuccessOrErrorMessage>> {
const { db, vaultManager }: { db: DB; vaultManager: VaultManager } =
this.container;
Expand Down
170 changes: 100 additions & 70 deletions src/client/handlers/VaultsSecretsRemove.ts
Original file line number Diff line number Diff line change
@@ -1,99 +1,129 @@
import type { DB } from '@matrixai/db';
import type { ResourceAcquire } from '@matrixai/resources';
import type {
ClientRPCRequestParams,
ClientRPCResponseResult,
SecretIdentifierMessage,
SecretsRemoveHeaderMessage,
SecretIdentifierMessageTagged,
SuccessOrErrorMessage,
} from '../types';
import type VaultManager from '../../vaults/VaultManager';
import type { FileSystemWritable } from '../../vaults/types';
import { withG } from '@matrixai/resources';
import { DuplexHandler } from '@matrixai/rpc';
import * as vaultsUtils from '../../vaults/utils';
import * as vaultsErrors from '../../vaults/errors';
import * as clientErrors from '../errors';

class VaultsSecretsRemove extends DuplexHandler<
{
db: DB;
vaultManager: VaultManager;
},
ClientRPCRequestParams<SecretIdentifierMessage>,
ClientRPCRequestParams<
SecretsRemoveHeaderMessage | SecretIdentifierMessageTagged
>,
ClientRPCResponseResult<SuccessOrErrorMessage>
> {
public handle = async function* (
input: AsyncIterable<ClientRPCRequestParams<SecretIdentifierMessage>>,
input: AsyncIterableIterator<
ClientRPCRequestParams<
SecretsRemoveHeaderMessage | SecretIdentifierMessageTagged
>
>,
): AsyncGenerator<ClientRPCResponseResult<SuccessOrErrorMessage>> {
const { db, vaultManager }: { db: DB; vaultManager: VaultManager } =
this.container;
// Create a record of secrets to be removed, grouped by vault names
const vaultGroups: Record<string, Array<string>> = {};
const secretNames: Array<[string, string]> = [];
let metadata: any = undefined;
for await (const secretRemoveMessage of input) {
if (metadata == null) metadata = secretRemoveMessage.metadata ?? {};
secretNames.push([
secretRemoveMessage.nameOrId,
secretRemoveMessage.secretName,
]);
// Extract the header message from the iterator
const headerMessagePair = await input.next();
const headerMessage:
| SecretsRemoveHeaderMessage
| SecretIdentifierMessageTagged = headerMessagePair.value;
// Testing if the header is of the expected format
if (
headerMessagePair.done ||
headerMessage.type !== 'VaultNamesHeaderMessage'
) {
throw new clientErrors.ErrorClientInvalidHeader();
}
secretNames.forEach(([vaultName, secretName]) => {
if (vaultGroups[vaultName] == null) {
vaultGroups[vaultName] = [];
// Create an array of write acquires
const vaultAcquires = await db.withTransactionF(async (tran) => {
const vaultAcquires: Array<ResourceAcquire<FileSystemWritable>> = [];
for (const vaultName of headerMessage.vaultNames) {
const vaultIdFromName = await vaultManager.getVaultId(vaultName, tran);
const vaultId = vaultIdFromName ?? vaultsUtils.decodeVaultId(vaultName);
if (vaultId == null) {
throw new vaultsErrors.ErrorVaultsVaultUndefined(
`Vault ${vaultName} does not exist`,
);
}
const acquire = await vaultManager.withVaults(
[vaultId],
async (vault) => vault.acquireWrite(),
);
vaultAcquires.push(acquire);
aryanjassal marked this conversation as resolved.
Show resolved Hide resolved
}
vaultGroups[vaultName].push(secretName);
return vaultAcquires;
});
// Now, all the paths will be removed for a vault within a single commit
yield* db.withTransactionG(
async function* (tran): AsyncGenerator<SuccessOrErrorMessage> {
for (const [vaultName, secretNames] of Object.entries(vaultGroups)) {
const vaultIdFromName = await vaultManager.getVaultId(
vaultName,
tran,
);
const vaultId =
vaultIdFromName ?? vaultsUtils.decodeVaultId(vaultName);
if (vaultId == null) {
throw new vaultsErrors.ErrorVaultsVaultUndefined();
// Acquire all locks in parallel and perform all operations at once
yield* withG(
vaultAcquires,
async function* (efses): AsyncGenerator<SuccessOrErrorMessage> {
// Creating the vault name to efs map for easy access
const vaultMap = new Map<string, FileSystemWritable>();
for (let i = 0; i < efses.length; i++) {
vaultMap.set(headerMessage!.vaultNames[i], efses[i]);
}
let loopRan = false;
for await (const message of input) {
loopRan = true;
// Header messages should not be seen anymore
if (message.type === 'VaultNamesHeaderMessage') {
throw new clientErrors.ErrorClientProtocolError(
'The header message cannot be sent multiple times',
);
}
yield* vaultManager.withVaultsG(
[vaultId],
async function* (vault): AsyncGenerator<SuccessOrErrorMessage> {
yield* vault.writeG(
async function* (efs): AsyncGenerator<SuccessOrErrorMessage> {
for (const secretName of secretNames) {
try {
const stat = await efs.stat(secretName);
if (stat.isDirectory()) {
await efs.rmdir(secretName, {
recursive: metadata?.options?.recursive,
});
} else {
await efs.unlink(secretName);
}
yield {
type: 'success',
success: true,
};
} catch (e) {
if (
e.code === 'ENOENT' ||
e.code === 'ENOTEMPTY' ||
e.code === 'EINVAL'
) {
// INVAL can be triggered if removing the root of the
// vault is attempted.
yield {
type: 'error',
code: e.code,
reason: secretName,
};
} else {
throw e;
}
}
}
},
);
},
tran,
const efs = vaultMap.get(message.nameOrId);
if (efs == null) {
throw new vaultsErrors.ErrorVaultsVaultUndefined(
`Vault ${message.nameOrId} was not present in the header message`,
);
}
try {
const stat = await efs.stat(message.secretName);
if (stat.isDirectory()) {
await efs.rmdir(message.secretName, {
recursive: headerMessage.recursive,
});
} else {
await efs.unlink(message.secretName);
}
yield {
type: 'success',
success: true,
};
} catch (e) {
if (
e.code === 'ENOENT' ||
e.code === 'ENOTEMPTY' ||
e.code === 'EINVAL'
) {
// EINVAL can be triggered if removing the root of the
// vault is attempted.
yield {
type: 'error',
code: e.code,
reason: message.secretName,
};
} else {
throw e;
}
}
}
// Content messages must follow header messages
if (!loopRan) {
throw new clientErrors.ErrorClientProtocolError(
'No content messages followed header message',
);
}
},
Expand Down
16 changes: 16 additions & 0 deletions src/client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,19 @@ type SecretStatMessage = {
};
};

type SecretIdentifierMessageTagged = SecretIdentifierMessage & {
type: 'SecretIdentifierMessage';
};

type VaultNamesHeaderMessage = {
type: 'VaultNamesHeaderMessage';
vaultNames: Array<string>;
};

type SecretsRemoveHeaderMessage = VaultNamesHeaderMessage & {
recursive?: boolean;
};

// Type casting for tricky handlers

type OverrideRPClientType<T extends RPCClient<ClientManifest>> = Omit<
Expand Down Expand Up @@ -435,6 +448,9 @@ export type {
SecretRenameMessage,
SecretFilesMessage,
SecretStatMessage,
SecretIdentifierMessageTagged,
VaultNamesHeaderMessage,
SecretsRemoveHeaderMessage,
SignatureMessage,
OverrideRPClientType,
AuditMetricGetTypeOverride,
Expand Down
4 changes: 3 additions & 1 deletion src/git/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ async function listObjects({
}
return;
default:
utils.never();
utils.never(
`type must be one of "commit", "tree", "blob", or "tag", got "${type}"`,
);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ function getDefaultNodePath(): string | undefined {
return p;
}

function never(message?: string): never {
function never(message: string): never {
throw new utilsErrors.ErrorUtilsUndefinedBehaviour(message);
}

Expand Down
2 changes: 2 additions & 0 deletions src/vaults/Vault.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ interface Vault {
writeG: VaultInternal['writeG'];
readF: VaultInternal['readF'];
readG: VaultInternal['readG'];
acquireRead: VaultInternal['acquireRead'];
acquireWrite: VaultInternal['acquireWrite'];
log: VaultInternal['log'];
version: VaultInternal['version'];
}
Expand Down
Loading
Loading