Skip to content

Commit

Permalink
refactor: State Streaming Docs + Explicit Config Support (#13894)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderbez authored Nov 17, 2022
1 parent 6dbf113 commit 2c0d445
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Ref: https://keepachangelog.com/en/1.0.0/

### Improvements

* (config) [#13894](https://github.com/cosmos/cosmos-sdk/pull/13894) Support state streaming configuration in `app.toml` template and default configuration.
* (x/nft) [#13836](https://github.com/cosmos/cosmos-sdk/pull/13836) Remove the validation for `classID` and `nftID` from the NFT module.
* [#13789](https://github.com/cosmos/cosmos-sdk/pull/13789) Add tx `encode` and `decode` endpoints to tx service.
> Note: This endpoint will only encode proto messages, Amino encoding is not supported.
Expand Down
35 changes: 35 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const (
// DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in
// bytes the server can send.
DefaultGRPCMaxSendMsgSize = math.MaxInt32

// FileStreamer defines the store streaming type for file streaming.
FileStreamer = "file"
)

// BaseConfig defines the server's basic configuration
Expand Down Expand Up @@ -196,6 +199,28 @@ type StateSyncConfig struct {
SnapshotKeepRecent uint32 `mapstructure:"snapshot-keep-recent"`
}

type (
// StoreConfig defines application configuration for state streaming and other
// storage related operations.
StoreConfig struct {
Streamers []string `mapstructure:"streamers"`
}

// StreamersConfig defines concrete state streaming configuration options. These
// fields are required to be set when state streaming is enabled via a non-empty
// list defined by 'StoreConfig.Streamers'.
StreamersConfig struct {
File FileStreamerConfig `mapstructure:"file"`
}

// FileStreamerConfig defines the file streaming configuration options.
FileStreamerConfig struct {
Keys []string `mapstructure:"keys"`
WriteDir string `mapstructure:"write_dir"`
Prefix string `mapstructure:"prefix"`
}
)

// Config defines the server's top level configuration
type Config struct {
BaseConfig `mapstructure:",squash"`
Expand All @@ -207,6 +232,8 @@ type Config struct {
Rosetta RosettaConfig `mapstructure:"rosetta"`
GRPCWeb GRPCWebConfig `mapstructure:"grpc-web"`
StateSync StateSyncConfig `mapstructure:"state-sync"`
Store StoreConfig `mapstructure:"store"`
Streamers StreamersConfig `mapstructure:"streamers"`
}

// SetMinGasPrices sets the validator's minimum gas prices.
Expand Down Expand Up @@ -288,6 +315,14 @@ func DefaultConfig() *Config {
SnapshotInterval: 0,
SnapshotKeepRecent: 2,
},
Store: StoreConfig{
Streamers: []string{},
},
Streamers: StreamersConfig{
File: FileStreamerConfig{
Keys: []string{"*"},
},
},
}
}

Expand Down
35 changes: 30 additions & 5 deletions server/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

sdk "github.com/cosmos/cosmos-sdk/types"
Expand All @@ -32,28 +31,54 @@ func TestIndexEventsMarshalling(t *testing.T) {
err := configTemplate.Execute(&buffer, cfg)
require.NoError(t, err, "executing template")
actual := buffer.String()
assert.Contains(t, actual, expectedIn, "config file contents")
require.Contains(t, actual, expectedIn, "config file contents")
}

func TestParseStoreStreaming(t *testing.T) {
expectedContents := `[store]
streamers = ["file", ]
[streamers]
[streamers.file]
keys = ["*", ]
write_dir = "/foo/bar"
prefix = ""`

cfg := DefaultConfig()
cfg.Store.Streamers = []string{FileStreamer}
cfg.Streamers.File.Keys = []string{"*"}
cfg.Streamers.File.WriteDir = "/foo/bar"

var buffer bytes.Buffer
require.NoError(t, configTemplate.Execute(&buffer, cfg), "executing template")
require.Contains(t, buffer.String(), expectedContents, "config file contents")
}

func TestIndexEventsWriteRead(t *testing.T) {
expected := []string{"key3", "key4"}

// Create config with two IndexEvents entries, and write it to a file.
confFile := filepath.Join(t.TempDir(), "app.toml")
conf := DefaultConfig()
conf.IndexEvents = expected

WriteConfigFile(confFile, conf)

// Read that file into viper.
// read the file into Viper
vpr := viper.New()
vpr.SetConfigFile(confFile)

err := vpr.ReadInConfig()
require.NoError(t, err, "reading config file into viper")

// Check that the raw viper value is correct.
actualRaw := vpr.GetStringSlice("index-events")
require.Equal(t, expected, actualRaw, "viper's index events")

// Check that it is parsed into the config correctly.
cfg, perr := ParseConfig(vpr)
require.NoError(t, perr, "parsing config")

actual := cfg.IndexEvents
require.Equal(t, expected, actual, "config value")
}
Expand All @@ -62,15 +87,15 @@ func TestGlobalLabelsEventsMarshalling(t *testing.T) {
expectedIn := `global-labels = [
["labelname1", "labelvalue1"],
["labelname2", "labelvalue2"],
]` + "\n"
]`
cfg := DefaultConfig()
cfg.Telemetry.GlobalLabels = [][]string{{"labelname1", "labelvalue1"}, {"labelname2", "labelvalue2"}}
var buffer bytes.Buffer

err := configTemplate.Execute(&buffer, cfg)
require.NoError(t, err, "executing template")
actual := buffer.String()
assert.Contains(t, actual, expectedIn, "config file contents")
require.Contains(t, actual, expectedIn, "config file contents")
}

func TestGlobalLabelsWriteRead(t *testing.T) {
Expand Down
13 changes: 13 additions & 0 deletions server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,19 @@ snapshot-interval = {{ .StateSync.SnapshotInterval }}
# snapshot-keep-recent specifies the number of recent snapshots to keep and serve (0 to keep all).
snapshot-keep-recent = {{ .StateSync.SnapshotKeepRecent }}
###############################################################################
### Store / State Streaming ###
###############################################################################
[store]
streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}]
[streamers]
[streamers.file]
keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}]
write_dir = "{{ .Streamers.File.WriteDir }}"
prefix = "{{ .Streamers.File.Prefix }}"
`

var configTemplate *template.Template
Expand Down
6 changes: 3 additions & 3 deletions simapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
dbm "github.com/tendermint/tm-db"

"cosmossdk.io/depinject"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/codec"
Expand Down Expand Up @@ -266,10 +267,9 @@ func NewSimApp(

app.App = appBuilder.Build(logger, db, traceStore, baseAppOptions...)

// configure state listening capabilities using AppOptions
// we are doing nothing with the returned streamingServices and waitGroup in this case
// load state streaming if enabled
if _, _, err := streaming.LoadStreamingServices(app.App.BaseApp, appOpts, app.appCodec, app.keys); err != nil {
fmt.Println(err.Error())
fmt.Printf("failed to load state streaming: %s", err)
os.Exit(1)
}

Expand Down
6 changes: 3 additions & 3 deletions simapp/app_legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
dbm "github.com/tendermint/tm-db"

simappparams "cosmossdk.io/simapp/params"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/flags"
Expand Down Expand Up @@ -246,10 +247,9 @@ func NewSimApp(
// not include this key.
memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey")

// configure state listening capabilities using AppOptions
// we are doing nothing with the returned streamingServices and waitGroup in this case
// load state streaming if enabled
if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, keys); err != nil {
fmt.Println(err.Error())
fmt.Printf("failed to load state streaming: %s", err)
os.Exit(1)
}

Expand Down
73 changes: 47 additions & 26 deletions store/streaming/README.md
Original file line number Diff line number Diff line change
@@ -1,29 +1,38 @@
# State Streaming Service

This package contains the constructors for the `StreamingService`s used to write state changes out from individual KVStores to a
file or stream, as described in [ADR-038](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-038-state-listening.md) and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/baseapp/streaming.go).
This package contains the constructors for the `StreamingService`s used to write
state changes out from individual KVStores to a file or stream, as described in
[ADR-038](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-038-state-listening.md)
and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/baseapp/streaming.go).
The child directories contain the implementations for specific output destinations.

Currently, a `StreamingService` implementation that writes state changes out to files is supported, in the future support for additional
output destinations can be added.
Currently, a `StreamingService` implementation that writes state changes out to
files is supported, in the future support for additional output destinations can
be added.

The `StreamingService` is configured from within an App using the `AppOptions` loaded from the app.toml file:
The `StreamingService` is configured from within an App using the `AppOptions`
loaded from the `app.toml` file:

```toml
# ...

[store]
streamers = [ # if len(streamers) > 0 we are streaming
"file", # name of the streaming service, used by constructor
]
# streaming is enabled if one or more streamers are defined
streamers = [
# name of the streaming service, used by constructor
"file"
]

[streamers]
[streamers.file]
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
write_dir = "path to the write directory"
prefix = "optional prefix to prepend to the generated file names"
[streamers.file]
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
write_dir = "path to the write directory"
prefix = "optional prefix to prepend to the generated file names"
```

`store.streamers` contains a list of the names of the `StreamingService` implementations to employ which are used by `ServiceTypeFromString`
to return the `ServiceConstructor` for that particular implementation:
The `store.streamers` field contains a list of the names of the `StreamingService`
implementations to employ which are used by `ServiceTypeFromString` to return
the `ServiceConstructor` for that particular implementation:

```go
listeners := cast.ToStringSlice(appOpts.Get("store.streamers"))
Expand All @@ -35,18 +44,27 @@ for _, listenerName := range listeners {
}
```

`streamers` contains a mapping of the specific `StreamingService` implementation name to the configuration parameters for that specific service.
`streamers.x.keys` contains the list of `StoreKey` names for the KVStores to expose using this service and is required by every type of `StreamingService`.
In order to expose *all* KVStores, we can include `*` in this list. An empty list is equivalent to turning the service off.
The `streamers` field contains a mapping of the specific `StreamingService`
implementation name to the configuration parameters for that specific service.

The `streamers.x.keys` field contains the list of `StoreKey` names for the
KVStores to expose using this service and is required by every type of
`StreamingService`. In order to expose *ALL* KVStores, we can include `*` in
this list. An empty list is equivalent to turning the service off.

Additional configuration parameters are optional and specific to the implementation.
In the case of the file streaming service, `streamers.file.write_dir` contains the path to the
directory to write the files to, and `streamers.file.prefix` contains an optional prefix to prepend to the output files to prevent potential collisions
with other App `StreamingService` output files.
In the case of the file streaming service, the `streamers.file.write_dir` field
contains the path to the directory to write the files to, and `streamers.file.prefix`
contains an optional prefix to prepend to the output files to prevent potential
collisions with other App `StreamingService` output files.

The `ServiceConstructor` accepts `AppOptions`, the store keys collected using `streamers.x.keys`, a `BinaryMarshaller` and
returns a `StreamingService` implementation. The `AppOptions` are passed in to provide access to any implementation specific configuration options,
e.g. in the case of the file streaming service the `streamers.file.write_dir` and `streamers.file.prefix`.
The `ServiceConstructor` accepts `AppOptions`, the store keys collected using
`streamers.x.keys`, a `BinaryMarshaller` and returns a `StreamingService
implementation.

The `AppOptions` are passed in to provide access to any implementation specific
configuration options, e.g. in the case of the file streaming service the
`streamers.file.write_dir` and `streamers.file.prefix`.

```go
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
Expand All @@ -55,9 +73,12 @@ if err != nil {
}
```

The returned `StreamingService` is loaded into the BaseApp using the BaseApp's `SetStreamingService` method.
The `Stream` method is called on the service to begin the streaming process. Depending on the implementation this process
may be synchronous or asynchronous with the message processing of the state machine.
The returned `StreamingService` is loaded into the BaseApp using the BaseApp's
`SetStreamingService` method.

The `Stream` method is called on the service to begin the streaming process.
Depending on the implementation this process may be synchronous or asynchronous
with the message processing of the state machine.

```go
bApp.SetStreamingService(streamingService)
Expand Down

0 comments on commit 2c0d445

Please sign in to comment.