diff --git a/CHANGELOG.md b/CHANGELOG.md index bee7ce9c04ab..7d2e4e89cc19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -108,6 +108,8 @@ Ref: https://keepachangelog.com/en/1.0.0/ * [#13881](https://github.com/cosmos/cosmos-sdk/pull/13881) Optimize iteration on nested cached KV stores and other operations in general. * (x/gov) [#14347](https://github.com/cosmos/cosmos-sdk/pull/14347) Support `v1.Proposal` message in `v1beta1.Proposal.Content`. * (x/gov) [#14390](https://github.com/cosmos/cosmos-sdk/pull/14390) Add title, proposer and summary to proposal struct +* (baseapp) [#14417](https://github.com/cosmos/cosmos-sdk/pull/14417) `SetStreamingService` accepts appOptions, AppCodec and Storekeys needed to set streamers. + * Store pacakge no longer has a dependency on baseapp. ### State Machine Breaking diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 1e95dac964ee..1115954500e0 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -141,7 +141,7 @@ type BaseApp struct { //nolint: maligned // abciListeners for hooking into the ABCI message processing of the BaseApp // and exposing the requests and responses to external consumers - abciListeners []ABCIListener + abciListeners []storetypes.ABCIListener } // NewBaseApp returns a reference to an initialized BaseApp. It accepts a diff --git a/baseapp/options.go b/baseapp/options.go index 90e58973fe89..a98992398d7f 100644 --- a/baseapp/options.go +++ b/baseapp/options.go @@ -7,10 +7,13 @@ import ( dbm "github.com/tendermint/tm-db" "github.com/cosmos/cosmos-sdk/codec/types" + servertypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/store" pruningtypes "github.com/cosmos/cosmos-sdk/store/pruning/types" "github.com/cosmos/cosmos-sdk/store/snapshots" snapshottypes "github.com/cosmos/cosmos-sdk/store/snapshots/types" + "github.com/cosmos/cosmos-sdk/store/streaming" + storetypes "github.com/cosmos/cosmos-sdk/store/types" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/mempool" ) @@ -232,14 +235,24 @@ func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) { } // SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore -func (app *BaseApp) SetStreamingService(s StreamingService) { +func (app *BaseApp) SetStreamingService( + appOpts servertypes.AppOptions, + appCodec storetypes.Codec, + keys map[string]*storetypes.KVStoreKey) error { + streamers, _, err := streaming.LoadStreamingServices(appOpts, appCodec, app.logger, keys) + if err != nil { + return err + } // add the listeners for each StoreKey - for key, lis := range s.Listeners() { - app.cms.AddListeners(key, lis) + for _, streamer := range streamers { + for key, lis := range streamer.Listeners() { + app.cms.AddListeners(key, lis) + } + // register the StreamingService within the BaseApp + // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context + app.abciListeners = append(app.abciListeners, streamer) } - // register the StreamingService within the BaseApp - // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context - app.abciListeners = append(app.abciListeners, s) + return nil } // SetTxDecoder sets the TxDecoder if it wasn't provided in the BaseApp constructor. diff --git a/simapp/app.go b/simapp/app.go index cc7dee35e80e..b5fa910b1bcd 100644 --- a/simapp/app.go +++ b/simapp/app.go @@ -31,7 +31,6 @@ import ( "github.com/cosmos/cosmos-sdk/server/config" servertypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/std" - "github.com/cosmos/cosmos-sdk/store/streaming" storetypes "github.com/cosmos/cosmos-sdk/store/types" "github.com/cosmos/cosmos-sdk/testutil/testdata_pulsar" sdk "github.com/cosmos/cosmos-sdk/types" @@ -262,8 +261,8 @@ func NewSimApp( // not include this key. memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey") - // load state streaming if enabled - if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, logger, keys); err != nil { + // register the streaming service with the BaseApp + if err := bApp.SetStreamingService(appOpts, appCodec, keys); err != nil { logger.Error("failed to load state streaming", "err", err) os.Exit(1) } diff --git a/simapp/app_v2.go b/simapp/app_v2.go index 99f93d531638..9c5e117d6ef4 100644 --- a/simapp/app_v2.go +++ b/simapp/app_v2.go @@ -22,7 +22,6 @@ import ( "github.com/cosmos/cosmos-sdk/server/api" "github.com/cosmos/cosmos-sdk/server/config" servertypes "github.com/cosmos/cosmos-sdk/server/types" - "github.com/cosmos/cosmos-sdk/store/streaming" storetypes "github.com/cosmos/cosmos-sdk/store/types" "github.com/cosmos/cosmos-sdk/testutil/testdata_pulsar" "github.com/cosmos/cosmos-sdk/types/module" @@ -245,8 +244,7 @@ func NewSimApp( app.App = appBuilder.Build(logger, db, traceStore, baseAppOptions...) - // load state streaming if enabled - if _, _, err := streaming.LoadStreamingServices(app.App.BaseApp, appOpts, app.appCodec, logger, app.kvStoreKeys()); err != nil { + if err := app.App.BaseApp.SetStreamingService(appOpts, app.appCodec, app.kvStoreKeys()); err != nil { logger.Error("failed to load state streaming", "err", err) os.Exit(1) } diff --git a/store/cachekv/internal/btree_test.go b/store/cachekv/internal/btree_test.go index b6aa22db8e0f..903558070b0a 100644 --- a/store/cachekv/internal/btree_test.go +++ b/store/cachekv/internal/btree_test.go @@ -4,7 +4,6 @@ import ( "testing" "github.com/cosmos/cosmos-sdk/store/types" - sdk "github.com/cosmos/cosmos-sdk/types" "github.com/stretchr/testify/require" ) @@ -195,9 +194,9 @@ func verifyIterator(t *testing.T, itr types.Iterator, expected []int64, msg stri } func int642Bytes(i int64) []byte { - return sdk.Uint64ToBigEndian(uint64(i)) + return types.Uint64ToBigEndian(uint64(i)) } func bytes2Int64(buf []byte) int64 { - return int64(sdk.BigEndianToUint64(buf)) + return int64(types.BigEndianToUint64(buf)) } diff --git a/store/cachekv/store.go b/store/cachekv/store.go index 666a71257354..203223493d38 100644 --- a/store/cachekv/store.go +++ b/store/cachekv/store.go @@ -9,8 +9,8 @@ import ( "github.com/tendermint/tendermint/libs/math" dbm "github.com/tendermint/tm-db" - "github.com/cosmos/cosmos-sdk/internal/conv" "github.com/cosmos/cosmos-sdk/store/cachekv/internal" + "github.com/cosmos/cosmos-sdk/store/internal/conv" "github.com/cosmos/cosmos-sdk/store/internal/kv" "github.com/cosmos/cosmos-sdk/store/tracekv" "github.com/cosmos/cosmos-sdk/store/types" diff --git a/store/internal/conv/doc.go b/store/internal/conv/doc.go new file mode 100644 index 000000000000..1c86f5c14409 --- /dev/null +++ b/store/internal/conv/doc.go @@ -0,0 +1,2 @@ +// Package conv provides internal functions for convertions and data manipulation +package conv diff --git a/store/internal/conv/string.go b/store/internal/conv/string.go new file mode 100644 index 000000000000..ab2b7f44b38a --- /dev/null +++ b/store/internal/conv/string.go @@ -0,0 +1,26 @@ +package conv + +import ( + "reflect" + "unsafe" +) + +// UnsafeStrToBytes uses unsafe to convert string into byte array. Returned bytes +// must not be altered after this function is called as it will cause a segmentation fault. +func UnsafeStrToBytes(s string) []byte { + var buf []byte + sHdr := (*reflect.StringHeader)(unsafe.Pointer(&s)) + bufHdr := (*reflect.SliceHeader)(unsafe.Pointer(&buf)) + bufHdr.Data = sHdr.Data + bufHdr.Cap = sHdr.Len + bufHdr.Len = sHdr.Len + return buf +} + +// UnsafeBytesToStr is meant to make a zero allocation conversion +// from []byte -> string to speed up operations, it is not meant +// to be used generally, but for a specific pattern to delete keys +// from a map. +func UnsafeBytesToStr(b []byte) string { + return *(*string)(unsafe.Pointer(&b)) +} diff --git a/store/internal/conv/string_test.go b/store/internal/conv/string_test.go new file mode 100644 index 000000000000..3e051d37b907 --- /dev/null +++ b/store/internal/conv/string_test.go @@ -0,0 +1,54 @@ +package conv + +import ( + "runtime" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/suite" +) + +func TestStringSuite(t *testing.T) { + suite.Run(t, new(StringSuite)) +} + +type StringSuite struct{ suite.Suite } + +func unsafeConvertStr() []byte { + return UnsafeStrToBytes("abc") +} + +func (s *StringSuite) TestUnsafeStrToBytes() { + // we convert in other function to trigger GC. We want to check that + // the underlying array in []bytes is accessible after GC will finish swapping. + for i := 0; i < 5; i++ { + b := unsafeConvertStr() + runtime.GC() + <-time.NewTimer(2 * time.Millisecond).C + b2 := append(b, 'd') //nolint:gocritic // append is fine here + s.Equal("abc", string(b)) + s.Equal("abcd", string(b2)) + } +} + +func unsafeConvertBytes() string { + return UnsafeBytesToStr([]byte("abc")) +} + +func (s *StringSuite) TestUnsafeBytesToStr() { + // we convert in other function to trigger GC. We want to check that + // the underlying array in []bytes is accessible after GC will finish swapping. + for i := 0; i < 5; i++ { + str := unsafeConvertBytes() + runtime.GC() + <-time.NewTimer(2 * time.Millisecond).C + s.Equal("abc", str) + } +} + +func BenchmarkUnsafeStrToBytes(b *testing.B) { + for i := 0; i < b.N; i++ { + UnsafeStrToBytes(strconv.Itoa(i)) + } +} diff --git a/store/streaming/README.md b/store/streaming/README.md index 9eb962ac862c..57fa3f6631ec 100644 --- a/store/streaming/README.md +++ b/store/streaming/README.md @@ -3,7 +3,7 @@ This package contains the constructors for the `StreamingService`s used to write state changes out from individual KVStores to a file or stream, as described in [ADR-038](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-038-state-listening.md) -and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/baseapp/streaming.go). +and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/store/types/streaming.go). The child directories contain the implementations for specific output destinations. Currently, a `StreamingService` implementation that writes state changes out to diff --git a/store/streaming/constructor.go b/store/streaming/constructor.go index c756c61b0f3f..b44018c96718 100644 --- a/store/streaming/constructor.go +++ b/store/streaming/constructor.go @@ -7,7 +7,6 @@ import ( "strings" "sync" - "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/client/flags" serverTypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/store/streaming/file" @@ -18,7 +17,7 @@ import ( ) // ServiceConstructor is used to construct a streaming service -type ServiceConstructor func(serverTypes.AppOptions, []types.StoreKey, types.Codec, log.Logger) (baseapp.StreamingService, error) +type ServiceConstructor func(serverTypes.AppOptions, []types.StoreKey, types.Codec, log.Logger) (types.StreamingService, error) // ServiceType enum for specifying the type of StreamingService type ServiceType int @@ -90,7 +89,7 @@ func NewFileStreamingService( keys []types.StoreKey, marshaller types.Codec, logger log.Logger, -) (baseapp.StreamingService, error) { +) (types.StreamingService, error) { homePath := cast.ToString(opts.Get(flags.FlagHome)) filePrefix := cast.ToString(opts.Get(OptStreamersFilePrefix)) fileDir := cast.ToString(opts.Get(OptStreamersFileWriteDir)) @@ -118,18 +117,17 @@ func NewFileStreamingService( // WaitGroup and quit channel used to synchronize with the streaming services // and any error that occurs during the setup. func LoadStreamingServices( - bApp *baseapp.BaseApp, appOpts serverTypes.AppOptions, appCodec types.Codec, logger log.Logger, keys map[string]*types.KVStoreKey, -) ([]baseapp.StreamingService, *sync.WaitGroup, error) { +) ([]types.StreamingService, *sync.WaitGroup, error) { // waitgroup and quit channel for optional shutdown coordination of the streaming service(s) wg := new(sync.WaitGroup) // configure state listening capabilities using AppOptions streamers := cast.ToStringSlice(appOpts.Get(OptStoreStreamers)) - activeStreamers := make([]baseapp.StreamingService, 0, len(streamers)) + activeStreamers := make([]types.StreamingService, 0, len(streamers)) for _, streamerName := range streamers { var exposeStoreKeys []types.StoreKey @@ -180,9 +178,6 @@ func LoadStreamingServices( return nil, nil, err } - // register the streaming service with the BaseApp - bApp.SetStreamingService(streamingService) - // kick off the background streaming service loop streamingService.Stream(wg) diff --git a/store/streaming/constructor_test.go b/store/streaming/constructor_test.go index 03a3574f0479..28f462a268fc 100644 --- a/store/streaming/constructor_test.go +++ b/store/streaming/constructor_test.go @@ -5,9 +5,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/libs/log" - dbm "github.com/tendermint/tm-db" - "github.com/cosmos/cosmos-sdk/baseapp" serverTypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/store/streaming" "github.com/cosmos/cosmos-sdk/store/streaming/file" @@ -49,10 +47,9 @@ func TestStreamingServiceConstructor(t *testing.T) { } func TestLoadStreamingServices(t *testing.T) { - db := dbm.NewMemDB() + encCdc := types.NewTestCodec() keys := types.NewKVStoreKeys("mockKey1", "mockKey2") - bApp := baseapp.NewBaseApp("appName", log.NewNopLogger(), db, nil) testCases := map[string]struct { appOpts serverTypes.AppOptions @@ -76,7 +73,7 @@ func TestLoadStreamingServices(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - activeStreamers, _, err := streaming.LoadStreamingServices(bApp, tc.appOpts, encCdc, log.NewNopLogger(), keys) + activeStreamers, _, err := streaming.LoadStreamingServices(tc.appOpts, encCdc, log.NewNopLogger(), keys) require.NoError(t, err) require.Equal(t, tc.activeStreamersLen, len(activeStreamers)) }) diff --git a/store/streaming/file/README.md b/store/streaming/file/README.md index 0c34de7f3bea..d5ca3534d3ea 100644 --- a/store/streaming/file/README.md +++ b/store/streaming/file/README.md @@ -1,6 +1,6 @@ # File Streaming Service -This pkg contains an implementation of the [StreamingService](../../../baseapp/streaming.go) that writes +This pkg contains an implementation of the [StreamingService](../../types/streaming.go) that writes the data stream out to files on the local filesystem. This process is performed synchronously with the message processing of the state machine. diff --git a/store/streaming/file/service.go b/store/streaming/file/service.go index a5be0538d715..9e5928ac96ce 100644 --- a/store/streaming/file/service.go +++ b/store/streaming/file/service.go @@ -14,11 +14,10 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" - "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/store/types" ) -var _ baseapp.StreamingService = &StreamingService{} +var _ types.StreamingService = &StreamingService{} // StreamingService is a concrete implementation of StreamingService that writes // state changes out to files. diff --git a/baseapp/streaming.go b/store/types/streaming.go similarity index 93% rename from baseapp/streaming.go rename to store/types/streaming.go index b8b382ae05b3..cf963bcb0b61 100644 --- a/baseapp/streaming.go +++ b/store/types/streaming.go @@ -1,4 +1,4 @@ -package baseapp +package types import ( "context" @@ -6,8 +6,6 @@ import ( "sync" abci "github.com/tendermint/tendermint/abci/types" - - store "github.com/cosmos/cosmos-sdk/store/types" ) // ABCIListener interface used to hook into the ABCI message processing of the BaseApp. @@ -29,7 +27,7 @@ type StreamingService interface { // Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file Stream(wg *sync.WaitGroup) error // Listeners returns the streaming service's listeners for the BaseApp to register - Listeners() map[store.StoreKey][]store.WriteListener + Listeners() map[StoreKey][]WriteListener // ABCIListener interface for hooking into the ABCI messages from inside the BaseApp ABCIListener // Closer interface