Skip to content

Commit

Permalink
Serve locally stored fbc content via an http server
Browse files Browse the repository at this point in the history
Closes operator-framework#113

Signed-off-by: Anik <[email protected]>
  • Loading branch information
anik120 committed Sep 7, 2023
1 parent d30f161 commit 1f54620
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 11 deletions.
12 changes: 11 additions & 1 deletion cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package main
import (
"flag"
"fmt"
"net/http"
"os"
"time"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
Expand Down Expand Up @@ -67,6 +69,7 @@ func main() {
catalogdVersion bool
systemNamespace string
storageDir string
catalogServerAddr string
)
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
Expand All @@ -77,6 +80,7 @@ func main() {
flag.StringVar(&unpackImage, "unpack-image", "quay.io/operator-framework/rukpak:v0.12.0", "The unpack image to use when unpacking catalog images")
flag.StringVar(&systemNamespace, "system-namespace", "", "The namespace catalogd uses for internal state, configuration, and workloads")
flag.StringVar(&storageDir, "catalogs-storage-dir", "/var/cache/catalogs", "The directory in the filesystem where unpacked catalog content will be stored and served from")
flag.StringVar(&catalogServerAddr, "catalogs-server-addr", "127.0.0.1:8083", "The address where the unpacked catalogs' content will be accessible")
flag.BoolVar(&profiling, "profiling", false, "enable profiling endpoints to allow for using pprof")
flag.BoolVar(&catalogdVersion, "version", false, "print the catalogd version and exit")
opts := zap.Options{
Expand Down Expand Up @@ -122,10 +126,16 @@ func main() {
if err := os.MkdirAll(storageDir, 0700); err != nil {
setupLog.Error(err, "unable to create storage directory for catalogs")
}
localStorage := storage.LocalDir{RootDir: storageDir, Server: &http.Server{Addr: catalogServerAddr, ReadHeaderTimeout: 3. * time.Second}}
if err := mgr.Add(localStorage); err != nil {
setupLog.Error(err, "unable to start catalog server")
os.Exit(1)
}

if err = (&corecontrollers.CatalogReconciler{
Client: mgr.GetClient(),
Unpacker: unpacker,
Storage: storage.LocalDir{RootDir: storageDir},
Storage: localStorage,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Catalog")
os.Exit(1)
Expand Down
3 changes: 2 additions & 1 deletion config/default/manager_auth_proxy_patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ spec:
- "--metrics-bind-address=127.0.0.1:8080"
- "--leader-elect"
- "--catalogs-storage-dir=/var/cache/catalogs"
- "--feature-gates=CatalogMetadataAPI=true,HTTPServer=false"
- "--feature-gates=CatalogMetadataAPI=true,HTTPServer=true"
- "--catalogs-server-addr=:8083"
13 changes: 13 additions & 0 deletions config/rbac/catalogserver_service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: v1
kind: Service
metadata:
name: catalogserver
namespace: system
spec:
selector:
control-plane: controller-manager
ports:
- name: http
protocol: TCP
port: 80
targetPort: 8083
1 change: 1 addition & 0 deletions config/rbac/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ resources:
- role_binding.yaml
- leader_election_role.yaml
- leader_election_role_binding.yaml
- catalogserver_service.yaml
# Comment the following 4 lines if you want to disable
# the auth proxy (https://github.com/brancz/kube-rbac-proxy)
# which protects your /metrics endpoint.
Expand Down
4 changes: 4 additions & 0 deletions pkg/controllers/core/catalog_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (m MockStore) Delete(_ string) error {
return nil
}

func (m MockStore) Start(_ context.Context) error {
panic("not needed")
}

var _ = Describe("Catalogd Controller Test", func() {
format.MaxLength = 0
var (
Expand Down
36 changes: 36 additions & 0 deletions pkg/storage/localdir.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package storage

import (
"context"
"fmt"
"io/fs"
"net/http"
"os"
"path/filepath"

Expand All @@ -16,6 +18,7 @@ import (
// atomic view of the content for a catalog.
type LocalDir struct {
RootDir string
Server *http.Server
}

func (s LocalDir) Store(catalog string, fsys fs.FS) error {
Expand Down Expand Up @@ -45,3 +48,36 @@ func (s LocalDir) Store(catalog string, fsys fs.FS) error {
func (s LocalDir) Delete(catalog string) error {
return os.RemoveAll(filepath.Join(s.RootDir, catalog))
}

func (s LocalDir) Start(ctx context.Context) error {
s.Server.Handler = http.StripPrefix("/catalogs/", http.FileServer(http.FS(&filesOnlyFilesystem{os.DirFS(s.RootDir)})))
errChan := make(chan error)
go func() {
errChan <- s.Server.ListenAndServe()
}()
select {
case err := <-errChan:
return err
case <-ctx.Done():
return s.Server.Shutdown(context.Background())
}
}

type filesOnlyFilesystem struct {
FS fs.FS
}

func (f *filesOnlyFilesystem) Open(name string) (fs.File, error) {
file, err := f.FS.Open(name)
if err != nil {
return nil, err
}
stat, err := file.Stat()
if err != nil {
return nil, err
}
if !stat.Mode().IsRegular() {
return nil, os.ErrNotExist
}
return file, nil
}
9 changes: 7 additions & 2 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package storage

import "io/fs"
import (
"context"
"io/fs"
)

// Instance is a storage instance that stores FBC content of catalogs
// added to a cluster. It can be used to Store or Delete FBC in the
// host's filesystem
// host's filesystem. It also a manager runnable object, that starts
// a server to serve the content stored.
type Instance interface {
Store(catalog string, fsys fs.FS) error
Delete(catalog string) error
Start(ctx context.Context) error
}
19 changes: 15 additions & 4 deletions test/e2e/e2e_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -15,8 +19,11 @@ import (
)

var (
cfg *rest.Config
c client.Client
cfg *rest.Config
c client.Client
err error
defaultSystemNamespace = "catalogd-system"
kubeClient kubernetes.Interface
)

func TestE2E(t *testing.T) {
Expand All @@ -30,8 +37,12 @@ var _ = BeforeSuite(func() {
cfg = ctrl.GetConfigOrDie()

scheme := runtime.NewScheme()
err := catalogd.AddToScheme(scheme)
Expect(err).ToNot(HaveOccurred())
Expect(catalogd.AddToScheme(scheme)).To(Succeed())
Expect(corev1.AddToScheme(scheme)).To(Succeed())
Expect(rbacv1.AddToScheme(scheme)).To(Succeed())
Expect(batchv1.AddToScheme(scheme)).To(Succeed())
c, err = client.New(cfg, client.Options{Scheme: scheme})
Expect(err).To(Not(HaveOccurred()))
kubeClient, err = kubernetes.NewForConfig(cfg)
Expect(err).ToNot(HaveOccurred())
})
105 changes: 102 additions & 3 deletions test/e2e/unpack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import (
"fmt"
"os"

"github.com/google/go-cmp/cmp"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/operator-framework/operator-registry/alpha/declcfg"
"github.com/operator-framework/operator-registry/alpha/property"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -44,6 +47,8 @@ var _ = Describe("Catalog Unpacking", func() {
var (
ctx context.Context
catalog *catalogd.Catalog
job batchv1.Job
pod corev1.Pod
)
When("A Catalog is created", func() {
BeforeEach(func() {
Expand Down Expand Up @@ -169,11 +174,105 @@ var _ = Describe("Catalog Unpacking", func() {
err = c.Get(ctx, client.ObjectKeyFromObject(expectedBndl), bndl)
Expect(err).ToNot(HaveOccurred())
Expect(expectedBndl).To(komega.EqualObject(bndl, komega.IgnoreAutogeneratedMetadata))
})

AfterEach(func() {
err := c.Delete(ctx, catalog)
By("Making sure the catalog content is available via the http server")
// (TODO): Get the URL from the CR once https://github.com/operator-framework/catalogd/issues/119 is done
catalogURL := fmt.Sprintf("%s.%s.svc/catalogs/%s/all.json", "catalogd-catalogserver", defaultSystemNamespace, catalogName)
job = batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-svr-job",
Namespace: defaultSystemNamespace,
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-svr",
Image: "curlimages/curl",
Command: []string{"sh", "-c", "curl --silent --show-error --location --insecure --header \"Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)\" -o - " + catalogURL},
},
},
RestartPolicy: "Never",
},
},
},
}
err = c.Create(ctx, &job)
Expect(err).ToNot(HaveOccurred())
Eventually(func() (bool, error) {
err = c.Get(ctx, types.NamespacedName{Name: "test-svr-job", Namespace: defaultSystemNamespace}, &job)
if err != nil {
return false, err
}
return job.Status.CompletionTime != nil && job.Status.Succeeded == 1, err
}).Should(BeTrue())
pods := &corev1.PodList{}
Eventually(func() (bool, error) {
err := c.List(context.Background(), pods, client.MatchingLabels{"job-name": "test-svr-job"})
if err != nil {
return false, err
}
return len(pods.Items) == 1, nil
}).Should(BeTrue())

type value struct {
PackageName string `json:"packageName"`
Version string `json:"version"`
}
rawMessage, err := json.Marshal(value{PackageName: "prometheus", Version: "0.47.0"})
Expect(err).To(Not(HaveOccurred()))
expectedDeclCfg := declcfg.DeclarativeConfig{
Packages: []declcfg.Package{
{
Schema: "olm.package",
Name: "prometheus",
DefaultChannel: "beta",
},
},
Channels: []declcfg.Channel{
{
Schema: "olm.channel",
Name: "beta",
Package: "prometheus",
Entries: []declcfg.ChannelEntry{
{
Name: "prometheus-operator.0.47.0",
},
},
},
},
Bundles: []declcfg.Bundle{
{
Schema: "olm.bundle",
Name: "prometheus-operator.0.47.0",
Package: "prometheus",
Image: "localhost/testdata/bundles/registry-v1/prometheus-operator:v0.47.0",
Properties: []property.Property{
{
Type: "olm.package",
Value: rawMessage,
},
},
},
},
}
Eventually(func() (bool, error) {

// Get logs of the Pod
pod = pods.Items[0]
logReader, err := kubeClient.CoreV1().Pods(defaultSystemNamespace).GetLogs(pod.Name, &corev1.PodLogOptions{}).Stream(context.Background())
if err != nil {
return false, err
}

cfg, err := declcfg.LoadReader(logReader)
Expect(err).To(Not(HaveOccurred()))
return cmp.Diff(cfg, &expectedDeclCfg) == "", nil
}).Should(BeTrue())
})
AfterEach(func() {
Expect(c.Delete(ctx, catalog)).To(Succeed())
Eventually(func(g Gomega) {
err = c.Get(ctx, types.NamespacedName{Name: catalog.Name}, &catalogd.Catalog{})
g.Expect(errors.IsNotFound(err)).To(BeTrue())
Expand Down

0 comments on commit 1f54620

Please sign in to comment.