Skip to content

Commit

Permalink
Refactor header exchange to accommodate block exchange service (cosmo…
Browse files Browse the repository at this point in the history
…s#1028)

<!--
Please read and fill out this form before submitting your PR.

Please make sure you have reviewed our contributors guide before
submitting your
first PR.
-->

## Overview

Closes: cosmos#1029 

<!-- 
Please provide an explanation of the PR, including the appropriate
context,
background, goal, and rationale. If there is an issue with this
information,
please provide a tl;dr and link the issue. 
-->

## Checklist

<!-- 
Please complete the checklist to ensure that the PR is ready to be
reviewed.

IMPORTANT:
PRs should be left in Draft until the below checklist is completed.
-->

- [x] New and updated code has appropriate documentation
- [x] New and updated code has new and/or updated testing
- [x] Required CI checks are passing
- [x] Visual proof for any user facing features like CLI or
documentation updates
- [x] Linked issues closed with keywords

---------

Co-authored-by: Ganesha Upadhyaya <[email protected]>
  • Loading branch information
Manav-Aggarwal and Ganesha Upadhyaya authored Jun 27, 2023
1 parent 323eb87 commit ef9851e
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 44 deletions.
7 changes: 6 additions & 1 deletion node/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,12 @@ func (n *FullNode) headerPublishLoop(ctx context.Context) {
for {
select {
case signedHeader := <-n.blockManager.HeaderCh:
n.hExService.writeToHeaderStoreAndBroadcast(ctx, signedHeader)
err := n.hExService.writeToHeaderStoreAndBroadcast(ctx, signedHeader)
if err != nil {
// failed to init or start headerstore
n.Logger.Error(err.Error())
return
}
case <-ctx.Done():
return
}
Expand Down
10 changes: 9 additions & 1 deletion node/full_node_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,15 @@ func TestLazyAggregator(t *testing.T) {
key, _, _ := crypto.GenerateEd25519Key(rand.Reader)
genesisValidators, signingKey := getGenesisValidatorSetWithSigner(1)
blockManagerConfig := config.BlockManagerConfig{
BlockTime: 1 * time.Millisecond,
// After the genesis header is published, the syncer is started
// which takes little longer (due to initialization) and the syncer
// tries to retrieve the genesis header and check that is it recent
// (genesis header time is not older than current minus 1.5x blocktime)
// to allow sufficient time for syncer initialization, we cannot set
// the blocktime too short. in future, we can add a configuration
// in go-header syncer initialization to not rely on blocktime, but the
// config variable
BlockTime: 500 * time.Millisecond,
NamespaceID: types.NamespaceID{1, 2, 3, 4, 5, 6, 7, 8},
}

Expand Down
103 changes: 61 additions & 42 deletions node/header_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,39 @@ import (
"github.com/rollkit/rollkit/types"
)

// P2P Exchange Service for header that implements the go-header interface.
// Contains a header store where synced headers are stored.
// Uses the go-header library for handling all P2P logic.
type HeaderExchangeService struct {
conf config.NodeConfig
genesis *tmtypes.GenesisDoc
p2p *p2p.Client
ex *goheaderp2p.Exchange[*types.SignedHeader]
conf config.NodeConfig
genesis *tmtypes.GenesisDoc
p2p *p2p.Client
ex *goheaderp2p.Exchange[*types.SignedHeader]
sub *goheaderp2p.Subscriber[*types.SignedHeader]
p2pServer *goheaderp2p.ExchangeServer[*types.SignedHeader]
headerStore *goheaderstore.Store[*types.SignedHeader]

syncer *goheadersync.Syncer[*types.SignedHeader]
sub *goheaderp2p.Subscriber[*types.SignedHeader]
p2pServer *goheaderp2p.ExchangeServer[*types.SignedHeader]
headerStore *goheaderstore.Store[*types.SignedHeader]
syncerStatus *SyncerStatus

logger log.Logger
ctx context.Context
}

func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *tmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*HeaderExchangeService, error) {
if genesis == nil {
return nil, errors.New("genesis doc cannot be nil")
}
if p2p == nil {
return nil, errors.New("p2p client cannot be nil")
}
// store is TxnDatastore, but we require Batching, hence the type assertion
// note, the badger datastore impl that is used in the background implements both
storeBatch, ok := store.(ds.Batching)
if !ok {
return nil, errors.New("failed to access the datastore")
}
ss, err := goheaderstore.NewStore[*types.SignedHeader](storeBatch)
ss, err := goheaderstore.NewStore[*types.SignedHeader](storeBatch, goheaderstore.WithStorePrefix("headerEx"))
if err != nil {
return nil, fmt.Errorf("failed to initialize the header store: %w", err)
}
Expand All @@ -63,87 +73,84 @@ func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf c
}

func (hExService *HeaderExchangeService) initHeaderStoreAndStartSyncer(ctx context.Context, initial *types.SignedHeader) error {
if initial == nil {
return fmt.Errorf("failed to initialize the headerstore and start syncer")
}
if err := hExService.headerStore.Init(ctx, initial); err != nil {
return err
}
if err := hExService.syncer.Start(hExService.ctx); err != nil {
if err := hExService.StartSyncer(); err != nil {
return err
}
hExService.syncerStatus.m.Lock()
defer hExService.syncerStatus.m.Unlock()
hExService.syncerStatus.started = true
return nil
}

func (hExService *HeaderExchangeService) tryInitHeaderStoreAndStartSyncer(ctx context.Context, trustedHeader *types.SignedHeader) {
if trustedHeader != nil {
if err := hExService.initHeaderStoreAndStartSyncer(ctx, trustedHeader); err != nil {
hExService.logger.Error("failed to initialize the headerstore and start syncer", "error", err)
}
}
}

func (hExService *HeaderExchangeService) writeToHeaderStoreAndBroadcast(ctx context.Context, signedHeader *types.SignedHeader) {
// Initialize header store if needed and broadcasts provided header.
// Note: Only returns an error in case header store can't be initialized. Logs error if there's one while broadcasting.
func (hExService *HeaderExchangeService) writeToHeaderStoreAndBroadcast(ctx context.Context, signedHeader *types.SignedHeader) error {
// For genesis header initialize the store and start the syncer
if signedHeader.Height() == hExService.genesis.InitialHeight {
if err := hExService.headerStore.Init(ctx, signedHeader); err != nil {
hExService.logger.Error("failed to initialize header store", "error", err)
return fmt.Errorf("failed to initialize header store")
}

if err := hExService.syncer.Start(hExService.ctx); err != nil {
hExService.logger.Error("failed to start syncer after initializing header store", "error", err)
if err := hExService.StartSyncer(); err != nil {
return fmt.Errorf("failed to start syncer after initializing header store: %w", err)
}
}

// Broadcast for subscribers
if err := hExService.sub.Broadcast(ctx, signedHeader); err != nil {
hExService.logger.Error("failed to broadcast block header", "error", err)
}
return nil
}

func (hExService *HeaderExchangeService) isInitialized() bool {
return hExService.headerStore.Height() > 0
}

// OnStart is a part of Service interface.
func (hExService *HeaderExchangeService) Start() error {
var err error
// have to do the initializations here to utilize the p2p node which is created on start
ps := hExService.p2p.PubSub()
hExService.sub = goheaderp2p.NewSubscriber[*types.SignedHeader](ps, pubsub.DefaultMsgIdFn, hExService.genesis.ChainID)
if err = hExService.sub.Start(hExService.ctx); err != nil {
if err := hExService.sub.Start(hExService.ctx); err != nil {
return fmt.Errorf("error while starting subscriber: %w", err)
}
if _, err := hExService.sub.Subscribe(); err != nil {
return fmt.Errorf("error while subscribing: %w", err)
}

if err = hExService.headerStore.Start(hExService.ctx); err != nil {
if err := hExService.headerStore.Start(hExService.ctx); err != nil {
return fmt.Errorf("error while starting header store: %w", err)
}

_, _, network := hExService.p2p.Info()
var err error
if hExService.p2pServer, err = newP2PServer(hExService.p2p.Host(), hExService.headerStore, network); err != nil {
return err
return fmt.Errorf("error while creating p2p server: %w", err)
}
if err = hExService.p2pServer.Start(hExService.ctx); err != nil {
if err := hExService.p2pServer.Start(hExService.ctx); err != nil {
return fmt.Errorf("error while starting p2p server: %w", err)
}

peerIDs := hExService.p2p.PeerIDs()
if hExService.ex, err = newP2PExchange(hExService.p2p.Host(), peerIDs, network, hExService.genesis.ChainID, hExService.p2p.ConnectionGater()); err != nil {
return err
return fmt.Errorf("error while creating exchange: %w", err)
}
if err = hExService.ex.Start(hExService.ctx); err != nil {
if err := hExService.ex.Start(hExService.ctx); err != nil {
return fmt.Errorf("error while starting exchange: %w", err)
}

if hExService.syncer, err = newSyncer(hExService.ex, hExService.headerStore, hExService.sub, goheadersync.WithBlockTime(hExService.conf.BlockTime)); err != nil {
return err
return fmt.Errorf("error while creating syncer: %w", err)
}

// Check if the headerstore is not initialized and try initializing
if hExService.headerStore.Height() > 0 {
if err := hExService.syncer.Start(hExService.ctx); err != nil {
if hExService.isInitialized() {
if err := hExService.StartSyncer(); err != nil {
return fmt.Errorf("error while starting the syncer: %w", err)
}
hExService.syncerStatus.started = true
return nil
}

Expand All @@ -165,12 +172,12 @@ func (hExService *HeaderExchangeService) Start() error {
if trustedHeader, err = hExService.ex.GetByHeight(hExService.ctx, uint64(hExService.genesis.InitialHeight)); err != nil {
// Full/light nodes have to wait for aggregator to publish the genesis header
// proposing aggregator can init the store and start the syncer when the first header is published
hExService.logger.Info("failed to fetch the genesis header", "error", err)
return fmt.Errorf("failed to fetch the genesis header: %w", err)
}
}
}
go hExService.tryInitHeaderStoreAndStartSyncer(hExService.ctx, trustedHeader)
return hExService.initHeaderStoreAndStartSyncer(hExService.ctx, trustedHeader)

}
return nil
}

Expand All @@ -180,9 +187,7 @@ func (hExService *HeaderExchangeService) Stop() error {
err = multierr.Append(err, hExService.p2pServer.Stop(hExService.ctx))
err = multierr.Append(err, hExService.ex.Stop(hExService.ctx))
err = multierr.Append(err, hExService.sub.Stop(hExService.ctx))
hExService.syncerStatus.m.Lock()
defer hExService.syncerStatus.m.Unlock()
if hExService.syncerStatus.started {
if hExService.syncerStatus.isStarted() {
err = multierr.Append(err, hExService.syncer.Stop(hExService.ctx))
}
return err
Expand Down Expand Up @@ -224,3 +229,17 @@ func newSyncer(
) (*goheadersync.Syncer[*types.SignedHeader], error) {
return goheadersync.NewSyncer[*types.SignedHeader](ex, store, sub, opt)
}

func (hExService *HeaderExchangeService) StartSyncer() error {
hExService.syncerStatus.m.Lock()
defer hExService.syncerStatus.m.Unlock()
if hExService.syncerStatus.started {
return nil
}
err := hExService.syncer.Start(hExService.ctx)
if err != nil {
return err
}
hExService.syncerStatus.started = true
return nil
}

0 comments on commit ef9851e

Please sign in to comment.