Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: remove provider transaction event subscription #622

Merged
merged 1 commit into from
May 29, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
perf: remove provider transaction event subscription
* Deactivated events.Publish subscribes to the tendermint event bus
and re-published events to the rest of the provider/
modules.

fixes #606

Signed-off-by: Josh Roppo <[email protected]>
AkashJosh committed May 29, 2020
commit 10920b3b4490d45f381c3351dc159704be937542
13 changes: 2 additions & 11 deletions events/publish.go
Original file line number Diff line number Diff line change
@@ -15,23 +15,17 @@ import (
"golang.org/x/sync/errgroup"
)

// Publish publishes events along tm buses to clients
// Publish events using tm buses to clients. Waits on context
// shutdown signals to exit.
func Publish(ctx context.Context, tmbus tmclient.EventsClient, name string, bus pubsub.Bus) error {

const (
queuesz = 100
)
var (
txname = name + "-tx"
blkname = name + "-blk"
)

txch, err := tmbus.Subscribe(ctx, txname, txQuery().String(), queuesz)
Ropes marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
defer tmbus.UnsubscribeAll(ctx, txname)

blkch, err := tmbus.Subscribe(ctx, blkname, blkQuery().String(), queuesz)
if err != nil {
return err
@@ -40,9 +34,6 @@ func Publish(ctx context.Context, tmbus tmclient.EventsClient, name string, bus

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
return publishEvents(ctx, txch, bus)
})
g.Go(func() error {
return publishEvents(ctx, blkch, bus)
})
5 changes: 0 additions & 5 deletions events/query.go
Original file line number Diff line number Diff line change
@@ -8,11 +8,6 @@ import (
tmtypes "github.com/tendermint/tendermint/types"
)

func txQuery() pubsub.Query {
return tmquery.MustParse(
fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx))
}

func blkQuery() pubsub.Query {
return tmquery.MustParse(
fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventNewBlockHeader))
3 changes: 3 additions & 0 deletions provider/cmd/run.go
Original file line number Diff line number Diff line change
@@ -47,6 +47,7 @@ func runCmd(cdc *codec.Codec) *cobra.Command {
return cmd
}

// doRunCmd initializes all of the Provider functionality, hangs, and awaits shutdown signals.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comments ++

func doRunCmd(ctx context.Context, cdc *codec.Codec, cmd *cobra.Command, _ []string) error {
cctx := ccontext.NewCLIContext().WithCodec(cdc)

@@ -81,6 +82,7 @@ func doRunCmd(ctx context.Context, cdc *codec.Codec, cmd *cobra.Command, _ []str
return err
}

// k8s client creation
cclient, err := createClusterClient(log, cmd, pinfo.HostURI)
if err != nil {
return err
@@ -117,6 +119,7 @@ func doRunCmd(ctx context.Context, cdc *codec.Codec, cmd *cobra.Command, _ []str

func createClusterClient(log log.Logger, cmd *cobra.Command, host string) (cluster.Client, error) {
if val, _ := cmd.Flags().GetBool(flagClusterK8s); !val {
// Condition that there is no Kubernetes API to work with.
return cluster.NullClient(), nil
}
ns, err := cmd.Flags().GetString(flagK8sManifestNS)