Skip to content

Commit

Permalink
Merge pull request ethereum#835 from obscuren/handler_errors
Browse files Browse the repository at this point in the history
eth, eth/downloader: error handlers and td checks
  • Loading branch information
obscuren committed May 4, 2015
2 parents 3fef601 + 37770ed commit ac85fdc
Show file tree
Hide file tree
Showing 9 changed files with 547 additions and 318 deletions.
2 changes: 1 addition & 1 deletion cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import _ "net/http/pprof"

const (
ClientIdentifier = "Geth"
Version = "0.9.14"
Version = "0.9.15"
)

var (
Expand Down
31 changes: 23 additions & 8 deletions core/chain_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func CalcDifficulty(block, parent *types.Header) *big.Int {
}

func CalculateTD(block, parent *types.Block) *big.Int {
if parent == nil {
return block.Difficulty()
}

td := new(big.Int).Add(parent.Td, block.Header().Difficulty)

return td
Expand Down Expand Up @@ -89,6 +93,7 @@ type ChainManager struct {
futureBlocks *BlockCache

quit chan struct{}
wg sync.WaitGroup
}

func NewChainManager(blockDb, stateDb common.Database, mux *event.TypeMux) *ChainManager {
Expand Down Expand Up @@ -478,6 +483,10 @@ func (self *ChainManager) CalcTotalDiff(block *types.Block) (*big.Int, error) {

func (bc *ChainManager) Stop() {
close(bc.quit)

bc.wg.Wait()

glog.V(logger.Info).Infoln("Chain manager stopped")
}

type queueEvent struct {
Expand All @@ -500,22 +509,30 @@ func (self *ChainManager) procFutureBlocks() {
// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned
// it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go).
func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
self.wg.Add(1)
defer self.wg.Done()

// A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring.
var (
queue = make([]interface{}, len(chain))
queueEvent = queueEvent{queue: queue}
stats struct{ queued, processed int }
stats struct{ queued, processed, ignored int }
tstart = time.Now()
)
for i, block := range chain {
if block == nil {
continue
}
// Setting block.Td regardless of error (known for example) prevents errors down the line
// in the protocol handler
block.Td = new(big.Int).Set(CalculateTD(block, self.GetBlock(block.ParentHash())))

// Call in to the block processor and check for errors. It's likely that if one block fails
// all others will fail too (unless a known block is returned).
logs, err := self.processor.Process(block)
if err != nil {
if IsKnownBlockErr(err) {
stats.ignored++
continue
}

Expand Down Expand Up @@ -545,8 +562,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
return i, err
}

block.Td = new(big.Int).Set(CalculateTD(block, self.GetBlock(block.ParentHash())))

self.mu.Lock()
{
cblock := self.currentBlock
Expand Down Expand Up @@ -589,7 +604,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
queueEvent.canonicalCount++

if glog.V(logger.Debug) {
glog.Infof("inserted block #%d (%d TXs %d UNCs) (%x...)\n", block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4])
glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...)\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4])
}
} else {
if glog.V(logger.Detail) {
Expand All @@ -607,10 +622,10 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {

}

if (stats.queued > 0 || stats.processed > 0) && bool(glog.V(logger.Info)) {
if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) {
tend := time.Since(tstart)
start, end := chain[0], chain[len(chain)-1]
glog.Infof("imported %d block(s) %d queued in %v. #%v [%x / %x]\n", stats.processed, stats.queued, tend, end.Number(), start.Hash().Bytes()[:4], end.Hash().Bytes()[:4])
glog.Infof("imported %d block(s) (%d queued %d ignored) in %v. #%v [%x / %x]\n", stats.processed, stats.queued, stats.ignored, tend, end.Number(), start.Hash().Bytes()[:4], end.Hash().Bytes()[:4])
}

go self.eventMux.Post(queueEvent)
Expand Down Expand Up @@ -654,7 +669,7 @@ func (self *ChainManager) merge(oldBlock, newBlock *types.Block) {

func (self *ChainManager) update() {
events := self.eventMux.Subscribe(queueEvent{})
futureTimer := time.NewTicker(5 * time.Second)
futureTimer := time.Tick(5 * time.Second)
out:
for {
select {
Expand All @@ -681,7 +696,7 @@ out:
self.eventMux.Post(event)
}
}
case <-futureTimer.C:
case <-futureTimer:
self.procFutureBlocks()
case <-self.quit:
break out
Expand Down
3 changes: 2 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func New(config *Config) (*Ethereum, error) {
}

eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux())
eth.downloader = downloader.New(eth.chainManager.HasBlock, eth.chainManager.InsertChain)
eth.downloader = downloader.New(eth.chainManager.HasBlock, eth.chainManager.GetBlock)
eth.pow = ethash.New(eth.chainManager)
eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit)
eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux())
Expand Down Expand Up @@ -455,6 +455,7 @@ func (s *Ethereum) Stop() {
s.txSub.Unsubscribe() // quits txBroadcastLoop

s.protocolManager.Stop()
s.chainManager.Stop()
s.txPool.Stop()
s.eventMux.Stop()
if s.whisper != nil {
Expand Down
Loading

0 comments on commit ac85fdc

Please sign in to comment.