Skip to content

Commit

Permalink
Serve locally stored fbc content via an http server
Browse files Browse the repository at this point in the history
Closes operator-framework#113

Signed-off-by: Anik <[email protected]>
  • Loading branch information
anik120 committed Sep 6, 2023
1 parent d30f161 commit 45046c7
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 18 deletions.
12 changes: 12 additions & 0 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"flag"
"fmt"
"net/url"
"os"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand All @@ -39,6 +40,7 @@ import (
corecontrollers "github.com/operator-framework/catalogd/pkg/controllers/core"
"github.com/operator-framework/catalogd/pkg/features"
"github.com/operator-framework/catalogd/pkg/profile"
"github.com/operator-framework/catalogd/pkg/server"
"github.com/operator-framework/catalogd/pkg/storage"

//+kubebuilder:scaffold:imports
Expand Down Expand Up @@ -67,6 +69,7 @@ func main() {
catalogdVersion bool
systemNamespace string
storageDir string
httpExternalAddr string
)
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
Expand All @@ -77,6 +80,7 @@ func main() {
flag.StringVar(&unpackImage, "unpack-image", "quay.io/operator-framework/rukpak:v0.12.0", "The unpack image to use when unpacking catalog images")
flag.StringVar(&systemNamespace, "system-namespace", "", "The namespace catalogd uses for internal state, configuration, and workloads")
flag.StringVar(&storageDir, "catalogs-storage-dir", "/var/cache/catalogs", "The directory in the filesystem where unpacked catalog content will be stored and served from")
flag.StringVar(&httpExternalAddr, "http-external-address", "http://localhost:8080", "The external address at which the http server is reachable.")
flag.BoolVar(&profiling, "profiling", false, "enable profiling endpoints to allow for using pprof")
flag.BoolVar(&catalogdVersion, "version", false, "print the catalogd version and exit")
opts := zap.Options{
Expand Down Expand Up @@ -122,6 +126,14 @@ func main() {
if err := os.MkdirAll(storageDir, 0700); err != nil {
setupLog.Error(err, "unable to create storage directory for catalogs")
}
serverURL, err := url.Parse(fmt.Sprintf("%s/catalogs/", httpExternalAddr))
if err != nil {
setupLog.Error(err, "unable to parse bundle content server URL")
os.Exit(1)
}
server := server.Instance{StorageDir: storageDir, URL: *serverURL}
mgr.AddMetricsExtraHandler("/catalogs/", server.CatalogServerHandler())

if err = (&corecontrollers.CatalogReconciler{
Client: mgr.GetClient(),
Unpacker: unpacker,
Expand Down
3 changes: 2 additions & 1 deletion config/default/manager_auth_proxy_patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ spec:
- "--metrics-bind-address=127.0.0.1:8080"
- "--leader-elect"
- "--catalogs-storage-dir=/var/cache/catalogs"
- "--feature-gates=CatalogMetadataAPI=true,HTTPServer=false"
- "--feature-gates=CatalogMetadataAPI=true,HTTPServer=true"
- "--http-external-address=https://core.catalogd-system.svc"
14 changes: 3 additions & 11 deletions config/rbac/auth_proxy_service.yaml
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
apiVersion: v1
kind: Service
metadata:
labels:
control-plane: controller-manager
app.kubernetes.io/name: service
app.kubernetes.io/instance: controller-manager-metrics-service
app.kubernetes.io/component: kube-rbac-proxy
app.kubernetes.io/created-by: catalogd
app.kubernetes.io/part-of: catalogd
app.kubernetes.io/managed-by: kustomize
name: controller-manager-metrics-service
name: core
namespace: system
spec:
ports:
- name: https
port: 8443
port: 443
protocol: TCP
targetPort: https
targetPort: 8443
selector:
control-plane: controller-manager
9 changes: 9 additions & 0 deletions config/rbac/catalog_reader_client_clusterrole.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: catalog-reader
rules:
- nonResourceURLs:
- /catalogs/*
verbs:
- get
1 change: 1 addition & 0 deletions config/rbac/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ resources:
- role_binding.yaml
- leader_election_role.yaml
- leader_election_role_binding.yaml
- catalog_reader_client_clusterrole.yaml
# Comment the following 4 lines if you want to disable
# the auth proxy (https://github.com/brancz/kube-rbac-proxy)
# which protects your /metrics endpoint.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (
github.com/onsi/ginkgo/v2 v2.9.7
github.com/onsi/gomega v1.27.7
github.com/operator-framework/operator-registry v1.27.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.1
k8s.io/api v0.26.1
k8s.io/apimachinery v0.26.1
Expand Down Expand Up @@ -63,6 +62,7 @@ require (
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.24.0 // indirect
Expand Down
36 changes: 36 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package server

import (
"io/fs"
"net/http"
"net/url"
"os"
)

type Instance struct {
StorageDir string
URL url.URL
}

func (i Instance) CatalogServerHandler() http.Handler {
return http.StripPrefix(i.URL.Path, http.FileServer(http.FS(&filesOnlyFilesystem{os.DirFS(i.StorageDir)})))
}

type filesOnlyFilesystem struct {
FS fs.FS
}

func (f *filesOnlyFilesystem) Open(name string) (fs.File, error) {
file, err := f.FS.Open(name)
if err != nil {
return nil, err
}
stat, err := file.Stat()
if err != nil {
return nil, err
}
if !stat.Mode().IsRegular() {
return nil, os.ErrNotExist
}
return file, nil
}
19 changes: 15 additions & 4 deletions test/e2e/e2e_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -15,8 +19,11 @@ import (
)

var (
cfg *rest.Config
c client.Client
cfg *rest.Config
c client.Client
err error
defaultSystemNamespace = "catalogd-system"
kubeClient kubernetes.Interface
)

func TestE2E(t *testing.T) {
Expand All @@ -30,8 +37,12 @@ var _ = BeforeSuite(func() {
cfg = ctrl.GetConfigOrDie()

scheme := runtime.NewScheme()
err := catalogd.AddToScheme(scheme)
Expect(err).ToNot(HaveOccurred())
Expect(catalogd.AddToScheme(scheme)).To(Succeed())
Expect(corev1.AddToScheme(scheme)).To(Succeed())
Expect(rbacv1.AddToScheme(scheme)).To(Succeed())
Expect(batchv1.AddToScheme(scheme)).To(Succeed())
c, err = client.New(cfg, client.Options{Scheme: scheme})
Expect(err).To(Not(HaveOccurred()))
kubeClient, err = kubernetes.NewForConfig(cfg)
Expect(err).ToNot(HaveOccurred())
})
135 changes: 134 additions & 1 deletion test/e2e/unpack_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package e2e

import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"

"github.com/google/go-cmp/cmp"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/operator-framework/operator-registry/alpha/declcfg"
"github.com/operator-framework/operator-registry/alpha/property"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -169,8 +174,136 @@ var _ = Describe("Catalog Unpacking", func() {
err = c.Get(ctx, client.ObjectKeyFromObject(expectedBndl), bndl)
Expect(err).ToNot(HaveOccurred())
Expect(expectedBndl).To(komega.EqualObject(bndl, komega.IgnoreAutogeneratedMetadata))
})

By("Making sure the catalog content is available via the http server")
// Create a temporary ServiceAccount
var (
sa corev1.ServiceAccount
crb rbacv1.ClusterRoleBinding
job batchv1.Job
pod corev1.Pod
)
sa = corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: "test-svr-sa",
Namespace: defaultSystemNamespace,
},
}
err = c.Create(ctx, &sa)
Expect(err).ToNot(HaveOccurred())

// Create a temporary ClusterRoleBinding to bind the ServiceAccount to catalog-reader ClusterRole
crb = rbacv1.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "test-svr-crb",
Namespace: defaultSystemNamespace,
},

Subjects: []rbacv1.Subject{{Kind: "ServiceAccount", Name: "test-svr-sa", Namespace: defaultSystemNamespace}},
RoleRef: rbacv1.RoleRef{APIGroup: "rbac.authorization.k8s.io", Kind: "ClusterRole", Name: "catalogd-catalog-reader"},
}

err = c.Create(ctx, &crb)
Expect(err).ToNot(HaveOccurred())
mounttoken := true
job = batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-svr-job",
Namespace: defaultSystemNamespace,
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-svr",
Image: "curlimages/curl",
Command: []string{"sh", "-c", "curl -sSLk -H \"Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)\" -o - " + "https://catalogd-core.catalogd-system.svc/catalogs/test-catalog/all.json"},
},
},
ServiceAccountName: "test-svr-sa",
RestartPolicy: "Never",
AutomountServiceAccountToken: &mounttoken,
},
},
},
}
err = c.Create(ctx, &job)
Expect(err).ToNot(HaveOccurred())
Eventually(func() (bool, error) {
err = c.Get(ctx, types.NamespacedName{Name: "test-svr-job", Namespace: defaultSystemNamespace}, &job)
if err != nil {
return false, err
}
return job.Status.CompletionTime != nil && job.Status.Succeeded == 1, err
}).Should(BeTrue())
pods := &corev1.PodList{}
Eventually(func() (bool, error) {
err := c.List(context.Background(), pods, client.MatchingLabels{"job-name": "test-svr-job"})
if err != nil {
return false, err
}
return len(pods.Items) == 1, nil
}).Should(BeTrue())

type value struct {
PackageName string `json:"packageName"`
Version string `json:"version"`
}
rawMessage, err := json.Marshal(value{PackageName: "prometheus", Version: "0.47.0"})
Expect(err).To(Not(HaveOccurred()))
expectedDeclCfg := declcfg.DeclarativeConfig{
Packages: []declcfg.Package{
{
Schema: "olm.package",
Name: "prometheus",
DefaultChannel: "beta",
},
},
Channels: []declcfg.Channel{
{
Schema: "olm.channel",
Name: "beta",
Package: "prometheus",
Entries: []declcfg.ChannelEntry{
{
Name: "prometheus-operator.0.47.0",
},
},
},
},
Bundles: []declcfg.Bundle{
{
Schema: "olm.bundle",
Name: "prometheus-operator.0.47.0",
Package: "prometheus",
Image: "localhost/testdata/bundles/registry-v1/prometheus-operator:v0.47.0",
Properties: []property.Property{
{
Type: "olm.package",
Value: rawMessage,
},
},
},
},
}
Eventually(func() (bool, error) {

// Get logs of the Pod
pod = pods.Items[0]
logReader, err := kubeClient.CoreV1().Pods(defaultSystemNamespace).GetLogs(pod.Name, &corev1.PodLogOptions{}).Stream(context.Background())
if err != nil {
return false, err
}
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(logReader)
Expect(err).ToNot(HaveOccurred())
cfg, err := declcfg.LoadReader(buf)
Expect(err).To(Not(HaveOccurred()))
Expect(err).To(Not(HaveOccurred()))
return cmp.Diff(cfg, &expectedDeclCfg) == "", nil
}).Should(BeTrue())
})
AfterEach(func() {
err := c.Delete(ctx, catalog)
Expect(err).ToNot(HaveOccurred())
Expand Down

0 comments on commit 45046c7

Please sign in to comment.