Implement the static membership feature to Kafka input #135

Merged
merged 23 commits into from
Jan 25, 2023
Changes from 18 commits
42fa4e6
Minor, reused same constant already defined
andsel Jan 13, 2023
b75b9ca
Drafted an integration test to check the closing of 2 consumers with …
andsel Jan 13, 2023
97ca964
Update Kafka input do_poll to intercept FencedInstanceIdException in …
andsel Jan 16, 2023
98566cb
Postfix each consumer's thread group instance id with thread's id to …
andsel Jan 16, 2023
6b73a6a
Removed unused code in specs
andsel Jan 16, 2023
2d6209f
Fixed, added missed parameter in method calls in unit tests
andsel Jan 16, 2023
6809039
Fixed condition to generate unique group instance id for multithreade…
andsel Jan 17, 2023
e7c0fcf
Created separate topic to test static membership
andsel Jan 17, 2023
7641f24
Minor, renamed tests
andsel Jan 17, 2023
e2c80b9
Avoid to set group instance id when it's not configured
andsel Jan 17, 2023
47f58c2
Moved static membership test code under the top inputs/kafka section
andsel Jan 17, 2023
f638e3b
[Doc] described the new setting
andsel Jan 17, 2023
c3362ea
[Doc] improved sentence
andsel Jan 17, 2023
be16d01
Switched to Java's ArrayBlockingQueue to have a poll with max timeout
andsel Jan 17, 2023
67317d0
Minor fix in spec, from object identity to value identity
andsel Jan 17, 2023
baf72a9
Added flushing of enqueued producer's messages
andsel Jan 17, 2023
171809b
Fixed the check on boot sequence, the worker thread could consumer fr…
andsel Jan 17, 2023
bd67257
[Doc] specified that the 'group.instance.id' has to be unique across …
andsel Jan 18, 2023
97f35b3
[Test] lifted some definitions in the rspec context
andsel Jan 18, 2023
a20570f
streamlined an `if` statement
andsel Jan 18, 2023
81dd9c3
Log the Kafka error message that contains the problematic group.insta…
andsel Jan 18, 2023
8a3039e
[Doc] Improved wording
andsel Jan 19, 2023
327f29b
Improved doc description and bumped version 11.1.0
andsel Jan 19, 2023
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
# Unreleased
- Added config `group_instance_id` to use Kafka's consumer static membership feature [#135](https://github.com/logstash-plugins/logstash-integration-kafka/pull/135)

## 11.0.0
- Changed Kafka client to 3.3.1, requires Logstash >= 8.3.0.
- Deprecated `default` value for setting `client_dns_lookup` forcing to `use_all_dns_ips` when explicitly used [#130](https://github.com/logstash-plugins/logstash-integration-kafka/pull/130)
22 changes: 22 additions & 0 deletions docs/input-kafka.asciidoc
Expand Up @@ -113,6 +113,7 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
| <<plugins-{type}s-{plugin}-fetch_max_wait_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-fetch_min_bytes>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-group_id>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-group_instance_id>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-heartbeat_interval_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-isolation_level>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-jaas_path>> |a valid filesystem path|No
Expand Down Expand Up @@ -344,6 +345,27 @@ NOTE: In cases when multiple inputs are being used in a single pipeline, reading
it's essential to set a different `group_id => ...` for each input. Setting a unique `client_id => ...`
is also recommended.

[id="plugins-{type}s-{plugin}-group_instance_id"]
===== `group_instance_id`

* Value type is <<string,string>>
* Optional, there is no default value for this setting.

The static membership identifier for this consumer. The static membership feature was introduced in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances[KIP-345]
and is available under the Kafka property `group.instance.id`. It aims to avoid rebalances in contexts where a lot of data
has to be handed off after a consumer goes offline.
This helps in cases where the service state is heavy and moving one topic partition from instance
A to B means a huge amount of data transfer. A client that goes offline and comes back online frequently can use this setting
to avoid frequent and heavy rebalances.

NOTE: It has to be unique across all the clients belonging to the same <<plugins-{type}s-{plugin}-group_id>>, in case another client connects
with same `group.instance.id` value then the oldest is kicked off.
Contributor

Suggested change
NOTE: It has to be unique across all the clients belonging to the same <<plugins-{type}s-{plugin}-group_id>>, in case another client connects
with same `group.instance.id` value then the oldest is kicked off.
NOTE: The `group_instance_id` setting must be unique across all the clients belonging to the same <<plugins-{type}s-{plugin}-group_id>>. Otherwise, another client connecting
with same `group.instance.id` value would cause the oldest instance to be removed.

Contributor

I'm not sure I got all the details right. I'm trying to add precise language by restating the name of the option rather than just saying "it".
Also, "kicked off" is ambiguous in English. It can mean starting something (like kicking off a process) or deleting/removing something. I'm guessing you mean the latter.

Contributor Author

Yes, you guessed right :-) I think that instead of

instance to be removed

it would be more appropriate to say

instance to be disconnected

Generally, it could be something like a hostname, an IP address, or anything that uniquely identifies the client application.

NOTE: When multiple threads are configured (that is, `consumer_threads` is greater than one), a suffix is appended to
the `group_instance_id` to avoid collisions.
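
As a rough illustration of the note above, the following plain-Ruby sketch mirrors the suffixing rule that `run` applies in this PR: the thread index is appended only when `group_instance_id` is set and `consumer_threads` is greater than one. The helper name `thread_group_instance_ids` and the sample id `logstash-host-a` are hypothetical and not part of the plugin.

```ruby
# Hypothetical helper (not part of the plugin): returns the group.instance.id
# that each consumer thread would use, following the rule in this PR.
def thread_group_instance_ids(group_instance_id, consumer_threads)
  consumer_threads.times.map do |i|
    if consumer_threads > 1 && group_instance_id
      "#{group_instance_id}-#{i}" # suffix with the thread index to avoid collisions
    else
      group_instance_id           # single thread, or setting not configured (nil)
    end
  end
end

p thread_group_instance_ids("logstash-host-a", 1) # ["logstash-host-a"]
p thread_group_instance_ids("logstash-host-a", 3) # ["logstash-host-a-0", "logstash-host-a-1", "logstash-host-a-2"]
p thread_group_instance_ids(nil, 2)               # [nil, nil]
```

With a single thread the configured value is used unchanged, so the uniqueness requirement across clients of the same `group_id` still applies to the base value.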

[id="plugins-{type}s-{plugin}-heartbeat_interval_ms"]
===== `heartbeat_interval_ms`

1 change: 1 addition & 0 deletions kafka_test_setup.sh
Expand Up @@ -48,6 +48,7 @@ build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 -
build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic logstash_integration_lz4_topic --bootstrap-server localhost:9092
build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic logstash_integration_zstd_topic --bootstrap-server localhost:9092
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_partitioner_topic --bootstrap-server localhost:9092
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_static_membership_topic --bootstrap-server localhost:9092
curl -s -o build/apache_logs.txt https://s3.amazonaws.com/data.elasticsearch.org/apache_logs/apache_logs.txt
cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_plain --broker-list localhost:9092
cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_snappy --broker-list localhost:9092 --compression-codec snappy
21 changes: 18 additions & 3 deletions lib/logstash/inputs/kafka.rb
Expand Up @@ -124,6 +124,11 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
# that happens to be made up of multiple processors. Messages in a topic will be distributed to all
# Logstash instances with the same `group_id`
config :group_id, :validate => :string, :default => "logstash"
# Set a static group instance id, used by the static membership feature to avoid rebalancing when a
# consumer goes offline. If set and `consumer_threads` is greater than 1, an artificial suffix is
# appended to the user-provided `group_instance_id` for the consumer created by each thread,
# to avoid clashes.
config :group_instance_id, :validate => :string
# The expected time between heartbeats to the consumer coordinator. Heartbeats are used to ensure
# that the consumer's session stays active and to facilitate rebalancing when new
# consumers join or leave the group. The value must be set lower than
Expand All @@ -136,7 +141,7 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
# been aborted. Non-transactional messages will be returned unconditionally in either mode.
config :isolation_level, :validate => ["read_uncommitted", "read_committed"], :default => "read_uncommitted" # Kafka default
# Java Class used to deserialize the record's key
config :key_deserializer_class, :validate => :string, :default => "org.apache.kafka.common.serialization.StringDeserializer"
config :key_deserializer_class, :validate => :string, :default => DEFAULT_DESERIALIZER_CLASS
# The maximum delay between invocations of poll() when using consumer group management. This places
# an upper bound on the amount of time that the consumer can be idle before fetching more records.
# If poll() is not called before expiration of this timeout, then the consumer is considered failed and
Expand Down Expand Up @@ -287,7 +292,10 @@ def extract_metadata_level(decorate_events_setting)

public
def run(logstash_queue)
@runner_consumers = consumer_threads.times.map { |i| subscribe(create_consumer("#{client_id}-#{i}")) }
@runner_consumers = consumer_threads.times.map do |i|
thread_group_instance_id = consumer_threads > 1 && group_instance_id ? "#{group_instance_id}-#{i}" : group_instance_id
subscribe(create_consumer("#{client_id}-#{i}", thread_group_instance_id))
end
@runner_threads = @runner_consumers.map.with_index { |consumer, i| thread_runner(logstash_queue, consumer,
"kafka-input-worker-#{client_id}-#{i}") }
@runner_threads.each(&:start)
Expand Down Expand Up @@ -335,6 +343,9 @@ def do_poll(consumer)
rescue org.apache.kafka.common.errors.WakeupException => e
logger.debug("Wake up from poll", :kafka_error_message => e)
raise e unless stop?
rescue org.apache.kafka.common.errors.FencedInstanceIdException => e
logger.error("Another consumer with same group.instance.id has connected")
raise e unless stop?
rescue => e
logger.error("Unable to poll Kafka consumer",
:kafka_error_message => e,
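
The order of the `rescue` clauses in `do_poll` matters: Ruby tries them top to bottom, so the specific `FencedInstanceIdException` clause has to come before the generic `rescue => e`. The sketch below shows that behaviour in plain Ruby. `FencedError`, `poll_once`, the `stopping` flag, and the returned symbols are stand-ins invented for the illustration, not the plugin's API.

```ruby
# Stand-in for org.apache.kafka.common.errors.FencedInstanceIdException (hypothetical).
class FencedError < StandardError; end

# Hypothetical poll wrapper: the block plays the role of consumer.poll and
# `stopping` plays the role of the plugin's stop? check.
def poll_once(stopping: false)
  yield
rescue FencedError => e
  # Specific clause first: re-raise unless the plugin is shutting down.
  raise e unless stopping
  :fenced_while_stopping
rescue StandardError
  # Generic clause: reached only for errors not matched above.
  :generic_error
end

p(poll_once { :records })                                 # :records
p(poll_once { raise "boom" })                             # :generic_error
p(poll_once(stopping: true) { raise FencedError, "dup" }) # :fenced_while_stopping
```

An exception re-raised from inside a `rescue` clause is not caught by the sibling clauses, so a fenced consumer that is not stopping propagates the error to its caller.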
Expand Down Expand Up @@ -389,7 +400,7 @@ def maybe_commit_offset(consumer)
end

private
def create_consumer(client_id)
def create_consumer(client_id, group_instance_id)
begin
props = java.util.Properties.new
kafka = org.apache.kafka.clients.consumer.ConsumerConfig
Expand All @@ -407,6 +418,10 @@ def create_consumer(client_id)
props.put(kafka::FETCH_MAX_WAIT_MS_CONFIG, fetch_max_wait_ms.to_s) unless fetch_max_wait_ms.nil?
props.put(kafka::FETCH_MIN_BYTES_CONFIG, fetch_min_bytes.to_s) unless fetch_min_bytes.nil?
props.put(kafka::GROUP_ID_CONFIG, group_id)
if group_instance_id
# group.instance.id can't be set to nil, so set it only when configured
props.put(kafka::GROUP_INSTANCE_ID_CONFIG, group_instance_id)
end
props.put(kafka::HEARTBEAT_INTERVAL_MS_CONFIG, heartbeat_interval_ms.to_s) unless heartbeat_interval_ms.nil?
props.put(kafka::ISOLATION_LEVEL_CONFIG, isolation_level)
props.put(kafka::KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class)
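
The guard around `GROUP_INSTANCE_ID_CONFIG` is needed because `java.util.Properties` extends `Hashtable`, which rejects null values. A minimal plain-Ruby sketch of the same pattern follows, using a `Hash` in place of `Properties`. `build_consumer_props` is a hypothetical helper; only the two property keys come from Kafka.

```ruby
# Hypothetical helper: builds consumer properties, adding group.instance.id
# only when a value was configured (nil means static membership is not used).
def build_consumer_props(group_id, group_instance_id = nil)
  props = { "group.id" => group_id }
  props["group.instance.id"] = group_instance_id if group_instance_id
  props
end

p build_consumer_props("logstash")
p build_consumer_props("logstash", "logstash-host-a-0")
```

Leaving the key out entirely, rather than storing an empty placeholder, keeps the consumer on the regular dynamic membership path when the setting is not configured.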
100 changes: 100 additions & 0 deletions spec/integration/inputs/kafka_spec.rb
Expand Up @@ -79,6 +79,7 @@ def send_message(record)
producer = org.apache.kafka.clients.producer.KafkaProducer.new(props)

producer.send(record)
producer.flush
producer.close
end

Expand Down Expand Up @@ -185,10 +186,109 @@ def send_message(record)
end
end
end

context "static membership 'group.instance.id' setting" do
let(:consumer_config) do
{
"topics" => ["logstash_integration_static_membership_topic"],
"group_id" => "logstash",
"consumer_threads" => 1,
# this is needed because the worker thread could start shortly after the producer sent the "up" message
"auto_offset_reset" => "earliest",
"group_instance_id" => "test_static_group_id"
}
end

let(:logger) { double("logger") }
before :each do
allow(LogStash::Inputs::Kafka).to receive(:logger).and_return(logger)
[:error, :warn, :info, :debug].each do |level|
allow(logger).to receive(level)
end
end

it "input plugin disconnects from the broker when another client with same static membership connects" do
queue = java.util.concurrent.ArrayBlockingQueue.new(10)
kafka_input = LogStash::Inputs::Kafka.new(consumer_config)
kafka_input.register

expect(logger).to receive(:error).with("Another consumer with same group.instance.id has connected")

input_worker = java.lang.Thread.new { kafka_input.run(queue) }
begin
input_worker.start
wait_kafka_input_is_ready("logstash_integration_static_membership_topic", queue)
saboteur_kafka_consumer = create_consumer_and_start_consuming("test_static_group_id")
saboteur_kafka_consumer.run # ask to be scheduled
saboteur_kafka_consumer.join

expect(saboteur_kafka_consumer.value).to eq("saboteur exited")
ensure
input_worker.join(30_000)
end
end
Member

We can move some things outside of the test by introducing a context:

Suggested change
it "input plugin disconnects from the broker when another client with same static membership connects" do
queue = java.util.concurrent.ArrayBlockingQueue.new(10)
kafka_input = LogStash::Inputs::Kafka.new(consumer_config)
kafka_input.register
expect(logger).to receive(:error).with("Another consumer with same group.instance.id has connected")
input_worker = java.lang.Thread.new { kafka_input.run(queue) }
begin
input_worker.start
wait_kafka_input_is_ready("logstash_integration_static_membership_topic", queue)
saboteur_kafka_consumer = create_consumer_and_start_consuming("test_static_group_id")
saboteur_kafka_consumer.run # ask to be scheduled
saboteur_kafka_consumer.join
expect(saboteur_kafka_consumer.value).to eq("saboteur exited")
ensure
input_worker.join(30_000)
end
end
context "when another client with same static membership connects" do
let(:queue) { java.util.concurrent.ArrayBlockingQueue.new(10) }
let(:kafka_input) { LogStash::Inputs::Kafka.new(consumer_config) }
before(:each) { kafka_input.register }
it "input plugin disconnects from the broker" do
expect(logger).to receive(:error).with("Another consumer with same group.instance.id has connected")
input_worker = java.lang.Thread.new { kafka_input.run(queue) }
begin
input_worker.start
wait_kafka_input_is_ready("logstash_integration_static_membership_topic", queue)
saboteur_kafka_consumer = create_consumer_and_start_consuming("test_static_group_id")
saboteur_kafka_consumer.run # ask to be scheduled
saboteur_kafka_consumer.join
expect(saboteur_kafka_consumer.value).to eq("saboteur exited")
ensure
input_worker.join(30_000)
end
end
end

Contributor Author

Nice suggestion thanks, integrated with commit 97f35b3.


context "when the plugin is configured with multiple consumer threads" do
let(:multi_consumer_config) { consumer_config.merge({"consumer_threads" => 2}) }

it "should avoid to connect with same 'group.instance.id'" do
queue = java.util.concurrent.ArrayBlockingQueue.new(10)
kafka_input = LogStash::Inputs::Kafka.new(multi_consumer_config)
kafka_input.register

expect(logger).to_not receive(:error).with("Another consumer with same group.instance.id has connected")

input_worker = java.lang.Thread.new { kafka_input.run(queue) }
begin
input_worker.start
wait_kafka_input_is_ready("logstash_integration_static_membership_topic", queue)
ensure
kafka_input.stop
input_worker.join(1_000)
end
end
end
end
end

# return consumer Ruby Thread
def create_consumer_and_start_consuming(static_group_id)
props = java.util.Properties.new
kafka = org.apache.kafka.clients.consumer.ConsumerConfig
props.put(kafka::BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(kafka::KEY_DESERIALIZER_CLASS_CONFIG, LogStash::Inputs::Kafka::DEFAULT_DESERIALIZER_CLASS)
props.put(kafka::VALUE_DESERIALIZER_CLASS_CONFIG, LogStash::Inputs::Kafka::DEFAULT_DESERIALIZER_CLASS)
props.put(kafka::GROUP_ID_CONFIG, "logstash")
props.put(kafka::GROUP_INSTANCE_ID_CONFIG, static_group_id)
consumer = org.apache.kafka.clients.consumer.KafkaConsumer.new(props)

Thread.new do
LogStash::Util::set_thread_name("integration_test_simple_consumer")
begin
consumer.subscribe(["logstash_integration_static_membership_topic"])
records = consumer.poll(java.time.Duration.ofSeconds(3))
"saboteur exited"
rescue => e
e # return the exception reached in thread.value
ensure
consumer.close
end
end
end
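
The helper above relies on `Thread#value`, which joins the thread and returns the last expression evaluated in its block. Because the `rescue` branch returns the exception instead of re-raising it, the spec can compare `value` against `"saboteur exited"` and still see the exception object when something went wrong. A plain-Ruby sketch of that pattern follows; `run_in_thread` is a hypothetical name.

```ruby
# Hypothetical helper: runs the block in a thread and reports either a
# success marker or the exception that was raised, via Thread#value.
def run_in_thread(&block)
  Thread.new do
    begin
      block.call
      "exited"
    rescue => e
      e # returned (not raised), so Thread#value hands it back to the caller
    end
  end
end

ok = run_in_thread { :work }
failed = run_in_thread { raise ArgumentError, "fenced" }

p ok.value           # "exited"
p failed.value.class # ArgumentError
```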

private

def wait_kafka_input_is_ready(topic, queue)
# this is needed to give time to the kafka input to be up and running
header = org.apache.kafka.common.header.internals.RecordHeader.new("name", "Ping Up".to_java_bytes)
record = org.apache.kafka.clients.producer.ProducerRecord.new(topic, 0, "key", "value", [header])
send_message(record)

# Wait until the message is processed
message = queue.poll(1, java.util.concurrent.TimeUnit::MINUTES)
expect(message).to_not eq(nil)
end
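
The spec uses Java's `ArrayBlockingQueue#poll(timeout, unit)`, which returns `null` when the timeout expires, so a missing message fails the expectation instead of blocking the test forever. For comparison, a rough plain-Ruby equivalent built on `Thread::Queue` and the standard `timeout` library is sketched below. `poll_with_timeout` is a hypothetical helper, not something the spec defines.

```ruby
require "timeout"

# Hypothetical helper: waits up to `seconds` for an item and returns nil on
# timeout, similar in spirit to ArrayBlockingQueue#poll(timeout, unit).
def poll_with_timeout(queue, seconds)
  Timeout.timeout(seconds) { queue.pop }
rescue Timeout::Error
  nil
end

queue = Queue.new
p poll_with_timeout(queue, 0.1) # nil, nothing was enqueued
queue << "Ping Up"
p poll_with_timeout(queue, 0.1) # "Ping Up"
```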

def consume_messages(config, queue: Queue.new, timeout:, event_count:)
kafka_input = LogStash::Inputs::Kafka.new(config)
kafka_input.register
10 changes: 5 additions & 5 deletions spec/unit/inputs/kafka_spec.rb
Expand Up @@ -297,7 +297,7 @@
to receive(:new).with(hash_including('client.rack' => 'EU-R1')).
and_return kafka_client = double('kafka-consumer')

expect( subject.send(:create_consumer, 'sample_client-0') ).to be kafka_client
expect( subject.send(:create_consumer, 'sample_client-0', 'group_instance_id') ).to be kafka_client
end
end

Expand All @@ -309,7 +309,7 @@
to receive(:new).with(hash_including('session.timeout.ms' => '25000', 'max.poll.interval.ms' => '345000')).
and_return kafka_client = double('kafka-consumer')

expect( subject.send(:create_consumer, 'sample_client-1') ).to be kafka_client
expect( subject.send(:create_consumer, 'sample_client-1', 'group_instance_id') ).to be kafka_client
end
end

Expand All @@ -321,7 +321,7 @@
to receive(:new).with(hash_including('session.timeout.ms' => '25200', 'max.poll.interval.ms' => '123000')).
and_return kafka_client = double('kafka-consumer')

expect( subject.send(:create_consumer, 'sample_client-2') ).to be kafka_client
expect( subject.send(:create_consumer, 'sample_client-2', 'group_instance_id') ).to be kafka_client
end
end

Expand All @@ -333,7 +333,7 @@
to receive(:new).with(hash_including('enable.auto.commit' => 'false', 'check.crcs' => 'true')).
and_return kafka_client = double('kafka-consumer')

expect( subject.send(:create_consumer, 'sample_client-3') ).to be kafka_client
expect( subject.send(:create_consumer, 'sample_client-3', 'group_instance_id') ).to be kafka_client
expect( subject.enable_auto_commit ).to be false
end
end
Expand All @@ -346,7 +346,7 @@
to receive(:new).with(hash_including('enable.auto.commit' => 'true', 'check.crcs' => 'false')).
and_return kafka_client = double('kafka-consumer')

expect( subject.send(:create_consumer, 'sample_client-4') ).to be kafka_client
expect( subject.send(:create_consumer, 'sample_client-4', 'group_instance_id') ).to be kafka_client
expect( subject.enable_auto_commit ).to be true
end
end