Skip to content

Commit

Permalink
Merge pull request #841 from manolovl/hasher-callback
Browse files Browse the repository at this point in the history
Fix CustomHasher
  • Loading branch information
eapache authored Mar 2, 2017
2 parents 2eecce0 + 8637e89 commit 8093021
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
9 changes: 5 additions & 4 deletions partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,14 @@ type hashPartitioner struct {
hasher hash.Hash32
}

// NewCustomHashPartitioner is a wrapper around NewHashPartitioner,
// allowing the use of custom hasher
func NewCustomHashPartitioner(hasher hash.Hash32) PartitionerConstructor {
// NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher.
// The argument is a function providing the instance, implementing the hash.Hash32 interface. This is to ensure that
// each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance.
func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor {
return func(topic string) Partitioner {
p := new(hashPartitioner)
p.random = NewRandomPartitioner(topic)
p.hasher = hasher
p.hasher = hasher()
return p
}
}
Expand Down
4 changes: 2 additions & 2 deletions partitioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestRoundRobinPartitioner(t *testing.T) {

func TestNewHashPartitionerWithHasher(t *testing.T) {
// use the current default hasher fnv.New32a()
partitioner := NewCustomHashPartitioner(fnv.New32a())("mytopic")
partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic")

choice, err := partitioner.Partition(&ProducerMessage{}, 1)
if err != nil {
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestNewHashPartitionerWithHasher(t *testing.T) {

func TestHashPartitionerWithHasherMinInt32(t *testing.T) {
// use the current default hasher fnv.New32a()
partitioner := NewCustomHashPartitioner(fnv.New32a())("mytopic")
partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic")

msg := ProducerMessage{}
// "1468509572224" generates 2147483648 (uint32) result from Sum32 function
Expand Down

0 comments on commit 8093021

Please sign in to comment.