Skip to content

Commit

Permalink
kaf/group/commit: parallelize partition offsets computation
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed May 4, 2021
1 parent 88fdefe commit e9ffb2f
Showing 1 changed file with 44 additions and 30 deletions.
74 changes: 44 additions & 30 deletions cmd/kaf/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,45 +166,59 @@ func createGroupCommitOffsetCmd() *cobra.Command {

sort.Slice(partitions, func(i int, j int) bool { return partitions[i] < partitions[j] })

partitionOffsets := make(map[int32]int64)
type Assignment struct {
partition int32
offset int64
}
assignments := make(chan Assignment, len(partitions))

// TODO offset must be calced per partition

var wg sync.WaitGroup
for _, partition := range partitions {
i, err := strconv.ParseInt(offset, 10, 64)
if err != nil {
// Try oldest/newest/..
if offset == "oldest" {
i = sarama.OffsetOldest
} else if offset == "newest" || offset == "latest" {
i = sarama.OffsetNewest
} else {
// Try timestamp
t, err := time.Parse(time.RFC3339, offset)
if err != nil {
errorExit("offset is neither offset nor timestamp", nil)
wg.Add(1)
go func(partition int32) {
defer wg.Done()
i, err := strconv.ParseInt(offset, 10, 64)
if err != nil {
// Try oldest/newest/..
if offset == "oldest" {
i = sarama.OffsetOldest
} else if offset == "newest" || offset == "latest" {
i = sarama.OffsetNewest
} else {
// Try timestamp
t, err := time.Parse(time.RFC3339, offset)
if err != nil {
errorExit("offset is neither offset nor timestamp", nil)
}
i = t.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
}
i = t.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
}

o, err := client.GetOffset(topic, partition, i)
if err != nil {
errorExit("Failed to determine offset for timestamp: %v", err)
}
o, err := client.GetOffset(topic, partition, i)
if err != nil {
errorExit("Failed to determine offset for timestamp: %v", err)
}

if o == -1 {
fmt.Printf("Partition %v: could not determine offset from timestamp. Skipping.\n", partition)
continue
//errorExit("Determined offset -1 from timestamp. Skipping.", o)
}
if o == -1 {
fmt.Printf("Partition %v: could not determine offset from timestamp. Skipping.\n", partition)
return
//errorExit("Determined offset -1 from timestamp. Skipping.", o)
}

partitionOffsets[partition] = o
assignments <- Assignment{partition: partition, offset: o}

fmt.Printf("Partition %v: determined offset %v from timestamp.\n", partition, o)
} else {
partitionOffsets[partition] = i
}
fmt.Printf("Partition %v: determined offset %v from timestamp.\n", partition, o)
} else {
assignments <- Assignment{partition: partition, offset: i}
}
}(partition)
}
wg.Wait()
close(assignments)

partitionOffsets := make(map[int32]int64, len(partitions))
for assign := range assignments {
partitionOffsets[assign.partition] = assign.offset
}

fmt.Printf("Resetting offsets to: %v\n", partitionOffsets)
Expand Down

0 comments on commit e9ffb2f

Please sign in to comment.