From 44175166a44e878cdb9802badd7be73d334c43d3 Mon Sep 17 00:00:00 2001 From: Daniel Selans Date: Sun, 2 Aug 2020 11:53:09 -0700 Subject: [PATCH] add gcp pubsub write support --- README.md | 3 +- backends/gcp-pubsub/gcp-pubsub.go | 8 ++ backends/gcp-pubsub/write.go | 180 ++++++++++++++++++++++++++++++ main.go | 65 +++++++++-- 4 files changed, 247 insertions(+), 9 deletions(-) create mode 100644 backends/gcp-pubsub/write.go diff --git a/README.md b/README.md index c4c4f184..02c9e067 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ plumber ======= plumber is a CLI devtool for inspecting, piping, massaging and redirecting data -in message systems like Kafka, RabbitMQ , Amazon SQS and +in message systems like Kafka, RabbitMQ , GCP PubSub and [many more](#supported-messaging-systems). \[1] The tool enables you to: @@ -143,6 +143,7 @@ We wanted a swiss army knife type of tool for working with messaging systems * Kafka * RabbitMQ +* Google Cloud Platform PubSub * Amazon SQS (coming soon) * NATS (coming soon) * ActiveMQ (coming soon) diff --git a/backends/gcp-pubsub/gcp-pubsub.go b/backends/gcp-pubsub/gcp-pubsub.go index dc103948..ddac0c4e 100644 --- a/backends/gcp-pubsub/gcp-pubsub.go +++ b/backends/gcp-pubsub/gcp-pubsub.go @@ -13,6 +13,7 @@ import ( type Options struct { ProjectId string SubscriptionId string + TopicId string OutputType string ProtobufDir string ProtobufRootMessage string @@ -20,6 +21,9 @@ type Options struct { Convert string LineNumbers bool Ack bool + InputData string + InputFile string + InputType string } type GCPPubSub struct { @@ -33,10 +37,14 @@ func parseOptions(c *cli.Context) (*Options, error) { return &Options{ ProjectId: c.String("project-id"), SubscriptionId: c.String("sub-id"), + TopicId: c.String("topic-id"), OutputType: c.String("output-type"), Convert: c.String("convert"), ProtobufDir: c.String("protobuf-dir"), ProtobufRootMessage: c.String("protobuf-root-message"), + InputData: c.String("input-data"), + InputFile: c.String("input-file"), + InputType: c.String("input-type"), Follow: c.Bool("follow"), LineNumbers: c.Bool("line-numbers"), Ack: c.Bool("ack"), diff --git a/backends/gcp-pubsub/write.go b/backends/gcp-pubsub/write.go new file mode 100644 index 00000000..b48ec4cd --- /dev/null +++ b/backends/gcp-pubsub/write.go @@ -0,0 +1,180 @@ +package gcppubsub + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "os" + + "cloud.google.com/go/pubsub" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" + "github.com/jhump/protoreflect/desc" + "github.com/jhump/protoreflect/dynamic" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/urfave/cli/v2" + + "github.com/batchcorp/plumber/pb" +) + +// Write is the entry point function for performing write operations in GCP PubSub. +// +// This is where we verify that the passed args and flags combo makes sense, +// attempt to establish a connection, parse protobuf before finally attempting +// to perform the write. +func Write(c *cli.Context) error { + opts, err := parseOptions(c) + if err != nil { + return errors.Wrap(err, "unable to parse options") + } + + if err := validateWriteOptions(opts); err != nil { + return errors.Wrap(err, "unable to validate read options") + } + + var mdErr error + var md *desc.MessageDescriptor + + if opts.OutputType == "protobuf" { + md, mdErr = pb.FindMessageDescriptor(opts.ProtobufDir, opts.ProtobufRootMessage) + if mdErr != nil { + return errors.Wrap(mdErr, "unable to find root message descriptor") + } + } + + client, err := NewClient(opts) + if err != nil { + return errors.Wrap(err, "unable to create client") + } + + msg, err := generateWriteValue(md, opts) + if err != nil { + return errors.Wrap(err, "unable to generate write value") + } + + g := &GCPPubSub{ + Options: opts, + MsgDesc: md, + Client: client, + log: logrus.WithField("pkg", "rabbitmq/read.go"), + } + + return g.Write(context.Background(), msg) +} + +// Write is a wrapper for amqp Publish method. We wrap it so that we can mock +// it in tests, add logging etc. +func (g *GCPPubSub) Write(ctx context.Context, value []byte) error { + t := g.Client.Topic(g.Options.TopicId) + + result := t.Publish(ctx, &pubsub.Message{ + Data: value, + }) + + // Block until the result is returned and a server-generated + // ID is returned for the published message. + _, err := result.Get(ctx) + if err != nil { + return errors.Wrap(err, "unable to ensure that message was published") + } + + return nil +} + +func validateWriteOptions(opts *Options) error { + // If output-type is protobuf, ensure that protobuf flags are set + // If type is protobuf, ensure both --protobuf-dir and --protobuf-root-message + // are set as well + if opts.OutputType == "protobuf" { + if opts.ProtobufDir == "" { + return errors.New("'--protobuf-dir' must be set when type " + + "is set to 'protobuf'") + } + + if opts.ProtobufRootMessage == "" { + return errors.New("'--protobuf-root-message' must be when " + + "type is set to 'protobuf'") + } + + // Does given dir exist? + if _, err := os.Stat(opts.ProtobufDir); os.IsNotExist(err) { + return fmt.Errorf("--protobuf-dir '%s' does not exist", opts.ProtobufDir) + } + } + + // InputData and file cannot be set at the same time + if opts.InputData != "" && opts.InputFile != "" { + return fmt.Errorf("--input-data and --input-file cannot both be set (choose one!)") + } + + if opts.InputFile != "" { + if _, err := os.Stat(opts.InputFile); os.IsNotExist(err) { + return fmt.Errorf("--input-file '%s' does not exist", opts.InputFile) + } + } + + return nil +} + +func generateWriteValue(md *desc.MessageDescriptor, opts *Options) ([]byte, error) { + // Do we read value or file? + var data []byte + + if opts.InputData != "" { + data = []byte(opts.InputData) + } else if opts.InputFile != "" { + var readErr error + + data, readErr = ioutil.ReadFile(opts.InputFile) + if readErr != nil { + return nil, fmt.Errorf("unable to read file '%s': %s", opts.InputFile, readErr) + } + } + + // Ensure we do not try to operate on a nil md + if opts.OutputType == "protobuf" && md == nil { + return nil, errors.New("message descriptor cannot be nil when --output-type is protobuf") + } + + // Input: Plain Output: Plain + if opts.InputType == "plain" && opts.OutputType == "plain" { + return data, nil + } + + // Input: JSONPB Output: Protobuf + if opts.InputType == "jsonpb" && opts.OutputType == "protobuf" { + var convertErr error + + data, convertErr = convertJSONPBToProtobuf(data, dynamic.NewMessage(md)) + if convertErr != nil { + return nil, errors.Wrap(convertErr, "unable to convert JSONPB to protobuf") + } + + return data, nil + } + + // TODO: Input: Base64 Output: Plain + // TODO: Input: Base64 Output: Protobuf + // TODO: And a few more combinations ... + + return nil, errors.New("unsupported input/output combination") +} + +// Convert jsonpb -> protobuf -> bytes +func convertJSONPBToProtobuf(data []byte, m *dynamic.Message) ([]byte, error) { + buf := bytes.NewBuffer(data) + + if err := jsonpb.Unmarshal(buf, m); err != nil { + return nil, errors.Wrap(err, "unable to unmarshal data into dynamic message") + } + + // Now let's encode that into a proper protobuf message + pbBytes, err := proto.Marshal(m) + if err != nil { + return nil, errors.Wrap(err, "unable to marshal dynamic protobuf message to bytes") + } + + return pbBytes, nil +} diff --git a/main.go b/main.go index 11bfc912..f8a09680 100644 --- a/main.go +++ b/main.go @@ -97,6 +97,14 @@ func setupCLI() *cli.App { }, } + gcpPubSubFlags := []cli.Flag{ + &cli.StringFlag{ + Name: "project-id", + Usage: "Project Id", + Required: true, + }, + } + globalFlags := []cli.Flag{ &cli.BoolFlag{ Name: "debug", @@ -221,14 +229,11 @@ func setupCLI() *cli.App { Name: "gcp-pubsub", Usage: "Google Cloud Platform PubSub", Action: gcppubsub.Read, - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "project-id", - Usage: "Project Id", - }, + Flags: append(gcpPubSubFlags, []cli.Flag{ &cli.StringFlag{ - Name: "sub-id", - Usage: "Subscription Id", + Name: "sub-id", + Usage: "Subscription Id", + Required: true, }, &cli.StringFlag{ Name: "protobuf-dir", @@ -267,7 +272,7 @@ func setupCLI() *cli.App { Default: "", }, }, - }, + }...), }, }, }, @@ -370,6 +375,50 @@ func setupCLI() *cli.App { }, }...), }, + { + Name: "gcp-pubsub", + Usage: "Google Cloud Platform PubSub", + Action: gcppubsub.Write, + Flags: append(gcpPubSubFlags, []cli.Flag{ + &cli.StringFlag{ + Name: "topic-id", + Usage: "Topic Id", + Required: true, + }, + &cli.StringFlag{ + Name: "input-data", + Usage: "The data to write to rabbitmq", + }, + &cli.StringFlag{ + Name: "input-file", + Usage: "File containing input data (1 file = 1 message)", + }, + &cli.GenericFlag{ + Name: "input-type", + Usage: "Treat input data as this type to enable output conversion", + Value: &EnumValue{ + Enum: []string{"plain", "base64", "jsonpb"}, + Default: "plain", + }, + }, + &cli.GenericFlag{ + Name: "output-type", + Usage: "Convert the input to this type when writing message", + Value: &EnumValue{ + Enum: []string{"plain", "protobuf"}, + Default: "plain", + }, + }, + &cli.StringFlag{ + Name: "protobuf-dir", + Usage: "Directory with .proto files", + }, + &cli.StringFlag{ + Name: "protobuf-root-message", + Usage: "Specifies the root message in a protobuf descriptor set (required if protobuf-dir set)", + }, + }...), + }, }, }, },