Skip to content

Commit

Permalink
Client: New Mechanism to Keep Peer Latest Block Updated (#3354)
Browse files Browse the repository at this point in the history
* Client -> Peer: add lastEthStatusUpdate property to Peer, set initially on protocol binding, small local refactor

* Redo handshake in client peer pool periodically, add force option to sendStatus for devp2p protocols

* Some basic refinement of peer pool periodic request logic (refactor preparation)

* Client: consolidate latest() method in base Synchronizer class

* Client: add preparatory updatedBestHeader to Peer, refactor latest() to move from Sync -> Peer

* Client: add potential best header num differentiated logic to Peer, latest() call on peers in peer pool

* Client: add Fetcher hack to call peer.latest()

* Various cleanups

* Client: Fix lightsync.spec.ts tests

* Some clean-ups

* Client: Fix beaconsync.spec.ts tests

* Client: Fix fullsync.spec.tst tests

* Client: Fix snapsync.spec.ts tests

* Client: Fix fetcher tests

* Client: Fix eth syncing.spec.ts tests

* Client: Fix integration tests

* Client: Backup lightsync integration tests (lightsync not supported anymore)

* Lightsync tests deprecation note

* Make lightsync tests run (but fail)

* Client: Removed doubled lightsync integration test file

---------

Co-authored-by: acolytec3 <[email protected]>
Co-authored-by: Scorbajio <[email protected]>
  • Loading branch information
3 people authored Apr 22, 2024
1 parent 02e8054 commit 46d09ca
Show file tree
Hide file tree
Showing 29 changed files with 327 additions and 143 deletions.
86 changes: 72 additions & 14 deletions packages/client/src/net/peer/peer.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { BIGINT_0 } from '@ethereumjs/util'
import { EventEmitter } from 'events'

import { short } from '../../util'
import { BoundEthProtocol, BoundLesProtocol, BoundSnapProtocol } from '../protocol'

import type { Config } from '../../config'
import type { BoundProtocol, Protocol, Sender } from '../protocol'
import type { Server } from '../server'
import type { BlockHeader } from '@ethereumjs/block'

export interface PeerOptions {
/* Config */
Expand Down Expand Up @@ -90,6 +93,64 @@ export abstract class Peer extends EventEmitter {

abstract connect(): Promise<void>

/**
* Eventually updates and returns the latest header of peer
*/
async latest(): Promise<BlockHeader | undefined> {
if (!this.eth) {
return
}
let block: bigint | Uint8Array
if (!this.eth!.updatedBestHeader) {
// If there is no updated best header stored yet, start with the status hash
block = this.eth!.status.bestHash
} else {
block = this.getPotentialBestHeaderNum()
}
const result = await this.eth!.getBlockHeaders({
block,
max: 1,
})
if (result !== undefined) {
const latest = result[1][0]
this.eth!.updatedBestHeader = latest
if (latest !== undefined) {
const height = latest.number
if (
height > BIGINT_0 &&
(this.config.syncTargetHeight === undefined ||
this.config.syncTargetHeight === BIGINT_0 ||
this.config.syncTargetHeight < latest.number)
) {
this.config.syncTargetHeight = height
this.config.logger.info(`New sync target height=${height} hash=${short(latest.hash())}`)
}
}
}
return this.eth!.updatedBestHeader
}

/**
* Returns a potential best block header number for the peer
* (not necessarily verified by block request) derived from
* either the client-wide sync target height or the last best
* header timestamp "forward-calculated" by block/slot times (12s).
*/
getPotentialBestHeaderNum(): bigint {
let forwardCalculatedNum = BIGINT_0
const bestSyncTargetNum = this.config.syncTargetHeight ?? BIGINT_0
if (this.eth?.updatedBestHeader !== undefined) {
const bestHeaderNum = this.eth!.updatedBestHeader.number
const nowSec = Math.floor(Date.now() / 1000)
const diffSec = nowSec - Number(this.eth!.updatedBestHeader.timestamp)
const SLOT_TIME = 12
const diffBlocks = BigInt(Math.floor(diffSec / SLOT_TIME))
forwardCalculatedNum = bestHeaderNum + diffBlocks
}
const best = forwardCalculatedNum > bestSyncTargetNum ? forwardCalculatedNum : bestSyncTargetNum
return best
}

/**
* Handle unhandled messages along handshake
*/
Expand All @@ -108,28 +169,25 @@ export abstract class Peer extends EventEmitter {

if (protocol.name === 'eth') {
bound = new BoundEthProtocol(boundOpts)

await bound!.handshake(sender)

this.eth = <BoundEthProtocol>bound
} else if (protocol.name === 'les') {
bound = new BoundLesProtocol(boundOpts)
} else if (protocol.name === 'snap') {
bound = new BoundSnapProtocol(boundOpts)
} else {
throw new Error(`addProtocol: ${protocol.name} protocol not supported`)
}

// Handshake only when snap, else
if (protocol.name !== 'snap') {
await bound!.handshake(sender)
} else {
if (sender.status === undefined) throw Error('Snap can only be bound on handshaked peer')
}

if (protocol.name === 'eth') {
this.eth = <BoundEthProtocol>bound
this.les = <BoundLesProtocol>bound
} else if (protocol.name === 'snap') {
bound = new BoundSnapProtocol(boundOpts)
if (sender.status === undefined) throw Error('Snap can only be bound on handshaked peer')

this.snap = <BoundSnapProtocol>bound
} else if (protocol.name === 'les') {
this.les = <BoundLesProtocol>bound
} else {
throw new Error(`addProtocol: ${protocol.name} protocol not supported`)
}

this.boundProtocols.push(bound)
}

Expand Down
29 changes: 28 additions & 1 deletion packages/client/src/net/peerpool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import { Hardfork } from '@ethereumjs/common'

import { Event } from '../types'

import { type Peer, RlpxPeer } from './peer'

import type { Config } from '../config'
import type { Peer } from './peer'

export interface PeerPoolOptions {
/* Config */
Expand Down Expand Up @@ -31,7 +32,13 @@ export class PeerPool {
*/
private DEFAULT_STATUS_CHECK_INTERVAL = 20000

/**
* Default peer best header update interval (in ms)
*/
private DEFAULT_PEER_BEST_HEADER_UPDATE_INTERVAL = 5000

private _statusCheckInterval: NodeJS.Timeout | undefined /* global NodeJS */
private _peerBestHeaderUpdateInterval: NodeJS.Timeout | undefined
private _reconnectTimeout: NodeJS.Timeout | undefined

/**
Expand Down Expand Up @@ -87,6 +94,12 @@ export class PeerPool {
this.DEFAULT_STATUS_CHECK_INTERVAL
)

this._peerBestHeaderUpdateInterval = setInterval(
// eslint-disable-next-line @typescript-eslint/await-thenable
await this._peerBestHeaderUpdate.bind(this),
this.DEFAULT_PEER_BEST_HEADER_UPDATE_INTERVAL
)

this.running = true
return true
}
Expand All @@ -99,6 +112,7 @@ export class PeerPool {
await this.close()
}
clearInterval(this._statusCheckInterval as NodeJS.Timeout)
clearInterval(this._peerBestHeaderUpdateInterval as NodeJS.Timeout)
clearTimeout(this._reconnectTimeout as NodeJS.Timeout)
this.running = false
return true
Expand Down Expand Up @@ -252,4 +266,17 @@ export class PeerPool {
this.noPeerPeriods = 0
}
}

/**
* Periodically update the latest best known header for peers
*/
async _peerBestHeaderUpdate() {
for (const p of this.peers) {
if (p.idle && p.eth !== undefined && p instanceof RlpxPeer) {
p.idle = false
await p.latest()
p.idle = true
}
}
}
}
9 changes: 9 additions & 0 deletions packages/client/src/net/protocol/boundprotocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ export class BoundProtocol {
private resolvers: Map<string | number, any>
private messageQueue: Message[] = []

/**
* An eventual updated best head.
*
* If set this is by design known to be greater or equal the block hash from
* the initial `STATUS` exchange (`_status` property here) and `updatedBestHash`
* number/hash should take precedence.
*/
public updatedBestHeader?: BlockHeader

/**
* Create bound protocol
*/
Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/rpc/modules/eth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,7 @@ export class Eth {
message: `no peer available for synchronization`,
}
}
const highestBlockHeader = await synchronizer.latest(bestPeer)
const highestBlockHeader = await bestPeer.latest()
if (!highestBlockHeader) {
throw {
code: INTERNAL_ERROR,
Expand Down
17 changes: 3 additions & 14 deletions packages/client/src/sync/beaconsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ export class BeaconSynchronizer extends Synchronizer {
const peers = this.pool.peers.filter(this.syncable.bind(this))
if (peers.length < this.config.minPeers && !this.forceSync) return
for (const peer of peers) {
const latest = await this.latest(peer)
if (latest) {
const latest = await peer.latest()
if (latest !== undefined) {
const { number } = latest
if (!best || best[1] < number) {
best = [peer, number]
Expand All @@ -126,17 +126,6 @@ export class BeaconSynchronizer extends Synchronizer {
return best ? best[0] : undefined
}

/**
* Get latest header of peer
*/
async latest(peer: Peer) {
const result = await peer.eth?.getBlockHeaders({
block: peer.eth!.status.bestHash,
max: 1,
})
return result ? result[1][0] : undefined
}

/**
* Start synchronizer.
* If passed a block, will initialize sync starting from the block.
Expand Down Expand Up @@ -221,7 +210,7 @@ export class BeaconSynchronizer extends Synchronizer {
return false
}

const latest = peer ? await this.latest(peer) : undefined
const latest = peer ? await peer.latest() : undefined
if (!latest) return false

const height = latest.number
Expand Down
5 changes: 5 additions & 0 deletions packages/client/src/sync/fetcher/accountfetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,11 @@ export class AccountFetcher extends Fetcher<JobTask, AccountData[], AccountData>
job: Job<JobTask, AccountData[], AccountData>
): Promise<AccountDataResponse | undefined> {
const { peer } = job
// Currently this is the only safe place to call peer.latest() without interfering with the fetcher
// TODOs:
// 1. Properly rewrite Fetcher with async/await -> allow to at least place in Fetcher.next()
// 2. Properly implement ETH request IDs -> allow to call on non-idle in Peer Pool
await peer?.latest()
const origin = this.getOrigin(job)
const limit = this.getLimit(job)

Expand Down
6 changes: 6 additions & 0 deletions packages/client/src/sync/fetcher/blockfetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ export class BlockFetcher extends BlockFetcherBase<Block[], Block> {
*/
async request(job: Job<JobTask, Block[], Block>): Promise<Block[]> {
const { task, peer, partialResult } = job
// Currently this is the only safe place to call peer.latest() without interfering with the fetcher
// TODOs:
// 1. Properly rewrite Fetcher with async/await -> allow to at least place in Fetcher.next()
// 2. Properly implement ETH request IDs -> allow to call on non-idle in Peer Pool
await peer?.latest()

let { first, count } = task
if (partialResult) {
first = !this.reverse
Expand Down
5 changes: 5 additions & 0 deletions packages/client/src/sync/fetcher/bytecodefetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ export class ByteCodeFetcher extends Fetcher<JobTask, Uint8Array[], Uint8Array>
job: Job<JobTask, Uint8Array[], Uint8Array>
): Promise<ByteCodeDataResponse | undefined> {
const { task, peer } = job
// Currently this is the only safe place to call peer.latest() without interfering with the fetcher
// TODOs:
// 1. Properly rewrite Fetcher with async/await -> allow to at least place in Fetcher.next()
// 2. Properly implement ETH request IDs -> allow to call on non-idle in Peer Pool
await peer?.latest()

this.debug(`requested code hashes: ${Array.from(task.hashes).map((h) => bytesToHex(h))}`)

Expand Down
7 changes: 7 additions & 0 deletions packages/client/src/sync/fetcher/headerfetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ export class HeaderFetcher extends BlockFetcherBase<BlockHeaderResult, BlockHead
*/
async request(job: Job<JobTask, BlockHeaderResult, BlockHeader>) {
const { task, peer, partialResult } = job

// Currently this is the only safe place to call peer.latest() without interfering with the fetcher
// TODOs:
// 1. Properly rewrite Fetcher with async/await -> allow to at least place in Fetcher.next()
// 2. Properly implement ETH request IDs -> allow to call on non-idle in Peer Pool
await peer?.latest()

if (this.flow.maxRequestCount(peer!, 'GetBlockHeaders') < this.config.maxPerRequest) {
// we reached our request limit. try with a different peer.
return
Expand Down
6 changes: 6 additions & 0 deletions packages/client/src/sync/fetcher/storagefetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ export class StorageFetcher extends Fetcher<JobTask, StorageData[][], StorageDat
job: Job<JobTask, StorageData[][], StorageData[]>
): Promise<StorageDataResponse | undefined> {
const { task, peer } = job
// Currently this is the only safe place to call peer.latest() without interfering with the fetcher
// TODOs:
// 1. Properly rewrite Fetcher with async/await -> allow to at least place in Fetcher.next()
// 2. Properly implement ETH request IDs -> allow to call on non-idle in Peer Pool
await peer?.latest()

const origin = this.getOrigin(job)
const limit = this.getLimit(job)

Expand Down
6 changes: 6 additions & 0 deletions packages/client/src/sync/fetcher/trienodefetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ export class TrieNodeFetcher extends Fetcher<JobTask, Uint8Array[], Uint8Array>
job: Job<JobTask, Uint8Array[], Uint8Array>
): Promise<TrieNodesResponse | undefined> {
const { task, peer } = job
// Currently this is the only safe place to call peer.latest() without interfering with the fetcher
// TODOs:
// 1. Properly rewrite Fetcher with async/await -> allow to at least place in Fetcher.next()
// 2. Properly implement ETH request IDs -> allow to call on non-idle in Peer Pool
await peer?.latest()

const { paths, pathStrings } = task

const rangeResult = await peer!.snap!.getTrieNodes({
Expand Down
13 changes: 1 addition & 12 deletions packages/client/src/sync/fullsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,6 @@ export class FullSynchronizer extends Synchronizer {
return best
}

/**
* Get latest header of peer
*/
async latest(peer: Peer) {
const result = await peer.eth?.getBlockHeaders({
block: peer.eth!.status.bestHash,
max: 1,
})
return result ? result[1][0] : undefined
}

/**
* Checks if tx pool should be started
*/
Expand All @@ -175,7 +164,7 @@ export class FullSynchronizer extends Synchronizer {
* @returns a boolean if the setup was successful
*/
async syncWithPeer(peer?: Peer): Promise<boolean> {
const latest = peer ? await this.latest(peer) : undefined
const latest = peer ? await peer.latest() : undefined
if (!latest) return false

const height = latest.number
Expand Down
13 changes: 1 addition & 12 deletions packages/client/src/sync/lightsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,24 +84,13 @@ export class LightSynchronizer extends Synchronizer {
return best
}

/**
* Get latest header of peer
*/
async latest(peer: Peer) {
const result = await peer.les?.getBlockHeaders({
block: peer.les!.status.headHash,
max: 1,
})
return result?.headers[0]
}

/**
* Called from `sync()` to sync headers and state from peer starting from current height.
* @param peer remote peer to sync with
* @returns a boolean if the setup was successful
*/
async syncWithPeer(peer?: Peer): Promise<boolean> {
const latest = peer ? await this.latest(peer) : undefined
const latest = peer ? await peer.latest() : undefined
if (!latest) return false

const height = peer!.les!.status.headNum
Expand Down
Loading

0 comments on commit 46d09ca

Please sign in to comment.