diff --git a/internal/catalogmetadata/client/client.go b/internal/catalogmetadata/client/client.go index 04495b686..ef81cc62c 100644 --- a/internal/catalogmetadata/client/client.go +++ b/internal/catalogmetadata/client/client.go @@ -13,6 +13,12 @@ import ( "github.com/operator-framework/operator-controller/internal/catalogmetadata" ) +// TODO: When a catalogd release containing https://github.com/operator-framework/catalogd/pull/168 +// is made this can be removed in favor of parsing `Catalog.Status` for the +// appropriate URL for fetching catalog contents. +const catalogdOnClusterBaseURL = "http://catalogd-catalogserver.catalogd-system.svc" +const catalogdOnClusterURLTemplate = "%s/catalogs/%s/all.json" + type ClientOpt func(c *Client) func WithBaseURL(baseURL string) ClientOpt { @@ -40,11 +46,15 @@ type Client struct { // instead of kube API server. We will need to swap this implementation. cl client.Client + // baseURL is the base URL used by the client when making a request + // to the catalogd HTTP server. When using NewClient() this defaults + // to http://catalogd-catalogserver.catalogd-system.svc + // TODO: When a catalogd release containing https://github.com/operator-framework/catalogd/pull/168 + // is made this can be removed in favor of parsing `Catalog.Status` for the + // appropriate URL for fetching catalog contents. baseURL string } -const catalogdOnClusterBaseURL = "http://catalogd-catalogserver.catalogd-system.svc" - func (c *Client) Bundles(ctx context.Context) ([]*catalogmetadata.Bundle, error) { var allBundles []*catalogmetadata.Bundle @@ -53,62 +63,58 @@ func (c *Client) Bundles(ctx context.Context) ([]*catalogmetadata.Bundle, error) return nil, err } for _, catalog := range catalogList.Items { - channels, err := fetchCatalogMetadata[catalogmetadata.Channel](ctx, c.baseURL, catalog.Name, declcfg.SchemaChannel) - if err != nil { - return nil, err - } + channels := []*catalogmetadata.Channel{} + bundles := []*catalogmetadata.Bundle{} - bundles, err := fetchCatalogMetadata[catalogmetadata.Bundle](ctx, c.baseURL, catalog.Name, declcfg.SchemaBundle) + // TODO: When a catalogd release containing https://github.com/operator-framework/catalogd/pull/168 + // is made this should be updated in favor of parsing `Catalog.Status` for the + // appropriate URL for fetching catalog contents. + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf(catalogdOnClusterURLTemplate, c.baseURL, catalog.Name), nil) if err != nil { - return nil, err + return nil, fmt.Errorf("error forming request: %s", err) } - bundles, err = populateExtraFields(catalog.Name, channels, bundles) + resp, err := http.DefaultClient.Do(req) if err != nil { - return nil, err + return nil, fmt.Errorf("error performing request: %s", err) } + defer resp.Body.Close() - allBundles = append(allBundles, bundles...) - } - - return allBundles, nil -} - -const catalogdOnClusterURLTemplate = "%s/catalogs/%s/all.json" - -func fetchCatalogMetadata[T catalogmetadata.Schemas](ctx context.Context, baseURL, catalogName, schema string) ([]*T, error) { - var metadata []*T - req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf(catalogdOnClusterURLTemplate, baseURL, catalogName), nil) - if err != nil { - return nil, fmt.Errorf("error forming request: %s", err) - } + err = declcfg.WalkMetasReader(resp.Body, func(meta *declcfg.Meta, err error) error { + if err != nil { + return err + } - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, fmt.Errorf("error performing request: %s", err) - } - defer resp.Body.Close() + switch meta.Schema { + case declcfg.SchemaChannel: + content, err := catalogmetadata.Unmarshal[catalogmetadata.Channel](meta) + if err != nil { + return fmt.Errorf("error unmarshalling catalog metadata: %s", err) + } + channels = append(channels, content) + case declcfg.SchemaBundle: + content, err := catalogmetadata.Unmarshal[catalogmetadata.Bundle](meta) + if err != nil { + return fmt.Errorf("error unmarshalling catalog metadata: %s", err) + } + bundles = append(bundles, content) + } - err = declcfg.WalkMetasReader(resp.Body, func(meta *declcfg.Meta, err error) error { + return nil + }) if err != nil { - return err + return nil, fmt.Errorf("error processing response: %s", err) } - if meta.Schema == schema { - content, err := catalogmetadata.Unmarshal[T](meta) - if err != nil { - return fmt.Errorf("error unmarshalling catalog metadata: %s", err) - } - metadata = append(metadata, content) + bundles, err = populateExtraFields(catalog.Name, channels, bundles) + if err != nil { + return nil, err } - return nil - }) - if err != nil { - return nil, fmt.Errorf("error processing response: %s", err) + allBundles = append(allBundles, bundles...) } - return metadata, nil + return allBundles, nil } func populateExtraFields(catalogName string, channels []*catalogmetadata.Channel, bundles []*catalogmetadata.Bundle) ([]*catalogmetadata.Bundle, error) { diff --git a/internal/resolution/entitysources/catalogdsource.go b/internal/resolution/entitysources/catalogdsource.go index 106f893f8..cf2b4f569 100644 --- a/internal/resolution/entitysources/catalogdsource.go +++ b/internal/resolution/entitysources/catalogdsource.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "net/http" catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1" "github.com/operator-framework/deppy/pkg/deppy" @@ -15,15 +16,35 @@ import ( "github.com/operator-framework/operator-controller/internal/resolution/entities" ) +const catalogdOnClusterBaseURL = "http://catalogd-catalogserver.catalogd-system.svc" + // CatalogdEntitySource is a source for(/collection of) deppy defined input.Entity, built from content // made accessible on-cluster by https://github.com/operator-framework/catalogd. // It is an implementation of deppy defined input.EntitySource type CatalogdEntitySource struct { - client client.Client + client client.Client + baseURL string } -func NewCatalogdEntitySource(client client.Client) *CatalogdEntitySource { - return &CatalogdEntitySource{client: client} +type CatalogdEntitySourceOpt func(es *CatalogdEntitySource) + +func WithBaseURL(baseURL string) CatalogdEntitySourceOpt { + return func(es *CatalogdEntitySource) { + es.baseURL = baseURL + } +} + +func NewCatalogdEntitySource(client client.Client, opts ...CatalogdEntitySourceOpt) *CatalogdEntitySource { + ces := &CatalogdEntitySource{ + client: client, + baseURL: catalogdOnClusterBaseURL, + } + + for _, opt := range opts { + opt(ces) + } + + return ces } func (es *CatalogdEntitySource) Get(_ context.Context, _ deppy.Identifier) (*input.Entity, error) { @@ -32,7 +53,7 @@ func (es *CatalogdEntitySource) Get(_ context.Context, _ deppy.Identifier) (*inp func (es *CatalogdEntitySource) Filter(ctx context.Context, filter input.Predicate) (input.EntityList, error) { resultSet := input.EntityList{} - entities, err := getEntities(ctx, es.client) + entities, err := getEntities(ctx, es.client, es.baseURL) if err != nil { return nil, err } @@ -45,7 +66,7 @@ func (es *CatalogdEntitySource) Filter(ctx context.Context, filter input.Predica } func (es *CatalogdEntitySource) GroupBy(ctx context.Context, fn input.GroupByFunction) (input.EntityListMap, error) { - entities, err := getEntities(ctx, es.client) + entities, err := getEntities(ctx, es.client, es.baseURL) if err != nil { return nil, err } @@ -60,7 +81,7 @@ func (es *CatalogdEntitySource) GroupBy(ctx context.Context, fn input.GroupByFun } func (es *CatalogdEntitySource) Iterate(ctx context.Context, fn input.IteratorFunction) error { - entities, err := getEntities(ctx, es.client) + entities, err := getEntities(ctx, es.client, es.baseURL) if err != nil { return err } @@ -72,7 +93,7 @@ func (es *CatalogdEntitySource) Iterate(ctx context.Context, fn input.IteratorFu return nil } -func getEntities(ctx context.Context, cl client.Client) (input.EntityList, error) { +func getEntities(ctx context.Context, cl client.Client, baseURL string) (input.EntityList, error) { allEntitiesList := input.EntityList{} var catalogList catalogd.CatalogList @@ -80,7 +101,7 @@ func getEntities(ctx context.Context, cl client.Client) (input.EntityList, error return nil, err } for _, catalog := range catalogList.Items { - channels, bundles, err := fetchCatalogMetadata(ctx, cl, catalog.Name) + channels, bundles, err := fetchCatalogMetadata(ctx, baseURL, catalog.Name) if err != nil { return nil, err } @@ -152,12 +173,12 @@ func MetadataToEntities(catalogName string, channels []declcfg.Channel, bundles return entityList, nil } -func fetchCatalogMetadata(ctx context.Context, cl client.Client, catalogName string) ([]declcfg.Channel, []declcfg.Bundle, error) { - channels, err := fetchCatalogMetadataByScheme[declcfg.Channel](ctx, cl, declcfg.SchemaChannel, catalogName) +func fetchCatalogMetadata(ctx context.Context, baseURL, catalogName string) ([]declcfg.Channel, []declcfg.Bundle, error) { + channels, err := fetchCatalogMetadataByScheme[declcfg.Channel](ctx, baseURL, declcfg.SchemaChannel, catalogName) if err != nil { return nil, nil, err } - bundles, err := fetchCatalogMetadataByScheme[declcfg.Bundle](ctx, cl, declcfg.SchemaBundle, catalogName) + bundles, err := fetchCatalogMetadataByScheme[declcfg.Bundle](ctx, baseURL, declcfg.SchemaBundle, catalogName) if err != nil { return nil, nil, err } @@ -169,21 +190,40 @@ type declcfgSchema interface { declcfg.Package | declcfg.Bundle | declcfg.Channel } +const catalogdOnClusterURLTemplate = "%s/catalogs/%s/all.json" + // TODO: Cleanup once https://github.com/golang/go/issues/45380 implemented // We should be able to get rid of the schema arg and switch based on the type passed to this generic -func fetchCatalogMetadataByScheme[T declcfgSchema](ctx context.Context, cl client.Client, schema, catalogName string) ([]T, error) { - cmList := catalogd.CatalogMetadataList{} - if err := cl.List(ctx, &cmList, client.MatchingLabels{"schema": schema, "catalog": catalogName}); err != nil { - return nil, err +func fetchCatalogMetadataByScheme[T declcfgSchema](ctx context.Context, baseURL, schema, catalogName string) ([]T, error) { + contents := []T{} + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf(catalogdOnClusterURLTemplate, baseURL, catalogName), nil) + if err != nil { + return nil, fmt.Errorf("error forming request: %s", err) } - contents := []T{} - for _, cm := range cmList.Items { - var content T - if err := json.Unmarshal(cm.Spec.Content, &content); err != nil { - return nil, err + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("error performing request: %s", err) + } + defer resp.Body.Close() + + err = declcfg.WalkMetasReader(resp.Body, func(meta *declcfg.Meta, err error) error { + if err != nil { + return err + } + + if meta.Schema == schema { + var content T + if err := json.Unmarshal(meta.Blob, &content); err != nil { + return fmt.Errorf("error unmarshalling content: %s", err) + } + contents = append(contents, content) } - contents = append(contents, content) + + return nil + }) + if err != nil { + return nil, fmt.Errorf("error processing response: %s", err) } return contents, nil