Skip to content

Commit

Permalink
add gcp pubsub write support
Browse files Browse the repository at this point in the history
  • Loading branch information
dselans committed Aug 2, 2020
1 parent 034802b commit 4417516
Show file tree
Hide file tree
Showing 4 changed files with 247 additions and 9 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ plumber
=======

plumber is a CLI devtool for inspecting, piping, massaging and redirecting data
in message systems like Kafka, RabbitMQ , Amazon SQS and
in message systems like Kafka, RabbitMQ , GCP PubSub and
[many more](#supported-messaging-systems). \[1]

The tool enables you to:
Expand Down Expand Up @@ -143,6 +143,7 @@ We wanted a swiss army knife type of tool for working with messaging systems

* Kafka
* RabbitMQ
* Google Cloud Platform PubSub
* Amazon SQS (coming soon)
* NATS (coming soon)
* ActiveMQ (coming soon)
Expand Down
8 changes: 8 additions & 0 deletions backends/gcp-pubsub/gcp-pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ import (
type Options struct {
ProjectId string
SubscriptionId string
TopicId string
OutputType string
ProtobufDir string
ProtobufRootMessage string
Follow bool
Convert string
LineNumbers bool
Ack bool
InputData string
InputFile string
InputType string
}

type GCPPubSub struct {
Expand All @@ -33,10 +37,14 @@ func parseOptions(c *cli.Context) (*Options, error) {
return &Options{
ProjectId: c.String("project-id"),
SubscriptionId: c.String("sub-id"),
TopicId: c.String("topic-id"),
OutputType: c.String("output-type"),
Convert: c.String("convert"),
ProtobufDir: c.String("protobuf-dir"),
ProtobufRootMessage: c.String("protobuf-root-message"),
InputData: c.String("input-data"),
InputFile: c.String("input-file"),
InputType: c.String("input-type"),
Follow: c.Bool("follow"),
LineNumbers: c.Bool("line-numbers"),
Ack: c.Bool("ack"),
Expand Down
180 changes: 180 additions & 0 deletions backends/gcp-pubsub/write.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package gcppubsub

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"os"

"cloud.google.com/go/pubsub"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"

"github.com/batchcorp/plumber/pb"
)

// Write is the entry point function for performing write operations in GCP PubSub.
//
// This is where we verify that the passed args and flags combo makes sense,
// attempt to establish a connection, parse protobuf before finally attempting
// to perform the write.
func Write(c *cli.Context) error {
opts, err := parseOptions(c)
if err != nil {
return errors.Wrap(err, "unable to parse options")
}

if err := validateWriteOptions(opts); err != nil {
return errors.Wrap(err, "unable to validate read options")
}

var mdErr error
var md *desc.MessageDescriptor

if opts.OutputType == "protobuf" {
md, mdErr = pb.FindMessageDescriptor(opts.ProtobufDir, opts.ProtobufRootMessage)
if mdErr != nil {
return errors.Wrap(mdErr, "unable to find root message descriptor")
}
}

client, err := NewClient(opts)
if err != nil {
return errors.Wrap(err, "unable to create client")
}

msg, err := generateWriteValue(md, opts)
if err != nil {
return errors.Wrap(err, "unable to generate write value")
}

g := &GCPPubSub{
Options: opts,
MsgDesc: md,
Client: client,
log: logrus.WithField("pkg", "rabbitmq/read.go"),
}

return g.Write(context.Background(), msg)
}

// Write is a wrapper for amqp Publish method. We wrap it so that we can mock
// it in tests, add logging etc.
func (g *GCPPubSub) Write(ctx context.Context, value []byte) error {
t := g.Client.Topic(g.Options.TopicId)

result := t.Publish(ctx, &pubsub.Message{
Data: value,
})

// Block until the result is returned and a server-generated
// ID is returned for the published message.
_, err := result.Get(ctx)
if err != nil {
return errors.Wrap(err, "unable to ensure that message was published")
}

return nil
}

func validateWriteOptions(opts *Options) error {
// If output-type is protobuf, ensure that protobuf flags are set
// If type is protobuf, ensure both --protobuf-dir and --protobuf-root-message
// are set as well
if opts.OutputType == "protobuf" {
if opts.ProtobufDir == "" {
return errors.New("'--protobuf-dir' must be set when type " +
"is set to 'protobuf'")
}

if opts.ProtobufRootMessage == "" {
return errors.New("'--protobuf-root-message' must be when " +
"type is set to 'protobuf'")
}

// Does given dir exist?
if _, err := os.Stat(opts.ProtobufDir); os.IsNotExist(err) {
return fmt.Errorf("--protobuf-dir '%s' does not exist", opts.ProtobufDir)
}
}

// InputData and file cannot be set at the same time
if opts.InputData != "" && opts.InputFile != "" {
return fmt.Errorf("--input-data and --input-file cannot both be set (choose one!)")
}

if opts.InputFile != "" {
if _, err := os.Stat(opts.InputFile); os.IsNotExist(err) {
return fmt.Errorf("--input-file '%s' does not exist", opts.InputFile)
}
}

return nil
}

func generateWriteValue(md *desc.MessageDescriptor, opts *Options) ([]byte, error) {
// Do we read value or file?
var data []byte

if opts.InputData != "" {
data = []byte(opts.InputData)
} else if opts.InputFile != "" {
var readErr error

data, readErr = ioutil.ReadFile(opts.InputFile)
if readErr != nil {
return nil, fmt.Errorf("unable to read file '%s': %s", opts.InputFile, readErr)
}
}

// Ensure we do not try to operate on a nil md
if opts.OutputType == "protobuf" && md == nil {
return nil, errors.New("message descriptor cannot be nil when --output-type is protobuf")
}

// Input: Plain Output: Plain
if opts.InputType == "plain" && opts.OutputType == "plain" {
return data, nil
}

// Input: JSONPB Output: Protobuf
if opts.InputType == "jsonpb" && opts.OutputType == "protobuf" {
var convertErr error

data, convertErr = convertJSONPBToProtobuf(data, dynamic.NewMessage(md))
if convertErr != nil {
return nil, errors.Wrap(convertErr, "unable to convert JSONPB to protobuf")
}

return data, nil
}

// TODO: Input: Base64 Output: Plain
// TODO: Input: Base64 Output: Protobuf
// TODO: And a few more combinations ...

return nil, errors.New("unsupported input/output combination")
}

// Convert jsonpb -> protobuf -> bytes
func convertJSONPBToProtobuf(data []byte, m *dynamic.Message) ([]byte, error) {
buf := bytes.NewBuffer(data)

if err := jsonpb.Unmarshal(buf, m); err != nil {
return nil, errors.Wrap(err, "unable to unmarshal data into dynamic message")
}

// Now let's encode that into a proper protobuf message
pbBytes, err := proto.Marshal(m)
if err != nil {
return nil, errors.Wrap(err, "unable to marshal dynamic protobuf message to bytes")
}

return pbBytes, nil
}
65 changes: 57 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ func setupCLI() *cli.App {
},
}

gcpPubSubFlags := []cli.Flag{
&cli.StringFlag{
Name: "project-id",
Usage: "Project Id",
Required: true,
},
}

globalFlags := []cli.Flag{
&cli.BoolFlag{
Name: "debug",
Expand Down Expand Up @@ -221,14 +229,11 @@ func setupCLI() *cli.App {
Name: "gcp-pubsub",
Usage: "Google Cloud Platform PubSub",
Action: gcppubsub.Read,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "project-id",
Usage: "Project Id",
},
Flags: append(gcpPubSubFlags, []cli.Flag{
&cli.StringFlag{
Name: "sub-id",
Usage: "Subscription Id",
Name: "sub-id",
Usage: "Subscription Id",
Required: true,
},
&cli.StringFlag{
Name: "protobuf-dir",
Expand Down Expand Up @@ -267,7 +272,7 @@ func setupCLI() *cli.App {
Default: "",
},
},
},
}...),
},
},
},
Expand Down Expand Up @@ -370,6 +375,50 @@ func setupCLI() *cli.App {
},
}...),
},
{
Name: "gcp-pubsub",
Usage: "Google Cloud Platform PubSub",
Action: gcppubsub.Write,
Flags: append(gcpPubSubFlags, []cli.Flag{
&cli.StringFlag{
Name: "topic-id",
Usage: "Topic Id",
Required: true,
},
&cli.StringFlag{
Name: "input-data",
Usage: "The data to write to rabbitmq",
},
&cli.StringFlag{
Name: "input-file",
Usage: "File containing input data (1 file = 1 message)",
},
&cli.GenericFlag{
Name: "input-type",
Usage: "Treat input data as this type to enable output conversion",
Value: &EnumValue{
Enum: []string{"plain", "base64", "jsonpb"},
Default: "plain",
},
},
&cli.GenericFlag{
Name: "output-type",
Usage: "Convert the input to this type when writing message",
Value: &EnumValue{
Enum: []string{"plain", "protobuf"},
Default: "plain",
},
},
&cli.StringFlag{
Name: "protobuf-dir",
Usage: "Directory with .proto files",
},
&cli.StringFlag{
Name: "protobuf-root-message",
Usage: "Specifies the root message in a protobuf descriptor set (required if protobuf-dir set)",
},
}...),
},
},
},
},
Expand Down

0 comments on commit 4417516

Please sign in to comment.