Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use import/export API of iavl to create snapshots #764

Merged
merged 4 commits into from
Aug 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2121,7 +2121,7 @@ func (chain *Blockchain) EnsureIntegrity() error {
wasReset = true
resetTo := uint64(0)
for h, tryCnt := chain.Head.Height()-1, 0; h >= 1 && tryCnt < int(state.MaxSavedStatesCount)+1; h, tryCnt = h-1, tryCnt+1 {
if chain.appState.IdentityState.HasVersion(h) {
if chain.appState.State.HasVersion(h) && chain.appState.IdentityState.HasVersion(h) {
resetTo = h
break
}
Expand Down Expand Up @@ -2233,12 +2233,13 @@ func (chain *Blockchain) GetIdentityDiff(height uint64) *state.IdentityStateDiff
}

func (chain *Blockchain) ReadSnapshotManifest() *snapshot.Manifest {
cid, root, height, _ := chain.repo.LastSnapshotManifest()
cid, cidV2, root, height, _ := chain.repo.LastSnapshotManifest()
if cid == nil {
return nil
}
return &snapshot.Manifest{
Cid: cid,
CidV2: cidV2,
Root: root,
Height: height,
}
Expand Down
107 changes: 80 additions & 27 deletions core/state/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ const (
SnapshotsFolder = "/snapshots"
)

type SnapshotVersion byte

const SnapshotVersionV1 = SnapshotVersion(1)
const SnapshotVersionV2 = SnapshotVersion(2)

var (
InvalidManifestPrefix = []byte("im")
MaxManifestTimeouts = byte(5)
Expand Down Expand Up @@ -60,13 +65,13 @@ func NewSnapshotManager(db dbm.DB, state *StateDB, bus eventbus.Bus, ipfs ipfs.P
return m
}

func createSnapshotFile(datadir string, height uint64) (fileName string, file *os.File, err error) {
func createSnapshotFile(datadir string, height uint64, version SnapshotVersion) (fileName string, file *os.File, err error) {
newpath := filepath.Join(datadir, SnapshotsFolder)
if err := os.MkdirAll(newpath, os.ModePerm); err != nil {
return "", nil, err
}

filePath := filepath.Join(newpath, strconv.FormatUint(height, 10)+".tar")
filePath := filepath.Join(newpath, strconv.FormatUint(height, 10)+"."+strconv.FormatInt(int64(version), 10)+".tar")
f, err := os.Create(filePath)
if err != nil {
return "", nil, err
Expand All @@ -83,25 +88,35 @@ func (m *SnapshotManager) createSnapshotIfNeeded(block *types.Header) {
}
}

func (m *SnapshotManager) createSnapshot(height uint64) (root common.Hash) {
filePath, file, err := createSnapshotFile(m.cfg.DataDir, height)
func (m *SnapshotManager) createShapshotForVersion(height uint64, version SnapshotVersion) (cidBytes []byte, root common.Hash, filePath string) {
filePath, file, err := createSnapshotFile(m.cfg.DataDir, height, version)
if err != nil {
m.log.Error("Cannot create file for snapshot", "err", err)
return common.Hash{}
return nil, common.Hash{}, ""
}

if root, err = m.state.WriteSnapshot(height, file); err != nil {
file.Close()
m.log.Error("Cannot write snapshot to file", "err", err)
return common.Hash{}
switch version {
case SnapshotVersionV1:
if root, err = m.state.WriteSnapshot(height, file); err != nil {
file.Close()
m.log.Error("Cannot write snapshot to file", "err", err)
return nil, common.Hash{}, ""
}
case SnapshotVersionV2:
if root, err = m.state.WriteSnapshot2(height, file); err != nil {
file.Close()
m.log.Error("Cannot write snapshot to file", "err", err)
return nil, common.Hash{}, ""
}
}

file.Close()
var f *os.File
var cid cid.Cid
if f, err = os.Open(filePath); err != nil {
m.log.Error("Cannot open snapshot file", "err", err)
os.Remove(filePath)
return
return nil, common.Hash{}, ""
}
stat, _ := f.Stat()
if cid, err = m.ipfs.AddFile(f.Name(), f, stat); err != nil {
Expand All @@ -110,27 +125,47 @@ func (m *SnapshotManager) createSnapshot(height uint64) (root common.Hash) {
if err = os.Remove(filePath); err != nil {
m.log.Error("Cannot remove file", "err", err)
}
return
return nil, common.Hash{}, ""
}
return cid.Bytes(), root, filePath
}

func (m *SnapshotManager) createSnapshot(height uint64) (root common.Hash) {

cidV1, root, filePath := m.createShapshotForVersion(height, SnapshotVersionV1)
cidV2, _, filePath2 := m.createShapshotForVersion(height, SnapshotVersionV2)

if cidV1 != nil && cidV2 != nil {
m.clearFs([]string{filePath, filePath2})
m.writeLastManifest(cidV1, cidV2, root, height, filePath, filePath2)
}
m.clearFs(filePath)
m.writeLastManifest(cid.Bytes(), root, height, filePath)
return root
}

func (m *SnapshotManager) clearFs(excludedFile string) {
if prevCid, _, _, _ := m.repo.LastSnapshotManifest(); prevCid != nil {
func (m *SnapshotManager) clearFs(excludedFiles []string) {
if prevCid, prevCidV2, _, _, _ := m.repo.LastSnapshotManifest(); prevCid != nil {
m.ipfs.Unpin(prevCid)
m.ipfs.Unpin(prevCidV2)
}
m.clearSnapshotFolder(excludedFile)
m.clearSnapshotFolder(excludedFiles)
}

func (m *SnapshotManager) clearSnapshotFolder(excludedFile string) {
func (m *SnapshotManager) clearSnapshotFolder(excludedFiles []string) {
directory := filepath.Join(m.cfg.DataDir, SnapshotsFolder)

var files []string

var contains = func(arr []string, value string) bool {
for _, s := range arr {
if s == value {
return true
}
}
return false
}

err := filepath.Walk(directory, func(path string, info os.FileInfo, err error) error {
if !info.IsDir() && path != excludedFile {
if !info.IsDir() && !contains(excludedFiles, path) {
files = append(files, path)
}
return nil
Expand All @@ -147,14 +182,19 @@ func (m *SnapshotManager) clearSnapshotFolder(excludedFile string) {
}
}

func (m *SnapshotManager) writeLastManifest(snapshotCid []byte, root common.Hash, height uint64, file string) {
m.repo.WriteLastSnapshotManifest(snapshotCid, root, height, file)
func (m *SnapshotManager) writeLastManifest(snapshotCid []byte, snapshotCidV2 []byte, root common.Hash, height uint64, file string, fileV2 string) {
m.repo.WriteLastSnapshotManifest(snapshotCid, snapshotCidV2, root, height, file, fileV2)
}

func (m *SnapshotManager) DownloadSnapshot(snapshot *snapshot.Manifest) (filePath string, err error) {
filePath, file, err := createSnapshotFile(m.cfg.DataDir, snapshot.Height)
func (m *SnapshotManager) DownloadSnapshot(snapshot *snapshot.Manifest) (filePath string, version SnapshotVersion, err error) {
version = SnapshotVersionV2
if len(snapshot.CidV2) == 0 {
version = SnapshotVersionV1
}

filePath, file, err := createSnapshotFile(m.cfg.DataDir, snapshot.Height, version)
if err != nil {
return "", err
return "", 0, err
}

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -179,7 +219,13 @@ func (m *SnapshotManager) DownloadSnapshot(snapshot *snapshot.Manifest) (filePat
done := make(chan struct{})

go func() {
loadToErr = m.ipfs.LoadTo(snapshot.Cid, file, ctx, onLoading)
switch version {
case SnapshotVersionV1:
loadToErr = m.ipfs.LoadTo(snapshot.Cid, file, ctx, onLoading)
case SnapshotVersionV2:
loadToErr = m.ipfs.LoadTo(snapshot.CidV2, file, ctx, onLoading)
}

wg.Done()
close(done)
}()
Expand All @@ -205,11 +251,18 @@ func (m *SnapshotManager) DownloadSnapshot(snapshot *snapshot.Manifest) (filePat
wg.Wait()

if loadToErr == nil {
m.clearFs(filePath)
m.writeLastManifest(snapshot.Cid, snapshot.Root, snapshot.Height, filePath)
m.clearFs([]string{filePath})
var filePath1, filePath2 string
if version == SnapshotVersionV1 {
filePath1 = filePath
}
if version == SnapshotVersionV2 {
filePath2 = filePath
}
m.writeLastManifest(snapshot.Cid, snapshot.CidV2, snapshot.Root, snapshot.Height, filePath1, filePath2)
}

return filePath, loadToErr
return filePath, version, loadToErr
}

func (m *SnapshotManager) StartSync() {
Expand Down
3 changes: 3 additions & 0 deletions core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ type Manifest struct {
Root common.Hash
Height uint64
Cid []byte
CidV2 []byte
}

func (m *Manifest) ToBytes() ([]byte, error) {
protoObj := &models.ProtoManifest{
Cid: m.Cid,
Height: m.Height,
Root: m.Root[:],
CidV2: m.CidV2,
}
return proto.Marshal(protoObj)
}
Expand All @@ -29,5 +31,6 @@ func (m *Manifest) FromBytes(data []byte) error {
m.Root = common.BytesToHash(protoObj.Root)
m.Height = protoObj.Height
m.Cid = protoObj.Cid
m.CidV2 = protoObj.CidV2
return nil
}
16 changes: 16 additions & 0 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1242,6 +1242,18 @@ func (s *StateDB) IterateOverAccounts(callback func(addr common.Address, account
})
}


func (s *StateDB) WriteSnapshot2(height uint64, to io.Writer) (root common.Hash, err error) {
return WriteTreeTo2(s.db, height, to)
}

func (s *StateDB) RecoverSnapshot2(height uint64, treeRoot common.Hash, from io.Reader) error {
pdb := dbm.NewPrefixDB(s.original, StateDbKeys.BuildDbPrefix(height))
common.ClearDb(pdb)
return ReadTreeFrom2(pdb, height, treeRoot, from)
}


func (s *StateDB) WriteSnapshot(height uint64, to io.Writer) (root common.Hash, err error) {
return WriteTreeTo(s.db, height, to)
}
Expand Down Expand Up @@ -1572,6 +1584,10 @@ func (s *StateDB) RemoveDelayedOfflinePenalty(addr common.Address) {
s.GetOrNewDelayedOfflinePenaltyObject().Remove(addr)
}

func (s *StateDB) HasVersion(h uint64) bool {
return s.tree.ExistVersion(int64(h))
}

type readCloser struct {
r io.Reader
}
Expand Down
90 changes: 90 additions & 0 deletions core/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,96 @@ func TestStateDB_RecoverSnapshot(t *testing.T) {
defer it.Close()
require.False(t, it.Valid())
}

func TestStateDB_RecoverSnapshot2(t *testing.T) {
//arrange
database := db.NewMemDB()
stateDb, _ := NewLazy(database)

prevStateDb := stateDb.db

identity := common.Address{}
stateDb.AddInvite(identity, 1)

stateDb.Commit(true)
const AddrsCount = 50000
const Height = uint64(2)

for i := 0; i < AddrsCount; i++ {
addr := common.Address{}
addr.SetBytes(common.ToBytes(uint64(i)))
stateDb.SetNonce(addr, uint32(i+1))
}

stateDb.Commit(true)

var keys [][]byte
var values [][]byte

stateDb.IterateAccounts(func(key []byte, value []byte) bool {
keys = append(keys, key)
values = append(values, value)
return false
})

require.Equal(t, AddrsCount, len(keys))

expectedRoot := stateDb.Root()
stateDb.AddInvite(common.Address{}, 2)

stateDb.Commit(true)
stateDb, _ = NewLazy(database)
stateDb.Load(3)
stateDb.tree.Hash()

//act

buffer := new(bytes.Buffer)
stateDb.WriteSnapshot2(Height, buffer)
require.True(t, buffer.Len() > 0)

println(buffer.Len())

require.Nil(t, stateDb.RecoverSnapshot2(Height, expectedRoot, buffer))

batch := stateDb.original.NewBatch()

dropDb := stateDb.CommitSnapshot(Height, batch)
common.ClearDb(dropDb)
batch.WriteSync()
//assert

require.Equal(t, int64(Height), stateDb.tree.Version())
require.Equal(t, expectedRoot, stateDb.Root())

i := 0
stateDb.IterateAccounts(func(key []byte, value []byte) bool {

require.Equal(t, keys[i], key)
require.Equal(t, values[i], value)
i++
return false
})

require.Equal(t, AddrsCount, i)

cnt := 0

stateDb.IterateIdentities(func(key []byte, value []byte) bool {
addr := common.Address{}
addr.SetBytes(key[1:])
require.Equal(t, addr, identity)
cnt++
return false
})
require.Equal(t, 1, cnt)

it, _ := prevStateDb.Iterator(nil, nil)
defer it.Close()
require.False(t, it.Valid())
}


func TestStateDB_Set_Has_ValidationTxBit(t *testing.T) {
database := db.NewMemDB()
stateDb, _ := NewLazy(database)
Expand Down
8 changes: 8 additions & 0 deletions core/state/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ func (t *MutableTree) LazyLoad(version int64) (int64, error) {
return t.tree.LazyLoadVersion(version)
}

func (t *MutableTree) Importer(version int64) (*iavl.Importer, error) {
return t.tree.Import(version)
}

type ImmutableTree struct {
tree *iavl.ImmutableTree
}
Expand Down Expand Up @@ -263,3 +267,7 @@ func (t *ImmutableTree) Rollback() {
func (t *ImmutableTree) SetVirtualVersion(version int64) {
t.tree.SetVirtualVersion(version)
}

func (t *ImmutableTree) Exporter() *iavl.Exporter{
return t.tree.Export()
}
Loading