From b323c63e52753b94902a606ebc046556981bfeee Mon Sep 17 00:00:00 2001 From: Steven Diviney Date: Sun, 24 Jan 2021 20:21:19 -0800 Subject: [PATCH 01/10] Adding murmur2_random partition assignment --- lib/kafka/client.rb | 8 +++- lib/kafka/murmur2_partitioner.rb | 35 ++++++++++++++ lib/kafka/producer.rb | 1 + ruby-kafka.gemspec | 1 + spec/partitioner_spec.rb | 82 ++++++++++++++++++++++++++------ 5 files changed, 110 insertions(+), 17 deletions(-) create mode 100644 lib/kafka/murmur2_partitioner.rb diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index 0525f2bff..52ef9ffab 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -65,7 +65,7 @@ class Client # @param ssl_ca_certs_from_system [Boolean] whether to use the CA certs from the # system's default certificate store. # - # @param partitioner [Partitioner, nil] the partitioner that should be used by the client. + # @param partitioner [String, nil] the partitioner that should be used by the client. # # @param sasl_oauth_token_provider [Object, nil] OAuthBearer Token Provider instance that # implements method token. See {Sasl::OAuth#initialize} @@ -124,7 +124,11 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time ) @cluster = initialize_cluster - @partitioner = partitioner || Partitioner.new + @partitioner = if partitioner + Object.const_get(partitioner).new + else + Partitioner.new + end end # Delivers a single message to the Kafka cluster. diff --git a/lib/kafka/murmur2_partitioner.rb b/lib/kafka/murmur2_partitioner.rb new file mode 100644 index 000000000..e7161dfb9 --- /dev/null +++ b/lib/kafka/murmur2_partitioner.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +require 'digest/murmurhash' + +module Kafka + + # Java producer compatible message partitioner + class Murmur2Partitioner + SEED = [0x9747b28c].pack('L') + + # Assigns a partition number based on a partition key. If no explicit + # partition key is provided, the message key will be used instead. 
+ # + # If the key is nil, then a random partition is selected. Otherwise, a hash + # of the key is used to deterministically find a partition. As long as the + # number of partitions doesn't change, the same key will always be assigned + # to the same partition. + # + # @param partition_count [Integer] the number of partitions in the topic. + # @param message [Kafka::PendingMessage] the message that should be assigned + # a partition. + # @return [Integer] the partition number. + def call(partition_count, message) + raise ArgumentError if partition_count == 0 + + key = message.partition_key || message.key + + if key.nil? + rand(partition_count) + else + (Digest::MurmurHash2.rawdigest(key, SEED) & 0x7fffffff) % partition_count + end + end + end +end diff --git a/lib/kafka/producer.rb b/lib/kafka/producer.rb index b504dbfdd..0a70f1b41 100644 --- a/lib/kafka/producer.rb +++ b/lib/kafka/producer.rb @@ -2,6 +2,7 @@ require "set" require "kafka/partitioner" +require "kafka/murmur2_partitioner" require "kafka/message_buffer" require "kafka/produce_operation" require "kafka/pending_message_queue" diff --git a/ruby-kafka.gemspec b/ruby-kafka.gemspec index 2589e97d5..0b0dbbfd6 100644 --- a/ruby-kafka.gemspec +++ b/ruby-kafka.gemspec @@ -28,6 +28,7 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.add_dependency 'digest-crc' + spec.add_dependency "digest-murmurhash" spec.add_development_dependency "bundler", ">= 1.9.5" spec.add_development_dependency "rake", "~> 10.0" diff --git a/spec/partitioner_spec.rb b/spec/partitioner_spec.rb index dafe73557..9b779551f 100644 --- a/spec/partitioner_spec.rb +++ b/spec/partitioner_spec.rb @@ -1,29 +1,81 @@ # frozen_string_literal: true describe Kafka::Partitioner, "#call" do - let(:partitioner) { Kafka::Partitioner.new } let(:message) { double(:message, key: nil, partition_key: "yolo") } - it "deterministically returns a partition number for a partition key and partition count" do - partition = partitioner.call(3, 
message) - expect(partition).to eq 0 - end + describe "default partitioner" do + let(:partitioner) { Kafka::Partitioner.new } + + it "deterministically returns a partition number for a partition key and partition count" do + partition = partitioner.call(3, message) + expect(partition).to eq 0 + end + + it "falls back to the message key if no partition key is available" do + allow(message).to receive(:partition_key) { nil } + allow(message).to receive(:key) { "hey" } - it "falls back to the message key if no partition key is available" do - allow(message).to receive(:partition_key) { nil } - allow(message).to receive(:key) { "hey" } + partition = partitioner.call(3, message) - partition = partitioner.call(3, message) + expect(partition).to eq 2 + end - expect(partition).to eq 2 + it "randomly picks a partition if the key is nil" do + allow(message).to receive(:key) { nil } + allow(message).to receive(:partition_key) { nil } + + partitions = 30.times.map { partitioner.call(3, message) } + + expect(partitions.uniq).to contain_exactly(0, 1, 2) + end end - it "randomly picks a partition if the key is nil" do - allow(message).to receive(:key) { nil } - allow(message).to receive(:partition_key) { nil } + describe "murmur2 partitioner" do + let(:partitioner) { Kafka::Murmur2Partitioner.new } + let(:message) { double(:message, key: nil, partition_key: "yolo") } + + it "deterministically returns a partition number for a partition key and partition count" do + partition = partitioner.call(3, message) + expect(partition).to eq 0 + end + + it "falls back to the message key if no partition key is available" do + allow(message).to receive(:partition_key) { nil } + allow(message).to receive(:key) { "hey" } + + partition = partitioner.call(3, message) + + expect(partition).to eq 1 + end + + it "randomly picks a partition if the key is nil" do + allow(message).to receive(:key) { nil } + allow(message).to receive(:partition_key) { nil } + + partitions = 30.times.map { 
partitioner.call(3, message) } - partitions = 30.times.map { partitioner.call(3, message) } + expect(partitions.uniq).to contain_exactly(0, 1, 2) + end - expect(partitions.uniq).to contain_exactly(0, 1, 2) + it "picks a Java Kafka compatible partition" do + partition_count = 100 + { + # librdkafka test cases taken from tests/0048-partitioner.c + "" => 0x106e08d9 % partition_count, + "this is another string with more length to it perhaps" => 0x4f7703da % partition_count, + "hejsan" => 0x5ec19395 % partition_count, + # Java Kafka test cases taken from UtilsTest.java. + # The Java tests check the result of murmur2 directly, + # so have been ANDd with 0x7fffffff to work here + "21" => (-973932308 & 0x7fffffff) % partition_count, + "foobar" => (-790332482 & 0x7fffffff) % partition_count, + "a-little-bit-long-string" => (-985981536 & 0x7fffffff) % partition_count, + "a-little-bit-longer-string" => (-1486304829 & 0x7fffffff) % partition_count, + "lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8" => (-58897971 & 0x7fffffff) % partition_count + }.each do |key, partition| + allow(message).to receive(:partition_key) { key } + expect(partitioner.call(partition_count, message)).to eq partition + end + end end end From db7f5f4a3d097e72e0140aa0fd332c3d317eb170 Mon Sep 17 00:00:00 2001 From: Steven Diviney Date: Tue, 26 Jan 2021 20:13:00 -0800 Subject: [PATCH 02/10] Add partitioner_klass as client param --- lib/kafka/client.rb | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index 52ef9ffab..3ee8aa39e 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -65,7 +65,9 @@ class Client # @param ssl_ca_certs_from_system [Boolean] whether to use the CA certs from the # system's default certificate store. # - # @param partitioner [String, nil] the partitioner that should be used by the client. + # @param partitioner [Partitioner, nil] the partitioner that should be used by the client. 
+ # + # @param partitioner_klass [String, nil] the partitioner klass that should be used by the client if no partitioner is supplied. # # @param sasl_oauth_token_provider [Object, nil] OAuthBearer Token Provider instance that # implements method token. See {Sasl::OAuth#initialize} @@ -80,7 +82,7 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil, sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil, sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil, - sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true) + sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, partitioner_klass: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true) @logger = TaggedLogger.new(logger) @instrumenter = Instrumenter.new(client_id: client_id) @seed_brokers = normalize_seed_brokers(seed_brokers) @@ -124,11 +126,14 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time ) @cluster = initialize_cluster - @partitioner = if partitioner - Object.const_get(partitioner).new - else - Partitioner.new - end + @partitioner = + if partitioner + partitioner + elsif partitioner_klass + Object.const_get(partitioner_klass).new + else + Partitioner.new + end end # Delivers a single message to the Kafka cluster. 
From 06175734c4d5f3094bfdaaf5b2f8345d39029e37 Mon Sep 17 00:00:00 2001 From: Steven Diviney Date: Mon, 1 Feb 2021 11:39:43 -0800 Subject: [PATCH 03/10] Changes `digest-murmurhash` to be an optional dependency And all the plumbing to load it --- lib/kafka/crc32_hash.rb | 15 +++++++++++++ lib/kafka/digest.rb | 24 ++++++++++++++++++++ lib/kafka/murmur2_hash.rb | 17 +++++++++++++++ lib/kafka/murmur2_partitioner.rb | 35 ------------------------------ lib/kafka/partitioner.rb | 7 ++++-- lib/kafka/producer.rb | 1 - lib/kafka/protocol/record_batch.rb | 2 +- ruby-kafka.gemspec | 2 +- spec/digest_spec.rb | 33 ++++++++++++++++++++++++++++ spec/partitioner_spec.rb | 2 +- 10 files changed, 97 insertions(+), 41 deletions(-) create mode 100644 lib/kafka/crc32_hash.rb create mode 100644 lib/kafka/digest.rb create mode 100644 lib/kafka/murmur2_hash.rb delete mode 100644 lib/kafka/murmur2_partitioner.rb create mode 100644 spec/digest_spec.rb diff --git a/lib/kafka/crc32_hash.rb b/lib/kafka/crc32_hash.rb new file mode 100644 index 000000000..af342ff48 --- /dev/null +++ b/lib/kafka/crc32_hash.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +require "zlib" + +module Kafka + class Crc32Hash + + # crc32 is part of the gems dependencies + def load; end + + def hash(value) + Zlib.crc32(value) + end + end +end diff --git a/lib/kafka/digest.rb b/lib/kafka/digest.rb new file mode 100644 index 000000000..f76885184 --- /dev/null +++ b/lib/kafka/digest.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +require "kafka/crc32_hash" +require "kafka/murmur2_hash" + +module Kafka + module Digest + FUNCTIONS_BY_NAME = { + :crc32 => Crc32Hash.new, + :murmur2 => Murmur2Hash.new + }.freeze + + # TODO: Should I just call this `hashing` or something? 
+ def self.find_digest(name) + digest = FUNCTIONS_BY_NAME.fetch(name) do + raise LoadError, "Unknown hash function #{name}" + end + + digest.load + + digest + end + end +end diff --git a/lib/kafka/murmur2_hash.rb b/lib/kafka/murmur2_hash.rb new file mode 100644 index 000000000..a6223b0d6 --- /dev/null +++ b/lib/kafka/murmur2_hash.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +module Kafka + class Murmur2Hash + SEED = [0x9747b28c].pack('L') + + def load + require 'digest/murmurhash' + rescue LoadError + raise LoadError, "using murmur2 hashing requires adding a dependency on the `digest-murmurhash` gem to your Gemfile." + end + + def hash(value) + ::Digest::MurmurHash2.rawdigest(value, SEED) & 0x7fffffff + end + end +end diff --git a/lib/kafka/murmur2_partitioner.rb b/lib/kafka/murmur2_partitioner.rb deleted file mode 100644 index e7161dfb9..000000000 --- a/lib/kafka/murmur2_partitioner.rb +++ /dev/null @@ -1,35 +0,0 @@ -# frozen_string_literal: true - -require 'digest/murmurhash' - -module Kafka - - # Java producer compatible message partitioner - class Murmur2Partitioner - SEED = [0x9747b28c].pack('L') - - # Assigns a partition number based on a partition key. If no explicit - # partition key is provided, the message key will be used instead. - # - # If the key is nil, then a random partition is selected. Otherwise, a hash - # of the key is used to deterministically find a partition. As long as the - # number of partitions doesn't change, the same key will always be assigned - # to the same partition. - # - # @param partition_count [Integer] the number of partitions in the topic. - # @param message [Kafka::PendingMessage] the message that should be assigned - # a partition. - # @return [Integer] the partition number. - def call(partition_count, message) - raise ArgumentError if partition_count == 0 - - key = message.partition_key || message.key - - if key.nil? 
- rand(partition_count) - else - (Digest::MurmurHash2.rawdigest(key, SEED) & 0x7fffffff) % partition_count - end - end - end -end diff --git a/lib/kafka/partitioner.rb b/lib/kafka/partitioner.rb index f4fcd2882..14901ca1f 100644 --- a/lib/kafka/partitioner.rb +++ b/lib/kafka/partitioner.rb @@ -1,11 +1,14 @@ # frozen_string_literal: true -require "zlib" +require "kafka/digest" module Kafka # Assigns partitions to messages. class Partitioner + def initialize(hash_function: nil) + @digest = Digest.find_digest(hash_function || :crc32) + end # Assigns a partition number based on a partition key. If no explicit # partition key is provided, the message key will be used instead. @@ -28,7 +31,7 @@ def call(partition_count, message) if key.nil? rand(partition_count) else - Zlib.crc32(key) % partition_count + @digest.hash(key) % partition_count end end end diff --git a/lib/kafka/producer.rb b/lib/kafka/producer.rb index 0a70f1b41..b504dbfdd 100644 --- a/lib/kafka/producer.rb +++ b/lib/kafka/producer.rb @@ -2,7 +2,6 @@ require "set" require "kafka/partitioner" -require "kafka/murmur2_partitioner" require "kafka/message_buffer" require "kafka/produce_operation" require "kafka/pending_message_queue" diff --git a/lib/kafka/protocol/record_batch.rb b/lib/kafka/protocol/record_batch.rb index 4201cc737..b9f6ea2c9 100644 --- a/lib/kafka/protocol/record_batch.rb +++ b/lib/kafka/protocol/record_batch.rb @@ -77,7 +77,7 @@ def encode(encoder) record_batch_encoder.write_int8(MAGIC_BYTE) body = encode_record_batch_body - crc = Digest::CRC32c.checksum(body) + crc = ::Digest::CRC32c.checksum(body) record_batch_encoder.write_int32(crc) record_batch_encoder.write(body) diff --git a/ruby-kafka.gemspec b/ruby-kafka.gemspec index 0b0dbbfd6..d50092dc5 100644 --- a/ruby-kafka.gemspec +++ b/ruby-kafka.gemspec @@ -28,12 +28,12 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.add_dependency 'digest-crc' - spec.add_dependency "digest-murmurhash" spec.add_development_dependency 
"bundler", ">= 1.9.5" spec.add_development_dependency "rake", "~> 10.0" spec.add_development_dependency "rspec" spec.add_development_dependency "pry" + spec.add_development_dependency "digest-murmurhash" spec.add_development_dependency "dotenv" spec.add_development_dependency "docker-api" spec.add_development_dependency "rspec-benchmark" diff --git a/spec/digest_spec.rb b/spec/digest_spec.rb new file mode 100644 index 000000000..c4b98a8bf --- /dev/null +++ b/spec/digest_spec.rb @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +describe Kafka::Digest do + describe "crc32" do + let(:digest) { Kafka::Digest.find_digest(:crc32) } + + it "is supported" do + expect(digest).to be_truthy + end + + it "produces hash for value" do + expect(digest.hash("yolo")).to eq(1623057525) + end + end + + describe "murmur2" do + let(:digest) { Kafka::Digest.find_digest(:murmur2) } + + it "is supported" do + expect(digest).to be_truthy + end + + it "produces hash for value" do + expect(digest.hash("yolo")).to eq(1633766415) + end + end + + describe "unknown hash function" do + it "raises" do + expect { Kafka::Digest.find_digest(:yolo) }.to raise_error + end + end +end diff --git a/spec/partitioner_spec.rb b/spec/partitioner_spec.rb index 9b779551f..547dccd13 100644 --- a/spec/partitioner_spec.rb +++ b/spec/partitioner_spec.rb @@ -31,7 +31,7 @@ end describe "murmur2 partitioner" do - let(:partitioner) { Kafka::Murmur2Partitioner.new } + let(:partitioner) { Kafka::Partitioner.new(hash_function: :murmur2) } let(:message) { double(:message, key: nil, partition_key: "yolo") } it "deterministically returns a partition number for a partition key and partition count" do From 4669c204498565fd8addf7d32907e0532c7f6d13 Mon Sep 17 00:00:00 2001 From: Steven Diviney Date: Mon, 1 Feb 2021 11:40:01 -0800 Subject: [PATCH 04/10] Revert client API change --- lib/kafka/client.rb | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index 
3ee8aa39e..0525f2bff 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -67,8 +67,6 @@ class Client # # @param partitioner [Partitioner, nil] the partitioner that should be used by the client. # - # @param partitioner_klass [String, nil] the partitioner klass that should be used by the client if no partitioner is supplied. - # # @param sasl_oauth_token_provider [Object, nil] OAuthBearer Token Provider instance that # implements method token. See {Sasl::OAuth#initialize} # @@ -82,7 +80,7 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil, sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil, sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil, - sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, partitioner_klass: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true) + sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true) @logger = TaggedLogger.new(logger) @instrumenter = Instrumenter.new(client_id: client_id) @seed_brokers = normalize_seed_brokers(seed_brokers) @@ -126,14 +124,7 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time ) @cluster = initialize_cluster - @partitioner = - if partitioner - partitioner - elsif partitioner_klass - Object.const_get(partitioner_klass).new - else - Partitioner.new - end + @partitioner = partitioner || Partitioner.new end # Delivers a single message to the Kafka cluster. 
From b2f3eedd288ab13a6752a155b216931a3861b332 Mon Sep 17 00:00:00 2001 From: Steven Diviney Date: Mon, 1 Feb 2021 11:42:34 -0800 Subject: [PATCH 05/10] Small cleanup --- lib/kafka/digest.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/kafka/digest.rb b/lib/kafka/digest.rb index f76885184..8ba4cc206 100644 --- a/lib/kafka/digest.rb +++ b/lib/kafka/digest.rb @@ -10,14 +10,12 @@ module Digest :murmur2 => Murmur2Hash.new }.freeze - # TODO: Should I just call this `hashing` or something? def self.find_digest(name) digest = FUNCTIONS_BY_NAME.fetch(name) do raise LoadError, "Unknown hash function #{name}" end digest.load - digest end end From d49e7c4493c130a63fc0951a50bba5496541cb98 Mon Sep 17 00:00:00 2001 From: Steven Diviney Date: Mon, 1 Feb 2021 20:19:03 -0800 Subject: [PATCH 06/10] small cleanup --- lib/kafka/crc32_hash.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafka/crc32_hash.rb b/lib/kafka/crc32_hash.rb index af342ff48..1849008a6 100644 --- a/lib/kafka/crc32_hash.rb +++ b/lib/kafka/crc32_hash.rb @@ -5,7 +5,7 @@ module Kafka class Crc32Hash - # crc32 is part of the gems dependencies + # crc32 is supported natively def load; end def hash(value) From 5a3374e80354c3345c3d8053708b42d4fabf2844 Mon Sep 17 00:00:00 2001 From: Steven Diviney Date: Mon, 1 Feb 2021 20:19:12 -0800 Subject: [PATCH 07/10] Add murmur2 to readme --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index ebe3d8365..a6b963761 100644 --- a/README.md +++ b/README.md @@ -382,6 +382,16 @@ partitioner = -> (partition_count, message) { ... } Kafka.new(partitioner: partitioner, ...) ``` +##### Supported partitioning schemes + +In order for semantic partitioning to work a `partition_key` must map to the same partition number every time. The general approach, and the one used by this library, is to hash the key and mod it by the number of partitions. There are many different algorithms to calculate a hash. 
By default `crc32` is used. `murmur2` is also supported for compatibility with Java based Kafka producers. + +To use `murmur2` hashing pass it as an argument to `Partitioner`. For example: + +```ruby +Kafka.new(partitioner: Kafka::Partitioner.new(hash_function: :murmur2)) +``` + #### Buffering and Error Handling The producer is designed for resilience in the face of temporary network errors, Kafka broker failovers, and other issues that prevent the client from writing messages to the destination topics. It does this by employing local, in-memory buffers. Only when messages are acknowledged by a Kafka broker will they be removed from the buffer. From c52c238000dded74097e9853b7a0f08d756719a7 Mon Sep 17 00:00:00 2001 From: Steven Diviney Date: Mon, 1 Feb 2021 20:19:22 -0800 Subject: [PATCH 08/10] Add murmur2 to changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e31a47d6..e1db8b5aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ Changes and additions to the library will be listed here. ## Unreleased - Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866). +- Add support for `murmur2` based partitioning. ## 1.3.0 From f3e5078d902dbf9e0e09b56fa639f68fb50e9080 Mon Sep 17 00:00:00 2001 From: Steven Diviney Date: Mon, 1 Feb 2021 20:29:42 -0800 Subject: [PATCH 09/10] Add Partitioner doc --- lib/kafka/partitioner.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/kafka/partitioner.rb b/lib/kafka/partitioner.rb index 14901ca1f..e11052442 100644 --- a/lib/kafka/partitioner.rb +++ b/lib/kafka/partitioner.rb @@ -6,6 +6,8 @@ module Kafka # Assigns partitions to messages. class Partitioner + # @param hash_function [Symbol, nil] the algorithm used to compute a messages + # destination partition. 
Default is :crc32 def initialize(hash_function: nil) @digest = Digest.find_digest(hash_function || :crc32) end From fd89412ecad8f250e15670a57faaf479cc7f06f0 Mon Sep 17 00:00:00 2001 From: Steven Diviney Date: Mon, 1 Feb 2021 20:32:02 -0800 Subject: [PATCH 10/10] Correct readme doc --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a6b963761..1ff5b616f 100644 --- a/README.md +++ b/README.md @@ -384,7 +384,7 @@ Kafka.new(partitioner: partitioner, ...) ##### Supported partitioning schemes -In order for semantic partitioning to work a `partition_key` must map to the same partition number every time. The general approach, and the one used by this library, is to hash the key and mod it by the number of partitions. There are many different algorithms to calculate a hash. By default `crc32` is used. `murmur2` is also supported for compatibility with Java based Kafka producers. +In order for semantic partitioning to work a `partition_key` must map to the same partition number every time. The general approach, and the one used by this library, is to hash the key and mod it by the number of partitions. There are many different algorithms that can be used to calculate a hash. By default `crc32` is used. `murmur2` is also supported for compatibility with Java based Kafka producers. To use `murmur2` hashing pass it as an argument to `Partitioner`. For example: