Skip to content

Commit

Permalink
Merge pull request #837 from manolovl/custom-hasher-partitioner
Browse files Browse the repository at this point in the history
Add functionality for custom hash partitioner.
  • Loading branch information
eapache authored Feb 27, 2017
2 parents 1416bd7 + cd5b0ea commit a71bff8
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 0 deletions.
11 changes: 11 additions & 0 deletions partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ type hashPartitioner struct {
hasher hash.Hash32
}

// NewCustomHashPartitioner is a wrapper around NewHashPartitioner,
// allowing the use of custom hasher
func NewCustomHashPartitioner(hasher hash.Hash32) PartitionerConstructor {
return func(topic string) Partitioner {
p := new(hashPartitioner)
p.random = NewRandomPartitioner(topic)
p.hasher = hasher
return p
}
}

// NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a
// random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used,
// modulus the number of partitions. This ensures that messages with the same key always end up on the
Expand Down
50 changes: 50 additions & 0 deletions partitioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"crypto/rand"
"hash/fnv"
"log"
"testing"
)
Expand Down Expand Up @@ -70,6 +71,55 @@ func TestRoundRobinPartitioner(t *testing.T) {
}
}

func TestNewHashPartitionerWithHasher(t *testing.T) {
// use the current default hasher fnv.New32a()
partitioner := NewCustomHashPartitioner(fnv.New32a())("mytopic")

choice, err := partitioner.Partition(&ProducerMessage{}, 1)
if err != nil {
t.Error(partitioner, err)
}
if choice != 0 {
t.Error("Returned non-zero partition when only one available.")
}

for i := 1; i < 50; i++ {
choice, err := partitioner.Partition(&ProducerMessage{}, 50)
if err != nil {
t.Error(partitioner, err)
}
if choice < 0 || choice >= 50 {
t.Error("Returned partition", choice, "outside of range for nil key.")
}
}

buf := make([]byte, 256)
for i := 1; i < 50; i++ {
if _, err := rand.Read(buf); err != nil {
t.Error(err)
}
assertPartitioningConsistent(t, partitioner, &ProducerMessage{Key: ByteEncoder(buf)}, 50)
}
}

func TestHashPartitionerWithHasherMinInt32(t *testing.T) {
// use the current default hasher fnv.New32a()
partitioner := NewCustomHashPartitioner(fnv.New32a())("mytopic")

msg := ProducerMessage{}
// "1468509572224" generates 2147483648 (uint32) result from Sum32 function
// which is -2147483648 or int32's min value
msg.Key = StringEncoder("1468509572224")

choice, err := partitioner.Partition(&msg, 50)
if err != nil {
t.Error(partitioner, err)
}
if choice < 0 || choice >= 50 {
t.Error("Returned partition", choice, "outside of range for nil key.")
}
}

func TestHashPartitioner(t *testing.T) {
partitioner := NewHashPartitioner("mytopic")

Expand Down

0 comments on commit a71bff8

Please sign in to comment.