Skip to content

Commit

Permalink
minimally update entitysources, optimize catalogmetadata/client.Client
Browse files Browse the repository at this point in the history
Signed-off-by: Bryce Palmer <[email protected]>
  • Loading branch information
everettraven committed Sep 14, 2023
1 parent f6d1bc3 commit e94fddd
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 63 deletions.
90 changes: 48 additions & 42 deletions internal/catalogmetadata/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ import (
"github.com/operator-framework/operator-controller/internal/catalogmetadata"
)

// TODO: When a catalogd release containing https://github.com/operator-framework/catalogd/pull/168
// is made this can be removed in favor of parsing `Catalog.Status` for the
// appropriate URL for fetching catalog contents.
const catalogdOnClusterBaseURL = "http://catalogd-catalogserver.catalogd-system.svc"
const catalogdOnClusterURLTemplate = "%s/catalogs/%s/all.json"

type ClientOpt func(c *Client)

Check warning on line 22 in internal/catalogmetadata/client/client.go

View workflow job for this annotation

GitHub Actions / lint

exported: type name will be used as client.ClientOpt by other packages, and that stutters; consider calling this Opt (revive)

func WithBaseURL(baseURL string) ClientOpt {
Expand Down Expand Up @@ -40,11 +46,15 @@ type Client struct {
// instead of kube API server. We will need to swap this implementation.
cl client.Client

// baseURL is the base URL used by the client when making a request
// to the catalogd HTTP server. When using NewClient() this defaults
// to http://catalogd-catalogserver.catalogd-system.svc
// TODO: When a catalogd release containing https://github.com/operator-framework/catalogd/pull/168
// is made this can be removed in favor of parsing `Catalog.Status` for the
// appropriate URL for fetching catalog contents.
baseURL string
}

const catalogdOnClusterBaseURL = "http://catalogd-catalogserver.catalogd-system.svc"

func (c *Client) Bundles(ctx context.Context) ([]*catalogmetadata.Bundle, error) {
var allBundles []*catalogmetadata.Bundle

Expand All @@ -53,62 +63,58 @@ func (c *Client) Bundles(ctx context.Context) ([]*catalogmetadata.Bundle, error)
return nil, err
}
for _, catalog := range catalogList.Items {
channels, err := fetchCatalogMetadata[catalogmetadata.Channel](ctx, c.baseURL, catalog.Name, declcfg.SchemaChannel)
if err != nil {
return nil, err
}
channels := []*catalogmetadata.Channel{}
bundles := []*catalogmetadata.Bundle{}

bundles, err := fetchCatalogMetadata[catalogmetadata.Bundle](ctx, c.baseURL, catalog.Name, declcfg.SchemaBundle)
// TODO: When a catalogd release containing https://github.com/operator-framework/catalogd/pull/168
// is made this should be updated in favor of parsing `Catalog.Status` for the
// appropriate URL for fetching catalog contents.
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf(catalogdOnClusterURLTemplate, c.baseURL, catalog.Name), nil)
if err != nil {
return nil, err
return nil, fmt.Errorf("error forming request: %s", err)

Check warning on line 74 in internal/catalogmetadata/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/catalogmetadata/client/client.go#L74

Added line #L74 was not covered by tests
}

bundles, err = populateExtraFields(catalog.Name, channels, bundles)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
return nil, fmt.Errorf("error performing request: %s", err)

Check warning on line 79 in internal/catalogmetadata/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/catalogmetadata/client/client.go#L79

Added line #L79 was not covered by tests
}
defer resp.Body.Close()

allBundles = append(allBundles, bundles...)
}

return allBundles, nil
}

const catalogdOnClusterURLTemplate = "%s/catalogs/%s/all.json"

func fetchCatalogMetadata[T catalogmetadata.Schemas](ctx context.Context, baseURL, catalogName, schema string) ([]*T, error) {
var metadata []*T
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf(catalogdOnClusterURLTemplate, baseURL, catalogName), nil)
if err != nil {
return nil, fmt.Errorf("error forming request: %s", err)
}
err = declcfg.WalkMetasReader(resp.Body, func(meta *declcfg.Meta, err error) error {
if err != nil {
return err
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("error performing request: %s", err)
}
defer resp.Body.Close()
switch meta.Schema {
case declcfg.SchemaChannel:
content, err := catalogmetadata.Unmarshal[catalogmetadata.Channel](meta)
if err != nil {
return fmt.Errorf("error unmarshalling catalog metadata: %s", err)

Check warning on line 92 in internal/catalogmetadata/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/catalogmetadata/client/client.go#L92

Added line #L92 was not covered by tests
}
channels = append(channels, content)
case declcfg.SchemaBundle:
content, err := catalogmetadata.Unmarshal[catalogmetadata.Bundle](meta)
if err != nil {
return fmt.Errorf("error unmarshalling catalog metadata: %s", err)

Check warning on line 98 in internal/catalogmetadata/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/catalogmetadata/client/client.go#L98

Added line #L98 was not covered by tests
}
bundles = append(bundles, content)
}

err = declcfg.WalkMetasReader(resp.Body, func(meta *declcfg.Meta, err error) error {
return nil
})
if err != nil {
return err
return nil, fmt.Errorf("error processing response: %s", err)
}

if meta.Schema == schema {
content, err := catalogmetadata.Unmarshal[T](meta)
if err != nil {
return fmt.Errorf("error unmarshalling catalog metadata: %s", err)
}
metadata = append(metadata, content)
bundles, err = populateExtraFields(catalog.Name, channels, bundles)
if err != nil {
return nil, err
}

return nil
})
if err != nil {
return nil, fmt.Errorf("error processing response: %s", err)
allBundles = append(allBundles, bundles...)
}

return metadata, nil
return allBundles, nil
}

func populateExtraFields(catalogName string, channels []*catalogmetadata.Channel, bundles []*catalogmetadata.Bundle) ([]*catalogmetadata.Bundle, error) {
Expand Down
82 changes: 61 additions & 21 deletions internal/resolution/entitysources/catalogdsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"

catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1"
"github.com/operator-framework/deppy/pkg/deppy"
Expand All @@ -15,15 +16,35 @@ import (
"github.com/operator-framework/operator-controller/internal/resolution/entities"
)

const catalogdOnClusterBaseURL = "http://catalogd-catalogserver.catalogd-system.svc"

// CatalogdEntitySource is a source for(/collection of) deppy defined input.Entity, built from content
// made accessible on-cluster by https://github.com/operator-framework/catalogd.
// It is an implementation of deppy defined input.EntitySource
type CatalogdEntitySource struct {
client client.Client
client client.Client
baseURL string
}

func NewCatalogdEntitySource(client client.Client) *CatalogdEntitySource {
return &CatalogdEntitySource{client: client}
type CatalogdEntitySourceOpt func(es *CatalogdEntitySource)

func WithBaseURL(baseURL string) CatalogdEntitySourceOpt {
return func(es *CatalogdEntitySource) {
es.baseURL = baseURL

Check warning on line 33 in internal/resolution/entitysources/catalogdsource.go

View check run for this annotation

Codecov / codecov/patch

internal/resolution/entitysources/catalogdsource.go#L32-L33

Added lines #L32 - L33 were not covered by tests
}
}

func NewCatalogdEntitySource(client client.Client, opts ...CatalogdEntitySourceOpt) *CatalogdEntitySource {
ces := &CatalogdEntitySource{
client: client,
baseURL: catalogdOnClusterBaseURL,
}

for _, opt := range opts {
opt(ces)

Check warning on line 44 in internal/resolution/entitysources/catalogdsource.go

View check run for this annotation

Codecov / codecov/patch

internal/resolution/entitysources/catalogdsource.go#L44

Added line #L44 was not covered by tests
}

return ces
}

func (es *CatalogdEntitySource) Get(_ context.Context, _ deppy.Identifier) (*input.Entity, error) {
Expand All @@ -32,7 +53,7 @@ func (es *CatalogdEntitySource) Get(_ context.Context, _ deppy.Identifier) (*inp

func (es *CatalogdEntitySource) Filter(ctx context.Context, filter input.Predicate) (input.EntityList, error) {
resultSet := input.EntityList{}
entities, err := getEntities(ctx, es.client)
entities, err := getEntities(ctx, es.client, es.baseURL)
if err != nil {
return nil, err
}
Expand All @@ -45,7 +66,7 @@ func (es *CatalogdEntitySource) Filter(ctx context.Context, filter input.Predica
}

func (es *CatalogdEntitySource) GroupBy(ctx context.Context, fn input.GroupByFunction) (input.EntityListMap, error) {
entities, err := getEntities(ctx, es.client)
entities, err := getEntities(ctx, es.client, es.baseURL)

Check warning on line 69 in internal/resolution/entitysources/catalogdsource.go

View check run for this annotation

Codecov / codecov/patch

internal/resolution/entitysources/catalogdsource.go#L69

Added line #L69 was not covered by tests
if err != nil {
return nil, err
}
Expand All @@ -60,7 +81,7 @@ func (es *CatalogdEntitySource) GroupBy(ctx context.Context, fn input.GroupByFun
}

func (es *CatalogdEntitySource) Iterate(ctx context.Context, fn input.IteratorFunction) error {
entities, err := getEntities(ctx, es.client)
entities, err := getEntities(ctx, es.client, es.baseURL)

Check warning on line 84 in internal/resolution/entitysources/catalogdsource.go

View check run for this annotation

Codecov / codecov/patch

internal/resolution/entitysources/catalogdsource.go#L84

Added line #L84 was not covered by tests
if err != nil {
return err
}
Expand All @@ -72,15 +93,15 @@ func (es *CatalogdEntitySource) Iterate(ctx context.Context, fn input.IteratorFu
return nil
}

func getEntities(ctx context.Context, cl client.Client) (input.EntityList, error) {
func getEntities(ctx context.Context, cl client.Client, baseURL string) (input.EntityList, error) {
allEntitiesList := input.EntityList{}

var catalogList catalogd.CatalogList
if err := cl.List(ctx, &catalogList); err != nil {
return nil, err
}
for _, catalog := range catalogList.Items {
channels, bundles, err := fetchCatalogMetadata(ctx, cl, catalog.Name)
channels, bundles, err := fetchCatalogMetadata(ctx, baseURL, catalog.Name)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -152,12 +173,12 @@ func MetadataToEntities(catalogName string, channels []declcfg.Channel, bundles
return entityList, nil
}

func fetchCatalogMetadata(ctx context.Context, cl client.Client, catalogName string) ([]declcfg.Channel, []declcfg.Bundle, error) {
channels, err := fetchCatalogMetadataByScheme[declcfg.Channel](ctx, cl, declcfg.SchemaChannel, catalogName)
func fetchCatalogMetadata(ctx context.Context, baseURL, catalogName string) ([]declcfg.Channel, []declcfg.Bundle, error) {
channels, err := fetchCatalogMetadataByScheme[declcfg.Channel](ctx, baseURL, declcfg.SchemaChannel, catalogName)
if err != nil {
return nil, nil, err
}
bundles, err := fetchCatalogMetadataByScheme[declcfg.Bundle](ctx, cl, declcfg.SchemaBundle, catalogName)
bundles, err := fetchCatalogMetadataByScheme[declcfg.Bundle](ctx, baseURL, declcfg.SchemaBundle, catalogName)
if err != nil {
return nil, nil, err
}
Expand All @@ -169,21 +190,40 @@ type declcfgSchema interface {
declcfg.Package | declcfg.Bundle | declcfg.Channel
}

const catalogdOnClusterURLTemplate = "%s/catalogs/%s/all.json"

// TODO: Cleanup once https://github.com/golang/go/issues/45380 implemented
// We should be able to get rid of the schema arg and switch based on the type passed to this generic
func fetchCatalogMetadataByScheme[T declcfgSchema](ctx context.Context, cl client.Client, schema, catalogName string) ([]T, error) {
cmList := catalogd.CatalogMetadataList{}
if err := cl.List(ctx, &cmList, client.MatchingLabels{"schema": schema, "catalog": catalogName}); err != nil {
return nil, err
func fetchCatalogMetadataByScheme[T declcfgSchema](ctx context.Context, baseURL, schema, catalogName string) ([]T, error) {
contents := []T{}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf(catalogdOnClusterURLTemplate, baseURL, catalogName), nil)
if err != nil {
return nil, fmt.Errorf("error forming request: %s", err)

Check warning on line 201 in internal/resolution/entitysources/catalogdsource.go

View check run for this annotation

Codecov / codecov/patch

internal/resolution/entitysources/catalogdsource.go#L201

Added line #L201 was not covered by tests
}

contents := []T{}
for _, cm := range cmList.Items {
var content T
if err := json.Unmarshal(cm.Spec.Content, &content); err != nil {
return nil, err
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("error performing request: %s", err)

Check warning on line 206 in internal/resolution/entitysources/catalogdsource.go

View check run for this annotation

Codecov / codecov/patch

internal/resolution/entitysources/catalogdsource.go#L206

Added line #L206 was not covered by tests
}
defer resp.Body.Close()

err = declcfg.WalkMetasReader(resp.Body, func(meta *declcfg.Meta, err error) error {
if err != nil {
return err
}

if meta.Schema == schema {
var content T
if err := json.Unmarshal(meta.Blob, &content); err != nil {
return fmt.Errorf("error unmarshalling content: %s", err)

Check warning on line 218 in internal/resolution/entitysources/catalogdsource.go

View check run for this annotation

Codecov / codecov/patch

internal/resolution/entitysources/catalogdsource.go#L218

Added line #L218 was not covered by tests
}
contents = append(contents, content)
}
contents = append(contents, content)

return nil
})
if err != nil {
return nil, fmt.Errorf("error processing response: %s", err)
}

return contents, nil
Expand Down

0 comments on commit e94fddd

Please sign in to comment.