Skip to content

Commit

Permalink
fix(cli): Kafka inspect output formatting (SeldonIO#6130)
Browse files Browse the repository at this point in the history
* add kafka inspect consumer timeout (-d) as parameter

* add formatting
  • Loading branch information
sakoush authored Dec 6, 2024
1 parent 6d89d57 commit f284b4a
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
10 changes: 8 additions & 2 deletions operator/cmd/seldon/cli/pipeline_inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package cli

import (
"fmt"
"time"

"github.com/spf13/cobra"
"k8s.io/utils/env"
Expand All @@ -24,7 +25,7 @@ const (
flagOutputFormat = "format"
flagTruncate = "truncate"
flagNamespace = "namespace"
flagTimeoutDefault = int64(60)
flagTimeoutDefault = int64(5)
)

func createPipelineInspect() *cobra.Command {
Expand Down Expand Up @@ -74,12 +75,16 @@ func createPipelineInspect() *cobra.Command {
if err != nil {
return err
}
timeoutSecs, err := flags.GetInt64(flagTimeout)
if err != nil {
return err
}
kc, err := cli.NewKafkaClient(kafkaBroker, kafkaBrokerIsSet, schedulerHost, schedulerHostIsSet, kafkaConfigPath)
if err != nil {
return err
}
data := []byte(args[0])
err = kc.InspectStep(string(data), offset, requestId, format, verbose, truncateData, namespace)
err = kc.InspectStep(string(data), offset, requestId, format, verbose, truncateData, namespace, time.Duration(timeoutSecs)*time.Second)
return err
},
}
Expand All @@ -94,5 +99,6 @@ func createPipelineInspect() *cobra.Command {
flags.BoolP(flagVerbose, "v", false, "display more details, such as headers")
flags.BoolP(flagTruncate, "t", false, "truncate data")
flags.String(flagKafkaConfigPath, env.GetString(envKafkaConfigPath, ""), "path to kafka config file")
flags.Int64P(flagTimeout, "d", flagTimeoutDefault, "timeout seconds for kafka operations")
return cmd
}
19 changes: 12 additions & 7 deletions operator/pkg/cli/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ const (
OutputsSpecifier = "outputs"
PipelineSpecifier = "pipeline"
ModelSpecifier = "model"
KafkaTimeoutSeconds = 2
DefaultNamespace = "default"
DefaultMaxMessageSize = 1000000000
)
Expand Down Expand Up @@ -222,7 +221,9 @@ func getPipelineNameFromHeaders(headers []kafka.Header) (string, error) {
return "", fmt.Errorf("No pipeline found in headers.")
}

func (kc *KafkaClient) InspectStep(pipelineStep string, offset int64, key string, format string, verbose bool, truncateData bool, namespace string) error {
func (kc *KafkaClient) InspectStep(
pipelineStep string, offset int64, key string, format string, verbose bool, truncateData bool, namespace string, timeout time.Duration,
) error {
defer kc.consumer.Close()
if namespace == "" {
namespace = kc.namespace
Expand All @@ -238,7 +239,7 @@ func (kc *KafkaClient) InspectStep(pipelineStep string, offset int64, key string

ki := KafkaInspect{}
for _, topic := range pipelineTopics.topics {
kit, err := kc.createInspectTopic(topic, pipelineTopics.pipeline, pipelineTopics.tensor, offset, key, verbose, truncateData)
kit, err := kc.createInspectTopic(topic, pipelineTopics.pipeline, pipelineTopics.tensor, offset, key, verbose, truncateData, timeout)
if err != nil {
return err
}
Expand All @@ -253,24 +254,28 @@ func (kc *KafkaClient) InspectStep(pipelineStep string, offset int64, key string
fmt.Printf("%s\n", string(b))
} else {
for _, topic := range ki.Topics {
fmt.Printf("Topic: %s\n", topic.Name)
for _, msg := range topic.Msgs {
if verbose {
fmt.Printf("%s\t%s\t%s\t", topic.Name, msg.Key, msg.Value)
fmt.Printf("%s\t%s\t", msg.Key, msg.Value)
for k, v := range msg.Headers {
fmt.Printf("\t%s=%s", k, v)
}
fmt.Println("")
} else {
fmt.Printf("%s\t%s\t%s\n", topic.Name, msg.Key, msg.Value)
fmt.Printf("%s\t%s\n", msg.Key, msg.Value)
}
}
fmt.Print("----------------\n")
}
}

return nil
}

func (kc *KafkaClient) createInspectTopic(topic string, pipeline string, tensor string, offset int64, key string, verbose bool, truncateData bool) (*KafkaInspectTopic, error) {
func (kc *KafkaClient) createInspectTopic(
topic string, pipeline string, tensor string, offset int64, key string, verbose bool, truncateData bool, timeout time.Duration,
) (*KafkaInspectTopic, error) {
kit := KafkaInspectTopic{
Name: topic,
}
Expand All @@ -279,7 +284,7 @@ func (kc *KafkaClient) createInspectTopic(topic string, pipeline string, tensor
return nil, err
}

ctx, cancel := context.WithTimeout(context.Background(), KafkaTimeoutSeconds*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

run := true
Expand Down

0 comments on commit f284b4a

Please sign in to comment.