-
Notifications
You must be signed in to change notification settings - Fork 61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement the static membership
feature to Kafka input
#135
Implement the static membership
feature to Kafka input
#135
Conversation
…colliding static membership
…case another client with same group.instance.id connect to same topic within same group
…om the topic after the producer sent the message
static membership
feature to Kafka input
…all the clients belogning to the same consumer group
it "input plugin disconnects from the broker when another client with same static membership connects" do | ||
queue = java.util.concurrent.ArrayBlockingQueue.new(10) | ||
kafka_input = LogStash::Inputs::Kafka.new(consumer_config) | ||
kafka_input.register | ||
|
||
expect(logger).to receive(:error).with("Another consumer with same group.instance.id has connected") | ||
|
||
input_worker = java.lang.Thread.new { kafka_input.run(queue) } | ||
begin | ||
input_worker.start | ||
wait_kafka_input_is_ready("logstash_integration_static_membership_topic", queue) | ||
saboteur_kafka_consumer = create_consumer_and_start_consuming("test_static_group_id") | ||
saboteur_kafka_consumer.run # ask to be scheduled | ||
saboteur_kafka_consumer.join | ||
|
||
expect(saboteur_kafka_consumer.value).to eq("saboteur exited") | ||
ensure | ||
input_worker.join(30_000) | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can move some things outside of the test by introducing a context:
it "input plugin disconnects from the broker when another client with same static membership connects" do | |
queue = java.util.concurrent.ArrayBlockingQueue.new(10) | |
kafka_input = LogStash::Inputs::Kafka.new(consumer_config) | |
kafka_input.register | |
expect(logger).to receive(:error).with("Another consumer with same group.instance.id has connected") | |
input_worker = java.lang.Thread.new { kafka_input.run(queue) } | |
begin | |
input_worker.start | |
wait_kafka_input_is_ready("logstash_integration_static_membership_topic", queue) | |
saboteur_kafka_consumer = create_consumer_and_start_consuming("test_static_group_id") | |
saboteur_kafka_consumer.run # ask to be scheduled | |
saboteur_kafka_consumer.join | |
expect(saboteur_kafka_consumer.value).to eq("saboteur exited") | |
ensure | |
input_worker.join(30_000) | |
end | |
end | |
context "when another client with same static membership connects" do | |
let(:queue) { java.util.concurrent.ArrayBlockingQueue.new(10) } | |
let(:kafka_input) { LogStash::Inputs::Kafka.new(consumer_config) } | |
before(:each) { kafka_input.register } | |
it "input plugin disconnects from the broker" do | |
expect(logger).to receive(:error).with("Another consumer with same group.instance.id has connected") | |
input_worker = java.lang.Thread.new { kafka_input.run(queue) } | |
begin | |
input_worker.start | |
wait_kafka_input_is_ready("logstash_integration_static_membership_topic", queue) | |
saboteur_kafka_consumer = create_consumer_and_start_consuming("test_static_group_id") | |
saboteur_kafka_consumer.run # ask to be scheduled | |
saboteur_kafka_consumer.join | |
expect(saboteur_kafka_consumer.value).to eq("saboteur exited") | |
ensure | |
input_worker.join(30_000) | |
end | |
end | |
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice suggestion thanks, integrated with commit 97f35b3.
Co-authored-by: João Duarte <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…nce.id in case of collision
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left some comments for your consideration. In places, I made some guesses rather than asking questions to help us reduce the back-and-forth of multiple review cycles. Please keep me honest, and let me know if you'd like to discuss. :-)
docs/input-kafka.asciidoc
Outdated
NOTE: It has to be unique across all the clients belonging to the same <<plugins-{type}s-{plugin}-group_id>>, in case another client connects | ||
with same `group.instance.id` value then the oldest is kicked off. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NOTE: It has to be unique across all the clients belonging to the same <<plugins-{type}s-{plugin}-group_id>>, in case another client connects | |
with same `group.instance.id` value then the oldest is kicked off. | |
NOTE: The `group_instance_id` setting must be unique across all the clients belonging to the same <<plugins-{type}s-{plugin}-group_id>>. Otherwise, another client connecting | |
with same `group.instance.id` value would cause the oldest instance to be removed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I got all the details right. I'm trying to add precise language by restating the name of the option rather than just saying "it".
Also, "kicked off" is ambiguous in English. It can mean to start something (like kicking off a process" or deleting/removing something. I'm guessing you mean the latter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes you guessed right :-) I thinks that instead of
instance to be removed
would be more appropriate
instance to be disconnected
Co-authored-by: Karen Metts <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Release notes
Added config
group_instance_id
to use the Kafka's consumer static membership featureWhat does this PR do?
Static membership is reflected in the Kafka property
group.instance.id
, which has to be a unique identifier of the consumer instance provided by end user.If
consumer_threads
settings is 1 the value is passed directly down to the Kafka's consumer configuration, but if threads count is more than 1, as per KIP-345 it would clash, so in that case, a postfix-<thread-index>
is added.Why is it important/What is the impact to the user?
The static membership feature offered by Kafka client's consumer is intended to bind a consumer to a partition, this is needed in cases where the cost of state replication between consumers during a rebalance, is high. This PR exposes the feature, which is optional, to Logstash's users.
Checklist
[ ] I have made corresponding change to the default configuration files (and/or docker env variables)Author's Checklist
How to test this PR locally
Runs a local Kafka cluster
Fro the clone of this repository launch the test Kafka script
Connect a producer
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic "logstash_integration_static_membership_topic"
Setup Logstash Kafka input & run
Gemfile
test_pipeline.conf
fileVerify Logstash is receiving data
From the producer's console send some data and verify on the Logstash console the message is received.
Verify another consumer kicks off Logstash
Start another consumer with same
group.instance.id
in same consumer groupbuild/kafka the file
client_config.properties` with contentTest with multiple threads
consumer_threads
> 1 and verify that another consumer doesn't kicks off because thegroup.instance.id
has now been suffixed with-n
for each Logstash Kafka consumer threads.Related issues
static membership
Kafka feature to clients #134Use cases
As a Kafka uses that want to avoid that consumers spend a lot of time during rebalances, specially when they have heavy state, I want to be able to assign a "static membership" id to every consumer.