diff --git a/CHANGELOG.md b/CHANGELOG.md index 002488612053..d56a2510a275 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,10 @@ Ref: https://keepachangelog.com/en/1.0.0/ ## [Unreleased] +### Improvements + +* (config) [#13894](https://github.com/cosmos/cosmos-sdk/pull/13894) Support state streaming configuration in `app.toml` template and default configuration. + ## [v0.46.5](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.46.5) - 2022-11-17 ### Features diff --git a/server/config/config.go b/server/config/config.go index 303c5e25126a..3123175589a9 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -29,6 +29,9 @@ const ( // DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in // bytes the server can send. DefaultGRPCMaxSendMsgSize = math.MaxInt32 + + // FileStreamer defines the store streaming type for file streaming. + FileStreamer = "file" ) // BaseConfig defines the server's basic configuration @@ -171,15 +174,6 @@ type ( Keys []string `mapstructure:"keys"` WriteDir string `mapstructure:"write_dir"` Prefix string `mapstructure:"prefix"` - // OutputMetadata specifies if output the block metadata file which includes - // the abci requests/responses, otherwise only the data file is outputted. - OutputMetadata bool `mapstructure:"output-metadata"` - // StopNodeOnError specifies if propagate the streamer errors to the consensus - // state machine, it's nesserary for data integrity of output. - StopNodeOnError bool `mapstructure:"stop-node-on-error"` - // Fsync specifies if calling fsync after writing the files, it slows down - // the commit, but don't lose data in face of system crash. - Fsync bool `mapstructure:"fsync"` } ) @@ -257,13 +251,7 @@ func DefaultConfig() *Config { }, Streamers: StreamersConfig{ File: FileStreamerConfig{ - Keys: []string{"*"}, - WriteDir: "", - OutputMetadata: true, - StopNodeOnError: true, - // NOTICE: The default config doesn't protect the streamer data integrity - // in face of system crash. - Fsync: false, + Keys: []string{"*"}, }, }, } diff --git a/server/config/config_test.go b/server/config/config_test.go index 6d827ec27dc7..6ff9f0e828b7 100644 --- a/server/config/config_test.go +++ b/server/config/config_test.go @@ -7,7 +7,6 @@ import ( "testing" "github.com/spf13/viper" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" pruningtypes "cosmossdk.io/store/pruning/types" @@ -60,79 +59,24 @@ func TestIndexEventsMarshalling(t *testing.T) { require.Contains(t, actual, expectedIn, "config file contents") } -func TestStreamingConfig(t *testing.T) { - cfg := Config{ - Streaming: StreamingConfig{ - ABCI: ABCIListenerConfig{ - Keys: []string{"one", "two"}, - Plugin: "plugin-A", - StopNodeOnErr: false, - }, - }, - } - - testDir := t.TempDir() - cfgFile := filepath.Join(testDir, "app.toml") - err := WriteConfigFile(cfgFile, &cfg) - require.NoError(t, err) - - cfgFileBz, err := os.ReadFile(cfgFile) - require.NoError(t, err, "reading %s", cfgFile) - cfgFileContents := string(cfgFileBz) - t.Logf("Config file contents: %s:\n%s", cfgFile, cfgFileContents) - - expectedLines := []string{ - `keys = ["one", "two", ]`, - `plugin = "plugin-A"`, - `stop-node-on-err = false`, - } - - for _, line := range expectedLines { - assert.Contains(t, cfgFileContents, line+"\n", "config file contents") - } - - vpr := viper.New() - vpr.SetConfigFile(cfgFile) - err = vpr.ReadInConfig() - require.NoError(t, err, "reading config file into viper") - - var actual Config - err = vpr.Unmarshal(&actual) - require.NoError(t, err, "vpr.Unmarshal") - - assert.Equal(t, cfg.Streaming, actual.Streaming, "Streaming") -} +func TestParseStoreStreaming(t *testing.T) { + expectedContents := `[store] +streamers = ["file", ] -func TestParseStreaming(t *testing.T) { - expectedKeys := `keys = ["*", ]` + "\n" - expectedPlugin := `plugin = "abci_v1"` + "\n" - expectedStopNodeOnErr := `stop-node-on-err = true` + "\n" +[streamers] +[streamers.file] +keys = ["*", ] +write_dir = "/foo/bar" +prefix = ""` cfg := DefaultConfig() - cfg.Streaming.ABCI.Keys = []string{"*"} - cfg.Streaming.ABCI.Plugin = "abci_v1" - cfg.Streaming.ABCI.StopNodeOnErr = true + cfg.Store.Streamers = []string{FileStreamer} + cfg.Streamers.File.Keys = []string{"*"} + cfg.Streamers.File.WriteDir = "/foo/bar" var buffer bytes.Buffer - err := configTemplate.Execute(&buffer, cfg) - require.NoError(t, err, "executing template") - actual := buffer.String() - require.Contains(t, actual, expectedKeys, "config file contents") - require.Contains(t, actual, expectedPlugin, "config file contents") - require.Contains(t, actual, expectedStopNodeOnErr, "config file contents") -} - -func TestReadConfig(t *testing.T) { - cfg := DefaultConfig() - tmpFile := filepath.Join(t.TempDir(), "config") - err := WriteConfigFile(tmpFile, cfg) - require.NoError(t, err) - - v := viper.New() - otherCfg, err := GetConfig(v) - require.NoError(t, err) - - require.Equal(t, *cfg, otherCfg) + require.NoError(t, configTemplate.Execute(&buffer, cfg), "executing template") + require.Contains(t, buffer.String(), expectedContents, "config file contents") } func TestIndexEventsWriteRead(t *testing.T) { @@ -143,14 +87,13 @@ func TestIndexEventsWriteRead(t *testing.T) { conf := DefaultConfig() conf.IndexEvents = expected - err := WriteConfigFile(confFile, conf) - require.NoError(t, err) + WriteConfigFile(confFile, conf) // read the file into Viper vpr := viper.New() vpr.SetConfigFile(confFile) - err = vpr.ReadInConfig() + err := vpr.ReadInConfig() require.NoError(t, err, "reading config file into viper") // Check that the raw viper value is correct. diff --git a/server/config/toml.go b/server/config/toml.go index 7820fe487a8c..98ca20ccb5f5 100644 --- a/server/config/toml.go +++ b/server/config/toml.go @@ -230,16 +230,6 @@ streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}] keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}] write_dir = "{{ .Streamers.File.WriteDir }}" prefix = "{{ .Streamers.File.Prefix }}" - -# output-metadata specifies if output the metadata file which includes the abci request/responses -# during processing the block. -output-metadata = "{{ .Streamers.File.OutputMetadata }}" - -# stop-node-on-error specifies if propagate the file streamer errors to consensus state machine. -stop-node-on-error = "{{ .Streamers.File.StopNodeOnError }}" - -# fsync specifies if call fsync after writing the files. -fsync = "{{ .Streamers.File.Fsync }}" ` var configTemplate *template.Template diff --git a/store/streaming/README.md b/store/streaming/README.md index 819514aef796..47ddfb0d8f79 100644 --- a/store/streaming/README.md +++ b/store/streaming/README.md @@ -1,28 +1,38 @@ # State Streaming Service -This package contains the constructors for the `StreamingService`s used to write state changes out from individual KVStores to a -file or stream, as described in [ADR-038](../../docs/architecture/adr-038-state-listening.md) and defined in [types/streaming.go](../../baseapp/streaming.go). + +This package contains the constructors for the `StreamingService`s used to write +state changes out from individual KVStores to a file or stream, as described in +[ADR-038](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-038-state-listening.md) +and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/baseapp/streaming.go). The child directories contain the implementations for specific output destinations. -Currently, a `StreamingService` implementation that writes state changes out to files is supported, in the future support for additional -output destinations can be added. +Currently, a `StreamingService` implementation that writes state changes out to +files is supported, in the future support for additional output destinations can +be added. -The `StreamingService` is configured from within an App using the `AppOptions` loaded from the app.toml file: +The `StreamingService` is configured from within an App using the `AppOptions` +loaded from the `app.toml` file: ```toml +# ... + [store] - streamers = [ # if len(streamers) > 0 we are streaming - "file", # name of the streaming service, used by constructor - ] +# streaming is enabled if one or more streamers are defined +streamers = [ + # name of the streaming service, used by constructor + "file" +] [streamers] - [streamers.file] - keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] - write_dir = "path to the write directory" - prefix = "optional prefix to prepend to the generated file names" +[streamers.file] + keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] + write_dir = "path to the write directory" + prefix = "optional prefix to prepend to the generated file names" ``` -`store.streamers` contains a list of the names of the `StreamingService` implementations to employ which are used by `ServiceTypeFromString` -to return the `ServiceConstructor` for that particular implementation: +The `store.streamers` field contains a list of the names of the `StreamingService` +implementations to employ which are used by `ServiceTypeFromString` to return +the `ServiceConstructor` for that particular implementation: ```go @@ -35,18 +45,27 @@ for _, listenerName := range listeners { } ``` -`streamers` contains a mapping of the specific `StreamingService` implementation name to the configuration parameters for that specific service. -`streamers.x.keys` contains the list of `StoreKey` names for the KVStores to expose using this service and is required by every type of `StreamingService`. -In order to expose *all* KVStores, we can include `*` in this list. An empty list is equivalent to turning the service off. +The `streamers` field contains a mapping of the specific `StreamingService` +implementation name to the configuration parameters for that specific service. + +The `streamers.x.keys` field contains the list of `StoreKey` names for the +KVStores to expose using this service and is required by every type of +`StreamingService`. In order to expose *ALL* KVStores, we can include `*` in +this list. An empty list is equivalent to turning the service off. Additional configuration parameters are optional and specific to the implementation. -In the case of the file streaming service, `streamers.file.write_dir` contains the path to the -directory to write the files to, and `streamers.file.prefix` contains an optional prefix to prepend to the output files to prevent potential collisions -with other App `StreamingService` output files. +In the case of the file streaming service, the `streamers.file.write_dir` field +contains the path to the directory to write the files to, and `streamers.file.prefix` +contains an optional prefix to prepend to the output files to prevent potential +collisions with other App `StreamingService` output files. -The `ServiceConstructor` accepts `AppOptions`, the store keys collected using `streamers.x.keys`, a `BinaryMarshaller` and -returns a `StreamingService` implementation. The `AppOptions` are passed in to provide access to any implementation specific configuration options, -e.g. in the case of the file streaming service the `streamers.file.write_dir` and `streamers.file.prefix`. +The `ServiceConstructor` accepts `AppOptions`, the store keys collected using +`streamers.x.keys`, a `BinaryMarshaller` and returns a `StreamingService +implementation. + +The `AppOptions` are passed in to provide access to any implementation specific +configuration options, e.g. in the case of the file streaming service the +`streamers.file.write_dir` and `streamers.file.prefix`. ```go streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec) @@ -55,9 +74,12 @@ if err != nil { } ``` -The returned `StreamingService` is loaded into the BaseApp using the BaseApp's `SetStreamingService` method. -The `Stream` method is called on the service to begin the streaming process. Depending on the implementation this process -may be synchronous or asynchronous with the message processing of the state machine. +The returned `StreamingService` is loaded into the BaseApp using the BaseApp's +`SetStreamingService` method. + +The `Stream` method is called on the service to begin the streaming process. +Depending on the implementation this process may be synchronous or asynchronous +with the message processing of the state machine. ```go bApp.SetStreamingService(streamingService)