diff --git a/Gemfile b/Gemfile index db01f5a..0e3cf2b 100644 --- a/Gemfile +++ b/Gemfile @@ -3,4 +3,5 @@ source 'https://rubygems.org' # Specify your gem's dependencies in fluent-plugin-kafka.gemspec gemspec +gem 'json', '2.7.3' # override of 2.7.4 version gem 'rdkafka', ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE'] if ENV['USE_RDKAFKA'] diff --git a/README.md b/README.md index 814d40e..9147728 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,45 @@ If you want to use zookeeper related parameters, you also need to install zookee ## Usage +### :exclamation: In this fork: MSK IAM Authentication Support for `rdkafka2` Output Type + +This fork adds support for using MSK IAM authentication with the `rdkafka2` output type in Fluentd. Authentication and authorization with an MSK cluster are facilitated through a base64-encoded signed URL, which is generated by the [aws-msk-iam-sasl-signer-ruby](https://github.com/bruce-szalwinski-he/aws-msk-iam-sasl-signer-ruby) library. + +The `aws-msk-iam-sasl-signer-ruby` library provides an [example](https://github.com/bruce-szalwinski-he/aws-msk-iam-sasl-signer-ruby/tree/main/examples/rdkafka) for generating the OAuthBearer token using rdkafka, which is one of the core Kafka libraries supported by the Fluentd fluent-plugin-kafka plugin. This fork integrates that example into the `Fluent::Rdkafka2Output` class, enabling AWS IAM authentication. + +The key change is the inclusion of a refresh callback: +```ruby +Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token) +``` +This callback triggers token generation when needed, ensuring continuous authentication with the MSK cluster. + +#### Configuration Example +To enable this feature, configure your Fluentd input as follows: + +``` + + @type rdkafka2 + # Kafka brokers to connect to (typically port 9098 or 9198 for IAM authentication) + brokers + # Topic to write events to + topic_key test-topic-1 + default_topic test-topic-1 + + # AWS Region (required) + aws_msk_region us-east-1 + + # Use a shared producer for the connection (required) + share_producer true + + # MSK IAM authentication settings (required) + rdkafka_options { + "security.protocol": "sasl_ssl", + "sasl.mechanisms": "OAUTHBEARER" + } + +``` +With this configuration, Fluentd will handle the token refresh and manage the connection to your MSK cluster using AWS IAM authentication. + ### Common parameters #### SSL authentication @@ -563,7 +602,7 @@ You need to install rdkafka gem. `rdkafka2` supports `discard_kafka_delivery_failed_regex` parameter: -- `discard_kafka_delivery_failed_regex` - default: nil - discard the record where the Kafka::DeliveryFailed occurred and the emitted message matches the given regex pattern, such as `/unknown_topic/`. +- `discard_kafka_delivery_failed_regex` - default: nil - discard the record where the Kafka::DeliveryFailed occurred and the emitted message matches the given regex pattern, such as `/unknown_topic/`. If you use v0.12, use `rdkafka` instead. diff --git a/fluent-plugin-kafka.gemspec b/fluent-plugin-kafka.gemspec index ea118bb..ad57ea9 100644 --- a/fluent-plugin-kafka.gemspec +++ b/fluent-plugin-kafka.gemspec @@ -21,7 +21,6 @@ Gem::Specification.new do |gem| gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2' gem.add_dependency 'rdkafka' gem.add_dependency 'aws-msk-iam-sasl-signer' - gem.add_dependency 'json', '2.7.3' gem.add_development_dependency "rake", ">= 0.9.2" gem.add_development_dependency "test-unit", ">= 3.0.8" gem.add_development_dependency "test-unit-rr", "~> 1.0" diff --git a/lib/fluent/plugin/out_rdkafka2.rb b/lib/fluent/plugin/out_rdkafka2.rb index 2038d90..c76e3f5 100644 --- a/lib/fluent/plugin/out_rdkafka2.rb +++ b/lib/fluent/plugin/out_rdkafka2.rb @@ -4,6 +4,7 @@ require 'fluent/plugin/kafka_plugin_util' require 'rdkafka' +require 'aws_msk_iam_sasl_signer' begin rdkafka_version = Gem::Version::create(Rdkafka::VERSION) @@ -307,8 +308,14 @@ def build_config end def refresh_token(_config, _client_name) - print "refreshing token\n" + log.info("+--- Refreshing token") client = get_producer + # This will happen once upon initialization and is expected to fail, as the producer isnt set yet + # We will set the token manually after creation and after that this refresh method will work + unless client + log.info("Could not get shared client handle, unable to set/refresh token (this is expected one time on startup)") + return + end signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: @aws_msk_region) token = signer.generate_auth_token @@ -325,10 +332,14 @@ def refresh_token(_config, _client_name) end end - # HERE ----------------- def start if @share_producer @shared_producer = @rdkafka.producer + log.info("Created shared producer") + if @aws_msk_region + refresh_token(nil, nil) + log.info("Set initial token for shared producer") + end else @producers = {} @producers_mutex = Mutex.new @@ -336,7 +347,6 @@ def start super end - # HERE ----------------- def multi_workers_ready? true