Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate dynamodb engine to AWS SDK v2 #50250

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ require (
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.43
github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.34.1
github.com/aws/aws-sdk-go-v2/service/athena v1.49.0
github.com/aws/aws-sdk-go-v2/service/dax v1.23.7
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.8
github.com/aws/aws-sdk-go-v2/service/ec2 v1.195.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,8 @@ github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.34.1 h1:8EwNbY+A/
github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.34.1/go.mod h1:2mMP2R86zLPAUz0TpJdsKW8XawHgs9Nk97fYJomO3o8=
github.com/aws/aws-sdk-go-v2/service/athena v1.49.0 h1:D+iatX9gV6gCuNd6BnUkfwfZJw/cXlEk+LwwDdSMdtw=
github.com/aws/aws-sdk-go-v2/service/athena v1.49.0/go.mod h1:27ljwDsnZvfrZKsLzWD4WFjI4OZutEFIjvVtYfj9gHc=
github.com/aws/aws-sdk-go-v2/service/dax v1.23.7 h1:hZg1sHhWXGZShzHGpwcaOT8HZfx26kkbRDNZgZda4xI=
github.com/aws/aws-sdk-go-v2/service/dax v1.23.7/go.mod h1:fYBjETTq8hZfirBEgXM1xIMy+tvCGYZTeWpjeKKp0bU=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0 h1:isKhHsjpQR3CypQJ4G1g8QWx7zNpiC/xKw1zjgJYVno=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0/go.mod h1:xDvUyIkwBwNtVZJdHEwAuhFly3mezwdEWkbJ5oNYwIw=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.8 h1:ntqHwZb+ZyVz0CFYUG0sQ02KMMJh+iXeV3bXoba+s4A=
Expand Down
2 changes: 2 additions & 0 deletions integrations/terraform/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,8 @@ github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.34.1 h1:8EwNbY+A/
github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.34.1/go.mod h1:2mMP2R86zLPAUz0TpJdsKW8XawHgs9Nk97fYJomO3o8=
github.com/aws/aws-sdk-go-v2/service/athena v1.49.0 h1:D+iatX9gV6gCuNd6BnUkfwfZJw/cXlEk+LwwDdSMdtw=
github.com/aws/aws-sdk-go-v2/service/athena v1.49.0/go.mod h1:27ljwDsnZvfrZKsLzWD4WFjI4OZutEFIjvVtYfj9gHc=
github.com/aws/aws-sdk-go-v2/service/dax v1.23.7 h1:hZg1sHhWXGZShzHGpwcaOT8HZfx26kkbRDNZgZda4xI=
github.com/aws/aws-sdk-go-v2/service/dax v1.23.7/go.mod h1:fYBjETTq8hZfirBEgXM1xIMy+tvCGYZTeWpjeKKp0bU=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0 h1:isKhHsjpQR3CypQJ4G1g8QWx7zNpiC/xKw1zjgJYVno=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0/go.mod h1:xDvUyIkwBwNtVZJdHEwAuhFly3mezwdEWkbJ5oNYwIw=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.8 h1:ntqHwZb+ZyVz0CFYUG0sQ02KMMJh+iXeV3bXoba+s4A=
Expand Down
174 changes: 111 additions & 63 deletions lib/srv/db/dynamodb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import (
"strconv"
"strings"

"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/service/dax"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dax"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams"
"github.com/gravitational/trace"
"github.com/prometheus/client_golang/prometheus"

Expand All @@ -43,6 +43,7 @@ import (
"github.com/gravitational/teleport/lib/cloud"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/modules"
"github.com/gravitational/teleport/lib/srv/db/common"
"github.com/gravitational/teleport/lib/srv/db/common/role"
"github.com/gravitational/teleport/lib/utils"
Expand All @@ -54,6 +55,7 @@ func NewEngine(ec common.EngineConfig) common.Engine {
return &Engine{
EngineConfig: ec,
RoundTrippers: make(map[string]http.RoundTripper),
UseFIPS: modules.GetModules().IsBoringBinary(),
}
}

Expand All @@ -71,6 +73,8 @@ type Engine struct {
RoundTrippers map[string]http.RoundTripper
// CredentialsGetter is used to obtain STS credentials.
CredentialsGetter libaws.CredentialsGetter
// UseFIPS will ensure FIPS endpoint resolution.
UseFIPS bool
}

var _ common.Engine = (*Engine)(nil)
Expand Down Expand Up @@ -194,7 +198,7 @@ func (e *Engine) process(ctx context.Context, req *http.Request, signer *libaws.
// emit an audit event regardless of failure, but using the resolved endpoint.
var responseStatusCode uint32
defer func() {
e.emitAuditEvent(req, re.URL, responseStatusCode, err)
e.emitAuditEvent(req, re.URL.String(), responseStatusCode, err)
}()

// try to read, close, and replace the incoming request body.
Expand Down Expand Up @@ -319,8 +323,8 @@ func (e *Engine) checkAccess(ctx context.Context, sessionCtx *common.Session) er
}

// getRoundTripper makes an HTTP round tripper with TLS config based on the given URL.
func (e *Engine) getRoundTripper(ctx context.Context, URL string) (http.RoundTripper, error) {
if rt, ok := e.RoundTrippers[URL]; ok {
func (e *Engine) getRoundTripper(ctx context.Context, u *url.URL) (http.RoundTripper, error) {
if rt, ok := e.RoundTrippers[u.String()]; ok {
return rt, nil
}
tlsConfig, err := e.Auth.GetTLSConfig(ctx, e.sessionCtx.GetExpiry(), e.sessionCtx.Database, e.sessionCtx.DatabaseUser)
Expand All @@ -329,55 +333,128 @@ func (e *Engine) getRoundTripper(ctx context.Context, URL string) (http.RoundTri
}
// We need to set the ServerName here because the AWS endpoint service prefix is not known in advance,
// and the TLS config we got does not set it.
host, err := getURLHostname(URL)
if err != nil {
return nil, trace.Wrap(err)
}
tlsConfig.ServerName = host
tlsConfig.ServerName = u.Hostname()

out, err := defaults.Transport()
if err != nil {
return nil, trace.Wrap(err)
}
out.TLSClientConfig = tlsConfig
e.RoundTrippers[URL] = out
e.RoundTrippers[u.String()] = out
return out, nil
}

// resolveEndpoint returns a resolved endpoint for either the configured URI or the AWS target service and region.
func (e *Engine) resolveEndpoint(req *http.Request) (*endpoints.ResolvedEndpoint, error) {
endpointID, err := extractEndpointID(req)
type endpoint struct {
URL *url.URL
SigningName string
SigningRegion string
}

// resolveEndpoint returns a resolved endpoint for either the configured URI or
// the AWS target service and region.
// For a target operation, the appropriate AWS service resolver is used.
// Targets look like one of DynamoDB_$version.$operation,
// DynamoDBStreams_$version.$operation, or AmazonDAX$version.$operation.
// For example: DynamoDBStreams_20120810.ListStreams
func (e *Engine) resolveEndpoint(req *http.Request) (*endpoint, error) {
target, err := getTargetHeader(req)
if err != nil {
return nil, trace.Wrap(err)
}
opts := func(opts *endpoints.Options) {
opts.ResolveUnknownService = true

awsMeta := e.sessionCtx.Database.GetAWS()

var re *endpoint
switch target := strings.ToLower(target); {
case strings.HasPrefix(target, "dynamodbstreams"):
re, err = resolveDynamoDBStreamsEndpoint(req.Context(), awsMeta.Region, e.UseFIPS)
case strings.HasPrefix(target, "dynamodb"):
re, err = resolveDynamoDBEndpoint(req.Context(), awsMeta.Region, awsMeta.AccountID, e.UseFIPS)
case strings.HasPrefix(target, "amazondax"):
re, err = resolveDaxEndpoint(req.Context(), awsMeta.Region, e.UseFIPS)
default:
return nil, trace.BadParameter("DynamoDB API target %q is not recognized", target)
}
re, err := endpoints.DefaultResolver().EndpointFor(endpointID, e.sessionCtx.Database.GetAWS().Region, opts)
if err != nil {
return nil, trace.Wrap(err)
}
uri := e.sessionCtx.Database.GetURI()
if uri != "" && uri != apiaws.DynamoDBURIForRegion(e.sessionCtx.Database.GetAWS().Region) {
if uri != "" && uri != apiaws.DynamoDBURIForRegion(awsMeta.Region) {
// Add a temporary schema to make a valid URL for url.Parse.
if !strings.Contains(uri, "://") {
uri = "schema://" + uri
}
u, err := url.Parse(uri)
if err != nil {
return nil, trace.Wrap(err)
}
// override the resolved endpoint URL with the user-configured URI.
re.URL = uri
re.URL = u
}
if !strings.Contains(re.URL, "://") {
re.URL = "https://" + re.URL
// Force HTTPS
re.URL.Scheme = "https"
return re, nil
}

func resolveDynamoDBStreamsEndpoint(ctx context.Context, region string, useFIPS bool) (*endpoint, error) {
params := dynamodbstreams.EndpointParameters{
Region: aws.String(region),
UseFIPS: aws.Bool(useFIPS),
}
ep, err := dynamodbstreams.NewDefaultEndpointResolverV2().ResolveEndpoint(ctx, params)
if err != nil {
return nil, trace.Wrap(err)
}
return &re, nil
return &endpoint{
URL: &ep.URI,
SigningRegion: region,
// DynamoDB Streams uses the same signing name as DynamoDB.
SigningName: "dynamodb",
}, nil
}

// rewriteRequest clones a request, modifies the clone to rewrite its URL, and returns the modified request clone.
func rewriteRequest(ctx context.Context, r *http.Request, re *endpoints.ResolvedEndpoint, body []byte) (*http.Request, error) {
resolvedURL, err := url.Parse(re.URL)
func resolveDynamoDBEndpoint(ctx context.Context, region, accountID string, useFIPS bool) (*endpoint, error) {
params := dynamodb.EndpointParameters{
Region: aws.String(region),
AccountIdEndpointMode: aws.String(aws.AccountIDEndpointModePreferred),
UseFIPS: aws.Bool(useFIPS),
}
if accountID != "" {
params.AccountId = aws.String(accountID)
}
ep, err := dynamodb.NewDefaultEndpointResolverV2().ResolveEndpoint(ctx, params)
if err != nil {
return nil, trace.Wrap(err)
}
return &endpoint{
URL: &ep.URI,
SigningRegion: region,
SigningName: "dynamodb",
}, nil
}

func resolveDaxEndpoint(ctx context.Context, region string, useFIPS bool) (*endpoint, error) {
params := dax.EndpointParameters{
Region: aws.String(region),
UseFIPS: aws.Bool(useFIPS),
}
ep, err := dax.NewDefaultEndpointResolverV2().ResolveEndpoint(ctx, params)
if err != nil {
return nil, trace.Wrap(err)
}
return &endpoint{
URL: &ep.URI,
SigningRegion: region,
SigningName: "dax",
}, nil
}

// rewriteRequest clones a request, modifies the clone to rewrite its URL, and returns the modified request clone.
func rewriteRequest(ctx context.Context, r *http.Request, re *endpoint, body []byte) (*http.Request, error) {
reqCopy := r.Clone(ctx)
// set url and host header to match the resolved endpoint.
reqCopy.URL = resolvedURL
reqCopy.Host = resolvedURL.Host
reqCopy.URL = re.URL
reqCopy.Host = re.URL.Host
if body == nil {
// no body is fine, skip copying it.
return reqCopy, nil
Expand All @@ -388,42 +465,13 @@ func rewriteRequest(ctx context.Context, r *http.Request, re *endpoints.Resolved
return reqCopy, nil
}

// extractEndpointID extracts the AWS endpoint ID from the request header X-Amz-Target.
func extractEndpointID(req *http.Request) (string, error) {
// getTargetHeader gets the X-Amz-Target header or returns an error if it is not
// present, as we rely on this header for endpoint resolution.
// See X-Amz-Target: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.LowLevelAPI.html
func getTargetHeader(req *http.Request) (string, error) {
target := req.Header.Get(libaws.AmzTargetHeader)
if target == "" {
return "", trace.BadParameter("missing %q header in http request", libaws.AmzTargetHeader)
}
endpointID, err := endpointIDForTarget(target)
return endpointID, trace.Wrap(err)
}

// endpointIDForTarget converts a target operation into the appropriate the AWS endpoint ID.
// Target looks like one of DynamoDB_$version.$operation, DynamoDBStreams_$version.$operation, AmazonDAX$version.$operation,
// for example: DynamoDBStreams_20120810.ListStreams
// See X-Amz-Target: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.LowLevelAPI.html
func endpointIDForTarget(target string) (string, error) {
t := strings.ToLower(target)
switch {
case strings.HasPrefix(t, "dynamodbstreams"):
return dynamodbstreams.EndpointsID, nil
case strings.HasPrefix(t, "dynamodb"):
return dynamodb.EndpointsID, nil
case strings.HasPrefix(t, "amazondax"):
return dax.EndpointsID, nil
default:
return "", trace.BadParameter("DynamoDB API target %q is not recognized", target)
}
}

// getURLHostname parses a URL to extract its hostname.
func getURLHostname(uri string) (string, error) {
if !strings.Contains(uri, "://") {
uri = "schema://" + uri
}
parsed, err := url.Parse(uri)
if err != nil {
return "", trace.Wrap(err)
}
return parsed.Hostname(), nil
return target, nil
}
Loading
Loading