Skip to content

Commit

Permalink
swap storage
Browse files Browse the repository at this point in the history
  • Loading branch information
janezpodhostnik committed Nov 6, 2024
1 parent 564cb27 commit f26849b
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 219 deletions.
44 changes: 25 additions & 19 deletions api/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,20 @@ type txTraceResult struct {
}

type DebugAPI struct {
store *pebble.Storage
logger zerolog.Logger
tracer storage.TraceIndexer
blocks storage.BlockIndexer
transactions storage.TransactionIndexer
receipts storage.ReceiptIndexer
config *config.Config
collector metrics.Collector
store *pebble.Storage
registerStore *pebble.RegisterStorage
logger zerolog.Logger
tracer storage.TraceIndexer
blocks storage.BlockIndexer
transactions storage.TransactionIndexer
receipts storage.ReceiptIndexer
config *config.Config
collector metrics.Collector
}

func NewDebugAPI(
store *pebble.Storage,
registerStore *pebble.RegisterStorage,
tracer storage.TraceIndexer,
blocks storage.BlockIndexer,
transactions storage.TransactionIndexer,
Expand All @@ -59,14 +61,15 @@ func NewDebugAPI(
collector metrics.Collector,
) *DebugAPI {
return &DebugAPI{
store: store,
logger: logger,
tracer: tracer,
blocks: blocks,
transactions: transactions,
receipts: receipts,
config: config,
collector: collector,
store: store,
registerStore: registerStore,
logger: logger,
tracer: tracer,
blocks: blocks,
transactions: transactions,
receipts: receipts,
config: config,
collector: collector,
}
}

Expand Down Expand Up @@ -270,7 +273,6 @@ func (d *DebugAPI) TraceCall(
return nil, err
}

ledger := pebble.NewRegister(d.store, block.Height, nil)
blocksProvider := replayer.NewBlocksProvider(
d.blocks,
d.config.FlowNetworkID,
Expand All @@ -279,7 +281,7 @@ func (d *DebugAPI) TraceCall(
viewProvider := query.NewViewProvider(
d.config.FlowNetworkID,
flowEVM.StorageAccountAddress(d.config.FlowNetworkID),
ledger,
d.registerStore,
blocksProvider,
120_000_000,
)
Expand Down Expand Up @@ -331,7 +333,11 @@ func (d *DebugAPI) TraceCall(
}

func (d *DebugAPI) executorAtBlock(block *models.Block) (*evm.BlockExecutor, error) {
ledger := pebble.NewRegister(d.store, block.Height, d.store.NewBatch())
snapshot, err := d.registerStore.GetSnapshotAt(block.Height)
if err != nil {
return nil, fmt.Errorf("failed to get register snapshot at block height %d: %w", block.Height, err)
}
ledger := storage.NewRegisterDelta(snapshot)

return evm.NewBlockExecutor(
block,
Expand Down
44 changes: 40 additions & 4 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"math"
"time"

pebbleDB "github.com/cockroachdb/pebble"

"github.com/onflow/flow-go-sdk/access"
"github.com/onflow/flow-go-sdk/access/grpc"
"github.com/onflow/flow-go-sdk/crypto"
Expand All @@ -32,6 +34,7 @@ import (

type Storages struct {
Storage *pebble.Storage
Registers *pebble.RegisterStorage
Blocks storage.BlockIndexer
Transactions storage.TransactionIndexer
Receipts storage.ReceiptIndexer
Expand Down Expand Up @@ -149,6 +152,7 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error {
subscriber,
blocksProvider,
b.storages.Storage,
b.storages.Registers,
b.storages.Blocks,
b.storages.Receipts,
b.storages.Transactions,
Expand Down Expand Up @@ -219,6 +223,7 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {

evm, err := requester.NewEVM(
b.storages.Storage,
b.storages.Registers,
blocksProvider,
b.client,
b.config,
Expand Down Expand Up @@ -281,6 +286,7 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {

debugAPI := api.NewDebugAPI(
b.storages.Storage,
b.storages.Registers,
b.storages.Traces,
b.storages.Blocks,
b.storages.Transactions,
Expand Down Expand Up @@ -464,6 +470,8 @@ func setupStorage(
}

blocks := pebble.NewBlocks(store, config.FlowNetworkID)
storageAddress := evm.StorageAccountAddress(config.FlowNetworkID)
registerStore := pebble.NewRegisterStorage(store, storageAddress)

// hard set the start cadence height, this is used when force reindexing
if config.ForceStartCadenceHeight != 0 {
Expand All @@ -475,22 +483,40 @@ func setupStorage(

// if database is not initialized require init height
if _, err := blocks.LatestCadenceHeight(); errors.Is(err, errs.ErrStorageNotInitialized) {
batch := store.NewBatch()
defer func(batch *pebbleDB.Batch) {
err := batch.Close()
if err != nil {
// we don't know what went wrong, so this is fatal
logger.Fatal().Err(err).Msg("failed to close batch")
}
}(batch)

cadenceHeight := config.InitCadenceHeight
cadenceBlock, err := client.GetBlockHeaderByHeight(context.Background(), cadenceHeight)
if err != nil {
return nil, fmt.Errorf("could not fetch provided cadence height, make sure it's correct: %w", err)
}

storageProvider := pebble.NewRegister(store, 0, nil)
storageAddress := evm.StorageAccountAddress(config.FlowNetworkID)
snapshot, err := registerStore.GetSnapshotAt(0)
if err != nil {
return nil, fmt.Errorf("could not get register snapshot at block height %d: %w", 0, err)
}

delta := storage.NewRegisterDelta(snapshot)
accountStatus := environment.NewAccountStatus()
err = storageProvider.SetValue(
err = delta.SetValue(
storageAddress[:],
[]byte(flowGo.AccountStatusKey),
accountStatus.ToBytes(),
)
if err != nil {
return nil, fmt.Errorf("could not initialize state index: %w", err)
return nil, fmt.Errorf("could not set account status: %w", err)
}

err = registerStore.Store(delta.GetUpdates(), cadenceHeight, batch)
if err != nil {
return nil, fmt.Errorf("could not store register updates: %w", err)
}

if err := blocks.InitHeights(cadenceHeight, cadenceBlock.ID); err != nil {
Expand All @@ -501,12 +527,22 @@ func setupStorage(
err,
)
}

err = batch.Commit(pebbleDB.Sync)
if err != nil {
return nil, fmt.Errorf("could not commit register updates: %w", err)
}

logger.Info().Msgf("database initialized with cadence height: %d", cadenceHeight)
}
//else {
// // TODO(JanezP): verify storage account owner is correct
//}

return &Storages{
Storage: store,
Blocks: blocks,
Registers: registerStore,
Transactions: pebble.NewTransactions(store),
Receipts: pebble.NewReceipts(store),
Accounts: pebble.NewAccounts(store),
Expand Down
34 changes: 22 additions & 12 deletions services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

flowGo "github.com/onflow/flow-go/model/flow"

pebbleDB "github.com/cockroachdb/pebble"
"github.com/onflow/flow-go-sdk"
gethTypes "github.com/onflow/go-ethereum/core/types"
Expand Down Expand Up @@ -40,6 +42,7 @@ type Engine struct {
subscriber EventSubscriber
blocksProvider *replayer.BlocksProvider
store *pebble.Storage
registerStore *pebble.RegisterStorage
blocks storage.BlockIndexer
receipts storage.ReceiptIndexer
transactions storage.TransactionIndexer
Expand All @@ -57,6 +60,7 @@ func NewEventIngestionEngine(
subscriber EventSubscriber,
blocksProvider *replayer.BlocksProvider,
store *pebble.Storage,
registerStore *pebble.RegisterStorage,
blocks storage.BlockIndexer,
receipts storage.ReceiptIndexer,
transactions storage.TransactionIndexer,
Expand All @@ -76,6 +80,7 @@ func NewEventIngestionEngine(
subscriber: subscriber,
blocksProvider: blocksProvider,
store: store,
registerStore: registerStore,
blocks: blocks,
receipts: receipts,
transactions: transactions,
Expand Down Expand Up @@ -173,15 +178,11 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
return err
}

storageProvider := pebble.NewRegister(
e.store,
events.Block().Height,
batch,
)
blockEvents := events.BlockEventPayload()
cr := sync.NewReplayer(
e.replayerConfig.ChainID,
e.replayerConfig.RootAddr,
storageProvider,
e.registerStore,
e.blocksProvider,
e.log,
e.replayerConfig.CallTracerCollector.TxTracer(),
Expand All @@ -190,19 +191,17 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {

// Step 1.2: Replay all block transactions
// If `ReplayBlock` returns any error, we abort the EVM events processing
res, err := cr.ReplayBlock(events.TxEventPayloads(), events.BlockEventPayload())
res, err := cr.ReplayBlock(events.TxEventPayloads(), blockEvents)
if err != nil {
return fmt.Errorf("failed to replay block on height: %d, with: %w", events.Block().Height, err)
}

// Step 2: Write all the necessary changes to each storage

// Step 2.1: Write all the EVM state changes to `StorageProvider`
for k, v := range res.StorageRegisterUpdates() {
err = storageProvider.SetValue([]byte(k.Owner), []byte(k.Key), v)
if err != nil {
return fmt.Errorf("failed to commit state changes on block: %d", events.Block().Height)
}
err = e.registerStore.Store(registerEntriesFromKeyValue(res.StorageRegisterUpdates()), blockEvents.Height, batch)
if err != nil {
return fmt.Errorf("failed to store state changes on block: %d", events.Block().Height)
}

// Step 2.2: Write the latest EVM block to `Blocks` storage
Expand Down Expand Up @@ -341,3 +340,14 @@ func (e *Engine) indexReceipts(

return nil
}

func registerEntriesFromKeyValue(keyValue map[flowGo.RegisterID]flowGo.RegisterValue) []flowGo.RegisterEntry {
entries := make([]flowGo.RegisterEntry, 0, len(keyValue))
for k, v := range keyValue {
entries = append(entries, flowGo.RegisterEntry{
Key: k,
Value: v,
})
}
return entries
}
Loading

0 comments on commit f26849b

Please sign in to comment.