Skip to content

Commit

Permalink
Fix concurrency issue in hashedPartitioner
Browse files Browse the repository at this point in the history
Passing an already instantiated hasher is a really bad idea. Instead pass a function returning the expected interface,
so the hasher is instantiated when needed, also assuring there's one hasher for each partitionDispatcher and thus avoid concurrency problems."
  • Loading branch information
kruftmeister committed Mar 2, 2017
1 parent 088e472 commit f3b2363
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
12 changes: 6 additions & 6 deletions partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type hasherFunc func() hash.Hash32

type hashPartitioner struct {
random Partitioner
hasher hasherFunc
hasher hash.Hash32
}

// NewCustomHashPartitioner is a wrapper around NewHashPartitioner,
Expand All @@ -95,7 +95,7 @@ func NewCustomHashPartitioner(hasher hasherFunc) PartitionerConstructor {
return func(topic string) Partitioner {
p := new(hashPartitioner)
p.random = NewRandomPartitioner(topic)
p.hasher = hasher
p.hasher = hasher()
return p
}
}
Expand All @@ -107,7 +107,7 @@ func NewCustomHashPartitioner(hasher hasherFunc) PartitionerConstructor {
func NewHashPartitioner(topic string) Partitioner {
p := new(hashPartitioner)
p.random = NewRandomPartitioner(topic)
p.hasher = fnv.New32a
p.hasher = fnv.New32a()
return p
}

Expand All @@ -119,12 +119,12 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3
if err != nil {
return -1, err
}
hash := p.hasher()
_, err = hash.Write(bytes)
p.hasher.Reset()
_, err = p.hasher.Write(bytes)
if err != nil {
return -1, err
}
partition := int32(hash.Sum32()) % numPartitions
partition := int32(p.hasher.Sum32()) % numPartitions
if partition < 0 {
partition = -partition
}
Expand Down
4 changes: 2 additions & 2 deletions partitioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestRoundRobinPartitioner(t *testing.T) {

func TestNewHashPartitionerWithHasher(t *testing.T) {
// use the current default hasher fnv.New32a()
partitioner := NewCustomHashPartitioner(fnv.New32a())("mytopic")
partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic")

choice, err := partitioner.Partition(&ProducerMessage{}, 1)
if err != nil {
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestNewHashPartitionerWithHasher(t *testing.T) {

func TestHashPartitionerWithHasherMinInt32(t *testing.T) {
// use the current default hasher fnv.New32a()
partitioner := NewCustomHashPartitioner(fnv.New32a())("mytopic")
partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic")

msg := ProducerMessage{}
// "1468509572224" generates 2147483648 (uint32) result from Sum32 function
Expand Down

0 comments on commit f3b2363

Please sign in to comment.