Skip to content

Commit

Permalink
fix: state listener observe writes at wrong time (backport cosmos#13516
Browse files Browse the repository at this point in the history
…) (cosmos#14138)

* fix: state listener observe writes at wrong time (cosmos#13516)

* fix: state listener observe writes at wrong time

Closes: cosmos#13457

Currently state listener is notified when the cache store write, which happens in commit event only, which breaks the current design.
The solution (as discussed in the issue) is to listen state writes on rootmulti store only.

It also changes the file streamer to output single data file for the writes in the whole block, since we can't distinguish writes from different stage of abci events.

It adds new config items for file streamer:
- streamers.file.output-metadata
- streamers.file.stop-node-on-error
- streamers.file.fsync

* synchronous abci call, and format doc

* fix comment

* update file streamer readme and fix typos

* typo

* fix: state listener observe writes at wrong time

Closes: cosmos#13457

Currently state listener is notified when the cache store write, which happens in commit event only, which breaks the current design.
The solution (as discussed in the issue) is to listen state writes on rootmulti store only.

It also changes the file streamer to output single data file for the writes in the whole block, since we can't distinguish writes from different stage of abci events.

It adds new config items for file streamer:
- streamers.file.output-metadata
- streamers.file.stop-node-on-error
- streamers.file.fsync

synchronous abci call, and format doc

fix comment

update file streamer readme and fix typos

typo

* improve UX of file streamer, make it immediately usable after enabled

- set default value to write_dir.
- make write_dir based on home directory by default.
- auto-create the directory if not exists.

* get homePage from opts

Co-authored-by: Marko <[email protected]>
(cherry picked from commit 1f91ee2)

# Conflicts:
#	CHANGELOG.md
#	api/cosmos/base/store/v1beta1/listening.pulsar.go
#	baseapp/streaming.go
#	docs/architecture/adr-038-state-listening.md
#	server/config/toml.go
#	simapp/app_v2.go
#	store/cachemulti/store.go
#	store/iavl/store.go
#	store/mem/store.go
#	store/streaming/constructor.go
#	store/streaming/file/service.go
#	store/streaming/file/service_test.go
#	store/types/listening.pb.go
#	store/types/store.go

* `make proto-gen`, update changelog, delete uncessary files

* fix conflicts

* fix conflicts

* revert api breaking change

* fix build

* fix unit test

Co-authored-by: yihuang <[email protected]>
Co-authored-by: Julien Robert <[email protected]>
Co-authored-by: Marko <[email protected]>
  • Loading branch information
4 people authored and JeancarloBarrios committed Sep 28, 2024
1 parent f36f4ed commit 2a2797b
Show file tree
Hide file tree
Showing 13 changed files with 154 additions and 179 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,20 @@ Ref: https://keepachangelog.com/en/1.0.0/

### API Breaking Changes

* (store) [#13516](https://github.com/cosmos/cosmos-sdk/pull/13516) Update State Streaming APIs:
* Add method `ListenCommit` to `ABCIListener`
* Move `ListeningEnabled` and `AddListener` methods to `CommitMultiStore`
* Remove `CacheWrapWithListeners` from `CacheWrap` and `CacheWrapper` interfaces
* Remove listening APIs from the caching layer (it should only listen to the `rootmulti.Store`)
* Add three new options to file streaming service constructor.
* Modify `ABCIListener` such that any error from any method will always halt the app via `panic`
* (store) [#13529](https://github.com/cosmos/cosmos-sdk/pull/13529) Add method `LatestVersion` to `MultiStore` interface, add method `SetQueryMultiStore` to baesapp to support alternative `MultiStore` implementation for query service.

### Bug Fixes

* (baseapp) [#13983](https://github.com/cosmos/cosmos-sdk/pull/13983) Don't emit duplicate ante-handler events when a post-handler is defined.
* (baseapp) [#14049](https://github.com/cosmos/cosmos-sdk/pull/14049) Fix state sync when interval is zero.
* (store) [#13516](https://github.com/cosmos/cosmos-sdk/pull/13516) Fix state listener that was observing writes at wrong time.

## [v0.46.6](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.46.6) - 2022-11-18

Expand Down
18 changes: 5 additions & 13 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,7 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg

// call the hooks with the BeginBlock messages
for _, streamingListener := range app.abciListeners {
goCtx := sdk.WrapSDKContext(app.deliverState.ctx)
if err := streamingListener.ListenBeginBlock(goCtx, req, res); err != nil {
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
panic(fmt.Errorf("BeginBlock listening hook failed, height: %d, err: %w", req.Header.Height, err))
}
}
Expand Down Expand Up @@ -260,8 +259,7 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc

// call the streaming service hooks with the EndBlock messages
for _, streamingListener := range app.abciListeners {
goCtx := sdk.WrapSDKContext(app.deliverState.ctx)
if err := streamingListener.ListenEndBlock(goCtx, req, res); err != nil {
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
panic(fmt.Errorf("EndBlock listening hook failed, height: %d, err: %w", req.Height, err))
}
}
Expand Down Expand Up @@ -317,7 +315,7 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliv
defer func() {
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
panic(fmt.Errorf("DeliverTx listening hook failed: %w", err))
}
}
}()
Expand Down Expand Up @@ -354,8 +352,6 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliv
// against that height and gracefully halt if it matches the latest committed
// height.
func (app *BaseApp) Commit() abci.ResponseCommit {
defer telemetry.MeasureSince(time.Now(), "abci", "commit")

header := app.deliverState.ctx.BlockHeader()
retainHeight := app.GetBlockRetentionHeight(header.Height)

Expand All @@ -372,8 +368,7 @@ func (app *BaseApp) Commit() abci.ResponseCommit {

// call the hooks with the Commit message
for _, streamingListener := range app.abciListeners {
goCtx := sdk.WrapSDKContext(app.deliverState.ctx)
if err := streamingListener.ListenCommit(goCtx, res); err != nil {
if err := streamingListener.ListenCommit(app.deliverState.ctx, res); err != nil {
panic(fmt.Errorf("Commit listening hook failed, height: %d, err: %w", header.Height, err))
}
}
Expand Down Expand Up @@ -407,10 +402,7 @@ func (app *BaseApp) Commit() abci.ResponseCommit {
app.halt()
}

var snapshotHeight int64
if app.snapshotInterval > 0 && uint64(header.Height)%app.snapshotInterval == 0 {
snapshotHeight = header.Height
}
go app.snapshotManager.SnapshotIfApplicable(header.Height)

return res
}
Expand Down
90 changes: 40 additions & 50 deletions docs/architecture/adr-038-state-listening.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,8 @@ func NewMemoryListener() *MemoryListener {
return &MemoryListener{}
}

// OnWrite writes state change events to the internal cache
func (fl *MemoryListener) OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) {
fl.stateCache = append(fl.stateCache, StoreKVPair{
StoreKey: storeKey.Name(),
Delete: delete,
Key: key,
Value: value,
})
}
We will create two concrete implementations of the `WriteListener` interface in `store/types/listening.go`, that writes out protobuf
encoded KV pairs to an underlying `io.Writer`, and simply accumulate them in memory.

We will create two concrete implementations of the `WriteListener` interface in `store/types/listening.go`, that writes out protobuf
encoded KV pairs to an underlying `io.Writer`, and simply accumulate them in memory.
Expand Down Expand Up @@ -301,14 +294,8 @@ func (app *BaseApp) SetStreamingService(s StreamingService) {
}
```

Implementing the service above:

```go
// streaming/plugins/abci/{plugin_version}/grpc.go
var (
_ baseapp.ABCIListener = (*GRPCClient)(nil)
)
We will also modify the `BeginBlock`, `EndBlock`, and `DeliverTx` methods to pass ABCI requests and responses to any streaming service hooks registered
with the `BaseApp`.

defer func() {
// call the hooks with the BeginBlock messages
Expand All @@ -325,11 +312,14 @@ func (m *GRPCClient) ListenFinalizeBlock(goCtx context.Context, req abci.Request
return err
}

func (m *GRPCClient) ListenCommit(goCtx context.Context, res abci.ResponseCommit, changeSet []store.StoreKVPair) error {
ctx := sdk.UnwrapSDKContext(goCtx)
_, err := m.client.ListenCommit(ctx, &ListenCommitRequest{BlockHeight: ctx.BlockHeight(), Res: res, ChangeSet: changeSet})
return err
}
defer func() {
// call the hooks with the BeginBlock messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
panic(sdkerrors.Wrapf(err, "BeginBlock listening hook failed, height: %d", req.Header.Height))
}
}
}()

// GRPCServer is the gRPC server that GRPCClient talks to.
type GRPCServer struct {
Expand Down Expand Up @@ -362,12 +352,37 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliv
}
}()
And the pre-compiled Go plugin `Impl`(*this is only used for plugins that are written in Go*):
defer func() {
// Call the streaming service hooks with the EndBlock messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
panic(sdkerrors.Wrapf(err, "EndBlock listening hook failed, height: %d", req.Height))
}
}
}()
return res
}
```

```go
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) {
defer func() {
// call the hooks with the DeliverTx messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
panic(sdkerrors.Wrap(err, "DeliverTx listening hook failed"))
}
}
}()
...
app.logger.Info("commit synced", "commit", fmt.Sprintf("%X", commitID))
...
}
```golang
func (app *BaseApp) Commit() abci.ResponseCommit {
header := app.deliverState.ctx.BlockHeader()
Expand All @@ -394,6 +409,7 @@ func (app *BaseApp) Commit() abci.ResponseCommit {
app.logger.Info("commit synced", "commit", fmt.Sprintf("%X", commitID))
...
}
```

#### Error Handling And Async Consumers

Expand Down Expand Up @@ -477,33 +493,7 @@ func NewSimApp(
baseAppOptions ...func(*baseapp.BaseApp),
) *SimApp {

...
keys := sdk.NewKVStoreKeys(
authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey,
minttypes.StoreKey, distrtypes.StoreKey, slashingtypes.StoreKey,
govtypes.StoreKey, paramstypes.StoreKey, ibchost.StoreKey, upgradetypes.StoreKey,
evidencetypes.StoreKey, ibctransfertypes.StoreKey, capabilitytypes.StoreKey,
)
...
// register streaming services
streamingCfg := cast.ToStringMap(appOpts.Get(baseapp.StreamingTomlKey))
for service := range streamingCfg {
pluginKey := fmt.Sprintf("%s.%s.%s", baseapp.StreamingTomlKey, service, baseapp.StreamingPluginTomlKey)
pluginName := strings.TrimSpace(cast.ToString(appOpts.Get(pluginKey)))
if len(pluginName) > 0 {
logLevel := cast.ToString(appOpts.Get(flags.FlagLogLevel))
plugin, err := streaming.NewStreamingPlugin(pluginName, logLevel)
if err != nil {
tmos.Exit(err.Error())
}
if err := baseapp.RegisterStreamingPlugin(bApp, appOpts, keys, plugin); err != nil {
tmos.Exit(err.Error())
}
}
}
...

keys := sdk.NewKVStoreKeys(
authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey,
Expand Down
17 changes: 16 additions & 1 deletion server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,15 @@ type (
Keys []string `mapstructure:"keys"`
WriteDir string `mapstructure:"write_dir"`
Prefix string `mapstructure:"prefix"`
// OutputMetadata specifies if output the block metadata file which includes
// the abci requests/responses, otherwise only the data file is outputted.
OutputMetadata bool `mapstructure:"output-metadata"`
// StopNodeOnError specifies if propagate the streamer errors to the consensus
// state machine, it's nesserary for data integrity of output.
StopNodeOnError bool `mapstructure:"stop-node-on-error"`
// Fsync specifies if calling fsync after writing the files, it slows down
// the commit, but don't lose data in face of system crash.
Fsync bool `mapstructure:"fsync"`
}
)

Expand Down Expand Up @@ -251,7 +260,13 @@ func DefaultConfig() *Config {
},
Streamers: StreamersConfig{
File: FileStreamerConfig{
Keys: []string{"*"},
Keys: []string{"*"},
WriteDir: "data/file_streamer",
OutputMetadata: true,
StopNodeOnError: true,
// NOTICE: the default config don't protect the streamer data integrity
// in face of system crash.
Fsync: false,
},
},
}
Expand Down
7 changes: 7 additions & 0 deletions server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,13 @@ streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}]
keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}]
write_dir = "{{ .Streamers.File.WriteDir }}"
prefix = "{{ .Streamers.File.Prefix }}"
# output-metadata specifies if output the metadata file which includes the abci request/responses
# during processing the block.
output-metadata = "{{ .Streamers.File.OutputMetadata }}"
# stop-node-on-error specifies if propagate the file streamer errors to consensus state machine.
stop-node-on-error = "{{ .Streamers.File.StopNodeOnError }}"
# fsync specifies if call fsync after writing the files.
fsync = "{{ .Streamers.File.Fsync }}"
`

var configTemplate *template.Template
Expand Down
1 change: 0 additions & 1 deletion store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/internal/conv"
"github.com/cosmos/cosmos-sdk/store/cachekv/internal"
"github.com/cosmos/cosmos-sdk/store/tracekv"
"github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/telemetry"
Expand Down
23 changes: 1 addition & 22 deletions store/cachemulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ func NewFromKVStore(
store types.KVStore, stores map[types.StoreKey]types.CacheWrapper,
keys map[string]types.StoreKey, traceWriter io.Writer, traceContext types.TraceContext,
) Store {
if listeners == nil {
listeners = make(map[types.StoreKey][]types.WriteListener)
}
cms := Store{
db: cachekv.NewStore(store),
stores: make(map[types.StoreKey]types.CacheWrap, len(stores)),
Expand Down Expand Up @@ -81,8 +78,7 @@ func newCacheMultiStoreFromCMS(cms Store) Store {
stores[k] = v
}

// don't pass listeners to nested cache store.
return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext, nil)
return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext)
}

// SetTracer sets the tracer for the MultiStore that the underlying
Expand Down Expand Up @@ -113,23 +109,6 @@ func (cms Store) TracingEnabled() bool {
return cms.traceWriter != nil
}

// AddListeners adds listeners for a specific KVStore
func (cms Store) AddListeners(key types.StoreKey, listeners []types.WriteListener) {
if ls, ok := cms.listeners[key]; ok {
cms.listeners[key] = append(ls, listeners...)
} else {
cms.listeners[key] = listeners
}
}

// ListeningEnabled returns if listening is enabled for a specific KVStore
func (cms Store) ListeningEnabled(key types.StoreKey) bool {
if ls, ok := cms.listeners[key]; ok {
return len(ls) != 0
}
return false
}

// LatestVersion returns the branch version of the store
func (cms Store) LatestVersion() int64 {
panic("cannot get latest version from branch cached multi-store")
Expand Down
14 changes: 12 additions & 2 deletions store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,10 @@ func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error {

// If it was deleted, remove all data
if upgrades.IsDeleted(key.Name()) {
deleteKVStore(store.(types.KVStore))
if err := deleteKVStore(types.KVStore(store)); err != nil {
return errors.Wrapf(err, "failed to delete store %s", key.Name())
}
rs.removalMap[key] = true
} else if oldName := upgrades.RenamedFrom(key.Name()); oldName != "" {
// handle renames specially
// make an unregistered key to satisfy loadCommitStore params
Expand All @@ -257,7 +260,14 @@ func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error {
}

// move all data
moveKVStoreData(oldStore.(types.KVStore), store.(types.KVStore))
if err := moveKVStoreData(types.KVStore(oldStore), types.KVStore(store)); err != nil {
return errors.Wrapf(err, "failed to move store %s -> %s", oldName, key.Name())
}

// add the old key so its deletion is committed
newStores[oldKey] = oldStore
// this will ensure it's not perpetually stored in commitInfo
rs.removalMap[oldKey] = true
}
}

Expand Down
2 changes: 1 addition & 1 deletion store/streaming/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func NewFileStreamingService(
fileDir = path.Join(homePath, fileDir)
}

// try to create output directory if it does not exist
// try to create output directory if not exists.
if _, err := os.Stat(fileDir); os.IsNotExist(err) {
if err = os.MkdirAll(fileDir, os.ModePerm); err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 2a2797b

Please sign in to comment.