Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/downloader: make fast sync resilient to critical section fails #2647

Merged
merged 1 commit into from
Jun 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 51 additions & 14 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ var (
fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
fsPivotInterval = 512 // Number of headers out of which to randomize the pivot point
fsMinFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync
fsCriticalTrials = 10 // Number of times to retry in the cricical section before bailing
)

var (
Expand Down Expand Up @@ -103,13 +104,15 @@ var (
)

type Downloader struct {
mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
noFast bool // Flag to disable fast syncing in case of a security error
mux *event.TypeMux // Event multiplexer to announce sync operation events
mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
mux *event.TypeMux // Event multiplexer to announce sync operation events

queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed

fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
fsPivotFails int // Number of fast sync failures in the critical section

interrupt int32 // Atomic boolean to signal termination

// Statistics
Expand Down Expand Up @@ -314,6 +317,15 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
default:
}
}
for _, ch := range []chan dataPack{d.hashCh, d.blockCh, d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
for empty := false; !empty; {
select {
case <-ch:
default:
empty = true
}
}
}
for empty := false; !empty; {
select {
case <-d.headerProcCh:
Expand All @@ -330,7 +342,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode

// Set the requested sync mode, unless it's forbidden
d.mode = mode
if d.mode == FastSync && d.noFast {
if d.mode == FastSync && d.fsPivotFails >= fsCriticalTrials {
d.mode = FullSync
}
// Retrieve the origin peer and initiate the downloading process
Expand Down Expand Up @@ -413,12 +425,17 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
pivot = height
case FastSync:
// Calculate the new fast/slow sync pivot point
pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
if err != nil {
panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
}
if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
if d.fsPivotLock == nil {
pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need crypto rand here? Can't you use math's rand package and omit the big int to uint casting?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, this part is quite critical to be truly random and non guessable. As it's executed once per sync cycle max, performance wise it doesn't matter and I figured that it's better to be on the safe side.

if err != nil {
panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
}
if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
}
} else {
// Pivot point locked in, use this and do not pick a new one!
pivot = d.fsPivotLock.Number.Uint64()
}
// If the point is below the origin, move origin back to ensure state download
if pivot < origin {
Expand Down Expand Up @@ -1218,8 +1235,12 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
// If no more headers are inbound, notify the content fetchers and return
if packet.Items() == 0 {
glog.V(logger.Debug).Infof("%v: no available headers", p)
d.headerProcCh <- nil
return nil
select {
case d.headerProcCh <- nil:
return nil
case <-d.cancelCh:
return errCancelHeaderFetch
}
}
headers := packet.(*headerPack).headers

Expand Down Expand Up @@ -1611,9 +1632,18 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number())

// If we're already past the pivot point, this could be an attack, disable fast sync
// If we're already past the pivot point, this could be an attack, thread carefully
if rollback[len(rollback)-1].Number.Uint64() > pivot {
d.noFast = true
// If we didn't ever fail, lock in te pivot header (must! not! change!)
if d.fsPivotFails == 0 {
for _, header := range rollback {
if header.Number.Uint64() == pivot {
glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4])
d.fsPivotLock = header
}
}
}
d.fsPivotFails++
}
}
}()
Expand Down Expand Up @@ -1712,6 +1742,13 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
}
}
// If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
glog.V(logger.Warn).Infof("Pivot doesn't match locked in version: have #%v [%x…], want #%v [%x…]", pivot.Number, pivot.Hash().Bytes()[:4], d.fsPivotLock.Number, d.fsPivotLock.Hash().Bytes()[:4])
return errInvalidChain
}
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if d.mode == FullSync || d.mode == FastSync {
// If we've reached the allowed number of pending headers, stall a bit
Expand Down
70 changes: 58 additions & 12 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,22 +149,25 @@ type downloadTester struct {
peerReceipts map[string]map[common.Hash]types.Receipts // Receipts belonging to different test peers
peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains

peerMissingStates map[string]map[common.Hash]bool // State entries that fast sync should not return

lock sync.RWMutex
}

// newTester creates a new downloader test mocker.
func newTester() *downloadTester {
tester := &downloadTester{
ownHashes: []common.Hash{genesis.Hash()},
ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): nil},
ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()},
peerHashes: make(map[string][]common.Hash),
peerHeaders: make(map[string]map[common.Hash]*types.Header),
peerBlocks: make(map[string]map[common.Hash]*types.Block),
peerReceipts: make(map[string]map[common.Hash]types.Receipts),
peerChainTds: make(map[string]map[common.Hash]*big.Int),
ownHashes: []common.Hash{genesis.Hash()},
ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): nil},
ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()},
peerHashes: make(map[string][]common.Hash),
peerHeaders: make(map[string]map[common.Hash]*types.Header),
peerBlocks: make(map[string]map[common.Hash]*types.Block),
peerReceipts: make(map[string]map[common.Hash]types.Receipts),
peerChainTds: make(map[string]map[common.Hash]*big.Int),
peerMissingStates: make(map[string]map[common.Hash]bool),
}
tester.stateDb, _ = ethdb.NewMemDatabase()
tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00})
Expand Down Expand Up @@ -408,6 +411,7 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
dl.peerBlocks[id] = make(map[common.Hash]*types.Block)
dl.peerReceipts[id] = make(map[common.Hash]types.Receipts)
dl.peerChainTds[id] = make(map[common.Hash]*big.Int)
dl.peerMissingStates[id] = make(map[common.Hash]bool)

genesis := hashes[len(hashes)-1]
if header := headers[genesis]; header != nil {
Expand Down Expand Up @@ -648,7 +652,9 @@ func (dl *downloadTester) peerGetNodeDataFn(id string, delay time.Duration) func
results := make([][]byte, 0, len(hashes))
for _, hash := range hashes {
if data, err := testdb.Get(hash.Bytes()); err == nil {
results = append(results, data)
if !dl.peerMissingStates[id][hash] {
results = append(results, data)
}
}
}
go dl.downloader.DeliverNodeData(id, results)
Expand Down Expand Up @@ -1288,7 +1294,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
tester.newPeer("withhold-attack", protocol, hashes, headers, blocks, receipts)
missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1

tester.downloader.noFast = false
tester.downloader.fsPivotFails = 0
tester.downloader.syncInitHook = func(uint64, uint64) {
for i := missing; i <= len(hashes); i++ {
delete(tester.peerHeaders["withhold-attack"], hashes[len(hashes)-i])
Expand All @@ -1307,6 +1313,8 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
t.Errorf("fast sync pivot block #%d not rolled back", head)
}
}
tester.downloader.fsPivotFails = fsCriticalTrials

// Synchronise with the valid peer and make sure sync succeeds. Since the last
// rollback should also disable fast syncing for this process, verify that we
// did a fresh full sync. Note, we can't assert anything about the receipts
Expand Down Expand Up @@ -1749,3 +1757,41 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
}
}
}

// Tests that if fast sync aborts in the critical section, it can restart a few
// times before giving up.
func TestFastCriticalRestarts63(t *testing.T) { testFastCriticalRestarts(t, 63) }
func TestFastCriticalRestarts64(t *testing.T) { testFastCriticalRestarts(t, 64) }

func testFastCriticalRestarts(t *testing.T, protocol int) {
t.Parallel()

// Create a large enough blockchin to actually fast sync on
targetBlocks := fsMinFullBlocks + 2*fsPivotInterval - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)

// Create a tester peer with the critical section state roots missing (force failures)
tester := newTester()
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)

for i := 0; i < fsPivotInterval; i++ {
tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true
}
// Synchronise with the peer a few times and make sure they fail until the retry limit
for i := 0; i < fsCriticalTrials; i++ {
// Attempt a sync and ensure it fails properly
if err := tester.sync("peer", nil, FastSync); err == nil {
t.Fatalf("failing fast sync succeeded: %v", err)
}
// If it's the first failure, pivot should be locked => reenable all others to detect pivot changes
if i == 0 {
tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true}
}
time.Sleep(100 * time.Millisecond) // Make sure no in-flight requests remain
}
// Retry limit exhausted, downloader will switch to full sync, should succeed
if err := tester.sync("peer", nil, FastSync); err != nil {
t.Fatalf("failed to synchronise blocks in slow sync: %v", err)
}
assertOwnChain(t, tester, targetBlocks+1)
}