Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync resources in the kubescape namespace #71

Merged
merged 8 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions adapters/incluster/v1/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (a *Adapter) GetClientByKind(kind domain.Kind) adapters.Client {
if !ok {
logger.L().Error("client not found", helpers.String("kind", kind.String()))
// if client is not found, create an empty one to discard the messages from the server in callbacks if the kind is not in the list
client = NewClient(&NoOpDynamicClient{}, a.cfg.Account, a.cfg.ClusterName, config.Resource{
client = NewClient(&NoOpDynamicClient{}, a.cfg, config.Resource{
Group: kind.Group,
Version: kind.Version,
Resource: kind.Resource,
Expand Down Expand Up @@ -110,7 +110,7 @@ func (a *Adapter) Callbacks(_ context.Context) (domain.Callbacks, error) {

func (a *Adapter) Start(ctx context.Context) error {
for _, r := range a.cfg.Resources {
client := NewClient(a.k8sclient, a.cfg.Account, a.cfg.ClusterName, r)
client := NewClient(a.k8sclient, a.cfg, r)
client.RegisterCallbacks(ctx, a.callbacks)
a.clients[r.String()] = client

Expand Down
52 changes: 36 additions & 16 deletions adapters/incluster/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
jsonpatch "github.com/evanphx/json-patch"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
helpersv1 "github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers"
"github.com/kubescape/synchronizer/adapters"
"github.com/kubescape/synchronizer/config"
"github.com/kubescape/synchronizer/domain"
Expand All @@ -30,7 +31,10 @@ import (
"k8s.io/client-go/dynamic"
)

const envMultiplier = "EVENT_MULTIPLIER"
const (
envMultiplier = "EVENT_MULTIPLIER"
kubescapeCustomResourceGroup = "spdx.softwarecomposition.kubescape.io"
)

var fieldsToRemove = map[string][][]string{
"default": {},
Expand All @@ -50,6 +54,7 @@ type Client struct {
client dynamic.Interface
account string
cluster string
operatorNamespace string // the namespace where the kubescape operator is running
kind *domain.Kind
multiplier int
callbacks domain.Callbacks
Expand All @@ -61,14 +66,15 @@ type Client struct {

var errWatchClosed = errors.New("watch channel closed")

func NewClient(client dynamic.Interface, account, cluster string, r config.Resource) *Client {
func NewClient(client dynamic.Interface, cfg config.InCluster, r config.Resource) *Client {
res := schema.GroupVersionResource{Group: r.Group, Version: r.Version, Resource: r.Resource}
// get event multiplier from env, defaults to 0
multiplier, _ := strconv.Atoi(os.Getenv(envMultiplier))
return &Client{
account: account,
client: client,
cluster: cluster,
account: cfg.Account,
client: client,
operatorNamespace: cfg.Namespace,
cluster: cfg.ClusterName,
kind: &domain.Kind{
Group: res.Group,
Version: res.Version,
Expand All @@ -93,7 +99,7 @@ func (c *Client) Start(ctx context.Context) error {
// for our storage, we need to list all resources and get them one by one
// as list returns objects with empty spec
// and watch does not return existing objects
if c.res.Group == "spdx.softwarecomposition.kubescape.io" {
if c.res.Group == kubescapeCustomResourceGroup {
if err := backoff.RetryNotify(func() error {
var err error
watchOpts.ResourceVersion, err = c.getExistingStorageObjects(ctx)
Expand All @@ -118,7 +124,7 @@ func (c *Client) Start(ctx context.Context) error {
}

// skip non-standalone resources
if hasParent(d) {
if c.isFiltered(d) {
continue
}
id := domain.KindName{
Expand Down Expand Up @@ -226,18 +232,18 @@ func multiplyEvent(event watch.Event, queue *utils.CooldownQueue, multiplier int
// change the workload-name label too - if applicable
labels := newEvent.Object.(*unstructured.Unstructured).GetLabels()
if labels != nil {
if workloadName, ok := labels["kubescape.io/workload-name"]; ok {
labels["kubescape.io/workload-name"] = fmt.Sprintf("%s-%d", workloadName, i)
if workloadName, ok := labels[helpersv1.NameMetadataKey]; ok {
labels[helpersv1.NameMetadataKey] = fmt.Sprintf("%s-%d", workloadName, i)
newEvent.Object.(*unstructured.Unstructured).SetLabels(labels)
}
}

annotations := newEvent.Object.(*unstructured.Unstructured).GetAnnotations()
if annotations != nil {
if wlid, ok := annotations["kubescape.io/wlid"]; ok {
if wlid, ok := annotations[helpersv1.WlidMetadataKey]; ok {

// workloadinterface.
annotations["kubescape.io/wlid"] = fmt.Sprintf("%s-%d", wlid, i)
annotations[helpersv1.WlidMetadataKey] = fmt.Sprintf("%s-%d", wlid, i)
newEvent.Object.(*unstructured.Unstructured).SetAnnotations(annotations)
}
}
Expand All @@ -247,6 +253,20 @@ func multiplyEvent(event watch.Event, queue *utils.CooldownQueue, multiplier int
}
}

// isFiltered returns true if workload should be filtered out.
// filters out workloads that have a parent, unless they are in the kubescape-operator namespace
func (c *Client) isFiltered(workload *unstructured.Unstructured) bool {
if workload == nil {
return false
}
// workload is not filtered if it is in the kubescape-operator namespace
if c.operatorNamespace != "" && workload.GetNamespace() == c.operatorNamespace {
return false
}
// for all other workloads, we filter out those that have a parent
return hasParent(workload)
}

// hasParent returns true if workload has a parent
// based on https://github.com/kubescape/k8s-interface/blob/2855cc94bd7666b227ad9e5db5ca25cb895e6cee/k8sinterface/k8sdynamic.go#L219
func hasParent(workload *unstructured.Unstructured) bool {
Expand Down Expand Up @@ -492,8 +512,8 @@ func (c *Client) multiplyVerifyObject(ctx context.Context, id domain.KindName, o
// change the workload-name label too - if applicable
labels := obj.GetLabels()
if labels != nil {
if workloadName, ok := labels["kubescape.io/workload-name"]; ok {
labels["kubescape.io/workload-name"] = fmt.Sprintf("%s-%d", workloadName, i)
if workloadName, ok := labels[helpersv1.NameMetadataKey]; ok {
labels[helpersv1.NameMetadataKey] = fmt.Sprintf("%s-%d", workloadName, i)
obj.SetLabels(labels)
}
}
Expand Down Expand Up @@ -523,7 +543,7 @@ func (c *Client) filterAndMarshal(d *unstructured.Unstructured) ([]byte, error)
}

func (c *Client) getObjectFromUnstructured(d *unstructured.Unstructured) ([]byte, error) {
if c.res.Group == "spdx.softwarecomposition.kubescape.io" {
if c.res.Group == kubescapeCustomResourceGroup {
obj, err := c.getResource(d.GetNamespace(), d.GetName())
if err != nil {
return nil, fmt.Errorf("get resource: %w", err)
Expand Down Expand Up @@ -620,8 +640,8 @@ func reconcileBatchProcessingFunc(ctx context.Context, c *Client, items domain.B
for _, k := range clientItemsSet.Difference(serverItemsSet).ToSlice() {
item := clientItems[k]

if hasParent(&item) {
logger.L().Debug("reconciliation: resource missing in server has parent, skipping",
if c.isFiltered(&item) {
logger.L().Debug("reconciliation: resource missing in server should be filtered, skipping",
helpers.String("resource", c.kind.String()),
helpers.String("name", item.GetName()),
helpers.String("namespace", item.GetNamespace()))
Expand Down
5 changes: 4 additions & 1 deletion cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ func main() {
if clusterConfig, err := config.LoadClusterConfig(); err != nil {
logger.L().Warning("failed to load cluster config", helpers.Error(err))
} else {
logger.L().Debug("cluster config loaded", helpers.String("clusterName", clusterConfig.ClusterName))
logger.L().Debug("cluster config loaded",
helpers.String("clusterName", clusterConfig.ClusterName),
helpers.String("namespace", clusterConfig.Namespace))
cfg.InCluster.ClusterName = clusterConfig.ClusterName
cfg.InCluster.Namespace = clusterConfig.Namespace
}

// load credentials (access key & account)
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Backend struct {

type InCluster struct {
ServerUrl string `mapstructure:"serverUrl"`
Namespace string `mapstructure:"namespace"`
ClusterName string `mapstructure:"clusterName"`
Account string `mapstructure:"account"`
AccessKey string `mapstructure:"accessKey"`
Expand Down
1 change: 1 addition & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func TestLoadConfig(t *testing.T) {
Account: "11111111-2222-3333-4444-11111111",
AccessKey: "xxxxxxxx-1111-1111-1111-xxxxxxxx",
Resources: []Resource{
{Group: "", Version: "v1", Resource: "pods", Strategy: "patch"},
{Group: "", Version: "v1", Resource: "nodes", Strategy: "patch"},
{Group: "apps", Version: "v1", Resource: "deployments", Strategy: "patch"},
{Group: "apps", Version: "v1", Resource: "statefulsets", Strategy: "patch"},
Expand Down
6 changes: 6 additions & 0 deletions configuration/client/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
"account": "11111111-2222-3333-4444-11111111",
"accessKey": "xxxxxxxx-1111-1111-1111-xxxxxxxx",
"resources": [
{
"group": "",
"version": "v1",
"resource": "pods",
"strategy": "patch"
},
{
"group": "",
"version": "v1",
Expand Down
23 changes: 16 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ toolchain go1.21.5
require (
github.com/SergJa/jsonhash v0.0.0-20210531165746-fc45f346aa74
github.com/apache/pulsar-client-go v0.12.0
github.com/armosec/armoapi-go v0.0.292
github.com/armosec/utils-k8s-go v0.0.24
github.com/armosec/armoapi-go v0.0.329
github.com/armosec/utils-k8s-go v0.0.26
github.com/cenkalti/backoff/v4 v4.2.1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/deckarep/golang-set/v2 v2.6.0
Expand All @@ -19,6 +19,7 @@ require (
github.com/kinbiko/jsonassert v1.1.1
github.com/kubescape/backend v0.0.14
github.com/kubescape/go-logger v0.0.22
github.com/kubescape/k8s-interface v0.0.161
github.com/kubescape/messaging v0.0.22
github.com/panjf2000/ants/v2 v2.8.2
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2
Expand Down Expand Up @@ -48,23 +49,26 @@ require (
github.com/Microsoft/hcsshim v0.11.4 // indirect
github.com/ardielle/ardielle-go v1.5.2 // indirect
github.com/armosec/gojay v1.2.17 // indirect
github.com/armosec/utils-go v0.0.56 // indirect
github.com/avast/retry-go v3.0.0+incompatible // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.8.0 // indirect
github.com/briandowns/spinner v1.23.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/containerd v1.7.11 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/coreos/go-oidc v2.2.1+incompatible // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/danieljoos/wincred v1.2.0 // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect
github.com/docker/docker v24.0.7+incompatible // indirect
github.com/docker/docker v25.0.1+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dvsekhvalnov/jose2go v1.6.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down Expand Up @@ -101,6 +105,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/patternmatcher v0.6.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/sys/user v0.1.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand All @@ -110,11 +115,11 @@ require (
github.com/olvrng/ujson v1.1.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc5 // indirect
github.com/opencontainers/runc v1.1.12 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/pquerna/cachecontrol v0.2.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
Expand All @@ -137,6 +142,7 @@ require (
github.com/uptrace/opentelemetry-go-extra/otelzap v0.2.3 // indirect
github.com/uptrace/uptrace-go v1.21.0 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect
go.opentelemetry.io/contrib/instrumentation/runtime v0.46.1 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 // indirect
Expand All @@ -150,10 +156,11 @@ require (
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
golang.org/x/oauth2 v0.15.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/term v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.15.0 // indirect
Expand All @@ -165,10 +172,12 @@ require (
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.110.1 // indirect
k8s.io/kube-openapi v0.0.0-20231113174909-778a5567bc1e // indirect
sigs.k8s.io/controller-runtime v0.15.0 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
Expand Down
Loading
Loading