Add support for MSK with IAM authentication #521

Open
wants to merge 12 commits into master
2 changes: 0 additions & 2 deletions Gemfile
@@ -2,5 +2,3 @@ source 'https://rubygems.org'

# Specify your gem's dependencies in fluent-plugin-kafka.gemspec
gemspec

gem 'rdkafka', ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE'] if ENV['USE_RDKAFKA']
41 changes: 40 additions & 1 deletion README.md
@@ -29,6 +29,45 @@ If you want to use zookeeper related parameters, you also need to install zookee

## Usage

### :exclamation: In this fork: MSK IAM Authentication Support for `rdkafka2` Output Type

This fork adds support for using MSK IAM authentication with the `rdkafka2` output type in Fluentd. Authentication and authorization with an MSK cluster are facilitated through a base64-encoded signed URL, which is generated by the [aws-msk-iam-sasl-signer-ruby](https://github.com/bruce-szalwinski-he/aws-msk-iam-sasl-signer-ruby) library.

The `aws-msk-iam-sasl-signer-ruby` library provides an [example](https://github.com/bruce-szalwinski-he/aws-msk-iam-sasl-signer-ruby/tree/main/examples/rdkafka) of generating the OAuthBearer token with rdkafka, one of the Kafka client libraries supported by fluent-plugin-kafka. This fork integrates that example into the `Fluent::Rdkafka2Output` class, enabling AWS IAM authentication.
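
For reference, a minimal sketch of what token generation with the signer library looks like on its own (the region value here is illustrative, not part of this PR):

```ruby
require 'aws-msk-iam-sasl-signer'

# Generate a signed OAuthBearer token for an MSK cluster in the given region.
signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: 'us-east-1')
token = signer.generate_auth_token

puts token.token              # base64-encoded signed URL, used as the SASL/OAUTHBEARER token
puts token.expiration_time_ms # expiry, passed to librdkafka as the token lifetime
```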

The key change is the inclusion of a refresh callback:
```ruby
Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token)
```
This callback triggers token generation when needed, ensuring continuous authentication with the MSK cluster.
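
Simplified, the callback wired up in `out_rdkafka2.rb` looks roughly like this (error handling and the initial no-producer case are omitted here):

```ruby
# Invoked by librdkafka whenever the current OAuthBearer token needs refreshing.
def refresh_token(_config, _client_name)
  signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: @aws_msk_region)
  token = signer.generate_auth_token

  # Hand the freshly signed token to the producer's underlying client.
  get_producer.oauthbearer_set_token(
    token: token.token,
    lifetime_ms: token.expiration_time_ms,
    principal_name: "kafka-cluster"
  )
end
```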

#### Configuration Example
To enable this feature, configure your Fluentd output as follows:

```
<match *>
@type rdkafka2
# Kafka brokers to connect to (typically port 9098 or 9198 for IAM authentication)
brokers <broker_addresses>
# Topic to write events to
topic_key test-topic-1
default_topic test-topic-1

# AWS Region (required)
aws_msk_region us-east-1

# Use a shared producer for the connection (required)
share_producer true

# MSK IAM authentication settings (required)
rdkafka_options {
"security.protocol": "sasl_ssl",
"sasl.mechanisms": "OAUTHBEARER"
}
</match>
```
With this configuration, Fluentd will handle the token refresh and manage the connection to your MSK cluster using AWS IAM authentication.

### Common parameters

#### SSL authentication
@@ -563,7 +602,7 @@ You need to install rdkafka gem.
</match>

`rdkafka2` supports `discard_kafka_delivery_failed_regex` parameter:
- `discard_kafka_delivery_failed_regex` - default: nil - discard the record when `Kafka::DeliveryFailed` occurs and the emitted message matches the given regex pattern, such as `/unknown_topic/`.

If you use v0.12, use `rdkafka` instead.

8 changes: 8 additions & 0 deletions fluent-plugin-kafka.gemspec
@@ -19,6 +19,14 @@ Gem::Specification.new do |gem|
gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
gem.add_dependency 'ltsv'
gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2'

if ENV['USE_RDKAFKA']
gem.add_dependency 'rdkafka', [ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE']]
if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.0')
gem.add_dependency 'aws-msk-iam-sasl-signer', '~> 0.1.1'
end
end

gem.add_development_dependency "rake", ">= 0.9.2"
gem.add_development_dependency "test-unit", ">= 3.0.8"
gem.add_development_dependency "test-unit-rr", "~> 1.0"
42 changes: 42 additions & 0 deletions lib/fluent/plugin/out_rdkafka2.rb
@@ -20,6 +20,10 @@
raise "unable to patch rdkafka."
end

if Gem::Version.create(RUBY_VERSION) >= Gem::Version.create('3.0')
require 'aws-msk-iam-sasl-signer'
end

module Fluent::Plugin
class Fluent::Rdkafka2Output < Output
Fluent::Plugin.register_output('rdkafka2', self)
@@ -101,6 +105,8 @@ class Fluent::Rdkafka2Output < Output
config_param :unrecoverable_error_codes, :array, :default => ["topic_authorization_failed", "msg_size_too_large"],
:desc => 'Handle some of the error codes should be unrecoverable if specified'

config_param :aws_msk_region, :string, :default => nil, :desc => 'AWS region for MSK'

config_section :buffer do
config_set_default :chunk_keys, ["topic"]
end
@@ -209,6 +215,11 @@ def add(level, message = nil)
config = build_config
@rdkafka = Rdkafka::Config.new(config)


if config[:"security.protocol"] == "sasl_ssl" && config[:"sasl.mechanisms"] == "OAUTHBEARER"
Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token)
end

if @default_topic.nil?
if @use_default_for_unknown_topic || @use_default_for_unknown_partition_error
raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic or use_default_for_unknown_partition_error is true"
@@ -289,16 +300,47 @@ def build_config
config[:"sasl.password"] = @password if @password
config[:"enable.idempotence"] = @idempotent if @idempotent

# sasl.mechanisms and security.protocol are set as rdkafka_options
@rdkafka_options.each { |k, v|
config[k.to_sym] = v
}

config
end

def refresh_token(_config, _client_name)
log.info("+--- Refreshing token")
client = get_producer
# This will happen once upon initialization and is expected to fail, as the producer isn't set yet.
# We will set the token manually after creation; after that, this refresh method will work.
unless client
log.info("Could not get shared client handle, unable to set/refresh token (this is expected one time on startup)")
return
end
signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: @aws_msk_region)
token = signer.generate_auth_token

if token
client.oauthbearer_set_token(
token: token.token,
lifetime_ms: token.expiration_time_ms,
principal_name: "kafka-cluster"
)
else
client.oauthbearer_set_token_failure(
"Failed to generate token."
)
end
end

def start
if @share_producer
@shared_producer = @rdkafka.producer
log.info("Created shared producer")
if @aws_msk_region
refresh_token(nil, nil)
log.info("Set initial token for shared producer")
end
else
@producers = {}
@producers_mutex = Mutex.new