Skip to content
This repository has been archived by the owner on Aug 16, 2022. It is now read-only.

Commit

Permalink
feat: Remove non standard List/Detail implementations (#1237)
Browse files Browse the repository at this point in the history
  • Loading branch information
bbernays authored Jul 22, 2022
1 parent e2e1397 commit 004a544
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 96 deletions.
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@ repos:
- repo: https://github.com/dnephin/pre-commit-golang
rev: v0.5.0
hooks:
- id: go-mod-tidy
- id: golangci-lint
47 changes: 16 additions & 31 deletions resources/services/athena/data_catalogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,16 @@ import (
"github.com/cloudquery/cq-provider-aws/client"
"github.com/cloudquery/cq-provider-sdk/provider/diag"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)

const MAX_GOROUTINES = 10

//go:generate cq-gen --resource data_catalogs --config gen.hcl --output .
func DataCatalogs() *schema.Table {
return &schema.Table{
Name: "aws_athena_data_catalogs",
Description: "Contains information about a data catalog in an Amazon Web Services account",
Resolver: fetchAthenaDataCatalogs,
Name: "aws_athena_data_catalogs",
Description: "Contains information about a data catalog in an Amazon Web Services account",
Resolver: func(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error {
return diag.WrapError(client.ListAndDetailResolver(ctx, meta, res, listDataCatalogs, dataCatalogDetail))
},
Multiplex: client.ServiceAccountRegionMultiplexer("athena"),
IgnoreError: client.IgnoreCommonErrors,
DeleteFilter: client.DeleteAccountRegionFilter,
Expand Down Expand Up @@ -214,33 +212,19 @@ func DataCatalogs() *schema.Table {
// Table Resolver Functions
// ====================================================================================================================

func fetchAthenaDataCatalogs(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error {
func listDataCatalogs(ctx context.Context, meta schema.ClientMeta, detailChan chan<- interface{}) error {
c := meta.(*client.Client)
svc := c.Services().Athena
input := athena.ListDataCatalogsInput{}
var sem = semaphore.NewWeighted(int64(MAX_GOROUTINES))
for {
response, err := svc.ListDataCatalogs(ctx, &input, func(options *athena.Options) {
options.Region = c.Region
})
if err != nil {
return diag.WrapError(err)
}
errs, ctx := errgroup.WithContext(ctx)
for _, d := range response.DataCatalogsSummary {
if err := sem.Acquire(ctx, 1); err != nil {
return diag.WrapError(err)
}
func(summary types.DataCatalogSummary) {
errs.Go(func() error {
defer sem.Release(1)
return fetchDataCatalog(ctx, res, c, summary)
})
}(d)
}
err = errs.Wait()
if err != nil {
return diag.WrapError(err)
for _, item := range response.DataCatalogsSummary {
detailChan <- item
}
if aws.ToString(response.NextToken) == "" {
break
Expand Down Expand Up @@ -335,7 +319,9 @@ func fetchAthenaDataCatalogDatabaseTablePartitionKeys(ctx context.Context, meta
// User Defined Helpers
// ====================================================================================================================

func fetchDataCatalog(ctx context.Context, res chan<- interface{}, c *client.Client, catalogSummary types.DataCatalogSummary) error {
func dataCatalogDetail(ctx context.Context, meta schema.ClientMeta, resultsChan chan<- interface{}, errorChan chan<- error, listInfo interface{}) {
c := meta.(*client.Client)
catalogSummary := listInfo.(types.DataCatalogSummary)
svc := c.Services().Athena
dc, err := svc.GetDataCatalog(ctx, &athena.GetDataCatalogInput{
Name: catalogSummary.CatalogName,
Expand All @@ -346,16 +332,15 @@ func fetchDataCatalog(ctx context.Context, res chan<- interface{}, c *client.Cli
// retrieving of default data catalog (AwsDataCatalog) returns "not found error" but it exists and its
// relations can be fetched by its name
if *catalogSummary.CatalogName == "AwsDataCatalog" {
res <- types.DataCatalog{Name: catalogSummary.CatalogName, Type: catalogSummary.Type}
return nil
resultsChan <- types.DataCatalog{Name: catalogSummary.CatalogName, Type: catalogSummary.Type}
return
}
if c.IsNotFoundError(err) {
return nil
return
}
return diag.WrapError(err)
errorChan <- diag.WrapError(err)
}
res <- *dc.DataCatalog
return nil
resultsChan <- *dc.DataCatalog
}

func createDataCatalogArn(cl *client.Client, catalogName string) string {
Expand Down
52 changes: 17 additions & 35 deletions resources/services/ecs/task_definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,20 @@ import (
"github.com/cloudquery/cq-provider-aws/client"
"github.com/cloudquery/cq-provider-sdk/provider/diag"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)

type TaskDefinitionWrapper struct {
*types.TaskDefinition
Tags []types.Tag
}

const MAX_GOROUTINES = 10

func EcsTaskDefinitions() *schema.Table {
return &schema.Table{
Name: "aws_ecs_task_definitions",
Description: "The details of a task definition which describes the container and volume definitions of an Amazon Elastic Container Service task",
Resolver: listEcsTaskDefinitions,
Name: "aws_ecs_task_definitions",
Description: "The details of a task definition which describes the container and volume definitions of an Amazon Elastic Container Service task",
Resolver: func(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error {
return diag.WrapError(client.ListAndDetailResolver(ctx, meta, res, listEcsTaskDefinitions, ecsTaskDefinitionDetail))
},
Multiplex: client.ServiceAccountRegionMultiplexer("ecs"),
IgnoreError: client.IgnoreCommonErrors,
DeleteFilter: client.DeleteAccountRegionFilter,
Expand Down Expand Up @@ -614,34 +612,35 @@ func EcsTaskDefinitions() *schema.Table {
},
}
}

func fetchEcsTaskDefinition(ctx context.Context, res chan<- interface{}, svc client.EcsClient, region, taskArn string) error {
func ecsTaskDefinitionDetail(ctx context.Context, meta schema.ClientMeta, resultsChan chan<- interface{}, errorChan chan<- error, detail interface{}) {
c := meta.(*client.Client)
svc := c.Services().ECS
taskArn := detail.(string)
describeTaskDefinitionOutput, err := svc.DescribeTaskDefinition(ctx, &ecs.DescribeTaskDefinitionInput{
TaskDefinition: aws.String(taskArn),
Include: []types.TaskDefinitionField{types.TaskDefinitionFieldTags},
}, func(o *ecs.Options) {
o.Region = region
o.Region = c.Region
})
if err != nil {
return diag.WrapError(err)
errorChan <- diag.WrapError(err)
return
}
if describeTaskDefinitionOutput.TaskDefinition == nil {
return nil
return
}
res <- TaskDefinitionWrapper{
resultsChan <- TaskDefinitionWrapper{
TaskDefinition: describeTaskDefinitionOutput.TaskDefinition,
Tags: describeTaskDefinitionOutput.Tags,
}
return nil
}

// ====================================================================================================================
// Table Resolver Functions
// ====================================================================================================================

func listEcsTaskDefinitions(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error {
func listEcsTaskDefinitions(ctx context.Context, meta schema.ClientMeta, res chan<- interface{}) error {
var config ecs.ListTaskDefinitionsInput
var sem = semaphore.NewWeighted(int64(MAX_GOROUTINES))
region := meta.(*client.Client).Region
svc := meta.(*client.Client).Services().ECS
for {
Expand All @@ -651,26 +650,9 @@ func listEcsTaskDefinitions(ctx context.Context, meta schema.ClientMeta, parent
if err != nil {
return diag.WrapError(err)
}
if len(listClustersOutput.TaskDefinitionArns) == 0 {
return nil
for _, taskDefinitionArn := range listClustersOutput.TaskDefinitionArns {
res <- taskDefinitionArn
}
errs, ctx := errgroup.WithContext(ctx)
for _, t := range listClustersOutput.TaskDefinitionArns {
if err := sem.Acquire(ctx, 1); err != nil {
return diag.WrapError(err)
}
func(arn string) {
errs.Go(func() error {
defer sem.Release(1)
return fetchEcsTaskDefinition(ctx, res, svc, region, arn)
})
}(t)
}
err = errs.Wait()
if err != nil {
return diag.WrapError(err)
}

if listClustersOutput.NextToken == nil {
break
}
Expand Down
44 changes: 15 additions & 29 deletions resources/services/sagemaker/training_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,15 @@ import (
"github.com/cloudquery/cq-provider-aws/client"
"github.com/cloudquery/cq-provider-sdk/provider/diag"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)

const MAX_GOROUTINES = 10

func SagemakerTrainingJobs() *schema.Table {
return &schema.Table{
Name: "aws_sagemaker_training_jobs",
Description: "Provides summary information about a training job.",
Resolver: fetchSagemakerTrainingJobs,
Name: "aws_sagemaker_training_jobs",
Description: "Provides summary information about a training job.",
Resolver: func(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error {
return diag.WrapError(client.ListAndDetailResolver(ctx, meta, res, listSagemakerTrainingJobs, sagemakerTrainingJobsDetail))
},
Multiplex: client.ServiceAccountRegionMultiplexer("api.sagemaker"),
IgnoreError: client.IgnoreAccessDeniedServiceDisabled,
DeleteFilter: client.DeleteAccountRegionFilter,
Expand Down Expand Up @@ -566,26 +564,27 @@ func SagemakerTrainingJobs() *schema.Table {
// Table Resolver Functions
// ====================================================================================================================

func fetchTrainingJobDefinition(ctx context.Context, res chan<- interface{}, svc client.SageMakerClient, region string, n types.TrainingJobSummary) error {
func sagemakerTrainingJobsDetail(ctx context.Context, meta schema.ClientMeta, resultsChan chan<- interface{}, errorChan chan<- error, detail interface{}) {
c := meta.(*client.Client)
svc := c.Services().SageMaker
n := detail.(types.TrainingJobSummary)
config := sagemaker.DescribeTrainingJobInput{
TrainingJobName: n.TrainingJobName,
}
response, err := svc.DescribeTrainingJob(ctx, &config, func(options *sagemaker.Options) {
options.Region = region
options.Region = c.Region
})
if err != nil {
return diag.WrapError(err)
errorChan <- diag.WrapError(err)
return
}

res <- response
return nil
resultsChan <- response
}

func fetchSagemakerTrainingJobs(ctx context.Context, meta schema.ClientMeta, _ *schema.Resource, res chan<- interface{}) error {
func listSagemakerTrainingJobs(ctx context.Context, meta schema.ClientMeta, res chan<- interface{}) error {
c := meta.(*client.Client)
svc := c.Services().SageMaker
config := sagemaker.ListTrainingJobsInput{}
var sem = semaphore.NewWeighted(int64(MAX_GOROUTINES))

for {
response, err := svc.ListTrainingJobs(ctx, &config, func(options *sagemaker.Options) {
Expand All @@ -594,21 +593,8 @@ func fetchSagemakerTrainingJobs(ctx context.Context, meta schema.ClientMeta, _ *
if err != nil {
return diag.WrapError(err)
}
errs, ctx := errgroup.WithContext(ctx)
for _, d := range response.TrainingJobSummaries {
if err := sem.Acquire(ctx, 1); err != nil {
return diag.WrapError(err)
}
func(summary types.TrainingJobSummary) {
errs.Go(func() error {
defer sem.Release(1)
return fetchTrainingJobDefinition(ctx, res, svc, c.Region, summary)
})
}(d)
}
err = errs.Wait()
if err != nil {
return diag.WrapError(err)
res <- d
}
if aws.ToString(response.NextToken) == "" {
break
Expand Down

0 comments on commit 004a544

Please sign in to comment.