Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client: New Mechanism to Keep Peer Latest Block Updated #3354

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
6a2b266
Client -> Peer: add lastEthStatusUpdate property to Peer, set initial…
holgerd77 Apr 11, 2024
4dd9bf8
Redo handshake in client peer pool periodically, add force option to …
holgerd77 Apr 11, 2024
e268f9a
Some basic refinement of peer pool periodic request logic (refactor p…
holgerd77 Apr 15, 2024
b053277
Client: consolidate latest() method in base Synchronizer class
holgerd77 Apr 15, 2024
72a28d1
Client: add preparatory updatedBestHeader to Peer, refactor latest() …
holgerd77 Apr 15, 2024
f29c3c6
Client: add potential best header num differentiated logic to Peer, l…
holgerd77 Apr 15, 2024
6ea23b0
Client: add Fetcher hack to call peer.latest()
holgerd77 Apr 15, 2024
fabf53c
Various cleanups
holgerd77 Apr 15, 2024
ccf9366
Client: Fix lightsync.spec.ts tests
holgerd77 Apr 16, 2024
4cd01aa
Some clean-ups
holgerd77 Apr 16, 2024
5777c42
Client: Fix beaconsync.spec.ts tests
holgerd77 Apr 16, 2024
9ea28d8
Client: Fix fullsync.spec.tst tests
holgerd77 Apr 16, 2024
9a6ae27
Client: Fix snapsync.spec.ts tests
holgerd77 Apr 16, 2024
5c42d73
Client: Fix fetcher tests
holgerd77 Apr 16, 2024
9e60a09
Client: Fix eth syncing.spec.ts tests
holgerd77 Apr 16, 2024
d6c2a6c
Client: Fix integration tests
holgerd77 Apr 16, 2024
58cb2b4
Client: Backup lightsync integration tests (lightsync not supported a…
holgerd77 Apr 16, 2024
c53fa4b
Lightsync tests deprecation note
holgerd77 Apr 16, 2024
20bf36f
Merge remote-tracking branch 'origin/master' into client-add-mechanis…
acolytec3 Apr 17, 2024
fe9a4ec
Make lightsync tests run (but fail)
acolytec3 Apr 17, 2024
0117083
Merge branch 'master' into client-add-mechanism-to-redo-peer-handshak…
scorbajio Apr 18, 2024
7d877f2
Merge branch 'master' into client-add-mechanism-to-redo-peer-handshak…
holgerd77 Apr 22, 2024
4e8aee6
Client: Removed doubled lightsync integration test file
holgerd77 Apr 22, 2024
4e7bd84
Merge branch 'master' into client-add-mechanism-to-redo-peer-handshak…
holgerd77 Apr 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 72 additions & 14 deletions packages/client/src/net/peer/peer.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { BIGINT_0 } from '@ethereumjs/util'
import { EventEmitter } from 'events'

import { short } from '../../util'
import { BoundEthProtocol, BoundLesProtocol, BoundSnapProtocol } from '../protocol'

import type { Config } from '../../config'
import type { BoundProtocol, Protocol, Sender } from '../protocol'
import type { Server } from '../server'
import type { BlockHeader } from '@ethereumjs/block'

export interface PeerOptions {
/* Config */
Expand Down Expand Up @@ -90,6 +93,64 @@ export abstract class Peer extends EventEmitter {

abstract connect(): Promise<void>

/**
* Eventually updates and returns the latest header of peer
*/
async latest(): Promise<BlockHeader | undefined> {
if (!this.eth) {
return
}
let block: bigint | Uint8Array
if (!this.eth!.updatedBestHeader) {
// If there is no updated best header stored yet, start with the status hash
block = this.eth!.status.bestHash
} else {
block = this.getPotentialBestHeaderNum()
}
const result = await this.eth!.getBlockHeaders({
block,
max: 1,
})
if (result !== undefined) {
const latest = result[1][0]
this.eth!.updatedBestHeader = latest
if (latest !== undefined) {
const height = latest.number
if (
height > BIGINT_0 &&
(this.config.syncTargetHeight === undefined ||
this.config.syncTargetHeight === BIGINT_0 ||
this.config.syncTargetHeight < latest.number)
) {
this.config.syncTargetHeight = height
this.config.logger.info(`New sync target height=${height} hash=${short(latest.hash())}`)
}
}
}
return this.eth!.updatedBestHeader
}

/**
* Returns a potential best block header number for the peer
* (not necessarily verified by block request) derived from
* either the client-wide sync target height or the last best
* header timestamp "forward-calculated" by block/slot times (12s).
*/
getPotentialBestHeaderNum(): bigint {
let forwardCalculatedNum = BIGINT_0
const bestSyncTargetNum = this.config.syncTargetHeight ?? BIGINT_0
if (this.eth?.updatedBestHeader !== undefined) {
const bestHeaderNum = this.eth!.updatedBestHeader.number
const nowSec = Math.floor(Date.now() / 1000)
const diffSec = nowSec - Number(this.eth!.updatedBestHeader.timestamp)
const SLOT_TIME = 12
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if a slot is missed? Then this block is definitely not there 🤔

Copy link
Member Author

@holgerd77 holgerd77 Apr 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I initially planned to do 2-3 more commits fine tuning the existing solution a bit with a somewhat more adaptive algorithm (basically remember the last unsuccessful try (or remove on success again) and if present then gradually downgrade the request (so: try a lower block number)).

Guess we likely can/want to do this in a small follow-up PR, should be not such a big effort.

const diffBlocks = BigInt(Math.floor(diffSec / SLOT_TIME))
forwardCalculatedNum = bestHeaderNum + diffBlocks
}
const best = forwardCalculatedNum > bestSyncTargetNum ? forwardCalculatedNum : bestSyncTargetNum
return best
}

/**
* Handle unhandled messages along handshake
*/
Expand All @@ -108,28 +169,25 @@ export abstract class Peer extends EventEmitter {

if (protocol.name === 'eth') {
bound = new BoundEthProtocol(boundOpts)

await bound!.handshake(sender)

this.eth = <BoundEthProtocol>bound
} else if (protocol.name === 'les') {
bound = new BoundLesProtocol(boundOpts)
} else if (protocol.name === 'snap') {
bound = new BoundSnapProtocol(boundOpts)
} else {
throw new Error(`addProtocol: ${protocol.name} protocol not supported`)
}

// Handshake only when snap, else
if (protocol.name !== 'snap') {
await bound!.handshake(sender)
} else {
if (sender.status === undefined) throw Error('Snap can only be bound on handshaked peer')
}

if (protocol.name === 'eth') {
this.eth = <BoundEthProtocol>bound
this.les = <BoundLesProtocol>bound
} else if (protocol.name === 'snap') {
bound = new BoundSnapProtocol(boundOpts)
if (sender.status === undefined) throw Error('Snap can only be bound on handshaked peer')

this.snap = <BoundSnapProtocol>bound
} else if (protocol.name === 'les') {
this.les = <BoundLesProtocol>bound
} else {
throw new Error(`addProtocol: ${protocol.name} protocol not supported`)
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks wild but is only a small local refactor making things a bit cleaner. 🙂


this.boundProtocols.push(bound)
}

Expand Down
29 changes: 28 additions & 1 deletion packages/client/src/net/peerpool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import { Hardfork } from '@ethereumjs/common'

import { Event } from '../types'

import { type Peer, RlpxPeer } from './peer'

import type { Config } from '../config'
import type { Peer } from './peer'

export interface PeerPoolOptions {
/* Config */
Expand Down Expand Up @@ -31,7 +32,13 @@ export class PeerPool {
*/
private DEFAULT_STATUS_CHECK_INTERVAL = 20000

/**
* Default peer best header update interval (in ms)
*/
private DEFAULT_PEER_BEST_HEADER_UPDATE_INTERVAL = 5000

private _statusCheckInterval: NodeJS.Timeout | undefined /* global NodeJS */
private _peerBestHeaderUpdateInterval: NodeJS.Timeout | undefined
private _reconnectTimeout: NodeJS.Timeout | undefined

/**
Expand Down Expand Up @@ -87,6 +94,12 @@ export class PeerPool {
this.DEFAULT_STATUS_CHECK_INTERVAL
)

this._peerBestHeaderUpdateInterval = setInterval(
// eslint-disable-next-line @typescript-eslint/await-thenable
await this._peerBestHeaderUpdate.bind(this),
this.DEFAULT_PEER_BEST_HEADER_UPDATE_INTERVAL
)

this.running = true
return true
}
Expand All @@ -99,6 +112,7 @@ export class PeerPool {
await this.close()
}
clearInterval(this._statusCheckInterval as NodeJS.Timeout)
clearInterval(this._peerBestHeaderUpdateInterval as NodeJS.Timeout)
clearTimeout(this._reconnectTimeout as NodeJS.Timeout)
this.running = false
return true
Expand Down Expand Up @@ -252,4 +266,17 @@ export class PeerPool {
this.noPeerPeriods = 0
}
}

/**
* Periodically update the latest best known header for peers
*/
async _peerBestHeaderUpdate() {
for (const p of this.peers) {
if (p.idle && p.eth !== undefined && p instanceof RlpxPeer) {
p.idle = false
await p.latest()
p.idle = true
}
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I generally would think it would make more sense to have all (or: most, so except for the first one) latest() updates being triggered from here. Atm this is not very effective though, since we can only call on idle peers (see reasoning in separate comment) and mostly peers are not idle and so this is not getting called on peers very often in the current status quo.

}
9 changes: 9 additions & 0 deletions packages/client/src/net/protocol/boundprotocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ export class BoundProtocol {
private resolvers: Map<string | number, any>
private messageQueue: Message[] = []

/**
* An eventual updated best head.
*
* If set this is by design known to be greater or equal the block hash from
* the initial `STATUS` exchange (`_status` property here) and `updatedBestHash`
* number/hash should take precedence.
*/
public updatedBestHeader?: BlockHeader

/**
* Create bound protocol
*/
Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/rpc/modules/eth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,7 @@ export class Eth {
message: `no peer available for synchronization`,
}
}
const highestBlockHeader = await synchronizer.latest(bestPeer)
const highestBlockHeader = await bestPeer.latest()
if (!highestBlockHeader) {
throw {
code: INTERNAL_ERROR,
Expand Down
17 changes: 3 additions & 14 deletions packages/client/src/sync/beaconsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ export class BeaconSynchronizer extends Synchronizer {
const peers = this.pool.peers.filter(this.syncable.bind(this))
if (peers.length < this.config.minPeers && !this.forceSync) return
for (const peer of peers) {
const latest = await this.latest(peer)
if (latest) {
const latest = await peer.latest()
if (latest !== undefined) {
const { number } = latest
if (!best || best[1] < number) {
best = [peer, number]
Expand All @@ -126,17 +126,6 @@ export class BeaconSynchronizer extends Synchronizer {
return best ? best[0] : undefined
}

/**
* Get latest header of peer
*/
async latest(peer: Peer) {
const result = await peer.eth?.getBlockHeaders({
block: peer.eth!.status.bestHash,
max: 1,
})
return result ? result[1][0] : undefined
}

/**
* Start synchronizer.
* If passed a block, will initialize sync starting from the block.
Expand Down Expand Up @@ -221,7 +210,7 @@ export class BeaconSynchronizer extends Synchronizer {
return false
}

const latest = peer ? await this.latest(peer) : undefined
const latest = peer ? await peer.latest() : undefined
if (!latest) return false

const height = latest.number
Expand Down
5 changes: 5 additions & 0 deletions packages/client/src/sync/fetcher/accountfetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,11 @@ export class AccountFetcher extends Fetcher<JobTask, AccountData[], AccountData>
job: Job<JobTask, AccountData[], AccountData>
): Promise<AccountDataResponse | undefined> {
const { peer } = job
// Currently this is the only safe place to call peer.latest() without interfering with the fetcher
// TODOs:
// 1. Properly rewrite Fetcher with async/await -> allow to at least place in Fetcher.next()
// 2. Properly implement ETH request IDs -> allow to call on non-idle in Peer Pool
await peer?.latest()
const origin = this.getOrigin(job)
const limit = this.getLimit(job)

Expand Down
6 changes: 6 additions & 0 deletions packages/client/src/sync/fetcher/blockfetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ export class BlockFetcher extends BlockFetcherBase<Block[], Block> {
*/
async request(job: Job<JobTask, Block[], Block>): Promise<Block[]> {
const { task, peer, partialResult } = job
// Currently this is the only safe place to call peer.latest() without interfering with the fetcher
// TODOs:
// 1. Properly rewrite Fetcher with async/await -> allow to at least place in Fetcher.next()
// 2. Properly implement ETH request IDs -> allow to call on non-idle in Peer Pool
await peer?.latest()

let { first, count } = task
if (partialResult) {
first = !this.reverse
Expand Down
5 changes: 5 additions & 0 deletions packages/client/src/sync/fetcher/bytecodefetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ export class ByteCodeFetcher extends Fetcher<JobTask, Uint8Array[], Uint8Array>
job: Job<JobTask, Uint8Array[], Uint8Array>
): Promise<ByteCodeDataResponse | undefined> {
const { task, peer } = job
// Currently this is the only safe place to call peer.latest() without interfering with the fetcher
// TODOs:
// 1. Properly rewrite Fetcher with async/await -> allow to at least place in Fetcher.next()
// 2. Properly implement ETH request IDs -> allow to call on non-idle in Peer Pool
await peer?.latest()

this.debug(`requested code hashes: ${Array.from(task.hashes).map((h) => bytesToHex(h))}`)

Expand Down
7 changes: 7 additions & 0 deletions packages/client/src/sync/fetcher/headerfetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ export class HeaderFetcher extends BlockFetcherBase<BlockHeaderResult, BlockHead
*/
async request(job: Job<JobTask, BlockHeaderResult, BlockHeader>) {
const { task, peer, partialResult } = job

// Currently this is the only safe place to call peer.latest() without interfering with the fetcher
// TODOs:
// 1. Properly rewrite Fetcher with async/await -> allow to at least place in Fetcher.next()
// 2. Properly implement ETH request IDs -> allow to call on non-idle in Peer Pool
await peer?.latest()

if (this.flow.maxRequestCount(peer!, 'GetBlockHeaders') < this.config.maxPerRequest) {
// we reached our request limit. try with a different peer.
return
Expand Down
6 changes: 6 additions & 0 deletions packages/client/src/sync/fetcher/storagefetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ export class StorageFetcher extends Fetcher<JobTask, StorageData[][], StorageDat
job: Job<JobTask, StorageData[][], StorageData[]>
): Promise<StorageDataResponse | undefined> {
const { task, peer } = job
// Currently this is the only safe place to call peer.latest() without interfering with the fetcher
// TODOs:
// 1. Properly rewrite Fetcher with async/await -> allow to at least place in Fetcher.next()
// 2. Properly implement ETH request IDs -> allow to call on non-idle in Peer Pool
await peer?.latest()

const origin = this.getOrigin(job)
const limit = this.getLimit(job)

Expand Down
6 changes: 6 additions & 0 deletions packages/client/src/sync/fetcher/trienodefetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ export class TrieNodeFetcher extends Fetcher<JobTask, Uint8Array[], Uint8Array>
job: Job<JobTask, Uint8Array[], Uint8Array>
): Promise<TrieNodesResponse | undefined> {
const { task, peer } = job
// Currently this is the only safe place to call peer.latest() without interfering with the fetcher
// TODOs:
// 1. Properly rewrite Fetcher with async/await -> allow to at least place in Fetcher.next()
// 2. Properly implement ETH request IDs -> allow to call on non-idle in Peer Pool
await peer?.latest()

const { paths, pathStrings } = task

const rangeResult = await peer!.snap!.getTrieNodes({
Expand Down
13 changes: 1 addition & 12 deletions packages/client/src/sync/fullsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,6 @@ export class FullSynchronizer extends Synchronizer {
return best
}

/**
* Get latest header of peer
*/
async latest(peer: Peer) {
const result = await peer.eth?.getBlockHeaders({
block: peer.eth!.status.bestHash,
max: 1,
})
return result ? result[1][0] : undefined
}

/**
* Checks if tx pool should be started
*/
Expand All @@ -175,7 +164,7 @@ export class FullSynchronizer extends Synchronizer {
* @returns a boolean if the setup was successful
*/
async syncWithPeer(peer?: Peer): Promise<boolean> {
const latest = peer ? await this.latest(peer) : undefined
const latest = peer ? await peer.latest() : undefined
if (!latest) return false

const height = latest.number
Expand Down
13 changes: 1 addition & 12 deletions packages/client/src/sync/lightsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,24 +84,13 @@ export class LightSynchronizer extends Synchronizer {
return best
}

/**
* Get latest header of peer
*/
async latest(peer: Peer) {
const result = await peer.les?.getBlockHeaders({
block: peer.les!.status.headHash,
max: 1,
})
return result?.headers[0]
}

/**
* Called from `sync()` to sync headers and state from peer starting from current height.
* @param peer remote peer to sync with
* @returns a boolean if the setup was successful
*/
async syncWithPeer(peer?: Peer): Promise<boolean> {
const latest = peer ? await this.latest(peer) : undefined
const latest = peer ? await peer.latest() : undefined
if (!latest) return false

const height = peer!.les!.status.headNum
Expand Down
Loading
Loading