Skip to content

Commit

Permalink
add reference-compatible hash partitioner
Browse files Browse the repository at this point in the history
  • Loading branch information
eapache committed Jun 15, 2018
1 parent d020291 commit 6967cdb
Showing 1 changed file with 29 additions and 5 deletions.
34 changes: 29 additions & 5 deletions partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ func (p *roundRobinPartitioner) RequiresConsistency() bool {
}

type hashPartitioner struct {
random Partitioner
hasher hash.Hash32
random Partitioner
hasher hash.Hash32
referenceAbs bool
}

// NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher.
Expand All @@ -110,6 +111,7 @@ func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor
p := new(hashPartitioner)
p.random = NewRandomPartitioner(topic)
p.hasher = hasher()
p.referenceAbs = false
return p
}
}
Expand All @@ -122,6 +124,19 @@ func NewHashPartitioner(topic string) Partitioner {
p := new(hashPartitioner)
p.random = NewRandomPartitioner(topic)
p.hasher = fnv.New32a()
p.referenceAbs = false
return p
}

// NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values
// in the same way as the reference Java implementation. NewHashPartitioner was supposed to do
// that but it had a mistake and now there are people depending on both behaviours. This will
// all go away on the next major version bump.
func NewReferenceHashPartitioner(topic string) Partitioner {
p := new(hashPartitioner)
p.random = NewRandomPartitioner(topic)
p.hasher = fnv.New32a()
p.referenceAbs = true
return p
}

Expand All @@ -138,9 +153,18 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3
if err != nil {
return -1, err
}
partition := int32(p.hasher.Sum32()) % numPartitions
if partition < 0 {
partition = -partition
var partition int32
// Turns out we were doing our absolute value in a subtly different way from the upstream
// implementation, but now we need to maintain backwards compat for people who started using
// the old version; if referenceAbs is set we are compatible with the reference java client
// but not past Sarama versions
if p.referenceAbs {
partition = (int32(p.hasher.Sum32()) & 0x7fffffff) % numPartitions
} else {
partition = int32(p.hasher.Sum32()) % numPartitions
if partition < 0 {
partition = -partition
}
}
return partition, nil
}
Expand Down

0 comments on commit 6967cdb

Please sign in to comment.