diff --git a/go.mod b/go.mod index a6f67e13..76db20c4 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( require ( github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 // indirect + github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index 4437ca40..f6eb9d2f 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/alecthomas/kingpin/v2 v2.4.0 h1:f48lwail6p8zpO1bC4TxtqACaGqHYA22qkHjH github.com/alecthomas/kingpin/v2 v2.4.0/go.mod h1:0gyi0zQnjuFk8xrkNKamJoyUo382HRL7ATRpFZCw6tE= github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 h1:t3eaIm0rUkzbrIewtiFmMK5RXHej2XnoXNhxVsAYUfg= github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30/go.mod h1:fvzegU4vN3H1qMT+8wDmzjAcDONcgo2/SZ/TyfdUOFs= +github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 h1:UyjtGmO0Uwl/K+zpzPwLoXzMhcN9xmnR2nrqJoBrg3c= +github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0/go.mod h1:TJAXuFs2HcMib3sN5L0gUC+Q01Qvy3DemvA55WuC+iA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= diff --git a/kafka_exporter.go b/kafka_exporter.go index 1122501b..8f02b6a1 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -1,6 +1,7 @@ package main import ( + "context" "crypto/tls" "crypto/x509" "flag" @@ -16,7 +17,7 @@ import ( "github.com/IBM/sarama" kingpin "github.com/alecthomas/kingpin/v2" - "github.com/krallistic/kazoo-go" + "github.com/aws/aws-msk-iam-sasl-signer-go/signer" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" @@ -25,8 +26,6 @@ import ( plogflag "github.com/prometheus/common/promlog/flag" "github.com/prometheus/common/version" - "github.com/rcrowley/go-metrics" - "k8s.io/klog/v2" ) const ( @@ -89,6 +88,7 @@ type kafkaOpts struct { saslPassword string saslMechanism string saslDisablePAFXFast bool + saslAwsRegion string useTLS bool tlsServerName string tlsCAFile string @@ -117,6 +117,15 @@ type kafkaOpts struct { verbosityLogLevel int } +type MSKAccessTokenProvider struct { + region string +} + +func (m *MSKAccessTokenProvider) Token() (*sarama.AccessToken, error) { + token, _, err := signer.GenerateAuthToken(context.TODO(), m.region) + return &sarama.AccessToken{Token: token}, err +} + // CanReadCertAndKey returns true if the certificate and key files already exists, // otherwise returns false. If lost one of cert and key, returns error. func CanReadCertAndKey(certPath, keyPath string) (bool, error) { @@ -188,6 +197,9 @@ func NewExporter(opts kafkaOpts, topicFilter string, topicExclude string, groupF if opts.saslDisablePAFXFast { config.Net.SASL.GSSAPI.DisablePAFXFAST = true } + case "awsiam": + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeOAuth) + config.Net.SASL.TokenProvider = &MSKAccessTokenProvider{region: opts.saslAwsRegion} case "plain": default: return nil, fmt.Errorf( @@ -749,7 +761,8 @@ func main() { toFlagBoolVar("sasl.handshake", "Only set this to false if using a non-Kafka SASL proxy, default is true.", true, "true", &opts.useSASLHandshake) toFlagStringVar("sasl.username", "SASL user name.", "", &opts.saslUsername) toFlagStringVar("sasl.password", "SASL user password.", "", &opts.saslPassword) - toFlagStringVar("sasl.mechanism", "The SASL SCRAM SHA algorithm sha256 or sha512 or gssapi as mechanism", "", &opts.saslMechanism) + toFlagStringVar("sasl.aws-region", "The AWS region for IAM SASL authentication", os.Getenv("AWS_REGION"), &opts.saslAwsRegion) + toFlagStringVar("sasl.mechanism", "The SASL mechanism: gssapi, awsiam or plain or SASL SCRAM SHA algorithm: sha256 or sha512", "", &opts.saslMechanism) toFlagStringVar("sasl.service-name", "Service name when using kerberos Auth", "", &opts.serviceName) toFlagStringVar("sasl.kerberos-config-path", "Kerberos config path", "", &opts.kerberosConfigPath) toFlagStringVar("sasl.realm", "Kerberos realm", "", &opts.realm)