Skip to content

Commit

Permalink
Use import/export API of iavl to create snapshots (#764)
Browse files Browse the repository at this point in the history
  • Loading branch information
sidenaio authored Aug 9, 2021
1 parent d87d231 commit 1d2bddb
Show file tree
Hide file tree
Showing 11 changed files with 1,498 additions and 978 deletions.
5 changes: 3 additions & 2 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2121,7 +2121,7 @@ func (chain *Blockchain) EnsureIntegrity() error {
wasReset = true
resetTo := uint64(0)
for h, tryCnt := chain.Head.Height()-1, 0; h >= 1 && tryCnt < int(state.MaxSavedStatesCount)+1; h, tryCnt = h-1, tryCnt+1 {
if chain.appState.IdentityState.HasVersion(h) {
if chain.appState.State.HasVersion(h) && chain.appState.IdentityState.HasVersion(h) {
resetTo = h
break
}
Expand Down Expand Up @@ -2233,12 +2233,13 @@ func (chain *Blockchain) GetIdentityDiff(height uint64) *state.IdentityStateDiff
}

func (chain *Blockchain) ReadSnapshotManifest() *snapshot.Manifest {
cid, root, height, _ := chain.repo.LastSnapshotManifest()
cid, cidV2, root, height, _ := chain.repo.LastSnapshotManifest()
if cid == nil {
return nil
}
return &snapshot.Manifest{
Cid: cid,
CidV2: cidV2,
Root: root,
Height: height,
}
Expand Down
107 changes: 80 additions & 27 deletions core/state/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ const (
SnapshotsFolder = "/snapshots"
)

type SnapshotVersion byte

const SnapshotVersionV1 = SnapshotVersion(1)
const SnapshotVersionV2 = SnapshotVersion(2)

var (
InvalidManifestPrefix = []byte("im")
MaxManifestTimeouts = byte(5)
Expand Down Expand Up @@ -60,13 +65,13 @@ func NewSnapshotManager(db dbm.DB, state *StateDB, bus eventbus.Bus, ipfs ipfs.P
return m
}

func createSnapshotFile(datadir string, height uint64) (fileName string, file *os.File, err error) {
func createSnapshotFile(datadir string, height uint64, version SnapshotVersion) (fileName string, file *os.File, err error) {
newpath := filepath.Join(datadir, SnapshotsFolder)
if err := os.MkdirAll(newpath, os.ModePerm); err != nil {
return "", nil, err
}

filePath := filepath.Join(newpath, strconv.FormatUint(height, 10)+".tar")
filePath := filepath.Join(newpath, strconv.FormatUint(height, 10)+"."+strconv.FormatInt(int64(version), 10)+".tar")
f, err := os.Create(filePath)
if err != nil {
return "", nil, err
Expand All @@ -83,25 +88,35 @@ func (m *SnapshotManager) createSnapshotIfNeeded(block *types.Header) {
}
}

func (m *SnapshotManager) createSnapshot(height uint64) (root common.Hash) {
filePath, file, err := createSnapshotFile(m.cfg.DataDir, height)
func (m *SnapshotManager) createShapshotForVersion(height uint64, version SnapshotVersion) (cidBytes []byte, root common.Hash, filePath string) {
filePath, file, err := createSnapshotFile(m.cfg.DataDir, height, version)
if err != nil {
m.log.Error("Cannot create file for snapshot", "err", err)
return common.Hash{}
return nil, common.Hash{}, ""
}

if root, err = m.state.WriteSnapshot(height, file); err != nil {
file.Close()
m.log.Error("Cannot write snapshot to file", "err", err)
return common.Hash{}
switch version {
case SnapshotVersionV1:
if root, err = m.state.WriteSnapshot(height, file); err != nil {
file.Close()
m.log.Error("Cannot write snapshot to file", "err", err)
return nil, common.Hash{}, ""
}
case SnapshotVersionV2:
if root, err = m.state.WriteSnapshot2(height, file); err != nil {
file.Close()
m.log.Error("Cannot write snapshot to file", "err", err)
return nil, common.Hash{}, ""
}
}

file.Close()
var f *os.File
var cid cid.Cid
if f, err = os.Open(filePath); err != nil {
m.log.Error("Cannot open snapshot file", "err", err)
os.Remove(filePath)
return
return nil, common.Hash{}, ""
}
stat, _ := f.Stat()
if cid, err = m.ipfs.AddFile(f.Name(), f, stat); err != nil {
Expand All @@ -110,27 +125,47 @@ func (m *SnapshotManager) createSnapshot(height uint64) (root common.Hash) {
if err = os.Remove(filePath); err != nil {
m.log.Error("Cannot remove file", "err", err)
}
return
return nil, common.Hash{}, ""
}
return cid.Bytes(), root, filePath
}

func (m *SnapshotManager) createSnapshot(height uint64) (root common.Hash) {

cidV1, root, filePath := m.createShapshotForVersion(height, SnapshotVersionV1)
cidV2, _, filePath2 := m.createShapshotForVersion(height, SnapshotVersionV2)

if cidV1 != nil && cidV2 != nil {
m.clearFs([]string{filePath, filePath2})
m.writeLastManifest(cidV1, cidV2, root, height, filePath, filePath2)
}
m.clearFs(filePath)
m.writeLastManifest(cid.Bytes(), root, height, filePath)
return root
}

func (m *SnapshotManager) clearFs(excludedFile string) {
if prevCid, _, _, _ := m.repo.LastSnapshotManifest(); prevCid != nil {
func (m *SnapshotManager) clearFs(excludedFiles []string) {
if prevCid, prevCidV2, _, _, _ := m.repo.LastSnapshotManifest(); prevCid != nil {
m.ipfs.Unpin(prevCid)
m.ipfs.Unpin(prevCidV2)
}
m.clearSnapshotFolder(excludedFile)
m.clearSnapshotFolder(excludedFiles)
}

func (m *SnapshotManager) clearSnapshotFolder(excludedFile string) {
func (m *SnapshotManager) clearSnapshotFolder(excludedFiles []string) {
directory := filepath.Join(m.cfg.DataDir, SnapshotsFolder)

var files []string

var contains = func(arr []string, value string) bool {
for _, s := range arr {
if s == value {
return true
}
}
return false
}

err := filepath.Walk(directory, func(path string, info os.FileInfo, err error) error {
if !info.IsDir() && path != excludedFile {
if !info.IsDir() && !contains(excludedFiles, path) {
files = append(files, path)
}
return nil
Expand All @@ -147,14 +182,19 @@ func (m *SnapshotManager) clearSnapshotFolder(excludedFile string) {
}
}

func (m *SnapshotManager) writeLastManifest(snapshotCid []byte, root common.Hash, height uint64, file string) {
m.repo.WriteLastSnapshotManifest(snapshotCid, root, height, file)
func (m *SnapshotManager) writeLastManifest(snapshotCid []byte, snapshotCidV2 []byte, root common.Hash, height uint64, file string, fileV2 string) {
m.repo.WriteLastSnapshotManifest(snapshotCid, snapshotCidV2, root, height, file, fileV2)
}

func (m *SnapshotManager) DownloadSnapshot(snapshot *snapshot.Manifest) (filePath string, err error) {
filePath, file, err := createSnapshotFile(m.cfg.DataDir, snapshot.Height)
func (m *SnapshotManager) DownloadSnapshot(snapshot *snapshot.Manifest) (filePath string, version SnapshotVersion, err error) {
version = SnapshotVersionV2
if len(snapshot.CidV2) == 0 {
version = SnapshotVersionV1
}

filePath, file, err := createSnapshotFile(m.cfg.DataDir, snapshot.Height, version)
if err != nil {
return "", err
return "", 0, err
}

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -179,7 +219,13 @@ func (m *SnapshotManager) DownloadSnapshot(snapshot *snapshot.Manifest) (filePat
done := make(chan struct{})

go func() {
loadToErr = m.ipfs.LoadTo(snapshot.Cid, file, ctx, onLoading)
switch version {
case SnapshotVersionV1:
loadToErr = m.ipfs.LoadTo(snapshot.Cid, file, ctx, onLoading)
case SnapshotVersionV2:
loadToErr = m.ipfs.LoadTo(snapshot.CidV2, file, ctx, onLoading)
}

wg.Done()
close(done)
}()
Expand All @@ -205,11 +251,18 @@ func (m *SnapshotManager) DownloadSnapshot(snapshot *snapshot.Manifest) (filePat
wg.Wait()

if loadToErr == nil {
m.clearFs(filePath)
m.writeLastManifest(snapshot.Cid, snapshot.Root, snapshot.Height, filePath)
m.clearFs([]string{filePath})
var filePath1, filePath2 string
if version == SnapshotVersionV1 {
filePath1 = filePath
}
if version == SnapshotVersionV2 {
filePath2 = filePath
}
m.writeLastManifest(snapshot.Cid, snapshot.CidV2, snapshot.Root, snapshot.Height, filePath1, filePath2)
}

return filePath, loadToErr
return filePath, version, loadToErr
}

func (m *SnapshotManager) StartSync() {
Expand Down
3 changes: 3 additions & 0 deletions core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ type Manifest struct {
Root common.Hash
Height uint64
Cid []byte
CidV2 []byte
}

func (m *Manifest) ToBytes() ([]byte, error) {
protoObj := &models.ProtoManifest{
Cid: m.Cid,
Height: m.Height,
Root: m.Root[:],
CidV2: m.CidV2,
}
return proto.Marshal(protoObj)
}
Expand All @@ -29,5 +31,6 @@ func (m *Manifest) FromBytes(data []byte) error {
m.Root = common.BytesToHash(protoObj.Root)
m.Height = protoObj.Height
m.Cid = protoObj.Cid
m.CidV2 = protoObj.CidV2
return nil
}
16 changes: 16 additions & 0 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1242,6 +1242,18 @@ func (s *StateDB) IterateOverAccounts(callback func(addr common.Address, account
})
}


func (s *StateDB) WriteSnapshot2(height uint64, to io.Writer) (root common.Hash, err error) {
return WriteTreeTo2(s.db, height, to)
}

func (s *StateDB) RecoverSnapshot2(height uint64, treeRoot common.Hash, from io.Reader) error {
pdb := dbm.NewPrefixDB(s.original, StateDbKeys.BuildDbPrefix(height))
common.ClearDb(pdb)
return ReadTreeFrom2(pdb, height, treeRoot, from)
}


func (s *StateDB) WriteSnapshot(height uint64, to io.Writer) (root common.Hash, err error) {
return WriteTreeTo(s.db, height, to)
}
Expand Down Expand Up @@ -1572,6 +1584,10 @@ func (s *StateDB) RemoveDelayedOfflinePenalty(addr common.Address) {
s.GetOrNewDelayedOfflinePenaltyObject().Remove(addr)
}

func (s *StateDB) HasVersion(h uint64) bool {
return s.tree.ExistVersion(int64(h))
}

type readCloser struct {
r io.Reader
}
Expand Down
90 changes: 90 additions & 0 deletions core/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,96 @@ func TestStateDB_RecoverSnapshot(t *testing.T) {
defer it.Close()
require.False(t, it.Valid())
}

func TestStateDB_RecoverSnapshot2(t *testing.T) {
//arrange
database := db.NewMemDB()
stateDb, _ := NewLazy(database)

prevStateDb := stateDb.db

identity := common.Address{}
stateDb.AddInvite(identity, 1)

stateDb.Commit(true)
const AddrsCount = 50000
const Height = uint64(2)

for i := 0; i < AddrsCount; i++ {
addr := common.Address{}
addr.SetBytes(common.ToBytes(uint64(i)))
stateDb.SetNonce(addr, uint32(i+1))
}

stateDb.Commit(true)

var keys [][]byte
var values [][]byte

stateDb.IterateAccounts(func(key []byte, value []byte) bool {
keys = append(keys, key)
values = append(values, value)
return false
})

require.Equal(t, AddrsCount, len(keys))

expectedRoot := stateDb.Root()
stateDb.AddInvite(common.Address{}, 2)

stateDb.Commit(true)
stateDb, _ = NewLazy(database)
stateDb.Load(3)
stateDb.tree.Hash()

//act

buffer := new(bytes.Buffer)
stateDb.WriteSnapshot2(Height, buffer)
require.True(t, buffer.Len() > 0)

println(buffer.Len())

require.Nil(t, stateDb.RecoverSnapshot2(Height, expectedRoot, buffer))

batch := stateDb.original.NewBatch()

dropDb := stateDb.CommitSnapshot(Height, batch)
common.ClearDb(dropDb)
batch.WriteSync()
//assert

require.Equal(t, int64(Height), stateDb.tree.Version())
require.Equal(t, expectedRoot, stateDb.Root())

i := 0
stateDb.IterateAccounts(func(key []byte, value []byte) bool {

require.Equal(t, keys[i], key)
require.Equal(t, values[i], value)
i++
return false
})

require.Equal(t, AddrsCount, i)

cnt := 0

stateDb.IterateIdentities(func(key []byte, value []byte) bool {
addr := common.Address{}
addr.SetBytes(key[1:])
require.Equal(t, addr, identity)
cnt++
return false
})
require.Equal(t, 1, cnt)

it, _ := prevStateDb.Iterator(nil, nil)
defer it.Close()
require.False(t, it.Valid())
}


func TestStateDB_Set_Has_ValidationTxBit(t *testing.T) {
database := db.NewMemDB()
stateDb, _ := NewLazy(database)
Expand Down
8 changes: 8 additions & 0 deletions core/state/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ func (t *MutableTree) LazyLoad(version int64) (int64, error) {
return t.tree.LazyLoadVersion(version)
}

func (t *MutableTree) Importer(version int64) (*iavl.Importer, error) {
return t.tree.Import(version)
}

type ImmutableTree struct {
tree *iavl.ImmutableTree
}
Expand Down Expand Up @@ -263,3 +267,7 @@ func (t *ImmutableTree) Rollback() {
func (t *ImmutableTree) SetVirtualVersion(version int64) {
t.tree.SetVirtualVersion(version)
}

func (t *ImmutableTree) Exporter() *iavl.Exporter{
return t.tree.Export()
}
Loading

0 comments on commit 1d2bddb

Please sign in to comment.