Skip to content

Commit

Permalink
fix(server/v2/cometbft): wire app closer (#22240)
Browse files Browse the repository at this point in the history
(cherry picked from commit 0b43fcc)

# Conflicts:
#	server/v2/testdata/app.toml
#	server/v2/types.go
#	store/v2/commitment/metadata.go
#	store/v2/root/factory.go
#	store/v2/root/migrate_test.go
#	store/v2/root/store.go
#	store/v2/root/store_test.go
#	store/v2/root/upgrade_test.go
  • Loading branch information
julienrbrt authored and mergify[bot] committed Oct 14, 2024
1 parent ba5966f commit 29ef7b4
Show file tree
Hide file tree
Showing 13 changed files with 2,115 additions and 4 deletions.
3 changes: 3 additions & 0 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Consensus[T transaction.Tx] struct {
logger log.Logger
appName, version string
app *appmanager.AppManager[T]
appCloser func() error
txCodec transaction.Codec[T]
store types.Store
streaming streaming.Manager
Expand Down Expand Up @@ -77,6 +78,7 @@ func NewConsensus[T transaction.Tx](
logger log.Logger,
appName string,
app *appmanager.AppManager[T],
appCloser func() error,
mp mempool.Mempool[T],
indexedEvents map[string]struct{},
queryHandlersMap map[string]appmodulev2.Handler,
Expand All @@ -89,6 +91,7 @@ func NewConsensus[T transaction.Tx](
appName: appName,
version: getCometBFTServerVersion(),
app: app,
appCloser: appCloser,
cfg: cfg,
store: store,
logger: logger,
Expand Down
6 changes: 3 additions & 3 deletions server/v2/cometbft/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
am "cosmossdk.io/server/v2/appmanager"
"cosmossdk.io/server/v2/appmanager"
"cosmossdk.io/server/v2/cometbft/handlers"
cometmock "cosmossdk.io/server/v2/cometbft/internal/mock"
"cosmossdk.io/server/v2/cometbft/mempool"
Expand Down Expand Up @@ -672,7 +672,7 @@ func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.
sc := cometmock.NewMockCommiter(log.NewNopLogger(), string(actorName), "stf")
mockStore := cometmock.NewMockStore(ss, sc)

b := am.Builder[mock.Tx]{
b := appmanager.Builder[mock.Tx]{
STF: s,
DB: mockStore,
ValidateTxGasLimit: gasLimit,
Expand All @@ -688,7 +688,7 @@ func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.
am, err := b.Build()
require.NoError(t, err)

return NewConsensus[mock.Tx](log.NewNopLogger(), "testing-app", am, mempool, map[string]struct{}{}, nil, mockStore, Config{AppTomlConfig: DefaultAppTomlConfig()}, mock.TxCodec{}, "test")
return NewConsensus[mock.Tx](log.NewNopLogger(), "testing-app", am, func() error { return nil }, mempool, map[string]struct{}{}, nil, mockStore, Config{AppTomlConfig: DefaultAppTomlConfig()}, mock.TxCodec{}, "test")
}

// Check target version same with store's latest version
Expand Down
1 change: 1 addition & 0 deletions server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logg
s.logger,
appI.Name(),
appI.GetAppManager(),
appI.Close,
s.serverOptions.Mempool(cfg),
indexEvents,
appI.GetQueryHandlers(),
Expand Down
51 changes: 51 additions & 0 deletions server/v2/testdata/app.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
[grpc]
# Enable defines if the gRPC server should be enabled.
enable = false
# Address defines the gRPC server address to bind to.
address = 'localhost:9090'
# MaxRecvMsgSize defines the max message size in bytes the server can receive.
# The default value is 10MB.
max-recv-msg-size = 10485760
# MaxSendMsgSize defines the max message size in bytes the server can send.
# The default value is math.MaxInt32.
max-send-msg-size = 2147483647

[mock-server-1]
# Mock field
mock_field = 'default'
# Mock field two
mock_field_two = 1

[server]
# minimum-gas-prices defines the price which a validator is willing to accept for processing a transaction. A transaction's fees must meet the minimum of any denomination specified in this config (e.g. 0.25token1;0.0001token2).
minimum-gas-prices = '0stake'

[store]
# The type of database for application and snapshots databases.
app-db-backend = 'goleveldb'

[store.options]
# State storage database type. Currently we support: "sqlite", "pebble" and "rocksdb"
ss-type = 'sqlite'
# State commitment database type. Currently we support: "iavl" and "iavl-v2"
sc-type = 'iavl'

# Pruning options for state storage
[store.options.ss-pruning-option]
# Number of recent heights to keep on disk.
keep-recent = 2
# Height interval at which pruned heights are removed from disk.
interval = 100

# Pruning options for state commitment
[store.options.sc-pruning-option]
# Number of recent heights to keep on disk.
keep-recent = 2
# Height interval at which pruned heights are removed from disk.
interval = 100

[store.options.iavl-config]
# CacheSize set the size of the iavl tree cache.
cache-size = 100000
# If true, the tree will work like no fast storage and always not upgrade fast storage.
skip-fast-storage-upgrade = true
25 changes: 25 additions & 0 deletions server/v2/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package serverv2

import (
"github.com/spf13/viper"

appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/server"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
"cosmossdk.io/schema/decoding"
"cosmossdk.io/server/v2/appmanager"

Check failure on line 11 in server/v2/types.go

View workflow job for this annotation

GitHub Actions / split-test-files

no required module provides package cosmossdk.io/server/v2/appmanager; to add it:

Check failure on line 11 in server/v2/types.go

View workflow job for this annotation

GitHub Actions / dependency-review

no required module provides package cosmossdk.io/server/v2/appmanager; to add it:

Check failure on line 11 in server/v2/types.go

View workflow job for this annotation

GitHub Actions / dependency-review

could not import cosmossdk.io/server/v2/appmanager (invalid package name: "")
"cosmossdk.io/store/v2"

Check failure on line 12 in server/v2/types.go

View workflow job for this annotation

GitHub Actions / split-test-files

no required module provides package cosmossdk.io/store/v2; to add it:

Check failure on line 12 in server/v2/types.go

View workflow job for this annotation

GitHub Actions / dependency-review

no required module provides package cosmossdk.io/store/v2; to add it:

Check failure on line 12 in server/v2/types.go

View workflow job for this annotation

GitHub Actions / dependency-review

could not import cosmossdk.io/store/v2 (invalid package name: "")
)

type AppCreator[T transaction.Tx] func(log.Logger, *viper.Viper) AppI[T]

type AppI[T transaction.Tx] interface {
Name() string
InterfaceRegistry() server.InterfaceRegistry
GetAppManager() *appmanager.AppManager[T]
GetQueryHandlers() map[string]appmodulev2.Handler
GetStore() store.RootStore

Check failure on line 22 in server/v2/types.go

View workflow job for this annotation

GitHub Actions / dependency-review

undefined: store
GetSchemaDecoderResolver() decoding.DecoderResolver
Close() error
}
10 changes: 10 additions & 0 deletions simapp/v2/app_di.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,16 @@ func (app *SimApp[T]) TxConfig() client.TxConfig {
return app.txConfig
}

// GetStore returns the root store.
func (app *SimApp[T]) GetStore() store.RootStore {
return app.store
}

// Close overwrites the base Close method to close the stores.
func (app *SimApp[T]) Close() error {
if err := app.store.Close(); err != nil {
return err
}

return app.App.Close()
}
174 changes: 174 additions & 0 deletions store/v2/commitment/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package commitment

import (
"bytes"
"errors"
"fmt"

corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2/internal/encoding"

Check failure on line 9 in store/v2/commitment/metadata.go

View workflow job for this annotation

GitHub Actions / split-test-files

no required module provides package cosmossdk.io/store/v2/internal/encoding; to add it:

Check failure on line 9 in store/v2/commitment/metadata.go

View workflow job for this annotation

GitHub Actions / dependency-review

no required module provides package cosmossdk.io/store/v2/internal/encoding; to add it:

Check failure on line 9 in store/v2/commitment/metadata.go

View workflow job for this annotation

GitHub Actions / dependency-review

could not import cosmossdk.io/store/v2/internal/encoding (invalid package name: "")
"cosmossdk.io/store/v2/proof"

Check failure on line 10 in store/v2/commitment/metadata.go

View workflow job for this annotation

GitHub Actions / split-test-files

no required module provides package cosmossdk.io/store/v2/proof; to add it:

Check failure on line 10 in store/v2/commitment/metadata.go

View workflow job for this annotation

GitHub Actions / dependency-review

no required module provides package cosmossdk.io/store/v2/proof; to add it:

Check failure on line 10 in store/v2/commitment/metadata.go

View workflow job for this annotation

GitHub Actions / dependency-review

could not import cosmossdk.io/store/v2/proof (invalid package name: "")
)

const (
commitInfoKeyFmt = "c/%d" // c/<version>
latestVersionKey = "c/latest"
removedStoreKeyPrefix = "c/removed/" // c/removed/<version>/<store-name>
)

// MetadataStore is a store for metadata related to the commitment store.
// It isn't metadata store role to close the underlying KVStore.
type MetadataStore struct {
kv corestore.KVStoreWithBatch
}

// NewMetadataStore creates a new MetadataStore.
func NewMetadataStore(kv corestore.KVStoreWithBatch) *MetadataStore {
return &MetadataStore{
kv: kv,
}
}

// GetLatestVersion returns the latest committed version.
func (m *MetadataStore) GetLatestVersion() (uint64, error) {
value, err := m.kv.Get([]byte(latestVersionKey))
if err != nil {
return 0, err
}
if value == nil {
return 0, nil
}

version, _, err := encoding.DecodeUvarint(value)
if err != nil {
return 0, err
}

return version, nil
}

func (m *MetadataStore) setLatestVersion(version uint64) error {
var buf bytes.Buffer
buf.Grow(encoding.EncodeUvarintSize(version))
if err := encoding.EncodeUvarint(&buf, version); err != nil {
return err
}
return m.kv.Set([]byte(latestVersionKey), buf.Bytes())
}

// GetCommitInfo returns the commit info for the given version.
func (m *MetadataStore) GetCommitInfo(version uint64) (*proof.CommitInfo, error) {
key := []byte(fmt.Sprintf(commitInfoKeyFmt, version))
value, err := m.kv.Get(key)
if err != nil {
return nil, err
}
if value == nil {
return nil, nil
}

cInfo := &proof.CommitInfo{}
if err := cInfo.Unmarshal(value); err != nil {
return nil, err
}

return cInfo, nil
}

func (m *MetadataStore) flushCommitInfo(version uint64, cInfo *proof.CommitInfo) (err error) {
// do nothing if commit info is nil, as will be the case for an empty, initializing store
if cInfo == nil {
return nil
}

batch := m.kv.NewBatch()
defer func() {
err = errors.Join(err, batch.Close())
}()
cInfoKey := []byte(fmt.Sprintf(commitInfoKeyFmt, version))
value, err := cInfo.Marshal()
if err != nil {
return err
}
if err := batch.Set(cInfoKey, value); err != nil {
return err
}

var buf bytes.Buffer
buf.Grow(encoding.EncodeUvarintSize(version))
if err := encoding.EncodeUvarint(&buf, version); err != nil {
return err
}
if err := batch.Set([]byte(latestVersionKey), buf.Bytes()); err != nil {
return err
}

if err := batch.Write(); err != nil {
return err
}
return nil
}

func (m *MetadataStore) flushRemovedStoreKeys(version uint64, storeKeys []string) (err error) {
batch := m.kv.NewBatch()
defer func() {
err = errors.Join(err, batch.Close())
}()

for _, storeKey := range storeKeys {
key := []byte(fmt.Sprintf("%s%s", encoding.BuildPrefixWithVersion(removedStoreKeyPrefix, version), storeKey))
if err := batch.Set(key, []byte{}); err != nil {
return err
}
}
return batch.Write()
}

func (m *MetadataStore) GetRemovedStoreKeys(version uint64) (storeKeys [][]byte, err error) {
end := encoding.BuildPrefixWithVersion(removedStoreKeyPrefix, version+1)
iter, err := m.kv.Iterator([]byte(removedStoreKeyPrefix), end)
if err != nil {
return nil, err
}
defer func() {
if ierr := iter.Close(); ierr != nil {
err = ierr
}
}()

for ; iter.Valid(); iter.Next() {
storeKey := iter.Key()[len(end):]
storeKeys = append(storeKeys, storeKey)
}
return storeKeys, nil
}

func (m *MetadataStore) deleteRemovedStoreKeys(version uint64, removeStore func(storeKey []byte, version uint64) error) (err error) {
removedStoreKeys, err := m.GetRemovedStoreKeys(version)
if err != nil {
return err
}
if len(removedStoreKeys) == 0 {
return nil
}

batch := m.kv.NewBatch()
defer func() {
err = errors.Join(err, batch.Close())
}()
for _, storeKey := range removedStoreKeys {
if err := removeStore(storeKey, version); err != nil {
return err
}
if err := batch.Delete(storeKey); err != nil {
return err
}
}

return batch.Write()
}

func (m *MetadataStore) deleteCommitInfo(version uint64) error {
cInfoKey := []byte(fmt.Sprintf(commitInfoKeyFmt, version))
return m.kv.Delete(cInfoKey)
}
Loading

0 comments on commit 29ef7b4

Please sign in to comment.