Skip to content

Commit

Permalink
improvements from upstream feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Ergels Gaxhaj committed Jan 30, 2023
1 parent a7f217e commit 3f523d8
Show file tree
Hide file tree
Showing 29 changed files with 341 additions and 319 deletions.
70 changes: 13 additions & 57 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,23 +189,11 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
app.voteInfos = req.LastCommitInfo.GetVotes()

// call the streaming service hook with the BeginBlock messages
for _, abciListener := range app.abciListeners {
for _, abciListener := range app.streamingManager.AbciListeners {
ctx := app.deliverState.ctx
blockHeight := ctx.BlockHeight()
if app.abciListenersAsync {
go func(req abci.RequestBeginBlock, res abci.ResponseBeginBlock) {
if err := abciListener.ListenBeginBlock(ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", blockHeight, "err", err)
}
}(req, res)
} else {
if err := abciListener.ListenBeginBlock(ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", blockHeight, "err", err)
if app.stopNodeOnABCIListenerErr {
plugin.CleanupClients()
os.Exit(1)
}
}
if err := abciListener.ListenBeginBlock(ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", blockHeight, "err", err)
}
}

Expand All @@ -230,23 +218,11 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
}

// call the streaming service hook with the EndBlock messages
for _, abciListener := range app.abciListeners {
for _, abciListener := range app.streamingManager.AbciListeners {
ctx := app.deliverState.ctx
blockHeight := ctx.BlockHeight()
if app.abciListenersAsync {
go func(req abci.RequestEndBlock, res abci.ResponseEndBlock) {
if err := abciListener.ListenEndBlock(ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", blockHeight, "err", err)
}
}(req, res)
} else {
if err := abciListener.ListenEndBlock(ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", blockHeight, "err", err)
if app.stopNodeOnABCIListenerErr {
plugin.CleanupClients()
os.Exit(1)
}
}
if err := abciListener.ListenEndBlock(ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", blockHeight, "err", err)
}
}

Expand Down Expand Up @@ -300,23 +276,11 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
var res abci.ResponseDeliverTx
defer func() {
// call the streaming service hook with the EndBlock messages
for _, abciListener := range app.abciListeners {
for _, abciListener := range app.streamingManager.AbciListeners {
ctx := app.deliverState.ctx
blockHeight := ctx.BlockHeight()
if app.abciListenersAsync {
go func(req abci.RequestDeliverTx, res abci.ResponseDeliverTx) {
if err := abciListener.ListenDeliverTx(ctx, req, res); err != nil {
app.logger.Error("DeliverTx listening hook failed", "height", blockHeight, "err", err)
}
}(req, res)
} else {
if err := abciListener.ListenDeliverTx(ctx, req, res); err != nil {
app.logger.Error("DeliverTx listening hook failed", "height", blockHeight, "err", err)
if app.stopNodeOnABCIListenerErr {
plugin.CleanupClients()
os.Exit(1)
}
}
if err := abciListener.ListenDeliverTx(ctx, req, res); err != nil {
app.logger.Error("DeliverTx listening hook failed", "height", blockHeight, "err", err)
}
}
}()
Expand Down Expand Up @@ -369,25 +333,17 @@ func (app *BaseApp) Commit() abci.ResponseCommit {
}

// call the streaming service hook with the EndBlock messages
for _, abciListener := range app.abciListeners {
abciListeners := app.streamingManager.AbciListeners
if len(abciListeners) > 0 {
ctx := app.deliverState.ctx
blockHeight := ctx.BlockHeight()
changeSet := app.cms.PopStateCache()
if app.abciListenersAsync {
go func(res abci.ResponseCommit) {
if err := abciListener.ListenCommit(ctx, res, changeSet); err != nil {
app.logger.Error("ListenCommit listening hook failed", "height", blockHeight, "err", err)
}
}(res)
} else {
for _, abciListener := range abciListeners {
if err := abciListener.ListenCommit(ctx, res, changeSet); err != nil {
app.logger.Error("ListenCommit listening hook failed", "height", blockHeight, "err", err)
if app.stopNodeOnABCIListenerErr {
plugin.CleanupClients()
os.Exit(1)
}
}
}
changeSet = nil
}

app.logger.Info("commit synced", "commit", fmt.Sprintf("%X", commitID))
Expand Down
12 changes: 2 additions & 10 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,8 @@ type BaseApp struct { // nolint: maligned
// which informs Tendermint what to index. If empty, all events will be indexed.
indexEvents map[string]struct{}

// abciListeners for hooking into the ABCI message processing of the BaseApp
// and exposing the requests and responses to external consumers
abciListeners []ABCIListener

// abciListenersAsync for determining if abciListeners will run asynchronously.
abciListenersAsync bool

// stopNodeOnABCIListenerErr halts the node when ABCI streaming service listening results in an error.
// stopNodeOnABCIListenerErr=true MUST be paired with abciListenersAsync=false, otherwise it will be ignored.
stopNodeOnABCIListenerErr bool
// streamingManager for managing instances and configuration of ABCIListener services
streamingManager storetypes.StreamingManager

feeHandler sdk.FeeHandler

Expand Down
79 changes: 79 additions & 0 deletions baseapp/baseapp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2265,3 +2265,82 @@ func TestDefaultStoreLoader(t *testing.T) {
require.Equal(t, storetypes.StoreTypeIAVL, store.GetStoreType())
}
}

var (
distKey1 = storetypes.NewKVStoreKey("distKey1")
)

func TestABCI_MultiListener_StateChanges(t *testing.T) {
// increment the tx counter
anteKey := []byte("ante-key")
anteOpt := func(bapp *BaseApp) { bapp.SetAnteHandler(anteHandlerTxTest(t, capKey1, anteKey)) }

// increment the msg counter
deliverKey := []byte("deliver-key")
deliverKey2 := []byte("deliver-key2")
routerOpt := func(bapp *BaseApp) {
r1 := sdk.NewRoute(routeMsgCounter, handlerMsgCounter(t, capKey1, deliverKey))
r2 := sdk.NewRoute(routeMsgCounter2, handlerMsgCounter(t, capKey1, deliverKey2))
bapp.Router().AddRoute(r1)
bapp.Router().AddRoute(r2)
}

distOpt := func(bapp *BaseApp) { bapp.MountStores(distKey1) }
mockListener1 := NewMockABCIListener("lis_1")
mockListener2 := NewMockABCIListener("lis_2")
streamingManager := storetypes.StreamingManager{AbciListeners: []storetypes.ABCIListener{&mockListener1, &mockListener2}}
streamingManagerOpt := func(bapp *BaseApp) { bapp.SetStreamingManager(streamingManager) }
addListenerOpt := func(bapp *BaseApp) { bapp.CommitMultiStore().AddListeners([]storetypes.StoreKey{distKey1}) }

app := setupBaseApp(t, anteOpt, routerOpt, distOpt, streamingManagerOpt, addListenerOpt)
app.InitChain(abci.RequestInitChain{})

// Create same codec used in txDecoder
codec := codec.NewLegacyAmino()
registerTestCodec(codec)

nBlocks := 3
txPerHeight := 5

for blockN := 0; blockN < nBlocks; blockN++ {
header := tmproto.Header{Height: int64(blockN) + 1}
app.BeginBlock(abci.RequestBeginBlock{Header: header})
var expectedChangeSet []*storetypes.StoreKVPair

for i := 0; i < txPerHeight; i++ {
counter := int64(blockN*txPerHeight + i)
tx := newTxCounter(counter, counter)

txBytes, err := codec.Marshal(tx)
require.NoError(t, err)

sKey := []byte(fmt.Sprintf("distKey%d", i))
sVal := []byte(fmt.Sprintf("distVal%d", i))

ctx := app.getState(runTxModeDeliver).ctx
store := ctx.KVStore(distKey1)
store.Set(sKey, sVal)

expectedChangeSet = append(expectedChangeSet, &storetypes.StoreKVPair{
StoreKey: distKey1.Name(),
Delete: false,
Key: sKey,
Value: sVal,
})

res := app.DeliverTx(abci.RequestDeliverTx{Tx: txBytes})
require.True(t, res.IsOK(), fmt.Sprintf("%v", res))

events := res.GetEvents()
require.Len(t, events, 3, "should contain ante handler, message type and counter events respectively")
require.Equal(t, sdk.MarkEventsToIndex(counterEvent("ante_handler", counter).ToABCIEvents(), map[string]struct{}{})[0], events[0], "ante handler event")
require.Equal(t, sdk.MarkEventsToIndex(counterEvent(sdk.EventTypeMessage, counter).ToABCIEvents(), map[string]struct{}{})[0], events[2], "msg handler update counter event")
}

app.EndBlock(abci.RequestEndBlock{})
app.Commit()

require.Equal(t, expectedChangeSet, mockListener1.ChangeSet, "should contain the same changeSet")
require.Equal(t, expectedChangeSet, mockListener2.ChangeSet, "should contain the same changeSet")
}
}
13 changes: 6 additions & 7 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package baseapp

import (
"fmt"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
"io"

abci "github.com/tendermint/tendermint/abci/types"
Expand Down Expand Up @@ -234,13 +235,6 @@ func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) {
app.msgServiceRouter.SetInterfaceRegistry(registry)
}

// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
func (app *BaseApp) SetStreamingService(s ABCIListener) {
// register the StreamingService within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
app.abciListeners = append(app.abciListeners, s)
}

// SetQueryMultiStore set a alternative MultiStore implementation to support grpc query service.
//
// Ref: https://github.com/cosmos/cosmos-sdk/issues/13317
Expand Down Expand Up @@ -268,3 +262,8 @@ func (app *BaseApp) SetAggregateEventsFunc(aggregateEventsFunc func(resultEvents

app.aggregateEventsFunc = aggregateEventsFunc
}

// SetStreamingManager sets the streaming manager for the BaseApp.test
func (app *BaseApp) SetStreamingManager(manager storetypes.StreamingManager) {
app.streamingManager = manager
}
45 changes: 13 additions & 32 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
@@ -1,50 +1,32 @@
package baseapp

import (
"context"
"fmt"
"sort"

"github.com/spf13/cast"

abci "github.com/tendermint/tendermint/abci/types"

servertypes "github.com/cosmos/cosmos-sdk/server/types"
store "github.com/cosmos/cosmos-sdk/store/types"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
)

// ABCIListener interface is used to hook into the ABCI message processing of the BaseApp.
// the error results are propagated to consensus state machine,
// if you don't want to affect consensus, handle the errors internally and always return `nil` in these APIs.
type ABCIListener interface {
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
// ListenEndBlock updates the steaming service with the latest EndBlock messages
ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
// ListenCommit updates the steaming service with the latest Commit messages and state changes
ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*store.StoreKVPair) error
}

const (
StreamingTomlKey = "streaming"
StreamingABCITomlKey = "abci"
StreamingABCIPluginTomlKey = "plugin"
StreamingABCIKeysTomlKey = "keys"
StreamingABCIStopNodeOnErrTomlKey = "stop-node-on-err"
StreamingABCIAsync = "async"
)

// RegisterStreamingPlugin registers streaming plugins with the App.
func RegisterStreamingPlugin(
bApp *BaseApp,
appOpts servertypes.AppOptions,
keys map[string]*store.KVStoreKey,
keys map[string]*storetypes.KVStoreKey,
streamingPlugin interface{},
) error {
switch t := streamingPlugin.(type) {
case ABCIListener:
case storetypes.ABCIListener:
registerABCIListenerPlugin(bApp, appOpts, keys, t)
default:
return fmt.Errorf("unexpected plugin type %T", t)
Expand All @@ -55,20 +37,19 @@ func RegisterStreamingPlugin(
func registerABCIListenerPlugin(
bApp *BaseApp,
appOpts servertypes.AppOptions,
keys map[string]*store.KVStoreKey,
abciListener ABCIListener,
keys map[string]*storetypes.KVStoreKey,
abciListener storetypes.ABCIListener,
) {
asyncKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIAsync)
async := cast.ToBool(appOpts.Get(asyncKey))
stopNodeOnErrKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIStopNodeOnErrTomlKey)
stopNodeOnErr := cast.ToBool(appOpts.Get(stopNodeOnErrKey))
keysKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIKeysTomlKey)
exposeKeysStr := cast.ToStringSlice(appOpts.Get(keysKey))
exposedKeys := exposeStoreKeysSorted(exposeKeysStr, keys)
bApp.cms.AddListeners(exposedKeys)
bApp.SetStreamingService(abciListener)
bApp.stopNodeOnABCIListenerErr = stopNodeOnErr
bApp.abciListenersAsync = async
bApp.streamingManager = storetypes.StreamingManager{
AbciListeners: []storetypes.ABCIListener{abciListener},
StopNodeOnErr: stopNodeOnErr,
}
}

func exposeAll(list []string) bool {
Expand All @@ -80,15 +61,15 @@ func exposeAll(list []string) bool {
return false
}

func exposeStoreKeysSorted(keysStr []string, keys map[string]*store.KVStoreKey) []store.StoreKey {
var exposeStoreKeys []store.StoreKey
func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStoreKey) []storetypes.StoreKey {
var exposeStoreKeys []storetypes.StoreKey
if exposeAll(keysStr) {
exposeStoreKeys = make([]store.StoreKey, 0, len(keys))
exposeStoreKeys = make([]storetypes.StoreKey, 0, len(keys))
for key := range keys {
exposeStoreKeys = append(exposeStoreKeys, keys[key])
}
} else {
exposeStoreKeys = make([]store.StoreKey, 0, len(keysStr))
exposeStoreKeys = make([]storetypes.StoreKey, 0, len(keysStr))
for _, keyStr := range keysStr {
if storeKey, ok := keys[keyStr]; ok {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
Expand Down
38 changes: 38 additions & 0 deletions baseapp/streaming_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package baseapp

import (
"context"

abci "github.com/tendermint/tendermint/abci/types"

storetypes "github.com/cosmos/cosmos-sdk/store/types"
)

type mockABCIListener struct {
name string
ChangeSet []*storetypes.StoreKVPair
}

func NewMockABCIListener(name string) mockABCIListener {
return mockABCIListener{
name: name,
ChangeSet: make([]*storetypes.StoreKVPair, 0),
}
}

func (m mockABCIListener) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
return nil
}

func (m mockABCIListener) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error {
return nil
}

func (m mockABCIListener) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error {
return nil
}

func (m *mockABCIListener) ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*storetypes.StoreKVPair) error {
m.ChangeSet = changeSet
return nil
}
2 changes: 0 additions & 2 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ type (
ABCIListenerConfig struct {
Keys []string `mapstructure:"keys"`
Plugin string `mapstructure:"plugin"`
Async bool `mapstructure:"async"`
StopNodeOnErr bool `mapstructure:"stop-node-on-err"`
}
)
Expand Down Expand Up @@ -358,7 +357,6 @@ func DefaultConfig() *Config {
Streaming: StreamingConfig{
ABCI: ABCIListenerConfig{
Keys: []string{"*"},
Async: false,
StopNodeOnErr: true,
},
},
Expand Down
Loading

0 comments on commit 3f523d8

Please sign in to comment.