Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use catalogd HTTP Server instead of CatalogMetadata API #411

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package main

import (
"flag"
"net/http"
"os"
"time"

"github.com/spf13/pflag"
"go.uber.org/zap/zapcore"
Expand All @@ -35,6 +37,7 @@ import (
rukpakv1alpha1 "github.com/operator-framework/rukpak/api/v1alpha1"

operatorsv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1"
"github.com/operator-framework/operator-controller/internal/catalogmetadata/cache"
catalogclient "github.com/operator-framework/operator-controller/internal/catalogmetadata/client"
"github.com/operator-framework/operator-controller/internal/controllers"
"github.com/operator-framework/operator-controller/pkg/features"
Expand All @@ -56,14 +59,18 @@ func init() {
}

func main() {
var metricsAddr string
var enableLeaderElection bool
var probeAddr string
var (
metricsAddr string
enableLeaderElection bool
probeAddr string
cachePath string
)
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.StringVar(&cachePath, "cache-path", "/var/cache", "The local directory path used for filesystem based caching")
opts := zap.Options{
Development: true,
}
Expand Down Expand Up @@ -100,7 +107,7 @@ func main() {
}

cl := mgr.GetClient()
catalogClient := catalogclient.New(cl)
catalogClient := catalogclient.New(cl, cache.NewFilesystemCache(cachePath, &http.Client{Timeout: 10 * time.Second}))

if err = (&controllers.OperatorReconciler{
Client: cl,
Expand Down
6 changes: 6 additions & 0 deletions config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ spec:
image: controller:latest
imagePullPolicy: IfNotPresent
name: manager
volumeMounts:
- name: cache
mountPath: /var/cache
securityContext:
allowPrivilegeEscalation: false
capabilities:
Expand All @@ -97,3 +100,6 @@ spec:
memory: 64Mi
serviceAccountName: controller-manager
terminationGracePeriodSeconds: 10
volumes:
- name: cache
emptyDir: {}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/onsi/ginkgo/v2 v2.12.1
github.com/onsi/gomega v1.27.10
github.com/operator-framework/api v0.17.4-0.20230223191600-0131a6301e42
github.com/operator-framework/catalogd v0.6.0
github.com/operator-framework/catalogd v0.7.0
github.com/operator-framework/deppy v0.0.1
github.com/operator-framework/operator-registry v1.28.0
github.com/operator-framework/rukpak v0.13.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,8 @@ github.com/opencontainers/selinux v1.8.0/go.mod h1:RScLhm78qiWa2gbVCcGkC7tCGdgk3
github.com/opencontainers/selinux v1.8.2/go.mod h1:MUIHuUEvKB1wtJjQdOyYRgOnLD2xAPP8dBsCoU0KuF8=
github.com/operator-framework/api v0.17.4-0.20230223191600-0131a6301e42 h1:d/Pnr19TnmIq3zQ6ebewC+5jt5zqYbRkvYd37YZENQY=
github.com/operator-framework/api v0.17.4-0.20230223191600-0131a6301e42/go.mod h1:l/cuwtPxkVUY7fzYgdust2m9tlmb8I4pOvbsUufRb24=
github.com/operator-framework/catalogd v0.6.0 h1:dSZ54MVSHJ8hcoV7OCRxnk3x4O3ramlyPvvz0vsKYdk=
github.com/operator-framework/catalogd v0.6.0/go.mod h1:I0n086a4a+nP1YZy742IrPaWvOlWu0Mj6qA6j4K96Vg=
github.com/operator-framework/catalogd v0.7.0 h1:L0uesxq+r59rGubtxMoVtIShKn7gSSSLqxpWLfwpAaw=
github.com/operator-framework/catalogd v0.7.0/go.mod h1:tVhaenJVFTHHgdJ0Pju7U4G3epeoZfUWWM1J5nPISPQ=
github.com/operator-framework/deppy v0.0.1 h1:PLTtaFGwktPhKuKZkfUruTimrWpyaO3tghbsjs0uMjc=
github.com/operator-framework/deppy v0.0.1/go.mod h1:EV6vnxRodSFRn2TFztfxFhMPGh5QufOhn3tpIP1Z8cc=
github.com/operator-framework/operator-registry v1.28.0 h1:vtmd2WgJxkx7vuuOxW4k5Le/oo0SfonSeJVMU3rKIfk=
Expand Down
150 changes: 150 additions & 0 deletions internal/catalogmetadata/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package cache

import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"sync"

catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1"

"github.com/operator-framework/operator-controller/internal/catalogmetadata/client"
)

var _ client.Fetcher = &filesystemCache{}

// NewFilesystemCache returns a client.Fetcher implementation that uses a
// local filesystem to cache Catalog contents. When fetching the Catalog contents
// it will:
// - Check if the Catalog is cached
// - IF !cached it will fetch from the catalogd HTTP server and cache the response
// - IF cached it will verify the cache is up to date. If it is up to date it will return
// the cached contents, if not it will fetch the new contents from the catalogd HTTP
// server and update the cached contents.
func NewFilesystemCache(cachePath string, client *http.Client) client.Fetcher {
return &filesystemCache{
cachePath: cachePath,
mutex: sync.RWMutex{},
client: client,
cacheDataByCatalogName: map[string]cacheData{},
}
}

// cacheData holds information about a catalog
// other than it's contents that is used for
// making decisions on when to attempt to refresh
// the cache.
type cacheData struct {
ResolvedRef string
}

// FilesystemCache is a cache that
// uses the local filesystem for caching
// catalog contents. It will fetch catalog
// contents if the catalog does not already
// exist in the cache.
type filesystemCache struct {
mutex sync.RWMutex
cachePath string
client *http.Client
cacheDataByCatalogName map[string]cacheData
}

// FetchCatalogContents implements the client.Fetcher interface and
// will fetch the contents for the provided Catalog from the filesystem.
// If the provided Catalog has not yet been cached, it will make a GET
// request to the Catalogd HTTP server to get the Catalog contents and cache
// them. The cache will be updated automatically if a Catalog is noticed to
// have a different resolved image reference.
// The Catalog provided to this function is expected to:
// - Be non-nil
// - Have a non-nil Catalog.Status.ResolvedSource.Image
// This ensures that we are only attempting to fetch catalog contents for Catalog
// resources that have been successfully reconciled, unpacked, and are being served.
// These requirements help ensure that we can rely on status conditions to determine
// when to issue a request to update the cached Catalog contents.
func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *catalogd.Catalog) (io.ReadCloser, error) {
if catalog == nil {
return nil, fmt.Errorf("error: provided catalog must be non-nil")
}

if catalog.Status.ResolvedSource == nil {
return nil, fmt.Errorf("error: catalog %q has a nil status.resolvedSource value", catalog.Name)
}

if catalog.Status.ResolvedSource.Image == nil {
return nil, fmt.Errorf("error: catalog %q has a nil status.resolvedSource.image value", catalog.Name)
}
joelanford marked this conversation as resolved.
Show resolved Hide resolved

cacheDir := filepath.Join(fsc.cachePath, catalog.Name)
cacheFilePath := filepath.Join(cacheDir, "data.json")

fsc.mutex.RLock()
if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok {
if catalog.Status.ResolvedSource.Image.Ref == data.ResolvedRef {
everettraven marked this conversation as resolved.
Show resolved Hide resolved
fsc.mutex.RUnlock()
return os.Open(cacheFilePath)
}
}
fsc.mutex.RUnlock()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, catalog.Status.ContentURL, nil)
if err != nil {
return nil, fmt.Errorf("error forming request: %s", err)
}

resp, err := fsc.client.Do(req)
if err != nil {
return nil, fmt.Errorf("error performing request: %s", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("error: received unexpected response status code %d", resp.StatusCode)
}

fsc.mutex.Lock()
defer fsc.mutex.Unlock()

// make sure we only write if this info hasn't been updated
// by another thread. The check here, if multiple threads are
// updating this, has no way to tell if the current ref is the
// newest possible ref. If another thread has already updated
// this to be the same value, skip the write logic and return
// the cached contents
if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok {
if data.ResolvedRef == catalog.Status.ResolvedSource.Image.Ref {
return os.Open(cacheFilePath)
}
}

if err = os.MkdirAll(cacheDir, os.ModePerm); err != nil {
return nil, fmt.Errorf("error creating cache directory for Catalog %q: %s", catalog.Name, err)
}

file, err := os.Create(cacheFilePath)
if err != nil {
return nil, fmt.Errorf("error creating cache file for Catalog %q: %s", catalog.Name, err)
}

everettraven marked this conversation as resolved.
Show resolved Hide resolved
if _, err := io.Copy(file, resp.Body); err != nil {
return nil, fmt.Errorf("error writing contents to cache file for Catalog %q: %s", catalog.Name, err)
}

if err = file.Sync(); err != nil {
return nil, fmt.Errorf("error syncing contents to cache file for Catalog %q: %s", catalog.Name, err)
}

if _, err = file.Seek(0, 0); err != nil {
return nil, fmt.Errorf("error resetting offset for cache file reader for Catalog %q: %s", catalog.Name, err)
}

fsc.cacheDataByCatalogName[catalog.Name] = cacheData{
ResolvedRef: catalog.Status.ResolvedSource.Image.Ref,
}

return file, nil
}
Loading