diff --git a/packages/client/lib/sync/fetcher/blockfetcher.ts b/packages/client/lib/sync/fetcher/blockfetcher.ts
index ad91479957..4e6f918602 100644
--- a/packages/client/lib/sync/fetcher/blockfetcher.ts
+++ b/packages/client/lib/sync/fetcher/blockfetcher.ts
@@ -1,4 +1,5 @@
-import { Block, BlockBodyBuffer, BlockBuffer } from '@ethereumjs/block'
+import { Block, BlockBuffer } from '@ethereumjs/block'
+import { KECCAK256_RLP, KECCAK256_RLP_ARRAY } from 'ethereumjs-util'
 import { Peer } from '../../net/peer'
 import { EthProtocolMethods } from '../../net/protocol'
 import { Job } from './types'
@@ -24,39 +25,48 @@ export class BlockFetcher extends BlockFetcherBase {
   async request(job: Job): Promise {
     const { task, peer } = job
     const { first, count } = task
+
+    const blocksRange = `${first}-${first.addn(count)}`
+    const peerInfo = `id=${peer?.id.slice(0, 8)} address=${peer?.address}`
+
     const headersResult = await (peer!.eth as EthProtocolMethods).getBlockHeaders({
       block: first,
       max: count,
     })
-    if (!headersResult) {
-      // Catch occasional null responses from peers who do not return block headers from peer.eth request
-      this.config.logger.debug(
-        `Peer id=${peer?.id.slice(0, 8)} address=${
-          peer?.address
-        } returned no headers for blocks=${first}-${first.addn(count)}`
-      )
+    if (!headersResult || headersResult[1].length === 0) {
+      // Catch occasional null or empty responses
+      this.debug(`Peer ${peerInfo} returned no headers for blocks=${blocksRange}`)
       return []
     }
     const headers = headersResult[1]
     const bodiesResult = await peer!.eth!.getBlockBodies({ hashes: headers.map((h) => h.hash()) })
-    if (!bodiesResult) {
-      // Catch occasional null responses from peers who do not return block bodies from peer.eth request
-      this.config.logger.debug(
-        `Peer id=${peer?.id.slice(0, 8)} address=${
-          peer?.address
-        } returned no bodies for blocks=${first}-${first.addn(count)}`
-      )
+    if (!bodiesResult || bodiesResult[1].length === 0) {
+      // Catch occasional null or empty responses
+      this.debug(`Peer ${peerInfo} returned no bodies for blocks=${blocksRange}`)
       return []
     }
     const bodies = bodiesResult[1]
-    const blocks: Block[] = bodies.map(([txsData, unclesData]: BlockBodyBuffer, i: number) => {
-      const opts = {
-        common: this.config.chainCommon,
-        hardforkByBlockNumber: true,
+    this.debug(
+      `Requested blocks=${blocksRange} from ${peerInfo} (received: ${headers.length} headers / ${bodies.length} bodies)`
+    )
+    const blocks: Block[] = []
+    const blockOpts = {
+      common: this.config.chainCommon,
+      hardforkByBlockNumber: true,
+    }
+    for (const [i, [txsData, unclesData]] of bodies.entries()) {
+      if (
+        (!headers[i].transactionsTrie.equals(KECCAK256_RLP) && txsData.length === 0) ||
+        (!headers[i].uncleHash.equals(KECCAK256_RLP_ARRAY) && unclesData.length === 0)
+      ) {
+        this.debug(
+          `Requested block=${headers[i].number} from peer ${peerInfo} missing non-empty txs or uncles`
+        )
+        return []
       }
       const values: BlockBuffer = [headers[i].raw(), txsData, unclesData]
-      return Block.fromValuesArray(values, opts)
-    })
+      blocks.push(Block.fromValuesArray(values, blockOpts))
+    }
     return blocks
   }
 
@@ -72,6 +82,9 @@ export class BlockFetcher extends BlockFetcherBase {
     } else if (result.length > 0 && result.length < job.task.count) {
       // Adopt the start block/header number from the remaining jobs
       // if the number of the results provided is lower than the expected count
+      this.debug(
+        `Adopt start block/header number from remaining jobs (provided=${result.length} expected=${job.task.count})`
+      )
       const lengthDiff = job.task.count - result.length
       const adoptedJobs = []
       while (this.in.length > 0) {
@@ -94,8 +107,14 @@ export class BlockFetcher extends BlockFetcherBase {
    * @param blocks fetch result
    */
   async store(blocks: Block[]) {
-    const num = await this.chain.putBlocks(blocks)
-    this.config.events.emit(Event.SYNC_FETCHER_FETCHED, blocks.slice(0, num))
+    try {
+      const num = await this.chain.putBlocks(blocks)
+      this.debug(`Fetcher results stored in blockchain (blocks num=${blocks.length}).`)
+      this.config.events.emit(Event.SYNC_FETCHER_FETCHED, blocks.slice(0, num))
+    } catch (e: any) {
+      this.debug(`Error on storing fetcher results in blockchain (blocks num=${blocks.length}).`)
+      throw e
+    }
   }
 
   /**
diff --git a/packages/client/lib/sync/fetcher/blockfetcherbase.ts b/packages/client/lib/sync/fetcher/blockfetcherbase.ts
index 31bfc300a6..8a30bea547 100644
--- a/packages/client/lib/sync/fetcher/blockfetcherbase.ts
+++ b/packages/client/lib/sync/fetcher/blockfetcherbase.ts
@@ -39,6 +39,9 @@ export abstract class BlockFetcherBase extends Fetcher<
     this.chain = options.chain
     this.first = options.first
     this.count = options.count
+    this.debug(
+      `Block fetcher instantiated interval=${this.interval} first=${this.first} count=${this.count} destroyWhenDone=${this.destroyWhenDone}`
+    )
   }
 
   /**
@@ -56,6 +59,7 @@ export abstract class BlockFetcherBase extends Fetcher<
     if (count.gtn(0)) {
       tasks.push({ first: first.clone(), count: count.toNumber() })
     }
+    this.debug(`Created new tasks num=${tasks.length} first=${first} count=${count}`)
     return tasks
   }
 
@@ -100,5 +104,8 @@ export abstract class BlockFetcherBase extends Fetcher<
         )
       })
     }
+    this.debug(
+      `Enqueued tasks by number list num=${numberList.length} min=${min} bulkRequest=${bulkRequest}`
+    )
   }
 }
diff --git a/packages/client/lib/sync/fetcher/fetcher.ts b/packages/client/lib/sync/fetcher/fetcher.ts
index 6e211ecde7..d4a5c7d40f 100644
--- a/packages/client/lib/sync/fetcher/fetcher.ts
+++ b/packages/client/lib/sync/fetcher/fetcher.ts
@@ -1,3 +1,4 @@
+import { debug as createDebugLogger, Debugger } from 'debug'
 import { Readable, Writable } from 'stream'
 import Heap from 'qheap'
 import { PeerPool } from '../../net/peerpool'
@@ -40,6 +41,7 @@ export interface FetcherOptions {
  */
 export abstract class Fetcher extends Readable {
   public config: Config
+  protected debug: Debugger
 
   protected pool: PeerPool
   protected timeout: number
@@ -53,7 +55,7 @@ export abstract class Fetcher extends Readable
   protected finished: number // number of tasks which are both processed and also finished writing
   protected running: boolean
   protected reading: boolean
-  private destroyWhenDone: boolean // Destroy the fetcher once we are finished processing each task.
+  protected destroyWhenDone: boolean // Destroy the fetcher once we are finished processing each task.
 
   private _readableState?: {
     // This property is inherited from Readable. We only need `length`.
@@ -67,6 +69,8 @@ export abstract class Fetcher extends Readable
     super({ ...options, objectMode: true })
     this.config = options.config
 
+    this.debug = createDebugLogger('client:fetcher')
+
     this.pool = options.pool
     this.timeout = options.timeout ?? 8000
     this.interval = options.interval ?? 1000
@@ -195,6 +199,9 @@ export abstract class Fetcher extends Readable
   private success(job: Job, result?: JobResult) {
     if (job.state !== 'active') return
     if (result === undefined || (result as any).length === 0) {
+      this.debug(
+        `Re-enqueuing job ${JSON.stringify(job.task)} (undefined or empty result set returned).`
+      )
       this.enqueue(job)
       void this.wait().then(() => {
         job.peer!.idle = true
@@ -206,6 +213,7 @@ export abstract class Fetcher extends Readable
       this.out.insert(job)
       this.dequeue()
     } else {
+      this.debug(`Re-enqueuing job ${JSON.stringify(job.task)} (reply contains unexpected data).`)
       this.enqueue(job)
     }
   }
@@ -233,12 +241,21 @@ export abstract class Fetcher extends Readable
    */
   next() {
     const job = this.in.peek()
-    if (
-      !job ||
-      this._readableState!.length > this.maxQueue ||
-      job.index > this.processed + this.maxQueue ||
-      this.processed === this.total
-    ) {
+
+    if (!job) {
+      this.debug(`No job found on next task, skip next job execution.`)
+      return false
+    }
+    if (this._readableState!.length > this.maxQueue) {
+      this.debug(`Readable state length exceeds max queue size, skip next job execution.`)
+      return false
+    }
+    if (job.index > this.processed + this.maxQueue) {
+      this.debug(`Job index greater than processed + max queue size, skip next job execution.`)
+      return false
+    }
+    if (this.processed === this.total) {
+      this.debug(`Total number of tasks reached, skip next job execution.`)
       return false
     }
     const peer = this.peer()
@@ -255,6 +272,9 @@ export abstract class Fetcher extends Readable
         .catch((error: Error) => this.failure(job, error))
         .finally(() => clearTimeout(timeout))
       return job
+    } else {
+      this.debug(`No idle peer available, skip next job execution.`)
+      return false
     }
   }
 
@@ -318,6 +338,7 @@ export abstract class Fetcher extends Readable
       this.running = false
      writer.destroy()
     })
+    this.debug(`Setup writer pipe.`)
   }
 
   /**
@@ -329,9 +350,11 @@ export abstract class Fetcher extends Readable
     }
     this.write()
     this.running = true
-    for (const task of this.tasks()) {
+    const tasks = this.tasks()
+    for (const task of tasks) {
       this.enqueueTask(task)
     }
+    this.debug(`Enqueued num=${tasks.length} tasks`)
     while (this.running) {
       if (!this.next()) {
         if (this.finished === this.total) {
@@ -360,14 +383,10 @@ export abstract class Fetcher extends Readable
   expire(job: Job) {
     job.state = 'expired'
     if (this.pool.contains(job.peer!)) {
-      this.config.logger.debug(
-        `Task timed out for peer (banning) ${JSON.stringify(job.task)} ${job.peer}`
-      )
+      this.debug(`Task timed out for peer (banning) ${JSON.stringify(job.task)} ${job.peer}`)
       this.pool.ban(job.peer!, this.banTime)
     } else {
-      this.config.logger.debug(
-        `Peer disconnected while performing task ${JSON.stringify(job.task)} ${job.peer}`
-      )
+      this.debug(`Peer disconnected while performing task ${JSON.stringify(job.task)} ${job.peer}`)
     }
     this.enqueue(job)
   }
diff --git a/packages/client/package.json b/packages/client/package.json
index 6501a4f4b0..8e01e21c87 100644
--- a/packages/client/package.json
+++ b/packages/client/package.json
@@ -58,6 +58,7 @@
     "@ethereumjs/tx": "^3.3.2",
     "@ethereumjs/vm": "^5.5.3",
     "chalk": "^4.1.2",
+    "debug": "^2.2.0",
     "ethereumjs-util": "^7.1.3",
     "fs-extra": "^10.0.0",
     "it-pipe": "^1.1.0",
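
Note on the new logging: the fetcher messages above go through the 'debug' package under the 'client:fetcher' namespace rather than the general config.logger, so they only appear when that namespace is enabled via the DEBUG environment variable (e.g. DEBUG=client:fetcher). The standalone sketch below illustrates the pattern under that assumption; the MiniFetcher class, its next() signature, and the sketch.ts file name are hypothetical stand-ins for illustration and are not part of this change set.

// sketch.ts - hypothetical, standalone illustration (not part of the diff above)
import { debug as createDebugLogger, Debugger } from 'debug'

class MiniFetcher {
  // Namespaced logger, same namespace the client fetcher uses in the diff
  protected debug: Debugger

  constructor() {
    this.debug = createDebugLogger('client:fetcher')
  }

  // Mirrors the queue guard pattern from next(): log and skip when the queue is full
  next(queueLength: number, maxQueue: number): boolean {
    if (queueLength > maxQueue) {
      this.debug(`Readable state length exceeds max queue size, skip next job execution.`)
      return false
    }
    return true
  }
}

// Run with: DEBUG=client:fetcher ts-node sketch.ts
new MiniFetcher().next(10, 5)

With DEBUG unset the calls produce no output, so the extra logging stays cheap on the fetcher hot path.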