Skip to content

Commit

Permalink
refactor: State Streaming Docs + Explicit Config Support (backport co…
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored and JeancarloBarrios committed Sep 28, 2024
1 parent 52d0130 commit eed260f
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 124 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ Ref: https://keepachangelog.com/en/1.0.0/

## [Unreleased]

### Improvements

* (config) [#13894](https://github.com/cosmos/cosmos-sdk/pull/13894) Support state streaming configuration in `app.toml` template and default configuration.

## [v0.46.5](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.46.5) - 2022-11-17

### Features
Expand Down
20 changes: 4 additions & 16 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const (
// DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in
// bytes the server can send.
DefaultGRPCMaxSendMsgSize = math.MaxInt32

// FileStreamer defines the store streaming type for file streaming.
FileStreamer = "file"
)

// BaseConfig defines the server's basic configuration
Expand Down Expand Up @@ -171,15 +174,6 @@ type (
Keys []string `mapstructure:"keys"`
WriteDir string `mapstructure:"write_dir"`
Prefix string `mapstructure:"prefix"`
// OutputMetadata specifies if output the block metadata file which includes
// the abci requests/responses, otherwise only the data file is outputted.
OutputMetadata bool `mapstructure:"output-metadata"`
// StopNodeOnError specifies if propagate the streamer errors to the consensus
// state machine, it's nesserary for data integrity of output.
StopNodeOnError bool `mapstructure:"stop-node-on-error"`
// Fsync specifies if calling fsync after writing the files, it slows down
// the commit, but don't lose data in face of system crash.
Fsync bool `mapstructure:"fsync"`
}
)

Expand Down Expand Up @@ -257,13 +251,7 @@ func DefaultConfig() *Config {
},
Streamers: StreamersConfig{
File: FileStreamerConfig{
Keys: []string{"*"},
WriteDir: "",
OutputMetadata: true,
StopNodeOnError: true,
// NOTICE: The default config doesn't protect the streamer data integrity
// in face of system crash.
Fsync: false,
Keys: []string{"*"},
},
},
}
Expand Down
87 changes: 15 additions & 72 deletions server/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

pruningtypes "cosmossdk.io/store/pruning/types"
Expand Down Expand Up @@ -60,79 +59,24 @@ func TestIndexEventsMarshalling(t *testing.T) {
require.Contains(t, actual, expectedIn, "config file contents")
}

func TestStreamingConfig(t *testing.T) {
cfg := Config{
Streaming: StreamingConfig{
ABCI: ABCIListenerConfig{
Keys: []string{"one", "two"},
Plugin: "plugin-A",
StopNodeOnErr: false,
},
},
}

testDir := t.TempDir()
cfgFile := filepath.Join(testDir, "app.toml")
err := WriteConfigFile(cfgFile, &cfg)
require.NoError(t, err)

cfgFileBz, err := os.ReadFile(cfgFile)
require.NoError(t, err, "reading %s", cfgFile)
cfgFileContents := string(cfgFileBz)
t.Logf("Config file contents: %s:\n%s", cfgFile, cfgFileContents)

expectedLines := []string{
`keys = ["one", "two", ]`,
`plugin = "plugin-A"`,
`stop-node-on-err = false`,
}

for _, line := range expectedLines {
assert.Contains(t, cfgFileContents, line+"\n", "config file contents")
}

vpr := viper.New()
vpr.SetConfigFile(cfgFile)
err = vpr.ReadInConfig()
require.NoError(t, err, "reading config file into viper")

var actual Config
err = vpr.Unmarshal(&actual)
require.NoError(t, err, "vpr.Unmarshal")

assert.Equal(t, cfg.Streaming, actual.Streaming, "Streaming")
}
func TestParseStoreStreaming(t *testing.T) {
expectedContents := `[store]
streamers = ["file", ]
func TestParseStreaming(t *testing.T) {
expectedKeys := `keys = ["*", ]` + "\n"
expectedPlugin := `plugin = "abci_v1"` + "\n"
expectedStopNodeOnErr := `stop-node-on-err = true` + "\n"
[streamers]
[streamers.file]
keys = ["*", ]
write_dir = "/foo/bar"
prefix = ""`

cfg := DefaultConfig()
cfg.Streaming.ABCI.Keys = []string{"*"}
cfg.Streaming.ABCI.Plugin = "abci_v1"
cfg.Streaming.ABCI.StopNodeOnErr = true
cfg.Store.Streamers = []string{FileStreamer}
cfg.Streamers.File.Keys = []string{"*"}
cfg.Streamers.File.WriteDir = "/foo/bar"

var buffer bytes.Buffer
err := configTemplate.Execute(&buffer, cfg)
require.NoError(t, err, "executing template")
actual := buffer.String()
require.Contains(t, actual, expectedKeys, "config file contents")
require.Contains(t, actual, expectedPlugin, "config file contents")
require.Contains(t, actual, expectedStopNodeOnErr, "config file contents")
}

func TestReadConfig(t *testing.T) {
cfg := DefaultConfig()
tmpFile := filepath.Join(t.TempDir(), "config")
err := WriteConfigFile(tmpFile, cfg)
require.NoError(t, err)

v := viper.New()
otherCfg, err := GetConfig(v)
require.NoError(t, err)

require.Equal(t, *cfg, otherCfg)
require.NoError(t, configTemplate.Execute(&buffer, cfg), "executing template")
require.Contains(t, buffer.String(), expectedContents, "config file contents")
}

func TestIndexEventsWriteRead(t *testing.T) {
Expand All @@ -143,14 +87,13 @@ func TestIndexEventsWriteRead(t *testing.T) {
conf := DefaultConfig()
conf.IndexEvents = expected

err := WriteConfigFile(confFile, conf)
require.NoError(t, err)
WriteConfigFile(confFile, conf)

// read the file into Viper
vpr := viper.New()
vpr.SetConfigFile(confFile)

err = vpr.ReadInConfig()
err := vpr.ReadInConfig()
require.NoError(t, err, "reading config file into viper")

// Check that the raw viper value is correct.
Expand Down
10 changes: 0 additions & 10 deletions server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,16 +230,6 @@ streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}]
keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}]
write_dir = "{{ .Streamers.File.WriteDir }}"
prefix = "{{ .Streamers.File.Prefix }}"
# output-metadata specifies if output the metadata file which includes the abci request/responses
# during processing the block.
output-metadata = "{{ .Streamers.File.OutputMetadata }}"
# stop-node-on-error specifies if propagate the file streamer errors to consensus state machine.
stop-node-on-error = "{{ .Streamers.File.StopNodeOnError }}"
# fsync specifies if call fsync after writing the files.
fsync = "{{ .Streamers.File.Fsync }}"
`

var configTemplate *template.Template
Expand Down
74 changes: 48 additions & 26 deletions store/streaming/README.md
Original file line number Diff line number Diff line change
@@ -1,28 +1,38 @@
# State Streaming Service
This package contains the constructors for the `StreamingService`s used to write state changes out from individual KVStores to a
file or stream, as described in [ADR-038](../../docs/architecture/adr-038-state-listening.md) and defined in [types/streaming.go](../../baseapp/streaming.go).

This package contains the constructors for the `StreamingService`s used to write
state changes out from individual KVStores to a file or stream, as described in
[ADR-038](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-038-state-listening.md)
and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/baseapp/streaming.go).
The child directories contain the implementations for specific output destinations.

Currently, a `StreamingService` implementation that writes state changes out to files is supported, in the future support for additional
output destinations can be added.
Currently, a `StreamingService` implementation that writes state changes out to
files is supported, in the future support for additional output destinations can
be added.

The `StreamingService` is configured from within an App using the `AppOptions` loaded from the app.toml file:
The `StreamingService` is configured from within an App using the `AppOptions`
loaded from the `app.toml` file:

```toml
# ...

[store]
streamers = [ # if len(streamers) > 0 we are streaming
"file", # name of the streaming service, used by constructor
]
# streaming is enabled if one or more streamers are defined
streamers = [
# name of the streaming service, used by constructor
"file"
]

[streamers]
[streamers.file]
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
write_dir = "path to the write directory"
prefix = "optional prefix to prepend to the generated file names"
[streamers.file]
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
write_dir = "path to the write directory"
prefix = "optional prefix to prepend to the generated file names"
```

`store.streamers` contains a list of the names of the `StreamingService` implementations to employ which are used by `ServiceTypeFromString`
to return the `ServiceConstructor` for that particular implementation:
The `store.streamers` field contains a list of the names of the `StreamingService`
implementations to employ which are used by `ServiceTypeFromString` to return
the `ServiceConstructor` for that particular implementation:


```go
Expand All @@ -35,18 +45,27 @@ for _, listenerName := range listeners {
}
```

`streamers` contains a mapping of the specific `StreamingService` implementation name to the configuration parameters for that specific service.
`streamers.x.keys` contains the list of `StoreKey` names for the KVStores to expose using this service and is required by every type of `StreamingService`.
In order to expose *all* KVStores, we can include `*` in this list. An empty list is equivalent to turning the service off.
The `streamers` field contains a mapping of the specific `StreamingService`
implementation name to the configuration parameters for that specific service.

The `streamers.x.keys` field contains the list of `StoreKey` names for the
KVStores to expose using this service and is required by every type of
`StreamingService`. In order to expose *ALL* KVStores, we can include `*` in
this list. An empty list is equivalent to turning the service off.

Additional configuration parameters are optional and specific to the implementation.
In the case of the file streaming service, `streamers.file.write_dir` contains the path to the
directory to write the files to, and `streamers.file.prefix` contains an optional prefix to prepend to the output files to prevent potential collisions
with other App `StreamingService` output files.
In the case of the file streaming service, the `streamers.file.write_dir` field
contains the path to the directory to write the files to, and `streamers.file.prefix`
contains an optional prefix to prepend to the output files to prevent potential
collisions with other App `StreamingService` output files.

The `ServiceConstructor` accepts `AppOptions`, the store keys collected using `streamers.x.keys`, a `BinaryMarshaller` and
returns a `StreamingService` implementation. The `AppOptions` are passed in to provide access to any implementation specific configuration options,
e.g. in the case of the file streaming service the `streamers.file.write_dir` and `streamers.file.prefix`.
The `ServiceConstructor` accepts `AppOptions`, the store keys collected using
`streamers.x.keys`, a `BinaryMarshaller` and returns a `StreamingService
implementation.

The `AppOptions` are passed in to provide access to any implementation specific
configuration options, e.g. in the case of the file streaming service the
`streamers.file.write_dir` and `streamers.file.prefix`.

```go
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
Expand All @@ -55,9 +74,12 @@ if err != nil {
}
```

The returned `StreamingService` is loaded into the BaseApp using the BaseApp's `SetStreamingService` method.
The `Stream` method is called on the service to begin the streaming process. Depending on the implementation this process
may be synchronous or asynchronous with the message processing of the state machine.
The returned `StreamingService` is loaded into the BaseApp using the BaseApp's
`SetStreamingService` method.

The `Stream` method is called on the service to begin the streaming process.
Depending on the implementation this process may be synchronous or asynchronous
with the message processing of the state machine.

```go
bApp.SetStreamingService(streamingService)
Expand Down

0 comments on commit eed260f

Please sign in to comment.