Skip to content

Commit

Permalink
Serve locally stored fbc content via an http server (#148)
Browse files Browse the repository at this point in the history
Closes #113

Signed-off-by: Anik <[email protected]>
  • Loading branch information
anik120 authored Sep 8, 2023
1 parent d30f161 commit 966a4d6
Show file tree
Hide file tree
Showing 14 changed files with 345 additions and 16 deletions.
24 changes: 23 additions & 1 deletion cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package main
import (
"flag"
"fmt"
"net/http"
"os"
"time"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
Expand All @@ -35,6 +37,7 @@ import (
"github.com/spf13/pflag"

"github.com/operator-framework/catalogd/internal/source"
"github.com/operator-framework/catalogd/internal/third_party/server"
"github.com/operator-framework/catalogd/internal/version"
corecontrollers "github.com/operator-framework/catalogd/pkg/controllers/core"
"github.com/operator-framework/catalogd/pkg/features"
Expand Down Expand Up @@ -67,6 +70,7 @@ func main() {
catalogdVersion bool
systemNamespace string
storageDir string
catalogServerAddr string
)
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
Expand All @@ -77,6 +81,7 @@ func main() {
flag.StringVar(&unpackImage, "unpack-image", "quay.io/operator-framework/rukpak:v0.12.0", "The unpack image to use when unpacking catalog images")
flag.StringVar(&systemNamespace, "system-namespace", "", "The namespace catalogd uses for internal state, configuration, and workloads")
flag.StringVar(&storageDir, "catalogs-storage-dir", "/var/cache/catalogs", "The directory in the filesystem where unpacked catalog content will be stored and served from")
flag.StringVar(&catalogServerAddr, "catalogs-server-addr", ":8083", "The address where the unpacked catalogs' content will be accessible")
flag.BoolVar(&profiling, "profiling", false, "enable profiling endpoints to allow for using pprof")
flag.BoolVar(&catalogdVersion, "version", false, "print the catalogd version and exit")
opts := zap.Options{
Expand Down Expand Up @@ -122,10 +127,27 @@ func main() {
if err := os.MkdirAll(storageDir, 0700); err != nil {
setupLog.Error(err, "unable to create storage directory for catalogs")
}
localStorage := storage.LocalDir{RootDir: storageDir}
shutdownTimeout := 30 * time.Second
catalogServer := server.Server{
Kind: "catalogs",
Server: &http.Server{
Addr: catalogServerAddr,
Handler: localStorage.StorageServerHandler(),
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
},
ShutdownTimeout: &shutdownTimeout,
}
if err := mgr.Add(&catalogServer); err != nil {
setupLog.Error(err, "unable to start catalog server")
os.Exit(1)
}

if err = (&corecontrollers.CatalogReconciler{
Client: mgr.GetClient(),
Unpacker: unpacker,
Storage: storage.LocalDir{RootDir: storageDir},
Storage: localStorage,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Catalog")
os.Exit(1)
Expand Down
3 changes: 2 additions & 1 deletion config/default/manager_auth_proxy_patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ spec:
- "--metrics-bind-address=127.0.0.1:8080"
- "--leader-elect"
- "--catalogs-storage-dir=/var/cache/catalogs"
- "--feature-gates=CatalogMetadataAPI=true,HTTPServer=false"
- "--feature-gates=CatalogMetadataAPI=true,HTTPServer=true"

13 changes: 13 additions & 0 deletions config/rbac/catalogserver_service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: v1
kind: Service
metadata:
name: catalogserver
namespace: system
spec:
selector:
control-plane: controller-manager
ports:
- name: http
protocol: TCP
port: 80
targetPort: 8083
1 change: 1 addition & 0 deletions config/rbac/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ resources:
- role_binding.yaml
- leader_election_role.yaml
- leader_election_role_binding.yaml
- catalogserver_service.yaml
# Comment the following 4 lines if you want to disable
# the auth proxy (https://github.com/brancz/kube-rbac-proxy)
# which protects your /metrics endpoint.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.20

require (
github.com/blang/semver/v4 v4.0.0
github.com/go-logr/logr v1.2.4
github.com/google/go-cmp v0.5.9
github.com/nlepage/go-tarfs v1.1.0
github.com/onsi/ginkgo/v2 v2.9.7
Expand Down Expand Up @@ -31,7 +32,6 @@ require (
github.com/go-git/gcfg v1.5.0 // indirect
github.com/go-git/go-billy/v5 v5.4.1 // indirect
github.com/go-git/go-git/v5 v5.4.2 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/zapr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
Expand Down
122 changes: 122 additions & 0 deletions internal/third_party/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// this is copied from https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/77b08a845e451b695cfa25b79ebe277d85064345/pkg/manager/server.go
// we will remove this once we update to a version of controller-runitme that has this included
// https://github.com/kubernetes-sigs/controller-runtime/pull/2473

package server

import (
"context"
"errors"
"net"
"net/http"
"time"

"github.com/go-logr/logr"

crlog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

var (
_ manager.Runnable = (*Server)(nil)
_ manager.LeaderElectionRunnable = (*Server)(nil)
)

// Server is a general purpose HTTP server Runnable for a manager.
// It is used to serve some internal handlers for health probes and profiling,
// but it can also be used to run custom servers.
type Server struct {
// Kind is an optional string that describes the purpose of the server. It is used in logs to distinguish
// among multiple servers.
Kind string

// Log is the logger used by the server. If not set, a logger will be derived from the context passed to Start.
Log logr.Logger

// Server is the HTTP server to run. It is required.
Server *http.Server

// Listener is an optional listener to use. If not set, the server start a listener using the server.Addr.
// Using a listener is useful when the port reservation needs to happen in advance of this runnable starting.
Listener net.Listener

// OnlyServeWhenLeader is an optional bool that indicates that the server should only be started when the manager is the leader.
OnlyServeWhenLeader bool

// ShutdownTimeout is an optional duration that indicates how long to wait for the server to shutdown gracefully. If not set,
// the server will wait indefinitely for all connections to close.
ShutdownTimeout *time.Duration
}

// Start starts the server. It will block until the server is stopped or an error occurs.
func (s *Server) Start(ctx context.Context) error {
log := s.Log
if log.GetSink() == nil {
log = crlog.FromContext(ctx)
}
if s.Kind != "" {
log = log.WithValues("kind", s.Kind)
}
log = log.WithValues("addr", s.addr())

serverShutdown := make(chan struct{})
go func() {
<-ctx.Done()
log.Info("shutting down server")

shutdownCtx := context.Background()
if s.ShutdownTimeout != nil {
var shutdownCancel context.CancelFunc
shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), *s.ShutdownTimeout)
defer shutdownCancel()
}

if err := s.Server.Shutdown(shutdownCtx); err != nil {
log.Error(err, "error shutting down server")
}
close(serverShutdown)
}()

log.Info("starting server")
if err := s.serve(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}

<-serverShutdown
return nil
}

// NeedLeaderElection returns true if the server should only be started when the manager is the leader.
func (s *Server) NeedLeaderElection() bool {
return s.OnlyServeWhenLeader
}

func (s *Server) addr() string {
if s.Listener != nil {
return s.Listener.Addr().String()
}
return s.Server.Addr
}

func (s *Server) serve() error {
if s.Listener != nil {
return s.Server.Serve(s.Listener)
}
return s.Server.ListenAndServe()
}
5 changes: 5 additions & 0 deletions pkg/controllers/core/catalog_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io/fs"
"net/http"
"os"
"testing/fstest"

Expand Down Expand Up @@ -67,6 +68,10 @@ func (m MockStore) Delete(_ string) error {
return nil
}

func (m MockStore) StorageServerHandler() http.Handler {
panic("not needed")
}

var _ = Describe("Catalogd Controller Test", func() {
format.MaxLength = 0
var (
Expand Down
34 changes: 34 additions & 0 deletions pkg/storage/localdir.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"fmt"
"io/fs"
"net/http"
"os"
"path/filepath"

Expand Down Expand Up @@ -45,3 +46,36 @@ func (s LocalDir) Store(catalog string, fsys fs.FS) error {
func (s LocalDir) Delete(catalog string) error {
return os.RemoveAll(filepath.Join(s.RootDir, catalog))
}

func (s LocalDir) StorageServerHandler() http.Handler {
mux := http.NewServeMux()
mux.Handle("/catalogs/", http.StripPrefix("/catalogs/", http.FileServer(http.FS(&filesOnlyFilesystem{os.DirFS(s.RootDir)}))))
return mux
}

// filesOnlyFilesystem is a file system that can open only regular
// files from the underlying filesystem. All other file types result
// in os.ErrNotExists
type filesOnlyFilesystem struct {
FS fs.FS
}

// Open opens a named file from the underlying filesystem. If the file
// is not a regular file, it return os.ErrNotExists. Callers are resposible
// for closing the file returned.
func (f *filesOnlyFilesystem) Open(name string) (fs.File, error) {
file, err := f.FS.Open(name)
if err != nil {
return nil, err
}
stat, err := file.Stat()
if err != nil {
_ = file.Close()
return nil, err
}
if !stat.Mode().IsRegular() {
_ = file.Close()
return nil, os.ErrNotExist
}
return file, nil
}
67 changes: 66 additions & 1 deletion pkg/storage/localdir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package storage

import (
"fmt"
"io"
"io/fs"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing/fstest"
Expand Down Expand Up @@ -34,8 +37,8 @@ var _ = Describe("LocalDir Storage Test", func() {
)
BeforeEach(func() {
d, err := os.MkdirTemp(GinkgoT().TempDir(), "cache")
rootDir = d
Expect(err).ToNot(HaveOccurred())
rootDir = d

store = LocalDir{RootDir: rootDir}
unpackResultFS = &fstest.MapFS{
Expand Down Expand Up @@ -76,6 +79,68 @@ var _ = Describe("LocalDir Storage Test", func() {
})
})

var _ = Describe("LocalDir Server Handler tests", func() {
var (
testServer *httptest.Server
store LocalDir
)
BeforeEach(func() {
d, err := os.MkdirTemp(GinkgoT().TempDir(), "cache")
Expect(err).ToNot(HaveOccurred())
Expect(os.Mkdir(filepath.Join(d, "test-catalog"), 0700)).To(Succeed())
store = LocalDir{RootDir: d}
testServer = httptest.NewServer(store.StorageServerHandler())

})
It("gets 404 for the path /", func() {
expectNotFound(testServer.URL)
})
It("gets 404 for the path /catalogs/", func() {
expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/"))
})
It("gets 404 for the path /catalogs/test-catalog/", func() {
expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/"))
})
It("gets 404 for the path /test-catalog/foo.txt", func() {
// This ensures that even if the file exists, the URL must contain the /catalogs/ prefix
Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", "foo.txt"), []byte("bar"), 0600)).To(Succeed())
expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/test-catalog/foo.txt"))
})
It("gets 404 for the path /catalogs/test-catalog/non-existent.txt", func() {
expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/non-existent.txt"))
})
It("gets 200 for the path /catalogs/foo.txt", func() {
expectedContent := []byte("bar")
Expect(os.WriteFile(filepath.Join(store.RootDir, "foo.txt"), expectedContent, 0600)).To(Succeed())
expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/foo.txt"), expectedContent)
})
It("gets 200 for the path /catalogs/test-catalog/foo.txt", func() {
expectedContent := []byte("bar")
Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", "foo.txt"), expectedContent, 0600)).To(Succeed())
expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/foo.txt"), expectedContent)
})
AfterEach(func() {
testServer.Close()
})
})

func expectNotFound(url string) {
resp, err := http.Get(url) //nolint:gosec
Expect(err).To(Not(HaveOccurred()))
Expect(resp.StatusCode).To(Equal(http.StatusNotFound))
Expect(resp.Body.Close()).To(Succeed())
}

func expectFound(url string, expectedContent []byte) {
resp, err := http.Get(url) //nolint:gosec
Expect(err).To(Not(HaveOccurred()))
Expect(resp.StatusCode).To(Equal(http.StatusOK))
actualContent, err := io.ReadAll(resp.Body)
Expect(err).To(Not(HaveOccurred()))
Expect(actualContent).To(Equal(expectedContent))
Expect(resp.Body.Close()).To(Succeed())
}

const testBundleTemplate = `---
image: %s
name: %s
Expand Down
Loading

0 comments on commit 966a4d6

Please sign in to comment.