From 088e47274c3c2dbb8851067a2212f8c89a094fe5 Mon Sep 17 00:00:00 2001 From: Lyuben Manolov Date: Tue, 28 Feb 2017 17:42:50 +0100 Subject: [PATCH 1/3] Implement hasher as a callback function --- partitioner.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/partitioner.go b/partitioner.go index a28f997e6..1d6539c10 100644 --- a/partitioner.go +++ b/partitioner.go @@ -82,14 +82,16 @@ func (p *roundRobinPartitioner) RequiresConsistency() bool { return false } +type hasherFunc func() hash.Hash32 + type hashPartitioner struct { random Partitioner - hasher hash.Hash32 + hasher hasherFunc } // NewCustomHashPartitioner is a wrapper around NewHashPartitioner, // allowing the use of custom hasher -func NewCustomHashPartitioner(hasher hash.Hash32) PartitionerConstructor { +func NewCustomHashPartitioner(hasher hasherFunc) PartitionerConstructor { return func(topic string) Partitioner { p := new(hashPartitioner) p.random = NewRandomPartitioner(topic) @@ -105,7 +107,7 @@ func NewCustomHashPartitioner(hasher hash.Hash32) PartitionerConstructor { func NewHashPartitioner(topic string) Partitioner { p := new(hashPartitioner) p.random = NewRandomPartitioner(topic) - p.hasher = fnv.New32a() + p.hasher = fnv.New32a return p } @@ -117,12 +119,12 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3 if err != nil { return -1, err } - p.hasher.Reset() - _, err = p.hasher.Write(bytes) + hash := p.hasher() + _, err = hash.Write(bytes) if err != nil { return -1, err } - partition := int32(p.hasher.Sum32()) % numPartitions + partition := int32(hash.Sum32()) % numPartitions if partition < 0 { partition = -partition } From f3b2363114a32a50b935b035410470538a4b0d7a Mon Sep 17 00:00:00 2001 From: Lyuben Manolov Date: Thu, 2 Mar 2017 15:36:17 +0100 Subject: [PATCH 2/3] Fix concurrency issue in hashedPartitioner Passing an already instantiated hasher is a really bad idea. Instead pass a function returning the expected interface, so the hasher is instantiated when needed, also assuring there's one hasher for each partitionDispatcher and thus avoid concurrency problems." --- partitioner.go | 12 ++++++------ partitioner_test.go | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/partitioner.go b/partitioner.go index 1d6539c10..5709b91d8 100644 --- a/partitioner.go +++ b/partitioner.go @@ -86,7 +86,7 @@ type hasherFunc func() hash.Hash32 type hashPartitioner struct { random Partitioner - hasher hasherFunc + hasher hash.Hash32 } // NewCustomHashPartitioner is a wrapper around NewHashPartitioner, @@ -95,7 +95,7 @@ func NewCustomHashPartitioner(hasher hasherFunc) PartitionerConstructor { return func(topic string) Partitioner { p := new(hashPartitioner) p.random = NewRandomPartitioner(topic) - p.hasher = hasher + p.hasher = hasher() return p } } @@ -107,7 +107,7 @@ func NewCustomHashPartitioner(hasher hasherFunc) PartitionerConstructor { func NewHashPartitioner(topic string) Partitioner { p := new(hashPartitioner) p.random = NewRandomPartitioner(topic) - p.hasher = fnv.New32a + p.hasher = fnv.New32a() return p } @@ -119,12 +119,12 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3 if err != nil { return -1, err } - hash := p.hasher() - _, err = hash.Write(bytes) + p.hasher.Reset() + _, err = p.hasher.Write(bytes) if err != nil { return -1, err } - partition := int32(hash.Sum32()) % numPartitions + partition := int32(p.hasher.Sum32()) % numPartitions if partition < 0 { partition = -partition } diff --git a/partitioner_test.go b/partitioner_test.go index 4123713fb..83376431f 100644 --- a/partitioner_test.go +++ b/partitioner_test.go @@ -73,7 +73,7 @@ func TestRoundRobinPartitioner(t *testing.T) { func TestNewHashPartitionerWithHasher(t *testing.T) { // use the current default hasher fnv.New32a() - partitioner := NewCustomHashPartitioner(fnv.New32a())("mytopic") + partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic") choice, err := partitioner.Partition(&ProducerMessage{}, 1) if err != nil { @@ -104,7 +104,7 @@ func TestNewHashPartitionerWithHasher(t *testing.T) { func TestHashPartitionerWithHasherMinInt32(t *testing.T) { // use the current default hasher fnv.New32a() - partitioner := NewCustomHashPartitioner(fnv.New32a())("mytopic") + partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic") msg := ProducerMessage{} // "1468509572224" generates 2147483648 (uint32) result from Sum32 function From 8637e89db9d11cb4d7a549e36d09d41f3108edbb Mon Sep 17 00:00:00 2001 From: Lyuben Manolov Date: Thu, 2 Mar 2017 22:01:11 +0100 Subject: [PATCH 3/3] Minor changes to reflect comments to PR Remove type hasherFunc. Adjust doc comments for the NewCustomHashPartitioner. --- partitioner.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/partitioner.go b/partitioner.go index 5709b91d8..972932728 100644 --- a/partitioner.go +++ b/partitioner.go @@ -82,16 +82,15 @@ func (p *roundRobinPartitioner) RequiresConsistency() bool { return false } -type hasherFunc func() hash.Hash32 - type hashPartitioner struct { random Partitioner hasher hash.Hash32 } -// NewCustomHashPartitioner is a wrapper around NewHashPartitioner, -// allowing the use of custom hasher -func NewCustomHashPartitioner(hasher hasherFunc) PartitionerConstructor { +// NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher. +// The argument is a function providing the instance, implementing the hash.Hash32 interface. This is to ensure that +// each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance. +func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor { return func(topic string) Partitioner { p := new(hashPartitioner) p.random = NewRandomPartitioner(topic)