Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(blockstore)!: Cache more in blockstore, which speedsup the gossip routines #3342 #116

Merged
merged 7 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[blockstore]` Use LRU caches for LoadBlockPart. Make the LoadBlockPart and LoadBlockCommit APIs
return mutative copies, that the caller is expected to not modify. This saves on memory copying.
([\#3342](https://github.com/cometbft/cometbft/issues/3342))
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## v0.37.4-v25-osmo-10

* [#115](https://github.com/osmosis-labs/cometbft/pull/115) perf(p2p/secretconn): Buffer secret connection writes (#3346)

* [#116](https://github.com/osmosis-labs/cometbft/pull/116) perf(blockstore)!: Cache more in blockstore, which speedsup the gossip routines (#3342)

## v0.37.4-v25-osmo-9

Expand Down
9 changes: 5 additions & 4 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ type ReactorOption func(*Reactor)
// consensusState.
func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor {
conR := &Reactor{
conS: consensusState,
waitSync: waitSync,
rs: consensusState.GetRoundState(),
Metrics: NopMetrics(),
conS: consensusState,
waitSync: waitSync,
rs: consensusState.GetRoundState(),
initialHeight: consensusState.state.InitialHeight,
Metrics: NopMetrics(),
}
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)

Expand Down
2 changes: 1 addition & 1 deletion evidence/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func getSignedHeader(blockStore BlockStore, height int64) (*types.SignedHeader,
if blockMeta == nil {
return nil, fmt.Errorf("don't have header at height #%d", height)
}
commit := blockStore.LoadBlockCommit(height)
commit := blockStore.LoadBlockCommit(height).Clone()
if commit == nil {
return nil, fmt.Errorf("don't have commit at height #%d", height)
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/conn/secret_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (
labelDHSecret = "DH_SECRET"
labelSecretConnectionMac = "SECRET_CONNECTION_MAC"

defaultWriteBufferSize = 1024 * 1024
defaultWriteBufferSize = 128 * 1024
defaultReadBufferSize = 65536
)

Expand Down
2 changes: 1 addition & 1 deletion rpc/core/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, erro
}

// Return the canonical commit (comes from the block at height+1)
commit := env.BlockStore.LoadBlockCommit(height)
commit := env.BlockStore.LoadBlockCommit(height).Clone()
return ctypes.NewResultCommit(&header, commit, true), nil
}

Expand Down
30 changes: 26 additions & 4 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type BlockStore struct {

seenCommitCache *lru.Cache[int64, *types.Commit]
blockCommitCache *lru.Cache[int64, *types.Commit]
blockPartCache *lru.Cache[blockPartIndex, *types.Part]
}

// NewBlockStore returns a new BlockStore with the given DB,
Expand All @@ -72,6 +73,10 @@ func (bs *BlockStore) addCaches() {
if err != nil {
panic(err)
}
bs.blockPartCache, err = lru.New[blockPartIndex, *types.Part](500)
if err != nil {
panic(err)
}
}

func (bs *BlockStore) IsEmpty() bool {
Expand Down Expand Up @@ -168,10 +173,20 @@ func (bs *BlockStore) LoadBlockByHash(hash []byte) *types.Block {
return bs.LoadBlock(height)
}

type blockPartIndex struct {
height int64
index int
}

// LoadBlockPart returns the Part at the given index
// from the block at the given height.
// If no part is found for the given height and index, it returns nil.
// The returned part should not be modified by the caller. Take a copy if you need to modify it.
func (bs *BlockStore) LoadBlockPart(height int64, index int) *types.Part {
part, ok := bs.blockPartCache.Get(blockPartIndex{height, index})
if ok {
return part
}
pbpart := new(cmtproto.Part)

bz, err := bs.db.Get(calcBlockPartKey(height, index))
Expand All @@ -186,11 +201,11 @@ func (bs *BlockStore) LoadBlockPart(height int64, index int) *types.Part {
if err != nil {
panic(fmt.Errorf("unmarshal to cmtproto.Part failed: %w", err))
}
part, err := types.PartFromProto(pbpart)
part, err = types.PartFromProto(pbpart)
if err != nil {
panic(fmt.Sprintf("Error reading block part: %v", err))
}

bs.blockPartCache.Add(blockPartIndex{height, index}, part)
return part
}

Expand Down Expand Up @@ -243,10 +258,13 @@ func (bs *BlockStore) LoadBlockMetaByHash(hash []byte) *types.BlockMeta {
// This commit consists of the +2/3 and other Precommit-votes for block at `height`,
// and it comes from the block.LastCommit for `height+1`.
// If no commit is found for the given height, it returns nil.
//
// This return value should not be modified. If you need to modify it,
// do bs.LoadBlockCommit(height).Clone().
func (bs *BlockStore) LoadBlockCommit(height int64) *types.Commit {
comm, ok := bs.blockCommitCache.Get(height)
if ok {
return comm.Clone()
return comm
}
pbc := new(cmtproto.Commit)
bz, err := bs.db.Get(calcBlockCommitKey(height))
Expand All @@ -265,7 +283,7 @@ func (bs *BlockStore) LoadBlockCommit(height int64) *types.Commit {
panic(fmt.Sprintf("Error reading block commit: %v", err))
}
bs.blockCommitCache.Add(height, commit)
return commit.Clone()
return commit
}

// LoadSeenCommit returns the locally seen Commit for the given height.
Expand Down Expand Up @@ -347,13 +365,16 @@ func (bs *BlockStore) PruneBlocks(height int64) (uint64, error) {
if err := batch.Delete(calcBlockCommitKey(h)); err != nil {
return 0, err
}
bs.blockCommitCache.Remove(h)
if err := batch.Delete(calcSeenCommitKey(h)); err != nil {
return 0, err
}
bs.seenCommitCache.Remove(h)
for p := 0; p < int(meta.BlockID.PartSetHeader.Total); p++ {
if err := batch.Delete(calcBlockPartKey(h, p)); err != nil {
return 0, err
}
bs.blockPartCache.Remove(blockPartIndex{h, p})
}
pruned++

Expand Down Expand Up @@ -404,6 +425,7 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
for i := 0; i < int(blockParts.Total()); i++ {
part := blockParts.GetPart(i)
bs.saveBlockPart(height, i, part)
bs.blockPartCache.Add(blockPartIndex{height, i}, part)
}

// Save block meta
Expand Down
Loading