Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

feat(state): upgrade state #591

Merged
merged 8 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 74 additions & 78 deletions driver/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"math/big"
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand All @@ -20,69 +21,48 @@ import (

// State contains all states which will be used by driver.
type State struct {
// Subscriptions, will automatically resubscribe on errors
l1HeadSub event.Subscription // L1 new heads
l2HeadSub event.Subscription // L2 new heads
l2TransitionProvedSub event.Subscription // TaikoL1.TransitionProved events
l2BlockVerifiedSub event.Subscription // TaikoL1.BlockVerified events
l2BlockProposedSub event.Subscription // TaikoL1.BlockProposed events

l1HeadCh chan *types.Header
l2HeadCh chan *types.Header
blockProposedCh chan *bindings.TaikoL1ClientBlockProposed
transitionProvedCh chan *bindings.TaikoL1ClientTransitionProved
blockVerifiedCh chan *bindings.TaikoL1ClientBlockVerified

// Feeds
l1HeadsFeed event.Feed // L1 new heads notification feed

l1Head *atomic.Value // Latest known L1 head
l2Head *atomic.Value // Current L2 execution engine's local chain head
l2HeadBlockID *atomic.Value // Latest known L2 block ID
l2VerifiedHead *atomic.Value // Latest known L2 verified head
l1Current *atomic.Value // Current L1 block sync cursor
l1Head *atomic.Value // Latest known L1 head
l2Head *atomic.Value // Current L2 execution engine's local chain head
l2HeadBlockID *atomic.Value // Latest known L2 block ID
l1Current *atomic.Value // Current L1 block sync cursor

// Constants
GenesisL1Height *big.Int
BlockDeadendHash common.Hash
GenesisL1Height *big.Int

// RPC clients
rpc *rpc.Client

stopCh chan struct{}
wg sync.WaitGroup
}

// New creates a new driver state instance.
func New(ctx context.Context, rpc *rpc.Client) (*State, error) {
s := &State{
rpc: rpc,
l1Head: new(atomic.Value),
l2Head: new(atomic.Value),
l2HeadBlockID: new(atomic.Value),
l2VerifiedHead: new(atomic.Value),
l1Current: new(atomic.Value),
l1HeadCh: make(chan *types.Header, 10),
l2HeadCh: make(chan *types.Header, 10),
blockProposedCh: make(chan *bindings.TaikoL1ClientBlockProposed, 10),
transitionProvedCh: make(chan *bindings.TaikoL1ClientTransitionProved, 10),
blockVerifiedCh: make(chan *bindings.TaikoL1ClientBlockVerified, 10),
BlockDeadendHash: common.BigToHash(common.Big1),
rpc: rpc,
l1Head: new(atomic.Value),
l2Head: new(atomic.Value),
l2HeadBlockID: new(atomic.Value),
l1Current: new(atomic.Value),
stopCh: make(chan struct{}),
}

if err := s.init(ctx); err != nil {
return nil, err
}

s.startSubscriptions(ctx)
go s.eventLoop(ctx)

return s, nil
}

// Close closes all inner subscriptions.
func (s *State) Close() {
s.l1HeadSub.Unsubscribe()
s.l2HeadSub.Unsubscribe()
s.l2BlockVerifiedSub.Unsubscribe()
s.l2BlockProposedSub.Unsubscribe()
s.l2TransitionProvedSub.Unsubscribe()
close(s.stopCh)
s.wg.Wait()
}

// init fetches the latest status and initializes the state instance.
Expand Down Expand Up @@ -123,47 +103,63 @@ func (s *State) init(ctx context.Context) error {
return nil
}

// startSubscriptions initializes all subscriptions in the given state instance.
func (s *State) startSubscriptions(ctx context.Context) {
s.l1HeadSub = rpc.SubscribeChainHead(s.rpc.L1, s.l1HeadCh)
s.l2HeadSub = rpc.SubscribeChainHead(s.rpc.L2, s.l2HeadCh)
s.l2BlockVerifiedSub = rpc.SubscribeBlockVerified(s.rpc.TaikoL1, s.blockVerifiedCh)
s.l2BlockProposedSub = rpc.SubscribeBlockProposed(s.rpc.TaikoL1, s.blockProposedCh)
s.l2TransitionProvedSub = rpc.SubscribeTransitionProved(s.rpc.TaikoL1, s.transitionProvedCh)

go func() {
for {
select {
case <-ctx.Done():
return
case e := <-s.blockProposedCh:
s.setHeadBlockID(e.BlockId)
case e := <-s.transitionProvedCh:
log.Info(
"✅ Transition proven",
"blockID", e.BlockId,
"parentHash", common.Hash(e.Tran.ParentHash),
"hash", common.Hash(e.Tran.BlockHash),
"stateRoot", common.Hash(e.Tran.StateRoot),
"prover", e.Prover,
)
case e := <-s.blockVerifiedCh:
log.Info(
"📈 Block verified",
"blockID", e.BlockId,
"hash", common.Hash(e.BlockHash),
"stateRoot", common.Hash(e.StateRoot),
"assignedProver", e.AssignedProver,
"prover", e.Prover,
)
case newHead := <-s.l1HeadCh:
s.setL1Head(newHead)
s.l1HeadsFeed.Send(newHead)
case newHead := <-s.l2HeadCh:
s.setL2Head(newHead)
}
}
// eventLoop initializes and starts all subscriptions and callbacks in the given state instance.
func (s *State) eventLoop(ctx context.Context) {
s.wg.Add(1)
defer s.wg.Done()

l1HeadCh := make(chan *types.Header, 10)
l2HeadCh := make(chan *types.Header, 10)
blockProposedCh := make(chan *bindings.TaikoL1ClientBlockProposed, 10)
transitionProvedCh := make(chan *bindings.TaikoL1ClientTransitionProved, 10)
blockVerifiedCh := make(chan *bindings.TaikoL1ClientBlockVerified, 10)

l1HeadSub := rpc.SubscribeChainHead(s.rpc.L1, l1HeadCh)
l2HeadSub := rpc.SubscribeChainHead(s.rpc.L2, l2HeadCh)
l2BlockVerifiedSub := rpc.SubscribeBlockVerified(s.rpc.TaikoL1, blockVerifiedCh)
l2BlockProposedSub := rpc.SubscribeBlockProposed(s.rpc.TaikoL1, blockProposedCh)
l2TransitionProvedSub := rpc.SubscribeTransitionProved(s.rpc.TaikoL1, transitionProvedCh)
defer func() {
l1HeadSub.Unsubscribe()
l2HeadSub.Unsubscribe()
l2BlockVerifiedSub.Unsubscribe()
l2BlockProposedSub.Unsubscribe()
l2TransitionProvedSub.Unsubscribe()
}()

for {
select {
case <-ctx.Done():
return
case <-s.stopCh:
return
case e := <-blockProposedCh:
s.setHeadBlockID(e.BlockId)
case e := <-transitionProvedCh:
log.Info(
"✅ Transition proven",
"blockID", e.BlockId,
"parentHash", common.Hash(e.Tran.ParentHash),
"hash", common.Hash(e.Tran.BlockHash),
"stateRoot", common.Hash(e.Tran.StateRoot),
"prover", e.Prover,
)
case e := <-blockVerifiedCh:
log.Info(
"📈 Block verified",
"blockID", e.BlockId,
"hash", common.Hash(e.BlockHash),
"stateRoot", common.Hash(e.StateRoot),
"assignedProver", e.AssignedProver,
"prover", e.Prover,
)
case newHead := <-l1HeadCh:
s.setL1Head(newHead)
s.l1HeadsFeed.Send(newHead)
case newHead := <-l2HeadCh:
s.setL2Head(newHead)
}
}
}

// setL1Head sets the L1 head concurrent safely.
Expand Down
12 changes: 5 additions & 7 deletions pkg/rpc/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,10 @@ func SubscribeChainHead(

// waitSubErr keeps waiting until the given subscription failed.
func waitSubErr(ctx context.Context, sub event.Subscription) (event.Subscription, error) {
for {
select {
case err := <-sub.Err():
return sub, err
case <-ctx.Done():
return sub, nil
}
select {
case err := <-sub.Err():
return sub, err
case <-ctx.Done():
return sub, nil
}
}
1 change: 0 additions & 1 deletion pkg/rpc/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/stretchr/testify/require"

"github.com/taikoxyz/taiko-client/bindings"
)

Expand Down
Loading