From 004a5444741d3e7d6fa1020449ce3ab6be6e5213 Mon Sep 17 00:00:00 2001 From: bbernays Date: Fri, 22 Jul 2022 09:30:39 -0500 Subject: [PATCH] feat: Remove non standard List/Detail implementations (#1237) --- .pre-commit-config.yaml | 1 - resources/services/athena/data_catalogs.go | 47 ++++++----------- resources/services/ecs/task_definitions.go | 52 ++++++------------- resources/services/sagemaker/training_jobs.go | 44 ++++++---------- 4 files changed, 48 insertions(+), 96 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a769f3b85..48fc5a1f2 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,5 +2,4 @@ repos: - repo: https://github.com/dnephin/pre-commit-golang rev: v0.5.0 hooks: - - id: go-mod-tidy - id: golangci-lint \ No newline at end of file diff --git a/resources/services/athena/data_catalogs.go b/resources/services/athena/data_catalogs.go index 901db16ce..2bc66bb54 100644 --- a/resources/services/athena/data_catalogs.go +++ b/resources/services/athena/data_catalogs.go @@ -9,18 +9,16 @@ import ( "github.com/cloudquery/cq-provider-aws/client" "github.com/cloudquery/cq-provider-sdk/provider/diag" "github.com/cloudquery/cq-provider-sdk/provider/schema" - "golang.org/x/sync/errgroup" - "golang.org/x/sync/semaphore" ) -const MAX_GOROUTINES = 10 - //go:generate cq-gen --resource data_catalogs --config gen.hcl --output . func DataCatalogs() *schema.Table { return &schema.Table{ - Name: "aws_athena_data_catalogs", - Description: "Contains information about a data catalog in an Amazon Web Services account", - Resolver: fetchAthenaDataCatalogs, + Name: "aws_athena_data_catalogs", + Description: "Contains information about a data catalog in an Amazon Web Services account", + Resolver: func(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error { + return diag.WrapError(client.ListAndDetailResolver(ctx, meta, res, listDataCatalogs, dataCatalogDetail)) + }, Multiplex: client.ServiceAccountRegionMultiplexer("athena"), IgnoreError: client.IgnoreCommonErrors, DeleteFilter: client.DeleteAccountRegionFilter, @@ -214,11 +212,10 @@ func DataCatalogs() *schema.Table { // Table Resolver Functions // ==================================================================================================================== -func fetchAthenaDataCatalogs(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error { +func listDataCatalogs(ctx context.Context, meta schema.ClientMeta, detailChan chan<- interface{}) error { c := meta.(*client.Client) svc := c.Services().Athena input := athena.ListDataCatalogsInput{} - var sem = semaphore.NewWeighted(int64(MAX_GOROUTINES)) for { response, err := svc.ListDataCatalogs(ctx, &input, func(options *athena.Options) { options.Region = c.Region @@ -226,21 +223,8 @@ func fetchAthenaDataCatalogs(ctx context.Context, meta schema.ClientMeta, parent if err != nil { return diag.WrapError(err) } - errs, ctx := errgroup.WithContext(ctx) - for _, d := range response.DataCatalogsSummary { - if err := sem.Acquire(ctx, 1); err != nil { - return diag.WrapError(err) - } - func(summary types.DataCatalogSummary) { - errs.Go(func() error { - defer sem.Release(1) - return fetchDataCatalog(ctx, res, c, summary) - }) - }(d) - } - err = errs.Wait() - if err != nil { - return diag.WrapError(err) + for _, item := range response.DataCatalogsSummary { + detailChan <- item } if aws.ToString(response.NextToken) == "" { break @@ -335,7 +319,9 @@ func fetchAthenaDataCatalogDatabaseTablePartitionKeys(ctx context.Context, meta // User Defined Helpers // ==================================================================================================================== -func fetchDataCatalog(ctx context.Context, res chan<- interface{}, c *client.Client, catalogSummary types.DataCatalogSummary) error { +func dataCatalogDetail(ctx context.Context, meta schema.ClientMeta, resultsChan chan<- interface{}, errorChan chan<- error, listInfo interface{}) { + c := meta.(*client.Client) + catalogSummary := listInfo.(types.DataCatalogSummary) svc := c.Services().Athena dc, err := svc.GetDataCatalog(ctx, &athena.GetDataCatalogInput{ Name: catalogSummary.CatalogName, @@ -346,16 +332,15 @@ func fetchDataCatalog(ctx context.Context, res chan<- interface{}, c *client.Cli // retrieving of default data catalog (AwsDataCatalog) returns "not found error" but it exists and its // relations can be fetched by its name if *catalogSummary.CatalogName == "AwsDataCatalog" { - res <- types.DataCatalog{Name: catalogSummary.CatalogName, Type: catalogSummary.Type} - return nil + resultsChan <- types.DataCatalog{Name: catalogSummary.CatalogName, Type: catalogSummary.Type} + return } if c.IsNotFoundError(err) { - return nil + return } - return diag.WrapError(err) + errorChan <- diag.WrapError(err) } - res <- *dc.DataCatalog - return nil + resultsChan <- *dc.DataCatalog } func createDataCatalogArn(cl *client.Client, catalogName string) string { diff --git a/resources/services/ecs/task_definitions.go b/resources/services/ecs/task_definitions.go index ecec620de..45d2723ec 100644 --- a/resources/services/ecs/task_definitions.go +++ b/resources/services/ecs/task_definitions.go @@ -10,8 +10,6 @@ import ( "github.com/cloudquery/cq-provider-aws/client" "github.com/cloudquery/cq-provider-sdk/provider/diag" "github.com/cloudquery/cq-provider-sdk/provider/schema" - "golang.org/x/sync/errgroup" - "golang.org/x/sync/semaphore" ) type TaskDefinitionWrapper struct { @@ -19,13 +17,13 @@ type TaskDefinitionWrapper struct { Tags []types.Tag } -const MAX_GOROUTINES = 10 - func EcsTaskDefinitions() *schema.Table { return &schema.Table{ - Name: "aws_ecs_task_definitions", - Description: "The details of a task definition which describes the container and volume definitions of an Amazon Elastic Container Service task", - Resolver: listEcsTaskDefinitions, + Name: "aws_ecs_task_definitions", + Description: "The details of a task definition which describes the container and volume definitions of an Amazon Elastic Container Service task", + Resolver: func(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error { + return diag.WrapError(client.ListAndDetailResolver(ctx, meta, res, listEcsTaskDefinitions, ecsTaskDefinitionDetail)) + }, Multiplex: client.ServiceAccountRegionMultiplexer("ecs"), IgnoreError: client.IgnoreCommonErrors, DeleteFilter: client.DeleteAccountRegionFilter, @@ -614,34 +612,35 @@ func EcsTaskDefinitions() *schema.Table { }, } } - -func fetchEcsTaskDefinition(ctx context.Context, res chan<- interface{}, svc client.EcsClient, region, taskArn string) error { +func ecsTaskDefinitionDetail(ctx context.Context, meta schema.ClientMeta, resultsChan chan<- interface{}, errorChan chan<- error, detail interface{}) { + c := meta.(*client.Client) + svc := c.Services().ECS + taskArn := detail.(string) describeTaskDefinitionOutput, err := svc.DescribeTaskDefinition(ctx, &ecs.DescribeTaskDefinitionInput{ TaskDefinition: aws.String(taskArn), Include: []types.TaskDefinitionField{types.TaskDefinitionFieldTags}, }, func(o *ecs.Options) { - o.Region = region + o.Region = c.Region }) if err != nil { - return diag.WrapError(err) + errorChan <- diag.WrapError(err) + return } if describeTaskDefinitionOutput.TaskDefinition == nil { - return nil + return } - res <- TaskDefinitionWrapper{ + resultsChan <- TaskDefinitionWrapper{ TaskDefinition: describeTaskDefinitionOutput.TaskDefinition, Tags: describeTaskDefinitionOutput.Tags, } - return nil } // ==================================================================================================================== // Table Resolver Functions // ==================================================================================================================== -func listEcsTaskDefinitions(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error { +func listEcsTaskDefinitions(ctx context.Context, meta schema.ClientMeta, res chan<- interface{}) error { var config ecs.ListTaskDefinitionsInput - var sem = semaphore.NewWeighted(int64(MAX_GOROUTINES)) region := meta.(*client.Client).Region svc := meta.(*client.Client).Services().ECS for { @@ -651,26 +650,9 @@ func listEcsTaskDefinitions(ctx context.Context, meta schema.ClientMeta, parent if err != nil { return diag.WrapError(err) } - if len(listClustersOutput.TaskDefinitionArns) == 0 { - return nil + for _, taskDefinitionArn := range listClustersOutput.TaskDefinitionArns { + res <- taskDefinitionArn } - errs, ctx := errgroup.WithContext(ctx) - for _, t := range listClustersOutput.TaskDefinitionArns { - if err := sem.Acquire(ctx, 1); err != nil { - return diag.WrapError(err) - } - func(arn string) { - errs.Go(func() error { - defer sem.Release(1) - return fetchEcsTaskDefinition(ctx, res, svc, region, arn) - }) - }(t) - } - err = errs.Wait() - if err != nil { - return diag.WrapError(err) - } - if listClustersOutput.NextToken == nil { break } diff --git a/resources/services/sagemaker/training_jobs.go b/resources/services/sagemaker/training_jobs.go index b71752e2b..e75972ea2 100644 --- a/resources/services/sagemaker/training_jobs.go +++ b/resources/services/sagemaker/training_jobs.go @@ -10,17 +10,15 @@ import ( "github.com/cloudquery/cq-provider-aws/client" "github.com/cloudquery/cq-provider-sdk/provider/diag" "github.com/cloudquery/cq-provider-sdk/provider/schema" - "golang.org/x/sync/errgroup" - "golang.org/x/sync/semaphore" ) -const MAX_GOROUTINES = 10 - func SagemakerTrainingJobs() *schema.Table { return &schema.Table{ - Name: "aws_sagemaker_training_jobs", - Description: "Provides summary information about a training job.", - Resolver: fetchSagemakerTrainingJobs, + Name: "aws_sagemaker_training_jobs", + Description: "Provides summary information about a training job.", + Resolver: func(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error { + return diag.WrapError(client.ListAndDetailResolver(ctx, meta, res, listSagemakerTrainingJobs, sagemakerTrainingJobsDetail)) + }, Multiplex: client.ServiceAccountRegionMultiplexer("api.sagemaker"), IgnoreError: client.IgnoreAccessDeniedServiceDisabled, DeleteFilter: client.DeleteAccountRegionFilter, @@ -566,26 +564,27 @@ func SagemakerTrainingJobs() *schema.Table { // Table Resolver Functions // ==================================================================================================================== -func fetchTrainingJobDefinition(ctx context.Context, res chan<- interface{}, svc client.SageMakerClient, region string, n types.TrainingJobSummary) error { +func sagemakerTrainingJobsDetail(ctx context.Context, meta schema.ClientMeta, resultsChan chan<- interface{}, errorChan chan<- error, detail interface{}) { + c := meta.(*client.Client) + svc := c.Services().SageMaker + n := detail.(types.TrainingJobSummary) config := sagemaker.DescribeTrainingJobInput{ TrainingJobName: n.TrainingJobName, } response, err := svc.DescribeTrainingJob(ctx, &config, func(options *sagemaker.Options) { - options.Region = region + options.Region = c.Region }) if err != nil { - return diag.WrapError(err) + errorChan <- diag.WrapError(err) + return } - - res <- response - return nil + resultsChan <- response } -func fetchSagemakerTrainingJobs(ctx context.Context, meta schema.ClientMeta, _ *schema.Resource, res chan<- interface{}) error { +func listSagemakerTrainingJobs(ctx context.Context, meta schema.ClientMeta, res chan<- interface{}) error { c := meta.(*client.Client) svc := c.Services().SageMaker config := sagemaker.ListTrainingJobsInput{} - var sem = semaphore.NewWeighted(int64(MAX_GOROUTINES)) for { response, err := svc.ListTrainingJobs(ctx, &config, func(options *sagemaker.Options) { @@ -594,21 +593,8 @@ func fetchSagemakerTrainingJobs(ctx context.Context, meta schema.ClientMeta, _ * if err != nil { return diag.WrapError(err) } - errs, ctx := errgroup.WithContext(ctx) for _, d := range response.TrainingJobSummaries { - if err := sem.Acquire(ctx, 1); err != nil { - return diag.WrapError(err) - } - func(summary types.TrainingJobSummary) { - errs.Go(func() error { - defer sem.Release(1) - return fetchTrainingJobDefinition(ctx, res, svc, c.Region, summary) - }) - }(d) - } - err = errs.Wait() - if err != nil { - return diag.WrapError(err) + res <- d } if aws.ToString(response.NextToken) == "" { break