diff --git a/Gemfile b/Gemfile
index db01f5a..d5c272e 100644
--- a/Gemfile
+++ b/Gemfile
@@ -2,5 +2,3 @@ source 'https://rubygems.org'
 
 # Specify your gem's dependencies in fluent-plugin-kafka.gemspec
 gemspec
-
-gem 'rdkafka', ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE'] if ENV['USE_RDKAFKA']
diff --git a/README.md b/README.md
index 814d40e..9147728 100644
--- a/README.md
+++ b/README.md
@@ -29,6 +29,45 @@ If you want to use zookeeper related parameters, you also need to install zookee
 
 ## Usage
 
+### :exclamation: In this fork: MSK IAM Authentication Support for `rdkafka2` Output Type
+
+This fork adds support for using MSK IAM authentication with the `rdkafka2` output type in Fluentd. Authentication and authorization with an MSK cluster are facilitated through a base64-encoded signed URL, which is generated by the [aws-msk-iam-sasl-signer-ruby](https://github.com/bruce-szalwinski-he/aws-msk-iam-sasl-signer-ruby) library.
+
+The `aws-msk-iam-sasl-signer-ruby` library provides an [example](https://github.com/bruce-szalwinski-he/aws-msk-iam-sasl-signer-ruby/tree/main/examples/rdkafka) for generating the OAuthBearer token using rdkafka, which is one of the core Kafka libraries supported by the Fluentd fluent-plugin-kafka plugin. This fork integrates that example into the `Fluent::Rdkafka2Output` class, enabling AWS IAM authentication.
+
+The key change is the inclusion of a refresh callback:
+```ruby
+Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token)
+```
+This callback triggers token generation when needed, ensuring continuous authentication with the MSK cluster.
+
+#### Configuration Example
+To enable this feature, configure your Fluentd output as follows:
+
+```
+<match **>
+  @type rdkafka2
+  # Kafka brokers to connect to (typically port 9098 or 9198 for IAM authentication)
+  brokers <broker1>:9098,<broker2>:9098
+  # Topic to write events to
+  topic_key test-topic-1
+  default_topic test-topic-1
+
+  # AWS Region (required)
+  aws_msk_region us-east-1
+
+  # Use a shared producer for the connection (required)
+  share_producer true
+
+  # MSK IAM authentication settings (required)
+  rdkafka_options {
+    "security.protocol": "sasl_ssl",
+    "sasl.mechanisms": "OAUTHBEARER"
+  }
+</match>
+```
+With this configuration, Fluentd will handle the token refresh and manage the connection to your MSK cluster using AWS IAM authentication.
+
 ### Common parameters
 
 #### SSL authentication
@@ -563,7 +602,7 @@ You need to install rdkafka gem.
 
 `rdkafka2` supports `discard_kafka_delivery_failed_regex` parameter:
 
-- `discard_kafka_delivery_failed_regex` - default: nil - discard the record where the Kafka::DeliveryFailed occurred and the emitted message matches the given regex pattern, such as `/unknown_topic/`.
+- `discard_kafka_delivery_failed_regex` - default: nil - discard the record where the Kafka::DeliveryFailed occurred and the emitted message matches the given regex pattern, such as `/unknown_topic/`.
 
 If you use v0.12, use `rdkafka` instead.
diff --git a/fluent-plugin-kafka.gemspec b/fluent-plugin-kafka.gemspec
index 3620ddb..ceace4f 100644
--- a/fluent-plugin-kafka.gemspec
+++ b/fluent-plugin-kafka.gemspec
@@ -19,6 +19,14 @@ Gem::Specification.new do |gem|
   gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
   gem.add_dependency 'ltsv'
   gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2'
+
+  if ENV['USE_RDKAFKA']
+    gem.add_dependency 'rdkafka', [ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE']]
+    if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.0')
+      gem.add_dependency 'aws-msk-iam-sasl-signer', '~> 0.1.1'
+    end
+  end
+
   gem.add_development_dependency "rake", ">= 0.9.2"
   gem.add_development_dependency "test-unit", ">= 3.0.8"
   gem.add_development_dependency "test-unit-rr", "~> 1.0"
diff --git a/lib/fluent/plugin/out_rdkafka2.rb b/lib/fluent/plugin/out_rdkafka2.rb
index b306cfb..6301e19 100644
--- a/lib/fluent/plugin/out_rdkafka2.rb
+++ b/lib/fluent/plugin/out_rdkafka2.rb
@@ -20,6 +20,10 @@
   raise "unable to patch rdkafka."
 end
 
+if Gem::Version.create(RUBY_VERSION) >= Gem::Version.create('3.0')
+  require 'aws-msk-iam-sasl-signer'
+end
+
 module Fluent::Plugin
 class Fluent::Rdkafka2Output < Output
   Fluent::Plugin.register_output('rdkafka2', self)
@@ -101,6 +105,8 @@ class Fluent::Rdkafka2Output < Output
   config_param :unrecoverable_error_codes, :array, :default => ["topic_authorization_failed", "msg_size_too_large"],
                :desc => 'Handle some of the error codes should be unrecoverable if specified'
 
+  config_param :aws_msk_region, :string, :default => nil, :desc => 'AWS region for MSK'
+
   config_section :buffer do
     config_set_default :chunk_keys, ["topic"]
   end
@@ -209,6 +215,11 @@ def add(level, message = nil)
 
     config = build_config
     @rdkafka = Rdkafka::Config.new(config)
+
+    if config[:"security.protocol"] == "sasl_ssl" && config[:"sasl.mechanisms"] == "OAUTHBEARER"
+      Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token)
+    end
+
     if @default_topic.nil?
       if @use_default_for_unknown_topic || @use_default_for_unknown_partition_error
         raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic or use_default_for_unknown_partition_error is true"
@@ -289,6 +300,7 @@ def build_config
     config[:"sasl.password"] = @password if @password
     config[:"enable.idempotence"] = @idempotent if @idempotent
 
+    # sasl.mechanisms and security.protocol are set as rdkafka_options
     @rdkafka_options.each { |k, v|
       config[k.to_sym] = v
     }
@@ -296,9 +308,39 @@ def build_config
     config
   end
 
+  def refresh_token(_config, _client_name)
+    log.info("+--- Refreshing token")
+    client = get_producer
+    # This will happen once upon initialization and is expected to fail, as the producer isn't set yet
+    # We will set the token manually after creation and after that this refresh method will work
+    unless client
+      log.info("Could not get shared client handle, unable to set/refresh token (this is expected one time on startup)")
+      return
+    end
+    signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: @aws_msk_region)
+    token = signer.generate_auth_token
+
+    if token
+      client.oauthbearer_set_token(
+        token: token.token,
+        lifetime_ms: token.expiration_time_ms,
+        principal_name: "kafka-cluster"
+      )
+    else
+      client.oauthbearer_set_token_failure(
+        "Failed to generate token."
+      )
+    end
+  end
+
   def start
     if @share_producer
       @shared_producer = @rdkafka.producer
+      log.info("Created shared producer")
+      if @aws_msk_region
+        refresh_token(nil, nil)
+        log.info("Set initial token for shared producer")
+      end
     else
       @producers = {}
       @producers_mutex = Mutex.new