From a4143c9ae31396f24417002a2da33bd97edebc91 Mon Sep 17 00:00:00 2001 From: galaio <12880651+galaio@users.noreply.github.com> Date: Wed, 31 Jul 2024 14:41:07 +0800 Subject: [PATCH] txdag: opt txdag logic & clean todos; (#17) txdag: opt rw record flag; txdag: opt some logic; Co-authored-by: galaio --- core/state/journal.go | 2 +- core/state/state_object.go | 2 +- core/state/statedb.go | 32 +++-- core/state_processor.go | 2 +- core/types/dag.go | 48 ++++--- core/types/dag_test.go | 194 ++++------------------------- core/types/mvstates.go | 137 ++++++++++++-------- core/types/mvstates_test.go | 241 ++++++++++++++++++++++++++++++++++++ 8 files changed, 399 insertions(+), 259 deletions(-) create mode 100644 core/types/mvstates_test.go diff --git a/core/state/journal.go b/core/state/journal.go index 53375b1d12..b9a5d2d6e7 100644 --- a/core/state/journal.go +++ b/core/state/journal.go @@ -184,7 +184,7 @@ func (ch resetObjectChange) revert(dber StateDBer) { if !ch.prevdestruct { s.snapParallelLock.Lock() - s.deleteStateObjectsDestruct(ch.prev.address) + s.removeStateObjectsDestruct(ch.prev.address) s.snapParallelLock.Unlock() } if ch.prevAccount != nil { diff --git a/core/state/state_object.go b/core/state/state_object.go index cd7cde2947..f8578604dc 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -384,7 +384,7 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash { // 2) we don't have new values, and can deliver empty response back //if _, destructed := s.db.stateObjectsDestruct[s.address]; destructed { s.db.snapParallelLock.RLock() - if _, destructed := s.db.queryStateObjectsDestruct(s.address); destructed { // fixme: use sync.Map, instead of RWMutex? + if _, destructed := s.db.getStateObjectsDegetstruct(s.address); destructed { // fixme: use sync.Map, instead of RWMutex? s.db.snapParallelLock.RUnlock() return common.Hash{} } diff --git a/core/state/statedb.go b/core/state/statedb.go index 1dc0e051b3..a53465095e 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -244,10 +244,9 @@ type StateDB struct { logSize uint // parallel EVM related - rwSet *types.RWSet - mvStates *types.MVStates - stat *types.ExeStat - rwRecordFlag bool + rwSet *types.RWSet + mvStates *types.MVStates + stat *types.ExeStat // Preimages occurred seen by VM in the scope of block. preimages map[common.Hash][]byte @@ -686,8 +685,8 @@ func (s *StateDB) SetStorage(addr common.Address, storage map[common.Hash]common // // TODO(rjl493456442) this function should only be supported by 'unwritable' // state and all mutations made should all be discarded afterwards. - if _, ok := s.queryStateObjectsDestruct(addr); !ok { - s.tagStateObjectsDestruct(addr, nil) + if _, ok := s.getStateObjectsDegetstruct(addr); !ok { + s.setStateObjectsDestruct(addr, nil) } object := s.GetOrNewStateObject(addr) for k, v := range storage { @@ -1025,9 +1024,9 @@ func (s *StateDB) createObject(addr common.Address) (newobj *stateObject) { // account and storage data should be cleared as well. Note, it must // be done here, otherwise the destruction event of "original account" // will be lost. - _, prevdestruct := s.queryStateObjectsDestruct(prev.address) + _, prevdestruct := s.getStateObjectsDegetstruct(prev.address) if !prevdestruct { - s.tagStateObjectsDestruct(prev.address, prev.origin) + s.setStateObjectsDestruct(prev.address, prev.origin) } // There may be some cached account/storage data already since IntermediateRoot // will be called for each transaction before byzantium fork which will always @@ -2373,7 +2372,6 @@ func (s *StateDB) BeforeTxTransition() { s.rwSet = types.NewRWSet(types.StateVersion{ TxIndex: s.txIndex, }) - s.rwRecordFlag = true } func (s *StateDB) BeginTxStat(index int) { @@ -2405,10 +2403,9 @@ func (s *StateDB) RecordRead(key types.RWKey, val interface{}) { if s.isParallel && s.parallel.isSlotDB { return } - if !s.rwRecordFlag { + if s.rwSet == nil || s.rwSet.RWRecordDone() { return } - // TODO: read from MVStates, record with ver s.rwSet.RecordRead(key, types.StateVersion{ TxIndex: -1, }, val) @@ -2418,7 +2415,7 @@ func (s *StateDB) RecordWrite(key types.RWKey, val interface{}) { if s.isParallel && s.parallel.isSlotDB { return } - if !s.rwRecordFlag { + if s.rwSet == nil || s.rwSet.RWRecordDone() { return } s.rwSet.RecordWrite(key, val) @@ -2430,14 +2427,13 @@ func (s *StateDB) ResetMVStates(txCount int) { } s.mvStates = types.NewMVStates(txCount) s.rwSet = nil - s.rwRecordFlag = false } func (s *StateDB) FinaliseRWSet() error { if s.isParallel && s.parallel.isSlotDB { return nil } - if !s.rwRecordFlag { + if s.rwSet == nil || s.rwSet.RWRecordDone() { return nil } if metrics.EnabledExpensive { @@ -2475,11 +2471,11 @@ func (s *StateDB) FinaliseRWSet() error { } } - s.rwRecordFlag = false + s.rwSet.SetRWRecordDone() return s.mvStates.FulfillRWSet(s.rwSet, s.stat) } -func (s *StateDB) queryStateObjectsDestruct(addr common.Address) (*types.StateAccount, bool) { +func (s *StateDB) getStateObjectsDegetstruct(addr common.Address) (*types.StateAccount, bool) { if !(s.isParallel && s.parallel.isSlotDB) { if acc, ok := s.stateObjectsDestructDirty[addr]; ok { return acc, ok @@ -2489,7 +2485,7 @@ func (s *StateDB) queryStateObjectsDestruct(addr common.Address) (*types.StateAc return acc, ok } -func (s *StateDB) tagStateObjectsDestruct(addr common.Address, acc *types.StateAccount) { +func (s *StateDB) setStateObjectsDestruct(addr common.Address, acc *types.StateAccount) { if !(s.isParallel && s.parallel.isSlotDB) { s.stateObjectsDestructDirty[addr] = acc return @@ -2498,7 +2494,7 @@ func (s *StateDB) tagStateObjectsDestruct(addr common.Address, acc *types.StateA return } -func (s *StateDB) deleteStateObjectsDestruct(addr common.Address) { +func (s *StateDB) removeStateObjectsDestruct(addr common.Address) { if !(s.isParallel && s.parallel.isSlotDB) { delete(s.stateObjectsDestructDirty, addr) return diff --git a/core/state_processor.go b/core/state_processor.go index 2498448a0f..b41c64f3e3 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -125,9 +125,9 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles(), withdrawals) if p.bc.enableTxDAG { - // TODO(galaio): append dag into block body, TxDAGPerformance will print metrics when profile is enabled // compare input TxDAG when it enable in consensus dag, extraStats := statedb.ResolveTxDAG([]common.Address{context.Coinbase, params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient}) + // TODO(galaio): check TxDAG correctness? log.Debug("Process TxDAG result", "block", block.NumberU64(), "txDAG", dag) if metrics.EnabledExpensive { types.EvaluateTxDAGPerformance(dag, extraStats) diff --git a/core/types/dag.go b/core/types/dag.go index 1cbd87e431..801ec476a2 100644 --- a/core/types/dag.go +++ b/core/types/dag.go @@ -33,6 +33,9 @@ type TxDAG interface { // TxCount return tx count TxCount() int + + // SetTxDep at the last one + SetTxDep(int, TxDep) error } func EncodeTxDAG(dag TxDAG) ([]byte, error) { @@ -99,6 +102,10 @@ func (d *EmptyTxDAG) TxCount() int { return 0 } +func (d *EmptyTxDAG) SetTxDep(int, TxDep) error { + return nil +} + func (d *EmptyTxDAG) String() string { return "None" } @@ -129,6 +136,18 @@ func (d *PlainTxDAG) TxCount() int { return len(d.TxDeps) } +func (d *PlainTxDAG) SetTxDep(i int, dep TxDep) error { + if i < 0 || i > len(d.TxDeps) { + return fmt.Errorf("SetTxDep with wrong index: %d", i) + } + if i < len(d.TxDeps) { + d.TxDeps[i] = dep + return nil + } + d.TxDeps = append(d.TxDeps, dep) + return nil +} + func NewPlainTxDAG(txLen int) *PlainTxDAG { return &PlainTxDAG{ TxDeps: make([]TxDep, txLen), @@ -285,23 +304,23 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) { // It assumes that there is no parallel thread limit txCount := dag.TxCount() var ( - maxGasIndex int - maxGas uint64 - maxTimeIndex int - maxTime time.Duration - txTimes = make([]time.Duration, txCount) - txGases = make([]uint64, txCount) - txReads = make([]int, txCount) - noDepdencyCount int + maxGasIndex int + maxGas uint64 + maxTimeIndex int + maxTime time.Duration + txTimes = make([]time.Duration, txCount) + txGases = make([]uint64, txCount) + txReads = make([]int, txCount) + noDepCnt int ) totalTxMeter.Mark(int64(txCount)) for i, path := range paths { - if stats[i].mustSerialFlag { + if stats[i].mustSerial { continue } if len(path) <= 1 { - noDepdencyCount++ + noDepCnt++ totalNoDepMeter.Mark(1) } if len(path) <= 3 { @@ -358,7 +377,7 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) { sPath []int ) for i, stat := range stats { - if stat.mustSerialFlag { + if stat.mustSerial { continue } sPath = append(sPath, i) @@ -399,8 +418,9 @@ type ExeStat struct { readCount int startTime time.Time costTime time.Duration - // TODO: consider system tx, gas fee issues, may need to use different flag - mustSerialFlag bool + + // some flags + mustSerial bool } func NewExeStat(txIndex int) *ExeStat { @@ -420,7 +440,7 @@ func (s *ExeStat) Done() *ExeStat { } func (s *ExeStat) WithSerialFlag() *ExeStat { - s.mustSerialFlag = true + s.mustSerial = true return s } diff --git a/core/types/dag_test.go b/core/types/dag_test.go index b83da5f5fb..1c0b334a8d 100644 --- a/core/types/dag_test.go +++ b/core/types/dag_test.go @@ -7,7 +7,6 @@ import ( "github.com/cometbft/cometbft/libs/rand" "github.com/ethereum/go-ethereum/common" - "github.com/holiman/uint256" "github.com/stretchr/testify/require" ) @@ -17,6 +16,31 @@ var ( ) func TestTxDAG(t *testing.T) { + dag := mockSimpleDAG() + require.NoError(t, dag.SetTxDep(9, TxDep{ + Relation: 1, + TxIndexes: nil, + })) + require.NoError(t, dag.SetTxDep(10, TxDep{ + Relation: 1, + TxIndexes: nil, + })) + require.Error(t, dag.SetTxDep(12, TxDep{ + Relation: 1, + TxIndexes: nil, + })) + dag = NewEmptyTxDAG() + require.NoError(t, dag.SetTxDep(0, TxDep{ + Relation: 1, + TxIndexes: nil, + })) + require.NoError(t, dag.SetTxDep(11, TxDep{ + Relation: 1, + TxIndexes: nil, + })) +} + +func TestTxDAG_SetTxDep(t *testing.T) { dag := mockSimpleDAG() t.Log(dag) dag = mockSystemTxDAG() @@ -36,144 +60,6 @@ func TestEvaluateTxDAG(t *testing.T) { EvaluateTxDAGPerformance(dag, stats) } -func TestSimpleMVStates2TxDAG(t *testing.T) { - ms := NewMVStates(10) - - ms.rwSets[0] = mockRWSet(0, []string{"0x00"}, []string{"0x00"}) - ms.rwSets[1] = mockRWSet(1, []string{"0x01"}, []string{"0x01"}) - ms.rwSets[2] = mockRWSet(2, []string{"0x02"}, []string{"0x02"}) - ms.rwSets[3] = mockRWSet(3, []string{"0x00", "0x03"}, []string{"0x03"}) - ms.rwSets[4] = mockRWSet(4, []string{"0x00", "0x04"}, []string{"0x04"}) - ms.rwSets[5] = mockRWSet(5, []string{"0x01", "0x02", "0x05"}, []string{"0x05"}) - ms.rwSets[6] = mockRWSet(6, []string{"0x02", "0x05", "0x06"}, []string{"0x06"}) - ms.rwSets[7] = mockRWSet(7, []string{"0x06", "0x07"}, []string{"0x07"}) - ms.rwSets[8] = mockRWSet(8, []string{"0x08"}, []string{"0x08"}) - ms.rwSets[9] = mockRWSet(9, []string{"0x08", "0x09"}, []string{"0x09"}) - - dag := ms.ResolveTxDAG(nil) - require.Equal(t, mockSimpleDAG(), dag) - t.Log(dag) -} - -func TestSystemTxMVStates2TxDAG(t *testing.T) { - ms := NewMVStates(12) - - ms.rwSets[0] = mockRWSet(0, []string{"0x00"}, []string{"0x00"}) - ms.rwSets[1] = mockRWSet(1, []string{"0x01"}, []string{"0x01"}) - ms.rwSets[2] = mockRWSet(2, []string{"0x02"}, []string{"0x02"}) - ms.rwSets[3] = mockRWSet(3, []string{"0x00", "0x03"}, []string{"0x03"}) - ms.rwSets[4] = mockRWSet(4, []string{"0x00", "0x04"}, []string{"0x04"}) - ms.rwSets[5] = mockRWSet(5, []string{"0x01", "0x02", "0x05"}, []string{"0x05"}) - ms.rwSets[6] = mockRWSet(6, []string{"0x02", "0x05", "0x06"}, []string{"0x06"}) - ms.rwSets[7] = mockRWSet(7, []string{"0x06", "0x07"}, []string{"0x07"}) - ms.rwSets[8] = mockRWSet(8, []string{"0x08"}, []string{"0x08"}) - ms.rwSets[9] = mockRWSet(9, []string{"0x08", "0x09"}, []string{"0x09"}) - ms.rwSets[10] = mockRWSet(10, []string{"0x10"}, []string{"0x10"}).WithSerialFlag() - ms.rwSets[11] = mockRWSet(11, []string{"0x11"}, []string{"0x11"}).WithSerialFlag() - - dag := ms.ResolveTxDAG(nil) - require.Equal(t, mockSystemTxDAG(), dag) - t.Log(dag) -} - -func TestIsEqualRWVal(t *testing.T) { - tests := []struct { - key RWKey - src interface{} - compared interface{} - isEqual bool - }{ - { - key: AccountStateKey(mockAddr, AccountNonce), - src: uint64(0), - compared: uint64(0), - isEqual: true, - }, - { - key: AccountStateKey(mockAddr, AccountNonce), - src: uint64(0), - compared: uint64(1), - isEqual: false, - }, - { - key: AccountStateKey(mockAddr, AccountBalance), - src: new(uint256.Int).SetUint64(1), - compared: new(uint256.Int).SetUint64(1), - isEqual: true, - }, - { - key: AccountStateKey(mockAddr, AccountBalance), - src: nil, - compared: new(uint256.Int).SetUint64(1), - isEqual: false, - }, - { - key: AccountStateKey(mockAddr, AccountBalance), - src: (*uint256.Int)(nil), - compared: new(uint256.Int).SetUint64(1), - isEqual: false, - }, - { - key: AccountStateKey(mockAddr, AccountBalance), - src: (*uint256.Int)(nil), - compared: (*uint256.Int)(nil), - isEqual: true, - }, - { - key: AccountStateKey(mockAddr, AccountCodeHash), - src: []byte{1}, - compared: []byte{1}, - isEqual: true, - }, - { - key: AccountStateKey(mockAddr, AccountCodeHash), - src: nil, - compared: []byte{1}, - isEqual: false, - }, - { - key: AccountStateKey(mockAddr, AccountCodeHash), - src: ([]byte)(nil), - compared: []byte{1}, - isEqual: false, - }, - { - key: AccountStateKey(mockAddr, AccountCodeHash), - src: ([]byte)(nil), - compared: ([]byte)(nil), - isEqual: true, - }, - { - key: AccountStateKey(mockAddr, AccountSuicide), - src: struct{}{}, - compared: struct{}{}, - isEqual: false, - }, - { - key: AccountStateKey(mockAddr, AccountSuicide), - src: nil, - compared: struct{}{}, - isEqual: false, - }, - { - key: StorageStateKey(mockAddr, mockHash), - src: mockHash, - compared: mockHash, - isEqual: true, - }, - { - key: StorageStateKey(mockAddr, mockHash), - src: nil, - compared: mockHash, - isEqual: false, - }, - } - - for i, item := range tests { - require.Equal(t, item.isEqual, isEqualRWVal(item.key, item.src, item.compared), i) - } -} - func TestMergeTxDAGExecutionPaths_Simple(t *testing.T) { paths := MergeTxDAGExecutionPaths(mockSimpleDAG()) require.Equal(t, [][]uint64{ @@ -268,36 +154,6 @@ func mockSystemTxDAG() TxDAG { return dag } -func mockRWSet(index int, read []string, write []string) *RWSet { - ver := StateVersion{ - TxIndex: index, - } - set := NewRWSet(ver) - for _, k := range read { - key := RWKey{} - if len(k) > len(key) { - k = k[:len(key)] - } - copy(key[:], k) - set.readSet[key] = &ReadRecord{ - StateVersion: ver, - Val: struct{}{}, - } - } - for _, k := range write { - key := RWKey{} - if len(k) > len(key) { - k = k[:len(key)] - } - copy(key[:], k) - set.writeSet[key] = &WriteRecord{ - Val: struct{}{}, - } - } - - return set -} - func TestTxDAG_Encode_Decode(t *testing.T) { expected := TxDAG(&EmptyTxDAG{}) enc, err := EncodeTxDAG(expected) diff --git a/core/types/mvstates.go b/core/types/mvstates.go index 5b7580df1b..85d3886470 100644 --- a/core/types/mvstates.go +++ b/core/types/mvstates.go @@ -7,6 +7,8 @@ import ( "strings" "sync" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/holiman/uint256" @@ -86,37 +88,26 @@ func (key *RWKey) Addr() common.Address { // if TxIndex equals to -1, it means the state read from DB. type StateVersion struct { TxIndex int - // TODO(galaio): used for multi ver state + // Tx incarnation used for multi ver state TxIncarnation int } -// ReadRecord keep read value & its version -type ReadRecord struct { - StateVersion - Val interface{} -} - -// WriteRecord keep latest state value & change count -type WriteRecord struct { - Val interface{} -} - // RWSet record all read & write set in txs // Attention: this is not a concurrent safety structure type RWSet struct { ver StateVersion - readSet map[RWKey]*ReadRecord - writeSet map[RWKey]*WriteRecord + readSet map[RWKey]*RWItem + writeSet map[RWKey]*RWItem - // some flags - mustSerial bool + rwRecordDone bool + mustSerial bool } func NewRWSet(ver StateVersion) *RWSet { return &RWSet{ ver: ver, - readSet: make(map[RWKey]*ReadRecord), - writeSet: make(map[RWKey]*WriteRecord), + readSet: make(map[RWKey]*RWItem), + writeSet: make(map[RWKey]*RWItem), } } @@ -125,16 +116,17 @@ func (s *RWSet) RecordRead(key RWKey, ver StateVersion, val interface{}) { if _, exist := s.readSet[key]; exist { return } - s.readSet[key] = &ReadRecord{ - StateVersion: ver, - Val: val, + s.readSet[key] = &RWItem{ + Ver: ver, + Val: val, } } func (s *RWSet) RecordWrite(key RWKey, val interface{}) { wr, exist := s.writeSet[key] if !exist { - s.writeSet[key] = &WriteRecord{ + s.writeSet[key] = &RWItem{ + Ver: s.ver, Val: val, } return @@ -146,11 +138,11 @@ func (s *RWSet) Version() StateVersion { return s.ver } -func (s *RWSet) ReadSet() map[RWKey]*ReadRecord { +func (s *RWSet) ReadSet() map[RWKey]*RWItem { return s.readSet } -func (s *RWSet) WriteSet() map[RWKey]*WriteRecord { +func (s *RWSet) WriteSet() map[RWKey]*RWItem { return s.writeSet } @@ -159,6 +151,14 @@ func (s *RWSet) WithSerialFlag() *RWSet { return s } +func (s *RWSet) RWRecordDone() bool { + return s.rwRecordDone +} + +func (s *RWSet) SetRWRecordDone() { + s.rwRecordDone = true +} + func (s *RWSet) String() string { builder := strings.Builder{} builder.WriteString(fmt.Sprintf("tx: %v, inc: %v\nreadSet: [", s.ver.TxIndex, s.ver.TxIncarnation)) @@ -219,37 +219,37 @@ func equalUint256(s, c *uint256.Int) bool { return s == c } -type PendingWrite struct { +type RWItem struct { Ver StateVersion Val interface{} } -func NewPendingWrite(ver StateVersion, wr *WriteRecord) *PendingWrite { - return &PendingWrite{ +func NewRWItem(ver StateVersion, val interface{}) *RWItem { + return &RWItem{ Ver: ver, - Val: wr.Val, + Val: val, } } -func (w *PendingWrite) TxIndex() int { +func (w *RWItem) TxIndex() int { return w.Ver.TxIndex } -func (w *PendingWrite) TxIncarnation() int { +func (w *RWItem) TxIncarnation() int { return w.Ver.TxIncarnation } type PendingWrites struct { - list []*PendingWrite + list []*RWItem } func NewPendingWrites() *PendingWrites { return &PendingWrites{ - list: make([]*PendingWrite, 0), + list: make([]*RWItem, 0), } } -func (w *PendingWrites) Append(pw *PendingWrite) { +func (w *PendingWrites) Append(pw *RWItem) { if i, found := w.SearchTxIndex(pw.TxIndex()); found { w.list[i] = pw return @@ -279,7 +279,7 @@ func (w *PendingWrites) SearchTxIndex(txIndex int) (int, bool) { return i, i < n && w.list[i].TxIndex() == txIndex } -func (w *PendingWrites) FindLastWrite(txIndex int) *PendingWrite { +func (w *PendingWrites) FindLastWrite(txIndex int) *RWItem { var i, _ = w.SearchTxIndex(txIndex) for j := i - 1; j >= 0; j-- { if w.list[j].TxIndex() < txIndex { @@ -291,8 +291,9 @@ func (w *PendingWrites) FindLastWrite(txIndex int) *PendingWrite { } type MVStates struct { - rwSets map[int]*RWSet - pendingWriteSet map[RWKey]*PendingWrites + rwSets map[int]*RWSet + pendingWriteSet map[RWKey]*PendingWrites + nextFinaliseIndex int // dependency map cache for generating TxDAG // depsCache[i].exist(j) means j->i, and i > j @@ -333,21 +334,27 @@ func (s *MVStates) RWSet(index int) *RWSet { return s.rwSets[index] } -// ReadState TODO(galaio): read state from MVStates -func (s *MVStates) ReadState(key RWKey) (interface{}, bool) { - return nil, false +// ReadState read state from MVStates +func (s *MVStates) ReadState(txIndex int, key RWKey) *RWItem { + s.lock.RLock() + defer s.lock.RUnlock() + + wset, ok := s.pendingWriteSet[key] + if !ok { + return nil + } + return wset.FindLastWrite(txIndex) } // FulfillRWSet it can execute as async, and rwSet & stat must guarantee read-only // try to generate TxDAG, when fulfill RWSet -// TODO(galaio): support flag to stat execution as optional func (s *MVStates) FulfillRWSet(rwSet *RWSet, stat *ExeStat) error { - log.Debug("FulfillRWSet", "s.len", len(s.rwSets), "cur", rwSet.ver.TxIndex, "reads", len(rwSet.readSet), "writes", len(rwSet.writeSet)) + log.Debug("FulfillRWSet", "total", len(s.rwSets), "cur", rwSet.ver.TxIndex, "reads", len(rwSet.readSet), "writes", len(rwSet.writeSet)) s.lock.Lock() defer s.lock.Unlock() index := rwSet.ver.TxIndex - if s := s.rwSets[index]; s != nil { - return errors.New("refill a exist RWSet") + if index < s.nextFinaliseIndex { + return errors.New("fulfill a finalized RWSet") } if stat != nil { if stat.txIndex != index { @@ -356,26 +363,46 @@ func (s *MVStates) FulfillRWSet(rwSet *RWSet, stat *ExeStat) error { s.stats[index] = stat } + if metrics.EnabledExpensive { + for k := range rwSet.writeSet { + // this action is only for testing, it runs when enable expensive metrics. + checkRWSetInconsistent(index, k, rwSet.readSet, rwSet.writeSet) + } + } s.resolveDepsCache(index, rwSet) + s.rwSets[index] = rwSet + return nil +} + +// Finalise it will put target write set into pending writes. +func (s *MVStates) Finalise(index int) error { + log.Debug("Finalise", "total", len(s.rwSets), "index", index) + s.lock.Lock() + defer s.lock.Unlock() + + rwSet := s.rwSets[index] + if rwSet == nil { + return fmt.Errorf("finalise a non-exist RWSet, index: %d", index) + } + + if index != s.nextFinaliseIndex { + return fmt.Errorf("finalise in wrong order, next: %d, input: %d", s.nextFinaliseIndex, index) + } + // append to pending write set for k, v := range rwSet.writeSet { - // TODO(galaio): this action is only for testing, it can be removed in production mode. - // ignore no changed write record - checkRWSetInconsistent(index, k, rwSet.readSet, rwSet.writeSet) if _, exist := s.pendingWriteSet[k]; !exist { s.pendingWriteSet[k] = NewPendingWrites() } - s.pendingWriteSet[k].Append(NewPendingWrite(rwSet.ver, v)) + s.pendingWriteSet[k].Append(v) } - s.rwSets[index] = rwSet + s.nextFinaliseIndex++ return nil } func (s *MVStates) resolveDepsCache(index int, rwSet *RWSet) { // analysis dep, if the previous transaction is not executed/validated, re-analysis is required - if _, ok := s.depsCache[index]; !ok { - s.depsCache[index] = NewTxDeps(0) - } + s.depsCache[index] = NewTxDeps(0) for prev := 0; prev < index; prev++ { // if there are some parallel execution or system txs, it will fulfill in advance // it's ok, and try re-generate later @@ -395,11 +422,11 @@ func (s *MVStates) resolveDepsCache(index int, rwSet *RWSet) { } } -func checkRWSetInconsistent(index int, k RWKey, readSet map[RWKey]*ReadRecord, writeSet map[RWKey]*WriteRecord) bool { +func checkRWSetInconsistent(index int, k RWKey, readSet map[RWKey]*RWItem, writeSet map[RWKey]*RWItem) bool { var ( readOk bool writeOk bool - r *WriteRecord + r *RWItem ) if k.IsAccountSuicide() { @@ -411,7 +438,7 @@ func checkRWSetInconsistent(index int, k RWKey, readSet map[RWKey]*ReadRecord, w r, writeOk = writeSet[k] if readOk != writeOk { // check if it's correct? read nil, write non-nil - log.Info("checkRWSetInconsistent find inconsistent", "tx", index, "k", k.String(), "read", readOk, "write", writeOk, "val", r.Val) + log.Warn("checkRWSetInconsistent find inconsistent", "tx", index, "k", k.String(), "read", readOk, "write", writeOk, "val", r.Val) return true } @@ -443,7 +470,7 @@ func (s *MVStates) ResolveTxDAG(gasFeeReceivers []common.Address) TxDAG { return txDAG } -func checkDependency(writeSet map[RWKey]*WriteRecord, readSet map[RWKey]*ReadRecord) bool { +func checkDependency(writeSet map[RWKey]*RWItem, readSet map[RWKey]*RWItem) bool { // check tx dependency, only check key, skip version for k, _ := range writeSet { // check suicide, add read address flag, it only for check suicide quickly, and cannot for other scenarios. diff --git a/core/types/mvstates_test.go b/core/types/mvstates_test.go new file mode 100644 index 0000000000..9d4422bf1f --- /dev/null +++ b/core/types/mvstates_test.go @@ -0,0 +1,241 @@ +package types + +import ( + "testing" + + "github.com/holiman/uint256" + "github.com/stretchr/testify/require" +) + +func TestMVStates_BasicUsage(t *testing.T) { + ms := NewMVStates(0) + require.NoError(t, ms.FulfillRWSet(mockRWSetWithVal(0, []interface{}{"0x00", 0}, []interface{}{"0x00", 0}), nil)) + require.Nil(t, ms.ReadState(0, str2key("0x00"))) + require.NoError(t, ms.Finalise(0)) + require.Error(t, ms.Finalise(0)) + require.Error(t, ms.FulfillRWSet(mockRWSetWithVal(0, nil, nil), nil)) + require.Nil(t, ms.ReadState(0, str2key("0x00"))) + require.Equal(t, NewRWItem(StateVersion{TxIndex: 0}, 0), ms.ReadState(1, str2key("0x00"))) + + require.NoError(t, ms.FulfillRWSet(mockRWSetWithVal(1, []interface{}{"0x01", 1}, []interface{}{"0x01", 1}), nil)) + require.Nil(t, ms.ReadState(1, str2key("0x01"))) + require.NoError(t, ms.Finalise(1)) + require.Nil(t, ms.ReadState(0, str2key("0x01"))) + require.Equal(t, NewRWItem(StateVersion{TxIndex: 1}, 1), ms.ReadState(2, str2key("0x01"))) + + require.NoError(t, ms.FulfillRWSet(mockRWSetWithVal(2, []interface{}{"0x02", 2, "0x01", 1}, []interface{}{"0x01", 2, "0x02", 2}), nil)) + require.NoError(t, ms.Finalise(2)) + require.Equal(t, NewRWItem(StateVersion{TxIndex: 1}, 1), ms.ReadState(2, str2key("0x01"))) + require.Equal(t, NewRWItem(StateVersion{TxIndex: 2}, 2), ms.ReadState(3, str2key("0x01"))) + + require.NoError(t, ms.FulfillRWSet(mockRWSetWithVal(3, []interface{}{"0x03", 3, "0x00", 0, "0x01", 2}, []interface{}{"0x00", 3, "0x01", 3, "0x03", 3}), nil)) + require.Nil(t, ms.ReadState(3, str2key("0x03"))) + require.NoError(t, ms.Finalise(3)) + require.Nil(t, ms.ReadState(0, str2key("0x01"))) + require.Equal(t, NewRWItem(StateVersion{TxIndex: 1}, 1), ms.ReadState(2, str2key("0x01"))) + require.Equal(t, NewRWItem(StateVersion{TxIndex: 2}, 2), ms.ReadState(3, str2key("0x01"))) + require.Equal(t, NewRWItem(StateVersion{TxIndex: 3}, 3), ms.ReadState(4, str2key("0x01"))) + require.Nil(t, ms.ReadState(0, str2key("0x00"))) + require.Equal(t, NewRWItem(StateVersion{TxIndex: 3}, 3), ms.ReadState(5, str2key("0x00"))) +} + +func TestSimpleMVStates2TxDAG(t *testing.T) { + ms := NewMVStates(10) + + ms.rwSets[0] = mockRWSet(0, []string{"0x00"}, []string{"0x00"}) + ms.rwSets[1] = mockRWSet(1, []string{"0x01"}, []string{"0x01"}) + ms.rwSets[2] = mockRWSet(2, []string{"0x02"}, []string{"0x02"}) + ms.rwSets[3] = mockRWSet(3, []string{"0x00", "0x03"}, []string{"0x03"}) + ms.rwSets[4] = mockRWSet(4, []string{"0x00", "0x04"}, []string{"0x04"}) + ms.rwSets[5] = mockRWSet(5, []string{"0x01", "0x02", "0x05"}, []string{"0x05"}) + ms.rwSets[6] = mockRWSet(6, []string{"0x02", "0x05", "0x06"}, []string{"0x06"}) + ms.rwSets[7] = mockRWSet(7, []string{"0x06", "0x07"}, []string{"0x07"}) + ms.rwSets[8] = mockRWSet(8, []string{"0x08"}, []string{"0x08"}) + ms.rwSets[9] = mockRWSet(9, []string{"0x08", "0x09"}, []string{"0x09"}) + + dag := ms.ResolveTxDAG(nil) + require.Equal(t, mockSimpleDAG(), dag) + t.Log(dag) +} + +func TestSystemTxMVStates2TxDAG(t *testing.T) { + ms := NewMVStates(12) + + ms.rwSets[0] = mockRWSet(0, []string{"0x00"}, []string{"0x00"}) + ms.rwSets[1] = mockRWSet(1, []string{"0x01"}, []string{"0x01"}) + ms.rwSets[2] = mockRWSet(2, []string{"0x02"}, []string{"0x02"}) + ms.rwSets[3] = mockRWSet(3, []string{"0x00", "0x03"}, []string{"0x03"}) + ms.rwSets[4] = mockRWSet(4, []string{"0x00", "0x04"}, []string{"0x04"}) + ms.rwSets[5] = mockRWSet(5, []string{"0x01", "0x02", "0x05"}, []string{"0x05"}) + ms.rwSets[6] = mockRWSet(6, []string{"0x02", "0x05", "0x06"}, []string{"0x06"}) + ms.rwSets[7] = mockRWSet(7, []string{"0x06", "0x07"}, []string{"0x07"}) + ms.rwSets[8] = mockRWSet(8, []string{"0x08"}, []string{"0x08"}) + ms.rwSets[9] = mockRWSet(9, []string{"0x08", "0x09"}, []string{"0x09"}) + ms.rwSets[10] = mockRWSet(10, []string{"0x10"}, []string{"0x10"}).WithSerialFlag() + ms.rwSets[11] = mockRWSet(11, []string{"0x11"}, []string{"0x11"}).WithSerialFlag() + + dag := ms.ResolveTxDAG(nil) + require.Equal(t, mockSystemTxDAG(), dag) + t.Log(dag) +} + +func TestIsEqualRWVal(t *testing.T) { + tests := []struct { + key RWKey + src interface{} + compared interface{} + isEqual bool + }{ + { + key: AccountStateKey(mockAddr, AccountNonce), + src: uint64(0), + compared: uint64(0), + isEqual: true, + }, + { + key: AccountStateKey(mockAddr, AccountNonce), + src: uint64(0), + compared: uint64(1), + isEqual: false, + }, + { + key: AccountStateKey(mockAddr, AccountBalance), + src: new(uint256.Int).SetUint64(1), + compared: new(uint256.Int).SetUint64(1), + isEqual: true, + }, + { + key: AccountStateKey(mockAddr, AccountBalance), + src: nil, + compared: new(uint256.Int).SetUint64(1), + isEqual: false, + }, + { + key: AccountStateKey(mockAddr, AccountBalance), + src: (*uint256.Int)(nil), + compared: new(uint256.Int).SetUint64(1), + isEqual: false, + }, + { + key: AccountStateKey(mockAddr, AccountBalance), + src: (*uint256.Int)(nil), + compared: (*uint256.Int)(nil), + isEqual: true, + }, + { + key: AccountStateKey(mockAddr, AccountCodeHash), + src: []byte{1}, + compared: []byte{1}, + isEqual: true, + }, + { + key: AccountStateKey(mockAddr, AccountCodeHash), + src: nil, + compared: []byte{1}, + isEqual: false, + }, + { + key: AccountStateKey(mockAddr, AccountCodeHash), + src: ([]byte)(nil), + compared: []byte{1}, + isEqual: false, + }, + { + key: AccountStateKey(mockAddr, AccountCodeHash), + src: ([]byte)(nil), + compared: ([]byte)(nil), + isEqual: true, + }, + { + key: AccountStateKey(mockAddr, AccountSuicide), + src: struct{}{}, + compared: struct{}{}, + isEqual: false, + }, + { + key: AccountStateKey(mockAddr, AccountSuicide), + src: nil, + compared: struct{}{}, + isEqual: false, + }, + { + key: StorageStateKey(mockAddr, mockHash), + src: mockHash, + compared: mockHash, + isEqual: true, + }, + { + key: StorageStateKey(mockAddr, mockHash), + src: nil, + compared: mockHash, + isEqual: false, + }, + } + + for i, item := range tests { + require.Equal(t, item.isEqual, isEqualRWVal(item.key, item.src, item.compared), i) + } +} + +func mockRWSet(index int, read []string, write []string) *RWSet { + ver := StateVersion{ + TxIndex: index, + } + set := NewRWSet(ver) + for _, k := range read { + set.readSet[str2key(k)] = &RWItem{ + Ver: ver, + Val: struct{}{}, + } + } + for _, k := range write { + set.writeSet[str2key(k)] = &RWItem{ + Ver: ver, + Val: struct{}{}, + } + } + + return set +} + +func mockRWSetWithVal(index int, read []interface{}, write []interface{}) *RWSet { + ver := StateVersion{ + TxIndex: index, + } + set := NewRWSet(ver) + + if len(read)%2 != 0 { + panic("wrong read size") + } + if len(write)%2 != 0 { + panic("wrong write size") + } + + for i := 0; i < len(read); { + set.readSet[str2key(read[i].(string))] = &RWItem{ + Ver: StateVersion{ + TxIndex: index - 1, + }, + Val: read[i+1], + } + i += 2 + } + for i := 0; i < len(write); { + set.writeSet[str2key(write[i].(string))] = &RWItem{ + Ver: ver, + Val: write[i+1], + } + i += 2 + } + + return set +} + +func str2key(k string) RWKey { + key := RWKey{} + if len(k) > len(key) { + k = k[:len(key)] + } + copy(key[:], k) + return key +}