Skip to content

Commit

Permalink
fix issue where follow tries to consume from invalid offsets (-1)
Browse files Browse the repository at this point in the history
  • Loading branch information
birdayz committed Apr 3, 2020
1 parent b6fba02 commit fb5467c
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions cmd/kaf/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ var consumeCmd = &cobra.Command{

wg.Add(1)

go func(partition int32) {
go func(partition int32, offset int64) {
req := &sarama.OffsetRequest{
Version: int16(1),
}
Expand All @@ -141,9 +141,14 @@ var consumeCmd = &cobra.Command{
fmt.Fprintf(os.Stderr, "Starting on partition %v with offset %v\n", partition, offset)
}

// Ignore invalid partitions
if offset < 0 {
return
}

pc, err := consumer.ConsumePartition(topic, partition, offset)
if err != nil {
errorExit("Unable to consume partition: %v\n", err)
errorExit("Unable to consume partition: %v %v %v %v\n", topic, partition, offset, err)
}

for msg := range pc.Messages() {
Expand Down Expand Up @@ -218,7 +223,7 @@ var consumeCmd = &cobra.Command{
mu.Unlock()
}
wg.Done()
}(partition)
}(partition, offset)
}
wg.Wait()

Expand Down

0 comments on commit fb5467c

Please sign in to comment.