Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
feat(driver): improve driver state (#591)
Browse files Browse the repository at this point in the history
Co-authored-by: David <[email protected]>
  • Loading branch information
mask-pp and davidtaikocha authored Mar 6, 2024
1 parent 40325bc commit 1fd9084
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 86 deletions.
152 changes: 74 additions & 78 deletions driver/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"math/big"
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand All @@ -20,69 +21,48 @@ import (

// State contains all states which will be used by driver.
type State struct {
// Subscriptions, will automatically resubscribe on errors
l1HeadSub event.Subscription // L1 new heads
l2HeadSub event.Subscription // L2 new heads
l2TransitionProvedSub event.Subscription // TaikoL1.TransitionProved events
l2BlockVerifiedSub event.Subscription // TaikoL1.BlockVerified events
l2BlockProposedSub event.Subscription // TaikoL1.BlockProposed events

l1HeadCh chan *types.Header
l2HeadCh chan *types.Header
blockProposedCh chan *bindings.TaikoL1ClientBlockProposed
transitionProvedCh chan *bindings.TaikoL1ClientTransitionProved
blockVerifiedCh chan *bindings.TaikoL1ClientBlockVerified

// Feeds
l1HeadsFeed event.Feed // L1 new heads notification feed

l1Head *atomic.Value // Latest known L1 head
l2Head *atomic.Value // Current L2 execution engine's local chain head
l2HeadBlockID *atomic.Value // Latest known L2 block ID
l2VerifiedHead *atomic.Value // Latest known L2 verified head
l1Current *atomic.Value // Current L1 block sync cursor
l1Head *atomic.Value // Latest known L1 head
l2Head *atomic.Value // Current L2 execution engine's local chain head
l2HeadBlockID *atomic.Value // Latest known L2 block ID
l1Current *atomic.Value // Current L1 block sync cursor

// Constants
GenesisL1Height *big.Int
BlockDeadendHash common.Hash
GenesisL1Height *big.Int

// RPC clients
rpc *rpc.Client

stopCh chan struct{}
wg sync.WaitGroup
}

// New creates a new driver state instance.
func New(ctx context.Context, rpc *rpc.Client) (*State, error) {
s := &State{
rpc: rpc,
l1Head: new(atomic.Value),
l2Head: new(atomic.Value),
l2HeadBlockID: new(atomic.Value),
l2VerifiedHead: new(atomic.Value),
l1Current: new(atomic.Value),
l1HeadCh: make(chan *types.Header, 10),
l2HeadCh: make(chan *types.Header, 10),
blockProposedCh: make(chan *bindings.TaikoL1ClientBlockProposed, 10),
transitionProvedCh: make(chan *bindings.TaikoL1ClientTransitionProved, 10),
blockVerifiedCh: make(chan *bindings.TaikoL1ClientBlockVerified, 10),
BlockDeadendHash: common.BigToHash(common.Big1),
rpc: rpc,
l1Head: new(atomic.Value),
l2Head: new(atomic.Value),
l2HeadBlockID: new(atomic.Value),
l1Current: new(atomic.Value),
stopCh: make(chan struct{}),
}

if err := s.init(ctx); err != nil {
return nil, err
}

s.startSubscriptions(ctx)
go s.eventLoop(ctx)

return s, nil
}

// Close closes all inner subscriptions.
func (s *State) Close() {
s.l1HeadSub.Unsubscribe()
s.l2HeadSub.Unsubscribe()
s.l2BlockVerifiedSub.Unsubscribe()
s.l2BlockProposedSub.Unsubscribe()
s.l2TransitionProvedSub.Unsubscribe()
close(s.stopCh)
s.wg.Wait()
}

// init fetches the latest status and initializes the state instance.
Expand Down Expand Up @@ -123,47 +103,63 @@ func (s *State) init(ctx context.Context) error {
return nil
}

// startSubscriptions initializes all subscriptions in the given state instance.
func (s *State) startSubscriptions(ctx context.Context) {
s.l1HeadSub = rpc.SubscribeChainHead(s.rpc.L1, s.l1HeadCh)
s.l2HeadSub = rpc.SubscribeChainHead(s.rpc.L2, s.l2HeadCh)
s.l2BlockVerifiedSub = rpc.SubscribeBlockVerified(s.rpc.TaikoL1, s.blockVerifiedCh)
s.l2BlockProposedSub = rpc.SubscribeBlockProposed(s.rpc.TaikoL1, s.blockProposedCh)
s.l2TransitionProvedSub = rpc.SubscribeTransitionProved(s.rpc.TaikoL1, s.transitionProvedCh)

go func() {
for {
select {
case <-ctx.Done():
return
case e := <-s.blockProposedCh:
s.setHeadBlockID(e.BlockId)
case e := <-s.transitionProvedCh:
log.Info(
"✅ Transition proven",
"blockID", e.BlockId,
"parentHash", common.Hash(e.Tran.ParentHash),
"hash", common.Hash(e.Tran.BlockHash),
"stateRoot", common.Hash(e.Tran.StateRoot),
"prover", e.Prover,
)
case e := <-s.blockVerifiedCh:
log.Info(
"📈 Block verified",
"blockID", e.BlockId,
"hash", common.Hash(e.BlockHash),
"stateRoot", common.Hash(e.StateRoot),
"assignedProver", e.AssignedProver,
"prover", e.Prover,
)
case newHead := <-s.l1HeadCh:
s.setL1Head(newHead)
s.l1HeadsFeed.Send(newHead)
case newHead := <-s.l2HeadCh:
s.setL2Head(newHead)
}
}
// eventLoop initializes and starts all subscriptions and callbacks in the given state instance.
func (s *State) eventLoop(ctx context.Context) {
s.wg.Add(1)
defer s.wg.Done()

l1HeadCh := make(chan *types.Header, 10)
l2HeadCh := make(chan *types.Header, 10)
blockProposedCh := make(chan *bindings.TaikoL1ClientBlockProposed, 10)
transitionProvedCh := make(chan *bindings.TaikoL1ClientTransitionProved, 10)
blockVerifiedCh := make(chan *bindings.TaikoL1ClientBlockVerified, 10)

l1HeadSub := rpc.SubscribeChainHead(s.rpc.L1, l1HeadCh)
l2HeadSub := rpc.SubscribeChainHead(s.rpc.L2, l2HeadCh)
l2BlockVerifiedSub := rpc.SubscribeBlockVerified(s.rpc.TaikoL1, blockVerifiedCh)
l2BlockProposedSub := rpc.SubscribeBlockProposed(s.rpc.TaikoL1, blockProposedCh)
l2TransitionProvedSub := rpc.SubscribeTransitionProved(s.rpc.TaikoL1, transitionProvedCh)
defer func() {
l1HeadSub.Unsubscribe()
l2HeadSub.Unsubscribe()
l2BlockVerifiedSub.Unsubscribe()
l2BlockProposedSub.Unsubscribe()
l2TransitionProvedSub.Unsubscribe()
}()

for {
select {
case <-ctx.Done():
return
case <-s.stopCh:
return
case e := <-blockProposedCh:
s.setHeadBlockID(e.BlockId)
case e := <-transitionProvedCh:
log.Info(
"✅ Transition proven",
"blockID", e.BlockId,
"parentHash", common.Hash(e.Tran.ParentHash),
"hash", common.Hash(e.Tran.BlockHash),
"stateRoot", common.Hash(e.Tran.StateRoot),
"prover", e.Prover,
)
case e := <-blockVerifiedCh:
log.Info(
"📈 Block verified",
"blockID", e.BlockId,
"hash", common.Hash(e.BlockHash),
"stateRoot", common.Hash(e.StateRoot),
"assignedProver", e.AssignedProver,
"prover", e.Prover,
)
case newHead := <-l1HeadCh:
s.setL1Head(newHead)
s.l1HeadsFeed.Send(newHead)
case newHead := <-l2HeadCh:
s.setL2Head(newHead)
}
}
}

// setL1Head sets the L1 head concurrent safely.
Expand Down
12 changes: 5 additions & 7 deletions pkg/rpc/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,10 @@ func SubscribeChainHead(

// waitSubErr keeps waiting until the given subscription failed.
func waitSubErr(ctx context.Context, sub event.Subscription) (event.Subscription, error) {
for {
select {
case err := <-sub.Err():
return sub, err
case <-ctx.Done():
return sub, nil
}
select {
case err := <-sub.Err():
return sub, err
case <-ctx.Done():
return sub, nil
}
}
1 change: 0 additions & 1 deletion pkg/rpc/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/stretchr/testify/require"

"github.com/taikoxyz/taiko-client/bindings"
)

Expand Down

0 comments on commit 1fd9084

Please sign in to comment.