Skip to content

Commit

Permalink
Merge pull request #2 from batchcorp/dselans/gcp-pubsub-support
Browse files Browse the repository at this point in the history
Dselans/gcp pubsub support
  • Loading branch information
dselans authored Aug 2, 2020
2 parents 14a702c + 4417516 commit ceecc7b
Show file tree
Hide file tree
Showing 998 changed files with 270,455 additions and 101,210 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ plumber
=======

plumber is a CLI devtool for inspecting, piping, massaging and redirecting data
in message systems like Kafka, RabbitMQ , Amazon SQS and
in message systems like Kafka, RabbitMQ , GCP PubSub and
[many more](#supported-messaging-systems). \[1]

The tool enables you to:
Expand Down Expand Up @@ -143,6 +143,7 @@ We wanted a swiss army knife type of tool for working with messaging systems

* Kafka
* RabbitMQ
* Google Cloud Platform PubSub
* Amazon SQS (coming soon)
* NATS (coming soon)
* ActiveMQ (coming soon)
Expand Down
61 changes: 61 additions & 0 deletions backends/gcp-pubsub/gcp-pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package gcppubsub

import (
"context"

"cloud.google.com/go/pubsub"
"github.com/jhump/protoreflect/desc"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)

type Options struct {
ProjectId string
SubscriptionId string
TopicId string
OutputType string
ProtobufDir string
ProtobufRootMessage string
Follow bool
Convert string
LineNumbers bool
Ack bool
InputData string
InputFile string
InputType string
}

type GCPPubSub struct {
Options *Options
MsgDesc *desc.MessageDescriptor
Client *pubsub.Client
log *logrus.Entry
}

func parseOptions(c *cli.Context) (*Options, error) {
return &Options{
ProjectId: c.String("project-id"),
SubscriptionId: c.String("sub-id"),
TopicId: c.String("topic-id"),
OutputType: c.String("output-type"),
Convert: c.String("convert"),
ProtobufDir: c.String("protobuf-dir"),
ProtobufRootMessage: c.String("protobuf-root-message"),
InputData: c.String("input-data"),
InputFile: c.String("input-file"),
InputType: c.String("input-type"),
Follow: c.Bool("follow"),
LineNumbers: c.Bool("line-numbers"),
Ack: c.Bool("ack"),
}, nil
}

func NewClient(opts *Options) (*pubsub.Client, error) {
c, err := pubsub.NewClient(context.Background(), opts.ProjectId)
if err != nil {
return nil, errors.Wrap(err, "unable to create new pubsub client")
}

return c, nil
}
164 changes: 164 additions & 0 deletions backends/gcp-pubsub/read.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package gcppubsub

import (
"context"
"encoding/base64"
"fmt"
"os"
"sync"

"cloud.google.com/go/pubsub"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"

"github.com/batchcorp/plumber/pb"
"github.com/batchcorp/plumber/printer"
"github.com/batchcorp/plumber/util"
)

func Read(c *cli.Context) error {
opts, err := parseOptions(c)
if err != nil {
return errors.Wrap(err, "unable to parse options")
}

if err := validateReadOptions(opts); err != nil {
return errors.Wrap(err, "unable to validate read options")
}

var mdErr error
var md *desc.MessageDescriptor

if opts.OutputType == "protobuf" {
md, mdErr = pb.FindMessageDescriptor(opts.ProtobufDir, opts.ProtobufRootMessage)
if mdErr != nil {
return errors.Wrap(mdErr, "unable to find root message descriptor")
}
}

client, err := NewClient(opts)
if err != nil {
return errors.Wrap(err, "unable to create client")
}

r := &GCPPubSub{
Options: opts,
MsgDesc: md,
Client: client,
log: logrus.WithField("pkg", "rabbitmq/read.go"),
}

return r.Read()
}

func (g *GCPPubSub) Read() error {
g.log.Info("Listening for message(s) ...")

sub := g.Client.Subscription(g.Options.SubscriptionId)

// Receive launches several goroutines to exec func, need to use a mutex
var m sync.Mutex

lineNumber := 1

// Standard way to cancel Receive in gcp's pubsub
cctx, cancel := context.WithCancel(context.Background())

if err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
m.Lock()
defer m.Unlock()

if g.Options.Ack {
defer msg.Ack()
}

if g.Options.OutputType == "protobuf" {
decoded, err := pb.DecodeProtobufToJSON(dynamic.NewMessage(g.MsgDesc), msg.Data)
if err != nil {
if !g.Options.Follow {
printer.Error(fmt.Sprintf("unable to decode protobuf message: %s", err))
cancel()
return
}

// Continue running
printer.Error(fmt.Sprintf("unable to decode protobuf message: %s", err))
return
}

msg.Data = decoded
}

var data []byte
var convertErr error

switch g.Options.Convert {
case "base64":
_, convertErr = base64.StdEncoding.Decode(data, msg.Data)
case "gzip":
data, convertErr = util.Gunzip(msg.Data)
default:
data = msg.Data
}

if convertErr != nil {
if !g.Options.Follow {
printer.Error(fmt.Sprintf("unable to complete conversion for message: %s", convertErr))
cancel()
return
}

// Continue running
printer.Error(fmt.Sprintf("unable to complete conversion for message: %s", convertErr))
return
}

str := string(data)

if g.Options.LineNumbers {
str = fmt.Sprintf("%d: ", lineNumber) + str
lineNumber++
}

printer.Print(str)

if !g.Options.Follow {
cancel()
return
}
}); err != nil {
return errors.Wrap(err, "unable to complete msg receive")
}

g.log.Info("read completed")

return nil
}

func validateReadOptions(opts *Options) error {
if os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") == "" {
return errors.New("GOOGLE_APPLICATION_CREDENTIALS must be set")
}

if opts.OutputType == "protobuf" {
if opts.ProtobufDir == "" {
return errors.New("'--protobuf-dir' must be set when type " +
"is set to 'protobuf'")
}

if opts.ProtobufRootMessage == "" {
return errors.New("'--protobuf-root-message' must be when " +
"type is set to 'protobuf'")
}

// Does given dir exist?
if _, err := os.Stat(opts.ProtobufDir); os.IsNotExist(err) {
return fmt.Errorf("--protobuf-dir '%s' does not exist", opts.ProtobufDir)
}
}

return nil
}
Loading

0 comments on commit ceecc7b

Please sign in to comment.