Skip to content

Commit

Permalink
eth, eth/downloader: remove references to LightChain, LightSync (#2776)
Browse files Browse the repository at this point in the history
  • Loading branch information
buddh0 authored Nov 28, 2024
1 parent d354496 commit a4fb352
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 152 deletions.
3 changes: 1 addition & 2 deletions core/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ import (
)

// ChainReader defines a small collection of methods needed to access the local
// blockchain during header verification. It's implemented by both blockchain
// and lightchain.
// blockchain during header verification. It's implemented by blockchain.
type ChainReader interface {
// Config retrieves the header chain's chain configuration.
Config() *params.ChainConfig
Expand Down
137 changes: 45 additions & 92 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ var (
MaxReceiptFetch = 256 // Number of transaction receipts to allow fetching per request
MaxStateFetch = 384 // Number of node state values to allow fetching per request

maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
maxResultsProcess = 2048 // Number of content download results to import at once into the chain
FullMaxForkAncestry uint64 = params.FullImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it)
lightMaxForkAncestry uint64 = 30000 // Maximum chain reorganisation (locally redeclared so tests can reduce it)
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
maxResultsProcess = 2048 // Number of content download results to import at once into the chain
FullMaxForkAncestry uint64 = params.FullImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it)

reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection
reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs
Expand Down Expand Up @@ -96,9 +95,8 @@ type Downloader struct {
mode atomic.Uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
mux *event.TypeMux // Event multiplexer to announce sync operation events

genesis uint64 // Genesis block number to limit sync to (e.g. light client CHT)
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed

stateDB ethdb.Database // Database to state sync into (and deduplicate via)

Expand All @@ -107,7 +105,6 @@ type Downloader struct {
syncStatsChainHeight uint64 // Highest block number known when syncing started
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields

lightchain LightChain
blockchain BlockChain

// Callbacks
Expand Down Expand Up @@ -151,8 +148,8 @@ type Downloader struct {
syncLogTime time.Time // Time instance when status was last reported
}

// LightChain encapsulates functions required to synchronise a light chain.
type LightChain interface {
// BlockChain encapsulates functions required to sync a (full or snap) blockchain.
type BlockChain interface {
// HasHeader verifies a header's presence in the local chain.
HasHeader(common.Hash, uint64) bool

Expand All @@ -170,11 +167,6 @@ type LightChain interface {

// SetHead rewinds the local chain to a new head.
SetHead(uint64) error
}

// BlockChain encapsulates functions required to sync a (full or snap) blockchain.
type BlockChain interface {
LightChain

// HasBlock verifies a block's presence in the local chain.
HasBlock(common.Hash, uint64) bool
Expand Down Expand Up @@ -217,17 +209,13 @@ type BlockChain interface {
type DownloadOption func(downloader *Downloader) *Downloader

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, _ func()) *Downloader {
if lightchain == nil {
lightchain = chain
}
func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, dropPeer peerDropFn, _ func()) *Downloader {
dl := &Downloader{
stateDB: stateDb,
mux: mux,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(),
blockchain: chain,
lightchain: lightchain,
dropPeer: dropPeer,
headerProcCh: make(chan *headerTask, 1),
quitCh: make(chan struct{}),
Expand All @@ -254,15 +242,13 @@ func (d *Downloader) Progress() ethereum.SyncProgress {

current := uint64(0)
mode := d.getMode()
switch {
case d.blockchain != nil && mode == FullSync:
switch mode {
case FullSync:
current = d.blockchain.CurrentBlock().Number.Uint64()
case d.blockchain != nil && mode == SnapSync:
case SnapSync:
current = d.blockchain.CurrentSnapBlock().Number.Uint64()
case d.lightchain != nil:
current = d.lightchain.CurrentHeader().Number.Uint64()
default:
log.Error("Unknown downloader chain/mode combo", "light", d.lightchain != nil, "full", d.blockchain != nil, "mode", mode)
log.Error("Unknown downloader mode", "mode", mode)
}
progress, pending := d.SnapSyncer.Progress()

Expand Down Expand Up @@ -455,7 +441,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
if err != nil {
d.mux.Post(FailedEvent{err})
} else {
latest := d.lightchain.CurrentHeader()
latest := d.blockchain.CurrentHeader()
d.mux.Post(DoneEvent{latest})
}
}()
Expand Down Expand Up @@ -492,7 +478,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
case SnapSync:
localHeight = d.blockchain.CurrentSnapBlock().Number.Uint64()
default:
localHeight = d.lightchain.CurrentHeader().Number.Uint64()
localHeight = d.blockchain.CurrentHeader().Number.Uint64()
}

origin, err := d.findAncestor(p, localHeight, remoteHeader)
Expand All @@ -502,8 +488,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *

if localHeight >= remoteHeight {
// if remoteHeader does not exist in local chain, will move on to insert it as a side chain.
if d.blockchain.GetBlockByHash(remoteHeader.Hash()) != nil ||
(mode == LightSync && d.blockchain.GetHeaderByHash(remoteHeader.Hash()) != nil) {
if d.blockchain.GetBlockByHash(remoteHeader.Hash()) != nil {
p.log.Warn("syncWithPeer", "local", localHeight, "remote", remoteHeight, "mode", mode, "err", errLaggingPeer)
p.peer.MarkLagging()
return errLaggingPeer
Expand Down Expand Up @@ -570,7 +555,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
}
// Rewind the ancient store and blockchain if reorg happens.
if origin+1 < frozen {
if err := d.lightchain.SetHead(origin); err != nil {
if err := d.blockchain.SetHead(origin); err != nil {
return err
}
log.Info("Truncated excess ancient chain segment", "oldhead", frozen-1, "newhead", origin)
Expand Down Expand Up @@ -786,16 +771,13 @@ func (d *Downloader) findAncestor(p *peerConnection, localHeight uint64, remoteH
case SnapSync:
localHeight = d.blockchain.CurrentSnapBlock().Number.Uint64()
default:
localHeight = d.lightchain.CurrentHeader().Number.Uint64()
localHeight = d.blockchain.CurrentHeader().Number.Uint64()
}
*/
p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight)

// Recap floor value for binary search
maxForkAncestry := FullMaxForkAncestry
if d.getMode() == LightSync {
maxForkAncestry = lightMaxForkAncestry
}
if localHeight >= maxForkAncestry {
// We're above the max reorg threshold, find the earliest fork point
floor = int64(localHeight - maxForkAncestry)
Expand All @@ -805,26 +787,6 @@ func (d *Downloader) findAncestor(p *peerConnection, localHeight uint64, remoteH
floor = int64(tail)
}

// If we're doing a light sync, ensure the floor doesn't go below the CHT, as
// all headers before that point will be missing.
if mode == LightSync {
// If we don't know the current CHT position, find it
if d.genesis == 0 {
header := d.lightchain.CurrentHeader()
for header != nil {
d.genesis = header.Number.Uint64()
if floor >= int64(d.genesis)-1 {
break
}
header = d.lightchain.GetHeaderByHash(header.ParentHash)
}
}
// We already know the "genesis" block number, cap floor to that
if floor < int64(d.genesis)-1 {
floor = int64(d.genesis) - 1
}
}

ancestor, err := d.findAncestorSpanSearch(p, mode, remoteHeight, localHeight, floor)
if err == nil {
return ancestor, nil
Expand Down Expand Up @@ -885,7 +847,7 @@ func (d *Downloader) findAncestorSpanSearch(p *peerConnection, mode SyncMode, re
case SnapSync:
known = d.blockchain.HasFastBlock(h, n)
default:
known = d.lightchain.HasHeader(h, n)
known = d.blockchain.HasHeader(h, n)
}
if known {
number, hash = n, h
Expand Down Expand Up @@ -938,13 +900,13 @@ func (d *Downloader) findAncestorBinarySearch(p *peerConnection, mode SyncMode,
case SnapSync:
known = d.blockchain.HasFastBlock(h, n)
default:
known = d.lightchain.HasHeader(h, n)
known = d.blockchain.HasHeader(h, n)
}
if !known {
end = check
continue
}
header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
header := d.blockchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
if header == nil {
p.log.Error("header not found", "hash", h, "request", check)
return 0, fmt.Errorf("%w: header no found (%s)", errBadPeer, h)
Expand Down Expand Up @@ -982,7 +944,6 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
skeleton = true // Skeleton assembly phase or finishing up
pivoting = false // Whether the next request is pivot verification
ancestor = from
mode = d.getMode()
)
for {
// Pull the next batch of headers, it either:
Expand Down Expand Up @@ -1129,13 +1090,9 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
if n := len(headers); n > 0 {
// Retrieve the current head we're at
var head uint64
if mode == LightSync {
head = d.lightchain.CurrentHeader().Number.Uint64()
} else {
head = d.blockchain.CurrentSnapBlock().Number.Uint64()
if full := d.blockchain.CurrentBlock().Number.Uint64(); head < full {
head = full
}
head = d.blockchain.CurrentSnapBlock().Number.Uint64()
if full := d.blockchain.CurrentBlock().Number.Uint64(); head < full {
head = full
}
// If the head is below the common ancestor, we're actually deduplicating
// already existing chain segments, so use the ancestor as the fake head.
Expand Down Expand Up @@ -1275,11 +1232,9 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
// L: Sync begins, and finds common ancestor at 11
// L: Request new headers up from 11 (R's TD was higher, it must have something)
// R: Nothing to give
if mode != LightSync {
head := d.blockchain.CurrentBlock()
if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
return errStallingPeer
}
head := d.blockchain.CurrentBlock()
if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
return errStallingPeer
}
// If snap or light syncing, ensure promised headers are indeed delivered. This is
// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
Expand All @@ -1288,9 +1243,9 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
// This check cannot be executed "as is" for full imports, since blocks may still be
// queued for processing when the header download completes. However, as long as the
// peer gave us something useful, we're already happy/progressed (above check).
if mode == SnapSync || mode == LightSync {
head := d.lightchain.CurrentHeader()
if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
if mode == SnapSync {
head := d.blockchain.CurrentHeader()
if td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
return errStallingPeer
}
}
Expand All @@ -1317,31 +1272,29 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
chunkHashes := hashes[:limit]

// In case of header only syncing, validate the chunk immediately
if mode == SnapSync || mode == LightSync {
if mode == SnapSync {
if len(chunkHeaders) > 0 {
if n, err := d.lightchain.InsertHeaderChain(chunkHeaders); err != nil {
if n, err := d.blockchain.InsertHeaderChain(chunkHeaders); err != nil {
log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err)
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
}
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if mode == FullSync || mode == SnapSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
timer.Reset(time.Second)
select {
case <-d.cancelCh:
return errCanceled
case <-timer.C:
}
}
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin)
if len(inserts) != len(chunkHeaders) {
return fmt.Errorf("%w: stale headers", errBadPeer)
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
timer.Reset(time.Second)
select {
case <-d.cancelCh:
return errCanceled
case <-timer.C:
}
}
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin)
if len(inserts) != len(chunkHeaders) {
return fmt.Errorf("%w: stale headers", errBadPeer)
}

headers = headers[limit:]
hashes = hashes[limit:]
origin += uint64(limit)
Expand Down
Loading

0 comments on commit a4fb352

Please sign in to comment.