Skip to content

Commit

Permalink
change recon test to full chain (#53)
Browse files Browse the repository at this point in the history
* fix collision in ShadowObjects

Signed-off-by: Matthias Bertschy <[email protected]>

* change recon test to full chain

Signed-off-by: Matthias Bertschy <[email protected]>

---------

Signed-off-by: Matthias Bertschy <[email protected]>
  • Loading branch information
matthyx authored Feb 11, 2024
1 parent eb312cb commit e733923
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 117 deletions.
92 changes: 76 additions & 16 deletions adapters/incluster/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/cenkalti/backoff/v4"
"go.uber.org/multierr"

mapset "github.com/deckarep/golang-set/v2"
jsonpatch "github.com/evanphx/json-patch"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
Expand Down Expand Up @@ -130,7 +131,7 @@ func (c *Client) Start(ctx context.Context) error {
}
if c.Strategy == domain.PatchStrategy {
// remove from known resources
delete(c.ShadowObjects, id.Name)
delete(c.ShadowObjects, id.String())
}
case event.Type == watch.Modified:
logger.L().Debug("modified resource", helpers.String("id", id.String()))
Expand Down Expand Up @@ -225,9 +226,9 @@ func (c *Client) callPutOrPatch(ctx context.Context, id domain.KindName, baseObj
if c.Strategy == domain.PatchStrategy {
if len(baseObject) > 0 {
// update reference object
c.ShadowObjects[id.Name] = baseObject
c.ShadowObjects[id.String()] = baseObject
}
if oldObject, ok := c.ShadowObjects[id.Name]; ok {
if oldObject, ok := c.ShadowObjects[id.String()]; ok {
// calculate checksum
checksum, err := utils.CanonicalHash(newObject)
if err != nil {
Expand All @@ -249,7 +250,7 @@ func (c *Client) callPutOrPatch(ctx context.Context, id domain.KindName, baseObj
}
}
// add/update known resources
c.ShadowObjects[id.Name] = newObject
c.ShadowObjects[id.String()] = newObject
} else {
err := c.callbacks.PutObject(ctx, id, newObject)
if err != nil {
Expand Down Expand Up @@ -327,7 +328,7 @@ func (c *Client) patchObject(ctx context.Context, id domain.KindName, checksum s
return object, fmt.Errorf("checksum mismatch: %s != %s", newChecksum, checksum)
}
// update known resources
c.ShadowObjects[id.Name] = modified
c.ShadowObjects[id.String()] = modified
// save object
return object, c.PutObject(ctx, id, modified)
}
Expand Down Expand Up @@ -442,13 +443,31 @@ func reconcileBatchProcessingFunc(ctx context.Context, c *Client, items domain.B
return fmt.Errorf("reconciliation batch (%s) was empty - expected at least one NewChecksum message", c.res.Resource)
}

// create a map of resources from the client
list, err := c.client.Resource(c.res).Namespace("").List(context.Background(), metav1.ListOptions{})
if err != nil {
return fmt.Errorf("list resources: %w", err)
}
clientItems := map[string]unstructured.Unstructured{}
clientItemsSet := mapset.NewSet[string]()
for _, item := range list.Items {
k := fmt.Sprintf("%s/%s", item.GetNamespace(), item.GetName())
clientItems[k] = item
clientItemsSet.Add(k)
}

// create a map of resources from the server
serverItems := map[string]domain.NewChecksum{}
serverItemsSet := mapset.NewSet[string]()
for _, item := range items.NewChecksum {
idx := findResourceInList(list.Items, item.Namespace, item.Name)
k := fmt.Sprintf("%s/%s", item.Namespace, item.Name)
serverItems[k] = item
serverItemsSet.Add(k)
}

// resources that should not be in server, send delete
for _, k := range serverItemsSet.Difference(clientItemsSet).ToSlice() {
item := serverItems[k]

id := domain.KindName{
Kind: c.kind,
Expand All @@ -457,24 +476,30 @@ func reconcileBatchProcessingFunc(ctx context.Context, c *Client, items domain.B
ResourceVersion: item.ResourceVersion,
}

// resource was not found, sending delete message
if idx == -1 {
logger.L().Debug("resource missing", helpers.String("kind", item.Kind.String()), helpers.String("name", item.Name), helpers.String("namespace", item.Namespace))
err = multierr.Append(err, c.callbacks.DeleteObject(ctx, id))
continue
}
logger.L().Debug("resource should not be in server, sending delete message",
helpers.String("resource", item.Kind.String()),
helpers.String("name", item.Name),
helpers.String("namespace", item.Namespace))
err = multierr.Append(err, c.callbacks.DeleteObject(ctx, id))
}

// resource was found
resource := list.Items[idx]
// resources in common, check resource version
for _, k := range serverItemsSet.Intersect(clientItemsSet).ToSlice() {
item := serverItems[k]
resource := clientItems[k]
currentVersion := domain.ToResourceVersion(resource.GetResourceVersion())
if currentVersion == item.ResourceVersion {
// resource has same version, skipping
logger.L().Debug("resource has same version", helpers.String("resource", item.Kind.String()), helpers.String("name", item.Name), helpers.String("namespace", item.Namespace))
logger.L().Debug("resource has same version, skipping",
helpers.String("resource", item.Kind.String()),
helpers.String("name", item.Name),
helpers.String("namespace", item.Namespace),
helpers.Int("resource version", currentVersion))
continue
}

// resource has changed, sending a put message
logger.L().Debug("resource has changed",
logger.L().Debug("resource has changed, sending put message",
helpers.String("resource", item.Kind.String()),
helpers.String("name", item.Name),
helpers.String("namespace", item.Namespace),
Expand All @@ -483,10 +508,45 @@ func reconcileBatchProcessingFunc(ctx context.Context, c *Client, items domain.B
newObject, marshalErr := c.getObjectFromUnstructured(&resource)
if marshalErr != nil {
err = multierr.Append(err, fmt.Errorf("marshal resource: %w", marshalErr))
continue
}
id := domain.KindName{
Kind: c.kind,
Name: item.Name,
Namespace: item.Namespace,
ResourceVersion: item.ResourceVersion,
}
err = multierr.Append(err, c.callbacks.PutObject(ctx, id, newObject))
}

// resources missing in server, send verify checksum
for _, k := range clientItemsSet.Difference(serverItemsSet).ToSlice() {
item := clientItems[k]

resourceVersion := domain.ToResourceVersion(item.GetResourceVersion())
id := domain.KindName{
Kind: c.kind,
Name: item.GetName(),
Namespace: item.GetNamespace(),
ResourceVersion: resourceVersion,
}

newObject, marshalErr := utils.FilterAndMarshal(&item)
if marshalErr != nil {
err = multierr.Append(err, fmt.Errorf("marshal resource: %w", marshalErr))
continue
}

logger.L().Debug("resource missing in server, sending verify message",
helpers.String("resource", c.res.Resource),
helpers.String("name", item.GetName()),
helpers.String("namespace", item.GetNamespace()))
// remove cached object
delete(c.ShadowObjects, id.String())
// send verify message
err = multierr.Append(err, c.callVerifyObject(ctx, id, newObject))
}

return nil
}

Expand Down
6 changes: 3 additions & 3 deletions adapters/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ func (m *MockAdapter) TestCallPutOrPatch(ctx context.Context, id domain.KindName
if m.patchStrategy {
if len(baseObject) > 0 {
// update reference object
m.shadowObjects[id.Name] = baseObject
m.shadowObjects[id.String()] = baseObject
}
if oldObject, ok := m.shadowObjects[id.Name]; ok {
if oldObject, ok := m.shadowObjects[id.String()]; ok {
// calculate checksum
checksum, err := utils.CanonicalHash(newObject)
if err != nil {
Expand All @@ -269,7 +269,7 @@ func (m *MockAdapter) TestCallPutOrPatch(ctx context.Context, id domain.KindName
}
}
// add/update known resources
m.shadowObjects[id.Name] = newObject
m.shadowObjects[id.String()] = newObject
} else {
err := m.callbacks.PutObject(ctx, id, newObject)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/armosec/utils-k8s-go v0.0.24
github.com/cenkalti/backoff/v4 v4.2.1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/deckarep/golang-set/v2 v2.6.0
github.com/evanphx/json-patch v5.7.0+incompatible
github.com/gobwas/ws v1.3.1
github.com/google/uuid v1.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM=
github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0=
Expand Down
Loading

0 comments on commit e733923

Please sign in to comment.