diff --git a/events/publish.go b/events/publish.go index f3b3ac52ba..e02cb2a634 100644 --- a/events/publish.go +++ b/events/publish.go @@ -15,23 +15,17 @@ import ( "golang.org/x/sync/errgroup" ) -// Publish publishes events along tm buses to clients +// Publish events using tm buses to clients. Waits on context +// shutdown signals to exit. func Publish(ctx context.Context, tmbus tmclient.EventsClient, name string, bus pubsub.Bus) error { const ( queuesz = 100 ) var ( - txname = name + "-tx" blkname = name + "-blk" ) - txch, err := tmbus.Subscribe(ctx, txname, txQuery().String(), queuesz) - if err != nil { - return err - } - defer tmbus.UnsubscribeAll(ctx, txname) - blkch, err := tmbus.Subscribe(ctx, blkname, blkQuery().String(), queuesz) if err != nil { return err @@ -40,9 +34,6 @@ func Publish(ctx context.Context, tmbus tmclient.EventsClient, name string, bus g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - return publishEvents(ctx, txch, bus) - }) g.Go(func() error { return publishEvents(ctx, blkch, bus) }) diff --git a/events/query.go b/events/query.go index 249b842eaa..52dca417f3 100644 --- a/events/query.go +++ b/events/query.go @@ -8,11 +8,6 @@ import ( tmtypes "github.com/tendermint/tendermint/types" ) -func txQuery() pubsub.Query { - return tmquery.MustParse( - fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx)) -} - func blkQuery() pubsub.Query { return tmquery.MustParse( fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventNewBlockHeader)) diff --git a/provider/cmd/run.go b/provider/cmd/run.go index ce639e8662..f4553a80d6 100644 --- a/provider/cmd/run.go +++ b/provider/cmd/run.go @@ -47,6 +47,7 @@ func runCmd(cdc *codec.Codec) *cobra.Command { return cmd } +// doRunCmd initializes all of the Provider functionality, hangs, and awaits shutdown signals. func doRunCmd(ctx context.Context, cdc *codec.Codec, cmd *cobra.Command, _ []string) error { cctx := ccontext.NewCLIContext().WithCodec(cdc) @@ -81,6 +82,7 @@ func doRunCmd(ctx context.Context, cdc *codec.Codec, cmd *cobra.Command, _ []str return err } + // k8s client creation cclient, err := createClusterClient(log, cmd, pinfo.HostURI) if err != nil { return err @@ -117,6 +119,7 @@ func doRunCmd(ctx context.Context, cdc *codec.Codec, cmd *cobra.Command, _ []str func createClusterClient(log log.Logger, cmd *cobra.Command, host string) (cluster.Client, error) { if val, _ := cmd.Flags().GetBool(flagClusterK8s); !val { + // Condition that there is no Kubernetes API to work with. return cluster.NullClient(), nil } ns, err := cmd.Flags().GetString(flagK8sManifestNS)