Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added aws-sdk-go-v2 IAM Mechanism for MSK authentication #937

Merged
merged 1 commit into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions sasl/aws_msk_iam_v2/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# AWS MSK IAM V2

This extension provides a capability to get authenticated with [AWS Managed Apache Kafka](https://aws.amazon.com/msk/)
through AWS IAM.

## How to use

This module is an extension for MSK users and thus this is isolated from `kafka-go` module.
You can add this module to your dependency by running the command below.

```shell
go get github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2
```

You can use the `Mechanism` for SASL authentication, like below.

```go
package main

import (
"context"
"crypto/tls"
"time"

signer "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
awsCfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
)

func main() {
ctx := context.Background()

// using aws-sdk-go-v2
// NOTE: address error properly

cfg, _ := awsCfg.LoadDefaultConfig(ctx)
creds, _ := cfg.Credentials.Retrieve(ctx)
m := &aws_msk_iam_v2.Mechanism{
Signer: signer.NewSigner(),
Credentials: creds,
Region: "us-east-1",
SignTime: time.Now(),
Expiry: time.Minute * 5,
}
config := kafka.ReaderConfig{
Brokers: []string{"https://localhost"},
GroupID: "some-consumer-group",
GroupTopics: []string{"some-topic"},
Dialer: &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: m,
TLS: &tls.Config{},
},
}
}


```
10 changes: 10 additions & 0 deletions sasl/aws_msk_iam_v2/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2

go 1.15

require (
github.com/aws/aws-sdk-go-v2 v1.16.7
github.com/aws/aws-sdk-go-v2/credentials v1.12.9
github.com/segmentio/kafka-go v0.4.32
github.com/stretchr/testify v1.7.1
)
49 changes: 49 additions & 0 deletions sasl/aws_msk_iam_v2/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
github.com/aws/aws-sdk-go-v2 v1.16.7 h1:zfBwXus3u14OszRxGcqCDS4MfMCv10e8SMJ2r8Xm0Ns=
github.com/aws/aws-sdk-go-v2 v1.16.7/go.mod h1:6CpKuLXg2w7If3ABZCl/qZ6rEgwtjZTn4eAf4RcEyuw=
github.com/aws/aws-sdk-go-v2/credentials v1.12.9 h1:DloAJr0/jbvm0iVRFDFh8GlWxrOd9XKyX82U+dfVeZs=
github.com/aws/aws-sdk-go-v2/credentials v1.12.9/go.mod h1:2Vavxl1qqQXJ8MUcQZTsIEW8cwenFCWYXtLRPba3L/o=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.8/go.mod h1:oL1Q3KuCq1D4NykQnIvtRiBGLUXhcpY5pl6QZB2XEPU=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.14/go.mod h1:kdjrMwHwrC3+FsKhNcCMJ7tUVj/8uSD5CZXeQ4wV6fM=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.8/go.mod h1:ZIV8GYoC6WLBW5KGs+o4rsc65/ozd+eQ0L31XF5VDwk=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.8/go.mod h1:rDVhIMAX9N2r8nWxDUlbubvvaFMnfsm+3jAV7q+rpM4=
github.com/aws/aws-sdk-go-v2/service/sso v1.11.12/go.mod h1:MO4qguFjs3wPGcCSpQ7kOFTwRvb+eu+fn+1vKleGHUk=
github.com/aws/aws-sdk-go-v2/service/sts v1.16.9/go.mod h1:O1IvkYxr+39hRf960Us6j0x1P8pDqhTX+oXM5kQNl/Y=
github.com/aws/smithy-go v1.12.0 h1:gXpeZel/jPoWQ7OEmLIgCUnhkFftqNfwWUwAHSlp1v0=
github.com/aws/smithy-go v1.12.0/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw=
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE=
github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/kafka-go v0.4.32 h1:Ohr+9E+kDv/Ld2UPJN9hnKZRd2qgiqCmI8v2e1qlfLM=
github.com/segmentio/kafka-go v0.4.32/go.mod h1:JAPPIiY3MQIwVHj64CWOP0LsFFfQ7H0w69kuoxnMIS0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99 h1:dbuHpmKjkDzSOMKAWl10QNlgaZUd3V1q99xc81tt2Kc=
gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
166 changes: 166 additions & 0 deletions sasl/aws_msk_iam_v2/msk_iam.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package aws_msk_iam_v2

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"runtime"
"strconv"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
signer "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/segmentio/kafka-go/sasl"
)

const (
// These constants come from https://github.com/aws/aws-msk-iam-auth#details and
// https://github.com/aws/aws-msk-iam-auth/blob/main/src/main/java/software/amazon/msk/auth/iam/internals/AWS4SignedPayloadGenerator.java.
signAction = "kafka-cluster:Connect"
signPayload = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" // the hex encoded SHA-256 of an empty string
signService = "kafka-cluster"
signVersion = "2020_10_22"
signActionKey = "action"
signHostKey = "host"
signUserAgentKey = "user-agent"
signVersionKey = "version"
queryActionKey = "Action"
queryExpiryKey = "X-Amz-Expires"
)

var signUserAgent = fmt.Sprintf("kafka-go/sasl/aws_msk_iam/%s", runtime.Version())

// Mechanism implements sasl.Mechanism for the AWS_MSK_IAM mechanism, based on the official java implementation:
// https://github.com/aws/aws-msk-iam-auth
type Mechanism struct {
// The sigv4.Signer of aws-sdk-go-v2 to use when signing the request. Required.
Signer *signer.Signer
// The aws.Credentials of aws-sdk-go-v2. Required.
Credentials aws.Credentials
// The region where the msk cluster is hosted, e.g. "us-east-1". Required.
Region string
// The time the request is planned for. Optional, defaults to time.Now() at time of authentication.
SignTime time.Time
// The duration for which the presigned request is active. Optional, defaults to 5 minutes.
Expiry time.Duration
}

func (m *Mechanism) Name() string {
return "AWS_MSK_IAM"
}

func (m *Mechanism) Next(ctx context.Context, challenge []byte) (bool, []byte, error) {
// After the initial step, the authentication is complete
// kafka will return error if it rejected the credentials, so we'll only
// arrive here on success.
return true, nil, nil
}

// Start produces the authentication values required for AWS_MSK_IAM. It produces the following json as a byte array,
// making use of the aws-sdk to produce the signed output.
// {
// "version" : "2020_10_22",
// "host" : "<broker host>",
// "user-agent": "<user agent string from the client>",
// "action": "kafka-cluster:Connect",
// "x-amz-algorithm" : "<algorithm>",
// "x-amz-credential" : "<clientAWSAccessKeyID>/<date in yyyyMMdd format>/<region>/kafka-cluster/aws4_request",
// "x-amz-date" : "<timestamp in yyyyMMdd'T'HHmmss'Z' format>",
// "x-amz-security-token" : "<clientAWSSessionToken if any>",
// "x-amz-signedheaders" : "host",
// "x-amz-expires" : "<expiration in seconds>",
// "x-amz-signature" : "<AWS SigV4 signature computed by the client>"
// }
func (m *Mechanism) Start(ctx context.Context) (sess sasl.StateMachine, ir []byte, err error) {
signedMap, err := m.preSign(ctx)
if err != nil {
return nil, nil, err
}

signedJson, err := json.Marshal(signedMap)
return m, signedJson, err
}

// preSign produces the authentication values required for AWS_MSK_IAM.
func (m *Mechanism) preSign(ctx context.Context) (map[string]string, error) {
req, err := buildReq(ctx, defaultExpiry(m.Expiry))
if err != nil {
return nil, err
}

signedUrl, header, err := m.Signer.PresignHTTP(ctx, m.Credentials, req, signPayload, signService, m.Region, defaultSignTime(m.SignTime))
if err != nil {
return nil, err
}

u, err := url.Parse(signedUrl)
if err != nil {
return nil, err
}
return buildSignedMap(u, header), nil
}

// buildReq builds http.Request for aws PreSign.
func buildReq(ctx context.Context, expiry time.Duration) (*http.Request, error) {
query := url.Values{
queryActionKey: {signAction},
queryExpiryKey: {strconv.FormatInt(int64(expiry/time.Second), 10)},
}
saslMeta := sasl.MetadataFromContext(ctx)
if saslMeta == nil {
return nil, errors.New("missing sasl metadata")
}

signUrl := url.URL{
Scheme: "kafka",
Host: saslMeta.Host,
Path: "/",
RawQuery: query.Encode(),
}

req, err := http.NewRequest(http.MethodGet, signUrl.String(), nil)
if err != nil {
return nil, err
}

return req, nil
}

// buildSignedMap builds signed string map which will be used to authenticate with MSK.
func buildSignedMap(u *url.URL, header http.Header) map[string]string {
signedMap := map[string]string{
signVersionKey: signVersion,
signHostKey: u.Host,
signUserAgentKey: signUserAgent,
signActionKey: signAction,
}
// The protocol requires lowercase keys.
for key, vals := range header {
signedMap[strings.ToLower(key)] = vals[0]
}
for key, vals := range u.Query() {
signedMap[strings.ToLower(key)] = vals[0]
}

return signedMap
}

// defaultExpiry set default expiration time if user doesn't define Mechanism.Expiry.
func defaultExpiry(v time.Duration) time.Duration {
if v == 0 {
return 5 * time.Minute
}
return v
}

// defaultSignTime set default sign time if user doesn't define Mechanism.SignTime.
func defaultSignTime(v time.Time) time.Time {
if v.IsZero() {
return time.Now()
}
return v
}
Loading