From 8c9b4e67611455313564ef74bd98673844a5600d Mon Sep 17 00:00:00 2001 From: officeyutong Date: Tue, 3 Dec 2024 23:34:58 +0800 Subject: [PATCH] Add lock for command invocation --- light-client-db-common/src/lib.rs | 2 +- light-client-db-worker/src/db.rs | 4 ++ light-client-js/package.json | 1 + light-client-js/src/index.ts | 49 ++++++++++++++++------ light-client-js/src/types.test.ts | 2 +- light-client-js/src/types.ts | 8 ++-- light-client-lib/src/storage/db/browser.rs | 5 +++ 7 files changed, 54 insertions(+), 17 deletions(-) diff --git a/light-client-db-common/src/lib.rs b/light-client-db-common/src/lib.rs index e393fe6..69ef4ab 100644 --- a/light-client-db-common/src/lib.rs +++ b/light-client-db-common/src/lib.rs @@ -8,7 +8,7 @@ pub struct KV { pub key: Vec, pub value: Vec, } -#[derive(Serialize, Deserialize, Default, Debug)] +#[derive(Serialize, Deserialize, Default, Debug, Clone, Copy)] /// A serializable CursorDirection pub enum CursorDirection { #[default] diff --git a/light-client-db-worker/src/db.rs b/light-client-db-worker/src/db.rs index 4db4ab2..4e58a4f 100644 --- a/light-client-db-worker/src/db.rs +++ b/light-client-db-worker/src/db.rs @@ -253,6 +253,10 @@ where ) .await .with_context(|| anyhow!("Failed to collect iterator"))?; + debug!( + "Called iterator, args=<{:?}, {:?}, {:?}, {:?}>, result={:?}", + start_key_bound, order, limit, skip, kvs + ); DbCommandResponse::Iterator { kvs } } DbCommandRequest::IteratorKey { diff --git a/light-client-js/package.json b/light-client-js/package.json index 3cfdbaf..1fc6154 100644 --- a/light-client-js/package.json +++ b/light-client-js/package.json @@ -15,6 +15,7 @@ }, "dependencies": { "@ckb-ccc/core": "0.1.0-alpha.6", + "async-mutex": "^0.5.0", "light-client-db-worker": "file:../light-client-db-worker", "light-client-wasm": "file:../light-client-wasm", "stream-browserify": "^3.0.0" diff --git a/light-client-js/src/index.ts b/light-client-js/src/index.ts index d92d9ba..e71ceeb 100644 --- a/light-client-js/src/index.ts +++ b/light-client-js/src/index.ts @@ -2,15 +2,18 @@ import { ClientFindCellsResponse, ClientFindTransactionsGroupedResponse, ClientF import { FetchResponse, LocalNode, localNodeTo, NetworkFlag, RemoteNode, remoteNodeTo, ScriptStatus, scriptStatusFrom, scriptStatusTo, LightClientSetScriptsCommand, transformFetchResponse, cccOrderToLightClientWasmOrder, GetTransactionsResponse, TxWithCell, TxWithCells, lightClientGetTransactionsResultTo, LightClientLocalNode, LightClientRemoteNode, LightClientScriptStatus } from "./types"; import { ClientBlock, ClientBlockHeader, Hex, hexFrom, HexLike, Num, numFrom, NumLike, numToHex, TransactionLike } from "@ckb-ccc/core/barrel"; import { JsonRpcBlockHeader, JsonRpcTransformers } from "@ckb-ccc/core/advancedBarrel"; +import { Mutex } from "async-mutex"; + const DEFAULT_BUFFER_SIZE = 50 * (1 << 20); /** * A LightClient instance */ class LightClient { - dbWorker: Worker | null - lightClientWorker: Worker | null - inputBuffer: SharedArrayBuffer - outputBuffer: SharedArrayBuffer + private dbWorker: Worker | null + private lightClientWorker: Worker | null + private inputBuffer: SharedArrayBuffer + private outputBuffer: SharedArrayBuffer + private commandInvokeLock: Mutex /** * Construct a LightClient instance. * inputBuffer and outputBuffer are buffers used for transporting data between database and light client. Set them to appropriate sizes. @@ -22,6 +25,7 @@ class LightClient { this.lightClientWorker = new Worker(new URL("./lightclient.worker.ts", import.meta.url), { type: "module" }); this.inputBuffer = new SharedArrayBuffer(inputBufferSize); this.outputBuffer = new SharedArrayBuffer(outputBufferSize); + this.commandInvokeLock = new Mutex(); } /** @@ -51,14 +55,35 @@ class LightClient { }); } private invokeLightClientCommand(name: string, args?: any[]): Promise { - this.lightClientWorker.postMessage({ - name, - args: args || [] - }); - return new Promise((res, rej) => { - this.lightClientWorker.onmessage = (e) => res(e.data); - this.lightClientWorker.onerror = (evt) => rej(evt); - }); + // Why use lock here? + // light-client-wasm provides synchronous APIs, means if we send a call request through postMessage, onmessage will be called only when the command call resolved. + // We use lock here to avoid multiple call to postMessage before onmessage fired, to avoid mixed result of different calls + // Since light-client-wasm is synchronous, we won't lose any performance by locking here + return this.commandInvokeLock.runExclusive(async () => { + this.lightClientWorker.postMessage({ + name, + args: args || [] + }); + return await new Promise((resolve, reject) => { + const clean = () => { + this.lightClientWorker.removeEventListener("message", resolveFn); + this.lightClientWorker.removeEventListener("error", errorFn); + } + const resolveFn = (evt: MessageEvent) => { + resolve(evt.data); + clean(); + + }; + const errorFn = (evt: ErrorEvent) => { + reject(evt); + clean(); + + }; + this.lightClientWorker.addEventListener("message", resolveFn); + this.lightClientWorker.addEventListener("error", errorFn); + }) + }) + } /** * Stop the light client instance. diff --git a/light-client-js/src/types.test.ts b/light-client-js/src/types.test.ts index 649da3b..20af81c 100644 --- a/light-client-js/src/types.test.ts +++ b/light-client-js/src/types.test.ts @@ -17,7 +17,7 @@ test("test scriptStatusTo/From", () => { args: "0x0011223344" }, script_type: "lock", - block_number: 1234n + block_number: "0x1234" }; const transformed = scriptStatusTo(raw); diff --git a/light-client-js/src/types.ts b/light-client-js/src/types.ts index 201bdbe..4b2a373 100644 --- a/light-client-js/src/types.ts +++ b/light-client-js/src/types.ts @@ -1,5 +1,7 @@ import { numFrom } from "@ckb-ccc/core"; import { Hex } from "@ckb-ccc/core"; +import { hexFrom } from "@ckb-ccc/core"; +import { numToHex } from "@ckb-ccc/core"; import { ClientBlockHeader } from "@ckb-ccc/core"; import { ScriptLike } from "@ckb-ccc/core"; import { JsonRpcBlockHeader, JsonRpcScript, JsonRpcTransaction, JsonRpcTransformers } from "@ckb-ccc/core/advancedBarrel"; @@ -40,7 +42,7 @@ type JsonRpcScriptType = "lock" | "type"; interface LightClientScriptStatus { script: JsonRpcScript; script_type: JsonRpcScriptType; - block_number: Num; + block_number: Hex; } interface ScriptStatus { @@ -51,7 +53,7 @@ interface ScriptStatus { export function scriptStatusTo(input: LightClientScriptStatus): ScriptStatus { return ({ - blockNumber: input.block_number, + blockNumber: numFrom(input.block_number), script: JsonRpcTransformers.scriptTo(input.script), scriptType: input.script_type }) @@ -59,7 +61,7 @@ export function scriptStatusTo(input: LightClientScriptStatus): ScriptStatus { export function scriptStatusFrom(input: ScriptStatus): LightClientScriptStatus { return ({ - block_number: input.blockNumber, + block_number: numToHex(input.blockNumber), script: JsonRpcTransformers.scriptFrom(input.script), script_type: input.scriptType }) diff --git a/light-client-lib/src/storage/db/browser.rs b/light-client-lib/src/storage/db/browser.rs index 3535a09..45a1bf1 100644 --- a/light-client-lib/src/storage/db/browser.rs +++ b/light-client-lib/src/storage/db/browser.rs @@ -195,6 +195,7 @@ impl CommunicationChannel { Some(take_while), ), }; + debug!("Dispatching database command: {:?}", new_cmd); let CommunicationChannel { input_i32_arr, input_u8_arr, @@ -1101,6 +1102,10 @@ impl Storage { // (block-hash, proved) matched_blocks: Vec<(Byte32, bool)>, ) { + debug!( + "Adding matched blocks: ({:?}, {:?}, {:?})", + start_number, blocks_count, matched_blocks + ); assert!(!matched_blocks.is_empty()); let mut key = Key::Meta(MATCHED_FILTER_BLOCKS_KEY).into_vec(); key.extend(start_number.to_be_bytes());