diff --git a/aws/service.go b/aws/service.go index ea08815c2..452e36421 100644 --- a/aws/service.go +++ b/aws/service.go @@ -6,12 +6,15 @@ import ( "log" "math" "math/rand" + "net" + "net/http" "os" "strconv" "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws/retry" + awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/accessanalyzer" @@ -126,6 +129,9 @@ import ( "github.com/aws/aws-sdk-go-v2/service/wafv2" "github.com/aws/aws-sdk-go-v2/service/wellarchitected" "github.com/aws/aws-sdk-go-v2/service/workspaces" + "github.com/rs/dnscache" + "golang.org/x/sync/semaphore" + "github.com/turbot/go-kit/helpers" "github.com/turbot/steampipe-plugin-sdk/v5/memoize" "github.com/turbot/steampipe-plugin-sdk/v5/plugin" @@ -1734,6 +1740,140 @@ func getBaseClientForAccount(ctx context.Context, d *plugin.QueryData) (*aws.Con return tmp.(*aws.Config), nil } +// Initialize a single HTTP client that is optimized for Steampipe and shared +// across all AWS SDK clients. We have hundreds of AWS SDK clients (one per +// account region) that are all sharing this same HTTP client - creating shared +// caching and controls over parallelism. +// +// The AWS SDK defaults are good, but not great for our highly parallel use in +// Steampipe. Specific problems this client aims to solve: +// 1. DNS floods - performing thousands of simultaneous API calls creates a DNS +// lookup for each one (even if the same domain). This can overwhelm the DNS +// server and cause "no such host" errors. +// 2. HTTP connection floods - the AWS SDK defaults to no limit on the number of +// HTTP connections per host. Thousands of connections created simultaneously to +// the same host is hard on both the client and the target server. +// 3. DNS caching - Golang does not cache DNS lookups by default. 
We end up +// looking up the same host thousands of times both within a query and across +// queries. +func initializeHTTPClient() *http.Client { + + // DNS lookup floods are a real problem with highly parallel AWS SDK calls. Every + // API request leads to a DNS lookup by default (since Go doesn't cache them). We + // employ a DNS lookup cache, but we also need to limit the number of parallel DNS + // requests to avoid overwhelming the underlying DNS server. For example, listing + // S3 buckets will create 2 DNS lookup requests per bucket which is a lot of + // pressure on the DNS layer of your network. + // This setting will limit the number of parallel DNS lookups. An appropriate setting + // depends on the capabilities of your DNS server. The default is 25, which is low + // enough for a Macbook M1 to work without "no such host" errors when using the cgo + // network stack. It's high enough to work great in most cases, except maybe massive + // S3 bucket listing (which is rare). Notably on the same Macbook M1, when the plugin + // is compiled using netgo (our default on Mac) DNS lookups will succeed with virtually + // no upper limit on this setting. So, bottom line, 25 is a guess to try and ensure + // it works reliably and optimally enough. + dnsLookupMaxParallel := readEnvVarToInt("STEAMPIPE_AWS_DNS_LOOKUP_MAX_PARALLEL", 25) + + // The DNS cache will be refreshed at this interval. A refresh means that + // any unused entries are removed and any entries that were used since the + // last refresh will be re-looked up to ensure they are current. + // This setting should be large enough to get the benefit of caching and short + // enough to prevent stale entries from being used for too long. + // Set to 0 to disable the refresh completely (not a good idea). + // Set to -1 to disable the DNS cache completely (the AWS default). 
+ dnsCacheRefreshIntervalSecs := readEnvVarToInt("STEAMPIPE_AWS_DNS_CACHE_REFRESH_INTERVAL_SECS", 300) + + // This is the maximum number of HTTPS API connections used for each host + // (e.g. iam.amazonaws.com). We want a number that is high enough to do a + // lot of parallel work, but not so high that we have an excess number of + // sockets open. + // There is a trade off here. Tables like S3 have a lot of hosts - i.e. two + // per bucket (one for the central region to get the creation time and one + // for the actual bucket region), while services like IAM use a single host + // for all queries. + // Set to 0 to remove the limit (which is the AWS SDK default). + httpTransportMaxConnsPerHost := readEnvVarToInt("STEAMPIPE_AWS_HTTP_TRANSPORT_MAX_CONNS_PER_HOST", 5000) + + // Our DNS resolver should automatically refresh itself on this schedule. + var resolver = &dnscache.Resolver{} + if dnsCacheRefreshIntervalSecs > 0 { + go func() { + t := time.NewTicker(time.Duration(dnsCacheRefreshIntervalSecs) * time.Second) + defer t.Stop() + for range t.C { + resolver.Refresh(true) + } + }() + } + + // Use the AWS defaults as much as possible for both the HTTP transport and + // dialer layers. They have carefully crafted default settings for timeouts + // etc that we don't want to change. Our goal here is to just change behavior + // of parallelism for DNS lookups and HTTP requests. + defaultAwsClient := awshttp.NewBuildableClient() + transport := defaultAwsClient.GetTransport() + dialer := defaultAwsClient.GetDialer() + + // Limit the max connections per host, but only if set. The AWS SDK default + // is no limit. + if httpTransportMaxConnsPerHost > 0 { + transport.MaxConnsPerHost = httpTransportMaxConnsPerHost + } + + // Use a DNS cache if it's set, otherwise we just avoid changing the dialer behavior + // of the AWS HTTP client. + if dnsCacheRefreshIntervalSecs >= 0 { + + // A semaphore is used to control the number of parallel DNS lookups. 
+ var sem = semaphore.NewWeighted(int64(dnsLookupMaxParallel)) + + transport.DialContext = func(ctx context.Context, network string, addr string) (conn net.Conn, err error) { + + host, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + + // Acquire a semaphore slot, blocking until one is available. + if err := sem.Acquire(ctx, 1); err != nil { + return nil, err + } + + // Actually resolve the host, using a cached result if possible. + // Returns an array of IPs for the host. + ips, err := resolver.LookupHost(ctx, host) + + // Release the semaphore, even if there was an error. + sem.Release(1) + + // If there was an error during lookup, we give up immediately. + if err != nil { + return nil, err + } + + // Now, look through the IP addresses until we manage to create a good connection. + // This is less optimal than the parallelized native golang approach, but good + // enough and much simpler. Comparison - https://cs.opensource.google/go/go/+/refs/tags/go1.21.5:src/net/dial.go;l=454-507 + for _, ip := range ips { + conn, err = dialer.DialContext(ctx, network, net.JoinHostPort(ip, port)) + if err == nil { + break + } + } + + return + } + } + + client := &http.Client{ + Transport: transport, + } + + return client +} + +var sharedHTTPClient = initializeHTTPClient() + // Cached form of the base client. // This cache HAS A 30 DAY EXPIRATION! This is because the AWS SDK will // automatically refresh credentials as needed from this cached object. @@ -1745,7 +1885,7 @@ var getBaseClientForAccountCached = plugin.HydrateFunc(getBaseClientForAccountUn // Do the actual work of creating an AWS config object for reuse across many // regions. This client has the minimal reusable configuration on it, so it // can be modified in the higher level client functions. 
-func getBaseClientForAccountUncached(ctx context.Context, d *plugin.QueryData, _ *plugin.HydrateData) (interface{}, error) { +func getBaseClientForAccountUncached(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error) { plugin.Logger(ctx).Info("getBaseClientForAccountUncached", "connection_name", d.Connection.Name, "status", "starting") @@ -1811,6 +1951,8 @@ func getBaseClientForAccountUncached(ctx context.Context, d *plugin.QueryData, _ // opts.Client = imds.New(imds.Options{Retryer: retryer, ClientLogMode: aws.LogRetries | aws.LogRequest}, withDebugHTTPClient()) // })) + configOptions = append(configOptions, config.WithHTTPClient(sharedHTTPClient)) + cfg, err := config.LoadDefaultConfig(ctx, configOptions...) if err != nil { plugin.Logger(ctx).Error("getBaseClientForAccountUncached", "connection_name", d.Connection.Name, "load_default_config_error", err) diff --git a/aws/utils.go b/aws/utils.go index 32c03a425..c4163036e 100644 --- a/aws/utils.go +++ b/aws/utils.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "net/url" + "os" "regexp" "strconv" "strings" @@ -231,3 +232,16 @@ func isSuppportedRDSEngine(engine string) bool { return helpers.StringSliceContains(supportedEngines, engine) } + +// Helper function for integer based environment variables. 
// readEnvVarToInt reads the environment variable name and returns it as an int.
// defaultVal is returned when the variable is unset or empty, or when its
// value cannot be parsed by strconv.Atoi.
func readEnvVarToInt(name string, defaultVal int) int {
	raw := os.Getenv(name)
	if raw == "" {
		return defaultVal
	}

	parsed, err := strconv.Atoi(raw)
	if err != nil {
		// Unparseable values are deliberately ignored in favor of the default.
		return defaultVal
	}
	return parsed
}