Skip to content

Commit

Permalink
Do not do destructive updates during bootup (#193)
Browse files Browse the repository at this point in the history
  • Loading branch information
aattuluri authored Feb 10, 2022
1 parent d3201d4 commit c8f14da
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 2 deletions.
70 changes: 68 additions & 2 deletions admiral/pkg/clusters/handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clusters

import (
"bytes"
"fmt"
argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/gogo/protobuf/types"
Expand All @@ -15,8 +16,10 @@ import (
k8sAppsV1 "k8s.io/api/apps/v1"
k8sV1 "k8s.io/api/core/v1"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
"reflect"
"sort"
"strings"
"time"
)

const ROLLOUT_POD_HASH_LABEL string = "rollouts-pod-template-hash"
Expand Down Expand Up @@ -540,12 +543,23 @@ func addUpdateServiceEntry(obj *v1alpha3.ServiceEntry, exist *v1alpha3.ServiceEn
obj.ResourceVersion = ""
_, err = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(namespace).Create(obj)
op = "Add"
log.Infof(LogFormat + " SE=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "New SE", obj.Spec.String())
} else {
exist.Labels = obj.Labels
exist.Annotations = obj.Annotations
exist.Spec = obj.Spec
op = "Update"
_, err = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(namespace).Update(exist)
skipUpdate, diff := skipDestructiveUpdate(rc, obj, exist)
if diff != "" {
log.Infof(LogFormat + " diff=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "Diff in update", diff)
}
if skipUpdate {
log.Infof(LogFormat, op, "ServiceEntry", obj.Name, rc.ClusterID, "Update skipped as it was destructive during Admiral's bootup phase")
return
} else {
exist.Spec = obj.Spec
_, err = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(namespace).Update(exist)
}

}

if err != nil {
Expand All @@ -555,6 +569,58 @@ func addUpdateServiceEntry(obj *v1alpha3.ServiceEntry, exist *v1alpha3.ServiceEn
}
}

func skipDestructiveUpdate(rc *RemoteController, new *v1alpha3.ServiceEntry, old *v1alpha3.ServiceEntry) (skipDestructive bool, diff string) {
skipDestructive = false
destructive, diff := getServiceEntryDiff(new, old)
//do not update SEs during bootup phase if they are destructive
if time.Since(rc.StartTime) < (2 * common.GetAdmiralParams().CacheRefreshDuration) && destructive {
skipDestructive = true
}

return skipDestructive, diff
}

//Diffs only endpoints
func getServiceEntryDiff(new *v1alpha3.ServiceEntry, old *v1alpha3.ServiceEntry) (destructive bool, diff string) {

//we diff only if both objects exist
if old == nil || new == nil {
return false, ""
}
destructive = false
format := "%s %s before: %v, after: %v;"
var buffer bytes.Buffer
seNew := new.Spec
seOld := old.Spec

oldEndpointMap := make(map[string]*v1alpha32.ServiceEntry_Endpoint)
found := make(map[string]string)
for _, oEndpoint := range seOld.Endpoints {
oldEndpointMap[oEndpoint.Address] = oEndpoint
}
for _, nEndpoint := range seNew.Endpoints {
if val, ok := oldEndpointMap[nEndpoint.Address]; ok {
found[nEndpoint.Address] = "1"
if !reflect.DeepEqual(val, nEndpoint) {
destructive = true
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Update", val.String(), nEndpoint.String()))
}
} else {
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Add", "", nEndpoint.String()))
}
}

for key := range oldEndpointMap {
if _, ok := found[key]; !ok {
destructive = true
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Delete", oldEndpointMap[key].String(), ""))
}
}

diff = buffer.String()
return destructive, diff
}

func deleteServiceEntry(exist *v1alpha3.ServiceEntry, namespace string, rc *RemoteController) {
if exist != nil {
err := rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(namespace).Delete(exist.Name, &v12.DeleteOptions{})
Expand Down
262 changes: 262 additions & 0 deletions admiral/pkg/clusters/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
k8sv1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"strings"
"testing"
"time"
)
Expand Down Expand Up @@ -993,4 +994,265 @@ func TestGetServiceForRolloutBlueGreen(t *testing.T) {
}
})
}
}

func TestSkipDestructiveUpdate(t *testing.T) {

twoEndpointSe := v1alpha3.ServiceEntry{
Hosts: []string{"e2e.my-first-service.mesh"},
Addresses: []string{"240.10.1.1"},
Ports: []*v1alpha3.Port{{Number: uint32(common.DefaultServiceEntryPort),
Name: "http", Protocol: "http"}},
Location: v1alpha3.ServiceEntry_MESH_INTERNAL,
Resolution: v1alpha3.ServiceEntry_DNS,
SubjectAltNames: []string{"spiffe://prefix/my-first-service"},
Endpoints: []*v1alpha3.ServiceEntry_Endpoint{
{Address: "dummy.admiral.global-west", Ports: map[string]uint32{"http": 0}, Locality: "us-west-2"},
{Address: "dummy.admiral.global-east", Ports: map[string]uint32{"http": 0}, Locality: "us-east-2"},
},
}

twoEndpointSeUpdated := v1alpha3.ServiceEntry{
Hosts: []string{"e2e.my-first-service.mesh"},
Addresses: []string{"240.10.1.1"},
Ports: []*v1alpha3.Port{{Number: uint32(common.DefaultServiceEntryPort),
Name: "http", Protocol: "http"}},
Location: v1alpha3.ServiceEntry_MESH_INTERNAL,
Resolution: v1alpha3.ServiceEntry_DNS,
SubjectAltNames: []string{"spiffe://prefix/my-first-service"},
Endpoints: []*v1alpha3.ServiceEntry_Endpoint{
{Address: "dummy.admiral.global-west", Ports: map[string]uint32{"http": 90}, Locality: "us-west-2"},
{Address: "dummy.admiral.global-east", Ports: map[string]uint32{"http": 0}, Locality: "us-east-2"},
},
}

oneEndpointSe := v1alpha3.ServiceEntry{
Hosts: []string{"e2e.my-first-service.mesh"},
Addresses: []string{"240.10.1.1"},
Ports: []*v1alpha3.Port{{Number: uint32(common.DefaultServiceEntryPort),
Name: "http", Protocol: "http"}},
Location: v1alpha3.ServiceEntry_MESH_INTERNAL,
Resolution: v1alpha3.ServiceEntry_DNS,
SubjectAltNames: []string{"spiffe://prefix/my-first-service"},
Endpoints: []*v1alpha3.ServiceEntry_Endpoint{
{Address: "dummy.admiral.global-west", Ports: map[string]uint32{"http": 0}, Locality: "us-west-2"},
},
}

newSeTwoEndpoints := &v1alpha32.ServiceEntry{
ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "random"},
Spec: twoEndpointSe,
}

newSeTwoEndpointsUpdated := &v1alpha32.ServiceEntry{
ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "random"},
Spec: twoEndpointSeUpdated,
}

newSeOneEndpoint := &v1alpha32.ServiceEntry{
ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "random"},
Spec: oneEndpointSe,
}

oldSeTwoEndpoints := &v1alpha32.ServiceEntry{
ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "random"},
Spec: twoEndpointSe,
}

oldSeOneEndpoint := &v1alpha32.ServiceEntry{
ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "random"},
Spec: oneEndpointSe,
}


rcWarmupPhase := &RemoteController{
StartTime: time.Now(),
}

rcNotinWarmupPhase := &RemoteController{
StartTime: time.Now().Add(time.Duration(-21) * time.Minute),
}

//Struct of test case info. Name is required.
testCases := []struct {
name string
rc *RemoteController
newSe *v1alpha32.ServiceEntry
oldSe *v1alpha32.ServiceEntry
skipDestructive bool
diff string
}{
{
name: "Should return false when in warm up phase but not destructive",
rc: rcWarmupPhase,
newSe: newSeOneEndpoint,
oldSe: oldSeOneEndpoint,
skipDestructive: false,
diff: "",
},
{
name: "Should return true when in warm up phase but is destructive",
rc: rcWarmupPhase,
newSe: newSeOneEndpoint,
oldSe: oldSeTwoEndpoints,
skipDestructive: true,
diff: "Delete",
},
{
name: "Should return false when not in warm up phase but is destructive",
rc: rcNotinWarmupPhase,
newSe: newSeOneEndpoint,
oldSe: oldSeTwoEndpoints,
skipDestructive: false,
diff: "Delete",
},
{
name: "Should return false when in warm up phase but is constructive",
rc: rcWarmupPhase,
newSe: newSeTwoEndpoints,
oldSe: oldSeOneEndpoint,
skipDestructive: false,
diff: "Add",
},
{
name: "Should return false when not in warm up phase but endpoints updated",
rc: rcNotinWarmupPhase,
newSe: newSeTwoEndpointsUpdated,
oldSe: oldSeTwoEndpoints,
skipDestructive: false,
diff: "Update",
},
{
name: "Should return true when in warm up phase but endpoints are updated (destructive)",
rc: rcWarmupPhase,
newSe: newSeTwoEndpointsUpdated,
oldSe: oldSeTwoEndpoints,
skipDestructive: true,
diff: "Update",
},

}

//Run the test for every provided case
for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
skipDestructive, diff := skipDestructiveUpdate(c.rc, c.newSe, c.oldSe)
if skipDestructive == c.skipDestructive {
//perfect
} else {
t.Errorf("Result Failed. Got %v, expected %v", skipDestructive, c.skipDestructive)
}
if c.diff == "" || (c.diff != "" && strings.Contains(diff, c.diff)) {
//perfect
} else {
t.Errorf("Diff Failed. Got %v, expected %v", diff, c.diff)
}
})
}
}

func TestAddUpdateServiceEntry(t *testing.T) {


fakeIstioClient := istiofake.NewSimpleClientset()

seCtrl := &istio.ServiceEntryController{
IstioClient: fakeIstioClient,
}

twoEndpointSe := v1alpha3.ServiceEntry{
Hosts: []string{"e2e.my-first-service.mesh"},
Addresses: []string{"240.10.1.1"},
Ports: []*v1alpha3.Port{{Number: uint32(common.DefaultServiceEntryPort),
Name: "http", Protocol: "http"}},
Location: v1alpha3.ServiceEntry_MESH_INTERNAL,
Resolution: v1alpha3.ServiceEntry_DNS,
SubjectAltNames: []string{"spiffe://prefix/my-first-service"},
Endpoints: []*v1alpha3.ServiceEntry_Endpoint{
{Address: "dummy.admiral.global-west", Ports: map[string]uint32{"http": 0}, Locality: "us-west-2"},
{Address: "dummy.admiral.global-east", Ports: map[string]uint32{"http": 0}, Locality: "us-east-2"},
},
}

oneEndpointSe := v1alpha3.ServiceEntry{
Hosts: []string{"e2e.my-first-service.mesh"},
Addresses: []string{"240.10.1.1"},
Ports: []*v1alpha3.Port{{Number: uint32(common.DefaultServiceEntryPort),
Name: "http", Protocol: "http"}},
Location: v1alpha3.ServiceEntry_MESH_INTERNAL,
Resolution: v1alpha3.ServiceEntry_DNS,
SubjectAltNames: []string{"spiffe://prefix/my-first-service"},
Endpoints: []*v1alpha3.ServiceEntry_Endpoint{
{Address: "dummy.admiral.global-west", Ports: map[string]uint32{"http": 0}, Locality: "us-west-2"},
},
}

newSeOneEndpoint := &v1alpha32.ServiceEntry{
ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "namespace"},
Spec: oneEndpointSe,
}

oldSeTwoEndpoints := &v1alpha32.ServiceEntry{
ObjectMeta: v12.ObjectMeta{Name: "se2", Namespace: "namespace"},
Spec: twoEndpointSe,
}

seCtrl.IstioClient.NetworkingV1alpha3().ServiceEntries("namespace").Create(oldSeTwoEndpoints)

rcWarmupPhase := &RemoteController{
ServiceEntryController: seCtrl,
StartTime: time.Now(),
}

rcNotinWarmupPhase := &RemoteController{
ServiceEntryController: seCtrl,
StartTime: time.Now().Add(time.Duration(-21) * time.Minute),
}

//Struct of test case info. Name is required.
testCases := []struct {
name string
rc *RemoteController
newSe *v1alpha32.ServiceEntry
oldSe *v1alpha32.ServiceEntry
skipDestructive bool
}{
{
name: "Should add a new SE",
rc: rcWarmupPhase,
newSe: newSeOneEndpoint,
oldSe: nil,
skipDestructive: false,
},
{
name: "Should not update SE when in warm up mode and the update is destructive",
rc: rcWarmupPhase,
newSe: newSeOneEndpoint,
oldSe: oldSeTwoEndpoints,
skipDestructive: true,
},
{
name: "Should update an SE",
rc: rcNotinWarmupPhase,
newSe: newSeOneEndpoint,
oldSe: oldSeTwoEndpoints,
skipDestructive: false,
},

}

//Run the test for every provided case
for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
addUpdateServiceEntry(c.newSe, c.oldSe, "namespace", c.rc)
if c.skipDestructive {
//verify the update did not go through
se, _ := c.rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries("namespace").Get(c.oldSe.Name, v12.GetOptions{})
_, diff := getServiceEntryDiff(c.oldSe, se)
if diff != "" {
t.Errorf("Failed. Got %v, expected %v", se.Spec.String(), c.oldSe.Spec.String())
}
}
})
}
}
1 change: 1 addition & 0 deletions admiral/pkg/clusters/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste
stop: stop,
ClusterID: clusterID,
ApiServer: clientConfig.Host,
StartTime: time.Now(),
}

var err error
Expand Down
Loading

0 comments on commit c8f14da

Please sign in to comment.