Ledger: improving ledger export cmd functionality (#175)
ramtinms authored Dec 3, 2020
1 parent 1aac521 commit 9e0f71e
Showing 4 changed files with 61 additions and 119 deletions.
19 changes: 13 additions & 6 deletions cmd/util/cmd/exec-data-json-export/block_exporter.go
@@ -31,7 +31,7 @@ type blockSummary struct {
}

// ExportBlocks exports blocks (note this only export blocks of the main chain and doesn't export forks)
func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) error {
func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) (flow.StateCommitment, error) {

// traverse backward from the given block (parent block) and fetch by blockHash
db := common.InitStorage(dbPath)
@@ -44,13 +44,14 @@ func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) err
seals := badger.NewSeals(cacheMetrics, db)
payloads := badger.NewPayloads(db, index, guarantees, seals)
blocks := badger.NewBlocks(db, headers, payloads)
commits := badger.NewCommits(&metrics.NoopCollector{}, db)

activeBlockID := blockID
outputFile := filepath.Join(outputPath, "blocks.jsonl")

fi, err := os.Create(outputFile)
if err != nil {
return fmt.Errorf("could not create block output file %w", err)
return nil, fmt.Errorf("could not create block output file %w", err)
}
defer fi.Close()

@@ -61,13 +62,13 @@ func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) err
header, err := headers.ByBlockID(activeBlockID)
if err != nil {
// no more header is available
return nil
break
}

block, err := blocks.ByID(activeBlockID)
if err != nil {
// log.Fatal().Err(err).Msg("could not load block")
return nil
break
}

cols := make([]string, 0)
@@ -104,14 +105,20 @@ func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) err

jsonData, err := json.Marshal(b)
if err != nil {
return fmt.Errorf("could not create a json obj for a block: %w", err)
return nil, fmt.Errorf("could not create a json obj for a block: %w", err)
}
_, err = blockWriter.WriteString(string(jsonData) + "\n")
if err != nil {
return fmt.Errorf("could not write block json to the file: %w", err)
return nil, fmt.Errorf("could not write block json to the file: %w", err)
}
blockWriter.Flush()

activeBlockID = header.ParentID
}

state, err := commits.ByBlockID(blockID)
if err != nil {
return nil, fmt.Errorf("could not find state commitment for this block: %w", err)
}
return state, nil
}
16 changes: 14 additions & 2 deletions cmd/util/cmd/exec-data-json-export/cmd.go
@@ -1,6 +1,8 @@
package jsonexporter

import (
"encoding/hex"

"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

@@ -12,6 +14,7 @@ var (
flagOutputDir string
flagBlockHash string
flagDatadir string
flagStateCommitment string
)

var Cmd = &cobra.Command{
@@ -36,6 +39,9 @@ func init() {
Cmd.Flags().StringVar(&flagDatadir, "datadir", "",
"directory that stores the protocol state")
_ = Cmd.MarkFlagRequired("datadir")

Cmd.Flags().StringVar(&flagStateCommitment, "state-commitment", "",
"state commitment (hex-encoded, 64 characters)")
}

func run(*cobra.Command, []string) {
@@ -46,7 +52,7 @@ func run(*cobra.Command, []string) {
}

log.Info().Msg("start exporting blocks")
err = ExportBlocks(blockID, flagDatadir, flagOutputDir)
fallbackState, err := ExportBlocks(blockID, flagDatadir, flagOutputDir)
if err != nil {
log.Fatal().Err(err).Msg("cannot get export blocks")
}
@@ -70,7 +76,13 @@ func run(*cobra.Command, []string) {
}

log.Info().Msg("start exporting ledger")
err = ExportLedger(blockID, flagDatadir, flagExecutionStateDir, flagOutputDir)
// if state commitment not provided do the fall back to the one connected to the block
if len(flagStateCommitment) == 0 {
flagStateCommitment = hex.EncodeToString(fallbackState)
log.Info().Msg("no state commitment is provided, falling back to the one attached to the block")
}

err = ExportLedger(flagExecutionStateDir, flagStateCommitment, flagOutputDir)
if err != nil {
log.Fatal().Err(err).Msg("cannot get export ledger")
}
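The new state-commitment flag expects a 64-character hex string (32 bytes). As a hedged illustration of what that value must look like, a caller could validate it before handing it to ExportLedger; the helper below is hypothetical and not part of this commit.

package jsonexporter

import (
	"encoding/hex"
	"fmt"
)

// validateStateCommitmentHex is a hypothetical helper (not part of this commit):
// it checks that a --state-commitment value decodes to the expected 32 bytes
// (64 hex characters) before it is passed on to ExportLedger.
func validateStateCommitmentHex(s string) error {
	decoded, err := hex.DecodeString(s)
	if err != nil {
		return fmt.Errorf("state commitment is not valid hex: %w", err)
	}
	if len(decoded) != 32 {
		return fmt.Errorf("expected 32 bytes (64 hex characters), got %d bytes", len(decoded))
	}
	return nil
}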
119 changes: 8 additions & 111 deletions cmd/util/cmd/exec-data-json-export/ledger_exporter.go
@@ -1,133 +1,30 @@
package jsonexporter

import (
"bufio"
"bytes"
"encoding/hex"
"errors"
"fmt"
"os"
"path/filepath"
"time"

"github.com/rs/zerolog/log"

"github.com/onflow/flow-go/cmd/util/cmd/common"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/pathfinder"
"github.com/onflow/flow-go/ledger/complete"
"github.com/onflow/flow-go/ledger/complete/mtrie"
"github.com/onflow/flow-go/ledger/complete/mtrie/flattener"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage/badger"
)

// ExportLedger exports ledger key value pairs at the given blockID
func ExportLedger(blockID flow.Identifier, dbPath string, ledgerPath string, outputPath string) error {
db := common.InitStorage(dbPath)
defer db.Close()
func ExportLedger(ledgerPath string, targetstate string, outputPath string) error {

cache := &metrics.NoopCollector{}
commits := badger.NewCommits(cache, db)

targetHash, err := commits.ByBlockID(blockID)
led, err := complete.NewLedger(ledgerPath, complete.DefaultCacheSize, &metrics.NoopCollector{}, log.Logger, nil, 0)
if err != nil {
return fmt.Errorf("cannot get state commitment for block: %w", err)
return fmt.Errorf("cannot create ledger from write-a-head logs and checkpoints: %w", err)
}

w, err := wal.NewWAL(nil, nil, ledgerPath, complete.DefaultCacheSize, pathfinder.PathByteSize, wal.SegmentSize)
state, err := hex.DecodeString(targetstate)
if err != nil {
return fmt.Errorf("cannot create WAL: %w", err)
return fmt.Errorf("failed to decode hex code of state: %w", err)
}
defer func() {
_ = w.Close()
}()

// TODO port this to use new forest
forest, err := mtrie.NewForest(pathfinder.PathByteSize, outputPath, complete.DefaultCacheSize, &metrics.NoopCollector{}, nil)
err = led.DumpTrieAsJSON(ledger.State(state), outputPath)
if err != nil {
return fmt.Errorf("cannot create mForest: %w", err)
return fmt.Errorf("cannot dump trie as json: %w", err)
}

i := 0
valuesSize := 0
valuesCount := 0
startTime := time.Now()
found := false
FoundHashError := fmt.Errorf("found hash %s", targetHash)

err = w.ReplayLogsOnly(
func(forestSequencing *flattener.FlattenedForest) error {
rebuiltTries, err := flattener.RebuildTries(forestSequencing)
if err != nil {
return fmt.Errorf("rebuilding forest from sequenced nodes failed: %w", err)
}
err = forest.AddTries(rebuiltTries)
if err != nil {
return fmt.Errorf("adding rebuilt tries to forest failed: %w", err)
}
return nil
},
func(update *ledger.TrieUpdate) error {

newTrieHash, err := forest.Update(update)

for _, value := range update.Payloads {
valuesSize += len(value.Value)
}

valuesCount += len(update.Payloads)

if err != nil {
return fmt.Errorf("error while updating mForest: %w", err)
}

if bytes.Equal(targetHash, newTrieHash) {
found = true
return FoundHashError
}

i++
if i%1000 == 0 {
log.Info().Int("values_count", valuesCount).Int("values_size_bytes", valuesSize).Int("updates_count", i).Msg("progress")
}

return err
},
func(commitment ledger.RootHash) error {
return nil
})

duration := time.Since(startTime)

if !errors.Is(err, FoundHashError) {
return fmt.Errorf("error while processing WAL: %w", err)
}

if !found {
return fmt.Errorf("no value found: %w", err)
}

log.Info().Int("values_count", valuesCount).Int("values_size_bytes", valuesSize).Int("updates_count", i).Float64("total_time_s", duration.Seconds()).Msg("finished seeking")
log.Info().Msg("writing root checkpoint")

trie, err := forest.GetTrie(targetHash)
if err != nil {
return fmt.Errorf("cannot get a trie with target hash: %w", err)
}

path := filepath.Join(outputPath, hex.EncodeToString(targetHash)+".trie.jsonl")

fi, err := os.Create(path)
if err != nil {
return err
}
defer fi.Close()

writer := bufio.NewWriter(fi)
defer writer.Flush()

return trie.DumpAsJSON(writer)
return nil
}
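For reference, a minimal sketch of driving the reworked ExportLedger directly, outside of the cobra command. The paths and commitment value are placeholders, and the import path is assumed from the file layout above.

package main

import (
	"log"

	jsonexporter "github.com/onflow/flow-go/cmd/util/cmd/exec-data-json-export"
)

func main() {
	// Placeholder values; in the CLI these come from the flags wired up in cmd.go.
	ledgerDir := "/var/flow/execution-state"                                               // directory with WAL segments and checkpoints
	stateCommitment := "0000000000000000000000000000000000000000000000000000000000000000" // 64 hex characters
	outputDir := "/tmp/ledger-export"

	if err := jsonexporter.ExportLedger(ledgerDir, stateCommitment, outputDir); err != nil {
		log.Fatalf("ledger export failed: %v", err)
	}
}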
26 changes: 26 additions & 0 deletions ledger/complete/ledger.go
@@ -1,7 +1,11 @@
package complete

import (
"bufio"
"encoding/hex"
"fmt"
"os"
"path/filepath"
"time"

"github.com/prometheus/client_golang/prometheus"
@@ -336,3 +340,25 @@ func (l *Ledger) ExportCheckpointAt(state ledger.State,

return newTrie.RootHash(), nil
}

// DumpTrieAsJSON export trie at specific state as a jsonl file, each line is json encode of a payload
func (l *Ledger) DumpTrieAsJSON(state ledger.State, outputFilePath string) error {
fmt.Println(ledger.RootHash(state))
trie, err := l.forest.GetTrie(ledger.RootHash(state))
if err != nil {
return fmt.Errorf("cannot find the target trie: %w", err)
}

path := filepath.Join(outputFilePath, hex.EncodeToString(ledger.RootHash(state))+".trie.jsonl")

fi, err := os.Create(path)
if err != nil {
return err
}
defer fi.Close()

writer := bufio.NewWriter(fi)
defer writer.Flush()

return trie.DumpAsJSON(writer)
}
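Each line of the resulting <root-hash>.trie.jsonl file is one JSON-encoded payload. A small sketch for consuming such a dump without assuming anything about the payload schema; the file path is a placeholder.

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"log"
	"os"
)

func main() {
	// Placeholder path; the actual file name is the hex-encoded root hash
	// of the dumped trie with a ".trie.jsonl" suffix.
	f, err := os.Open("/tmp/ledger-export/<root-hash>.trie.jsonl")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	scanner.Buffer(make([]byte, 0, 1024*1024), 64*1024*1024) // individual payload lines can be large

	count := 0
	for scanner.Scan() {
		var payload json.RawMessage
		if err := json.Unmarshal(scanner.Bytes(), &payload); err != nil {
			log.Fatalf("line %d is not valid JSON: %v", count+1, err)
		}
		count++
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("read %d payloads\n", count)
}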
