Skip to content

Commit

Permalink
Introduce new SaaS flag (#232)
Browse files Browse the repository at this point in the history
Introduce a new saas flag to simplify code and make it clearer when to
use what. This allows to avoid to do multiple calls, like first checking
whether brokers pods with self-managed label are deployed and then with
SaaS label. With this approach we can know in which environment we are
running.

Adjust the tests to also use the CRD to detect the environment.
  • Loading branch information
ChrisKujawa authored Nov 18, 2022
2 parents 77b7d9a + e9f568e commit a736b9d
Show file tree
Hide file tree
Showing 13 changed files with 160 additions and 55 deletions.
19 changes: 5 additions & 14 deletions go-chaos/internal/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,19 @@ import (
)

func (c K8Client) getGatewayDeployment() (*v12.Deployment, error) {

listOptions := metav1.ListOptions{
LabelSelector: getSelfManagedGatewayLabels(),
LabelSelector: c.getGatewayLabels(),
}
deploymentList, err := c.Clientset.AppsV1().Deployments(c.GetCurrentNamespace()).List(context.TODO(), listOptions)
if err != nil {
return nil, err
}

// here it is currently hard to distingush between not existing and embedded gateway;
// since we don't use embedded gateway in our current chaos setup I would not support it right now here
if deploymentList == nil || len(deploymentList.Items) <= 0 {
// lets check for SaaS setup
listOptions.LabelSelector = getSaasGatewayLabels()
deploymentList, err = c.Clientset.AppsV1().Deployments(c.GetCurrentNamespace()).List(context.TODO(), listOptions)
if err != nil {
return nil, err
}

// here it is currently hard to distingush between not existing and embedded gateway;
// since we don't use embedded gateway in our current chaos setup I would not support it right now here
if deploymentList == nil || len(deploymentList.Items) <= 0 {
return nil, errors.New(fmt.Sprintf("Expected to find standalone gateway deployment in namespace %s, but none found!", c.GetCurrentNamespace()))
}
return nil, errors.New(fmt.Sprintf("Expected to find standalone gateway deployment in namespace %s, but none found! The embedded gateway is not supported.", c.GetCurrentNamespace()))
}

return &deploymentList.Items[0], err
}
2 changes: 2 additions & 0 deletions go-chaos/internal/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func Test_ShouldReturnTrueForRunningSaaSGatewayDeployment(t *testing.T) {
k8Client := CreateFakeClient()
selector, err := metav1.ParseToLabelSelector(getSaasGatewayLabels())
require.NoError(t, err)
k8Client.createSaaSCRD(t)
k8Client.CreateDeploymentWithLabelsAndName(t, selector, "gateway")

// when
Expand Down Expand Up @@ -85,6 +86,7 @@ func Test_ShouldReturnSaaSGatewayDeployment(t *testing.T) {
k8Client := CreateFakeClient()
selector, err := metav1.ParseToLabelSelector(getSaasGatewayLabels())
require.NoError(t, err)
k8Client.createSaaSCRD(t)
k8Client.CreateDeploymentWithLabelsAndName(t, selector, "gateway")

// when
Expand Down
2 changes: 1 addition & 1 deletion go-chaos/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package internal
import "github.com/camunda/zeebe/clients/go/v8/pkg/zbc"

// defines whether the functions should print verbose output
var Verbosity bool = false
var Verbosity = false

// defines if a custom kube config should be used instead of the default one found by k8s
var KubeConfigPath string
Expand Down
26 changes: 25 additions & 1 deletion go-chaos/internal/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,31 @@ package internal

import (
"context"
"strings"
"testing"

"github.com/stretchr/testify/require"
v12 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
dynamicFake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
)

func CreateFakeClient() K8Client {
k8Client := K8Client{Clientset: fake.NewSimpleClientset(), ClientConfig: &testClientConfig{namespace: "testNamespace"}}
scheme := runtime.NewScheme()
groupVersionKind := schema.GroupVersionKind{Group: "cloud.camunda.io", Version: "v1alpha1", Kind: "zeebeclusters"}
scheme.AddKnownTypeWithName(groupVersionKind, &unstructured.Unstructured{})

k8Client := K8Client{Clientset: fake.NewSimpleClientset(),
DynamicClient: dynamicFake.NewSimpleDynamicClient(scheme),
ClientConfig: &testClientConfig{namespace: "testNamespace"}}
return k8Client
}

Expand Down Expand Up @@ -78,6 +89,19 @@ func (c K8Client) CreateDeploymentWithLabelsAndName(t *testing.T, selector *meta
require.NoError(t, err)
}

func (c *K8Client) createSaaSCRD(t *testing.T) {
zeebeCrd := schema.GroupVersionResource{Group: "cloud.camunda.io", Version: "v1alpha1", Resource: "zeebeclusters"}
obj := &unstructured.Unstructured{}
namespace := c.GetCurrentNamespace()
clusterId := strings.TrimSuffix(namespace, "-zeebe")
obj.SetName(clusterId)

_, err := c.DynamicClient.Resource(zeebeCrd).Create(context.TODO(), obj, metav1.CreateOptions{})
require.NoError(t, err)

c.SaaSEnv = c.isSaaSEnvironment()
}

func (c K8Client) CreateStatefulSetWithLabelsAndName(t *testing.T, selector *metav1.LabelSelector, name string) {
_, err := c.Clientset.AppsV1().StatefulSets(c.GetCurrentNamespace()).Create(context.TODO(), &v12.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Labels: selector.MatchLabels, Name: name},
Expand Down
18 changes: 16 additions & 2 deletions go-chaos/internal/k8helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ package internal

import (
"fmt"
"k8s.io/client-go/dynamic"
"path/filepath"

"k8s.io/client-go/dynamic"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
Expand All @@ -33,6 +34,7 @@ type K8Client struct {
ClientConfig clientcmd.ClientConfig
DynamicClient dynamic.Interface
Clientset kubernetes.Interface
SaaSEnv bool
}

// Returns the current namespace, defined in the kubeconfig
Expand Down Expand Up @@ -72,7 +74,19 @@ func createK8Client(settings KubernetesSettings) (K8Client, error) {
if err != nil {
return K8Client{}, err
}
return K8Client{Clientset: clientset, ClientConfig: clientConfig, DynamicClient: dynamicClient}, nil

client := K8Client{Clientset: clientset, ClientConfig: clientConfig, DynamicClient: dynamicClient}
client.SaaSEnv = client.isSaaSEnvironment()

if Verbosity {
if client.SaaSEnv {
fmt.Println("Running experiment in SaaS environment.")
} else {
fmt.Println("Running experiment in self-managed environment.")
}
}

return client, nil
}

type KubernetesSettings struct {
Expand Down
16 changes: 16 additions & 0 deletions go-chaos/internal/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ func getSelfManagedZeebeStatefulSetLabels() string {
return labels.Set(labelSelector.MatchLabels).String()
}

func (c K8Client) getBrokerLabels() string {
if c.SaaSEnv {
return getSaasBrokerLabels()
} else {
return getSelfManagedBrokerLabels()
}
}

func getSelfManagedBrokerLabels() string {
labelSelector := metav1.LabelSelector{
MatchLabels: map[string]string{"app.kubernetes.io/component": "zeebe-broker"},
Expand Down Expand Up @@ -58,3 +66,11 @@ func getSaasGatewayLabels() string {
}
return labels.Set(labelSelector.MatchLabels).String()
}

func (c K8Client) getGatewayLabels() string {
if c.SaaSEnv {
return getSaasGatewayLabels()
} else {
return getSelfManagedGatewayLabels()
}
}
3 changes: 2 additions & 1 deletion go-chaos/internal/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
func Test_ShouldApplyNetworkPatchOnStatefulSet(t *testing.T) {
// given
k8Client := CreateFakeClient()
k8Client.createSaaSCRD(t)
k8Client.CreateStatefulSetWithLabelsAndName(t, &metav1.LabelSelector{}, "zeebe")

// when
Expand All @@ -44,7 +45,7 @@ func Test_ShouldApplyNetworkPatchOnStatefulSet(t *testing.T) {
func Test_ShouldApplyNetworkPatchOnDeployment(t *testing.T) {
// given
k8Client := CreateFakeClient()
selector, err := metav1.ParseToLabelSelector(getSaasGatewayLabels())
selector, err := metav1.ParseToLabelSelector(getSelfManagedGatewayLabels())
require.NoError(t, err)
k8Client.CreateDeploymentWithLabelsAndName(t, selector, "gateway")

Expand Down
24 changes: 4 additions & 20 deletions go-chaos/internal/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,15 @@ import (

func (c K8Client) GetBrokerPods() (*v1.PodList, error) {
listOptions := metav1.ListOptions{
LabelSelector: getSelfManagedBrokerLabels(),
LabelSelector: c.getBrokerLabels(),
}

list, err := c.Clientset.CoreV1().Pods(c.GetCurrentNamespace()).List(context.TODO(), listOptions)
if err != nil {
return nil, err
}

if list != nil && len(list.Items) > 0 {
return list, err
}

// lets check for SaaS setup
listOptions.LabelSelector = getSaasBrokerLabels()
return c.Clientset.CoreV1().Pods(c.GetCurrentNamespace()).List(context.TODO(), listOptions)
return list, err
}

func (c K8Client) GetBrokerPodNames() ([]string, error) {
Expand All @@ -74,7 +68,7 @@ func (c K8Client) extractPodNames(list *v1.PodList) ([]string, error) {

func (c K8Client) GetGatewayPodNames() ([]string, error) {
listOptions := metav1.ListOptions{
LabelSelector: getSelfManagedGatewayLabels(),
LabelSelector: c.getGatewayLabels(),
// we check for running gateways, since terminated gateways can be lying around
FieldSelector: "status.phase=Running",
}
Expand All @@ -85,17 +79,7 @@ func (c K8Client) GetGatewayPodNames() ([]string, error) {
}

if list == nil || len(list.Items) == 0 {
// lets check for SaaS setup
listOptions.LabelSelector = getSaasGatewayLabels()
list, err = c.Clientset.CoreV1().Pods(c.GetCurrentNamespace()).List(context.TODO(), listOptions)
if err != nil {
return nil, err
}

if list == nil || len(list.Items) == 0 {
// maybe we have an embedded gateway setup
return c.GetBrokerPodNames()
}
return c.GetBrokerPodNames()
}

return c.extractPodNames(list)
Expand Down
4 changes: 3 additions & 1 deletion go-chaos/internal/pods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func Test_GetSaasBrokerPods(t *testing.T) {
require.NoError(t, err)

k8Client := CreateFakeClient()
k8Client.createSaaSCRD(t)
k8Client.CreatePodWithLabels(t, selector)

// when
Expand All @@ -78,7 +79,7 @@ func Test_GetSaasBrokerPods(t *testing.T) {

func Test_GetBrokersInOrder(t *testing.T) {
// given
selector, err := metav1.ParseToLabelSelector(getSaasBrokerLabels())
selector, err := metav1.ParseToLabelSelector(getSelfManagedBrokerLabels())
require.NoError(t, err)

k8Client := CreateFakeClient()
Expand Down Expand Up @@ -149,6 +150,7 @@ func Test_GetSelfManagedGatewayPodNames(t *testing.T) {
func Test_GetSaasGatewayPodNames(t *testing.T) {
// given
k8Client := CreateFakeClient()
k8Client.createSaaSCRD(t)

// gateway
selector, err := metav1.ParseToLabelSelector(getSaasGatewayLabels())
Expand Down
25 changes: 19 additions & 6 deletions go-chaos/internal/saas.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"strings"

k8sErrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -39,6 +38,13 @@ func (c K8Client) ResumeReconciliation() error {
// otherwise it gets overwritten on the next reconcilation loop by the controller
// Based on https://github.com/camunda-cloud/zeebe-controller-k8s#turning-the-controller-off
func (c K8Client) setPauseFlag(pauseEnabled bool) error {
if !c.SaaSEnv {
if Verbosity {
fmt.Printf("Did not find zeebe cluster to pause reconciliation, ignoring. \n")
}
return nil
}

ctx := context.TODO()
namespace := c.GetCurrentNamespace()
clusterId := strings.TrimSuffix(namespace, "-zeebe")
Expand All @@ -48,10 +54,17 @@ func (c K8Client) setPauseFlag(pauseEnabled bool) error {
_, err := c.DynamicClient.Resource(zeebeCrd).Patch(ctx, clusterId, types.MergePatchType, []byte(payload), meta.PatchOptions{})
return err
})
if k8sErrors.IsNotFound(err) {
// No zb resource found so probably not Saas. Ignore for now.
fmt.Printf("Did not find zeebe cluster to pause reconciliation, ignoring. %s\n", err)
return nil
}
return err
}

func (c K8Client) isSaaSEnvironment() bool {
namespace := c.GetCurrentNamespace()
clusterId := strings.TrimSuffix(namespace, "-zeebe")
zeebeCrd := schema.GroupVersionResource{Group: "cloud.camunda.io", Version: "v1alpha1", Resource: "zeebeclusters"}
resource, err := c.DynamicClient.Resource(zeebeCrd).Get(context.TODO(), clusterId, meta.GetOptions{})

if err != nil || resource == nil {
return false
}
return true
}
42 changes: 42 additions & 0 deletions go-chaos/internal/saas_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2022 Camunda Services GmbH
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_ShouldReturnTrueWhenCRDDeployed(t *testing.T) {
// given
k8Client := CreateFakeClient()
k8Client.createSaaSCRD(t)

// when
isSaaSEnvironment := k8Client.isSaaSEnvironment()

assert.True(t, isSaaSEnvironment)
}

func Test_ShouldReturnFalseWhenNoCRDDeployed(t *testing.T) {
// given
k8Client := CreateFakeClient()

// when
isSaaSEnvironment := k8Client.isSaaSEnvironment()

assert.False(t, isSaaSEnvironment)
}
19 changes: 10 additions & 9 deletions go-chaos/internal/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,19 @@ func (c K8Client) GetZeebeStatefulSet() (*v1.StatefulSet, error) {
ctx := context.TODO()

statefulSets := c.Clientset.AppsV1().StatefulSets(namespace)
sfs, err := statefulSets.List(ctx, meta.ListOptions{LabelSelector: getSelfManagedZeebeStatefulSetLabels()})
if err != nil {
return nil, err
}
if len(sfs.Items) == 1 {
return &sfs.Items[0], nil
}
if len(sfs.Items) == 0 {
if c.SaaSEnv {
// On SaaS the StatefulSet is just named "zeebe" without any identifying labels
return statefulSets.Get(ctx, "zeebe", meta.GetOptions{})
} else {
sfs, err := statefulSets.List(ctx, meta.ListOptions{LabelSelector: getSelfManagedZeebeStatefulSetLabels()})
if err != nil {
return nil, err
}
if len(sfs.Items) == 1 {
return &sfs.Items[0], nil
}
return nil, errors.New("could not uniquely identify the stateful set for Zeebe")
}
return nil, errors.New("could not uniquely identify the stateful set for Zeebe")
}

// ScaleZeebeCluster Scales the StatefulSet for Zeebe. Waits until scaling is complete before returning the initial scale.
Expand Down
Loading

0 comments on commit a736b9d

Please sign in to comment.