Skip to content

Commit

Permalink
use catalogd HTTP server instead of CatalogMetadata API
Browse files Browse the repository at this point in the history
Signed-off-by: Bryce Palmer <[email protected]>
  • Loading branch information
everettraven committed Sep 21, 2023
1 parent 52a1e28 commit b2edc1b
Show file tree
Hide file tree
Showing 11 changed files with 786 additions and 324 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/onsi/ginkgo/v2 v2.12.1
github.com/onsi/gomega v1.27.10
github.com/operator-framework/api v0.17.4-0.20230223191600-0131a6301e42
github.com/operator-framework/catalogd v0.6.0
github.com/operator-framework/deppy v0.0.1
github.com/operator-framework/catalogd v0.7.0
github.com/operator-framework/operator-registry v1.28.0
github.com/operator-framework/rukpak v0.13.0
github.com/spf13/pflag v1.0.5
Expand Down
143 changes: 143 additions & 0 deletions internal/catalogmetadata/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package cache

import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"sync"

catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1"

"github.com/operator-framework/operator-controller/internal/catalogmetadata/client"
)

var _ client.Fetcher = &filesystemCache{}

// NewFilesystemCache returns a client.Fetcher implementation that uses a
// local filesystem to cache Catalog contents. When fetching the Catalog contents
// it will:
// - Check if the Catalog is cached
// - IF !cached it will fetch from the catalogd HTTP server and cache the response
// - IF cached it will verify the cache is up to date. If it is up to date it will return
// the cached contents, if not it will fetch the new contents from the catalogd HTTP
// server and update the cached contents.
func NewFilesystemCache(cachePath string, tripper http.RoundTripper) client.Fetcher {
return &filesystemCache{
cachePath: cachePath,
mutex: sync.RWMutex{},
tripper: tripper,
cacheDataByCatalogName: map[string]cacheData{},
}
}

// cacheData holds information about a catalog
// other than it's contents that is used for
// making decisions on when to attempt to refresh
// the cache.
type cacheData struct {
ResolvedRef string
}

// FilesystemCache is a cache that
// uses the local filesystem for caching
// catalog contents. It will fetch catalog
// contents if the catalog does not already
// exist in the cache.
type filesystemCache struct {
mutex sync.RWMutex
cachePath string
tripper http.RoundTripper
cacheDataByCatalogName map[string]cacheData
}

// FetchCatalogContents implements the client.Fetcher interface and
// will fetch the contents for the provided Catalog from the filesystem.
// If the provided Catalog has not yet been cached, it will make a GET
// request to the Catalogd HTTP server to get the Catalog contents and cache
// them. The cache will be updated automatically if a Catalog is noticed to
// have a different resolved image reference.
// The Catalog provided to this function is expected to:
// - Be non-nil
// - Have a non-nil Catalog.Status.ResolvedSource.Image
// This ensures that we are only attempting to fetch catalog contents for Catalog
// resources that have been successfully reconciled, unpacked, and are being served.
// These requirements help ensure that we can rely on status conditions to determine
// when to issue a request to update the cached Catalog contents.
func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *catalogd.Catalog) (io.ReadCloser, error) {
if catalog == nil {
return nil, fmt.Errorf("error: provided catalog must be non-nil")
}

if catalog.Status.ResolvedSource == nil {
return nil, fmt.Errorf("error: catalog %q has a nil status.resolvedSource value", catalog.Name)
}

if catalog.Status.ResolvedSource.Image == nil {
return nil, fmt.Errorf("error: catalog %q has a nil status.resolvedSource.image value", catalog.Name)
}

cacheDir := filepath.Join(fsc.cachePath, catalog.Name)
cacheFilePath := filepath.Join(cacheDir, "data.json")

fsc.mutex.RLock()
if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok {
if catalog.Status.ResolvedSource.Image.Ref == data.ResolvedRef {
fsc.mutex.RUnlock()
return os.Open(cacheFilePath)
}
}
fsc.mutex.RUnlock()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, catalog.Status.ContentURL, nil)
if err != nil {
return nil, fmt.Errorf("error forming request: %s", err)
}

resp, err := fsc.tripper.RoundTrip(req)
if err != nil {
return nil, fmt.Errorf("error performing request: %s", err)
}
defer resp.Body.Close()

switch resp.StatusCode {
case http.StatusOK:
contents, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response body: %s", err)
}

fsc.mutex.Lock()
defer fsc.mutex.Unlock()

// make sure we only write if this info hasn't been updated
// by another thread. The check here, if multiple threads are
// updating this, has no way to tell if the current ref is the
// newest possible ref. If another thread has already updated
// this to be the same value, skip the write logic and return
// the cached contents
if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok {
if data.ResolvedRef == catalog.Status.ResolvedSource.Image.Ref {
break
}
}

if err = os.MkdirAll(cacheDir, os.ModePerm); err != nil {
return nil, fmt.Errorf("error creating cache directory for Catalog %q: %s", catalog.Name, err)
}

if err = os.WriteFile(cacheFilePath, contents, os.ModePerm); err != nil {
return nil, fmt.Errorf("error caching response: %s", err)
}

fsc.cacheDataByCatalogName[catalog.Name] = cacheData{
ResolvedRef: catalog.Status.ResolvedSource.Image.Ref,
}
default:
return nil, fmt.Errorf("error: received unexpected response status code %d", resp.StatusCode)
}

return os.Open(cacheFilePath)
}
Loading

0 comments on commit b2edc1b

Please sign in to comment.