From f3b2363114a32a50b935b035410470538a4b0d7a Mon Sep 17 00:00:00 2001 From: Lyuben Manolov Date: Thu, 2 Mar 2017 15:36:17 +0100 Subject: [PATCH] Fix concurrency issue in hashedPartitioner Passing an already instantiated hasher is a really bad idea. Instead pass a function returning the expected interface, so the hasher is instantiated when needed, also assuring there's one hasher for each partitionDispatcher and thus avoid concurrency problems." --- partitioner.go | 12 ++++++------ partitioner_test.go | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/partitioner.go b/partitioner.go index 1d6539c10..5709b91d8 100644 --- a/partitioner.go +++ b/partitioner.go @@ -86,7 +86,7 @@ type hasherFunc func() hash.Hash32 type hashPartitioner struct { random Partitioner - hasher hasherFunc + hasher hash.Hash32 } // NewCustomHashPartitioner is a wrapper around NewHashPartitioner, @@ -95,7 +95,7 @@ func NewCustomHashPartitioner(hasher hasherFunc) PartitionerConstructor { return func(topic string) Partitioner { p := new(hashPartitioner) p.random = NewRandomPartitioner(topic) - p.hasher = hasher + p.hasher = hasher() return p } } @@ -107,7 +107,7 @@ func NewCustomHashPartitioner(hasher hasherFunc) PartitionerConstructor { func NewHashPartitioner(topic string) Partitioner { p := new(hashPartitioner) p.random = NewRandomPartitioner(topic) - p.hasher = fnv.New32a + p.hasher = fnv.New32a() return p } @@ -119,12 +119,12 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3 if err != nil { return -1, err } - hash := p.hasher() - _, err = hash.Write(bytes) + p.hasher.Reset() + _, err = p.hasher.Write(bytes) if err != nil { return -1, err } - partition := int32(hash.Sum32()) % numPartitions + partition := int32(p.hasher.Sum32()) % numPartitions if partition < 0 { partition = -partition } diff --git a/partitioner_test.go b/partitioner_test.go index 4123713fb..83376431f 100644 --- a/partitioner_test.go +++ b/partitioner_test.go @@ -73,7 +73,7 @@ func TestRoundRobinPartitioner(t *testing.T) { func TestNewHashPartitionerWithHasher(t *testing.T) { // use the current default hasher fnv.New32a() - partitioner := NewCustomHashPartitioner(fnv.New32a())("mytopic") + partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic") choice, err := partitioner.Partition(&ProducerMessage{}, 1) if err != nil { @@ -104,7 +104,7 @@ func TestNewHashPartitionerWithHasher(t *testing.T) { func TestHashPartitionerWithHasherMinInt32(t *testing.T) { // use the current default hasher fnv.New32a() - partitioner := NewCustomHashPartitioner(fnv.New32a())("mytopic") + partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic") msg := ProducerMessage{} // "1468509572224" generates 2147483648 (uint32) result from Sum32 function