feat: add support for AWS Web Identity auth flow #467

Open
wants to merge 7 commits into
base: main
57 changes: 36 additions & 21 deletions kafka/config.go
Expand Up @@ -19,26 +19,28 @@ import (
)

type Config struct {
BootstrapServers *[]string
Timeout int
CACert string
ClientCert string
ClientCertKey string
ClientCertKeyPassphrase string
KafkaVersion string
TLSEnabled bool
SkipTLSVerify bool
SASLUsername string
SASLPassword string
SASLMechanism string
SASLAWSRegion string
SASLAWSRoleArn string
SASLAWSProfile string
SASLAWSAccessKey string
SASLAWSSecretKey string
SASLAWSToken string
SASLAWSCredsDebug bool
SASLTokenUrl string
BootstrapServers *[]string
Timeout int
CACert string
ClientCert string
ClientCertKey string
ClientCertKeyPassphrase string
KafkaVersion string
TLSEnabled bool
SkipTLSVerify bool
SASLUsername string
SASLPassword string
SASLMechanism string
SASLAWSRegion string
SASLAWSRoleArn string
SASLAWSWebIdentityToken string
SASLAWSWebIdentityTokenFile string
SASLAWSProfile string
SASLAWSAccessKey string
SASLAWSSecretKey string
SASLAWSToken string
SASLAWSCredsDebug bool
SASLTokenUrl string
}

type OAuth2Config interface {
Expand Down Expand Up @@ -84,8 +86,19 @@ func (o *oauthbearerTokenProvider) Token() (*sarama.AccessToken, error) {
func (c *Config) Token() (*sarama.AccessToken, error) {
signer.AwsDebugCreds = c.SASLAWSCredsDebug
var token string
var webIdentityTokenBuffer []byte
var err error
if c.SASLAWSRoleArn != "" {
if c.SASLAWSRoleArn != "" && (c.SASLAWSWebIdentityToken != "" || c.SASLAWSWebIdentityTokenFile != "") {
log.Printf("[INFO] Generating auth token with a web identity role '%s' in '%s'", c.SASLAWSRoleArn, c.SASLAWSRegion)
if c.SASLAWSWebIdentityTokenFile != "" {
webIdentityTokenBuffer, err = os.ReadFile(c.SASLAWSWebIdentityTokenFile)
if err != nil {
return nil, err
}
c.SASLAWSWebIdentityToken = string(webIdentityTokenBuffer)
}
token, _, err = signer.GenerateAuthTokenFromWebIdentity(context.TODO(), c.SASLAWSRegion, c.SASLAWSRoleArn, c.SASLAWSWebIdentityToken, "terraform-kafka-provider")
} else if c.SASLAWSRoleArn != "" {
log.Printf("[INFO] Generating auth token with a role '%s' in '%s'", c.SASLAWSRoleArn, c.SASLAWSRegion)
token, _, err = signer.GenerateAuthTokenFromRole(context.TODO(), c.SASLAWSRegion, c.SASLAWSRoleArn, "terraform-kafka-provider")
} else if c.SASLAWSProfile != "" {
Expand Down Expand Up @@ -305,6 +318,8 @@ func (config *Config) copyWithMaskedSensitiveValues() Config {
config.SASLMechanism,
config.SASLAWSRegion,
config.SASLAWSRoleArn,
"*****",
config.SASLAWSWebIdentityTokenFile,
config.SASLAWSProfile,
config.SASLAWSAccessKey,
"*****",
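Review bot: In the Token hunk, the contents of SASLAWSWebIdentityTokenFile are handed to the signer verbatim, so a trailing newline in the file (common when it is written by hand or by a shell redirect) becomes part of the token string and is no longer a valid JWT; trim it with `strings.TrimSpace` before use. The read also assigns the file contents into `c.SASLAWSWebIdentityToken` on every call, so an inline token is silently replaced by the file and a method that otherwise only reads the Config now mutates it — keep the file's token in a local instead and drop the `webIdentityTokenBuffer` declaration. The bare `return nil, err` loses which path failed; wrap it with the file name. This relies on `os`, `strings` and `fmt` being imported in config.go, which the import hunk above does not show — please confirm. Token's body continues past the end of the hunk.
```go
if c.SASLAWSRoleArn != "" && (c.SASLAWSWebIdentityToken != "" || c.SASLAWSWebIdentityTokenFile != "") {
	log.Printf("[INFO] Generating auth token with a web identity role '%s' in '%s'", c.SASLAWSRoleArn, c.SASLAWSRegion)
	// The file, when set, takes precedence over an inline token; keep it local so Token() does not mutate the Config.
	webIdentityToken := c.SASLAWSWebIdentityToken
	if c.SASLAWSWebIdentityTokenFile != "" {
		raw, readErr := os.ReadFile(c.SASLAWSWebIdentityTokenFile)
		if readErr != nil {
			return nil, fmt.Errorf("reading web identity token file %q: %w", c.SASLAWSWebIdentityTokenFile, readErr)
		}
		// Token files frequently end with a newline, which would corrupt the JWT.
		webIdentityToken = strings.TrimSpace(string(raw))
	}
	token, _, err = signer.GenerateAuthTokenFromWebIdentity(context.TODO(), c.SASLAWSRegion, c.SASLAWSRoleArn, webIdentityToken, "terraform-kafka-provider")
} else if c.SASLAWSRoleArn != "" {
	// … unchanged: role-based and profile-based branches …
```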
54 changes: 34 additions & 20 deletions kafka/provider.go
Expand Up @@ -79,6 +79,18 @@ func Provider() *schema.Provider {
DefaultFunc: schema.EnvDefaultFunc("AWS_ROLE_ARN", nil),
Description: "Arn of an AWS IAM role to assume",
},
"sasl_aws_web_identity_token": {
Type: schema.TypeString,
Optional: true,
Default: "",
Description: "Arn of an AWS IAM role to assume",
},
"sasl_aws_web_identity_token_file": {
Type: schema.TypeString,
Optional: true,
DefaultFunc: schema.EnvDefaultFunc("AWS_WEB_IDENTITY_TOKEN_FILE", nil),
Description: "Arn of an AWS IAM role to assume",
},
"sasl_aws_profile": {
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -179,26 +191,28 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) {
}

config := &Config{
BootstrapServers: brokers,
CACert: d.Get("ca_cert").(string),
ClientCert: d.Get("client_cert").(string),
ClientCertKey: d.Get("client_key").(string),
ClientCertKeyPassphrase: d.Get("client_key_passphrase").(string),
KafkaVersion: d.Get("kafka_version").(string),
SkipTLSVerify: d.Get("skip_tls_verify").(bool),
SASLAWSRegion: d.Get("sasl_aws_region").(string),
SASLUsername: d.Get("sasl_username").(string),
SASLPassword: d.Get("sasl_password").(string),
SASLTokenUrl: d.Get("sasl_token_url").(string),
SASLAWSRoleArn: d.Get("sasl_aws_role_arn").(string),
SASLAWSProfile: d.Get("sasl_aws_profile").(string),
SASLAWSAccessKey: d.Get("sasl_aws_access_key").(string),
SASLAWSSecretKey: d.Get("sasl_aws_secret_key").(string),
SASLAWSToken: d.Get("sasl_aws_token").(string),
SASLAWSCredsDebug: d.Get("sasl_aws_creds_debug").(bool),
SASLMechanism: saslMechanism,
TLSEnabled: d.Get("tls_enabled").(bool),
Timeout: d.Get("timeout").(int),
BootstrapServers: brokers,
CACert: d.Get("ca_cert").(string),
ClientCert: d.Get("client_cert").(string),
ClientCertKey: d.Get("client_key").(string),
ClientCertKeyPassphrase: d.Get("client_key_passphrase").(string),
KafkaVersion: d.Get("kafka_version").(string),
SkipTLSVerify: d.Get("skip_tls_verify").(bool),
SASLAWSRegion: d.Get("sasl_aws_region").(string),
SASLUsername: d.Get("sasl_username").(string),
SASLPassword: d.Get("sasl_password").(string),
SASLTokenUrl: d.Get("sasl_token_url").(string),
SASLAWSRoleArn: d.Get("sasl_aws_role_arn").(string),
SASLAWSWebIdentityToken: d.Get("sasl_aws_web_identity_token").(string),
SASLAWSWebIdentityTokenFile: d.Get("sasl_aws_web_identity_token_file").(string),
SASLAWSProfile: d.Get("sasl_aws_profile").(string),
SASLAWSAccessKey: d.Get("sasl_aws_access_key").(string),
SASLAWSSecretKey: d.Get("sasl_aws_secret_key").(string),
SASLAWSToken: d.Get("sasl_aws_token").(string),
SASLAWSCredsDebug: d.Get("sasl_aws_creds_debug").(bool),
SASLMechanism: saslMechanism,
TLSEnabled: d.Get("tls_enabled").(bool),
Timeout: d.Get("timeout").(int),
}

if config.CACert == "" {
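Review bot: Both new schema entries in the Provider hunk reuse the description `Arn of an AWS IAM role to assume`, which is the text of `sasl_aws_role_arn` and is wrong for a token and for a file path; replace them with `Description: "Web identity token (JWT) used together with sasl_aws_role_arn to assume the role",` and `Description: "Path to a file containing the web identity token",`. `sasl_aws_web_identity_token` carries a bearer credential in clear text, so it should also be marked `Sensitive: true,` so the SDK redacts it from plan output and diagnostics, as is presumably already done for `sasl_password` and `sasl_aws_secret_key` elsewhere in this schema (not visible in the hunk). The Provider function starts before the hunk and continues after it.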