Skip to content

Commit

Permalink
Merge branch 'master' into alex/consensus-sealing-halt-on-exec-fork
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Hentschel authored Dec 3, 2020
2 parents 3a9d171 + 9e0f71e commit 3ded2bf
Show file tree
Hide file tree
Showing 32 changed files with 816 additions and 531 deletions.
68 changes: 68 additions & 0 deletions cmd/bootstrap/cmd/copy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package cmd

import (
"fmt"
"io"
"io/ioutil"
"os"
"path"
)

// copyDir recursively copies the directory tree rooted at src into dst,
// creating dst (and any intermediate directories) with the same permission
// bits as src. The first error encountered while copying an entry is
// returned to the caller; previously such errors were only printed and the
// function returned nil, silently reporting a failed copy as success.
func copyDir(src string, dst string) error {
	srcinfo, err := os.Stat(src)
	if err != nil {
		return err
	}

	if err := os.MkdirAll(dst, srcinfo.Mode()); err != nil {
		return err
	}

	fds, err := ioutil.ReadDir(src)
	if err != nil {
		return err
	}
	for _, fd := range fds {
		srcfp := path.Join(src, fd.Name())
		dstfp := path.Join(dst, fd.Name())

		if fd.IsDir() {
			if err := copyDir(srcfp, dstfp); err != nil {
				return fmt.Errorf("could not copy directory %s: %w", srcfp, err)
			}
		} else {
			if err := copyFile(srcfp, dstfp); err != nil {
				return fmt.Errorf("could not copy file %s: %w", srcfp, err)
			}
		}
	}
	return nil
}

// CopyFile copies a single file from src to dst
func copyFile(src, dst string) error {
var err error
var srcfd *os.File
var dstfd *os.File
var srcinfo os.FileInfo

if srcfd, err = os.Open(src); err != nil {
return err
}
defer srcfd.Close()

if dstfd, err = os.Create(dst); err != nil {
return err
}
defer dstfd.Close()

if _, err = io.Copy(dstfd, srcfd); err != nil {
return err
}
if srcinfo, err = os.Stat(src); err != nil {
return err
}
return os.Chmod(dst, srcinfo.Mode())
}
10 changes: 9 additions & 1 deletion cmd/bootstrap/cmd/finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
"strings"
"time"

"github.com/onflow/cadence"
"github.com/spf13/cobra"

"github.com/onflow/cadence"

"github.com/onflow/flow-go/cmd/bootstrap/run"
model "github.com/onflow/flow-go/model/bootstrap"
"github.com/onflow/flow-go/model/encodable"
Expand Down Expand Up @@ -169,6 +170,13 @@ func finalize(cmd *cobra.Command, args []string) {
constructRootResultAndSeal(flagRootCommit, block, stakingNodes, assignments, clusterQCs, dkgData)
log.Info().Msg("")

log.Info().Msg("copying internal private keys to output folder")
err := copyDir(flagInternalNodePrivInfoDir, filepath.Join(flagOutdir, model.DirPrivateRoot))
if err != nil {
log.Error().Err(err).Msg("could not copy private key files")
}
log.Info().Msg("")

// print count of all nodes
roleCounts := nodeCountByRole(stakingNodes)
for role, count := range roleCounts {
Expand Down
24 changes: 18 additions & 6 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,25 @@ func (fnb *FlowNodeBuilder) enqueueNetworkInit() {
myAddr = fnb.BaseConfig.bindAddr
}

mw, err := p2p.NewMiddleware(fnb.Logger.Level(zerolog.ErrorLevel), codec, myAddr, fnb.Me.NodeID(),
fnb.networkKey, fnb.Metrics.Network, p2p.DefaultMaxUnicastMsgSize, p2p.DefaultMaxPubSubMsgSize,
libP2PNodeFactory, err := p2p.DefaultLibP2PNodeFactory(fnb.Logger.Level(zerolog.ErrorLevel),
fnb.Me.NodeID(),
myAddr,
fnb.networkKey,
fnb.RootBlock.ID().String(),
fnb.MsgValidators...)
p2p.DefaultMaxPubSubMsgSize,
fnb.Metrics.Network)
if err != nil {
return nil, fmt.Errorf("could not initialize middleware: %w", err)
return nil, fmt.Errorf("could not generate libp2p node factory: %w", err)
}
fnb.Middleware = mw

fnb.Middleware = p2p.NewMiddleware(fnb.Logger.Level(zerolog.ErrorLevel),
libP2PNodeFactory,
fnb.Me.NodeID(),
fnb.Metrics.Network,
p2p.DefaultMaxUnicastMsgSize,
p2p.DefaultMaxPubSubMsgSize,
fnb.RootBlock.ID().String(),
fnb.MsgValidators...)

participants, err := fnb.State.Final().Identities(p2p.NetworkingSetFilter)
if err != nil {
Expand Down Expand Up @@ -543,9 +554,10 @@ func (fnb *FlowNodeBuilder) initState() {
}

func (fnb *FlowNodeBuilder) initFvmOptions() {
blockFinder := fvm.NewBlockFinder(fnb.Storage.Headers)
vmOpts := []fvm.Option{
fvm.WithChain(fnb.RootChainID.Chain()),
fvm.WithBlocks(fnb.Storage.Blocks),
fvm.WithBlocks(blockFinder),
}
if fnb.RootChainID == flow.Testnet {
vmOpts = append(vmOpts,
Expand Down
19 changes: 13 additions & 6 deletions cmd/util/cmd/exec-data-json-export/block_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type blockSummary struct {
}

// ExportBlocks exports blocks (note this only export blocks of the main chain and doesn't export forks)
func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) error {
func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) (flow.StateCommitment, error) {

// traverse backward from the given block (parent block) and fetch by blockHash
db := common.InitStorage(dbPath)
Expand All @@ -44,13 +44,14 @@ func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) err
seals := badger.NewSeals(cacheMetrics, db)
payloads := badger.NewPayloads(db, index, guarantees, seals)
blocks := badger.NewBlocks(db, headers, payloads)
commits := badger.NewCommits(&metrics.NoopCollector{}, db)

activeBlockID := blockID
outputFile := filepath.Join(outputPath, "blocks.jsonl")

fi, err := os.Create(outputFile)
if err != nil {
return fmt.Errorf("could not create block output file %w", err)
return nil, fmt.Errorf("could not create block output file %w", err)
}
defer fi.Close()

Expand All @@ -61,13 +62,13 @@ func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) err
header, err := headers.ByBlockID(activeBlockID)
if err != nil {
// no more header is available
return nil
break
}

block, err := blocks.ByID(activeBlockID)
if err != nil {
// log.Fatal().Err(err).Msg("could not load block")
return nil
break
}

cols := make([]string, 0)
Expand Down Expand Up @@ -104,14 +105,20 @@ func ExportBlocks(blockID flow.Identifier, dbPath string, outputPath string) err

jsonData, err := json.Marshal(b)
if err != nil {
return fmt.Errorf("could not create a json obj for a block: %w", err)
return nil, fmt.Errorf("could not create a json obj for a block: %w", err)
}
_, err = blockWriter.WriteString(string(jsonData) + "\n")
if err != nil {
return fmt.Errorf("could not write block json to the file: %w", err)
return nil, fmt.Errorf("could not write block json to the file: %w", err)
}
blockWriter.Flush()

activeBlockID = header.ParentID
}

state, err := commits.ByBlockID(blockID)
if err != nil {
return nil, fmt.Errorf("could not find state commitment for this block: %w", err)
}
return state, nil
}
16 changes: 14 additions & 2 deletions cmd/util/cmd/exec-data-json-export/cmd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package jsonexporter

import (
"encoding/hex"

"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

Expand All @@ -12,6 +14,7 @@ var (
flagOutputDir string
flagBlockHash string
flagDatadir string
flagStateCommitment string
)

var Cmd = &cobra.Command{
Expand All @@ -36,6 +39,9 @@ func init() {
Cmd.Flags().StringVar(&flagDatadir, "datadir", "",
"directory that stores the protocol state")
_ = Cmd.MarkFlagRequired("datadir")

Cmd.Flags().StringVar(&flagStateCommitment, "state-commitment", "",
"state commitment (hex-encoded, 64 characters)")
}

func run(*cobra.Command, []string) {
Expand All @@ -46,7 +52,7 @@ func run(*cobra.Command, []string) {
}

log.Info().Msg("start exporting blocks")
err = ExportBlocks(blockID, flagDatadir, flagOutputDir)
fallbackState, err := ExportBlocks(blockID, flagDatadir, flagOutputDir)
if err != nil {
log.Fatal().Err(err).Msg("cannot get export blocks")
}
Expand All @@ -70,7 +76,13 @@ func run(*cobra.Command, []string) {
}

log.Info().Msg("start exporting ledger")
err = ExportLedger(blockID, flagDatadir, flagExecutionStateDir, flagOutputDir)
// if state commitment not provided do the fall back to the one connected to the block
if len(flagStateCommitment) == 0 {
flagStateCommitment = hex.EncodeToString(fallbackState)
log.Info().Msg("no state commitment is provided, falling back to the one attached to the block")
}

err = ExportLedger(flagExecutionStateDir, flagStateCommitment, flagOutputDir)
if err != nil {
log.Fatal().Err(err).Msg("cannot get export ledger")
}
Expand Down
119 changes: 8 additions & 111 deletions cmd/util/cmd/exec-data-json-export/ledger_exporter.go
Original file line number Diff line number Diff line change
@@ -1,133 +1,30 @@
package jsonexporter

import (
"bufio"
"bytes"
"encoding/hex"
"errors"
"fmt"
"os"
"path/filepath"
"time"

"github.com/rs/zerolog/log"

"github.com/onflow/flow-go/cmd/util/cmd/common"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/pathfinder"
"github.com/onflow/flow-go/ledger/complete"
"github.com/onflow/flow-go/ledger/complete/mtrie"
"github.com/onflow/flow-go/ledger/complete/mtrie/flattener"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage/badger"
)

// ExportLedger exports ledger key value pairs at the given blockID
func ExportLedger(blockID flow.Identifier, dbPath string, ledgerPath string, outputPath string) error {
db := common.InitStorage(dbPath)
defer db.Close()
func ExportLedger(ledgerPath string, targetstate string, outputPath string) error {

cache := &metrics.NoopCollector{}
commits := badger.NewCommits(cache, db)

targetHash, err := commits.ByBlockID(blockID)
led, err := complete.NewLedger(ledgerPath, complete.DefaultCacheSize, &metrics.NoopCollector{}, log.Logger, nil, 0)
if err != nil {
return fmt.Errorf("cannot get state commitment for block: %w", err)
return fmt.Errorf("cannot create ledger from write-a-head logs and checkpoints: %w", err)
}

w, err := wal.NewWAL(nil, nil, ledgerPath, complete.DefaultCacheSize, pathfinder.PathByteSize, wal.SegmentSize)
state, err := hex.DecodeString(targetstate)
if err != nil {
return fmt.Errorf("cannot create WAL: %w", err)
return fmt.Errorf("failed to decode hex code of state: %w", err)
}
defer func() {
_ = w.Close()
}()

// TODO port this to use new forest
forest, err := mtrie.NewForest(pathfinder.PathByteSize, outputPath, complete.DefaultCacheSize, &metrics.NoopCollector{}, nil)
err = led.DumpTrieAsJSON(ledger.State(state), outputPath)
if err != nil {
return fmt.Errorf("cannot create mForest: %w", err)
return fmt.Errorf("cannot dump trie as json: %w", err)
}

i := 0
valuesSize := 0
valuesCount := 0
startTime := time.Now()
found := false
FoundHashError := fmt.Errorf("found hash %s", targetHash)

err = w.ReplayLogsOnly(
func(forestSequencing *flattener.FlattenedForest) error {
rebuiltTries, err := flattener.RebuildTries(forestSequencing)
if err != nil {
return fmt.Errorf("rebuilding forest from sequenced nodes failed: %w", err)
}
err = forest.AddTries(rebuiltTries)
if err != nil {
return fmt.Errorf("adding rebuilt tries to forest failed: %w", err)
}
return nil
},
func(update *ledger.TrieUpdate) error {

newTrieHash, err := forest.Update(update)

for _, value := range update.Payloads {
valuesSize += len(value.Value)
}

valuesCount += len(update.Payloads)

if err != nil {
return fmt.Errorf("error while updating mForest: %w", err)
}

if bytes.Equal(targetHash, newTrieHash) {
found = true
return FoundHashError
}

i++
if i%1000 == 0 {
log.Info().Int("values_count", valuesCount).Int("values_size_bytes", valuesSize).Int("updates_count", i).Msg("progress")
}

return err
},
func(commitment ledger.RootHash) error {
return nil
})

duration := time.Since(startTime)

if !errors.Is(err, FoundHashError) {
return fmt.Errorf("error while processing WAL: %w", err)
}

if !found {
return fmt.Errorf("no value found: %w", err)
}

log.Info().Int("values_count", valuesCount).Int("values_size_bytes", valuesSize).Int("updates_count", i).Float64("total_time_s", duration.Seconds()).Msg("finished seeking")
log.Info().Msg("writing root checkpoint")

trie, err := forest.GetTrie(targetHash)
if err != nil {
return fmt.Errorf("cannot get a trie with target hash: %w", err)
}

path := filepath.Join(outputPath, hex.EncodeToString(targetHash)+".trie.jsonl")

fi, err := os.Create(path)
if err != nil {
return err
}
defer fi.Close()

writer := bufio.NewWriter(fi)
defer writer.Flush()

return trie.DumpAsJSON(writer)
return nil
}
Loading

0 comments on commit 3ded2bf

Please sign in to comment.