Skip to content

Commit

Permalink
refactor: reusable thread pool to decrypt keystores
Browse files Browse the repository at this point in the history
  • Loading branch information
nflaig committed Jun 9, 2023
1 parent 8cd7bd2 commit a529ddf
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 77 deletions.
14 changes: 7 additions & 7 deletions packages/cli/src/cmds/validator/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ export async function validatorHandler(args: IValidatorCliArgs & GlobalArgs): Pr
// This AbortController interrupts various validators ops: genesis req, clients call, clock etc
const abortController = new AbortController();

// We set infinity for abort controller used for validator operations,
// to prevent MaxListenersExceededWarning which get logged when listeners > 10
// Since it is perfectly fine to have listeners > 10
setMaxListeners(Infinity, abortController.signal);

onGracefulShutdownCbs.push(async () => abortController.abort());

/**
* For rationale and documentation of how signers are loaded from args and disk,
* see {@link PersistedKeysBackend} and {@link getSignersFromArgs}
Expand All @@ -95,13 +102,6 @@ export async function validatorHandler(args: IValidatorCliArgs & GlobalArgs): Pr

logSigners(logger, signers);

// We set infinity for abort controller used for validator operations,
// to prevent MaxListenersExceededWarning which get logged when listeners > 10
// Since it is perfectly fine to have listeners > 10
setMaxListeners(Infinity, abortController.signal);

onGracefulShutdownCbs.push(async () => abortController.abort());

const dbOps = {
config,
controller: new LevelDbController({name: dbPath}, {metrics: null, logger}),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
import {spawn, ModuleThread, Pool, QueuedTask, Worker} from "@chainsafe/threads";
import path from "node:path";
import {SignerLocal, SignerType} from "@lodestar/validator";
import {LogLevel, Logger} from "@lodestar/utils";
import bls from "@chainsafe/bls";
import {LocalKeystoreDefinition} from "../interface.js";
import {clearKeystoreCache, loadKeystoreCache, writeKeystoreCache} from "../keystoreCache.js";
import {lockFilepath, unlockFilepath} from "../../../../util/lockfile.js";
import {maxPoolSize} from "./poolSize.js";
import {DecryptKeystoreWorkerAPI, KeystoreDecryptOptions} from "./types.js";
import {lockFilepath, unlockFilepath} from "../../../util/lockfile.js";
import {LocalKeystoreDefinition} from "./interface.js";
import {clearKeystoreCache, loadKeystoreCache, writeKeystoreCache} from "./keystoreCache.js";
import {DecryptKeystoresThreadPool} from "./decryptKeystores/index.js";

type KeystoreDecryptOptions = {
ignoreLockFile?: boolean;
onDecrypt?: (index: number) => void;
// Try to use the cache file if it exists
cacheFilePath?: string;
logger: Pick<Logger, LogLevel.info | LogLevel.warn | LogLevel.debug>;
signal: AbortSignal;
};

type KeystoreDecryptError = {
keystoreFile: string;
error: Error;
};

/**
* Decrypt keystore definitions using a threadpool
* Decrypt keystore definitions using a thread pool
*/
export async function decryptKeystoreDefinitions(
keystoreDefinitions: LocalKeystoreDefinition[],
Expand Down Expand Up @@ -38,23 +52,15 @@ export async function decryptKeystoreDefinitions(
const keystoreCount = keystoreDefinitions.length;
const signers = new Array<SignerLocal>(keystoreCount);
const passwords = new Array<string>(keystoreCount);
const tasks: QueuedTask<ModuleThread<DecryptKeystoreWorkerAPI>, Uint8Array>[] = [];
const errors: Error[] = [];
const pool = Pool(
() =>
spawn<DecryptKeystoreWorkerAPI>(new Worker("./worker.js"), {
// The number below is big enough to almost disable the timeout which helps during tests run on unpredictablely slow hosts
timeout: 5 * 60 * 1000,
}),
Math.min(keystoreCount, maxPoolSize)
);
const errors: KeystoreDecryptError[] = [];
const decryptKeystores = new DecryptKeystoresThreadPool(keystoreCount, opts.signal);

for (const [index, definition] of keystoreDefinitions.entries()) {
lockKeystore(definition.keystorePath, opts);

const task = pool.queue((thread) => thread.decryptKeystoreDefinition(definition));
tasks.push(task);
task
.then((secretKeyBytes: Uint8Array) => {
decryptKeystores.queue(
definition,
(secretKeyBytes: Uint8Array) => {
const signer: SignerLocal = {
type: SignerType.Local,
secretKey: bls.SecretKey.fromBytes(secretKeyBytes),
Expand All @@ -66,31 +72,27 @@ export async function decryptKeystoreDefinitions(
if (opts?.onDecrypt) {
opts?.onDecrypt(index);
}
})
.catch((e: Error) => {
},
(error: Error) => {
// In-progress tasks can't be canceled, so there's a chance that multiple errors may be caught
// add to the list of errors
errors.push(e);
errors.push({keystoreFile: path.basename(definition.keystorePath), error});
// cancel all pending tasks, no need to continue decrypting after we hit one error
for (const task of tasks) {
task.cancel();
}
});
decryptKeystores.cancel();
}
);
}

try {
// only resolves if there are no errored tasks
await pool.completed(true);
} catch (e) {
await decryptKeystores.completed();

if (errors.length > 0) {
// If an error occurs, the program isn't going to be running,
// so we should unlock all lockfiles we created
for (const {keystorePath} of keystoreDefinitions) {
unlockFilepath(keystorePath);
}

throw new AggregateError(errors);
} finally {
await pool.terminate();
throw formattedError(errors, signers, keystoreCount);
}

if (opts.cacheFilePath) {
Expand All @@ -114,3 +116,35 @@ function lockKeystore(keystorePath: string, opts: KeystoreDecryptOptions): void
}
}
}

function formattedError(errors: KeystoreDecryptError[], signers: SignerLocal[], keystoreCount: number): Error {
// Filter out errors due to terminating the thread pool
// https://github.com/ChainSafe/threads.js/blob/df351552cb7d08b8465f5d1e7c543c952d74ac67/src/master/pool.ts#L244
const decryptErrors = errors.filter(({error}) => !error.message.startsWith("Pool has been terminated"));

const errorCount = decryptErrors.length;
const decryptedCount = signers.filter(Boolean).length;
const abortedCount = keystoreCount - errorCount - decryptedCount;

let message = "Error importing keystores";

if (errorCount === 1) {
const {keystoreFile, error} = decryptErrors[0];
message = `Error importing keystore\n\n${keystoreFile}: ${error.message}`;
} else if (errorCount > 1) {
message =
"Multiple errors importing keystores\n\n" +
decryptErrors.map(({keystoreFile, error}) => `${keystoreFile}: ${error.message}`).join("\n");
}

if (abortedCount > 0) {
message += `\n\nAborted ${abortedCount} pending keystore import${abortedCount > 1 ? "s" : ""}`;
}

const error = new Error(message);

// Don't print out stack trace
error.stack = message;

return error;
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export {DecryptKeystoresThreadPool} from "./threadPool.js";
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import {spawn, Pool, Worker, ModuleThread, QueuedTask} from "@chainsafe/threads";
import {DecryptKeystoreArgs, DecryptKeystoreWorkerAPI} from "./types.js";
import {maxPoolSize} from "./poolSize.js";

/**
* Thread pool to decrypt keystores
*/
export class DecryptKeystoresThreadPool {
private pool: Pool<ModuleThread<DecryptKeystoreWorkerAPI>>;
private tasks: QueuedTask<ModuleThread<DecryptKeystoreWorkerAPI>, Uint8Array>[] = [];
private terminatePoolHandler: () => void;

constructor(keystoreCount: number, private readonly signal: AbortSignal) {
this.pool = Pool(
() =>
spawn<DecryptKeystoreWorkerAPI>(new Worker("./worker.js"), {
// The number below is big enough to almost disable the timeout
// which helps during tests run on unpredictably slow hosts
timeout: 5 * 60 * 1000,
}),
{
// Adjust worker pool size based on keystore count
size: Math.min(keystoreCount, maxPoolSize),
// Decrypt keystores in sequence, increasing concurrency does not improve performance
concurrency: 1,
}
);
// Terminate worker threads when process receives exit signal
this.terminatePoolHandler = () => {
void this.pool.terminate(true);
};
signal.addEventListener("abort", this.terminatePoolHandler);
}

/**
* Add keystore to the task queue to be decrypted
*/
queue(
args: DecryptKeystoreArgs,
onDecrypted: (secretKeyBytes: Uint8Array) => void,
onError: (e: Error) => void
): void {
const task = this.pool.queue((thread) => thread.decryptKeystore(args));
this.tasks.push(task);
task.then(onDecrypted).catch(onError);
}

/**
* Resolves once all queued tasks are completed and terminates worker threads.
* Errors during executing can be captured in `onError` handler for each task.
*/
async completed(): Promise<void> {
await this.pool.settled(true);
await this.pool.terminate();
this.signal.removeEventListener("abort", this.terminatePoolHandler);
}

/**
* Cancel all pending tasks
*/
cancel(): void {
for (const task of this.tasks) {
task.cancel();
}
this.tasks = [];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import {KeystoreStr} from "@lodestar/api/keymanager";
import {LocalKeystoreDefinition} from "../interface.js";

export type DecryptKeystoreWorkerAPI = {
decryptKeystore(args: DecryptKeystoreArgs): Promise<Uint8Array>;
};

export type DecryptKeystoreArgs = LocalKeystoreDefinition | {keystoreStr: KeystoreStr; password: string};

export function isLocalKeystoreDefinition(args: DecryptKeystoreArgs): args is LocalKeystoreDefinition {
return (args as LocalKeystoreDefinition).keystorePath !== undefined;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,23 @@ import fs from "node:fs";
import {expose} from "@chainsafe/threads/worker";
import {Transfer, TransferDescriptor} from "@chainsafe/threads";
import {Keystore} from "@chainsafe/bls-keystore";
import {LocalKeystoreDefinition} from "../interface.js";
import {DecryptKeystoreWorkerAPI} from "./types.js";
import {DecryptKeystoreArgs, DecryptKeystoreWorkerAPI, isLocalKeystoreDefinition} from "./types.js";

/**
* Decrypt a single keystore definition, returning the secret key as a Uint8Array
* Decrypt a single keystore, returning the secret key as a Uint8Array
*
* NOTE: This is a memory (and cpu) -intensive process, since decrypting the keystore involves running a key derivation function (either pbkdf2 or scrypt)
*/
export async function decryptKeystoreDefinition({
keystorePath,
password,
}: LocalKeystoreDefinition): Promise<TransferDescriptor<Uint8Array>> {
const keystore = Keystore.parse(fs.readFileSync(keystorePath, "utf8"));
export async function decryptKeystore(args: DecryptKeystoreArgs): Promise<TransferDescriptor<Uint8Array>> {
const keystore = Keystore.parse(
isLocalKeystoreDefinition(args) ? fs.readFileSync(args.keystorePath, "utf8") : args.keystoreStr
);

// Memory-hogging function
const secret = await keystore.decrypt(password);
const secret = await keystore.decrypt(args.password);
// Transfer the underlying ArrayBuffer back to the main thread: https://threads.js.org/usage-advanced#transferable-objects
// This small performance gain may help in cases where this is run for many keystores
return Transfer(secret, [secret.buffer]);
}

expose({decryptKeystoreDefinition} as unknown as DecryptKeystoreWorkerAPI);
expose({decryptKeystore} as unknown as DecryptKeystoreWorkerAPI);
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ export async function readPassphraseOrPrompt(args: {importKeystoresPassword?: st
},
]);

// eslint-disable-next-line no-console
console.log("Password is correct");

return answers.password;
}
}
Expand Down
4 changes: 3 additions & 1 deletion packages/cli/src/cmds/validator/signers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {assertValidPubkeysHex, isValidHttpUrl, parseRange, YargsError} from "../
import {getAccountPaths} from "../paths.js";
import {IValidatorCliArgs} from "../options.js";
import {PersistedKeysBackend} from "../keymanager/persistedKeys.js";
import {decryptKeystoreDefinitions} from "../keymanager/decryptKeystoreDefinitions/index.js";
import {decryptKeystoreDefinitions} from "../keymanager/decryptKeystoreDefinitions.js";
import {showProgress} from "../../../util/progress.js";
import {importKeystoreDefinitionsFromExternalDir, readPassphraseOrPrompt} from "./importExternalKeystores.js";

Expand Down Expand Up @@ -100,6 +100,7 @@ export async function getSignersFromArgs(
onDecrypt: needle,
cacheFilePath: path.join(accountPaths.cacheDir, "imported_keystores.cache"),
logger,
signal,
});
}

Expand Down Expand Up @@ -133,6 +134,7 @@ export async function getSignersFromArgs(
onDecrypt: needle,
cacheFilePath: path.join(accountPaths.cacheDir, "local_keystores.cache"),
logger,
signal,
});

// Read local remote keys, imported via keymanager api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import {expect} from "chai";
import {cachedSeckeysHex} from "../../utils/cachedKeys.js";
import {getKeystoresStr} from "../../utils/keystores.js";
import {testFilesDir} from "../../utils.js";
import {decryptKeystoreDefinitions} from "../../../src/cmds/validator/keymanager/decryptKeystoreDefinitions/index.js";
import {decryptKeystoreDefinitions} from "../../../src/cmds/validator/keymanager/decryptKeystoreDefinitions.js";
import {LocalKeystoreDefinition} from "../../../src/cmds/validator/keymanager/interface.js";

describe("decryptKeystoreDefinitions", function () {
this.timeout(100_000);

const signal = new AbortController().signal;
const dataDir = path.join(testFilesDir, "decrypt-keystores-test");
const importFromDir = path.join(dataDir, "eth2.0_deposit_out");

Expand Down Expand Up @@ -43,7 +44,7 @@ describe("decryptKeystoreDefinitions", function () {

beforeEach(async () => {
// create cache file to ensure keystores are loaded from cache during tests
await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath});
await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath, signal});
expect(fs.existsSync(cacheFilePath)).to.be.true;

// remove lockfiles created during cache file preparation
Expand All @@ -59,7 +60,7 @@ describe("decryptKeystoreDefinitions", function () {

function testDecryptKeystoreDefinitions(cacheFilePath?: string): void {
it("decrypt keystores", async () => {
const signers = await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath});
const signers = await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath});
expect(signers.length).to.equal(secretKeys.length);
for (const signer of signers) {
const hexSecret = signer.secretKey.toHex();
Expand All @@ -68,22 +69,22 @@ describe("decryptKeystoreDefinitions", function () {
});

it("fail to decrypt keystores if lockfiles already exist", async () => {
await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath});
await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath});
// lockfiles should exist after the first run

try {
await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath});
await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath});
expect.fail("Second decrypt should fail due to failure to get lockfile");
} catch (e) {
expect((e as Error).message.startsWith("EEXIST: file already exists"), "Wrong error is thrown").to.be.true;
}
});

it("decrypt keystores if lockfiles already exist if ignoreLockFile=true", async () => {
await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath});
await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath});
// lockfiles should exist after the first run

await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath, ignoreLockFile: true});
await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath, ignoreLockFile: true});
});
}
});

0 comments on commit a529ddf

Please sign in to comment.