From dab026e7571a0770956f53f7c50f7896e246d723 Mon Sep 17 00:00:00 2001 From: millken Date: Fri, 13 Jan 2023 14:16:03 +0800 Subject: [PATCH] [db]log mptrie node (#3634) Co-authored-by: huofei <68298506@qq.com> Co-authored-by: Jeremy Chou Co-authored-by: CoderZhi --- db/trie/mptrie/branchnode.go | 18 +++++ db/trie/mptrie/branchnode_test.go | 9 ++- db/trie/mptrie/cachenode.go | 5 ++ db/trie/mptrie/extensionnode.go | 15 ++++ db/trie/mptrie/leafnode.go | 17 +++- db/trie/mptrie/lognode.go | 127 ++++++++++++++++++++++++++++++ db/trie/mptrie/lognode_test.go | 120 ++++++++++++++++++++++++++++ server/main.go | 11 +++ 8 files changed, 317 insertions(+), 5 deletions(-) create mode 100644 db/trie/mptrie/lognode.go create mode 100644 db/trie/mptrie/lognode_test.go diff --git a/db/trie/mptrie/branchnode.go b/db/trie/mptrie/branchnode.go index a28dc2b913..ccfef9908c 100644 --- a/db/trie/mptrie/branchnode.go +++ b/db/trie/mptrie/branchnode.go @@ -46,6 +46,9 @@ func newBranchNode( } } } + if err := logNode(_nodeTypeBranch, _actionTypeNew, bnode, cli); err != nil { + return nil, err + } return bnode, nil } @@ -69,6 +72,9 @@ func newRootBranchNode(cli client, children map[byte]node, indices *SortedList, } } } + if err := logNode(_nodeTypeBranch, _actionTypeNew, bnode, cli); err != nil { + return nil, err + } return bnode, nil } @@ -85,6 +91,9 @@ func newBranchNodeFromProtoPb(pb *triepb.BranchPb, hashVal []byte) *branchNode { } bnode.indices = NewSortedList(bnode.children) bnode.cacheNode.serializable = bnode + if err := logNode(_nodeTypeBranch, _actionTypeNew, bnode, nil); err != nil { + panic(err) + } return bnode } @@ -101,6 +110,9 @@ func (b *branchNode) Children() []node { } func (b *branchNode) Delete(cli client, key keyType, offset uint8) (node, error) { + if err := logNode(_nodeTypeBranch, _actionTypeDelete, b, cli); err != nil { + return nil, err + } offsetKey := key[offset] child, err := b.child(offsetKey) if err != nil { @@ -154,6 +166,9 @@ func (b *branchNode) Delete(cli client, key keyType, offset uint8) (node, error) } func (b *branchNode) Upsert(cli client, key keyType, offset uint8, value []byte) (node, error) { + if err := logNode(_nodeTypeBranch, _actionTypeUpsert, b, cli); err != nil { + return nil, err + } var newChild node offsetKey := key[offset] child, err := b.child(offsetKey) @@ -171,6 +186,9 @@ func (b *branchNode) Upsert(cli client, key keyType, offset uint8, value []byte) } func (b *branchNode) Search(cli client, key keyType, offset uint8) (node, error) { + if err := logNode(_nodeTypeBranch, _actionTypeSearch, b, cli); err != nil { + return nil, err + } child, err := b.child(key[offset]) if err != nil { return nil, err diff --git a/db/trie/mptrie/branchnode_test.go b/db/trie/mptrie/branchnode_test.go index e2186a2310..6989d54ea5 100644 --- a/db/trie/mptrie/branchnode_test.go +++ b/db/trie/mptrie/branchnode_test.go @@ -48,10 +48,11 @@ func equals(bn *branchNode, clone *branchNode) bool { func TestBranchNodeClone(t *testing.T) { require := require.New(t) + cli := &merklePatriciaTrie{async: true, hashFunc: DefaultHashFunc} t.Run("dirty empty root", func(t *testing.T) { children := map[byte]node{} indices := NewSortedList(children) - node, err := newRootBranchNode(nil, children, indices, true) + node, err := newRootBranchNode(cli, children, indices, true) require.NoError(err) bn, ok := node.(*branchNode) require.True(ok) @@ -64,7 +65,7 @@ func TestBranchNodeClone(t *testing.T) { t.Run("clean empty root", func(t *testing.T) { children := map[byte]node{} indices := NewSortedList(children) - node, err := newRootBranchNode(nil, children, indices, false) + node, err := newRootBranchNode(cli, children, indices, false) require.NoError(err) bn, ok := node.(*branchNode) require.True(ok) @@ -81,7 +82,7 @@ func TestBranchNodeClone(t *testing.T) { children['c'] = &hashNode{hashVal: []byte("c")} children['d'] = &hashNode{hashVal: []byte("d")} indices := NewSortedList(children) - node, err := newBranchNode(&merklePatriciaTrie{async: true}, children, indices) + node, err := newBranchNode(cli, children, indices) require.NoError(err) bn, ok := node.(*branchNode) require.True(ok) @@ -105,7 +106,7 @@ func TestBranchNodeProto(t *testing.T) { children: children, indices: indices, } - cli := &merklePatriciaTrie{async: true} + cli := &merklePatriciaTrie{async: true, hashFunc: DefaultHashFunc} proto, err := bnode.proto(cli, true) require.NoError(err) nodepb, ok := proto.(*triepb.NodePb) diff --git a/db/trie/mptrie/cachenode.go b/db/trie/mptrie/cachenode.go index 09d3c6fea1..bc17cb7ad1 100644 --- a/db/trie/mptrie/cachenode.go +++ b/db/trie/mptrie/cachenode.go @@ -6,6 +6,8 @@ package mptrie import ( + "errors" + "google.golang.org/protobuf/proto" ) @@ -24,6 +26,9 @@ func (cn *cacheNode) hash(cli client, flush bool) ([]byte, error) { if len(cn.hashVal) != 0 { return cn.hashVal, nil } + if cli == nil { + return []byte{}, errors.New("client cannot be nil") + } pb, err := cn.proto(cli, flush) if err != nil { return nil, err diff --git a/db/trie/mptrie/extensionnode.go b/db/trie/mptrie/extensionnode.go index 153b8d61e5..9e8a5ab130 100644 --- a/db/trie/mptrie/extensionnode.go +++ b/db/trie/mptrie/extensionnode.go @@ -38,6 +38,9 @@ func newExtensionNode( return nil, err } } + if err := logNode(_nodeTypeExtension, _actionTypeNew, e, cli); err != nil { + return nil, err + } return e, nil } @@ -51,10 +54,16 @@ func newExtensionNodeFromProtoPb(pb *triepb.ExtendPb, hashVal []byte) *extension child: newHashNode(pb.Value), } e.cacheNode.serializable = e + if err := logNode(_nodeTypeExtension, _actionTypeNew, e, nil); err != nil { + panic(err) + } return e } func (e *extensionNode) Delete(cli client, key keyType, offset uint8) (node, error) { + if err := logNode(_nodeTypeExtension, _actionTypeDelete, e, cli); err != nil { + return nil, err + } matched := e.commonPrefixLength(key[offset:]) if matched != uint8(len(e.path)) { return nil, trie.ErrNotExist @@ -85,6 +94,9 @@ func (e *extensionNode) Delete(cli client, key keyType, offset uint8) (node, err } func (e *extensionNode) Upsert(cli client, key keyType, offset uint8, value []byte) (node, error) { + if err := logNode(_nodeTypeExtension, _actionTypeUpsert, e, cli); err != nil { + return nil, err + } matched := e.commonPrefixLength(key[offset:]) if matched == uint8(len(e.path)) { newChild, err := e.child.Upsert(cli, key, offset+matched, value) @@ -120,6 +132,9 @@ func (e *extensionNode) Upsert(cli client, key keyType, offset uint8, value []by } func (e *extensionNode) Search(cli client, key keyType, offset uint8) (node, error) { + if err := logNode(_nodeTypeExtension, _actionTypeSearch, e, cli); err != nil { + return nil, err + } matched := e.commonPrefixLength(key[offset:]) if matched != uint8(len(e.path)) { return nil, trie.ErrNotExist diff --git a/db/trie/mptrie/leafnode.go b/db/trie/mptrie/leafnode.go index 4502b6ba2f..563e50be19 100644 --- a/db/trie/mptrie/leafnode.go +++ b/db/trie/mptrie/leafnode.go @@ -38,6 +38,9 @@ func newLeafNode( return nil, err } } + if err := logNode(_nodeTypeLeaf, _actionTypeNew, l, cli); err != nil { + return nil, err + } return l, nil } @@ -51,6 +54,9 @@ func newLeafNodeFromProtoPb(pb *triepb.LeafPb, hashVal []byte) *leafNode { value: pb.Value, } l.cacheNode.serializable = l + if err := logNode(_nodeTypeLeaf, _actionTypeNew, l, nil); err != nil { + panic(err) + } return l } @@ -63,6 +69,9 @@ func (l *leafNode) Value() []byte { } func (l *leafNode) Delete(cli client, key keyType, offset uint8) (node, error) { + if err := logNode(_nodeTypeLeaf, _actionTypeDelete, l, cli); err != nil { + return nil, err + } if !bytes.Equal(l.key[offset:], key[offset:]) { return nil, trie.ErrNotExist } @@ -70,6 +79,9 @@ func (l *leafNode) Delete(cli client, key keyType, offset uint8) (node, error) { } func (l *leafNode) Upsert(cli client, key keyType, offset uint8, value []byte) (node, error) { + if err := logNode(_nodeTypeLeaf, _actionTypeUpsert, l, cli); err != nil { + return nil, err + } matched := commonPrefixLength(l.key[offset:], key[offset:]) if offset+matched == uint8(len(key)) { if err := l.delete(cli); err != nil { @@ -106,7 +118,10 @@ func (l *leafNode) Upsert(cli client, key keyType, offset uint8, value []byte) ( return newExtensionNode(cli, l.key[offset:offset+matched], bnode) } -func (l *leafNode) Search(_ client, key keyType, offset uint8) (node, error) { +func (l *leafNode) Search(cli client, key keyType, offset uint8) (node, error) { + if err := logNode(_nodeTypeLeaf, _actionTypeSearch, l, cli); err != nil { + return nil, err + } if !bytes.Equal(l.key[offset:], key[offset:]) { return nil, trie.ErrNotExist } diff --git a/db/trie/mptrie/lognode.go b/db/trie/mptrie/lognode.go new file mode 100644 index 0000000000..53c9f75e48 --- /dev/null +++ b/db/trie/mptrie/lognode.go @@ -0,0 +1,127 @@ +package mptrie + +import ( + "bufio" + "os" + + "github.com/pkg/errors" +) + +var ( + enabledLogMptrie = false + logFile *os.File + logWriter *bufio.Writer +) + +type nodeType byte +type actionType byte + +const ( + _nodeTypeLeaf nodeType = 'l' + _nodeTypeExtension nodeType = 'e' + _nodeTypeBranch nodeType = 'b' + + _actionTypeSearch actionType = 's' + _actionTypeUpsert actionType = 'u' + _actionTypeDelete actionType = 'd' + _actionTypeNew actionType = 'n' +) + +// nodeEvent is the event of node +type nodeEvent struct { + NodeType nodeType + ActionType actionType + KeyLen uint8 + Key []byte + PathLen uint8 + Path []byte + ChildrenLen uint8 + Children []byte + HashLen uint8 + HashVal []byte +} + +// Bytes returns the bytes of node event +func (e nodeEvent) Bytes() []byte { + b := make([]byte, 0, 1+1+1+e.KeyLen+1+e.PathLen+1+e.ChildrenLen+1+e.HashLen) + b = append(b, byte(e.NodeType)) + b = append(b, byte(e.ActionType)) + b = append(b, e.KeyLen) + b = append(b, e.Key...) + b = append(b, e.PathLen) + b = append(b, e.Path...) + b = append(b, e.ChildrenLen) + b = append(b, e.Children...) + b = append(b, e.HashLen) + b = append(b, e.HashVal...) + return b +} + +// OpenLogDB open the log DB file +func OpenLogDB(dbPath string) error { + var err error + logFile, err = os.OpenFile(dbPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return err + } + logWriter = bufio.NewWriter(logFile) + enabledLogMptrie = true + return nil +} + +// CloseLogDB close the log DB file +func CloseLogDB() error { + if !enabledLogMptrie { + return nil + } + if err := logWriter.Flush(); err != nil { + return err + } + return logFile.Close() +} + +func logNode(nt nodeType, at actionType, n node, cli client) error { + if !enabledLogMptrie { + return nil + } + nodeKey, nodePath, nodeChildren, hashvalue, err := parseNode(n, cli) + if err != nil { + return err + } + event := nodeEvent{ + NodeType: nt, + ActionType: at, + KeyLen: uint8(len(nodeKey)), + Key: nodeKey, + PathLen: uint8(len(nodePath)), + Path: nodePath, + ChildrenLen: uint8(len(nodeChildren)), + Children: nodeChildren, + HashLen: uint8(len(hashvalue)), + HashVal: hashvalue, + } + // write event length + if err = logWriter.WriteByte(byte(len(event.Bytes()))); err != nil { + return err + } + // write event body + _, err = logWriter.Write(event.Bytes()) + return err +} + +func parseNode(n node, cli client) (nodeKey, nodePath, nodeChildren, hashvalue []byte, err error) { + switch n := n.(type) { + case *leafNode: + nodeKey = n.key + hashvalue, err = n.cacheNode.Hash(cli) + case *extensionNode: + nodePath = n.path + hashvalue, err = n.cacheNode.Hash(cli) + case *branchNode: + nodeChildren = n.indices.List() + hashvalue, err = n.cacheNode.Hash(cli) + default: + err = errors.Errorf("unknown node type %T", n) + } + return +} diff --git a/db/trie/mptrie/lognode_test.go b/db/trie/mptrie/lognode_test.go new file mode 100644 index 0000000000..65dd7c10f9 --- /dev/null +++ b/db/trie/mptrie/lognode_test.go @@ -0,0 +1,120 @@ +package mptrie + +import ( + "testing" + + "github.com/iotexproject/iotex-core/testutil" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" +) + +func TestNodeEvent(t *testing.T) { + require := require.New(t) + tests := []struct { + event nodeEvent + }{ + { + event: nodeEvent{ + NodeType: _nodeTypeBranch, + ActionType: _actionTypeNew, + KeyLen: 2, + Key: []byte{3, 4}, + PathLen: 5, + Path: []byte{6, 7, 8, 9, 10}, + ChildrenLen: 11, + Children: []byte{12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22}, + HashLen: 23, + HashVal: []byte{24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46}, + }, + }, + { + event: nodeEvent{ + NodeType: _nodeTypeLeaf, + ActionType: _actionTypeSearch, + KeyLen: 6, + Key: []byte("123456"), + PathLen: 6, + Path: []byte("abcdef"), + ChildrenLen: 6, + Children: []byte("ABCDEF"), + HashLen: 6, + HashVal: []byte("abcdef"), + }, + }, + } + + for _, test := range tests { + b := test.event.Bytes() + event, err := parseNodeEvent(b) + require.NoError(err) + require.Equal(test.event.NodeType, event.NodeType) + require.Equal(test.event.ActionType, event.ActionType) + require.Equal(test.event.KeyLen, event.KeyLen) + require.Equal(test.event.Key, event.Key) + require.Equal(test.event.PathLen, event.PathLen) + require.Equal(test.event.Path, event.Path) + require.Equal(test.event.ChildrenLen, event.ChildrenLen) + require.Equal(test.event.Children, event.Children) + require.Equal(test.event.HashLen, event.HashLen) + require.Equal(test.event.HashVal, event.HashVal) + } +} + +// parseNodeEvent parse the node event +func parseNodeEvent(b []byte) (nodeEvent, error) { + if len(b) < 1 { + return nodeEvent{}, errors.New("invalid node event") + } + event := nodeEvent{ + NodeType: nodeType(b[0]), + } + if len(b) < 2 { + return event, nil + } + event.ActionType = actionType(b[1]) + if len(b) < 3 { + return event, nil + } + event.KeyLen = uint8(b[2]) + if len(b) < 3+int(event.KeyLen) { + return event, nil + } + event.Key = b[3 : 3+event.KeyLen] + if len(b) < 3+int(event.KeyLen)+1 { + return event, nil + } + event.PathLen = b[3+event.KeyLen] + if len(b) < 3+int(event.KeyLen)+1+int(event.PathLen) { + return event, nil + } + event.Path = b[3+event.KeyLen+1 : 3+event.KeyLen+1+event.PathLen] + if len(b) < 3+int(event.KeyLen)+1+int(event.PathLen)+1 { + return event, nil + } + event.ChildrenLen = b[3+event.KeyLen+1+event.PathLen] + if len(b) < 3+int(event.KeyLen)+1+int(event.PathLen)+1+int(event.ChildrenLen) { + return event, nil + } + event.Children = b[3+event.KeyLen+1+event.PathLen+1 : 3+event.KeyLen+1+event.PathLen+1+event.ChildrenLen] + if len(b) < 3+int(event.KeyLen)+1+int(event.PathLen)+1+int(event.ChildrenLen)+1 { + return event, nil + } + event.HashLen = b[3+event.KeyLen+1+event.PathLen+1+event.ChildrenLen] + if len(b) < 3+int(event.KeyLen)+1+int(event.PathLen)+1+int(event.ChildrenLen)+1+int(event.HashLen) { + return event, nil + } + event.HashVal = b[3+event.KeyLen+1+event.PathLen+1+event.ChildrenLen+1 : 3+event.KeyLen+1+event.PathLen+1+event.ChildrenLen+1+event.HashLen] + return event, nil +} + +func TestLogDB(t *testing.T) { + require := require.New(t) + testPath, err := testutil.PathOfTempFile("test-log-db") + require.NoError(err) + defer func() { + testutil.CleanupPath(testPath) + }() + require.NoError(OpenLogDB(testPath)) + require.NoError(CloseLogDB()) + enabledLogMptrie = false +} diff --git a/server/main.go b/server/main.go index 9d45a717b5..7f0209a061 100644 --- a/server/main.go +++ b/server/main.go @@ -27,6 +27,7 @@ import ( "github.com/iotexproject/iotex-core/blockchain/block" "github.com/iotexproject/iotex-core/blockchain/genesis" "github.com/iotexproject/iotex-core/config" + "github.com/iotexproject/iotex-core/db/trie/mptrie" "github.com/iotexproject/iotex-core/pkg/log" "github.com/iotexproject/iotex-core/pkg/probe" "github.com/iotexproject/iotex-core/pkg/recovery" @@ -139,6 +140,16 @@ func main() { livenessCancel() }() + if cfg.MptrieLogPath != "" { + if err = mptrie.OpenLogDB(cfg.MptrieLogPath); err != nil { + log.L().Fatal("Failed to open mptrie log DB.", zap.Error(err)) + } + defer func() { + if err = mptrie.CloseLogDB(); err != nil { + log.L().Error("Failed to close mptrie log DB.", zap.Error(err)) + } + }() + } // create and start the node svr, err := itx.NewServer(cfg) if err != nil {