Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support instance ids for rollout controller segregation #342

Merged
merged 2 commits into from
Jan 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion cmd/rollouts-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
clientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned"
informers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions"
"github.com/argoproj/argo-rollouts/pkg/signals"
controllerutil "github.com/argoproj/argo-rollouts/utils/controller"
)

const (
Expand All @@ -36,6 +37,7 @@ func newCommand() *cobra.Command {
logLevel string
glogLevel int
metricsPort int
instanceID string
rolloutThreads int
experimentThreads int
analysisThreads int
Expand Down Expand Up @@ -75,10 +77,14 @@ func newCommand() *cobra.Command {
kubeClient,
resyncDuration,
kubeinformers.WithNamespace(namespace))
instanceIDSelector := controllerutil.InstanceIDRequirement(instanceID)
argoRolloutsInformerFactory := informers.NewSharedInformerFactoryWithOptions(
rolloutClient,
resyncDuration,
informers.WithNamespace(namespace))
informers.WithNamespace(namespace),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = instanceIDSelector.String()
}))
jobInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
kubeClient,
resyncDuration,
Expand All @@ -94,6 +100,7 @@ func newCommand() *cobra.Command {
argoRolloutsInformerFactory.Argoproj().V1alpha1().AnalysisRuns(),
argoRolloutsInformerFactory.Argoproj().V1alpha1().AnalysisTemplates(),
resyncDuration,
instanceID,
metricsPort)

// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
Expand All @@ -113,6 +120,7 @@ func newCommand() *cobra.Command {
command.Flags().StringVar(&logLevel, "loglevel", "info", "Set the logging level. One of: debug|info|warn|error")
command.Flags().IntVar(&glogLevel, "gloglevel", 0, "Set the glog logging level")
command.Flags().IntVar(&metricsPort, "metricsport", controller.DefaultMetricsPort, "Set the port the metrics endpoint should be exposed over")
command.Flags().StringVar(&instanceID, "instance-id", "", "Indicates which argo rollout objects the controller should operate on")
command.Flags().IntVar(&rolloutThreads, "rollout-threads", controller.DefaultRolloutThreads, "Set the number of worker threads for the Rollout controller")
command.Flags().IntVar(&experimentThreads, "experiment-threads", controller.DefaultExperimentThreads, "Set the number of worker threads for the Experiment controller")
command.Flags().IntVar(&analysisThreads, "analysis-threads", controller.DefaultAnalysisThreads, "Set the number of worker threads for the Experiment controller")
Expand Down
1 change: 1 addition & 0 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func NewManager(
analysisRunInformer informers.AnalysisRunInformer,
analysisTemplateInformer informers.AnalysisTemplateInformer,
resyncPeriod time.Duration,
instanceID string,
metricsPort int,
) *Manager {

Expand Down
28 changes: 28 additions & 0 deletions experiments/analysisrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,34 @@ func TestCreateAnalysisRunWhenAvailable(t *testing.T) {
assert.Equal(t, v1alpha1.AnalysisPhasePending, patchedEx.Status.AnalysisRuns[0].Phase)
}

// TestCreateAnalysisRunWithInstanceID ensures we add an instance ID to the AnalysisRun
func TestCreateAnalysisRunWithInstanceID(t *testing.T) {
templates := generateTemplates("bar")
aTemplates := generateAnalysisTemplates("success-rate")
e := newExperiment("foo", templates, "")
e.Labels = map[string]string{v1alpha1.LabelKeyControllerInstanceID: "my-instance-id"}
e.Spec.Analyses = []v1alpha1.ExperimentAnalysisTemplateRef{
{
Name: "success-rate",
TemplateName: aTemplates[0].Name,
},
}
e.Status.Phase = v1alpha1.AnalysisPhaseRunning
e.Status.AvailableAt = now()
rs := templateToRS(e, templates[0], 1)
ar := analysisTemplateToRun("success-rate", e, &aTemplates[0].Spec)

f := newFixture(t, e, rs, &aTemplates[0])
defer f.Close()

createIndex := f.expectCreateAnalysisRunAction(ar)
f.expectPatchExperimentAction(e)
f.run(getKey(e, t))

createdAr := f.getCreatedAnalysisRun(createIndex)
assert.Equal(t, "my-instance-id", createdAr.Labels[v1alpha1.LabelKeyControllerInstanceID])
}

// TestAnalysisTemplateNotExists verifies we error the run the template does not exist (before availablility)
func TestAnalysisTemplateNotExists(t *testing.T) {
templates := generateTemplates("bar")
Expand Down
14 changes: 14 additions & 0 deletions experiments/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,20 @@ func (f *fixture) expectPatchAnalysisRunAction(r *v1alpha1.AnalysisRun) int {
return len
}

func (f *fixture) getCreatedAnalysisRun(index int) *v1alpha1.AnalysisRun {
action := filterInformerActions(f.client.Actions())[index]
createAction, ok := action.(core.CreateAction)
if !ok {
assert.Failf(f.t, "Expected Created action, not %s", action.GetVerb())
}
obj := createAction.GetObject()
ar := &v1alpha1.AnalysisRun{}
converter := runtime.NewTestUnstructuredConverter(equality.Semantic)
objMap, _ := converter.ToUnstructured(obj)
runtime.NewTestUnstructuredConverter(equality.Semantic).FromUnstructured(objMap, ar)
return ar
}

func (f *fixture) getCreatedReplicaSet(index int) *appsv1.ReplicaSet {
action := filterInformerActions(f.kubeclient.Actions())[index]
createAction, ok := action.(core.CreateAction)
Expand Down
4 changes: 4 additions & 0 deletions experiments/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,10 @@ func (ec *experimentContext) newAnalysisRun(analysis v1alpha1.ExperimentAnalysis
if err != nil {
return nil, err
}
instanceID := analysisutil.GetInstanceID(ec.ex)
if instanceID != "" {
run.Labels = map[string]string{v1alpha1.LabelKeyControllerInstanceID: ec.ex.Labels[v1alpha1.LabelKeyControllerInstanceID]}
}
run.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(ec.ex, controllerKind)}
return run, nil
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/rollouts/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ const (
// DefaultReplicaSetScaleDownDeadlineAnnotationKey is the default key attached to an old stable ReplicaSet after
// the rollout transitioned to a new version. It contains the time when the controller can scale down the RS.
DefaultReplicaSetScaleDownDeadlineAnnotationKey = "scale-down-deadline"

// LabelKeyControllerInstanceID is the label the controller uses for the rollout, experiment, analysis segregation
// between controllers. Controllers will only operate on objects with the same instanceID as the controller.
LabelKeyControllerInstanceID = "argo-rollouts.argoproj.io/controller-instance-id"
)

// RolloutStrategy defines strategy to apply during next rollout
Expand Down
7 changes: 7 additions & 0 deletions pkg/kubectl-argo-rollouts/cmd/create/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type CreateAnalysisRunOptions struct {

Name string
GenerateName string
InstanceID string
ArgFlags []string
From string
FromFile string
Expand Down Expand Up @@ -232,6 +233,11 @@ func NewCmdCreateAnalysisRun(o *options.ArgoRolloutsOptions) *cobra.Command {
if err != nil {
return err
}
if createOptions.InstanceID != "" {
run.Labels = map[string]string{
v1alpha1.LabelKeyControllerInstanceID: createOptions.InstanceID,
}
}
created, err := createOptions.RolloutsClientset().ArgoprojV1alpha1().AnalysisRuns(ns).Create(run)
if err != nil {
return err
Expand All @@ -243,6 +249,7 @@ func NewCmdCreateAnalysisRun(o *options.ArgoRolloutsOptions) *cobra.Command {
o.AddKubectlFlags(cmd)
cmd.Flags().StringVar(&createOptions.Name, "name", "", "Use the specified name for the run")
cmd.Flags().StringVar(&createOptions.GenerateName, "generate-name", "", "Use the specified generateName for the run")
cmd.Flags().StringVar(&createOptions.InstanceID, "instance-id", "", "Instance-ID for the AnalysisRun")
cmd.Flags().StringArrayVarP(&createOptions.ArgFlags, "argument", "a", []string{}, "Arguments to the parameter template")
cmd.Flags().StringVar(&createOptions.From, "from", "", "Create an AnalysisRun from an AnalysisTemplate in the cluster")
cmd.Flags().StringVar(&createOptions.FromFile, "from-file", "", "Create an AnalysisRun from an AnalysisTemplate in a local file")
Expand Down
25 changes: 25 additions & 0 deletions pkg/kubectl-argo-rollouts/cmd/create/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
core "k8s.io/client-go/testing"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
fakeroclient "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/fake"
Expand Down Expand Up @@ -151,6 +155,27 @@ func TestCreateAnalysisRunName(t *testing.T) {
assert.Empty(t, stderr)
}

func TestCreateAnalysisRunWithInstanceID(t *testing.T) {
tf, o := options.NewFakeArgoRolloutsOptions()
defer tf.Cleanup()
fakeClient := o.RolloutsClient.(*fakeroclient.Clientset)
cmd := NewCmdCreateAnalysisRun(o)
cmd.PersistentPreRunE = o.PersistentPreRunE
cmd.SetArgs([]string{"--from-file", "testdata/analysis-template.yaml", "-a", "foo=bar", "--name", "my-run", "--instance-id", "test"})
err := cmd.Execute()
assert.NoError(t, err)
stdout := o.Out.(*bytes.Buffer).String()
stderr := o.ErrOut.(*bytes.Buffer).String()
assert.Equal(t, "analysisrun.argoproj.io/my-run created\n", stdout)
assert.Empty(t, stderr)
assert.Len(t, fakeClient.Actions(), 1)
action := fakeClient.Actions()[0].(core.CreateAction)
objMap, err := runtime.NewTestUnstructuredConverter(equality.Semantic).ToUnstructured(action.GetObject())
assert.Nil(t, err)
obj := unstructured.Unstructured{Object: objMap}
assert.Equal(t, obj.GetLabels()[v1alpha1.LabelKeyControllerInstanceID], "test")
}

func TestCreateJSON(t *testing.T) {
tf, o := options.NewFakeArgoRolloutsOptions()
defer tf.Cleanup()
Expand Down
7 changes: 4 additions & 3 deletions rollout/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ func (c *RolloutController) reconcileBackgroundAnalysisRun(roCtx *canaryContext)
}
if currentAr == nil {
podHash := replicasetutil.GetPodTemplateHash(newRS)
backgroundLabels := analysisutil.BackgroundLabels(podHash)
instanceID := analysisutil.GetInstanceID(rollout)
backgroundLabels := analysisutil.BackgroundLabels(podHash, instanceID)
currentAr, err := c.createAnalysisRun(roCtx, rollout.Spec.Strategy.Canary.Analysis, nil, backgroundLabels)
if err == nil {
roCtx.Log().WithField(logutil.AnalysisRunKey, currentAr.Name).Info("Created background AnalysisRun")
Expand Down Expand Up @@ -168,7 +169,8 @@ func (c *RolloutController) reconcileStepBasedAnalysisRun(roCtx *canaryContext)
}
if currentAr == nil {
podHash := replicasetutil.GetPodTemplateHash(newRS)
stepLabels := analysisutil.StepLabels(*index, podHash)
instanceID := analysisutil.GetInstanceID(rollout)
stepLabels := analysisutil.StepLabels(*index, podHash, instanceID)
currentAr, err := c.createAnalysisRun(roCtx, step.Analysis, index, stepLabels)
if err == nil {
roCtx.Log().WithField(logutil.AnalysisRunKey, currentAr.Name).Infof("Created AnalysisRun for step '%d'", *index)
Expand Down Expand Up @@ -224,7 +226,6 @@ func (c *RolloutController) newAnalysisRunFromRollout(roCtx *canaryContext, roll
}
nameParts = append(nameParts, rolloutAnalysisStep.TemplateName)
name := strings.Join(nameParts, "-")

run, err := analysisutil.NewAnalysisRunFromTemplate(template, args, name, "", r.Namespace)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions rollout/analysis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func analysisRun(at *v1alpha1.AnalysisTemplate, analysisRunType string, r *v1alp
labels := map[string]string{}
podHash := controller.ComputeHash(&r.Spec.Template, r.Status.CollisionCount)
if analysisRunType == v1alpha1.RolloutTypeStepLabel {
labels = analysisutil.StepLabels(*r.Status.CurrentStepIndex, podHash)
labels = analysisutil.StepLabels(*r.Status.CurrentStepIndex, podHash, "")
} else if analysisRunType == v1alpha1.RolloutTypeBackgroundRunLabel {
labels = analysisutil.BackgroundLabels(podHash)
labels = analysisutil.BackgroundLabels(podHash, "")
}
return &v1alpha1.AnalysisRun{
ObjectMeta: metav1.ObjectMeta{
Expand Down
5 changes: 5 additions & 0 deletions rollout/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func GetExperimentFromTemplate(r *v1alpha1.Rollout, stableRS, newRS *appsv1.Repl
},
}

instanceID := analysisutil.GetInstanceID(r)
if instanceID != "" {
experiment.Labels[v1alpha1.LabelKeyControllerInstanceID] = instanceID
}

for i := range step.Templates {
templateStep := step.Templates[i]
template := v1alpha1.TemplateSpec{
Expand Down
44 changes: 44 additions & 0 deletions rollout/experiment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,3 +641,47 @@ func TestCancelExperimentWhenAborted(t *testing.T) {
f.expectPatchRolloutAction(r2)
f.run(getKey(r2, t))
}

func TestRolloutCreateExperimentWithInstanceID(t *testing.T) {
f := newFixture(t)
defer f.Close()

at := analysisTemplate("bar")
steps := []v1alpha1.CanaryStep{{
Experiment: &v1alpha1.RolloutExperimentStep{
Templates: []v1alpha1.RolloutExperimentTemplate{{
Name: "stable-template",
SpecRef: v1alpha1.StableSpecRef,
Replicas: pointer.Int32Ptr(1),
}},
Analyses: []v1alpha1.RolloutExperimentStepAnalysisTemplateRef{{
Name: "test",
TemplateName: at.Name,
}},
},
}}

r1 := newCanaryRollout("foo", 1, nil, steps, pointer.Int32Ptr(0), intstr.FromInt(0), intstr.FromInt(1))
r2 := bumpVersion(r1)
r2.Labels = map[string]string{v1alpha1.LabelKeyControllerInstanceID: "instance-id-test"}

rs1 := newReplicaSetWithStatus(r1, 1, 1)
rs2 := newReplicaSetWithStatus(r2, 0, 0)
f.kubeobjects = append(f.kubeobjects, rs1, rs2)
f.replicaSetLister = append(f.replicaSetLister, rs1, rs2)
rs1PodHash := rs1.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]

ex, _ := GetExperimentFromTemplate(r2, rs1, rs2)
r2 = updateCanaryRolloutStatus(r2, rs1PodHash, 1, 0, 1, false)

f.rolloutLister = append(f.rolloutLister, r2)
f.objects = append(f.objects, r2)

createExIndex := f.expectCreateExperimentAction(ex)
f.expectPatchRolloutAction(r1)

f.run(getKey(r2, t))
createdEx := f.getCreatedExperiment(createExIndex)
assert.Equal(t, createdEx.Name, ex.Name)
assert.Equal(t, "instance-id-test", createdEx.Labels[v1alpha1.LabelKeyControllerInstanceID])
}
17 changes: 13 additions & 4 deletions utils/analysis/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,30 @@ func BuildArgumentsForRolloutAnalysisRun(args []v1alpha1.AnalysisRunArgument, st
}

// BackgroundLabels returns a map[string]string of common labels for the background analysis
func BackgroundLabels(podHash string) map[string]string {
return map[string]string{
func BackgroundLabels(podHash, instanceID string) map[string]string {
labels := map[string]string{
v1alpha1.DefaultRolloutUniqueLabelKey: podHash,
v1alpha1.RolloutTypeLabel: v1alpha1.RolloutTypeBackgroundRunLabel,
}
if instanceID != "" {
labels[v1alpha1.LabelKeyControllerInstanceID] = instanceID
}
return labels

}

// StepLabels returns a map[string]string of common labels for analysisruns created from an analysis step
func StepLabels(index int32, podHash string) map[string]string {
func StepLabels(index int32, podHash, instanceID string) map[string]string {
indexStr := strconv.Itoa(int(index))
return map[string]string{
labels := map[string]string{
v1alpha1.DefaultRolloutUniqueLabelKey: podHash,
v1alpha1.RolloutTypeLabel: v1alpha1.RolloutTypeStepLabel,
v1alpha1.RolloutCanaryStepIndexLabel: indexStr,
}
if instanceID != "" {
labels[v1alpha1.LabelKeyControllerInstanceID] = instanceID
}
return labels
}

// ValidateMetrics validates an analysis template spec
Expand Down
6 changes: 4 additions & 2 deletions utils/analysis/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ func TestStepLabels(t *testing.T) {
v1alpha1.DefaultRolloutUniqueLabelKey: podHash,
v1alpha1.RolloutTypeLabel: v1alpha1.RolloutTypeStepLabel,
v1alpha1.RolloutCanaryStepIndexLabel: "1",
v1alpha1.LabelKeyControllerInstanceID: "test",
}
generated := StepLabels(1, podHash)
generated := StepLabels(1, podHash, "test")
assert.Equal(t, expected, generated)
}

Expand All @@ -69,8 +70,9 @@ func TestBackgroundLabels(t *testing.T) {
expected := map[string]string{
v1alpha1.DefaultRolloutUniqueLabelKey: podHash,
v1alpha1.RolloutTypeLabel: v1alpha1.RolloutTypeBackgroundRunLabel,
v1alpha1.LabelKeyControllerInstanceID: "test",
}
generated := BackgroundLabels(podHash)
generated := BackgroundLabels(podHash, "test")
assert.Equal(t, expected, generated)
}

Expand Down
19 changes: 19 additions & 0 deletions utils/analysis/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
log "github.com/sirupsen/logrus"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
patchtypes "k8s.io/apimachinery/pkg/types"

argoprojclient "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/typed/rollouts/v1alpha1"
Expand Down Expand Up @@ -205,3 +207,20 @@ func NewAnalysisRunFromTemplate(template *v1alpha1.AnalysisTemplate, args []v1al
}
return &ar, nil
}

// GetInstanceID takes an object and returns the controller instance id if it has one
func GetInstanceID(obj runtime.Object) string {
objMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
// The objects passed into this function are already valid Kubernetes objects stored in the API server.
// This function errors when the object passed can't be converted to a map[string]string. As a result,
// the object passed in will never fail and the controller should panic in that case.
panic(err)
}
uObj := unstructured.Unstructured{Object: objMap}
labels := uObj.GetLabels()
if labels != nil {
return labels[v1alpha1.LabelKeyControllerInstanceID]
}
return ""
}
Loading