Skip to content

Commit

Permalink
sarama-compatible hasher
Browse files Browse the repository at this point in the history
  • Loading branch information
C-Pro committed Nov 22, 2023
1 parent a6d10d4 commit 1dc3d40
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions pkg/kgo/partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,21 @@ func KafkaHasher(hashFn func([]byte) uint32) PartitionerHasher {
}
}

// SaramaHasher returns a PartitionerHasher using hashFn that mirrors how
// SaramaHasher is not compatible with Sarama's default default partitioner.
// If you need sarama compatibility use SaramaCompatHasher instead.
// This function is left as is to provide compatibility with older versions of
// this library.
func SaramaHasher(hashFn func([]byte) uint32) PartitionerHasher {
return func(key []byte, n int) int {
p := int(hashFn(key)) % n
if p < 0 {
p = -p
}
return p
}
}

// SaramaCompatHasher returns a PartitionerHasher using hashFn that mirrors how
// Sarama partitions after hashing data.
//
// Sarama has two differences from Kafka when partitioning:
Expand All @@ -506,14 +520,14 @@ func KafkaHasher(hashFn func([]byte) uint32) PartitionerHasher {
//
// In short, to *exactly* match the Sarama defaults, use the following:
//
// kgo.StickyKeyPartitioner(kgo.SaramaHasher(fnv.New32a()))
func SaramaHasher(hashFn func([]byte) uint32) PartitionerHasher {
// kgo.StickyKeyPartitioner(kgo.SaramaCompatHasher(fnv.New32a()))
func SaramaCompatHasher(hashFn func([]byte) uint32) PartitionerHasher {
return func(key []byte, n int) int {
p := int(hashFn(key)) % n
p := int32(hashFn(key)) % int32(n)
if p < 0 {
p = -p
}
return p
return int(p)
}
}

Expand Down

0 comments on commit 1dc3d40

Please sign in to comment.