From 79f3cfd7493cdaf4795d5309669595515c10cfa7 Mon Sep 17 00:00:00 2001 From: Corey McGregor Date: Mon, 14 Nov 2022 10:21:21 +1000 Subject: [PATCH] feat(consume): add output flag to consume for full json output --- cmd/kaf/consume.go | 120 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 102 insertions(+), 18 deletions(-) diff --git a/cmd/kaf/consume.go b/cmd/kaf/consume.go index 3a37b32..b141302 100644 --- a/cmd/kaf/consume.go +++ b/cmd/kaf/consume.go @@ -6,11 +6,10 @@ import ( "encoding/binary" "encoding/json" "fmt" + "strconv" "sync" "text/tabwriter" - "strconv" - "github.com/Shopify/sarama" "github.com/birdayz/kaf/pkg/avro" "github.com/birdayz/kaf/pkg/proto" @@ -24,11 +23,13 @@ var ( offsetFlag string groupFlag string groupCommitFlag bool - raw bool - follow bool - tail int32 - schemaCache *avro.SchemaCache - keyfmt *prettyjson.Formatter + outputFormat = OutputFormatDefault + // Deprecated: Use outputFormat instead. + raw bool + follow bool + tail int32 + schemaCache *avro.SchemaCache + keyfmt *prettyjson.Formatter protoType string keyProtoType string @@ -44,6 +45,7 @@ func init() { rootCmd.AddCommand(consumeCmd) consumeCmd.Flags().StringVar(&offsetFlag, "offset", "oldest", "Offset to start consuming. Possible values: oldest, newest, or integer.") consumeCmd.Flags().BoolVar(&raw, "raw", false, "Print raw output of messages, without key or prettified JSON") + consumeCmd.Flags().Var(&outputFormat, "output", "Set output format messages: default, raw (without key or prettified JSON), json") consumeCmd.Flags().BoolVarP(&follow, "follow", "f", false, "Continue to consume messages until program execution is interrupted/terminated") consumeCmd.Flags().Int32VarP(&tail, "tail", "n", 0, "Print last n messages per partition") consumeCmd.Flags().StringSliceVar(&protoFiles, "proto-include", []string{}, "Path to proto files") @@ -56,6 +58,14 @@ func init() { consumeCmd.Flags().StringVarP(&groupFlag, "group", "g", "", "Consumer Group to use for consume") consumeCmd.Flags().BoolVar(&groupCommitFlag, "commit", false, "Commit Group offset after receiving messages. Works only if consuming as Consumer Group") + if err := consumeCmd.RegisterFlagCompletionFunc("output", completeOutputFormat); err != nil { + errorExit("Failed to register flag completion: %v", err) + } + + if err := consumeCmd.Flags().MarkDeprecated("raw", "use --output raw instead"); err != nil { + errorExit("Failed to mark flag as deprecated: %v", err) + } + keyfmt = prettyjson.NewFormatter() keyfmt.Newline = " " // Replace newline with space to avoid condensed output. keyfmt.Indent = 0 @@ -95,6 +105,11 @@ var consumeCmd = &cobra.Command{ topic := args[0] client := getClientFromConfig(cfg) + // Allow deprecated flag to override when outputFormat is not specified, or default. + if outputFormat == OutputFormatDefault && raw { + outputFormat = OutputFormatRaw + } + switch offsetFlag { case "oldest": offset = sarama.OffsetOldest @@ -266,16 +281,51 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) { } } - if !raw { - if isJSON(dataToDisplay) { - dataToDisplay = formatValue(dataToDisplay) + dataToDisplay = formatMessage(msg, dataToDisplay, keyToDisplay, &stderr) + + mu.Lock() + stderr.WriteTo(errWriter) + _, _ = colorableOut.Write(dataToDisplay) + fmt.Fprintln(outWriter) + mu.Unlock() +} + +func formatMessage(msg *sarama.ConsumerMessage, rawMessage []byte, keyToDisplay []byte, stderr *bytes.Buffer) []byte { + switch outputFormat { + case OutputFormatRaw: + return rawMessage + case OutputFormatJSON: + jsonMessage := make(map[string]interface{}) + + jsonMessage["partition"] = msg.Partition + jsonMessage["offset"] = msg.Offset + jsonMessage["timestamp"] = msg.Timestamp + + if len(msg.Headers) > 0 { + jsonMessage["headers"] = msg.Headers + } + + jsonMessage["key"] = formatJSON(keyToDisplay) + jsonMessage["payload"] = formatJSON(rawMessage) + + jsonToDisplay, err := json.Marshal(jsonMessage) + if err != nil { + fmt.Fprintf(stderr, "could not decode JSON data: %v", err) + } + + return jsonToDisplay + case OutputFormatDefault: + fallthrough + default: + if isJSON(rawMessage) { + rawMessage = formatValue(rawMessage) } if isJSON(keyToDisplay) { keyToDisplay = formatKey(keyToDisplay) } - w := tabwriter.NewWriter(&stderr, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags) + w := tabwriter.NewWriter(stderr, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags) if len(msg.Headers) > 0 { fmt.Fprintf(w, "Headers:\n") @@ -304,14 +354,9 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) { } fmt.Fprintf(w, "Partition:\t%v\nOffset:\t%v\nTimestamp:\t%v\n", msg.Partition, msg.Offset, msg.Timestamp) w.Flush() - } - - mu.Lock() - stderr.WriteTo(errWriter) - _, _ = colorableOut.Write(dataToDisplay) - fmt.Fprintln(outWriter) - mu.Unlock() + return rawMessage + } } // proto to JSON @@ -359,6 +404,15 @@ func formatValue(key []byte) []byte { return key } +func formatJSON(data []byte) interface{} { + var i interface{} + if err := json.Unmarshal(data, &i); err != nil { + return string(data) + } + + return i +} + func isJSON(data []byte) bool { var i interface{} if err := json.Unmarshal(data, &i); err == nil { @@ -366,3 +420,33 @@ func isJSON(data []byte) bool { } return false } + +type OutputFormat string + +const ( + OutputFormatDefault OutputFormat = "default" + OutputFormatRaw OutputFormat = "raw" + OutputFormatJSON OutputFormat = "json" +) + +func (e *OutputFormat) String() string { + return string(*e) +} + +func (e *OutputFormat) Set(v string) error { + switch v { + case "default", "raw", "json": + *e = OutputFormat(v) + return nil + default: + return fmt.Errorf("must be one of: default, raw, json") + } +} + +func (e *OutputFormat) Type() string { + return "OutputFormat" +} + +func completeOutputFormat(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + return []string{"default", "raw", "json"}, cobra.ShellCompDirectiveNoFileComp +}