# Patch summary: add State.ReindexBlocks, a background migration of the
# platformvm block store away from the legacy stateBlk encoding.
#
# vms/platformvm/state/state.go
#   - Declares ReindexBlocks(lock sync.Locker, log logging.Logger) error on the
#     State interface and implements it on *state.
#   - The implementation returns immediately when BlocksReindexedKey is already
#     present in singletonDB. Otherwise it iterates blockDB; every value that
#     parseStoredBlock reports as a legacy stateBlk is rewritten under its
#     block ID as blk.Bytes().
#   - After every indexIterationLimit (4096) visited entries it commits while
#     holding `lock`, releases the iterator, sleeps for
#     min(indexIterationSleepMultiplier * time spent on the batch,
#     indexIterationSleepCap), and reopens the iterator with
#     NewIteratorWithStart at the last visited block ID.
#   - Progress is logged at most once per indexLogFrequency (30s).
#   - Only if the iterator reports no error does it write BlocksReindexedKey,
#     after which it commits while holding `lock`.
# vms/platformvm/state/mock_state.go
#   - Adds MockState.ReindexBlocks and the matching recorder method.
# vms/platformvm/state/state_test.go
#   - Extracts the block fixtures into makeBlocks, adds TestReindexBlocks, and
#     relocates TestStateSubnetOwner (its body is unchanged).
# vms/platformvm/vm.go
#   - VM.Initialize starts ReindexBlocks in a goroutine with &vm.ctx.Lock and
#     vm.ctx.Log; a returned error is only logged at Warn level.
#
# NOTE(review): the reopened iterator starts at the last visited block ID. If
#   NewIteratorWithStart is inclusive, that block is parsed again and counted
#   twice in numIndicesChecked - confirm this is acceptable.
# NOTE(review): the goroutine started in Initialize has no visible stop signal
#   and time.Sleep is not interruptible - confirm behaviour on VM shutdown.
# NOTE(review): blockDB.Put and singletonDB.Put run without `lock`; only the
#   Commit calls are guarded - presumably the underlying DB is safe for this,
#   TODO confirm.
# NOTE(review): indentation, column alignment of context lines (for example
#   ChainPrefix/SingletonPrefix and the ASCII tree comment) and blank context
#   lines were re-derived from gofmt conventions and the hunk line counts;
#   verify against the target tree before applying.
#
diff --git a/vms/platformvm/state/mock_state.go b/vms/platformvm/state/mock_state.go
index 99c9d95156d1..201c5ef7c5fd 100644
--- a/vms/platformvm/state/mock_state.go
+++ b/vms/platformvm/state/mock_state.go
@@ -12,11 +12,13 @@ package state
 import (
 	context "context"
 	reflect "reflect"
+	sync "sync"
 	time "time"
 
 	database "github.com/ava-labs/avalanchego/database"
 	ids "github.com/ava-labs/avalanchego/ids"
 	validators "github.com/ava-labs/avalanchego/snow/validators"
+	logging "github.com/ava-labs/avalanchego/utils/logging"
 	avax "github.com/ava-labs/avalanchego/vms/components/avax"
 	block "github.com/ava-labs/avalanchego/vms/platformvm/block"
 	fx "github.com/ava-labs/avalanchego/vms/platformvm/fx"
@@ -1562,6 +1564,20 @@ func (mr *MockStateMockRecorder) PutPendingValidator(arg0 any) *gomock.Call {
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutPendingValidator", reflect.TypeOf((*MockState)(nil).PutPendingValidator), arg0)
 }
 
+// ReindexBlocks mocks base method.
+func (m *MockState) ReindexBlocks(arg0 sync.Locker, arg1 logging.Logger) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "ReindexBlocks", arg0, arg1)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// ReindexBlocks indicates an expected call of ReindexBlocks.
+func (mr *MockStateMockRecorder) ReindexBlocks(arg0, arg1 any) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReindexBlocks", reflect.TypeOf((*MockState)(nil).ReindexBlocks), arg0, arg1)
+}
+
 // SetCurrentSupply mocks base method.
 func (m *MockState) SetCurrentSupply(arg0 ids.ID, arg1 uint64) {
 	m.ctrl.T.Helper()
diff --git a/vms/platformvm/state/state.go b/vms/platformvm/state/state.go
index 45360a13b764..ccfd6d824159 100644
--- a/vms/platformvm/state/state.go
+++ b/vms/platformvm/state/state.go
@@ -7,6 +7,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math"
+	"sync"
 	"time"
 
 	"github.com/google/btree"
@@ -28,6 +30,8 @@ import (
 	"github.com/ava-labs/avalanchego/utils/constants"
 	"github.com/ava-labs/avalanchego/utils/crypto/bls"
 	"github.com/ava-labs/avalanchego/utils/hashing"
+	"github.com/ava-labs/avalanchego/utils/logging"
+	"github.com/ava-labs/avalanchego/utils/timer"
 	"github.com/ava-labs/avalanchego/utils/wrappers"
 	"github.com/ava-labs/avalanchego/vms/components/avax"
 	"github.com/ava-labs/avalanchego/vms/platformvm/block"
@@ -42,6 +46,13 @@ import (
 	safemath "github.com/ava-labs/avalanchego/utils/math"
 )
 
+const (
+	indexIterationLimit           = 4096
+	indexIterationSleepMultiplier = 5
+	indexIterationSleepCap        = 10 * time.Second
+	indexLogFrequency             = 30 * time.Second
+)
+
 var (
 	_ State = (*state)(nil)
 
@@ -69,11 +80,12 @@ var (
 	ChainPrefix     = []byte("chain")
 	SingletonPrefix = []byte("singleton")
 
-	TimestampKey      = []byte("timestamp")
-	CurrentSupplyKey  = []byte("current supply")
-	LastAcceptedKey   = []byte("last accepted")
-	HeightsIndexedKey = []byte("heights indexed")
-	InitializedKey    = []byte("initialized")
+	TimestampKey       = []byte("timestamp")
+	CurrentSupplyKey   = []byte("current supply")
+	LastAcceptedKey    = []byte("last accepted")
+	HeightsIndexedKey  = []byte("heights indexed")
+	InitializedKey     = []byte("initialized")
+	BlocksReindexedKey = []byte("blocks reindexed")
 )
 
 // Chain collects all methods to manage the state of the chain for block
@@ -167,6 +179,14 @@ type State interface {
 	// Discard uncommitted changes to the database.
 	Abort()
 
+	// ReindexBlocks converts any block indices using the legacy storage format
+	// to the new format. If this database has already updated the indices,
+	// this function will return immediately, without iterating over the
+	// database.
+	//
+	// TODO: Remove after v1.12.x is activated
+	ReindexBlocks(lock sync.Locker, log logging.Logger) error
+
 	// Commit changes to the base database.
 	Commit() error
 
@@ -245,6 +265,7 @@ type stateBlk struct {
  * | '-- txID -> nil
  * '-. singletons
  *   |-- initializedKey -> nil
+ *   |-- blocksReindexedKey -> nil
  *   |-- timestampKey -> timestamp
  *   |-- currentSupplyKey -> currentSupply
  *   |-- lastAcceptedKey -> lastAccepted
@@ -2292,3 +2313,138 @@ func parseStoredBlock(blkBytes []byte) (block.Block, bool, error) {
 	blk, err = block.Parse(block.GenesisCodec, blkState.Bytes)
 	return blk, true, err
 }
+
+func (s *state) ReindexBlocks(lock sync.Locker, log logging.Logger) error {
+	has, err := s.singletonDB.Has(BlocksReindexedKey)
+	if err != nil {
+		return err
+	}
+	if has {
+		log.Info("blocks already reindexed")
+		return nil
+	}
+
+	// It is possible that new blocks are added after grabbing this iterator.
+	// New blocks are guaranteed to be persisted in the new format, so we don't
+	// need to check them.
+	blockIterator := s.blockDB.NewIterator()
+	// Releasing is done using a closure to ensure that updating blockIterator
+	// will result in having the most recent iterator released when executing
+	// the deferred function.
+	defer func() {
+		blockIterator.Release()
+	}()
+
+	log.Info("starting block reindexing")
+
+	var (
+		startTime         = time.Now()
+		lastCommit        = startTime
+		nextUpdate        = startTime.Add(indexLogFrequency)
+		numIndicesChecked = 0
+		numIndicesUpdated = 0
+	)
+
+	for blockIterator.Next() {
+		valueBytes := blockIterator.Value()
+		blk, isStateBlk, err := parseStoredBlock(valueBytes)
+		if err != nil {
+			return fmt.Errorf("failed to parse block: %w", err)
+		}
+
+		blkID := blk.ID()
+
+		// This block was previously stored using the legacy format, update the
+		// index to remove the usage of stateBlk.
+		if isStateBlk {
+			blkBytes := blk.Bytes()
+			if err := s.blockDB.Put(blkID[:], blkBytes); err != nil {
+				return fmt.Errorf("failed to write block: %w", err)
+			}
+
+			numIndicesUpdated++
+		}
+
+		numIndicesChecked++
+
+		now := time.Now()
+		if now.After(nextUpdate) {
+			nextUpdate = now.Add(indexLogFrequency)
+
+			progress := timer.ProgressFromHash(blkID[:])
+			eta := timer.EstimateETA(
+				startTime,
+				progress,
+				math.MaxUint64,
+			)
+
+			log.Info("reindexing blocks",
+				zap.Int("numIndicesUpdated", numIndicesUpdated),
+				zap.Int("numIndicesChecked", numIndicesChecked),
+				zap.Duration("eta", eta),
+			)
+		}
+
+		if numIndicesChecked%indexIterationLimit == 0 {
+			// We must hold the lock during committing to make sure we don't
+			// attempt to commit to disk while a block is concurrently being
+			// accepted.
+			lock.Lock()
+			err := utils.Err(
+				s.Commit(),
+				blockIterator.Error(),
+			)
+			lock.Unlock()
+			if err != nil {
+				return err
+			}
+
+			// We release the iterator here to allow the underlying database to
+			// clean up deleted state.
+			blockIterator.Release()
+
+			// We take the minimum here because it's possible that the node is
+			// currently bootstrapping. This would mean that grabbing the lock
+			// could take an extremely long period of time; which we should not
+			// delay processing for.
+			indexDuration := now.Sub(lastCommit)
+			sleepDuration := min(
+				indexIterationSleepMultiplier*indexDuration,
+				indexIterationSleepCap,
+			)
+			time.Sleep(sleepDuration)
+
+			// Make sure not to include the sleep duration into the next index
+			// duration.
+			lastCommit = time.Now()
+
+			blockIterator = s.blockDB.NewIteratorWithStart(blkID[:])
+		}
+	}
+
+	// Ensure we fully iterated over all blocks before writing that indexing has
+	// finished.
+	//
+	// Note: This is needed because a transient read error could cause the
+	// iterator to stop early.
+	if err := blockIterator.Error(); err != nil {
+		return fmt.Errorf("failed to iterate over historical blocks: %w", err)
+	}
+
+	if err := s.singletonDB.Put(BlocksReindexedKey, nil); err != nil {
+		return fmt.Errorf("failed to put marked blocks as reindexed: %w", err)
+	}
+
+	// We must hold the lock during committing to make sure we don't attempt to
+	// commit to disk while a block is concurrently being accepted.
+	lock.Lock()
+	defer lock.Unlock()
+
+	log.Info("finished block reindexing",
+		zap.Int("numIndicesUpdated", numIndicesUpdated),
+		zap.Int("numIndicesChecked", numIndicesChecked),
+		zap.Duration("duration", time.Since(startTime)),
+	)
+
+	return s.Commit()
+}
diff --git a/vms/platformvm/state/state_test.go b/vms/platformvm/state/state_test.go
index b0f6afa596f9..abfd1f34feee 100644
--- a/vms/platformvm/state/state_test.go
+++ b/vms/platformvm/state/state_test.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"sync"
 	"testing"
 	"time"
 
@@ -22,6 +23,7 @@ import (
 	"github.com/ava-labs/avalanchego/snow/validators"
 	"github.com/ava-labs/avalanchego/utils/constants"
 	"github.com/ava-labs/avalanchego/utils/crypto/bls"
+	"github.com/ava-labs/avalanchego/utils/logging"
 	"github.com/ava-labs/avalanchego/utils/units"
 	"github.com/ava-labs/avalanchego/utils/wrappers"
 	"github.com/ava-labs/avalanchego/vms/components/avax"
@@ -1295,10 +1297,112 @@ func requireEqualPublicKeysValidatorSet(
 }
 
 func TestParsedStateBlock(t *testing.T) {
+	var (
+		require = require.New(t)
+		blks    = makeBlocks(require)
+	)
+
+	for _, blk := range blks {
+		stBlk := stateBlk{
+			Bytes:  blk.Bytes(),
+			Status: choices.Accepted,
+		}
+
+		stBlkBytes, err := block.GenesisCodec.Marshal(block.CodecVersion, &stBlk)
+		require.NoError(err)
+
+		gotBlk, isStateBlk, err := parseStoredBlock(stBlkBytes)
+		require.NoError(err)
+		require.True(isStateBlk)
+		require.Equal(blk.ID(), gotBlk.ID())
+
+		gotBlk, isStateBlk, err = parseStoredBlock(blk.Bytes())
+		require.NoError(err)
+		require.False(isStateBlk)
+		require.Equal(blk.ID(), gotBlk.ID())
+	}
+}
+
+func TestReindexBlocks(t *testing.T) {
+	var (
+		require = require.New(t)
+		s       = newInitializedState(require).(*state)
+		blks    = makeBlocks(require)
+	)
+
+	// Populate the blocks using the legacy format.
+	for _, blk := range blks {
+		stBlk := stateBlk{
+			Bytes:  blk.Bytes(),
+			Status: choices.Accepted,
+		}
+		stBlkBytes, err := block.GenesisCodec.Marshal(block.CodecVersion, &stBlk)
+		require.NoError(err)
+
+		blkID := blk.ID()
+		require.NoError(s.blockDB.Put(blkID[:], stBlkBytes))
+	}
+
+	// Convert the indices to the new format.
+	require.NoError(s.ReindexBlocks(&sync.Mutex{}, logging.NoLog{}))
+
+	// Verify that the blocks are stored in the new format.
+	for _, blk := range blks {
+		blkID := blk.ID()
+		blkBytes, err := s.blockDB.Get(blkID[:])
+		require.NoError(err)
+
+		parsedBlk, err := block.Parse(block.GenesisCodec, blkBytes)
+		require.NoError(err)
+		require.Equal(blkID, parsedBlk.ID())
+	}
+
+	// Verify that the flag has been written to disk to allow skipping future
+	// reindexings.
+	reindexed, err := s.singletonDB.Has(BlocksReindexedKey)
+	require.NoError(err)
+	require.True(reindexed)
+}
+
+func TestStateSubnetOwner(t *testing.T) {
 	require := require.New(t)
 
-	var blks []block.Block
+	state := newInitializedState(require)
+	ctrl := gomock.NewController(t)
+
+	var (
+		owner1 = fx.NewMockOwner(ctrl)
+		owner2 = fx.NewMockOwner(ctrl)
+
+		createSubnetTx = &txs.Tx{
+			Unsigned: &txs.CreateSubnetTx{
+				BaseTx: txs.BaseTx{},
+				Owner:  owner1,
+			},
+		}
+
+		subnetID = createSubnetTx.ID()
+	)
+
+	owner, err := state.GetSubnetOwner(subnetID)
+	require.ErrorIs(err, database.ErrNotFound)
+	require.Nil(owner)
+
+	state.AddSubnet(createSubnetTx)
+	state.SetSubnetOwner(subnetID, owner1)
+
+	owner, err = state.GetSubnetOwner(subnetID)
+	require.NoError(err)
+	require.Equal(owner1, owner)
+
+	state.SetSubnetOwner(subnetID, owner2)
+	owner, err = state.GetSubnetOwner(subnetID)
+	require.NoError(err)
+	require.Equal(owner2, owner)
+}
 
+func makeBlocks(require *require.Assertions) []block.Block {
+	var blks []block.Block
 	{
 		blk, err := block.NewApricotAbortBlock(ids.GenerateTestID(), 1000)
 		require.NoError(err)
@@ -1382,61 +1486,5 @@ func TestParsedStateBlock(t *testing.T) {
 		require.NoError(err)
 		blks = append(blks, blk)
 	}
-
-	for _, blk := range blks {
-		stBlk := stateBlk{
-			Bytes:  blk.Bytes(),
-			Status: choices.Accepted,
-		}
-
-		stBlkBytes, err := block.GenesisCodec.Marshal(block.CodecVersion, &stBlk)
-		require.NoError(err)
-
-		gotBlk, isStateBlk, err := parseStoredBlock(stBlkBytes)
-		require.NoError(err)
-		require.True(isStateBlk)
-		require.Equal(blk.ID(), gotBlk.ID())
-
-		gotBlk, isStateBlk, err = parseStoredBlock(blk.Bytes())
-		require.NoError(err)
-		require.False(isStateBlk)
-		require.Equal(blk.ID(), gotBlk.ID())
-	}
-}
-
-func TestStateSubnetOwner(t *testing.T) {
-	require := require.New(t)
-
-	state := newInitializedState(require)
-	ctrl := gomock.NewController(t)
-
-	var (
-		owner1 = fx.NewMockOwner(ctrl)
-		owner2 = fx.NewMockOwner(ctrl)
-
-		createSubnetTx = &txs.Tx{
-			Unsigned: &txs.CreateSubnetTx{
-				BaseTx: txs.BaseTx{},
-				Owner:  owner1,
-			},
-		}
-
-		subnetID = createSubnetTx.ID()
-	)
-
-	owner, err := state.GetSubnetOwner(subnetID)
-	require.ErrorIs(err, database.ErrNotFound)
-	require.Nil(owner)
-
-	state.AddSubnet(createSubnetTx)
-	state.SetSubnetOwner(subnetID, owner1)
-
-	owner, err = state.GetSubnetOwner(subnetID)
-	require.NoError(err)
-	require.Equal(owner1, owner)
-
-	state.SetSubnetOwner(subnetID, owner2)
-	owner, err = state.GetSubnetOwner(subnetID)
-	require.NoError(err)
-	require.Equal(owner2, owner)
+	return blks
 }
diff --git a/vms/platformvm/vm.go b/vms/platformvm/vm.go
index 39896377aba2..ab698d8711b2 100644
--- a/vms/platformvm/vm.go
+++ b/vms/platformvm/vm.go
@@ -245,6 +245,15 @@ func (vm *VM) Initialize(
 	// [periodicallyPruneMempool] grabs the context lock.
 	go vm.periodicallyPruneMempool(execConfig.MempoolPruneFrequency)
 
+	go func() {
+		err := vm.state.ReindexBlocks(&vm.ctx.Lock, vm.ctx.Log)
+		if err != nil {
+			vm.ctx.Log.Warn("reindexing blocks failed",
+				zap.Error(err),
+			)
+		}
+	}()
+
 	return nil
 }
 