Skip to content

Commit

Permalink
Skip verify on build (#126)
Browse files Browse the repository at this point in the history
* insert height into state

* add merkledb metrics

* cleanup more metrics handling

* add additional arguments for config

* add HeightKey

* log lock wait

* use NodeID as random

* fix profiles

* gofumpt

* gofmt

* add more logs

* perform proper verify

* cache built block

* reduce memory usage of tstate

* improve memory efficiency of storage

* align devnets and scripts

* re-add proposer delay

* use default peer gossip size
  • Loading branch information
patrick-ogrady authored Apr 3, 2023
1 parent 480e474 commit 52f5a9c
Show file tree
Hide file tree
Showing 18 changed files with 197 additions and 93 deletions.
78 changes: 47 additions & 31 deletions chain/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package chain

import (
"context"
"encoding/binary"
"fmt"
"time"

Expand Down Expand Up @@ -97,6 +98,7 @@ type StatelessBlock struct {
txsSet set.Set[ids.ID]

warpMessages map[ids.ID]*warpJob
containsWarp bool // this allows us to avoid allocating a map when we build
bctx *block.Context
vdrState validators.State

Expand Down Expand Up @@ -143,7 +145,8 @@ func ParseBlock(
return ParseStatefulBlock(ctx, blk, source, status, vm)
}

func (b *StatelessBlock) populateTxs(ctx context.Context, verifySigs bool) error {
// populateTxs is only called on blocks we did not build
func (b *StatelessBlock) populateTxs(ctx context.Context) error {
ctx, span := b.vm.Tracer().Start(ctx, "StatelessBlock.populateTxs")
defer span.End()

Expand All @@ -159,10 +162,7 @@ func (b *StatelessBlock) populateTxs(ctx context.Context, verifySigs bool) error
b.txsSet = set.NewSet[ids.ID](len(b.Txs))
b.warpMessages = map[ids.ID]*warpJob{}
for _, tx := range b.Txs {
sigTask := tx.AuthAsyncVerify()
if verifySigs {
b.sigJob.Go(sigTask)
}
b.sigJob.Go(tx.AuthAsyncVerify())
if b.txsSet.Contains(tx.ID()) {
return ErrDuplicateTx
}
Expand All @@ -188,6 +188,7 @@ func (b *StatelessBlock) populateTxs(ctx context.Context, verifySigs bool) error
verifiedChan: make(chan bool, 1),
warpNum: len(b.warpMessages),
}
b.containsWarp = true
}
}
b.sigJob.Done(func() { sspan.End() })
Expand Down Expand Up @@ -243,13 +244,12 @@ func ParseStatefulBlock(
}

// Populate hashes and tx set
return b, b.populateTxs(ctx, true)
return b, b.populateTxs(ctx)
}

// [init] is used during block building and testing
// TODO: remove init
func (b *StatelessBlock) init(ctx context.Context, results []*Result, validateSigs bool) error {
ctx, span := b.vm.Tracer().Start(ctx, "StatelessBlock.init")
// [initializeBuilt] is invoked after a block is built
func (b *StatelessBlock) initializeBuilt(ctx context.Context, state merkledb.TrieView, results []*Result) error {
_, span := b.vm.Tracer().Start(ctx, "StatelessBlock.initializeBuilt")
defer span.End()

blk, err := b.StatefulBlock.Marshal(b.vm.Registry())
Expand All @@ -258,19 +258,25 @@ func (b *StatelessBlock) init(ctx context.Context, results []*Result, validateSi
}
b.bytes = blk
b.id = utils.ToID(b.bytes)
b.state = state
b.t = time.Unix(b.StatefulBlock.Tmstmp, 0)
b.results = results

// Populate hashes and tx set
return b.populateTxs(ctx, validateSigs)
b.txsSet = set.NewSet[ids.ID](len(b.Txs))
for _, tx := range b.Txs {
b.txsSet.Add(tx.ID())
if tx.WarpMessage != nil {
b.containsWarp = true
}
}
return nil
}

// implements "snowman.Block.choices.Decidable"
func (b *StatelessBlock) ID() ids.ID { return b.id }

// implements "block.WithVerifyContext"
func (b *StatelessBlock) ShouldVerifyWithContext(context.Context) (bool, error) {
return len(b.warpMessages) > 0, nil
return b.containsWarp, nil
}

// implements "block.WithVerifyContext"
Expand All @@ -283,6 +289,7 @@ func (b *StatelessBlock) VerifyWithContext(ctx context.Context, bctx *block.Cont
attribute.Int64("height", int64(b.Hght)),
attribute.Bool("stateReady", stateReady),
attribute.Int64("pchainHeight", int64(bctx.PChainHeight)),
attribute.Bool("built", b.Processed()),
),
)
defer span.End()
Expand All @@ -303,6 +310,7 @@ func (b *StatelessBlock) Verify(ctx context.Context) error {
attribute.Int("txs", len(b.Txs)),
attribute.Int64("height", int64(b.Hght)),
attribute.Bool("stateReady", stateReady),
attribute.Bool("built", b.Processed()),
),
)
defer span.End()
Expand All @@ -311,9 +319,18 @@ func (b *StatelessBlock) Verify(ctx context.Context) error {
}

func (b *StatelessBlock) verify(ctx context.Context, stateReady bool) error {
// If the state of the accepted tip has been fully fetched, it is safe to
// verify any block.
if stateReady {
log := b.vm.Logger()
switch {
case !stateReady:
// If the state of the accepted tip has not been fully fetched, it is not safe to
// verify any block.
log.Info("skipping verification, state not ready", zap.Uint64("height", b.Hght), zap.Stringer("blkID", b.ID()))
case b.Processed():
// If we built the block, the state will already be populated and we don't
// need to compute it (we assume that we built a correct block and it isn't
// necessary to re-verify anything).
log.Info("skipping verification, already processed", zap.Uint64("height", b.Hght), zap.Stringer("blkID", b.ID()))
default:
// Parent may not be processed when we verify this block so [verify] may
// recursively compute missing state.
state, err := b.innerVerify(ctx)
Expand All @@ -325,7 +342,7 @@ func (b *StatelessBlock) verify(ctx context.Context, stateReady bool) error {

// At any point after this, we may attempt to verify the block. We should be
// sure we are prepared to do so.

//
// NOTE: mempool is modified by VM handler
b.vm.Verified(ctx, b)
return nil
Expand Down Expand Up @@ -387,9 +404,8 @@ func (b *StatelessBlock) verifyWarpMessage(ctx context.Context, r Rules, msg *wa
// state sync)
func (b *StatelessBlock) innerVerify(ctx context.Context) (merkledb.TrieView, error) {
var (
log = b.vm.Logger()
built = len(b.results) > 0
r = b.vm.Rules(b.Tmstmp)
log = b.vm.Logger()
r = b.vm.Rules(b.Tmstmp)
)

// Perform basic correctness checks before doing any expensive work
Expand Down Expand Up @@ -452,7 +468,7 @@ func (b *StatelessBlock) innerVerify(ctx context.Context) (merkledb.TrieView, er

// Start validating warp messages, if they exist
var invalidWarpResult bool
if len(b.warpMessages) > 0 {
if b.containsWarp {
if b.bctx == nil {
log.Error(
"missing verify block context",
Expand Down Expand Up @@ -567,9 +583,12 @@ func (b *StatelessBlock) innerVerify(ctx context.Context) (merkledb.TrieView, er
return nil, ErrWarpResultMismatch
}

// Store height in state to prevent duplicate roots
if err := state.Insert(ctx, b.vm.StateManager().HeightKey(), binary.BigEndian.AppendUint64(nil, b.Hght)); err != nil {
return nil, err
}

// Compute state root
// TODO: consider adding the parent root or height here to ensure state roots
// are never repeated
computedRoot, err := state.GetMerkleRoot(ctx)
if err != nil {
return nil, err
Expand All @@ -584,13 +603,10 @@ func (b *StatelessBlock) innerVerify(ctx context.Context) (merkledb.TrieView, er
}

// Ensure signatures are verified
if !built { // don't need to verify sigs if built
_, sspan := b.vm.Tracer().Start(ctx, "StatelessBlock.Verify.WaitSignatures")
if err := b.sigJob.Wait(); err != nil {
sspan.End()
return nil, err
}
sspan.End()
_, sspan := b.vm.Tracer().Start(ctx, "StatelessBlock.Verify.WaitSignatures")
defer sspan.End()
if err := b.sigJob.Wait(); err != nil {
return nil, err
}
return state, nil
}
Expand Down
21 changes: 18 additions & 3 deletions chain/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ package chain

import (
"context"
"encoding/binary"
"errors"
"fmt"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
smblock "github.com/ava-labs/avalanchego/snow/engine/snowman/block"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"
Expand Down Expand Up @@ -47,7 +47,7 @@ func BuildBlock(
vm VM,
preferred ids.ID,
blockContext *smblock.Context,
) (snowman.Block, error) {
) (*StatelessBlock, error) {
ctx, span := vm.Tracer().Start(ctx, "chain.BuildBlock")
defer span.End()
log := vm.Logger()
Expand Down Expand Up @@ -88,10 +88,16 @@ func BuildBlock(

vdrState = vm.ValidatorState()
sm = vm.StateManager()

start = time.Now()
lockWait time.Duration
)
mempoolErr := mempool.Build(
ctx,
func(fctx context.Context, next *Transaction) (cont bool, restore bool, removeAcct bool, err error) {
if txsAttempted == 0 {
lockWait = time.Since(start)
}
txsAttempted++

// Ensure we can process if transaction includes a warp message
Expand Down Expand Up @@ -249,14 +255,21 @@ func BuildBlock(
if err := ts.WriteChanges(ctx, state, vm.Tracer()); err != nil {
return nil, err
}

// Store height in state to prevent duplicate roots
if err := state.Insert(ctx, sm.HeightKey(), binary.BigEndian.AppendUint64(nil, b.Hght)); err != nil {
return nil, err
}

// Compute state root after all data has been written to trie
root, err := state.GetMerkleRoot(ctx)
if err != nil {
return nil, err
}
b.StateRoot = root

// Compute block hash and marshaled representation
if err := b.init(ctx, results, false); err != nil {
if err := b.initializeBuilt(ctx, state, results); err != nil {
return nil, err
}
log.Info(
Expand All @@ -265,6 +278,8 @@ func BuildBlock(
zap.Int("attempted", txsAttempted),
zap.Int("added", len(b.Txs)),
zap.Int("mempool size", b.vm.Mempool().Len(ctx)),
zap.Duration("mempool lock wait", lockWait),
zap.Bool("context", blockContext != nil),
)
return b, nil
}
1 change: 1 addition & 0 deletions chain/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type Rules interface {
// in a structured manner. If we did not use [StateManager], we may overwrite
// state written by actions or auth.
type StateManager interface {
HeightKey() []byte
IncomingWarpKey(sourceChainID ids.ID, msgID ids.ID) []byte
OutgoingWarpKey(txID ids.ID) []byte
}
Expand Down
2 changes: 0 additions & 2 deletions chain/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,6 @@ func (t *Transaction) Expiry() int64 { return t.Base.Timestamp }
func (t *Transaction) UnitPrice() uint64 { return t.Base.UnitPrice }

// It is ok to have duplicate ReadKeys...the processor will skip them
//
// TODO: verify the invariant that [t.id] is set by this point
func (t *Transaction) StateKeys(stateMapping StateManager) [][]byte {
keys := append(t.Action.StateKeys(t.Auth, t.ID()), t.Auth.StateKeys()...)
if t.WarpMessage != nil {
Expand Down
5 changes: 2 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/profiler"
"github.com/ava-labs/avalanchego/utils/units"
"github.com/ava-labs/hypersdk/trace"
"github.com/ava-labs/hypersdk/vm"
)
Expand All @@ -28,13 +27,13 @@ func (c *Config) GetDecisionsPort() uint16 { return 0 } // auto-as
func (c *Config) GetBlocksPort() uint16 { return 0 } // auto-assigned
func (c *Config) GetStreamingBacklogSize() int { return 1024 }
func (c *Config) GetStateHistoryLength() int { return 256 }
func (c *Config) GetStateCacheSize() int { return 1 * units.GiB }
func (c *Config) GetStateCacheSize() int { return 65_536 } // nodes
func (c *Config) GetAcceptorSize() int { return 1024 }
func (c *Config) GetTraceConfig() *trace.Config { return &trace.Config{Enabled: false} }
func (c *Config) GetStateSyncParallelism() int { return 4 }
func (c *Config) GetStateSyncMinBlocks() uint64 { return 256 }
func (c *Config) GetStateSyncServerDelay() time.Duration { return 0 } // used for testing
func (c *Config) GetBlockLRUSize() int { return 128 }
func (c *Config) GetBlockLRUSize() int { return 128 }

func (c *Config) GetContinuousProfilerConfig() *profiler.Config {
return &profiler.Config{Enabled: false}
Expand Down
5 changes: 4 additions & 1 deletion examples/tokenvm/DEVNETS.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ avalanchego_config:
throttler-inbound-cpu-validator-alloc: 100000
throttler-inbound-disk-validator-alloc: 10737418240000
throttler-outbound-validator-alloc-size: 107374182
snow-mixed-query-num-push-vdr-uint: 10
consensus-on-accept-gossip-peer-size: 0
consensus-accepted-frontier-gossip-peer-size: 0
```

Make sure to remove `throttler-inbound-at-large-alloc-size` and
Expand Down Expand Up @@ -234,7 +237,7 @@ rm -f /tmp/avalanche-ops/tokenvm-genesis.json
--genesis-file /tmp/avalanche-ops/tokenvm-genesis.json \
--max-block-units 4000000 \
--window-target-units 100000000000 \
--window-target-blocks 20
--window-target-blocks 30
cat /tmp/avalanche-ops/tokenvm-genesis.json
cat <<EOF > /tmp/avalanche-ops/tokenvm-chain-config.json
Expand Down
6 changes: 5 additions & 1 deletion examples/tokenvm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package config
import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/ava-labs/avalanchego/ids"
Expand Down Expand Up @@ -35,7 +36,7 @@ type Config struct {
TraceSampleRate float64 `json:"traceSampleRate"`

// Profiling
ContinuousProfilerDir string `json:"continuousProfilerDir"`
ContinuousProfilerDir string `json:"continuousProfilerDir"` // "*" is replaced with rand int

// Streaming Ports
DecisionsPort uint16 `json:"decisionsPort"`
Expand Down Expand Up @@ -123,6 +124,9 @@ func (c *Config) GetContinuousProfilerConfig() *profiler.Config {
if len(c.ContinuousProfilerDir) == 0 {
return &profiler.Config{Enabled: false}
}
// Replace all instances of "*" with nodeID. This is useful when
// running multiple instances of tokenvm on the same machine.
c.ContinuousProfilerDir = strings.ReplaceAll(c.ContinuousProfilerDir, "*", c.nodeID.String())
return &profiler.Config{
Enabled: true,
Dir: c.ContinuousProfilerDir,
Expand Down
4 changes: 4 additions & 0 deletions examples/tokenvm/controller/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (

type StateManager struct{}

func (*StateManager) HeightKey() []byte {
return storage.HeightKey()
}

func (*StateManager) IncomingWarpKey(sourceChainID ids.ID, msgID ids.ID) []byte {
return storage.IncomingWarpKeyPrefix(sourceChainID, msgID)
}
Expand Down
Loading

0 comments on commit 52f5a9c

Please sign in to comment.