From a529ddf37dc56dfe700320936998796b8f9ffadf Mon Sep 17 00:00:00 2001 From: Nico Flaig Date: Thu, 8 Jun 2023 00:01:34 +0200 Subject: [PATCH] refactor: reusable thread pool to decrypt keystores --- packages/cli/src/cmds/validator/handler.ts | 14 +-- ...index.ts => decryptKeystoreDefinitions.ts} | 104 ++++++++++++------ .../decryptKeystoreDefinitions/types.ts | 14 --- .../keymanager/decryptKeystores/index.ts | 1 + .../poolSize.ts | 0 .../keymanager/decryptKeystores/threadPool.ts | 67 +++++++++++ .../keymanager/decryptKeystores/types.ts | 12 ++ .../worker.ts | 18 ++- .../signers/importExternalKeystores.ts | 3 - .../cli/src/cmds/validator/signers/index.ts | 4 +- .../decryptKeystoreDefinitions.test.ts | 15 +-- 11 files changed, 175 insertions(+), 77 deletions(-) rename packages/cli/src/cmds/validator/keymanager/{decryptKeystoreDefinitions/index.ts => decryptKeystoreDefinitions.ts} (50%) delete mode 100644 packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions/types.ts create mode 100644 packages/cli/src/cmds/validator/keymanager/decryptKeystores/index.ts rename packages/cli/src/cmds/validator/keymanager/{decryptKeystoreDefinitions => decryptKeystores}/poolSize.ts (100%) create mode 100644 packages/cli/src/cmds/validator/keymanager/decryptKeystores/threadPool.ts create mode 100644 packages/cli/src/cmds/validator/keymanager/decryptKeystores/types.ts rename packages/cli/src/cmds/validator/keymanager/{decryptKeystoreDefinitions => decryptKeystores}/worker.ts (54%) diff --git a/packages/cli/src/cmds/validator/handler.ts b/packages/cli/src/cmds/validator/handler.ts index 7ac1d9304b88..8b2f88821939 100644 --- a/packages/cli/src/cmds/validator/handler.ts +++ b/packages/cli/src/cmds/validator/handler.ts @@ -74,6 +74,13 @@ export async function validatorHandler(args: IValidatorCliArgs & GlobalArgs): Pr // This AbortController interrupts various validators ops: genesis req, clients call, clock etc const abortController = new AbortController(); + // We set infinity for abort controller used for validator operations, + // to prevent MaxListenersExceededWarning which get logged when listeners > 10 + // Since it is perfectly fine to have listeners > 10 + setMaxListeners(Infinity, abortController.signal); + + onGracefulShutdownCbs.push(async () => abortController.abort()); + /** * For rationale and documentation of how signers are loaded from args and disk, * see {@link PersistedKeysBackend} and {@link getSignersFromArgs} @@ -95,13 +102,6 @@ export async function validatorHandler(args: IValidatorCliArgs & GlobalArgs): Pr logSigners(logger, signers); - // We set infinity for abort controller used for validator operations, - // to prevent MaxListenersExceededWarning which get logged when listeners > 10 - // Since it is perfectly fine to have listeners > 10 - setMaxListeners(Infinity, abortController.signal); - - onGracefulShutdownCbs.push(async () => abortController.abort()); - const dbOps = { config, controller: new LevelDbController({name: dbPath}, {metrics: null, logger}), diff --git a/packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions/index.ts b/packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions.ts similarity index 50% rename from packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions/index.ts rename to packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions.ts index 560475884c14..9121945046d8 100644 --- a/packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions/index.ts +++ b/packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions.ts @@ -1,14 +1,28 @@ -import {spawn, ModuleThread, Pool, QueuedTask, Worker} from "@chainsafe/threads"; +import path from "node:path"; import {SignerLocal, SignerType} from "@lodestar/validator"; +import {LogLevel, Logger} from "@lodestar/utils"; import bls from "@chainsafe/bls"; -import {LocalKeystoreDefinition} from "../interface.js"; -import {clearKeystoreCache, loadKeystoreCache, writeKeystoreCache} from "../keystoreCache.js"; -import {lockFilepath, unlockFilepath} from "../../../../util/lockfile.js"; -import {maxPoolSize} from "./poolSize.js"; -import {DecryptKeystoreWorkerAPI, KeystoreDecryptOptions} from "./types.js"; +import {lockFilepath, unlockFilepath} from "../../../util/lockfile.js"; +import {LocalKeystoreDefinition} from "./interface.js"; +import {clearKeystoreCache, loadKeystoreCache, writeKeystoreCache} from "./keystoreCache.js"; +import {DecryptKeystoresThreadPool} from "./decryptKeystores/index.js"; + +type KeystoreDecryptOptions = { + ignoreLockFile?: boolean; + onDecrypt?: (index: number) => void; + // Try to use the cache file if it exists + cacheFilePath?: string; + logger: Pick; + signal: AbortSignal; +}; + +type KeystoreDecryptError = { + keystoreFile: string; + error: Error; +}; /** - * Decrypt keystore definitions using a threadpool + * Decrypt keystore definitions using a thread pool */ export async function decryptKeystoreDefinitions( keystoreDefinitions: LocalKeystoreDefinition[], @@ -38,23 +52,15 @@ export async function decryptKeystoreDefinitions( const keystoreCount = keystoreDefinitions.length; const signers = new Array(keystoreCount); const passwords = new Array(keystoreCount); - const tasks: QueuedTask, Uint8Array>[] = []; - const errors: Error[] = []; - const pool = Pool( - () => - spawn(new Worker("./worker.js"), { - // The number below is big enough to almost disable the timeout which helps during tests run on unpredictablely slow hosts - timeout: 5 * 60 * 1000, - }), - Math.min(keystoreCount, maxPoolSize) - ); + const errors: KeystoreDecryptError[] = []; + const decryptKeystores = new DecryptKeystoresThreadPool(keystoreCount, opts.signal); + for (const [index, definition] of keystoreDefinitions.entries()) { lockKeystore(definition.keystorePath, opts); - const task = pool.queue((thread) => thread.decryptKeystoreDefinition(definition)); - tasks.push(task); - task - .then((secretKeyBytes: Uint8Array) => { + decryptKeystores.queue( + definition, + (secretKeyBytes: Uint8Array) => { const signer: SignerLocal = { type: SignerType.Local, secretKey: bls.SecretKey.fromBytes(secretKeyBytes), @@ -66,31 +72,27 @@ export async function decryptKeystoreDefinitions( if (opts?.onDecrypt) { opts?.onDecrypt(index); } - }) - .catch((e: Error) => { + }, + (error: Error) => { // In-progress tasks can't be canceled, so there's a chance that multiple errors may be caught // add to the list of errors - errors.push(e); + errors.push({keystoreFile: path.basename(definition.keystorePath), error}); // cancel all pending tasks, no need to continue decrypting after we hit one error - for (const task of tasks) { - task.cancel(); - } - }); + decryptKeystores.cancel(); + } + ); } - try { - // only resolves if there are no errored tasks - await pool.completed(true); - } catch (e) { + await decryptKeystores.completed(); + + if (errors.length > 0) { // If an error occurs, the program isn't going to be running, // so we should unlock all lockfiles we created for (const {keystorePath} of keystoreDefinitions) { unlockFilepath(keystorePath); } - throw new AggregateError(errors); - } finally { - await pool.terminate(); + throw formattedError(errors, signers, keystoreCount); } if (opts.cacheFilePath) { @@ -114,3 +116,35 @@ function lockKeystore(keystorePath: string, opts: KeystoreDecryptOptions): void } } } + +function formattedError(errors: KeystoreDecryptError[], signers: SignerLocal[], keystoreCount: number): Error { + // Filter out errors due to terminating the thread pool + // https://github.com/ChainSafe/threads.js/blob/df351552cb7d08b8465f5d1e7c543c952d74ac67/src/master/pool.ts#L244 + const decryptErrors = errors.filter(({error}) => !error.message.startsWith("Pool has been terminated")); + + const errorCount = decryptErrors.length; + const decryptedCount = signers.filter(Boolean).length; + const abortedCount = keystoreCount - errorCount - decryptedCount; + + let message = "Error importing keystores"; + + if (errorCount === 1) { + const {keystoreFile, error} = decryptErrors[0]; + message = `Error importing keystore\n\n${keystoreFile}: ${error.message}`; + } else if (errorCount > 1) { + message = + "Multiple errors importing keystores\n\n" + + decryptErrors.map(({keystoreFile, error}) => `${keystoreFile}: ${error.message}`).join("\n"); + } + + if (abortedCount > 0) { + message += `\n\nAborted ${abortedCount} pending keystore import${abortedCount > 1 ? "s" : ""}`; + } + + const error = new Error(message); + + // Don't print out stack trace + error.stack = message; + + return error; +} diff --git a/packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions/types.ts b/packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions/types.ts deleted file mode 100644 index 9ebe9b83a878..000000000000 --- a/packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions/types.ts +++ /dev/null @@ -1,14 +0,0 @@ -import {LogLevel, Logger} from "@lodestar/utils"; -import {LocalKeystoreDefinition} from "../interface.js"; - -export type DecryptKeystoreWorkerAPI = { - decryptKeystoreDefinition({keystorePath, password}: LocalKeystoreDefinition): Promise; -}; - -export type KeystoreDecryptOptions = { - ignoreLockFile?: boolean; - onDecrypt?: (index: number) => void; - // Try to use the cache file if it exists - cacheFilePath?: string; - logger: Pick; -}; diff --git a/packages/cli/src/cmds/validator/keymanager/decryptKeystores/index.ts b/packages/cli/src/cmds/validator/keymanager/decryptKeystores/index.ts new file mode 100644 index 000000000000..2c17cc4fbb5f --- /dev/null +++ b/packages/cli/src/cmds/validator/keymanager/decryptKeystores/index.ts @@ -0,0 +1 @@ +export {DecryptKeystoresThreadPool} from "./threadPool.js"; diff --git a/packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions/poolSize.ts b/packages/cli/src/cmds/validator/keymanager/decryptKeystores/poolSize.ts similarity index 100% rename from packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions/poolSize.ts rename to packages/cli/src/cmds/validator/keymanager/decryptKeystores/poolSize.ts diff --git a/packages/cli/src/cmds/validator/keymanager/decryptKeystores/threadPool.ts b/packages/cli/src/cmds/validator/keymanager/decryptKeystores/threadPool.ts new file mode 100644 index 000000000000..e622baa49b66 --- /dev/null +++ b/packages/cli/src/cmds/validator/keymanager/decryptKeystores/threadPool.ts @@ -0,0 +1,67 @@ +import {spawn, Pool, Worker, ModuleThread, QueuedTask} from "@chainsafe/threads"; +import {DecryptKeystoreArgs, DecryptKeystoreWorkerAPI} from "./types.js"; +import {maxPoolSize} from "./poolSize.js"; + +/** + * Thread pool to decrypt keystores + */ +export class DecryptKeystoresThreadPool { + private pool: Pool>; + private tasks: QueuedTask, Uint8Array>[] = []; + private terminatePoolHandler: () => void; + + constructor(keystoreCount: number, private readonly signal: AbortSignal) { + this.pool = Pool( + () => + spawn(new Worker("./worker.js"), { + // The number below is big enough to almost disable the timeout + // which helps during tests run on unpredictably slow hosts + timeout: 5 * 60 * 1000, + }), + { + // Adjust worker pool size based on keystore count + size: Math.min(keystoreCount, maxPoolSize), + // Decrypt keystores in sequence, increasing concurrency does not improve performance + concurrency: 1, + } + ); + // Terminate worker threads when process receives exit signal + this.terminatePoolHandler = () => { + void this.pool.terminate(true); + }; + signal.addEventListener("abort", this.terminatePoolHandler); + } + + /** + * Add keystore to the task queue to be decrypted + */ + queue( + args: DecryptKeystoreArgs, + onDecrypted: (secretKeyBytes: Uint8Array) => void, + onError: (e: Error) => void + ): void { + const task = this.pool.queue((thread) => thread.decryptKeystore(args)); + this.tasks.push(task); + task.then(onDecrypted).catch(onError); + } + + /** + * Resolves once all queued tasks are completed and terminates worker threads. + * Errors during executing can be captured in `onError` handler for each task. + */ + async completed(): Promise { + await this.pool.settled(true); + await this.pool.terminate(); + this.signal.removeEventListener("abort", this.terminatePoolHandler); + } + + /** + * Cancel all pending tasks + */ + cancel(): void { + for (const task of this.tasks) { + task.cancel(); + } + this.tasks = []; + } +} diff --git a/packages/cli/src/cmds/validator/keymanager/decryptKeystores/types.ts b/packages/cli/src/cmds/validator/keymanager/decryptKeystores/types.ts new file mode 100644 index 000000000000..79a1fa538042 --- /dev/null +++ b/packages/cli/src/cmds/validator/keymanager/decryptKeystores/types.ts @@ -0,0 +1,12 @@ +import {KeystoreStr} from "@lodestar/api/keymanager"; +import {LocalKeystoreDefinition} from "../interface.js"; + +export type DecryptKeystoreWorkerAPI = { + decryptKeystore(args: DecryptKeystoreArgs): Promise; +}; + +export type DecryptKeystoreArgs = LocalKeystoreDefinition | {keystoreStr: KeystoreStr; password: string}; + +export function isLocalKeystoreDefinition(args: DecryptKeystoreArgs): args is LocalKeystoreDefinition { + return (args as LocalKeystoreDefinition).keystorePath !== undefined; +} diff --git a/packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions/worker.ts b/packages/cli/src/cmds/validator/keymanager/decryptKeystores/worker.ts similarity index 54% rename from packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions/worker.ts rename to packages/cli/src/cmds/validator/keymanager/decryptKeystores/worker.ts index fe655dbf3d0b..711ec4f88b00 100644 --- a/packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions/worker.ts +++ b/packages/cli/src/cmds/validator/keymanager/decryptKeystores/worker.ts @@ -2,25 +2,23 @@ import fs from "node:fs"; import {expose} from "@chainsafe/threads/worker"; import {Transfer, TransferDescriptor} from "@chainsafe/threads"; import {Keystore} from "@chainsafe/bls-keystore"; -import {LocalKeystoreDefinition} from "../interface.js"; -import {DecryptKeystoreWorkerAPI} from "./types.js"; +import {DecryptKeystoreArgs, DecryptKeystoreWorkerAPI, isLocalKeystoreDefinition} from "./types.js"; /** - * Decrypt a single keystore definition, returning the secret key as a Uint8Array + * Decrypt a single keystore, returning the secret key as a Uint8Array * * NOTE: This is a memory (and cpu) -intensive process, since decrypting the keystore involves running a key derivation function (either pbkdf2 or scrypt) */ -export async function decryptKeystoreDefinition({ - keystorePath, - password, -}: LocalKeystoreDefinition): Promise> { - const keystore = Keystore.parse(fs.readFileSync(keystorePath, "utf8")); +export async function decryptKeystore(args: DecryptKeystoreArgs): Promise> { + const keystore = Keystore.parse( + isLocalKeystoreDefinition(args) ? fs.readFileSync(args.keystorePath, "utf8") : args.keystoreStr + ); // Memory-hogging function - const secret = await keystore.decrypt(password); + const secret = await keystore.decrypt(args.password); // Transfer the underlying ArrayBuffer back to the main thread: https://threads.js.org/usage-advanced#transferable-objects // This small performance gain may help in cases where this is run for many keystores return Transfer(secret, [secret.buffer]); } -expose({decryptKeystoreDefinition} as unknown as DecryptKeystoreWorkerAPI); +expose({decryptKeystore} as unknown as DecryptKeystoreWorkerAPI); diff --git a/packages/cli/src/cmds/validator/signers/importExternalKeystores.ts b/packages/cli/src/cmds/validator/signers/importExternalKeystores.ts index 3984ccfa26ce..7b90d16d1d88 100644 --- a/packages/cli/src/cmds/validator/signers/importExternalKeystores.ts +++ b/packages/cli/src/cmds/validator/signers/importExternalKeystores.ts @@ -37,9 +37,6 @@ export async function readPassphraseOrPrompt(args: {importKeystoresPassword?: st }, ]); - // eslint-disable-next-line no-console - console.log("Password is correct"); - return answers.password; } } diff --git a/packages/cli/src/cmds/validator/signers/index.ts b/packages/cli/src/cmds/validator/signers/index.ts index fb8f2aedbf9b..a14516e4c86a 100644 --- a/packages/cli/src/cmds/validator/signers/index.ts +++ b/packages/cli/src/cmds/validator/signers/index.ts @@ -10,7 +10,7 @@ import {assertValidPubkeysHex, isValidHttpUrl, parseRange, YargsError} from "../ import {getAccountPaths} from "../paths.js"; import {IValidatorCliArgs} from "../options.js"; import {PersistedKeysBackend} from "../keymanager/persistedKeys.js"; -import {decryptKeystoreDefinitions} from "../keymanager/decryptKeystoreDefinitions/index.js"; +import {decryptKeystoreDefinitions} from "../keymanager/decryptKeystoreDefinitions.js"; import {showProgress} from "../../../util/progress.js"; import {importKeystoreDefinitionsFromExternalDir, readPassphraseOrPrompt} from "./importExternalKeystores.js"; @@ -100,6 +100,7 @@ export async function getSignersFromArgs( onDecrypt: needle, cacheFilePath: path.join(accountPaths.cacheDir, "imported_keystores.cache"), logger, + signal, }); } @@ -133,6 +134,7 @@ export async function getSignersFromArgs( onDecrypt: needle, cacheFilePath: path.join(accountPaths.cacheDir, "local_keystores.cache"), logger, + signal, }); // Read local remote keys, imported via keymanager api diff --git a/packages/cli/test/unit/validator/decryptKeystoreDefinitions.test.ts b/packages/cli/test/unit/validator/decryptKeystoreDefinitions.test.ts index f3922cfeb83b..296105012b69 100644 --- a/packages/cli/test/unit/validator/decryptKeystoreDefinitions.test.ts +++ b/packages/cli/test/unit/validator/decryptKeystoreDefinitions.test.ts @@ -5,12 +5,13 @@ import {expect} from "chai"; import {cachedSeckeysHex} from "../../utils/cachedKeys.js"; import {getKeystoresStr} from "../../utils/keystores.js"; import {testFilesDir} from "../../utils.js"; -import {decryptKeystoreDefinitions} from "../../../src/cmds/validator/keymanager/decryptKeystoreDefinitions/index.js"; +import {decryptKeystoreDefinitions} from "../../../src/cmds/validator/keymanager/decryptKeystoreDefinitions.js"; import {LocalKeystoreDefinition} from "../../../src/cmds/validator/keymanager/interface.js"; describe("decryptKeystoreDefinitions", function () { this.timeout(100_000); + const signal = new AbortController().signal; const dataDir = path.join(testFilesDir, "decrypt-keystores-test"); const importFromDir = path.join(dataDir, "eth2.0_deposit_out"); @@ -43,7 +44,7 @@ describe("decryptKeystoreDefinitions", function () { beforeEach(async () => { // create cache file to ensure keystores are loaded from cache during tests - await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath}); + await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath, signal}); expect(fs.existsSync(cacheFilePath)).to.be.true; // remove lockfiles created during cache file preparation @@ -59,7 +60,7 @@ describe("decryptKeystoreDefinitions", function () { function testDecryptKeystoreDefinitions(cacheFilePath?: string): void { it("decrypt keystores", async () => { - const signers = await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath}); + const signers = await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath}); expect(signers.length).to.equal(secretKeys.length); for (const signer of signers) { const hexSecret = signer.secretKey.toHex(); @@ -68,11 +69,11 @@ describe("decryptKeystoreDefinitions", function () { }); it("fail to decrypt keystores if lockfiles already exist", async () => { - await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath}); + await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath}); // lockfiles should exist after the first run try { - await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath}); + await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath}); expect.fail("Second decrypt should fail due to failure to get lockfile"); } catch (e) { expect((e as Error).message.startsWith("EEXIST: file already exists"), "Wrong error is thrown").to.be.true; @@ -80,10 +81,10 @@ describe("decryptKeystoreDefinitions", function () { }); it("decrypt keystores if lockfiles already exist if ignoreLockFile=true", async () => { - await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath}); + await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath}); // lockfiles should exist after the first run - await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath, ignoreLockFile: true}); + await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath, ignoreLockFile: true}); }); } });