Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: reusable thread pool to decrypt keystores #5623

Merged
merged 1 commit into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions packages/cli/src/cmds/validator/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ export async function validatorHandler(args: IValidatorCliArgs & GlobalArgs): Pr
// This AbortController interrupts various validators ops: genesis req, clients call, clock etc
const abortController = new AbortController();

// We set infinity for abort controller used for validator operations,
// to prevent MaxListenersExceededWarning which get logged when listeners > 10
// Since it is perfectly fine to have listeners > 10
setMaxListeners(Infinity, abortController.signal);

onGracefulShutdownCbs.push(async () => abortController.abort());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Abort has to be pushed to graceful shutdown callbacks before invoking getSignersFromArgs.


/**
* For rationale and documentation of how signers are loaded from args and disk,
* see {@link PersistedKeysBackend} and {@link getSignersFromArgs}
Expand All @@ -95,13 +102,6 @@ export async function validatorHandler(args: IValidatorCliArgs & GlobalArgs): Pr

logSigners(logger, signers);

// We set infinity for abort controller used for validator operations,
// to prevent MaxListenersExceededWarning which get logged when listeners > 10
// Since it is perfectly fine to have listeners > 10
setMaxListeners(Infinity, abortController.signal);

onGracefulShutdownCbs.push(async () => abortController.abort());

const dbOps = {
config,
controller: new LevelDbController({name: dbPath}, {metrics: null, logger}),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
import {spawn, ModuleThread, Pool, QueuedTask, Worker} from "@chainsafe/threads";
import path from "node:path";
import {SignerLocal, SignerType} from "@lodestar/validator";
import {LogLevel, Logger} from "@lodestar/utils";
import bls from "@chainsafe/bls";
import {LocalKeystoreDefinition} from "../interface.js";
import {clearKeystoreCache, loadKeystoreCache, writeKeystoreCache} from "../keystoreCache.js";
import {lockFilepath, unlockFilepath} from "../../../../util/lockfile.js";
import {maxPoolSize} from "./poolSize.js";
import {DecryptKeystoreWorkerAPI, KeystoreDecryptOptions} from "./types.js";
import {lockFilepath, unlockFilepath} from "../../../util/lockfile.js";
import {LocalKeystoreDefinition} from "./interface.js";
import {clearKeystoreCache, loadKeystoreCache, writeKeystoreCache} from "./keystoreCache.js";
import {DecryptKeystoresThreadPool} from "./decryptKeystores/index.js";

type KeystoreDecryptOptions = {
ignoreLockFile?: boolean;
onDecrypt?: (index: number) => void;
// Try to use the cache file if it exists
cacheFilePath?: string;
logger: Pick<Logger, LogLevel.info | LogLevel.warn | LogLevel.debug>;
signal: AbortSignal;
};

type KeystoreDecryptError = {
keystoreFile: string;
error: Error;
};

/**
* Decrypt keystore definitions using a threadpool
* Decrypt keystore definitions using a thread pool
*/
export async function decryptKeystoreDefinitions(
keystoreDefinitions: LocalKeystoreDefinition[],
Expand Down Expand Up @@ -38,23 +52,15 @@ export async function decryptKeystoreDefinitions(
const keystoreCount = keystoreDefinitions.length;
const signers = new Array<SignerLocal>(keystoreCount);
const passwords = new Array<string>(keystoreCount);
const tasks: QueuedTask<ModuleThread<DecryptKeystoreWorkerAPI>, Uint8Array>[] = [];
const errors: Error[] = [];
const pool = Pool(
() =>
spawn<DecryptKeystoreWorkerAPI>(new Worker("./worker.js"), {
// The number below is big enough to almost disable the timeout which helps during tests run on unpredictablely slow hosts
timeout: 5 * 60 * 1000,
}),
Math.min(keystoreCount, maxPoolSize)
);
const errors: KeystoreDecryptError[] = [];
const decryptKeystores = new DecryptKeystoresThreadPool(keystoreCount, opts.signal);

for (const [index, definition] of keystoreDefinitions.entries()) {
lockKeystore(definition.keystorePath, opts);

const task = pool.queue((thread) => thread.decryptKeystoreDefinition(definition));
tasks.push(task);
task
.then((secretKeyBytes: Uint8Array) => {
decryptKeystores.queue(
definition,
(secretKeyBytes: Uint8Array) => {
const signer: SignerLocal = {
type: SignerType.Local,
secretKey: bls.SecretKey.fromBytes(secretKeyBytes),
Expand All @@ -66,31 +72,27 @@ export async function decryptKeystoreDefinitions(
if (opts?.onDecrypt) {
opts?.onDecrypt(index);
}
})
.catch((e: Error) => {
},
(error: Error) => {
// In-progress tasks can't be canceled, so there's a chance that multiple errors may be caught
// add to the list of errors
errors.push(e);
errors.push({keystoreFile: path.basename(definition.keystorePath), error});
// cancel all pending tasks, no need to continue decrypting after we hit one error
for (const task of tasks) {
task.cancel();
}
});
decryptKeystores.cancel();
}
);
}

try {
// only resolves if there are no errored tasks
await pool.completed(true);
} catch (e) {
await decryptKeystores.completed();

if (errors.length > 0) {
// If an error occurs, the program isn't going to be running,
// so we should unlock all lockfiles we created
for (const {keystorePath} of keystoreDefinitions) {
unlockFilepath(keystorePath);
}

throw new AggregateError(errors);
} finally {
await pool.terminate();
throw formattedError(errors, signers, keystoreCount);
}

if (opts.cacheFilePath) {
Expand All @@ -114,3 +116,35 @@ function lockKeystore(keystorePath: string, opts: KeystoreDecryptOptions): void
}
}
}

function formattedError(errors: KeystoreDecryptError[], signers: SignerLocal[], keystoreCount: number): Error {
// Filter out errors due to terminating the thread pool
// https://github.com/ChainSafe/threads.js/blob/df351552cb7d08b8465f5d1e7c543c952d74ac67/src/master/pool.ts#L244
const decryptErrors = errors.filter(({error}) => !error.message.startsWith("Pool has been terminated"));

const errorCount = decryptErrors.length;
const decryptedCount = signers.filter(Boolean).length;
const abortedCount = keystoreCount - errorCount - decryptedCount;

let message = "Error importing keystores";

if (errorCount === 1) {
const {keystoreFile, error} = decryptErrors[0];
message = `Error importing keystore\n\n${keystoreFile}: ${error.message}`;
} else if (errorCount > 1) {
message =
"Multiple errors importing keystores\n\n" +
decryptErrors.map(({keystoreFile, error}) => `${keystoreFile}: ${error.message}`).join("\n");
}

if (abortedCount > 0) {
message += `\n\nAborted ${abortedCount} pending keystore import${abortedCount > 1 ? "s" : ""}`;
}

const error = new Error(message);

// Don't print out stack trace
error.stack = message;

return error;
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export {DecryptKeystoresThreadPool} from "./threadPool.js";
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting DEBUG=threads:pool:* is pretty helpful to analyse thread and task lifecycle

Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import {spawn, Pool, Worker, ModuleThread, QueuedTask} from "@chainsafe/threads";
import {DecryptKeystoreArgs, DecryptKeystoreWorkerAPI} from "./types.js";
import {maxPoolSize} from "./poolSize.js";

/**
* Thread pool to decrypt keystores
*/
export class DecryptKeystoresThreadPool {
private pool: Pool<ModuleThread<DecryptKeystoreWorkerAPI>>;
private tasks: QueuedTask<ModuleThread<DecryptKeystoreWorkerAPI>, Uint8Array>[] = [];
private terminatePoolHandler: () => void;

constructor(keystoreCount: number, private readonly signal: AbortSignal) {
this.pool = Pool(
() =>
spawn<DecryptKeystoreWorkerAPI>(new Worker("./worker.js"), {
// The number below is big enough to almost disable the timeout
// which helps during tests run on unpredictably slow hosts
timeout: 5 * 60 * 1000,
}),
{
// Adjust worker pool size based on keystore count
size: Math.min(keystoreCount, maxPoolSize),
// Decrypt keystores in sequence, increasing concurrency does not improve performance
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did several test runs with different concurrency but it seems to just overload the threads and throttle the CPU

concurrency: 1,
}
);
// Terminate worker threads when process receives exit signal
this.terminatePoolHandler = () => {
void this.pool.terminate(true);
};
signal.addEventListener("abort", this.terminatePoolHandler);
}

/**
* Add keystore to the task queue to be decrypted
*/
queue(
args: DecryptKeystoreArgs,
onDecrypted: (secretKeyBytes: Uint8Array) => void,
onError: (e: Error) => void
): void {
const task = this.pool.queue((thread) => thread.decryptKeystore(args));
this.tasks.push(task);
task.then(onDecrypted).catch(onError);
}

/**
* Resolves once all queued tasks are completed and terminates worker threads.
* Errors during executing can be captured in `onError` handler for each task.
*/
async completed(): Promise<void> {
await this.pool.settled(true);
await this.pool.terminate();
this.signal.removeEventListener("abort", this.terminatePoolHandler);
}

/**
* Cancel all pending tasks
*/
cancel(): void {
for (const task of this.tasks) {
task.cancel();
}
this.tasks = [];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import {KeystoreStr} from "@lodestar/api/keymanager";
import {LocalKeystoreDefinition} from "../interface.js";

export type DecryptKeystoreWorkerAPI = {
decryptKeystore(args: DecryptKeystoreArgs): Promise<Uint8Array>;
};

export type DecryptKeystoreArgs = LocalKeystoreDefinition | {keystoreStr: KeystoreStr; password: string};

export function isLocalKeystoreDefinition(args: DecryptKeystoreArgs): args is LocalKeystoreDefinition {
return (args as LocalKeystoreDefinition).keystorePath !== undefined;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,23 @@ import fs from "node:fs";
import {expose} from "@chainsafe/threads/worker";
import {Transfer, TransferDescriptor} from "@chainsafe/threads";
import {Keystore} from "@chainsafe/bls-keystore";
import {LocalKeystoreDefinition} from "../interface.js";
import {DecryptKeystoreWorkerAPI} from "./types.js";
import {DecryptKeystoreArgs, DecryptKeystoreWorkerAPI, isLocalKeystoreDefinition} from "./types.js";

/**
* Decrypt a single keystore definition, returning the secret key as a Uint8Array
* Decrypt a single keystore, returning the secret key as a Uint8Array
*
* NOTE: This is a memory (and cpu) -intensive process, since decrypting the keystore involves running a key derivation function (either pbkdf2 or scrypt)
*/
export async function decryptKeystoreDefinition({
keystorePath,
password,
}: LocalKeystoreDefinition): Promise<TransferDescriptor<Uint8Array>> {
const keystore = Keystore.parse(fs.readFileSync(keystorePath, "utf8"));
export async function decryptKeystore(args: DecryptKeystoreArgs): Promise<TransferDescriptor<Uint8Array>> {
const keystore = Keystore.parse(
isLocalKeystoreDefinition(args) ? fs.readFileSync(args.keystorePath, "utf8") : args.keystoreStr
);

// Memory-hogging function
const secret = await keystore.decrypt(password);
const secret = await keystore.decrypt(args.password);
// Transfer the underlying ArrayBuffer back to the main thread: https://threads.js.org/usage-advanced#transferable-objects
// This small performance gain may help in cases where this is run for many keystores
return Transfer(secret, [secret.buffer]);
}

expose({decryptKeystoreDefinition} as unknown as DecryptKeystoreWorkerAPI);
expose({decryptKeystore} as unknown as DecryptKeystoreWorkerAPI);
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ export async function readPassphraseOrPrompt(args: {importKeystoresPassword?: st
},
]);

// eslint-disable-next-line no-console
console.log("Password is correct");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This console.log is incorrect and not helpful as password verification is done later in the process


return answers.password;
}
}
Expand Down
4 changes: 3 additions & 1 deletion packages/cli/src/cmds/validator/signers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {assertValidPubkeysHex, isValidHttpUrl, parseRange, YargsError} from "../
import {getAccountPaths} from "../paths.js";
import {IValidatorCliArgs} from "../options.js";
import {PersistedKeysBackend} from "../keymanager/persistedKeys.js";
import {decryptKeystoreDefinitions} from "../keymanager/decryptKeystoreDefinitions/index.js";
import {decryptKeystoreDefinitions} from "../keymanager/decryptKeystoreDefinitions.js";
import {showProgress} from "../../../util/progress.js";
import {importKeystoreDefinitionsFromExternalDir, readPassphraseOrPrompt} from "./importExternalKeystores.js";

Expand Down Expand Up @@ -100,6 +100,7 @@ export async function getSignersFromArgs(
onDecrypt: needle,
cacheFilePath: path.join(accountPaths.cacheDir, "imported_keystores.cache"),
logger,
signal,
});
}

Expand Down Expand Up @@ -133,6 +134,7 @@ export async function getSignersFromArgs(
onDecrypt: needle,
cacheFilePath: path.join(accountPaths.cacheDir, "local_keystores.cache"),
logger,
signal,
});

// Read local remote keys, imported via keymanager api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import {expect} from "chai";
import {cachedSeckeysHex} from "../../utils/cachedKeys.js";
import {getKeystoresStr} from "../../utils/keystores.js";
import {testFilesDir} from "../../utils.js";
import {decryptKeystoreDefinitions} from "../../../src/cmds/validator/keymanager/decryptKeystoreDefinitions/index.js";
import {decryptKeystoreDefinitions} from "../../../src/cmds/validator/keymanager/decryptKeystoreDefinitions.js";
import {LocalKeystoreDefinition} from "../../../src/cmds/validator/keymanager/interface.js";

describe("decryptKeystoreDefinitions", function () {
this.timeout(100_000);

const signal = new AbortController().signal;
const dataDir = path.join(testFilesDir, "decrypt-keystores-test");
const importFromDir = path.join(dataDir, "eth2.0_deposit_out");

Expand Down Expand Up @@ -43,7 +44,7 @@ describe("decryptKeystoreDefinitions", function () {

beforeEach(async () => {
// create cache file to ensure keystores are loaded from cache during tests
await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath});
await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath, signal});
expect(fs.existsSync(cacheFilePath)).to.be.true;

// remove lockfiles created during cache file preparation
Expand All @@ -59,7 +60,7 @@ describe("decryptKeystoreDefinitions", function () {

function testDecryptKeystoreDefinitions(cacheFilePath?: string): void {
it("decrypt keystores", async () => {
const signers = await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath});
const signers = await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath});
expect(signers.length).to.equal(secretKeys.length);
for (const signer of signers) {
const hexSecret = signer.secretKey.toHex();
Expand All @@ -68,22 +69,22 @@ describe("decryptKeystoreDefinitions", function () {
});

it("fail to decrypt keystores if lockfiles already exist", async () => {
await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath});
await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath});
// lockfiles should exist after the first run

try {
await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath});
await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath});
expect.fail("Second decrypt should fail due to failure to get lockfile");
} catch (e) {
expect((e as Error).message.startsWith("EEXIST: file already exists"), "Wrong error is thrown").to.be.true;
}
});

it("decrypt keystores if lockfiles already exist if ignoreLockFile=true", async () => {
await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath});
await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath});
// lockfiles should exist after the first run

await decryptKeystoreDefinitions(definitions, {logger: console, cacheFilePath, ignoreLockFile: true});
await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath, ignoreLockFile: true});
});
}
});