Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/downloader, trie: pull head state concurrently with chain #2627

Merged
merged 1 commit into from
May 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 28 additions & 28 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
"github.com/rcrowley/go-metrics"
)

Expand Down Expand Up @@ -114,7 +115,6 @@ type Downloader struct {
// Statistics
syncStatsChainOrigin uint64 // Origin block number where syncing started at
syncStatsChainHeight uint64 // Highest block number known when syncing started
syncStatsStateTotal uint64 // Total number of node state entries known so far
syncStatsStateDone uint64 // Number of state trie entries already pulled
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields

Expand Down Expand Up @@ -321,12 +321,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
empty = true
}
}
// Reset any ephemeral sync statistics
d.syncStatsLock.Lock()
d.syncStatsStateTotal = 0
d.syncStatsStateDone = 0
d.syncStatsLock.Unlock()

// Create cancel channel for aborting mid-flight
d.cancelLock.Lock()
d.cancelCh = make(chan struct{})
Expand Down Expand Up @@ -382,7 +376,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
d.syncStatsLock.Unlock()

// Initiate the sync using a concurrent hash and block retrieval algorithm
d.queue.Prepare(origin+1, d.mode, 0)
d.queue.Prepare(origin+1, d.mode, 0, nil)
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
Expand All @@ -397,30 +391,32 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
if err != nil {
return err
}
origin, err := d.findAncestor(p, latest)
height := latest.Number.Uint64()

origin, err := d.findAncestor(p, height)
if err != nil {
return err
}
d.syncStatsLock.Lock()
if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
d.syncStatsChainOrigin = origin
}
d.syncStatsChainHeight = latest
d.syncStatsChainHeight = height
d.syncStatsLock.Unlock()

// Initiate the sync using a concurrent header and content retrieval algorithm
pivot := uint64(0)
switch d.mode {
case LightSync:
pivot = latest
pivot = height
case FastSync:
// Calculate the new fast/slow sync pivot point
pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
if err != nil {
panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
}
if latest > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
pivot = latest - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
}
// If the point is below the origin, move origin back to ensure state download
if pivot < origin {
Expand All @@ -432,9 +428,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
}
glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
}
d.queue.Prepare(origin+1, d.mode, pivot)
d.queue.Prepare(origin+1, d.mode, pivot, latest)
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
d.syncInitHook(origin, height)
}
return d.spawnSync(origin+1,
func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
Expand Down Expand Up @@ -952,7 +948,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {

// fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)

// Request the advertised remote head block and wait for the response
Expand All @@ -962,7 +958,7 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
for {
select {
case <-d.cancelCh:
return 0, errCancelBlockFetch
return nil, errCancelBlockFetch

case packet := <-d.headerCh:
// Discard anything not from the origin peer
Expand All @@ -974,13 +970,13 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
headers := packet.(*headerPack).headers
if len(headers) != 1 {
glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
return 0, errBadPeer
return nil, errBadPeer
}
return headers[0].Number.Uint64(), nil
return headers[0], nil

case <-timeout:
glog.V(logger.Debug).Infof("%v: head header timeout", p)
return 0, errTimeout
return nil, errTimeout

case <-d.bodyCh:
case <-d.stateCh:
Expand Down Expand Up @@ -1369,10 +1365,10 @@ func (d *Downloader) fetchNodeData() error {
deliver = func(packet dataPack) (int, error) {
start := time.Now()
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
// If the peer gave us nothing, stalling fast sync, drop
if delivered == 0 {
glog.V(logger.Debug).Infof("peer %s: stalling state delivery, dropping", packet.PeerId())
d.dropPeer(packet.PeerId())
// If the peer returned old-requested data, forgive
if err == trie.ErrNotRequested {
glog.V(logger.Info).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId())
return
}
if err != nil {
// If the node data processing failed, the root hash is very wrong, abort
Expand All @@ -1381,17 +1377,21 @@ func (d *Downloader) fetchNodeData() error {
return
}
// Processing succeeded, notify state fetcher of continuation
if d.queue.PendingNodeData() > 0 {
pending := d.queue.PendingNodeData()
if pending > 0 {
select {
case d.stateWakeCh <- true:
default:
}
}
// Log a message to the user and return
d.syncStatsLock.Lock()
defer d.syncStatsLock.Unlock()
d.syncStatsStateDone += uint64(delivered)
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
d.syncStatsLock.Unlock()

// Log a message to the user and return
if delivered > 0 {
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d, pending at least %d", delivered, time.Since(start), d.syncStatsStateDone, pending)
}
})
}
expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
Expand Down
8 changes: 7 additions & 1 deletion eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -1262,13 +1262,19 @@ func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error,

// Prepare configures the result cache to allow accepting and caching inbound
// fetch results.
func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64) {
func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) {
q.lock.Lock()
defer q.lock.Unlock()

// Prepare the queue for sync results
if q.resultOffset < offset {
q.resultOffset = offset
}
q.fastSyncPivot = pivot
q.mode = mode

// If long running fast sync, also start up a head stateretrieval immediately
if mode == FastSync && pivot > 0 {
q.stateScheduler = state.NewStateSync(head.Root, q.stateDatabase)
}
}
7 changes: 6 additions & 1 deletion trie/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
package trie

import (
"errors"
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

// ErrNotRequested is returned by the trie sync when it's requested to process a
// node it did not request.
var ErrNotRequested = errors.New("not requested")

// request represents a scheduled or already in-flight state retrieval request.
type request struct {
hash common.Hash // Hash of the node data content to retrieve
Expand Down Expand Up @@ -143,7 +148,7 @@ func (s *TrieSync) Process(results []SyncResult) (int, error) {
// If the item was not requested, bail out
request := s.requests[item.Hash]
if request == nil {
return i, fmt.Errorf("not requested: %x", item.Hash)
return i, ErrNotRequested
}
// If the item is a raw entry request, commit directly
if request.object == nil {
Expand Down