Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client: Added Fetcher debug logger #1544

Merged
merged 3 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 42 additions & 23 deletions packages/client/lib/sync/fetcher/blockfetcher.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Block, BlockBodyBuffer, BlockBuffer } from '@ethereumjs/block'
import { Block, BlockBuffer } from '@ethereumjs/block'
import { KECCAK256_RLP, KECCAK256_RLP_ARRAY } from 'ethereumjs-util'
import { Peer } from '../../net/peer'
import { EthProtocolMethods } from '../../net/protocol'
import { Job } from './types'
Expand All @@ -24,39 +25,48 @@ export class BlockFetcher extends BlockFetcherBase<Block[], Block> {
async request(job: Job<JobTask, Block[], Block>): Promise<Block[]> {
const { task, peer } = job
const { first, count } = task

const blocksRange = `${first}-${first.addn(count)}`
const peerInfo = `id=${peer?.id.slice(0, 8)} address=${peer?.address}`

const headersResult = await (peer!.eth as EthProtocolMethods).getBlockHeaders({
block: first,
max: count,
})
if (!headersResult) {
// Catch occasional null responses from peers who do not return block headers from peer.eth request
this.config.logger.debug(
`Peer id=${peer?.id.slice(0, 8)} address=${
peer?.address
} returned no headers for blocks=${first}-${first.addn(count)}`
)
if (!headersResult || headersResult[1].length === 0) {
// Catch occasional null or empty responses
this.debug(`Peer ${peerInfo} returned no headers for blocks=${blocksRange}`)
return []
}
const headers = headersResult[1]
const bodiesResult = await peer!.eth!.getBlockBodies({ hashes: headers.map((h) => h.hash()) })
if (!bodiesResult) {
// Catch occasional null responses from peers who do not return block bodies from peer.eth request
this.config.logger.debug(
`Peer id=${peer?.id.slice(0, 8)} address=${
peer?.address
} returned no bodies for blocks=${first}-${first.addn(count)}`
)
if (!bodiesResult || bodiesResult[1].length === 0) {
// Catch occasional null or empty responses
this.debug(`Peer ${peerInfo} returned no bodies for blocks=${blocksRange}`)
return []
}
const bodies = bodiesResult[1]
const blocks: Block[] = bodies.map(([txsData, unclesData]: BlockBodyBuffer, i: number) => {
const opts = {
common: this.config.chainCommon,
hardforkByBlockNumber: true,
this.debug(
`Requested blocks=${blocksRange} from ${peerInfo} (received: ${headers.length} headers / ${bodies.length} bodies)`
)
const blocks: Block[] = []
const blockOpts = {
common: this.config.chainCommon,
hardforkByBlockNumber: true,
}
for (const [i, [txsData, unclesData]] of bodies.entries()) {
if (
(!headers[i].transactionsTrie.equals(KECCAK256_RLP) && txsData.length === 0) ||
(!headers[i].uncleHash.equals(KECCAK256_RLP_ARRAY) && unclesData.length === 0)
) {
this.debug(
`Requested block=${headers[i].number}} from peer ${peerInfo} missing non-empty txs or uncles`
)
return []
}
const values: BlockBuffer = [headers[i].raw(), txsData, unclesData]
return Block.fromValuesArray(values, opts)
})
blocks.push(Block.fromValuesArray(values, blockOpts))
}
return blocks
}

Expand All @@ -72,6 +82,9 @@ export class BlockFetcher extends BlockFetcherBase<Block[], Block> {
} else if (result.length > 0 && result.length < job.task.count) {
// Adopt the start block/header number from the remaining jobs
// if the number of the results provided is lower than the expected count
this.debug(
`Adopt start block/header number from remaining jobs (provided=${result.length} expected=${job.task.count})`
)
const lengthDiff = job.task.count - result.length
const adoptedJobs = []
while (this.in.length > 0) {
Expand All @@ -94,8 +107,14 @@ export class BlockFetcher extends BlockFetcherBase<Block[], Block> {
* @param blocks fetch result
*/
async store(blocks: Block[]) {
const num = await this.chain.putBlocks(blocks)
this.config.events.emit(Event.SYNC_FETCHER_FETCHED, blocks.slice(0, num))
try {
const num = await this.chain.putBlocks(blocks)
this.debug(`Fetcher results stored in blockchain (blocks num=${blocks.length}).`)
this.config.events.emit(Event.SYNC_FETCHER_FETCHED, blocks.slice(0, num))
} catch (e: any) {
this.debug(`Error on storing fetcher results in blockchain (blocks num=${blocks.length}).`)
throw e
}
}

/**
Expand Down
7 changes: 7 additions & 0 deletions packages/client/lib/sync/fetcher/blockfetcherbase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ export abstract class BlockFetcherBase<JobResult, StorageItem> extends Fetcher<
this.chain = options.chain
this.first = options.first
this.count = options.count
this.debug(
`Block fetcher instantiated interval=${this.interval} first=${this.first} count=${this.count} destroyWhenDone=${this.destroyWhenDone}`
)
}

/**
Expand All @@ -56,6 +59,7 @@ export abstract class BlockFetcherBase<JobResult, StorageItem> extends Fetcher<
if (count.gtn(0)) {
tasks.push({ first: first.clone(), count: count.toNumber() })
}
this.debug(`Created new tasks num=${tasks.length} first=${first} count=${count}`)
return tasks
}

Expand Down Expand Up @@ -100,5 +104,8 @@ export abstract class BlockFetcherBase<JobResult, StorageItem> extends Fetcher<
)
})
}
this.debug(
`Enqueued tasks by number list num=${numberList.length} min=${min} bulkRequest=${bulkRequest}`
)
}
}
46 changes: 32 additions & 14 deletions packages/client/lib/sync/fetcher/fetcher.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { debug as createDebugLogger, Debugger } from 'debug'
import { Readable, Writable } from 'stream'
import Heap from 'qheap'
import { PeerPool } from '../../net/peerpool'
Expand Down Expand Up @@ -40,6 +41,7 @@ export interface FetcherOptions {
*/
export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable {
public config: Config
protected debug: Debugger

protected pool: PeerPool
protected timeout: number
Expand All @@ -53,7 +55,7 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
protected finished: number // number of tasks which are both processed and also finished writing
protected running: boolean
protected reading: boolean
private destroyWhenDone: boolean // Destroy the fetcher once we are finished processing each task.
protected destroyWhenDone: boolean // Destroy the fetcher once we are finished processing each task.

private _readableState?: {
// This property is inherited from Readable. We only need `length`.
Expand All @@ -67,6 +69,8 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
super({ ...options, objectMode: true })

this.config = options.config
this.debug = createDebugLogger('client:fetcher')

this.pool = options.pool
this.timeout = options.timeout ?? 8000
this.interval = options.interval ?? 1000
Expand Down Expand Up @@ -195,6 +199,9 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
private success(job: Job<JobTask, JobResult, StorageItem>, result?: JobResult) {
if (job.state !== 'active') return
if (result === undefined || (result as any).length === 0) {
this.debug(
`Re-enqueuing job ${JSON.stringify(job.task)} (undefined or empty result set returned).`
)
this.enqueue(job)
void this.wait().then(() => {
job.peer!.idle = true
Expand All @@ -206,6 +213,7 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
this.out.insert(job)
this.dequeue()
} else {
this.debug(`Re-enqueuing job ${JSON.stringify(job.task)} (reply contains unexpected data).`)
this.enqueue(job)
}
}
Expand Down Expand Up @@ -233,12 +241,20 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
*/
next() {
const job = this.in.peek()
if (
!job ||
this._readableState!.length > this.maxQueue ||
job.index > this.processed + this.maxQueue ||
this.processed === this.total
) {

if (!job) {
this.debug(`No job found on next task, skip next job execution.`)
return false
}
if (this._readableState!.length > this.maxQueue) {
this.debug(`Readable state length extends max queue size, skip next job execution.`)
return false
}
if (job.index > this.processed + this.maxQueue) {
this.debug(`Job index greater than processed + max queue size, skip next job execution.`)
}
if (this.processed === this.total) {
this.debug(`Total number of tasks reached, skip next job execution.`)
return false
}
const peer = this.peer()
Expand All @@ -255,6 +271,9 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
.catch((error: Error) => this.failure(job, error))
.finally(() => clearTimeout(timeout))
return job
} else {
this.debug(`No idle peer available, skip next job execution.`)
return false
}
}

Expand Down Expand Up @@ -318,6 +337,7 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
this.running = false
writer.destroy()
})
this.debug(`Setup writer pipe.`)
}

/**
Expand All @@ -329,9 +349,11 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
}
this.write()
this.running = true
for (const task of this.tasks()) {
const tasks = this.tasks()
for (const task of tasks) {
this.enqueueTask(task)
}
this.debug(`Enqueued num=${tasks.length} tasks`)
while (this.running) {
if (!this.next()) {
if (this.finished === this.total) {
Expand Down Expand Up @@ -360,14 +382,10 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
expire(job: Job<JobTask, JobResult, StorageItem>) {
job.state = 'expired'
if (this.pool.contains(job.peer!)) {
this.config.logger.debug(
`Task timed out for peer (banning) ${JSON.stringify(job.task)} ${job.peer}`
)
this.debug(`Task timed out for peer (banning) ${JSON.stringify(job.task)} ${job.peer}`)
this.pool.ban(job.peer!, this.banTime)
} else {
this.config.logger.debug(
`Peer disconnected while performing task ${JSON.stringify(job.task)} ${job.peer}`
)
this.debug(`Peer disconnected while performing task ${JSON.stringify(job.task)} ${job.peer}`)
}
this.enqueue(job)
}
Expand Down
1 change: 1 addition & 0 deletions packages/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"@ethereumjs/tx": "^3.3.2",
"@ethereumjs/vm": "^5.5.3",
"chalk": "^4.1.2",
"debug": "^2.2.0",
"ethereumjs-util": "^7.1.3",
"fs-extra": "^10.0.0",
"it-pipe": "^1.1.0",
Expand Down