From 73b6c60be489b2bc77a0b0fd5c9845ba1aa06010 Mon Sep 17 00:00:00 2001 From: Dhruba Basu <7675102+dhrubabasu@users.noreply.github.com> Date: Fri, 8 Mar 2024 12:14:50 -0500 Subject: [PATCH] [vms/avm] Cleanup `GetTx` + remove state pruning logic (#2826) --- vms/avm/state/mock_state.go | 16 -- vms/avm/state/state.go | 362 +----------------------------------- vms/avm/vm.go | 11 -- 3 files changed, 1 insertion(+), 388 deletions(-) diff --git a/vms/avm/state/mock_state.go b/vms/avm/state/mock_state.go index cb5138c90369..13ede9805d0b 100644 --- a/vms/avm/state/mock_state.go +++ b/vms/avm/state/mock_state.go @@ -11,12 +11,10 @@ package state import ( reflect "reflect" - sync "sync" time "time" database "github.com/ava-labs/avalanchego/database" ids "github.com/ava-labs/avalanchego/ids" - logging "github.com/ava-labs/avalanchego/utils/logging" block "github.com/ava-labs/avalanchego/vms/avm/block" txs "github.com/ava-labs/avalanchego/vms/avm/txs" avax "github.com/ava-labs/avalanchego/vms/components/avax" @@ -464,20 +462,6 @@ func (mr *MockStateMockRecorder) IsInitialized() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsInitialized", reflect.TypeOf((*MockState)(nil).IsInitialized)) } -// Prune mocks base method. -func (m *MockState) Prune(arg0 sync.Locker, arg1 logging.Logger) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Prune", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// Prune indicates an expected call of Prune. -func (mr *MockStateMockRecorder) Prune(arg0, arg1 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Prune", reflect.TypeOf((*MockState)(nil).Prune), arg0, arg1) -} - // SetInitialized mocks base method. func (m *MockState) SetInitialized() error { m.ctrl.T.Helper() diff --git a/vms/avm/state/state.go b/vms/avm/state/state.go index 297a7e76d39a..5005eb3dfc14 100644 --- a/vms/avm/state/state.go +++ b/vms/avm/state/state.go @@ -4,15 +4,10 @@ package state import ( - "bytes" - "errors" "fmt" - "math" - "sync" "time" "github.com/prometheus/client_golang/prometheus" - "go.uber.org/zap" "github.com/ava-labs/avalanchego/cache" "github.com/ava-labs/avalanchego/cache/metercacher" @@ -20,30 +15,20 @@ import ( "github.com/ava-labs/avalanchego/database/prefixdb" "github.com/ava-labs/avalanchego/database/versiondb" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow/choices" "github.com/ava-labs/avalanchego/utils" - "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/utils/timer" "github.com/ava-labs/avalanchego/vms/avm/block" "github.com/ava-labs/avalanchego/vms/avm/txs" "github.com/ava-labs/avalanchego/vms/components/avax" ) const ( - statusCacheSize = 8192 txCacheSize = 8192 blockIDCacheSize = 8192 blockCacheSize = 2048 - - pruneCommitLimit = 1024 - pruneCommitSleepMultiplier = 5 - pruneCommitSleepCap = 10 * time.Second - pruneUpdateFrequency = 30 * time.Second ) var ( utxoPrefix = []byte("utxo") - statusPrefix = []byte("status") txPrefix = []byte("tx") blockIDPrefix = []byte("blockID") blockPrefix = []byte("block") @@ -53,8 +38,6 @@ var ( timestampKey = []byte{0x01} lastAcceptedKey = []byte{0x02} - errStatusWithoutTx = errors.New("unexpected status without transactions") - _ State = (*state)(nil) ) @@ -106,19 +89,6 @@ type State interface { // pending changes to the base database. CommitBatch() (database.Batch, error) - // Asynchronously removes unneeded state from disk. - // - // Specifically, this removes: - // - All transaction statuses - // - All non-accepted transactions - // - All UTXOs that were consumed by accepted transactions - // - // [lock] is the AVM's context lock and is assumed to be unlocked when this - // method is called. - // - // TODO: remove after v1.11.x is activated - Prune(lock sync.Locker, log logging.Logger) error - // Checksums returns the current TxChecksum and UTXOChecksum. Checksums() (txChecksum ids.ID, utxoChecksum ids.ID) @@ -129,8 +99,6 @@ type State interface { * VMDB * |- utxos * | '-- utxoDB - * |- statuses - * | '-- statusDB * |-. txs * | '-- txID -> tx bytes * |-. blockIDs @@ -150,10 +118,6 @@ type state struct { utxoDB database.Database utxoState avax.UTXOState - statusesPruned bool - statusCache cache.Cacher[ids.ID, *choices.Status] // cache of id -> choices.Status. If the entry is nil, it is not in the database - statusDB database.Database - addedTxs map[ids.ID]*txs.Tx // map of txID -> *txs.Tx txCache cache.Cacher[ids.ID, *txs.Tx] // cache of txID -> *txs.Tx. If the entry is nil, it is not in the database txDB database.Database @@ -182,21 +146,11 @@ func New( trackChecksums bool, ) (State, error) { utxoDB := prefixdb.New(utxoPrefix, db) - statusDB := prefixdb.New(statusPrefix, db) txDB := prefixdb.New(txPrefix, db) blockIDDB := prefixdb.New(blockIDPrefix, db) blockDB := prefixdb.New(blockPrefix, db) singletonDB := prefixdb.New(singletonPrefix, db) - statusCache, err := metercacher.New[ids.ID, *choices.Status]( - "status_cache", - metrics, - &cache.LRU[ids.ID, *choices.Status]{Size: statusCacheSize}, - ) - if err != nil { - return nil, err - } - txCache, err := metercacher.New[ids.ID, *txs.Tx]( "tx_cache", metrics, @@ -237,9 +191,6 @@ func New( utxoDB: utxoDB, utxoState: utxoState, - statusCache: statusCache, - statusDB: statusDB, - addedTxs: make(map[ids.ID]*txs.Tx), txCache: txCache, txDB: txDB, @@ -281,69 +232,7 @@ func (s *state) DeleteUTXO(utxoID ids.ID) { s.modifiedUTXOs[utxoID] = nil } -// TODO: After v1.11.x has activated we can rename [getTx] to [GetTx] and delete -// [getStatus]. func (s *state) GetTx(txID ids.ID) (*txs.Tx, error) { - tx, err := s.getTx(txID) - if err != nil { - return nil, err - } - - // Before the linearization, transactions were persisted before they were - // marked as Accepted. However, this function aims to only return accepted - // transactions. - status, err := s.getStatus(txID) - if err == database.ErrNotFound { - // If the status wasn't persisted, then the transaction was written - // after the linearization, and is accepted. - return tx, nil - } - if err != nil { - return nil, err - } - - // If the status was persisted, then the transaction was written before the - // linearization. If it wasn't marked as accepted, then we treat it as if it - // doesn't exist. - if status != choices.Accepted { - return nil, database.ErrNotFound - } - return tx, nil -} - -func (s *state) getStatus(id ids.ID) (choices.Status, error) { - if s.statusesPruned { - return choices.Unknown, database.ErrNotFound - } - - if _, ok := s.addedTxs[id]; ok { - return choices.Unknown, database.ErrNotFound - } - if status, found := s.statusCache.Get(id); found { - if status == nil { - return choices.Unknown, database.ErrNotFound - } - return *status, nil - } - - val, err := database.GetUInt32(s.statusDB, id[:]) - if err == database.ErrNotFound { - s.statusCache.Put(id, nil) - return choices.Unknown, database.ErrNotFound - } - if err != nil { - return choices.Unknown, err - } - - status := choices.Status(val) - if err := status.Valid(); err != nil { - return choices.Unknown, err - } - s.statusCache.Put(id, &status) - return status, nil -} - -func (s *state) getTx(txID ids.ID) (*txs.Tx, error) { if tx, exists := s.addedTxs[txID]; exists { return tx, nil } @@ -521,7 +410,6 @@ func (s *state) CommitBatch() (database.Batch, error) { func (s *state) Close() error { return utils.Err( s.utxoDB.Close(), - s.statusDB.Close(), s.txDB.Close(), s.blockIDDB.Close(), s.blockDB.Close(), @@ -564,13 +452,9 @@ func (s *state) writeTxs() error { delete(s.addedTxs, txID) s.txCache.Put(txID, tx) - s.statusCache.Put(txID, nil) if err := s.txDB.Put(txID[:], txBytes); err != nil { return fmt.Errorf("failed to add tx: %w", err) } - if err := s.statusDB.Delete(txID[:]); err != nil { - return fmt.Errorf("failed to delete status: %w", err) - } } return nil } @@ -618,225 +502,6 @@ func (s *state) writeMetadata() error { return nil } -func (s *state) Prune(lock sync.Locker, log logging.Logger) error { - lock.Lock() - // It is possible that more txs are added after grabbing this iterator. No - // new txs will write a status, so we don't need to check those txs. - statusIter := s.statusDB.NewIterator() - // Releasing is done using a closure to ensure that updating statusIter will - // result in having the most recent iterator released when executing the - // deferred function. - defer func() { - statusIter.Release() - }() - - if !statusIter.Next() { - // If there are no statuses on disk, pruning was previously run and - // finished. - lock.Unlock() - - log.Info("state already pruned") - - return statusIter.Error() - } - - startTxIDBytes := statusIter.Key() - txIter := s.txDB.NewIteratorWithStart(startTxIDBytes) - // Releasing is done using a closure to ensure that updating statusIter will - // result in having the most recent iterator released when executing the - // deferred function. - defer func() { - txIter.Release() - }() - - // While we are pruning the disk, we disable caching of the data we are - // modifying. Caching is re-enabled when pruning finishes. - // - // Note: If an unexpected error occurs the caches are never re-enabled. - // That's fine as the node is going to be in an unhealthy state regardless. - oldTxCache := s.txCache - s.statusCache = &cache.Empty[ids.ID, *choices.Status]{} - s.txCache = &cache.Empty[ids.ID, *txs.Tx]{} - lock.Unlock() - - startTime := time.Now() - lastCommit := startTime - lastUpdate := startTime - startProgress := timer.ProgressFromHash(startTxIDBytes) - - startStatusBytes := statusIter.Value() - if err := s.cleanupTx(lock, startTxIDBytes, startStatusBytes, txIter); err != nil { - return err - } - - numPruned := 1 - for statusIter.Next() { - txIDBytes := statusIter.Key() - statusBytes := statusIter.Value() - if err := s.cleanupTx(lock, txIDBytes, statusBytes, txIter); err != nil { - return err - } - - numPruned++ - - if numPruned%pruneCommitLimit == 0 { - // We must hold the lock during committing to make sure we don't - // attempt to commit to disk while a block is concurrently being - // accepted. - lock.Lock() - err := utils.Err( - s.Commit(), - statusIter.Error(), - txIter.Error(), - ) - lock.Unlock() - if err != nil { - return err - } - - // We release the iterators here to allow the underlying database to - // clean up deleted state. - statusIter.Release() - txIter.Release() - - now := time.Now() - if now.Sub(lastUpdate) > pruneUpdateFrequency { - lastUpdate = now - - progress := timer.ProgressFromHash(txIDBytes) - eta := timer.EstimateETA( - startTime, - progress-startProgress, - math.MaxUint64-startProgress, - ) - log.Info("committing state pruning", - zap.Int("numPruned", numPruned), - zap.Duration("eta", eta), - ) - } - - // We take the minimum here because it's possible that the node is - // currently bootstrapping. This would mean that grabbing the lock - // could take an extremely long period of time; which we should not - // delay processing for. - pruneDuration := now.Sub(lastCommit) - sleepDuration := min( - pruneCommitSleepMultiplier*pruneDuration, - pruneCommitSleepCap, - ) - time.Sleep(sleepDuration) - - // Make sure not to include the sleep duration into the next prune - // duration. - lastCommit = time.Now() - - // We shouldn't need to grab the lock here, but doing so ensures - // that we see a consistent view across both the statusDB and the - // txDB. - lock.Lock() - statusIter = s.statusDB.NewIteratorWithStart(txIDBytes) - txIter = s.txDB.NewIteratorWithStart(txIDBytes) - lock.Unlock() - } - } - - lock.Lock() - defer lock.Unlock() - - err := utils.Err( - s.Commit(), - statusIter.Error(), - txIter.Error(), - ) - - // Make sure we flush the original cache before re-enabling it to prevent - // surfacing any stale data. - oldTxCache.Flush() - s.statusesPruned = true - s.txCache = oldTxCache - - log.Info("finished state pruning", - zap.Int("numPruned", numPruned), - zap.Duration("duration", time.Since(startTime)), - ) - - return err -} - -// Assumes [lock] is unlocked. -func (s *state) cleanupTx(lock sync.Locker, txIDBytes []byte, statusBytes []byte, txIter database.Iterator) error { - // After the linearization, we write txs to disk without statuses to mark - // them as accepted. This means that there may be more txs than statuses and - // we need to skip over them. - // - // Note: We do not need to remove UTXOs consumed after the linearization, as - // those UTXOs are guaranteed to have already been deleted. - if err := skipTo(txIter, txIDBytes); err != nil { - return err - } - // txIter.Key() is now `txIDBytes` - - statusInt, err := database.ParseUInt32(statusBytes) - if err != nil { - return err - } - status := choices.Status(statusInt) - - if status == choices.Accepted { - txBytes := txIter.Value() - tx, err := s.parser.ParseGenesisTx(txBytes) - if err != nil { - return err - } - - utxos := tx.Unsigned.InputUTXOs() - - // Locking is done here to make sure that any concurrent verification is - // performed with a valid view of the state. - lock.Lock() - defer lock.Unlock() - - // Remove all the UTXOs consumed by the accepted tx. Technically we only - // need to remove UTXOs consumed by operations, but it's easy to just - // remove all of them. - for _, UTXO := range utxos { - if err := s.utxoState.DeleteUTXO(UTXO.InputID()); err != nil { - return err - } - } - } else { - lock.Lock() - defer lock.Unlock() - - // This tx wasn't accepted, so we can remove it entirely from disk. - if err := s.txDB.Delete(txIDBytes); err != nil { - return err - } - } - // By removing the status, we will treat the tx as accepted if it is still - // on disk. - return s.statusDB.Delete(txIDBytes) -} - -// skipTo advances [iter] until its key is equal to [targetKey]. If [iter] does -// not contain [targetKey] an error will be returned. -// -// Note: [iter.Next()] will always be called at least once. -func skipTo(iter database.Iterator, targetKey []byte) error { - for { - if !iter.Next() { - return fmt.Errorf("%w: 0x%x", database.ErrNotFound, targetKey) - } - key := iter.Key() - switch bytes.Compare(targetKey, key) { - case -1: - return fmt.Errorf("%w: 0x%x", database.ErrNotFound, targetKey) - case 0: - return nil - } - } -} - func (s *state) Checksums() (ids.ID, ids.ID) { return s.txChecksum, s.utxoState.Checksum() } @@ -848,27 +513,9 @@ func (s *state) initTxChecksum() error { txIt := s.txDB.NewIterator() defer txIt.Release() - statusIt := s.statusDB.NewIterator() - defer statusIt.Release() - statusHasNext := statusIt.Next() for txIt.Next() { txIDBytes := txIt.Key() - if statusHasNext { // if status was exhausted, everything is accepted - statusIDBytes := statusIt.Key() - if bytes.Equal(txIDBytes, statusIDBytes) { // if the status key doesn't match this was marked as accepted - statusInt, err := database.ParseUInt32(statusIt.Value()) - if err != nil { - return err - } - - statusHasNext = statusIt.Next() // we processed the txID, so move on to the next status - - if choices.Status(statusInt) != choices.Accepted { // the status isn't accepted, so we skip the txID - continue - } - } - } txID, err := ids.ToID(txIDBytes) if err != nil { @@ -878,14 +525,7 @@ func (s *state) initTxChecksum() error { s.updateTxChecksum(txID) } - if statusHasNext { - return errStatusWithoutTx - } - - return utils.Err( - txIt.Error(), - statusIt.Error(), - ) + return txIt.Error() } func (s *state) updateTxChecksum(modifiedID ids.ID) { diff --git a/vms/avm/vm.go b/vms/avm/vm.go index f401398f0a24..022be327c1e1 100644 --- a/vms/avm/vm.go +++ b/vms/avm/vm.go @@ -473,17 +473,6 @@ func (vm *VM) Linearize(ctx context.Context, stopVertexID ids.ID, toEngine chan< vm.network.PullGossip(vm.onShutdownCtx) }() - go func() { - err := vm.state.Prune(&vm.ctx.Lock, vm.ctx.Log) - if err != nil { - vm.ctx.Log.Warn("state pruning failed", - zap.Error(err), - ) - return - } - vm.ctx.Log.Info("state pruning finished") - }() - return nil }