Skip to content

Commit

Permalink
Implement hasher as a callback function
Browse files Browse the repository at this point in the history
  • Loading branch information
kruftmeister committed Feb 28, 2017
1 parent 431b10f commit 088e472
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,16 @@ func (p *roundRobinPartitioner) RequiresConsistency() bool {
return false
}

type hasherFunc func() hash.Hash32

type hashPartitioner struct {
random Partitioner
hasher hash.Hash32
hasher hasherFunc
}

// NewCustomHashPartitioner is a wrapper around NewHashPartitioner,
// allowing the use of custom hasher
func NewCustomHashPartitioner(hasher hash.Hash32) PartitionerConstructor {
func NewCustomHashPartitioner(hasher hasherFunc) PartitionerConstructor {
return func(topic string) Partitioner {
p := new(hashPartitioner)
p.random = NewRandomPartitioner(topic)
Expand All @@ -105,7 +107,7 @@ func NewCustomHashPartitioner(hasher hash.Hash32) PartitionerConstructor {
func NewHashPartitioner(topic string) Partitioner {
p := new(hashPartitioner)
p.random = NewRandomPartitioner(topic)
p.hasher = fnv.New32a()
p.hasher = fnv.New32a
return p
}

Expand All @@ -117,12 +119,12 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3
if err != nil {
return -1, err
}
p.hasher.Reset()
_, err = p.hasher.Write(bytes)
hash := p.hasher()
_, err = hash.Write(bytes)
if err != nil {
return -1, err
}
partition := int32(p.hasher.Sum32()) % numPartitions
partition := int32(hash.Sum32()) % numPartitions
if partition < 0 {
partition = -partition
}
Expand Down

0 comments on commit 088e472

Please sign in to comment.