Skip to content

Commit

Permalink
Merge pull request #3586 from JackyWYX/stream_consensus
Browse files Browse the repository at this point in the history
[Stream] added stream downloader to consensus
  • Loading branch information
rlan35 authored Mar 16, 2021
2 parents 53bde71 + 72286f0 commit 1699ff7
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 13 deletions.
2 changes: 2 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ type Consensus struct {
finality int64
// finalityCounter keep tracks of the finality time
finalityCounter int64

dHelper *downloadHelper
}

// SetCommitDelay sets the commit message delay. If set to non-zero,
Expand Down
19 changes: 17 additions & 2 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ func (consensus *Consensus) Start(
break
}
}

// TODO: Refactor this piece of code to consensus/downloader.go after DNS legacy sync is removed
case <-consensus.syncReadyChan:
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan")
consensus.mutex.Lock()
Expand All @@ -352,6 +354,7 @@ func (consensus *Consensus) Start(
}
consensus.mutex.Unlock()

// TODO: Refactor this piece of code to consensus/downloader.go after DNS legacy sync is removed
case <-consensus.syncNotReadyChan:
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan")
consensus.SetBlockNum(consensus.Blockchain.CurrentHeader().Number().Uint64() + 1)
Expand Down Expand Up @@ -467,13 +470,26 @@ func (consensus *Consensus) Start(
}
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Ended.")
}()

if consensus.dHelper != nil {
consensus.dHelper.start()
}
}

// Close close the consensus. If current is in normal commit phase, wait until the commit
// phase end.
func (consensus *Consensus) Close() error {
if consensus.dHelper != nil {
consensus.dHelper.close()
}
consensus.waitForCommit()
return nil
}

// waitForCommit wait extra 2 seconds for commit phase to finish
func (consensus *Consensus) waitForCommit() {
if consensus.Mode() != Normal || consensus.phase != FBFTCommit {
return nil
return
}
// We only need to wait consensus is in normal commit phase
utils.Logger().Warn().Str("phase", consensus.phase.String()).Msg("[shutdown] commit phase has to wait")
Expand All @@ -483,7 +499,6 @@ func (consensus *Consensus) Close() error {
utils.Logger().Warn().Msg("[shutdown] wait for consensus finished")
time.Sleep(time.Millisecond * 100)
}
return nil
}

// LastMileBlockIter is the iterator to iterate over the last mile blocks in consensus cache.
Expand Down
131 changes: 131 additions & 0 deletions consensus/downloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package consensus

import (
"github.com/ethereum/go-ethereum/event"
"github.com/harmony-one/harmony/core/types"
"github.com/pkg/errors"
)

// downloader is the adapter interface for downloader.Downloader, which is used for
// 1. Subscribe download finished event to help syncing to the latest block.
// 2. Trigger the downloader to start working
type downloader interface {
SubscribeDownloadFinished(ch chan struct{}) event.Subscription
SubscribeDownloadStarted(ch chan struct{}) event.Subscription
DownloadAsync()
}

// Set downloader set the downloader of the shard to consensus
// TODO: It will be better to move this to consensus.New and register consensus as a service
func (consensus *Consensus) SetDownloader(d downloader) {
consensus.dHelper = newDownloadHelper(consensus, d)
}

type downloadHelper struct {
d downloader
c *Consensus

startedCh chan struct{}
finishedCh chan struct{}

startedSub event.Subscription
finishedSub event.Subscription
}

func newDownloadHelper(c *Consensus, d downloader) *downloadHelper {
startedCh := make(chan struct{}, 1)
startedSub := d.SubscribeDownloadStarted(startedCh)

finishedCh := make(chan struct{}, 1)
finishedSub := d.SubscribeDownloadFinished(finishedCh)

return &downloadHelper{
c: c,
d: d,
startedCh: startedCh,
finishedCh: finishedCh,
startedSub: startedSub,
finishedSub: finishedSub,
}
}

func (dh *downloadHelper) start() {
go dh.downloadStartedLoop()
go dh.downloadFinishedLoop()
}

func (dh *downloadHelper) close() {
dh.startedSub.Unsubscribe()
dh.finishedSub.Unsubscribe()
}

func (dh *downloadHelper) downloadStartedLoop() {
for {
select {
case <-dh.startedCh:
dh.c.BlocksNotSynchronized()

case err := <-dh.startedSub.Err():
dh.c.getLogger().Info().Err(err).Msg("consensus download finished loop closed")
return
}
}
}

func (dh *downloadHelper) downloadFinishedLoop() {
for {
select {
case <-dh.finishedCh:
err := dh.c.addConsensusLastMile()
if err != nil {
dh.c.getLogger().Error().Err(err).Msg("add last mile failed")
}
dh.c.BlocksSynchronized()

case err := <-dh.finishedSub.Err():
dh.c.getLogger().Info().Err(err).Msg("consensus download finished loop closed")
return
}
}
}

func (consensus *Consensus) addConsensusLastMile() error {
curBN := consensus.Blockchain.CurrentBlock().NumberU64()
blockIter, err := consensus.GetLastMileBlockIter(curBN + 1)
if err != nil {
return err
}
for {
block := blockIter.Next()
if block == nil {
break
}
if _, err := consensus.Blockchain.InsertChain(types.Blocks{block}, true); err != nil {
return errors.Wrap(err, "failed to InsertChain")
}
}
return nil
}

func (consensus *Consensus) spinUpStateSync() {
if consensus.dHelper != nil {
consensus.dHelper.d.DownloadAsync()
consensus.current.SetMode(Syncing)
for _, v := range consensus.consensusTimeout {
v.Stop()
}
} else {
consensus.spinLegacyStateSync()
}
}

func (consensus *Consensus) spinLegacyStateSync() {
select {
case consensus.BlockNumLowChan <- struct{}{}:
consensus.current.SetMode(Syncing)
for _, v := range consensus.consensusTimeout {
v.Stop()
}
default:
}
}
11 changes: 0 additions & 11 deletions consensus/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,14 +402,3 @@ func (consensus *Consensus) broadcastConsensusP2pMessages(p2pMsgs []*NetworkMess
}
return nil
}

func (consensus *Consensus) spinUpStateSync() {
select {
case consensus.BlockNumLowChan <- struct{}{}:
consensus.current.SetMode(Syncing)
for _, v := range consensus.consensusTimeout {
v.Stop()
}
default:
}
}

0 comments on commit 1699ff7

Please sign in to comment.