Skip to content

Commit

Permalink
feat(consume): add output flag to consume for full json output
Browse files Browse the repository at this point in the history
  • Loading branch information
shousper committed Nov 14, 2022
1 parent 3c2ce88 commit 79f3cfd
Showing 1 changed file with 102 additions and 18 deletions.
120 changes: 102 additions & 18 deletions cmd/kaf/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"strconv"
"sync"
"text/tabwriter"

"strconv"

"github.com/Shopify/sarama"
"github.com/birdayz/kaf/pkg/avro"
"github.com/birdayz/kaf/pkg/proto"
Expand All @@ -24,11 +23,13 @@ var (
offsetFlag string
groupFlag string
groupCommitFlag bool
raw bool
follow bool
tail int32
schemaCache *avro.SchemaCache
keyfmt *prettyjson.Formatter
outputFormat = OutputFormatDefault
// Deprecated: Use outputFormat instead.
raw bool
follow bool
tail int32
schemaCache *avro.SchemaCache
keyfmt *prettyjson.Formatter

protoType string
keyProtoType string
Expand All @@ -44,6 +45,7 @@ func init() {
rootCmd.AddCommand(consumeCmd)
consumeCmd.Flags().StringVar(&offsetFlag, "offset", "oldest", "Offset to start consuming. Possible values: oldest, newest, or integer.")
consumeCmd.Flags().BoolVar(&raw, "raw", false, "Print raw output of messages, without key or prettified JSON")
consumeCmd.Flags().Var(&outputFormat, "output", "Set output format messages: default, raw (without key or prettified JSON), json")
consumeCmd.Flags().BoolVarP(&follow, "follow", "f", false, "Continue to consume messages until program execution is interrupted/terminated")
consumeCmd.Flags().Int32VarP(&tail, "tail", "n", 0, "Print last n messages per partition")
consumeCmd.Flags().StringSliceVar(&protoFiles, "proto-include", []string{}, "Path to proto files")
Expand All @@ -56,6 +58,14 @@ func init() {
consumeCmd.Flags().StringVarP(&groupFlag, "group", "g", "", "Consumer Group to use for consume")
consumeCmd.Flags().BoolVar(&groupCommitFlag, "commit", false, "Commit Group offset after receiving messages. Works only if consuming as Consumer Group")

if err := consumeCmd.RegisterFlagCompletionFunc("output", completeOutputFormat); err != nil {
errorExit("Failed to register flag completion: %v", err)
}

if err := consumeCmd.Flags().MarkDeprecated("raw", "use --output raw instead"); err != nil {
errorExit("Failed to mark flag as deprecated: %v", err)
}

keyfmt = prettyjson.NewFormatter()
keyfmt.Newline = " " // Replace newline with space to avoid condensed output.
keyfmt.Indent = 0
Expand Down Expand Up @@ -95,6 +105,11 @@ var consumeCmd = &cobra.Command{
topic := args[0]
client := getClientFromConfig(cfg)

// Allow deprecated flag to override when outputFormat is not specified, or default.
if outputFormat == OutputFormatDefault && raw {
outputFormat = OutputFormatRaw
}

switch offsetFlag {
case "oldest":
offset = sarama.OffsetOldest
Expand Down Expand Up @@ -266,16 +281,51 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) {
}
}

if !raw {
if isJSON(dataToDisplay) {
dataToDisplay = formatValue(dataToDisplay)
dataToDisplay = formatMessage(msg, dataToDisplay, keyToDisplay, &stderr)

mu.Lock()
stderr.WriteTo(errWriter)
_, _ = colorableOut.Write(dataToDisplay)
fmt.Fprintln(outWriter)
mu.Unlock()
}

func formatMessage(msg *sarama.ConsumerMessage, rawMessage []byte, keyToDisplay []byte, stderr *bytes.Buffer) []byte {
switch outputFormat {
case OutputFormatRaw:
return rawMessage
case OutputFormatJSON:
jsonMessage := make(map[string]interface{})

jsonMessage["partition"] = msg.Partition
jsonMessage["offset"] = msg.Offset
jsonMessage["timestamp"] = msg.Timestamp

if len(msg.Headers) > 0 {
jsonMessage["headers"] = msg.Headers
}

jsonMessage["key"] = formatJSON(keyToDisplay)
jsonMessage["payload"] = formatJSON(rawMessage)

jsonToDisplay, err := json.Marshal(jsonMessage)
if err != nil {
fmt.Fprintf(stderr, "could not decode JSON data: %v", err)
}

return jsonToDisplay
case OutputFormatDefault:
fallthrough
default:
if isJSON(rawMessage) {
rawMessage = formatValue(rawMessage)
}

if isJSON(keyToDisplay) {
keyToDisplay = formatKey(keyToDisplay)
}

w := tabwriter.NewWriter(&stderr, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags)
w := tabwriter.NewWriter(stderr, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags)

if len(msg.Headers) > 0 {
fmt.Fprintf(w, "Headers:\n")
Expand Down Expand Up @@ -304,14 +354,9 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) {
}
fmt.Fprintf(w, "Partition:\t%v\nOffset:\t%v\nTimestamp:\t%v\n", msg.Partition, msg.Offset, msg.Timestamp)
w.Flush()
}

mu.Lock()
stderr.WriteTo(errWriter)
_, _ = colorableOut.Write(dataToDisplay)
fmt.Fprintln(outWriter)
mu.Unlock()

return rawMessage
}
}

// proto to JSON
Expand Down Expand Up @@ -359,10 +404,49 @@ func formatValue(key []byte) []byte {
return key
}

func formatJSON(data []byte) interface{} {
var i interface{}
if err := json.Unmarshal(data, &i); err != nil {
return string(data)
}

return i
}

func isJSON(data []byte) bool {
var i interface{}
if err := json.Unmarshal(data, &i); err == nil {
return true
}
return false
}

type OutputFormat string

const (
OutputFormatDefault OutputFormat = "default"
OutputFormatRaw OutputFormat = "raw"
OutputFormatJSON OutputFormat = "json"
)

func (e *OutputFormat) String() string {
return string(*e)
}

func (e *OutputFormat) Set(v string) error {
switch v {
case "default", "raw", "json":
*e = OutputFormat(v)
return nil
default:
return fmt.Errorf("must be one of: default, raw, json")
}
}

func (e *OutputFormat) Type() string {
return "OutputFormat"
}

func completeOutputFormat(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
return []string{"default", "raw", "json"}, cobra.ShellCompDirectiveNoFileComp
}

0 comments on commit 79f3cfd

Please sign in to comment.