Logstash is not able to connect to Kafka if Kafka host name is not resolved when Logstash starts #155
Comments
I have a similar issue.
We've got a similar issue: Logstash initialises fine and the Kafka plugin behaves properly until the Kafka brokers' DNS record changes; Logstash then fails constantly, insisting on the previously resolved IP address. We would expect the plugin to refresh the IP address for those hostnames, but it didn't. EDIT: we're trialling version 8.1.0, after the Kafka client was bumped from 2.1.0 to 2.3.0, which should have fixed https://cwiki.apache.org/confluence/display/KAFKA/KIP-302+-+Enable+Kafka+clients+to+use+all+DNS+resolved+IP+addresses
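For plugin versions built on Kafka client 2.1+ (the logstash-integration-kafka plugin), the KIP-302 behaviour is exposed through the plugin's client_dns_lookup option. A minimal output sketch, assuming a hypothetical broker host name; verify the option against the docs for the plugin version you actually run:

```text
output {
  kafka {
    topic_id          => "output"
    # hypothetical host name for illustration
    bootstrap_servers => "kafka.example.internal:9092"
    # Try every IP behind the hostname (KIP-302 behaviour); option name
    # per the logstash-integration-kafka docs -- check your version.
    client_dns_lookup => "use_all_dns_ips"
  }
}
```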
I am facing exactly the same issue on our k8s setup. After the Kafka brokers restart, they get new IPs and Logstash keeps failing because it has cached the old IP address. Is there any known solution for this?
As an interim fix (more of a hack) we ended up writing a custom workaround. We tee the logs going to STDOUT into a file as well, and a shell script greps that log file for "Broker may not be available"; the same script is also a livenessProbe. This way, whenever there are more than n log lines saying the broker may not be available, the livenessProbe fails and k8s restarts the container. Ugly, but it works! As a permanent fix, the Kafka version of this plugin needs to be bumped, since the issue is in the underlying Kafka client version. The same has been addressed here (there are a few tickets reporting similar issues, including the one mentioned by JoseAlban above): https://issues.apache.org/jira/browse/KAFKA-7755
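The grep-based livenessProbe described above can be sketched as a small shell script; the log path, threshold, and env var names are assumptions to adapt to your deployment:

```shell
#!/bin/sh
# livenessProbe sketch for the workaround described above.
# LOG_FILE and THRESHOLD are assumptions -- point LOG_FILE at the file
# Logstash's STDOUT is teed into, and tune THRESHOLD ("n lines").
LOG_FILE="${LOG_FILE:-/var/log/logstash/stdout.log}"
THRESHOLD="${THRESHOLD:-10}"

# grep -c prints the number of matching lines; if the file is missing,
# count stays empty and defaults to 0 below.
count=$(grep -c "Broker may not be available" "$LOG_FILE" 2>/dev/null || true)

if [ "${count:-0}" -gt "$THRESHOLD" ]; then
  # A non-zero exit fails the livenessProbe; Kubernetes restarts the pod.
  exit 1
fi
exit 0
```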
It looks like the call to … Correct me if I'm wrong.
FYI: we worked around this issue by updating the kafka plugins in the container...
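The plugin-update workaround can be captured as a Dockerfile layer. A sketch, assuming Logstash 7.x, where the separate Kafka input/output plugins were consolidated into logstash-integration-kafka:

```dockerfile
FROM docker.elastic.co/logstash/logstash:7.14.1
# Pull in the latest Kafka integration plugin so the bundled Kafka
# client includes the KIP-302 DNS fixes; plugin name applies to 7.x+.
RUN bin/logstash-plugin update logstash-integration-kafka
```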
Stumbled upon this problem today. Our configs use a static IP for Kafka; however, we noticed the host could no longer reach its configured DNS server (in /etc/resolv.conf). Simply removing the bad entry fixed our problems with this plugin.
Tried updating the Logstash Kafka plugins; it did not work: Also tried the logstash image 7.14.1; with that, it still did not work, oh my:
Mine, refer to @buch11:
Description of the problem:
Step 1
Kafka and Logstash are both shutdown
Configure Logstash output to connect to some unresolved Kafka host.
Run Logstash
Result:
Logstash is up and running. "Successfully started Logstash API endpoint" is printed
Expected behaviour:
Either retry the connection or shut down Logstash
Step 2
Configure /etc/hosts to resolve the unknown host
Run Kafka (with default configuration)
Result:
Logstash is still unable to connect
Expected result:
Logstash, if running, should be able to resolve the host and reconnect on the next retry
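The /etc/hosts change in Step 2 is a single static mapping; for example, using the host name from the config in this report (the broker IP is 127.0.0.1 because Kafka runs on the same host here):

```text
# /etc/hosts -- map the previously unresolvable name to the broker's IP
127.0.0.1   unresolvedomain
```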
Observation 1
The same does not happen if I configure Logstash to connect to the Kafka IP or localhost (I run both on the same host). For some reason, host name resolution affects the plugin's behavior.
Observation 2
When running without the --config.reload.automatic flag, Logstash shuts down when it is not able to resolve the host name. It is not clear why --config.reload.automatic has an effect on this behaviour.
Effect on Docker deployments
The problem was first observed when running Logstash and Kafka in Docker (Swarm). When using an overlay network, Logstash is configured to resolve the Kafka domain name through Docker's internal DNS. In some cases, for example when the host reboots and Logstash starts before Kafka, the Kafka DNS entry does not exist yet. Logstash therefore starts but is not able to resolve the host name. Later on, when Kafka is up and the DNS holds a record for it, Logstash is still not able to reconnect to the running Kafka service.
It seems that there is no retry mechanism in this plugin.
Issue Information:
OS Version: CentOS 7
uname -a: Linux localhost.localdomain 3.10.0-514.26.2.el7.x86_64 #1 SMP Tue Jul 4 15:04:05 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
Logstash Version: logstash-5.5.2
Kafka Version: kafka_2.11-0.11.0.0
Kafka Output Plugin Version: logstash-output-kafka (5.1.7)
Logstash Config:
input {
kafka {
topics => ["input"]
bootstrap_servers => "localhost:9092"
auto_offset_reset => "earliest"
}
}
output {
kafka
{
topic_id => "output"
bootstrap_servers => "unresolvedomain:9092"
codec => rubydebug
}
}
Running Logstash when Kafka is down:
[root@localhost logstash-5.5.2]# bin/logstash -f ./conf.conf --config.reload.automatic
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Sending Logstash's logs to /root/LS/logstash-5.5.2/logs which is now configured via log4j2.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/LS/logstash-5.5.2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.8/vendor/jar-dependencies/runtime-jars/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/LS/logstash-5.5.2/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.7/vendor/jar-dependencies/runtime-jars/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
[2017-09-13T00:34:52,023][ERROR][logstash.outputs.kafka ] Unable to create Kafka producer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer}
[2017-09-13T00:34:52,037][ERROR][logstash.pipeline ] Error registering plugin {:plugin=>"#<LogStash::OutputDelegator:0xd5acf1b @namespaced_metric=#<LogStash::Instrument::NamespacedMetric:0x43a2cdff @Metric=#<LogStash::Instrument::Metric:0x1e3c02c0 @collector=#<LogStash::Instrument::Collector:0x53a8927f @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x8d1bd06 @store=#<Concurrent::Map:0x00000000062668 entries=3 default_proc=nil>, @structured_lookup_mutex=#Mutex:0x5dc954d6, @fast_lookup=#<Concurrent::Map:0x0000000006266c entries=58 default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :outputs, :"fa0c0cda67c27f0d75547b7dfe084d7e3ce7d1aa-2"]>, @Metric=#<LogStash::Instrument::NamespacedMetric:0x34b132db @Metric=#<LogStash::Instrument::Metric:0x1e3c02c0 @collector=#<LogStash::Instrument::Collector:0x53a8927f @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x8d1bd06 @store=#<Concurrent::Map:0x00000000062668 entries=3 default_proc=nil>, @structured_lookup_mutex=#Mutex:0x5dc954d6, @fast_lookup=#<Concurrent::Map:0x0000000006266c entries=58 default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :outputs]>, @logger=#<LogStash::Logging::Logger:0x26f50dc5 @logger=#Java::OrgApacheLoggingLog4jCore::Logger:0x1944b06d>, @strategy=#<LogStash::OutputDelegatorStrategies::Shared:0x5387765b @output=<LogStash::Outputs::Kafka topic_id=>"output", bootstrap_servers=>"unresolvedomain:9092", codec=><LogStash::Codecs::RubyDebug id=>"rubydebug_4ea344bd-89e9-4484-a229-e79825f5e8e4", enable_metric=>true, metadata=>false>, id=>"fa0c0cda67c27f0d75547b7dfe084d7e3ce7d1aa-2", enable_metric=>true, workers=>1, acks=>"1", batch_size=>16384, block_on_buffer_full=>true, buffer_memory=>33554432, compression_type=>"none", key_serializer=>"org.apache.kafka.common.serialization.StringSerializer", linger_ms=>0, max_request_size=>1048576, metadata_fetch_timeout_ms=>60000, metadata_max_age_ms=>300000, receive_buffer_bytes=>32768, 
reconnect_backoff_ms=>10, retries=>0, retry_backoff_ms=>100, send_buffer_bytes=>131072, ssl=>false, security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI", timeout_ms=>30000, value_serializer=>"org.apache.kafka.common.serialization.StringSerializer">>, @id="fa0c0cda67c27f0d75547b7dfe084d7e3ce7d1aa-2", @metric_events=#<LogStash::Instrument::NamespacedMetric:0xe51575d @Metric=#<LogStash::Instrument::Metric:0x1e3c02c0 @collector=#<LogStash::Instrument::Collector:0x53a8927f @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x8d1bd06 @store=#<Concurrent::Map:0x00000000062668 entries=3 default_proc=nil>, @structured_lookup_mutex=#Mutex:0x5dc954d6, @fast_lookup=#<Concurrent::Map:0x0000000006266c entries=58 default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :outputs, :"fa0c0cda67c27f0d75547b7dfe084d7e3ce7d1aa-2", :events]>, @output_class=LogStash::Outputs::Kafka>", :error=>"Failed to construct kafka producer"}
[2017-09-13T00:34:52,049][ERROR][logstash.agent ] Pipeline aborted due to error {:exception=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer, :backtrace=>["org.apache.kafka.clients.producer.KafkaProducer.(org/apache/kafka/clients/producer/KafkaProducer.java:335)", "org.apache.kafka.clients.producer.KafkaProducer.(org/apache/kafka/clients/producer/KafkaProducer.java:188)", "java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:423)", "RUBY.create_producer(/root/LS/logstash-5.5.2/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.7/lib/logstash/outputs/kafka.rb:242)", "RUBY.register(/root/LS/logstash-5.5.2/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.7/lib/logstash/outputs/kafka.rb:178)", "RUBY.register(/root/LS/logstash-5.5.2/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:9)", "RUBY.register(/root/LS/logstash-5.5.2/logstash-core/lib/logstash/output_delegator.rb:41)", "RUBY.register_plugin(/root/LS/logstash-5.5.2/logstash-core/lib/logstash/pipeline.rb:281)", "RUBY.register_plugins(/root/LS/logstash-5.5.2/logstash-core/lib/logstash/pipeline.rb:292)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1613)", "RUBY.register_plugins(/root/LS/logstash-5.5.2/logstash-core/lib/logstash/pipeline.rb:292)", "RUBY.start_workers(/root/LS/logstash-5.5.2/logstash-core/lib/logstash/pipeline.rb:301)", "RUBY.run(/root/LS/logstash-5.5.2/logstash-core/lib/logstash/pipeline.rb:226)", "RUBY.start_pipeline(/root/LS/logstash-5.5.2/logstash-core/lib/logstash/agent.rb:398)", "java.lang.Thread.run(java/lang/Thread.java:748)"]}
[2017-09-13T00:34:52,101][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}