Skip to content

Commit

Permalink
Merge pull request #18 from roysc/add-tracker
Browse files Browse the repository at this point in the history
Add tracker package
  • Loading branch information
roysc authored Sep 6, 2022
2 parents 2508990 + d72e321 commit 3d57aa3
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 3 deletions.
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ module github.com/vulcanize/go-eth-state-node-iterator

go 1.18

require github.com/ethereum/go-ethereum v1.10.23
require (
github.com/ethereum/go-ethereum v1.10.23
github.com/sirupsen/logrus v1.9.0
)

require (
github.com/VictoriaMetrics/fastcache v1.6.0 // indirect
Expand All @@ -24,7 +27,7 @@ require (
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
)

replace github.com/ethereum/go-ethereum v1.10.23 => github.com/vulcanize/go-ethereum v1.10.23-statediff-4.2.0-alpha
7 changes: 6 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,15 @@ github.com/prometheus/tsdb v0.10.0/go.mod h1:oi49uRhEe9dPUTlS3JRZOwJuVi6tmh10QSg
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a h1:1ur3QoCqvE5fl+nylMaIr9PVV1w343YRDtsy+Rwu7XI=
Expand Down Expand Up @@ -169,8 +172,9 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -205,5 +209,6 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
243 changes: 243 additions & 0 deletions tracker/tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package tracker

import (
"bytes"
"context"
"encoding/csv"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/trie"
log "github.com/sirupsen/logrus"

iter "github.com/vulcanize/go-eth-state-node-iterator"
)

type Tracker struct {
recoveryFile string

startChan chan *Iterator
stopChan chan *Iterator
started map[*Iterator]struct{}
stopped []*Iterator
running bool
}

type Iterator struct {
trie.NodeIterator
tracker *Tracker

SeekedPath []byte // latest path seeked from the tracked iterator
EndPath []byte // endPath for the tracked iterator
}

func New(file string, buf uint) Tracker {
return Tracker{
recoveryFile: file,
startChan: make(chan *Iterator, buf),
stopChan: make(chan *Iterator, buf),
started: map[*Iterator]struct{}{},
running: true,
}
}

func (tr *Tracker) CaptureSignal(cancelCtx context.CancelFunc) {
sigChan := make(chan os.Signal, 1)

signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigChan
log.Errorf("Signal received (%v), stopping", sig)
// cancel context on receiving a signal
// on ctx cancellation, all the iterators complete processing of their current node before stopping
cancelCtx()
}()
}

// Wraps an iterator in a Iterator. This should not be called once halts are possible.
func (tr *Tracker) Tracked(it trie.NodeIterator, recoveredPath []byte) (ret *Iterator) {
// create seeked path of max capacity (65)
iterSeekedPath := make([]byte, 0, 65)
// intially populate seeked path with the recovered path
// to be used in trie traversal
if recoveredPath != nil {
iterSeekedPath = append(iterSeekedPath, recoveredPath...)
}

// if the iterator being tracked is a PrefixBoundIterator, capture it's end path
// to be used in trie traversal
var endPath []byte
if boundedIter, ok := it.(*iter.PrefixBoundIterator); ok {
endPath = boundedIter.EndPath
}

ret = &Iterator{it, tr, iterSeekedPath, endPath}
tr.startChan <- ret
return
}

// explicitly stops an iterator
func (tr *Tracker) StopIterator(it *Iterator) {
tr.stopChan <- it
}

// dumps iterator path and bounds to a text file so it can be restored later
func (tr *Tracker) dump() error {
log.Debugf("Dumping recovery state to: %s", tr.recoveryFile)
var rows [][]string
for it := range tr.started {
var startPath []byte
var endPath []byte
if impl, ok := it.NodeIterator.(*iter.PrefixBoundIterator); ok {
// if the iterator being tracked is a PrefixBoundIterator,
// initialize start and end paths with its bounds
startPath = impl.StartPath
endPath = impl.EndPath
}

// if seeked path and iterator path are non-empty, use iterator's path as startpath
if !bytes.Equal(it.SeekedPath, []byte{}) && !bytes.Equal(it.Path(), []byte{}) {
startPath = it.Path()
}

rows = append(rows, []string{
fmt.Sprintf("%x", startPath),
fmt.Sprintf("%x", endPath),
fmt.Sprintf("%x", it.SeekedPath),
})
}

file, err := os.Create(tr.recoveryFile)
if err != nil {
return err
}
defer file.Close()
out := csv.NewWriter(file)

return out.WriteAll(rows)
}

// attempts to read iterator state from file
// if file doesn't exist, returns an empty slice with no error
func (tr *Tracker) Restore(tree state.Trie) ([]trie.NodeIterator, error) {
file, err := os.Open(tr.recoveryFile)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}
log.Debugf("Restoring recovery state from: %s", tr.recoveryFile)

defer file.Close()
in := csv.NewReader(file)
in.FieldsPerRecord = 3
rows, err := in.ReadAll()
if err != nil {
return nil, err
}

var ret []trie.NodeIterator
for _, row := range rows {
// pick up where each interval left off
var startPath []byte
var endPath []byte
var recoveredPath []byte

if len(row[0]) != 0 {
if _, err = fmt.Sscanf(row[0], "%x", &startPath); err != nil {
return nil, err
}
}
if len(row[1]) != 0 {
if _, err = fmt.Sscanf(row[1], "%x", &endPath); err != nil {
return nil, err
}
}
if len(row[2]) != 0 {
if _, err = fmt.Sscanf(row[2], "%x", &recoveredPath); err != nil {
return nil, err
}
}

// force the lower bound path to an even length (required by geth API/HexToKeyBytes)
if len(startPath)&0b1 == 1 {
// decrement first to avoid skipped nodes
decrementPath(startPath)
startPath = append(startPath, 0)
}
it := iter.NewPrefixBoundIterator(tree.NodeIterator(iter.HexToKeyBytes(startPath)), startPath, endPath)
ret = append(ret, tr.Tracked(it, recoveredPath))
}

log.Debugf("Restored %d iterators", len(ret))
return ret, nil
}

func (tr *Tracker) HaltAndDump() error {
tr.running = false

// drain any pending iterators
close(tr.startChan)
for start := range tr.startChan {
tr.started[start] = struct{}{}
}
close(tr.stopChan)
for stop := range tr.stopChan {
tr.stopped = append(tr.stopped, stop)
}

for _, stop := range tr.stopped {
delete(tr.started, stop)
}

if len(tr.started) == 0 {
// if the tracker state is empty, erase any existing recovery file
err := os.Remove(tr.recoveryFile)
if os.IsNotExist(err) {
err = nil
}
return err
}

return tr.dump()
}

func (it *Iterator) Next(descend bool) bool {
ret := it.NodeIterator.Next(descend)

if !ret {
if it.tracker.running {
it.tracker.stopChan <- it
} else {
log.Errorf("iterator stopped after tracker halted: path=%x", it.Path())
}
}
return ret
}

// Subtracts 1 from the last byte in a path slice, carrying if needed.
// Does nothing, returning false, for all-zero inputs.
func decrementPath(path []byte) bool {
// check for all zeros
allzero := true
for i := 0; i < len(path); i++ {
allzero = allzero && path[i] == 0
}
if allzero {
return false
}
for i := len(path) - 1; i >= 0; i-- {
val := path[i]
path[i]--
if val == 0 {
path[i] = 0xf
} else {
return true
}
}
return true
}

0 comments on commit 3d57aa3

Please sign in to comment.