Skip to content

Commit

Permalink
add --key-proto-type: pretty print proto keys
Browse files Browse the repository at this point in the history
  • Loading branch information
birdayz committed Mar 17, 2020
1 parent 6231952 commit 0dc081f
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions cmd/kaf/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/birdayz/kaf/pkg/proto"
"github.com/golang/protobuf/jsonpb"
prettyjson "github.com/hokaccha/go-prettyjson"
colorable "github.com/mattn/go-colorable"
"github.com/mattn/go-colorable"
"github.com/spf13/cobra"
)

Expand All @@ -26,7 +26,8 @@ var (
schemaCache *avro.SchemaCache
keyfmt *prettyjson.Formatter

protoType string
protoType string
keyProtoType string

reg *proto.DescriptorRegistry
)
Expand All @@ -39,6 +40,7 @@ func init() {
consumeCmd.Flags().StringSliceVar(&protoFiles, "proto-include", []string{}, "Path to proto files")
consumeCmd.Flags().StringSliceVar(&protoExclude, "proto-exclude", []string{}, "Proto exclusions (path prefixes)")
consumeCmd.Flags().StringVar(&protoType, "proto-type", "", "Fully qualified name of the proto message type. Example: com.test.SampleMessage")
consumeCmd.Flags().StringVar(&keyProtoType, "key-proto-type", "", "Fully qualified name of the proto key type. Example: com.test.SampleMessage")

keyfmt = prettyjson.NewFormatter()
keyfmt.Newline = " " // Replace newline with space to avoid condensed output.
Expand Down Expand Up @@ -140,18 +142,29 @@ var consumeCmd = &cobra.Command{
for msg := range pc.Messages() {
var stderr bytes.Buffer

// TODO make this nicer
var dataToDisplay []byte
var keyToDisplay []byte

if protoType != "" {
dataToDisplay, err = protoDecode(reg, msg.Value, protoType)
if err != nil {
fmt.Fprintf(&stderr, "failed to decode proto. falling back to binary outputla. Error: %v", err)
fmt.Fprintf(&stderr, "failed to decode proto. falling back to binary outputla. Error: %v\n", err)
}

keyToDisplay, err = protoDecode(reg, msg.Key, keyProtoType)
if err != nil {
fmt.Fprintf(&stderr, "failed to decode proto key. falling back to binary outputla. Error: %v\n", err)
}
} else {
dataToDisplay, err = avroDecode(msg.Value)
if err != nil {
fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err)
}

keyToDisplay, err = avroDecode(msg.Key)
if err != nil {
fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err)
}
}

if !raw {
Expand Down Expand Up @@ -185,12 +198,7 @@ var consumeCmd = &cobra.Command{
}

if msg.Key != nil && len(msg.Key) > 0 {

key, err := avroDecode(msg.Key)
if err != nil {
fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err)
}
fmt.Fprintf(w, "Key:\t%v\n", formatKey(key))
fmt.Fprintf(w, "Key:\t%v\n", formatKey(keyToDisplay))
}
fmt.Fprintf(w, "Partition:\t%v\nOffset:\t%v\nTimestamp:\t%v\n", msg.Partition, msg.Offset, msg.Timestamp)
w.Flush()
Expand Down

0 comments on commit 0dc081f

Please sign in to comment.