Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for multiple clusters #2

Merged
merged 4 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions charts/reports-server/templates/cluster-roles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ metadata:
rbac.authorization.k8s.io/aggregate-to-view: 'true'
{{- include "reports-server.labels" . | nindent 4 }}
rules:
- apiGroups:
- ''
resources:
- namespaces
verbs:
- get
- list
- apiGroups:
- reports.kyverno.io
resources:
Expand Down
7 changes: 7 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ metadata:
app.kubernetes.io/version: "v0.1.0-alpha.1"
app.kubernetes.io/managed-by: Helm
rules:
- apiGroups:
- ''
resources:
- namespaces
verbs:
- get
- list
- apiGroups:
- reports.kyverno.io
resources:
Expand Down
30 changes: 26 additions & 4 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package server

import (
"context"
"net/http"

"github.com/nirmata/reports-server/pkg/api"
"github.com/nirmata/reports-server/pkg/storage"
"github.com/nirmata/reports-server/pkg/storage/db"
"github.com/kyverno/reports-server/pkg/api"

Check failure on line 7 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / v1.26 - ^reports$

no required module provides package github.com/kyverno/reports-server/pkg/api; to add it:

Check failure on line 7 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / unit-tests

no required module provides package github.com/kyverno/reports-server/pkg/api; to add it:

Check failure on line 7 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / required

no required module provides package github.com/kyverno/reports-server/pkg/api; to add it:

Check failure on line 7 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / v1.27 - ^reports$

no required module provides package github.com/kyverno/reports-server/pkg/api; to add it:

Check failure on line 7 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / v1.28 - ^reports$

no required module provides package github.com/kyverno/reports-server/pkg/api; to add it:

Check failure on line 7 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / v1.29 - ^reports$

no required module provides package github.com/kyverno/reports-server/pkg/api; to add it:
"github.com/kyverno/reports-server/pkg/storage"

Check failure on line 8 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / v1.26 - ^reports$

no required module provides package github.com/kyverno/reports-server/pkg/storage; to add it:

Check failure on line 8 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / unit-tests

no required module provides package github.com/kyverno/reports-server/pkg/storage; to add it:

Check failure on line 8 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / required

no required module provides package github.com/kyverno/reports-server/pkg/storage; to add it:

Check failure on line 8 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / v1.27 - ^reports$

no required module provides package github.com/kyverno/reports-server/pkg/storage; to add it:

Check failure on line 8 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / v1.28 - ^reports$

no required module provides package github.com/kyverno/reports-server/pkg/storage; to add it:

Check failure on line 8 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / v1.29 - ^reports$

no required module provides package github.com/kyverno/reports-server/pkg/storage; to add it:
"github.com/kyverno/reports-server/pkg/storage/db"

Check failure on line 9 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / v1.26 - ^reports$

no required module provides package github.com/kyverno/reports-server/pkg/storage/db; to add it:

Check failure on line 9 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / unit-tests

no required module provides package github.com/kyverno/reports-server/pkg/storage/db; to add it:

Check failure on line 9 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / required

no required module provides package github.com/kyverno/reports-server/pkg/storage/db; to add it:

Check failure on line 9 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / v1.27 - ^reports$

no required module provides package github.com/kyverno/reports-server/pkg/storage/db; to add it:

Check failure on line 9 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / v1.28 - ^reports$

no required module provides package github.com/kyverno/reports-server/pkg/storage/db; to add it:

Check failure on line 9 in pkg/server/config.go

View workflow job for this annotation

GitHub Actions / v1.29 - ^reports$

no required module provides package github.com/kyverno/reports-server/pkg/storage/db; to add it:
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimetrics "k8s.io/apiserver/pkg/endpoints/metrics"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
Expand All @@ -34,7 +37,11 @@
}
genericServer.Handler.NonGoRestfulMux.HandleFunc("/metrics", metricsHandler)

store, err := storage.New(c.Debug, c.DBconfig)
id, err := c.getClusterId()
if err != nil {
return nil, err
}
store, err := storage.New(c.Debug, c.DBconfig, id)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -69,3 +76,18 @@
metrics.HandlerFor(registry, metrics.HandlerOpts{}).ServeHTTP(w, req)
}, nil
}

func (c Config) getClusterId() (string, error) {
clientset, err := kubernetes.NewForConfig(c.Rest)
if err != nil {
return "", err
}

// Kubernetes clusters do not have a uid. The uid of kubesystem namespace does not change and is commonly accepted as the id of the cluster
ns, err := clientset.CoreV1().Namespaces().Get(context.Background(), "kube-system", metav1.GetOptions{})
if err != nil {
return "", err
}

return string(ns.GetUID()), nil
}
25 changes: 16 additions & 9 deletions pkg/storage/db/cephr.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,24 @@ import (

type cephr struct {
sync.Mutex
db *sql.DB
clusterId string
db *sql.DB
}

func NewClusterEphemeralReportStore(db *sql.DB) (api.ClusterEphemeralReportsInterface, error) {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS clusterephemeralreports (name VARCHAR NOT NULL, report JSONB NOT NULL, PRIMARY KEY(name))")
func NewClusterEphemeralReportStore(db *sql.DB, clusterId string) (api.ClusterEphemeralReportsInterface, error) {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS clusterephemeralreports (name VARCHAR NOT NULL, clusterId VARCHAR NOT NULL, report JSONB NOT NULL, PRIMARY KEY(name, clusterId))")
if err != nil {
klog.ErrorS(err, "failed to create table")
return nil, err
}

return &cephr{db: db}, nil
_, err = db.Exec("CREATE INDEX IF NOT EXISTS clusterephemeralreportcluster ON clusterephemeralreports(clusterId)")
if err != nil {
klog.ErrorS(err, "failed to create index")
return nil, err
}

return &cephr{db: db, clusterId: clusterId}, nil
}

func (c *cephr) List(ctx context.Context) ([]reportsv1.ClusterEphemeralReport, error) {
Expand All @@ -35,7 +42,7 @@ func (c *cephr) List(ctx context.Context) ([]reportsv1.ClusterEphemeralReport, e
res := make([]reportsv1.ClusterEphemeralReport, 0)
var jsonb string

rows, err := c.db.Query("SELECT report FROM clusterephemeralreports")
rows, err := c.db.Query("SELECT report FROM clusterephemeralreports WHERE (clusterId = $1)", c.clusterId)
if err != nil {
klog.ErrorS(err, "failed to list clusterephemeralreports")
return nil, fmt.Errorf("clusterephemeralreports list: %v", err)
Expand Down Expand Up @@ -65,7 +72,7 @@ func (c *cephr) Get(ctx context.Context, name string) (reportsv1.ClusterEphemera

var jsonb string

row := c.db.QueryRow("SELECT report FROM clusterephemeralreports WHERE (name = $1)", name)
row := c.db.QueryRow("SELECT report FROM clusterephemeralreports WHERE (name = $1) AND (clusterId = $2)", name, c.clusterId)
if err := row.Scan(&jsonb); err != nil {
klog.ErrorS(err, fmt.Sprintf("clusterephemeralreport not found name=%s", name))
if err == sql.ErrNoRows {
Expand Down Expand Up @@ -93,7 +100,7 @@ func (c *cephr) Create(ctx context.Context, cephr reportsv1.ClusterEphemeralRepo
return err
}

_, err = c.db.Exec("INSERT INTO clusterephemeralreports (name, report) VALUES ($1, $2)", cephr.Name, string(jsonb))
_, err = c.db.Exec("INSERT INTO clusterephemeralreports (name, report, clusterId) VALUES ($1, $2, $3)", cephr.Name, string(jsonb), c.clusterId)
if err != nil {
klog.ErrorS(err, "failed to crate cephr")
return fmt.Errorf("create clusterephemeralreport: %v", err)
Expand All @@ -110,7 +117,7 @@ func (c *cephr) Update(ctx context.Context, cephr reportsv1.ClusterEphemeralRepo
return err
}

_, err = c.db.Exec("UPDATE clusterephemeralreports SET report = $1 WHERE (name = $2)", string(jsonb), cephr.Name)
_, err = c.db.Exec("UPDATE clusterephemeralreports SET report = $1 WHERE (name = $2) AND (clusterId = $3)", string(jsonb), cephr.Name, c.clusterId)
if err != nil {
klog.ErrorS(err, "failed to updates cephr")
return fmt.Errorf("update clusterephemeralreport: %v", err)
Expand All @@ -122,7 +129,7 @@ func (c *cephr) Delete(ctx context.Context, name string) error {
c.Lock()
defer c.Unlock()

_, err := c.db.Exec("DELETE FROM clusterephemeralreports WHERE (name = $1)", name)
_, err := c.db.Exec("DELETE FROM clusterephemeralreports WHERE (name = $1) AND (clusterId = $2)", name, c.clusterId)
if err != nil {
klog.ErrorS(err, "failed to delete cephr")
return fmt.Errorf("delete clusterephemeralreport: %v", err)
Expand Down
25 changes: 16 additions & 9 deletions pkg/storage/db/cpolr.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,24 @@ import (

type cpolrdb struct {
sync.Mutex
db *sql.DB
db *sql.DB
clusterId string
}

func NewClusterPolicyReportStore(db *sql.DB) (api.ClusterPolicyReportsInterface, error) {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS clusterpolicyreports (name VARCHAR NOT NULL, report JSONB NOT NULL, PRIMARY KEY(name))")
func NewClusterPolicyReportStore(db *sql.DB, clusterId string) (api.ClusterPolicyReportsInterface, error) {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS clusterpolicyreports (name VARCHAR NOT NULL, clusterId VARCHAR NOT NULL, report JSONB NOT NULL, PRIMARY KEY(name, clusterId))")
if err != nil {
klog.ErrorS(err, "failed to create table")
return nil, err
}

return &cpolrdb{db: db}, nil
_, err = db.Exec("CREATE INDEX IF NOT EXISTS clusterpolicyreportcluster ON clusterpolicyreports(clusterId)")
if err != nil {
klog.ErrorS(err, "failed to create index")
return nil, err
}

return &cpolrdb{db: db, clusterId: clusterId}, nil
}

func (c *cpolrdb) List(ctx context.Context) ([]v1alpha2.ClusterPolicyReport, error) {
Expand All @@ -35,7 +42,7 @@ func (c *cpolrdb) List(ctx context.Context) ([]v1alpha2.ClusterPolicyReport, err
res := make([]v1alpha2.ClusterPolicyReport, 0)
var jsonb string

rows, err := c.db.Query("SELECT report FROM clusterpolicyreports")
rows, err := c.db.Query("SELECT report FROM clusterpolicyreports WHERE clusterId = $1", c.clusterId)
if err != nil {
klog.ErrorS(err, "failed to list clusterpolicyreports")
return nil, fmt.Errorf("clusterpolicyreport list: %v", err)
Expand Down Expand Up @@ -65,7 +72,7 @@ func (c *cpolrdb) Get(ctx context.Context, name string) (v1alpha2.ClusterPolicyR

var jsonb string

row := c.db.QueryRow("SELECT report FROM clusterpolicyreports WHERE (name = $1)", name)
row := c.db.QueryRow("SELECT report FROM clusterpolicyreports WHERE (name = $1) AND (clusterId = $2)", name, c.clusterId)
if err := row.Scan(&jsonb); err != nil {
klog.ErrorS(err, fmt.Sprintf("clusterpolicyreport not found name=%s", name))
if err == sql.ErrNoRows {
Expand Down Expand Up @@ -93,7 +100,7 @@ func (c *cpolrdb) Create(ctx context.Context, cpolr v1alpha2.ClusterPolicyReport
return err
}

_, err = c.db.Exec("INSERT INTO clusterpolicyreports (name, report) VALUES ($1, $2)", cpolr.Name, string(jsonb))
_, err = c.db.Exec("INSERT INTO clusterpolicyreports (name, report, clusterId) VALUES ($1, $2, $3)", cpolr.Name, string(jsonb), c.clusterId)
if err != nil {
klog.ErrorS(err, "failed to crate cpolr")
return fmt.Errorf("create clusterpolicyreport: %v", err)
Expand All @@ -110,7 +117,7 @@ func (c *cpolrdb) Update(ctx context.Context, cpolr v1alpha2.ClusterPolicyReport
return err
}

_, err = c.db.Exec("UPDATE clusterpolicyreports SET report = $1 WHERE (name = $2)", string(jsonb), cpolr.Name)
_, err = c.db.Exec("UPDATE clusterpolicyreports SET report = $1 WHERE (name = $2) AND (clusterId = $3)", string(jsonb), cpolr.Name, c.clusterId)
if err != nil {
klog.ErrorS(err, "failed to updates cpolr")
return fmt.Errorf("update clusterpolicyreport: %v", err)
Expand All @@ -122,7 +129,7 @@ func (c *cpolrdb) Delete(ctx context.Context, name string) error {
c.Lock()
defer c.Unlock()

_, err := c.db.Exec("DELETE FROM clusterpolicyreports WHERE (name = $1)", name)
_, err := c.db.Exec("DELETE FROM clusterpolicyreports WHERE (name = $1) AND (clusterId = $2)", name, c.clusterId)
if err != nil {
klog.ErrorS(err, "failed to delete cpolr")
return fmt.Errorf("delete clusterpolicyreport: %v", err)
Expand Down
27 changes: 17 additions & 10 deletions pkg/storage/db/ephr.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (

type ephrdb struct {
sync.Mutex
db *sql.DB
db *sql.DB
clusterId string
}

func NewEphemeralReportStore(db *sql.DB) (api.EphemeralReportsInterface, error) {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS ephemeralreports (name VARCHAR NOT NULL, namespace VARCHAR NOT NULL, report JSONB NOT NULL, PRIMARY KEY(name, namespace))")
func NewEphemeralReportStore(db *sql.DB, clusterId string) (api.EphemeralReportsInterface, error) {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS ephemeralreports (name VARCHAR NOT NULL, namespace VARCHAR NOT NULL, clusterId VARCHAR NOT NULL, report JSONB NOT NULL, PRIMARY KEY(name, namespace, clusterId))")
if err != nil {
klog.ErrorS(err, "failed to create table")
return nil, err
Expand All @@ -29,7 +30,13 @@ func NewEphemeralReportStore(db *sql.DB) (api.EphemeralReportsInterface, error)
klog.ErrorS(err, "failed to create index")
return nil, err
}
return &ephrdb{db: db}, nil

_, err = db.Exec("CREATE INDEX IF NOT EXISTS ephemeralreportcluster ON ephemeralreports(clusterId)")
if err != nil {
klog.ErrorS(err, "failed to create index")
return nil, err
}
return &ephrdb{db: db, clusterId: clusterId}, nil
}

func (p *ephrdb) List(ctx context.Context, namespace string) ([]reportsv1.EphemeralReport, error) {
Expand All @@ -43,9 +50,9 @@ func (p *ephrdb) List(ctx context.Context, namespace string) ([]reportsv1.Epheme
var err error

if len(namespace) == 0 {
rows, err = p.db.Query("SELECT report FROM ephemeralreports")
rows, err = p.db.Query("SELECT report FROM ephemeralreports WHERE clusterId = $1", p.clusterId)
} else {
rows, err = p.db.Query("SELECT report FROM ephemeralreports WHERE namespace = $1", namespace)
rows, err = p.db.Query("SELECT report FROM ephemeralreports WHERE namespace = $1 AND clusterId = $2", namespace, p.clusterId)
}
if err != nil {
klog.ErrorS(err, "ephemeralreport list: ")
Expand Down Expand Up @@ -76,7 +83,7 @@ func (p *ephrdb) Get(ctx context.Context, name, namespace string) (reportsv1.Eph

var jsonb string

row := p.db.QueryRow("SELECT report FROM ephemeralreports WHERE (namespace = $1) AND (name = $2)", namespace, name)
row := p.db.QueryRow("SELECT report FROM ephemeralreports WHERE (namespace = $1) AND (name = $2) AND (clusterId = $3)", namespace, name, p.clusterId)
if err := row.Scan(&jsonb); err != nil {
klog.ErrorS(err, fmt.Sprintf("ephemeralreport not found name=%s namespace=%s", name, namespace))
if err == sql.ErrNoRows {
Expand All @@ -103,7 +110,7 @@ func (p *ephrdb) Create(ctx context.Context, polr reportsv1.EphemeralReport) err
return err
}

_, err = p.db.Exec("INSERT INTO ephemeralreports (name, namespace, report) VALUES ($1, $2, $3)", polr.Name, polr.Namespace, string(jsonb))
_, err = p.db.Exec("INSERT INTO ephemeralreports (name, namespace, report, clusterId) VALUES ($1, $2, $3, $4)", polr.Name, polr.Namespace, string(jsonb), p.clusterId)
if err != nil {
klog.ErrorS(err, "failed to create ephemeral report")
return fmt.Errorf("create ephemeralreport: %v", err)
Expand All @@ -120,7 +127,7 @@ func (p *ephrdb) Update(ctx context.Context, polr reportsv1.EphemeralReport) err
return err
}

_, err = p.db.Exec("UPDATE ephemeralreports SET report = $1 WHERE (namespace = $2) AND (name = $3)", string(jsonb), polr.Namespace, polr.Name)
_, err = p.db.Exec("UPDATE ephemeralreports SET report = $1 WHERE (namespace = $2) AND (name = $3) AND (clusterId = $4)", string(jsonb), polr.Namespace, polr.Name, p.clusterId)
if err != nil {
klog.ErrorS(err, "failed to update ephemeral report")
return fmt.Errorf("update ephemeralreport: %v", err)
Expand All @@ -132,7 +139,7 @@ func (p *ephrdb) Delete(ctx context.Context, name, namespace string) error {
p.Lock()
defer p.Unlock()

_, err := p.db.Exec("DELETE FROM ephemeralreports WHERE (namespace = $1) AND (name = $2)", namespace, name)
_, err := p.db.Exec("DELETE FROM ephemeralreports WHERE (namespace = $1) AND (name = $2) AND (clusterId = $3)", namespace, name, p.clusterId)
if err != nil {
klog.ErrorS(err, "failed to delete ephemeral report")
return fmt.Errorf("delete ephemeralreport: %v", err)
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/db/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
sleepDuration = 15 * time.Second
)

func New(config *PostgresConfig) (api.Storage, error) {
func New(config *PostgresConfig, clusterId string) (api.Storage, error) {
klog.Infof("starting postgres db, config: %s", config.String())
db, err := sql.Open("postgres", config.String())
if err != nil {
Expand All @@ -43,25 +43,25 @@ func New(config *PostgresConfig) (api.Storage, error) {
klog.Info("successfully connected to db")

klog.Info("starting reports store")
polrstore, err := NewPolicyReportStore(db)
polrstore, err := NewPolicyReportStore(db, clusterId)
if err != nil {
klog.Error("failed to start policy report store", err.Error())
return nil, err
}

cpolrstore, err := NewClusterPolicyReportStore(db)
cpolrstore, err := NewClusterPolicyReportStore(db, clusterId)
if err != nil {
klog.Error("failed to start cluster policy report store", err.Error())
return nil, err
}

ephrstore, err := NewEphemeralReportStore(db)
ephrstore, err := NewEphemeralReportStore(db, clusterId)
if err != nil {
klog.Error("failed to start policy report store", err.Error())
return nil, err
}

cephrstore, err := NewClusterEphemeralReportStore(db)
cephrstore, err := NewClusterEphemeralReportStore(db, clusterId)
if err != nil {
klog.Error("failed to start cluster policy report store", err.Error())
return nil, err
Expand Down
Loading
Loading