From 2c0d445ad3c50a4fcee8a08a9a0562ef8ba0238a Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Thu, 17 Nov 2022 13:48:44 -0500 Subject: [PATCH] refactor: State Streaming Docs + Explicit Config Support (#13894) --- CHANGELOG.md | 1 + server/config/config.go | 35 +++++++++++++++++ server/config/config_test.go | 35 ++++++++++++++--- server/config/toml.go | 13 +++++++ simapp/app.go | 6 +-- simapp/app_legacy.go | 6 +-- store/streaming/README.md | 73 +++++++++++++++++++++++------------- 7 files changed, 132 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c8a01d9345b..0f3b8fa5f145 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ ### Improvements +* (config) [#13894](https://github.com/cosmos/cosmos-sdk/pull/13894) Support state streaming configuration in `app.toml` template and default configuration. * (x/nft) [#13836](https://github.com/cosmos/cosmos-sdk/pull/13836) Remove the validation for `classID` and `nftID` from the NFT module. * [#13789](https://github.com/cosmos/cosmos-sdk/pull/13789) Add tx `encode` and `decode` endpoints to tx service. > Note: This endpoint will only encode proto messages, Amino encoding is not supported. diff --git a/server/config/config.go b/server/config/config.go index abe2c70713d4..f28679f8051f 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -33,6 +33,9 @@ const ( // DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in // bytes the server can send. DefaultGRPCMaxSendMsgSize = math.MaxInt32 + + // FileStreamer defines the store streaming type for file streaming. + FileStreamer = "file" ) // BaseConfig defines the server's basic configuration @@ -196,6 +199,28 @@ type StateSyncConfig struct { SnapshotKeepRecent uint32 `mapstructure:"snapshot-keep-recent"` } +type ( + // StoreConfig defines application configuration for state streaming and other + // storage related operations. + StoreConfig struct { + Streamers []string `mapstructure:"streamers"` + } + + // StreamersConfig defines concrete state streaming configuration options. These + // fields are required to be set when state streaming is enabled via a non-empty + // list defined by 'StoreConfig.Streamers'. + StreamersConfig struct { + File FileStreamerConfig `mapstructure:"file"` + } + + // FileStreamerConfig defines the file streaming configuration options. + FileStreamerConfig struct { + Keys []string `mapstructure:"keys"` + WriteDir string `mapstructure:"write_dir"` + Prefix string `mapstructure:"prefix"` + } +) + // Config defines the server's top level configuration type Config struct { BaseConfig `mapstructure:",squash"` @@ -207,6 +232,8 @@ type Config struct { Rosetta RosettaConfig `mapstructure:"rosetta"` GRPCWeb GRPCWebConfig `mapstructure:"grpc-web"` StateSync StateSyncConfig `mapstructure:"state-sync"` + Store StoreConfig `mapstructure:"store"` + Streamers StreamersConfig `mapstructure:"streamers"` } // SetMinGasPrices sets the validator's minimum gas prices. @@ -288,6 +315,14 @@ func DefaultConfig() *Config { SnapshotInterval: 0, SnapshotKeepRecent: 2, }, + Store: StoreConfig{ + Streamers: []string{}, + }, + Streamers: StreamersConfig{ + File: FileStreamerConfig{ + Keys: []string{"*"}, + }, + }, } } diff --git a/server/config/config_test.go b/server/config/config_test.go index 442e56026ec4..042b1ebd1612 100644 --- a/server/config/config_test.go +++ b/server/config/config_test.go @@ -6,7 +6,6 @@ import ( "testing" "github.com/spf13/viper" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" sdk "github.com/cosmos/cosmos-sdk/types" @@ -32,28 +31,54 @@ func TestIndexEventsMarshalling(t *testing.T) { err := configTemplate.Execute(&buffer, cfg) require.NoError(t, err, "executing template") actual := buffer.String() - assert.Contains(t, actual, expectedIn, "config file contents") + require.Contains(t, actual, expectedIn, "config file contents") +} + +func TestParseStoreStreaming(t *testing.T) { + expectedContents := `[store] +streamers = ["file", ] + +[streamers] +[streamers.file] +keys = ["*", ] +write_dir = "/foo/bar" +prefix = ""` + + cfg := DefaultConfig() + cfg.Store.Streamers = []string{FileStreamer} + cfg.Streamers.File.Keys = []string{"*"} + cfg.Streamers.File.WriteDir = "/foo/bar" + + var buffer bytes.Buffer + require.NoError(t, configTemplate.Execute(&buffer, cfg), "executing template") + require.Contains(t, buffer.String(), expectedContents, "config file contents") } func TestIndexEventsWriteRead(t *testing.T) { expected := []string{"key3", "key4"} + // Create config with two IndexEvents entries, and write it to a file. confFile := filepath.Join(t.TempDir(), "app.toml") conf := DefaultConfig() conf.IndexEvents = expected + WriteConfigFile(confFile, conf) - // Read that file into viper. + // read the file into Viper vpr := viper.New() vpr.SetConfigFile(confFile) + err := vpr.ReadInConfig() require.NoError(t, err, "reading config file into viper") + // Check that the raw viper value is correct. actualRaw := vpr.GetStringSlice("index-events") require.Equal(t, expected, actualRaw, "viper's index events") + // Check that it is parsed into the config correctly. cfg, perr := ParseConfig(vpr) require.NoError(t, perr, "parsing config") + actual := cfg.IndexEvents require.Equal(t, expected, actual, "config value") } @@ -62,7 +87,7 @@ func TestGlobalLabelsEventsMarshalling(t *testing.T) { expectedIn := `global-labels = [ ["labelname1", "labelvalue1"], ["labelname2", "labelvalue2"], -]` + "\n" +]` cfg := DefaultConfig() cfg.Telemetry.GlobalLabels = [][]string{{"labelname1", "labelvalue1"}, {"labelname2", "labelvalue2"}} var buffer bytes.Buffer @@ -70,7 +95,7 @@ func TestGlobalLabelsEventsMarshalling(t *testing.T) { err := configTemplate.Execute(&buffer, cfg) require.NoError(t, err, "executing template") actual := buffer.String() - assert.Contains(t, actual, expectedIn, "config file contents") + require.Contains(t, actual, expectedIn, "config file contents") } func TestGlobalLabelsWriteRead(t *testing.T) { diff --git a/server/config/toml.go b/server/config/toml.go index 0f8a4c9340c6..075a5e5d4499 100644 --- a/server/config/toml.go +++ b/server/config/toml.go @@ -235,6 +235,19 @@ snapshot-interval = {{ .StateSync.SnapshotInterval }} # snapshot-keep-recent specifies the number of recent snapshots to keep and serve (0 to keep all). snapshot-keep-recent = {{ .StateSync.SnapshotKeepRecent }} + +############################################################################### +### Store / State Streaming ### +############################################################################### + +[store] +streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}] + +[streamers] +[streamers.file] +keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}] +write_dir = "{{ .Streamers.File.WriteDir }}" +prefix = "{{ .Streamers.File.Prefix }}" ` var configTemplate *template.Template diff --git a/simapp/app.go b/simapp/app.go index 45d518a0ac48..d09f399a3871 100644 --- a/simapp/app.go +++ b/simapp/app.go @@ -14,6 +14,7 @@ import ( dbm "github.com/tendermint/tm-db" "cosmossdk.io/depinject" + "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec" @@ -266,10 +267,9 @@ func NewSimApp( app.App = appBuilder.Build(logger, db, traceStore, baseAppOptions...) - // configure state listening capabilities using AppOptions - // we are doing nothing with the returned streamingServices and waitGroup in this case + // load state streaming if enabled if _, _, err := streaming.LoadStreamingServices(app.App.BaseApp, appOpts, app.appCodec, app.keys); err != nil { - fmt.Println(err.Error()) + fmt.Printf("failed to load state streaming: %s", err) os.Exit(1) } diff --git a/simapp/app_legacy.go b/simapp/app_legacy.go index a7956291ecb6..d1525845c5a3 100644 --- a/simapp/app_legacy.go +++ b/simapp/app_legacy.go @@ -17,6 +17,7 @@ import ( dbm "github.com/tendermint/tm-db" simappparams "cosmossdk.io/simapp/params" + "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/flags" @@ -246,10 +247,9 @@ func NewSimApp( // not include this key. memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey") - // configure state listening capabilities using AppOptions - // we are doing nothing with the returned streamingServices and waitGroup in this case + // load state streaming if enabled if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, keys); err != nil { - fmt.Println(err.Error()) + fmt.Printf("failed to load state streaming: %s", err) os.Exit(1) } diff --git a/store/streaming/README.md b/store/streaming/README.md index 3118ceed9834..9eb962ac862c 100644 --- a/store/streaming/README.md +++ b/store/streaming/README.md @@ -1,29 +1,38 @@ # State Streaming Service -This package contains the constructors for the `StreamingService`s used to write state changes out from individual KVStores to a -file or stream, as described in [ADR-038](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-038-state-listening.md) and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/baseapp/streaming.go). +This package contains the constructors for the `StreamingService`s used to write +state changes out from individual KVStores to a file or stream, as described in +[ADR-038](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-038-state-listening.md) +and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/baseapp/streaming.go). The child directories contain the implementations for specific output destinations. -Currently, a `StreamingService` implementation that writes state changes out to files is supported, in the future support for additional -output destinations can be added. +Currently, a `StreamingService` implementation that writes state changes out to +files is supported, in the future support for additional output destinations can +be added. -The `StreamingService` is configured from within an App using the `AppOptions` loaded from the app.toml file: +The `StreamingService` is configured from within an App using the `AppOptions` +loaded from the `app.toml` file: ```toml +# ... + [store] - streamers = [ # if len(streamers) > 0 we are streaming - "file", # name of the streaming service, used by constructor - ] +# streaming is enabled if one or more streamers are defined +streamers = [ + # name of the streaming service, used by constructor + "file" +] [streamers] - [streamers.file] - keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] - write_dir = "path to the write directory" - prefix = "optional prefix to prepend to the generated file names" +[streamers.file] + keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] + write_dir = "path to the write directory" + prefix = "optional prefix to prepend to the generated file names" ``` -`store.streamers` contains a list of the names of the `StreamingService` implementations to employ which are used by `ServiceTypeFromString` -to return the `ServiceConstructor` for that particular implementation: +The `store.streamers` field contains a list of the names of the `StreamingService` +implementations to employ which are used by `ServiceTypeFromString` to return +the `ServiceConstructor` for that particular implementation: ```go listeners := cast.ToStringSlice(appOpts.Get("store.streamers")) @@ -35,18 +44,27 @@ for _, listenerName := range listeners { } ``` -`streamers` contains a mapping of the specific `StreamingService` implementation name to the configuration parameters for that specific service. -`streamers.x.keys` contains the list of `StoreKey` names for the KVStores to expose using this service and is required by every type of `StreamingService`. -In order to expose *all* KVStores, we can include `*` in this list. An empty list is equivalent to turning the service off. +The `streamers` field contains a mapping of the specific `StreamingService` +implementation name to the configuration parameters for that specific service. + +The `streamers.x.keys` field contains the list of `StoreKey` names for the +KVStores to expose using this service and is required by every type of +`StreamingService`. In order to expose *ALL* KVStores, we can include `*` in +this list. An empty list is equivalent to turning the service off. Additional configuration parameters are optional and specific to the implementation. -In the case of the file streaming service, `streamers.file.write_dir` contains the path to the -directory to write the files to, and `streamers.file.prefix` contains an optional prefix to prepend to the output files to prevent potential collisions -with other App `StreamingService` output files. +In the case of the file streaming service, the `streamers.file.write_dir` field +contains the path to the directory to write the files to, and `streamers.file.prefix` +contains an optional prefix to prepend to the output files to prevent potential +collisions with other App `StreamingService` output files. -The `ServiceConstructor` accepts `AppOptions`, the store keys collected using `streamers.x.keys`, a `BinaryMarshaller` and -returns a `StreamingService` implementation. The `AppOptions` are passed in to provide access to any implementation specific configuration options, -e.g. in the case of the file streaming service the `streamers.file.write_dir` and `streamers.file.prefix`. +The `ServiceConstructor` accepts `AppOptions`, the store keys collected using +`streamers.x.keys`, a `BinaryMarshaller` and returns a `StreamingService +implementation. + +The `AppOptions` are passed in to provide access to any implementation specific +configuration options, e.g. in the case of the file streaming service the +`streamers.file.write_dir` and `streamers.file.prefix`. ```go streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec) @@ -55,9 +73,12 @@ if err != nil { } ``` -The returned `StreamingService` is loaded into the BaseApp using the BaseApp's `SetStreamingService` method. -The `Stream` method is called on the service to begin the streaming process. Depending on the implementation this process -may be synchronous or asynchronous with the message processing of the state machine. +The returned `StreamingService` is loaded into the BaseApp using the BaseApp's +`SetStreamingService` method. + +The `Stream` method is called on the service to begin the streaming process. +Depending on the implementation this process may be synchronous or asynchronous +with the message processing of the state machine. ```go bApp.SetStreamingService(streamingService)