Skip to content

Commit

Permalink
Add rate limiter to tables (#1905)
Browse files Browse the repository at this point in the history
Co-authored-by: John Smyth <[email protected]>
Co-authored-by: kai <[email protected]>
Co-authored-by: Graza <[email protected]>
Co-authored-by: sourav chakraborty <[email protected]>
Co-authored-by: Priyanka-Chatterjee-2000 <[email protected]>
Co-authored-by: rajmohanty17 <[email protected]>
Co-authored-by: misraved <[email protected]>
Co-authored-by: madhushreeray@30 <[email protected]>
Co-authored-by: Karan Popat <[email protected]>
  • Loading branch information
10 people authored Oct 2, 2023
1 parent 6ac435b commit df8322c
Show file tree
Hide file tree
Showing 453 changed files with 3,985 additions and 151 deletions.
30 changes: 28 additions & 2 deletions aws/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package aws

import (
"context"
"strings"

"github.com/turbot/steampipe-plugin-sdk/v5/plugin"
"github.com/turbot/steampipe-plugin-sdk/v5/plugin/transform"
Expand All @@ -17,8 +18,9 @@ const pluginName = "steampipe-plugin-aws"
// Plugin creates this (aws) plugin
func Plugin(ctx context.Context) *plugin.Plugin {
p := &plugin.Plugin{
Name: pluginName,
DefaultTransform: transform.FromCamel(),
Name: pluginName,
DefaultTransform: transform.FromCamel(),
DefaultRetryConfig: pluginRetryConfig(),
DefaultGetConfig: &plugin.GetConfig{
IgnoreConfig: &plugin.IgnoreConfig{
ShouldIgnoreErrorFunc: shouldIgnoreErrors([]string{
Expand Down Expand Up @@ -500,3 +502,27 @@ func Plugin(ctx context.Context) *plugin.Plugin {

return p
}

func pluginRetryConfig() *plugin.RetryConfig {
return &plugin.RetryConfig{
MaxAttempts: 20,
BackoffAlgorithm: "Exponential",
RetryInterval: 1000,
CappedDuration: 240000,
ShouldRetryErrorFunc: pluginRetryError,
}
}

func pluginRetryError(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData, err error) bool {
if strings.Contains(err.Error(), "StatusCode: 408") {
plugin.Logger(ctx).Debug("pluginRetryError", "retrying 408", err.Error())
return true
}

if strings.Contains(err.Error(), "no such host") {
plugin.Logger(ctx).Debug("pluginRetryError", "no such host", err.Error())
return true
}

return false
}
14 changes: 14 additions & 0 deletions aws/table_aws_accessanalyzer_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,24 @@ func tableAwsAccessAnalyzer(_ context.Context) *plugin.Table {
ShouldIgnoreErrorFunc: shouldIgnoreErrors([]string{"ResourceNotFoundException", "ValidationException", "InvalidParameter"}),
},
Hydrate: getAccessAnalyzer,
Tags: map[string]string{"service": "access-analyzer", "action": "GetAnalyzer"},
},
List: &plugin.ListConfig{
Hydrate: listAccessAnalyzers,
Tags: map[string]string{"service": "access-analyzer", "action": "ListAnalyzers"},
KeyColumns: []*plugin.KeyColumn{
{
Name: "type",
Require: plugin.Optional,
},
},
},
HydrateConfig: []plugin.HydrateConfig{
{
Func: listAccessAnalyzerFindings,
Tags: map[string]string{"service": "access-analyzer", "action": "ListFindings"},
},
},
GetMatrixItemFunc: SupportedRegionMatrix(accessanalyzerv1.EndpointsID),
Columns: awsRegionalColumns([]*plugin.Column{
{
Expand Down Expand Up @@ -150,6 +158,9 @@ func listAccessAnalyzers(ctx context.Context, d *plugin.QueryData, _ *plugin.Hyd
})

for paginator.HasMorePages() {
// apply rate limiting
d.WaitForListRateLimit(ctx)

output, err := paginator.NextPage(ctx)
if err != nil {
plugin.Logger(ctx).Error("aws_accessanalyzer_analyzer.listAccessAnalyzers", "api_error", err)
Expand Down Expand Up @@ -218,6 +229,9 @@ func listAccessAnalyzerFindings(ctx context.Context, d *plugin.QueryData, h *plu
})

for paginator.HasMorePages() {
// apply rate limiting
d.WaitForListRateLimit(ctx)

output, err := paginator.NextPage(ctx)
if err != nil {
plugin.Logger(ctx).Error("aws_accessanalyzer_analyzer.listAccessAnalyzerFindings", "api_error", err)
Expand Down
7 changes: 7 additions & 0 deletions aws/table_aws_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ func tableAwsAccount(ctx context.Context) *plugin.Table {
Description: "AWS Account",
List: &plugin.ListConfig{
Hydrate: listAccountAlias,
Tags: map[string]string{"service": "iam", "action": "ListAccountAliases"},
},
HydrateConfig: []plugin.HydrateConfig{
{
Func: getOrganizationDetails,
Tags: map[string]string{"service": "organizations", "action": "DescribeOrganization"},
},
},
Columns: awsGlobalRegionColumns([]*plugin.Column{
{
Expand Down
1 change: 1 addition & 0 deletions aws/table_aws_account_alternate_contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func tableAwsAccountAlternateContact(_ context.Context) *plugin.Table {
Description: "AWS Account Alternate Contact",
List: &plugin.ListConfig{
Hydrate: listAwsAccountAlternateContacts,
Tags: map[string]string{"service": "account", "action": "GetAlternateContact"},
IgnoreConfig: &plugin.IgnoreConfig{
ShouldIgnoreErrorFunc: shouldIgnoreErrors([]string{"ResourceNotFoundException"}),
},
Expand Down
1 change: 1 addition & 0 deletions aws/table_aws_account_contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func tableAwsAccountContact(_ context.Context) *plugin.Table {
Description: "AWS Account Contact",
List: &plugin.ListConfig{
Hydrate: listAwsAccountContacts,
Tags: map[string]string{"service": "account", "action": "GetContactInformation"},
KeyColumns: []*plugin.KeyColumn{
{
Name: "linked_account_id",
Expand Down
42 changes: 39 additions & 3 deletions aws/table_aws_acm_certificate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package aws

import (
"context"
"errors"
"fmt"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/acm"
"github.com/aws/aws-sdk-go-v2/service/acm/types"
"github.com/aws/smithy-go"

acmv1 "github.com/aws/aws-sdk-go/service/acm"

Expand All @@ -25,9 +27,14 @@ func tableAwsAcmCertificate(_ context.Context) *plugin.Table {
Get: &plugin.GetConfig{
KeyColumns: plugin.SingleColumn("certificate_arn"),
Hydrate: getAwsAcmCertificateAttributes,
IgnoreConfig: &plugin.IgnoreConfig{
ShouldIgnoreErrorFunc: shouldIgnoreErrors([]string{"ResourceNotFoundException"}),
},
Tags: map[string]string{"service": "acm", "action": "DescribeCertificate"},
},
List: &plugin.ListConfig{
Hydrate: listAwsAcmCertificates,
Tags: map[string]string{"service": "acm", "action": "ListCertificates"},
KeyColumns: []*plugin.KeyColumn{
{
Name: "status",
Expand All @@ -39,6 +46,20 @@ func tableAwsAcmCertificate(_ context.Context) *plugin.Table {
},
},
},
HydrateConfig: []plugin.HydrateConfig{
{
Func: getAwsAcmCertificateAttributes,
Tags: map[string]string{"service": "acm", "action": "DescribeCertificate"},
},
{
Func: getAwsAcmCertificateProperties,
Tags: map[string]string{"service": "acm", "action": "GetCertificate"},
},
{
Func: listTagsForAcmCertificate,
Tags: map[string]string{"service": "acm", "action": "ListTagsForCertificate"},
},
},
GetMatrixItemFunc: SupportedRegionMatrix(acmv1.EndpointsID),
Columns: awsRegionalColumns([]*plugin.Column{
{
Expand Down Expand Up @@ -280,6 +301,9 @@ func listAwsAcmCertificates(ctx context.Context, d *plugin.QueryData, _ *plugin.

// List call
for paginator.HasMorePages() {
// apply rate limiting
d.WaitForListRateLimit(ctx)

output, err := paginator.NextPage(ctx)
if err != nil {
plugin.Logger(ctx).Error("aws_acm_certificate.listAwsAcmCertificates", "api_error", err)
Expand Down Expand Up @@ -322,6 +346,12 @@ func getAwsAcmCertificateAttributes(ctx context.Context, d *plugin.QueryData, h

detail, err := svc.DescribeCertificate(ctx, params)
if err != nil {
var ae smithy.APIError
if errors.As(err, &ae) {
if ae.ErrorCode() == "ResourceNotFoundException" {
return nil, nil
}
}
plugin.Logger(ctx).Error("aws_acm_certificate.getAwsAcmCertificateAttributes", "api_error", err)
return nil, err
}
Expand Down Expand Up @@ -368,6 +398,10 @@ func listTagsForAcmCertificate(ctx context.Context, d *plugin.QueryData, h *plug
return nil, err
}

if arn == nil {
return nil, nil
}

// Create session
svc, err := ACMClient(ctx, d)
if err != nil {
Expand All @@ -389,11 +423,13 @@ func listTagsForAcmCertificate(ctx context.Context, d *plugin.QueryData, h *plug
}

func getCertificateArn(_ context.Context, d *plugin.QueryData, h *plugin.HydrateData) (*string, error) {
switch item := h.Item.(type) {
case *types.CertificateDetail:
if h.Item != nil {
switch item := h.Item.(type) {
case *types.CertificateDetail:
return item.CertificateArn, nil
case types.CertificateSummary:
case types.CertificateSummary:
return item.CertificateArn, nil
}
}
return nil, nil
}
Expand Down
5 changes: 5 additions & 0 deletions aws/table_aws_amplify_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ func tableAwsAmplifyApp(_ context.Context) *plugin.Table {
ShouldIgnoreErrorFunc: shouldIgnoreErrors([]string{"ValidationException", "NotFoundException"}),
},
Hydrate: getAmplifyApp,
Tags: map[string]string{"service": "amplify", "action": "GetApp"},
},
List: &plugin.ListConfig{
Hydrate: listAmplifyApps,
Tags: map[string]string{"service": "amplify", "action": "ListApps"},
},
GetMatrixItemFunc: SupportedRegionMatrix(amplifyv1.EndpointsID),
Columns: awsRegionalColumns([]*plugin.Column{
Expand Down Expand Up @@ -205,6 +207,9 @@ func listAmplifyApps(ctx context.Context, d *plugin.QueryData, h *plugin.Hydrate
pagesLeft := true

for pagesLeft {
// apply rate limiting
d.WaitForListRateLimit(ctx)

result, err := svc.ListApps(ctx, input)
if err != nil {
plugin.Logger(ctx).Error("aws_amplify_app.listAmplifyApps", "api_error", err)
Expand Down
2 changes: 2 additions & 0 deletions aws/table_aws_api_gateway_api_authorizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ func tableAwsAPIGatewayAuthorizer(_ context.Context) *plugin.Table {
ShouldIgnoreErrorFunc: shouldIgnoreErrors([]string{"NotFoundException"}),
},
Hydrate: getRestAPIAuthorizer,
Tags: map[string]string{"service": "apigateway", "action": "GetAuthorizer"},
},
List: &plugin.ListConfig{
ParentHydrate: listRestAPI,
Hydrate: listRestAPIAuthorizers,
Tags: map[string]string{"service": "apigateway", "action": "GetAuthorizers"},
},
GetMatrixItemFunc: SupportedRegionMatrix(apigatewayv1.EndpointsID),
Columns: awsRegionalColumns([]*plugin.Column{
Expand Down
5 changes: 5 additions & 0 deletions aws/table_aws_api_gateway_api_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ func tableAwsAPIGatewayAPIKey(_ context.Context) *plugin.Table {
ShouldIgnoreErrorFunc: shouldIgnoreErrors([]string{"NotFoundException"}),
},
Hydrate: getAPIKey,
Tags: map[string]string{"service": "apigateway", "action": "GetApiKey"},
},
List: &plugin.ListConfig{
Hydrate: listAPIKeys,
Tags: map[string]string{"service": "apigateway", "action": "GetApiKeys"},
KeyColumns: []*plugin.KeyColumn{
{
Name: "customer_id",
Expand Down Expand Up @@ -153,6 +155,9 @@ func listAPIKeys(ctx context.Context, d *plugin.QueryData, _ *plugin.HydrateData
})

for paginator.HasMorePages() {
// apply rate limiting
d.WaitForListRateLimit(ctx)

output, err := paginator.NextPage(ctx)
if err != nil {
plugin.Logger(ctx).Error("aws_api_gateway_rest_api.listAPIKeys", "api_error", err)
Expand Down
5 changes: 5 additions & 0 deletions aws/table_aws_api_gateway_domain_name.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ func tableAwsAPIGatewayDomainName(_ context.Context) *plugin.Table {
ShouldIgnoreErrorFunc: shouldIgnoreErrors([]string{"NotFoundException"}),
},
Hydrate: getApiGatewayDomainName,
Tags: map[string]string{"service": "apigateway", "action": "GetDomainName"},
},
List: &plugin.ListConfig{
Hydrate: listApiGatewayDomainNames,
Tags: map[string]string{"service": "apigateway", "action": "GetDomainNames"},
},
GetMatrixItemFunc: SupportedRegionMatrix(apigatewayv1.EndpointsID),
Columns: awsRegionalColumns([]*plugin.Column{
Expand Down Expand Up @@ -170,6 +172,9 @@ func listApiGatewayDomainNames(ctx context.Context, d *plugin.QueryData, _ *plug

// List call
for paginator.HasMorePages() {
// apply rate limiting
d.WaitForListRateLimit(ctx)

output, err := paginator.NextPage(ctx)
if err != nil {
plugin.Logger(ctx).Error("aws_api_gateway_domain_name.listApiGatewayDomainNames", "api_error", err)
Expand Down
5 changes: 5 additions & 0 deletions aws/table_aws_api_gateway_rest_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ func tableAwsAPIGatewayRestAPI(_ context.Context) *plugin.Table {
ShouldIgnoreErrorFunc: shouldIgnoreErrors([]string{"NotFoundException"}),
},
Hydrate: getRestAPI,
Tags: map[string]string{"service": "apigateway", "action": "GetRestApi"},
},
List: &plugin.ListConfig{
Hydrate: listRestAPI,
Tags: map[string]string{"service": "apigateway", "action": "GetRestApis"},
},
GetMatrixItemFunc: SupportedRegionMatrix(apigatewayv1.EndpointsID),
Columns: awsRegionalColumns([]*plugin.Column{
Expand Down Expand Up @@ -174,6 +176,9 @@ func listRestAPI(ctx context.Context, d *plugin.QueryData, _ *plugin.HydrateData

// List call
for paginator.HasMorePages() {
// apply rate limiting
d.WaitForListRateLimit(ctx)

output, err := paginator.NextPage(ctx)
if err != nil {
plugin.Logger(ctx).Error("aws_api_gateway_rest_api.listRestAPI", "api_error", err)
Expand Down
2 changes: 2 additions & 0 deletions aws/table_aws_api_gateway_stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ func tableAwsAPIGatewayStage(_ context.Context) *plugin.Table {
ShouldIgnoreErrorFunc: shouldIgnoreErrors([]string{"NotFoundException"}),
},
Hydrate: getAPIGatewayStage,
Tags: map[string]string{"service": "apigateway", "action": "GetStage"},
},
List: &plugin.ListConfig{
ParentHydrate: listRestAPI,
Hydrate: listAPIGatewayStage,
Tags: map[string]string{"service": "apigateway", "action": "GetStages"},
},
GetMatrixItemFunc: SupportedRegionMatrix(apigatewayv1.EndpointsID),
Columns: awsRegionalColumns([]*plugin.Column{
Expand Down
5 changes: 5 additions & 0 deletions aws/table_aws_api_gateway_usage_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ func tableAwsAPIGatewayUsagePlan(_ context.Context) *plugin.Table {
ShouldIgnoreErrorFunc: shouldIgnoreErrors([]string{"NotFoundException"}),
},
Hydrate: getUsagePlan,
Tags: map[string]string{"service": "apigateway", "action": "GetUsagePlan"},
},
List: &plugin.ListConfig{
Hydrate: listUsagePlans,
Tags: map[string]string{"service": "apigateway", "action": "GetUsagePlans"},
},
GetMatrixItemFunc: SupportedRegionMatrix(apigatewayv1.EndpointsID),
Columns: awsRegionalColumns([]*plugin.Column{
Expand Down Expand Up @@ -125,6 +127,9 @@ func listUsagePlans(ctx context.Context, d *plugin.QueryData, _ *plugin.HydrateD

// List call
for paginator.HasMorePages() {
// apply rate limiting
d.WaitForListRateLimit(ctx)

output, err := paginator.NextPage(ctx)
if err != nil {
plugin.Logger(ctx).Error("aws_api_gateway_rest_api.listUsagePlans", "api_error", err)
Expand Down
5 changes: 5 additions & 0 deletions aws/table_aws_api_gatewayv2_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ func tableAwsAPIGatewayV2Api(_ context.Context) *plugin.Table {
ShouldIgnoreErrorFunc: shouldIgnoreErrors([]string{"NotFoundException"}),
},
Hydrate: getAPIGatewayV2API,
Tags: map[string]string{"service": "apigateway", "action": "GetApi"},
},
List: &plugin.ListConfig{
Hydrate: listAPIGatewayV2API,
Tags: map[string]string{"service": "apigateway", "action": "GetApis"},
},
GetMatrixItemFunc: SupportedRegionMatrix(apigatewayv2v1.EndpointsID),
Columns: awsRegionalColumns([]*plugin.Column{
Expand Down Expand Up @@ -135,6 +137,9 @@ func listAPIGatewayV2API(ctx context.Context, d *plugin.QueryData, _ *plugin.Hyd
}

for pagesLeft {
// apply rate limiting
d.WaitForListRateLimit(ctx)

result, err := svc.GetApis(ctx, params)
if err != nil {
logger.Error("aws_api_gatewayv2_api.listAPIGatewayV2API", "api_error", err)
Expand Down
Loading

0 comments on commit df8322c

Please sign in to comment.