Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add murmur2_random support for message partitioning. #884

Merged
merged 10 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions lib/kafka/crc32_hash.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# frozen_string_literal: true

require "zlib"

module Kafka
  # Hash function that delegates to Zlib's CRC32 checksum.
  #
  # NOTE: `#hash` takes one required argument here, so it replaces the
  # zero-argument `Object#hash` for instances of this class.
  class Crc32Hash
    # Intentionally does nothing: Zlib is required at the top of this file,
    # so there is no optional dependency to pull in.
    #
    # @return [nil]
    def load
      nil
    end

    # Computes the CRC32 checksum of the given value.
    #
    # @param value [String] the data to checksum
    # @return [Integer] the result of `Zlib.crc32(value)`
    def hash(value)
      checksum = Zlib.crc32(value)
      checksum
    end
  end
end
22 changes: 22 additions & 0 deletions lib/kafka/digest.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

require "kafka/crc32_hash"
require "kafka/murmur2_hash"

module Kafka
  # Registry of the hash functions that can be looked up by name.
  module Digest
    # Maps each supported hash function name to the object implementing it.
    FUNCTIONS_BY_NAME = {
      crc32: Crc32Hash.new,
      murmur2: Murmur2Hash.new
    }.freeze

    # Looks up a hash function by name and calls `load` on it before
    # handing it back.
    #
    # @param name [Symbol] a key of FUNCTIONS_BY_NAME
    # @return [Object] the registered hash function object
    # @raise [LoadError] when nothing is registered under `name`; errors
    #   raised by the hash function's own `load` propagate unchanged
    def self.find_digest(name)
      unless FUNCTIONS_BY_NAME.key?(name)
        raise LoadError, "Unknown hash function #{name}"
      end

      FUNCTIONS_BY_NAME[name].tap { |function| function.load }
    end
  end
end
17 changes: 17 additions & 0 deletions lib/kafka/murmur2_hash.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

module Kafka
  # Hash function backed by `Digest::MurmurHash2`, which is loaded lazily
  # from the `digest/murmurhash` library.
  class Murmur2Hash
    # Seed handed to MurmurHash2, packed with the 'L' directive.
    SEED = [0x9747b28c].pack("L")

    # Requires the murmurhash library on demand.
    #
    # @return [Boolean] whatever `require` returns
    # @raise [LoadError] with installation guidance when the library
    #   cannot be required
    def load
      begin
        require "digest/murmurhash"
      rescue LoadError
        raise LoadError, "using murmur2 hashing requires adding a dependency on the `digest-murmurhash` gem to your Gemfile."
      end
    end

    # Hashes the value with MurmurHash2 and clears the top bit, so the
    # result always fits in 31 bits.
    #
    # @param value [String] the data to hash
    # @return [Integer] the raw digest masked with 0x7fffffff
    def hash(value)
      raw = ::Digest::MurmurHash2.rawdigest(value, SEED)
      raw & 0x7fff_ffff
    end
  end
end
7 changes: 5 additions & 2 deletions lib/kafka/partitioner.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
# frozen_string_literal: true

require "zlib"
require "kafka/digest"

module Kafka

# Assigns partitions to messages.
class Partitioner
# Builds a partitioner around the requested hash function.
#
# @param hash_function [Symbol, nil] name passed to Digest.find_digest;
#   :crc32 is used when nil (or false) is given
def initialize(hash_function: nil)
  function_name = hash_function || :crc32
  @digest = Digest.find_digest(function_name)
end

# Assigns a partition number based on a partition key. If no explicit
# partition key is provided, the message key will be used instead.
Expand All @@ -28,7 +31,7 @@ def call(partition_count, message)
if key.nil?
rand(partition_count)
else
Zlib.crc32(key) % partition_count
@digest.hash(key) % partition_count
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka/protocol/record_batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def encode(encoder)
record_batch_encoder.write_int8(MAGIC_BYTE)

body = encode_record_batch_body
crc = Digest::CRC32c.checksum(body)
crc = ::Digest::CRC32c.checksum(body)

record_batch_encoder.write_int32(crc)
record_batch_encoder.write(body)
Expand Down
1 change: 1 addition & 0 deletions ruby-kafka.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "rake", "~> 10.0"
spec.add_development_dependency "rspec"
spec.add_development_dependency "pry"
spec.add_development_dependency "digest-murmurhash"
spec.add_development_dependency "dotenv"
spec.add_development_dependency "docker-api"
spec.add_development_dependency "rspec-benchmark"
Expand Down
33 changes: 33 additions & 0 deletions spec/digest_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true

# Specs for Kafka::Digest.find_digest: each supported hash function can be
# looked up and produces a known value, and unknown names are rejected.
describe Kafka::Digest do
  describe "crc32" do
    let(:digest) { Kafka::Digest.find_digest(:crc32) }

    it "is supported" do
      expect(digest).to be_truthy
    end

    it "produces hash for value" do
      expect(digest.hash("yolo")).to eq(1623057525)
    end
  end

  describe "murmur2" do
    let(:digest) { Kafka::Digest.find_digest(:murmur2) }

    it "is supported" do
      expect(digest).to be_truthy
    end

    it "produces hash for value" do
      expect(digest.hash("yolo")).to eq(1633766415)
    end
  end

  describe "unknown hash function" do
    it "raises" do
      # Pin the error class and message: a bare `raise_error` would also
      # pass on an unrelated NameError or NoMethodError.
      expect { Kafka::Digest.find_digest(:yolo) }.to raise_error(LoadError, "Unknown hash function yolo")
    end
  end
end
82 changes: 67 additions & 15 deletions spec/partitioner_spec.rb
Original file line number Diff line number Diff line change
@@ -1,29 +1,81 @@
# frozen_string_literal: true

# Specs for Kafka::Partitioner#call, run once with the default hash function
# and once with murmur2.
describe Kafka::Partitioner, "#call" do
  # Shared by both groups; individual examples stub `key`/`partition_key`.
  let(:message) { double(:message, key: nil, partition_key: "yolo") }

  describe "default partitioner" do
    let(:partitioner) { Kafka::Partitioner.new }

    it "deterministically returns a partition number for a partition key and partition count" do
      partition = partitioner.call(3, message)
      expect(partition).to eq 0
    end

    it "falls back to the message key if no partition key is available" do
      allow(message).to receive(:partition_key) { nil }
      allow(message).to receive(:key) { "hey" }

      partition = partitioner.call(3, message)

      expect(partition).to eq 2
    end

    it "randomly picks a partition if the key is nil" do
      allow(message).to receive(:key) { nil }
      allow(message).to receive(:partition_key) { nil }

      partitions = 30.times.map { partitioner.call(3, message) }

      expect(partitions.uniq).to contain_exactly(0, 1, 2)
    end
  end

  describe "murmur2 partitioner" do
    let(:partitioner) { Kafka::Partitioner.new(hash_function: :murmur2) }

    it "deterministically returns a partition number for a partition key and partition count" do
      partition = partitioner.call(3, message)
      expect(partition).to eq 0
    end

    it "falls back to the message key if no partition key is available" do
      allow(message).to receive(:partition_key) { nil }
      allow(message).to receive(:key) { "hey" }

      partition = partitioner.call(3, message)

      expect(partition).to eq 1
    end

    it "randomly picks a partition if the key is nil" do
      allow(message).to receive(:key) { nil }
      allow(message).to receive(:partition_key) { nil }

      partitions = 30.times.map { partitioner.call(3, message) }

      expect(partitions.uniq).to contain_exactly(0, 1, 2)
    end

    it "picks a Java Kafka compatible partition" do
      partition_count = 100
      expected_partitions = {
        # librdkafka test cases taken from tests/0048-partitioner.c
        "" => 0x106e08d9 % partition_count,
        "this is another string with more length to it perhaps" => 0x4f7703da % partition_count,
        "hejsan" => 0x5ec19395 % partition_count,
        # Java Kafka test cases taken from UtilsTest.java.
        # The Java tests check the result of murmur2 directly,
        # so have been ANDd with 0x7fffffff to work here
        "21" => (-973932308 & 0x7fffffff) % partition_count,
        "foobar" => (-790332482 & 0x7fffffff) % partition_count,
        "a-little-bit-long-string" => (-985981536 & 0x7fffffff) % partition_count,
        "a-little-bit-longer-string" => (-1486304829 & 0x7fffffff) % partition_count,
        "lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8" => (-58897971 & 0x7fffffff) % partition_count
      }

      expected_partitions.each do |key, partition|
        allow(message).to receive(:partition_key) { key }
        expect(partitioner.call(partition_count, message)).to eq partition
      end
    end
  end
end