Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support proposervm historical block deletion #1929

Merged
merged 16 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,14 +751,19 @@ func (m *manager) createAvalancheChain(
}

// Initialize the ProposerVM and the vm wrapped inside it
minBlockDelay := proposervm.DefaultMinBlockDelay
var (
minBlockDelay = proposervm.DefaultMinBlockDelay
numHistoricalBlocks = proposervm.DefaultNumHistoricalBlocks
)
if subnetCfg, ok := m.SubnetConfigs[ctx.SubnetID]; ok {
minBlockDelay = subnetCfg.ProposerMinBlockDelay
numHistoricalBlocks = subnetCfg.ProposerNumHistoricalBlocks
}
m.Log.Info("creating proposervm wrapper",
zap.Time("activationTime", m.ApricotPhase4Time),
zap.Uint64("minPChainHeight", m.ApricotPhase4MinPChainHeight),
zap.Duration("minBlockDelay", minBlockDelay),
zap.Uint64("numHistoricalBlocks", numHistoricalBlocks),
)

chainAlias := m.PrimaryAliasOrDefault(ctx.ChainID)
Expand All @@ -778,6 +783,7 @@ func (m *manager) createAvalancheChain(
m.ApricotPhase4Time,
m.ApricotPhase4MinPChainHeight,
minBlockDelay,
numHistoricalBlocks,
m.stakingSigner,
m.stakingCert,
)
Expand Down Expand Up @@ -1100,14 +1106,19 @@ func (m *manager) createSnowmanChain(
return nil, fmt.Errorf("error while fetching chain config: %w", err)
}

minBlockDelay := proposervm.DefaultMinBlockDelay
var (
minBlockDelay = proposervm.DefaultMinBlockDelay
numHistoricalBlocks = proposervm.DefaultNumHistoricalBlocks
)
if subnetCfg, ok := m.SubnetConfigs[ctx.SubnetID]; ok {
minBlockDelay = subnetCfg.ProposerMinBlockDelay
numHistoricalBlocks = subnetCfg.ProposerNumHistoricalBlocks
}
m.Log.Info("creating proposervm wrapper",
zap.Time("activationTime", m.ApricotPhase4Time),
zap.Uint64("minPChainHeight", m.ApricotPhase4MinPChainHeight),
zap.Duration("minBlockDelay", minBlockDelay),
zap.Uint64("numHistoricalBlocks", numHistoricalBlocks),
)

chainAlias := m.PrimaryAliasOrDefault(ctx.ChainID)
Expand All @@ -1120,6 +1131,7 @@ func (m *manager) createSnowmanChain(
m.ApricotPhase4Time,
m.ApricotPhase4MinPChainHeight,
minBlockDelay,
numHistoricalBlocks,
m.stakingSigner,
m.stakingCert,
)
Expand Down
9 changes: 5 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,10 +1161,11 @@ func getSubnetConfigsFromDir(v *viper.Viper, subnetIDs []ids.ID) (map[ids.ID]sub

func getDefaultSubnetConfig(v *viper.Viper) subnets.Config {
return subnets.Config{
ConsensusParameters: getConsensusConfig(v),
ValidatorOnly: false,
GossipConfig: getGossipConfig(v),
ProposerMinBlockDelay: proposervm.DefaultMinBlockDelay,
ConsensusParameters: getConsensusConfig(v),
ValidatorOnly: false,
GossipConfig: getGossipConfig(v),
ProposerMinBlockDelay: proposervm.DefaultMinBlockDelay,
ProposerNumHistoricalBlocks: proposervm.DefaultNumHistoricalBlocks,
}
}

Expand Down
19 changes: 19 additions & 0 deletions subnets/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,27 @@ type Config struct {

// ProposerMinBlockDelay is the minimum delay this node will enforce when
// building a snowman++ block.
//
// TODO: Remove this flag once all VMs throttle their own block production.
ProposerMinBlockDelay time.Duration `json:"proposerMinBlockDelay" yaml:"proposerMinBlockDelay"`
// ProposerNumHistoricalBlocks is the number of historical snowman++ blocks
// this node will index per chain. If set to 0, the node will index all
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
// snowman++ blocks.
//
// Note: The last accepted block is not considered a historical block. This
// prevents the user from only storing the last accepted block, which can
// never be safe due to the non-atomic commits between the proposervm
// database and the innerVM's database.
//
// Invariant: This value must be set such that the proposervm never needs to
// rollback more blocks than have been deleted. On startup, the proposervm
// rolls back its accepted chain to match the innerVM's accepted chain. If
// the innerVM is not persisting its last accepted block quickly enough, the
// database can become corrupted.
abi87 marked this conversation as resolved.
Show resolved Hide resolved
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
//
// TODO: Move this flag once the proposervm is configurable on a per-chain
// basis.
ProposerNumHistoricalBlocks uint64 `json:"proposerNumHistoricalBlocks" yaml:"proposerNumHistoricalBlocks"`
}

func (c *Config) Valid() error {
Expand Down
1 change: 1 addition & 0 deletions vms/proposervm/batched_vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,7 @@ func initTestRemoteProposerVM(
proBlkStartTime,
0,
DefaultMinBlockDelay,
DefaultNumHistoricalBlocks,
pTestSigner,
pTestCert,
)
Expand Down
100 changes: 98 additions & 2 deletions vms/proposervm/height_indexed_vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
)

const pruneCommitPeriod = 1024

// shouldHeightIndexBeRepaired checks if index needs repairing and stores a
// checkpoint if repairing is needed.
//
Expand Down Expand Up @@ -109,7 +111,8 @@ func (vm *VM) updateHeightIndex(height uint64, blkID ids.ID) error {
}

func (vm *VM) storeHeightEntry(height uint64, blkID ids.ID) error {
switch _, err := vm.State.GetForkHeight(); err {
forkHeight, err := vm.State.GetForkHeight()
switch err {
case nil:
// The fork was already reached. Just update the index.

Expand All @@ -118,14 +121,107 @@ func (vm *VM) storeHeightEntry(height uint64, blkID ids.ID) error {
if err := vm.State.SetForkHeight(height); err != nil {
return fmt.Errorf("failed storing fork height: %w", err)
}
forkHeight = height

default:
return fmt.Errorf("failed to load fork height: %w", err)
}

if err := vm.State.SetBlockIDAtHeight(height, blkID); err != nil {
return err
}

vm.ctx.Log.Debug("indexed block",
zap.Stringer("blkID", blkID),
zap.Uint64("height", height),
)
return vm.State.SetBlockIDAtHeight(height, blkID)

if vm.numHistoricalBlocks == 0 {
return nil
}

blocksSinceFork := height - forkHeight
// Note: The last accepted block is not considered a historical block. Which
// is why <= is used rather than <. This prevents the user from only storing
// the last accepted block, which can never be safe due to the non-atomic
// commits between the proposervm database and the innerVM's database.
if blocksSinceFork <= vm.numHistoricalBlocks {
return nil
}

// Note: heightToDelete is >= forkHeight, so it is guaranteed not to
// underflow.
heightToDelete := height - vm.numHistoricalBlocks - 1
blockToDelete, err := vm.State.GetBlockIDAtHeight(heightToDelete)
if err == database.ErrNotFound {
// Block may have already been deleted. This can happen due to a
abi87 marked this conversation as resolved.
Show resolved Hide resolved
// proposervm rollback, the node having recently state-synced, or the
// user reconfiguring the node to store more historical blocks than a
// prior run.
return nil
}
if err != nil {
return err
}

if err := vm.State.DeleteBlockIDAtHeight(heightToDelete); err != nil {
return err
}
if err := vm.State.DeleteBlock(blockToDelete); err != nil {
return err
}

vm.ctx.Log.Debug("deleted block",
zap.Stringer("blkID", blockToDelete),
zap.Uint64("height", heightToDelete),
)
return nil
}

// TODO: Support async deletion of old blocks.
func (vm *VM) pruneOldBlocks() error {
if vm.numHistoricalBlocks == 0 {
return nil
}

height, err := vm.State.GetMinimumHeight()
if err == database.ErrNotFound {
// Chain hasn't forked yet
return nil
}

// TODO: Refactor to use DB iterators.
//
// Note: vm.lastAcceptedHeight is guaranteed to be >= height, so the
// subtraction can never underflow.
for vm.lastAcceptedHeight-height > vm.numHistoricalBlocks {
blockToDelete, err := vm.State.GetBlockIDAtHeight(height)
if err != nil {
return err
}

if err := vm.State.DeleteBlockIDAtHeight(height); err != nil {
return err
}
if err := vm.State.DeleteBlock(blockToDelete); err != nil {
return err
}

vm.ctx.Log.Debug("deleted block",
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
zap.Stringer("blkID", blockToDelete),
zap.Uint64("height", height),
)

// Note: height is < vm.lastAcceptedHeight, so it is guaranteed not to
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
// overflow.
height++
if height%pruneCommitPeriod != 0 {
continue
}

if err := vm.db.Commit(); err != nil {
return err
}
}
return vm.db.Commit()
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
}
1 change: 1 addition & 0 deletions vms/proposervm/post_fork_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,7 @@ func TestOptionTimestampValidity(t *testing.T) {
time.Time{},
0,
DefaultMinBlockDelay,
DefaultNumHistoricalBlocks,
pTestSigner,
pTestCert,
)
Expand Down
27 changes: 26 additions & 1 deletion vms/proposervm/state/block_height_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ var (
)

type HeightIndexGetter interface {
// GetMinimumHeight return the smallest height of an indexed blockID. If
// there are no indexed blockIDs, ErrNotFound will be returned.
GetMinimumHeight() (uint64, error)
abi87 marked this conversation as resolved.
Show resolved Hide resolved
GetBlockIDAtHeight(height uint64) (ids.ID, error)

// Fork height is stored when the first post-fork block/option is accepted.
Expand All @@ -32,8 +35,9 @@ type HeightIndexGetter interface {
}

type HeightIndexWriter interface {
SetBlockIDAtHeight(height uint64, blkID ids.ID) error
SetForkHeight(height uint64) error
SetBlockIDAtHeight(height uint64, blkID ids.ID) error
DeleteBlockIDAtHeight(height uint64) error
}

// A checkpoint is the blockID of the next block to be considered
Expand Down Expand Up @@ -75,6 +79,21 @@ func NewHeightIndex(db database.Database, commitable versiondb.Commitable) Heigh
}
}

func (hi *heightIndex) GetMinimumHeight() (uint64, error) {
it := hi.heightDB.NewIterator()
defer it.Release()

if !it.Next() {
return 0, database.ErrNotFound
}

height, err := database.ParseUInt64(it.Key())
if err != nil {
return 0, err
}
return height, it.Error()
}

func (hi *heightIndex) GetBlockIDAtHeight(height uint64) (ids.ID, error) {
if blkID, found := hi.heightsCache.Get(height); found {
return blkID, nil
Expand All @@ -95,6 +114,12 @@ func (hi *heightIndex) SetBlockIDAtHeight(height uint64, blkID ids.ID) error {
return database.PutID(hi.heightDB, key, blkID)
}

func (hi *heightIndex) DeleteBlockIDAtHeight(height uint64) error {
hi.heightsCache.Evict(height)
key := database.PackUInt64(height)
return hi.heightDB.Delete(key)
}

func (hi *heightIndex) GetForkHeight() (uint64, error) {
return database.GetUInt64(hi.metadataDB, forkKey)
}
Expand Down
6 changes: 6 additions & 0 deletions vms/proposervm/state/block_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
type BlockState interface {
GetBlock(blkID ids.ID) (block.Block, choices.Status, error)
PutBlock(blk block.Block, status choices.Status) error
DeleteBlock(blkID ids.ID) error
}

type blockState struct {
Expand Down Expand Up @@ -134,3 +135,8 @@ func (s *blockState) PutBlock(blk block.Block, status choices.Status) error {
s.blkCache.Put(blkID, &blkWrapper)
return s.db.Put(blkID[:], bytes)
}

func (s *blockState) DeleteBlock(blkID ids.ID) error {
s.blkCache.Evict(blkID)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we s.blkCache.Put(blkID, nil) here rather than evicting?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete seems fine with me.
IIRC we peek historical blocks only via GetAncestors call which should stop at the first database.ErrNotFound. I'd rather use the cache for fresh blocks, rather than historical ones

return s.db.Delete(blkID[:])
}
43 changes: 43 additions & 0 deletions vms/proposervm/state/mock_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vms/proposervm/state_syncable_vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func helperBuildStateSyncTestObjects(t *testing.T) (*fullVM, *VM) {
time.Time{},
0,
DefaultMinBlockDelay,
DefaultNumHistoricalBlocks,
pTestSigner,
pTestCert,
)
Expand Down
Loading
Loading