Skip to content

Commit

Permalink
Migrate dynamodb engine to AWS SDK v2
Browse files Browse the repository at this point in the history
This migrates the Database Service engine for DynamoDB to use AWS SDK
v2.
FIPS endpoint resolution has also been enabled.

Finally, the engine will now resolve to the AWS-account-based endpoint for
DynamoDB operations in supported regions.
  • Loading branch information
GavinFrazar committed Dec 13, 2024
1 parent 821708e commit b113763
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 110 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ require (
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.43
github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.34.1
github.com/aws/aws-sdk-go-v2/service/athena v1.49.0
github.com/aws/aws-sdk-go-v2/service/dax v1.23.7
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.8
github.com/aws/aws-sdk-go-v2/service/ec2 v1.195.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,8 @@ github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.34.1 h1:8EwNbY+A/
github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.34.1/go.mod h1:2mMP2R86zLPAUz0TpJdsKW8XawHgs9Nk97fYJomO3o8=
github.com/aws/aws-sdk-go-v2/service/athena v1.49.0 h1:D+iatX9gV6gCuNd6BnUkfwfZJw/cXlEk+LwwDdSMdtw=
github.com/aws/aws-sdk-go-v2/service/athena v1.49.0/go.mod h1:27ljwDsnZvfrZKsLzWD4WFjI4OZutEFIjvVtYfj9gHc=
github.com/aws/aws-sdk-go-v2/service/dax v1.23.7 h1:hZg1sHhWXGZShzHGpwcaOT8HZfx26kkbRDNZgZda4xI=
github.com/aws/aws-sdk-go-v2/service/dax v1.23.7/go.mod h1:fYBjETTq8hZfirBEgXM1xIMy+tvCGYZTeWpjeKKp0bU=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0 h1:isKhHsjpQR3CypQJ4G1g8QWx7zNpiC/xKw1zjgJYVno=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0/go.mod h1:xDvUyIkwBwNtVZJdHEwAuhFly3mezwdEWkbJ5oNYwIw=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.8 h1:ntqHwZb+ZyVz0CFYUG0sQ02KMMJh+iXeV3bXoba+s4A=
Expand Down
2 changes: 2 additions & 0 deletions integrations/terraform/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,8 @@ github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.34.1 h1:8EwNbY+A/
github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.34.1/go.mod h1:2mMP2R86zLPAUz0TpJdsKW8XawHgs9Nk97fYJomO3o8=
github.com/aws/aws-sdk-go-v2/service/athena v1.49.0 h1:D+iatX9gV6gCuNd6BnUkfwfZJw/cXlEk+LwwDdSMdtw=
github.com/aws/aws-sdk-go-v2/service/athena v1.49.0/go.mod h1:27ljwDsnZvfrZKsLzWD4WFjI4OZutEFIjvVtYfj9gHc=
github.com/aws/aws-sdk-go-v2/service/dax v1.23.7 h1:hZg1sHhWXGZShzHGpwcaOT8HZfx26kkbRDNZgZda4xI=
github.com/aws/aws-sdk-go-v2/service/dax v1.23.7/go.mod h1:fYBjETTq8hZfirBEgXM1xIMy+tvCGYZTeWpjeKKp0bU=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0 h1:isKhHsjpQR3CypQJ4G1g8QWx7zNpiC/xKw1zjgJYVno=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0/go.mod h1:xDvUyIkwBwNtVZJdHEwAuhFly3mezwdEWkbJ5oNYwIw=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.8 h1:ntqHwZb+ZyVz0CFYUG0sQ02KMMJh+iXeV3bXoba+s4A=
Expand Down
174 changes: 111 additions & 63 deletions lib/srv/db/dynamodb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import (
"strconv"
"strings"

"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/service/dax"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dax"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams"
"github.com/gravitational/trace"
"github.com/prometheus/client_golang/prometheus"

Expand All @@ -43,6 +43,7 @@ import (
"github.com/gravitational/teleport/lib/cloud"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/modules"
"github.com/gravitational/teleport/lib/srv/db/common"
"github.com/gravitational/teleport/lib/srv/db/common/role"
"github.com/gravitational/teleport/lib/utils"
Expand All @@ -54,6 +55,7 @@ func NewEngine(ec common.EngineConfig) common.Engine {
return &Engine{
EngineConfig: ec,
RoundTrippers: make(map[string]http.RoundTripper),
UseFIPS: modules.GetModules().IsBoringBinary(),
}
}

Expand All @@ -71,6 +73,8 @@ type Engine struct {
RoundTrippers map[string]http.RoundTripper
// CredentialsGetter is used to obtain STS credentials.
CredentialsGetter libaws.CredentialsGetter
// UseFIPS will ensure FIPS endpoint resolution.
UseFIPS bool
}

var _ common.Engine = (*Engine)(nil)
Expand Down Expand Up @@ -194,7 +198,7 @@ func (e *Engine) process(ctx context.Context, req *http.Request, signer *libaws.
// emit an audit event regardless of failure, but using the resolved endpoint.
var responseStatusCode uint32
defer func() {
e.emitAuditEvent(req, re.URL, responseStatusCode, err)
e.emitAuditEvent(req, re.URL.String(), responseStatusCode, err)
}()

// try to read, close, and replace the incoming request body.
Expand Down Expand Up @@ -319,8 +323,8 @@ func (e *Engine) checkAccess(ctx context.Context, sessionCtx *common.Session) er
}

// getRoundTripper makes an HTTP round tripper with TLS config based on the given URL.
func (e *Engine) getRoundTripper(ctx context.Context, URL string) (http.RoundTripper, error) {
if rt, ok := e.RoundTrippers[URL]; ok {
func (e *Engine) getRoundTripper(ctx context.Context, u *url.URL) (http.RoundTripper, error) {
if rt, ok := e.RoundTrippers[u.String()]; ok {
return rt, nil
}
tlsConfig, err := e.Auth.GetTLSConfig(ctx, e.sessionCtx.GetExpiry(), e.sessionCtx.Database, e.sessionCtx.DatabaseUser)
Expand All @@ -329,55 +333,128 @@ func (e *Engine) getRoundTripper(ctx context.Context, URL string) (http.RoundTri
}
// We need to set the ServerName here because the AWS endpoint service prefix is not known in advance,
// and the TLS config we got does not set it.
host, err := getURLHostname(URL)
if err != nil {
return nil, trace.Wrap(err)
}
tlsConfig.ServerName = host
tlsConfig.ServerName = u.Hostname()

out, err := defaults.Transport()
if err != nil {
return nil, trace.Wrap(err)
}
out.TLSClientConfig = tlsConfig
e.RoundTrippers[URL] = out
e.RoundTrippers[u.String()] = out
return out, nil
}

// resolveEndpoint returns a resolved endpoint for either the configured URI or the AWS target service and region.
func (e *Engine) resolveEndpoint(req *http.Request) (*endpoints.ResolvedEndpoint, error) {
endpointID, err := extractEndpointID(req)
type endpoint struct {
URL *url.URL
SigningName string
SigningRegion string
}

// resolveEndpoint returns a resolved endpoint for either the configured URI or
// the AWS target service and region.
// For a target operation, the appropriate AWS service resolver is used.
// Targets look like one of DynamoDB_$version.$operation,
// DynamoDBStreams_$version.$operation, or AmazonDAX$version.$operation.
// For example: DynamoDBStreams_20120810.ListStreams
func (e *Engine) resolveEndpoint(req *http.Request) (*endpoint, error) {
target, err := getTargetHeader(req)
if err != nil {
return nil, trace.Wrap(err)
}
opts := func(opts *endpoints.Options) {
opts.ResolveUnknownService = true

awsMeta := e.sessionCtx.Database.GetAWS()

var re *endpoint
switch target := strings.ToLower(target); {
case strings.HasPrefix(target, "dynamodbstreams"):
re, err = resolveDynamoDBStreamsEndpoint(req.Context(), awsMeta.Region, e.UseFIPS)
case strings.HasPrefix(target, "dynamodb"):
re, err = resolveDynamoDBEndpoint(req.Context(), awsMeta.Region, awsMeta.AccountID, e.UseFIPS)
case strings.HasPrefix(target, "amazondax"):
re, err = resolveDaxEndpoint(req.Context(), awsMeta.Region, e.UseFIPS)
default:
return nil, trace.BadParameter("DynamoDB API target %q is not recognized", target)
}
re, err := endpoints.DefaultResolver().EndpointFor(endpointID, e.sessionCtx.Database.GetAWS().Region, opts)
if err != nil {
return nil, trace.Wrap(err)
}
uri := e.sessionCtx.Database.GetURI()
if uri != "" && uri != apiaws.DynamoDBURIForRegion(e.sessionCtx.Database.GetAWS().Region) {
if uri != "" && uri != apiaws.DynamoDBURIForRegion(awsMeta.Region) {
// Add a temporary schema to make a valid URL for url.Parse.
if !strings.Contains(uri, "://") {
uri = "schema://" + uri
}
u, err := url.Parse(uri)
if err != nil {
return nil, trace.Wrap(err)
}
// override the resolved endpoint URL with the user-configured URI.
re.URL = uri
re.URL = u
}
if !strings.Contains(re.URL, "://") {
re.URL = "https://" + re.URL
// Force HTTPS
re.URL.Scheme = "https"
return re, nil
}

func resolveDynamoDBStreamsEndpoint(ctx context.Context, region string, useFIPS bool) (*endpoint, error) {
params := dynamodbstreams.EndpointParameters{
Region: aws.String(region),
UseFIPS: aws.Bool(useFIPS),
}
ep, err := dynamodbstreams.NewDefaultEndpointResolverV2().ResolveEndpoint(ctx, params)
if err != nil {
return nil, trace.Wrap(err)
}
return &re, nil
return &endpoint{
URL: &ep.URI,
SigningRegion: region,
// DynamoDB Streams uses the same signing name as DynamoDB.
SigningName: "dynamodb",
}, nil
}

// rewriteRequest clones a request, modifies the clone to rewrite its URL, and returns the modified request clone.
func rewriteRequest(ctx context.Context, r *http.Request, re *endpoints.ResolvedEndpoint, body []byte) (*http.Request, error) {
resolvedURL, err := url.Parse(re.URL)
func resolveDynamoDBEndpoint(ctx context.Context, region, accountID string, useFIPS bool) (*endpoint, error) {
params := dynamodb.EndpointParameters{
Region: aws.String(region),
AccountIdEndpointMode: aws.String(aws.AccountIDEndpointModePreferred),
UseFIPS: aws.Bool(useFIPS),
}
if accountID != "" {
params.AccountId = aws.String(accountID)
}
ep, err := dynamodb.NewDefaultEndpointResolverV2().ResolveEndpoint(ctx, params)
if err != nil {
return nil, trace.Wrap(err)
}
return &endpoint{
URL: &ep.URI,
SigningRegion: region,
SigningName: "dynamodb",
}, nil
}

func resolveDaxEndpoint(ctx context.Context, region string, useFIPS bool) (*endpoint, error) {
params := dax.EndpointParameters{
Region: aws.String(region),
UseFIPS: aws.Bool(useFIPS),
}
ep, err := dax.NewDefaultEndpointResolverV2().ResolveEndpoint(ctx, params)
if err != nil {
return nil, trace.Wrap(err)
}
return &endpoint{
URL: &ep.URI,
SigningRegion: region,
SigningName: "dax",
}, nil
}

// rewriteRequest clones a request, modifies the clone to rewrite its URL, and returns the modified request clone.
func rewriteRequest(ctx context.Context, r *http.Request, re *endpoint, body []byte) (*http.Request, error) {
reqCopy := r.Clone(ctx)
// set url and host header to match the resolved endpoint.
reqCopy.URL = resolvedURL
reqCopy.Host = resolvedURL.Host
reqCopy.URL = re.URL
reqCopy.Host = re.URL.Host
if body == nil {
// no body is fine, skip copying it.
return reqCopy, nil
Expand All @@ -388,42 +465,13 @@ func rewriteRequest(ctx context.Context, r *http.Request, re *endpoints.Resolved
return reqCopy, nil
}

// extractEndpointID extracts the AWS endpoint ID from the request header X-Amz-Target.
func extractEndpointID(req *http.Request) (string, error) {
// getTargetHeader gets the X-Amz-Target header or returns an error if it is not
// present, as we rely on this header for endpoint resolution.
// See X-Amz-Target: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.LowLevelAPI.html
func getTargetHeader(req *http.Request) (string, error) {
target := req.Header.Get(libaws.AmzTargetHeader)
if target == "" {
return "", trace.BadParameter("missing %q header in http request", libaws.AmzTargetHeader)
}
endpointID, err := endpointIDForTarget(target)
return endpointID, trace.Wrap(err)
}

// endpointIDForTarget converts a target operation into the appropriate the AWS endpoint ID.
// Target looks like one of DynamoDB_$version.$operation, DynamoDBStreams_$version.$operation, AmazonDAX$version.$operation,
// for example: DynamoDBStreams_20120810.ListStreams
// See X-Amz-Target: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.LowLevelAPI.html
func endpointIDForTarget(target string) (string, error) {
t := strings.ToLower(target)
switch {
case strings.HasPrefix(t, "dynamodbstreams"):
return dynamodbstreams.EndpointsID, nil
case strings.HasPrefix(t, "dynamodb"):
return dynamodb.EndpointsID, nil
case strings.HasPrefix(t, "amazondax"):
return dax.EndpointsID, nil
default:
return "", trace.BadParameter("DynamoDB API target %q is not recognized", target)
}
}

// getURLHostname parses a URL to extract its hostname.
func getURLHostname(uri string) (string, error) {
if !strings.Contains(uri, "://") {
uri = "schema://" + uri
}
parsed, err := url.Parse(uri)
if err != nil {
return "", trace.Wrap(err)
}
return parsed.Hostname(), nil
return target, nil
}
Loading

0 comments on commit b113763

Please sign in to comment.