Skip to content

Commit

Permalink
[TEP-0135] Coschedule per (isolate) PipelineRun
Browse files Browse the repository at this point in the history
Part of [tektoncd#6740]. [TEP-0135][tep-0135] introduces a feature that allows a cluster operator to ensure
that all of a PipelineRun's pods are scheduled to the same node.

This commit consumes the functions added in [tektoncd#6819] to implement end to end support of `Coschedule:PipelineRuns` where all the `PipelineRun pods` are scheduled to the same node,
and the `Coschedule:isolate-pipelinerun` coschedule modes where only 1 PipelineRun is allowed to run in a node at the same time.

/kind feature

[tektoncd#6819]: tektoncd#6819
[tektoncd#6740]: tektoncd#6740
[tep-0135]: https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md
  • Loading branch information
QuanZhang-William committed Jul 26, 2023
1 parent 1eef4db commit 224cba3
Show file tree
Hide file tree
Showing 7 changed files with 465 additions and 143 deletions.
46 changes: 25 additions & 21 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/pod"
v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
"github.com/tektoncd/pipeline/pkg/internal/affinityassistant"
aa "github.com/tektoncd/pipeline/pkg/internal/affinityassistant"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/workspace"
Expand Down Expand Up @@ -102,15 +103,13 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context
}
}
case aa.AffinityAssistantPerPipelineRun, aa.AffinityAssistantPerPipelineRunWithIsolation:
if claimNames != nil || claimTemplates != nil {
aaName := GetAffinityAssistantName("", pr.Name)
// The PVCs are created via StatefulSet's VolumeClaimTemplate for volume scheduling
// in AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation modes.
// This is because PVCs from pipelinerun's VolumeClaimTemplate are enforced to be deleted at pipelinerun completion time in these modes,
// and there is no requirement of the PVC OwnerReference.
if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claimNames, unschedulableNodes); err != nil {
return fmt.Errorf("%w: %v", ErrAffinityAssistantCreationFailed, err)
}
aaName := GetAffinityAssistantName("", pr.Name)
// The PVCs are created via StatefulSet's VolumeClaimTemplate for volume scheduling
// in AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation modes.
// This is because PVCs from pipelinerun's VolumeClaimTemplate are enforced to be deleted at pipelinerun completion time in these modes,
// and there is no requirement of the PVC OwnerReference.
if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claimNames, unschedulableNodes); err != nil {
return fmt.Errorf("%w: %v", ErrAffinityAssistantCreationFailed, err)
}
case aa.AffinityAssistantDisabled:
for _, workspace := range claimTemplateToWorkspace {
Expand Down Expand Up @@ -232,13 +231,29 @@ func (c *Reconciler) cleanupAffinityAssistantsAndPVCs(ctx context.Context, pr *v
// getPersistentVolumeClaimNameWithAffinityAssistant returns the PersistentVolumeClaim name that is
// created by the Affinity Assistant StatefulSet VolumeClaimTemplate when Affinity Assistant is enabled.
// The PVCs created by StatefulSet VolumeClaimTemplates follow the format `<pvcName>-<affinityAssistantName>-0`
// TODO(#6740)(WIP): use this function when adding end-to-end support for AffinityAssistantPerPipelineRun mode
func getPersistentVolumeClaimNameWithAffinityAssistant(pipelineWorkspaceName, prName string, wb v1.WorkspaceBinding, owner metav1.OwnerReference) string {
pvcName := volumeclaim.GeneratePVCNameFromWorkspaceBinding(wb.VolumeClaimTemplate.Name, wb, owner)
affinityAssistantName := GetAffinityAssistantName(pipelineWorkspaceName, prName)
return fmt.Sprintf("%s-%s-0", pvcName, affinityAssistantName)
}

// getAffinityAssistantAnnotationVal generates and returns the value for `pipeline.tekton.dev/affinity-assistant` annotation
// based on aaBehavior, pipelinePVCWorkspaceName and prName
func getAffinityAssistantAnnotationVal(aaBehavior affinityassistant.AffinityAssistantBehavior, pipelinePVCWorkspaceName string, prName string) string {
switch aaBehavior {
case affinityassistant.AffinityAssistantPerWorkspace:
if pipelinePVCWorkspaceName != "" {
return GetAffinityAssistantName(pipelinePVCWorkspaceName, prName)
}
case affinityassistant.AffinityAssistantPerPipelineRun, affinityassistant.AffinityAssistantPerPipelineRunWithIsolation:
return GetAffinityAssistantName("", prName)

case affinityassistant.AffinityAssistantDisabled:
}

return ""
}

// GetAffinityAssistantName returns the Affinity Assistant name based on pipelineWorkspaceName and pipelineRunName
func GetAffinityAssistantName(pipelineWorkspaceName string, pipelineRunName string) string {
hashBytes := sha256.Sum256([]byte(pipelineWorkspaceName + pipelineRunName))
Expand Down Expand Up @@ -355,17 +370,6 @@ func affinityAssistantStatefulSet(aaBehavior aa.AffinityAssistantBehavior, name
}
}

// isAffinityAssistantDisabled returns a bool indicating whether an Affinity Assistant should
// be created for each PipelineRun that use workspaces with PersistentVolumeClaims
// as volume source. The default behaviour is to enable the Affinity Assistant to
// provide Node Affinity for TaskRuns that share a PVC workspace.
//
// TODO(#6740)(WIP): replace this function with GetAffinityAssistantBehavior
func (c *Reconciler) isAffinityAssistantDisabled(ctx context.Context) bool {
cfg := config.FromContextOrDefaults(ctx)
return cfg.FeatureFlags.DisableAffinityAssistant
}

// getAssistantAffinityMergedWithPodTemplateAffinity return the affinity that merged with PipelineRun PodTemplate affinity.
func getAssistantAffinityMergedWithPodTemplateAffinity(pr *v1.PipelineRun, aaBehavior aa.AffinityAssistantBehavior) *corev1.Affinity {
affinityAssistantsAffinity := &corev1.Affinity{}
Expand Down
185 changes: 130 additions & 55 deletions pkg/reconciler/pipelinerun/affinity_assistant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
)

var podSpecFilter cmp.Option = cmpopts.IgnoreFields(corev1.PodSpec{}, "Containers", "Affinity")
var statefulSetSpecFilter cmp.Option = cmpopts.IgnoreFields(appsv1.StatefulSetSpec{}, "Replicas", "Selector")
var podTemplateSpecFilter cmp.Option = cmpopts.IgnoreFields(corev1.PodTemplateSpec{}, "ObjectMeta")

var workspacePVCName = "test-workspace-pvc"
Expand Down Expand Up @@ -112,6 +111,7 @@ var testPRWithEmptyDir = &v1.PipelineRun{
// TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun tests to create and delete Affinity Assistants and PVCs
// per pipelinerun for a given PipelineRun
func TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun(t *testing.T) {
replicas := int32(1)
tests := []struct {
name string
pr *v1.PipelineRun
Expand All @@ -120,6 +120,14 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun(t *testing.T) {
name: "PersistentVolumeClaim Workspace type",
pr: testPRWithPVC,
expectStatefulSetSpec: &appsv1.StatefulSetSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
pipeline.PipelineRunLabelKey: testPRWithPVC.Name,
workspace.LabelInstance: "affinity-assistant-622aca4516",
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Volumes: []corev1.Volume{{
Expand All @@ -135,6 +143,14 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun(t *testing.T) {
name: "VolumeClaimTemplate Workspace type",
pr: testPRWithVolumeClaimTemplate,
expectStatefulSetSpec: &appsv1.StatefulSetSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
pipeline.PipelineRunLabelKey: testPRWithVolumeClaimTemplate.Name,
workspace.LabelInstance: "affinity-assistant-426b306c50",
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{
ObjectMeta: metav1.ObjectMeta{Name: "pvc-b9eea16dce"},
}},
Expand All @@ -143,6 +159,14 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun(t *testing.T) {
name: "VolumeClaimTemplate and PersistentVolumeClaim Workspaces",
pr: testPRWithVolumeClaimTemplateAndPVC,
expectStatefulSetSpec: &appsv1.StatefulSetSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
pipeline.PipelineRunLabelKey: testPRWithVolumeClaimTemplateAndPVC.Name,
workspace.LabelInstance: "affinity-assistant-5bf44db4a8",
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{
ObjectMeta: metav1.ObjectMeta{Name: "pvc-b9eea16dce"},
}},
Expand All @@ -158,16 +182,31 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun(t *testing.T) {
},
},
}, {
name: "other Workspace type",
pr: testPRWithEmptyDir,
expectStatefulSetSpec: nil,
name: "other Workspace type",
pr: testPRWithEmptyDir,
expectStatefulSetSpec: &appsv1.StatefulSetSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
pipeline.PipelineRunLabelKey: testPRWithEmptyDir.Name,
workspace.LabelInstance: "affinity-assistant-c655a0c8a2",
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
},
}}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
configMap := map[string]string{
"disable-affinity-assistant": "true",
"coschedule": "pipelineruns",
}
kubeClientSet := fakek8s.NewSimpleClientset()
ctx := cfgtesting.SetFeatureFlags(context.Background(), t, configMap)
c := Reconciler{
KubeClientSet: fakek8s.NewSimpleClientset(),
KubeClientSet: kubeClientSet,
pvcHandler: volumeclaim.NewPVCHandler(kubeClientSet, zap.NewExample().Sugar()),
}

err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, tc.pr, aa.AffinityAssistantPerPipelineRun)
Expand All @@ -179,14 +218,23 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun(t *testing.T) {
expectAAName := GetAffinityAssistantName("", tc.pr.Name)
validateStatefulSetSpec(t, ctx, c, expectAAName, tc.expectStatefulSetSpec)

// TODO(#6740)(WIP): test cleanupAffinityAssistantsAndPVCs for coscheduling-pipelinerun mode when fully implemented
// clean up Affinity Assistant
c.cleanupAffinityAssistantsAndPVCs(ctx, tc.pr)
if err != nil {
t.Errorf("unexpected error from cleanupAffinityAssistants: %v", err)
}
_, err = c.KubeClientSet.AppsV1().StatefulSets(tc.pr.Namespace).Get(ctx, expectAAName, metav1.GetOptions{})
if !apierrors.IsNotFound(err) {
t.Errorf("expected a NotFound response, got: %v", err)
}
})
}
}

// TestCreateOrUpdateAffinityAssistantsAndPVCsPerWorkspaceOrDisabled tests to create and delete Affinity Assistants and PVCs
// per workspace or disabled for a given PipelineRun
func TestCreateOrUpdateAffinityAssistantsAndPVCsPerWorkspaceOrDisabled(t *testing.T) {
replicas := int32(1)
tests := []struct {
name, expectedPVCName string
pr *v1.PipelineRun
Expand All @@ -197,6 +245,14 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerWorkspaceOrDisabled(t *testin
aaBehavior: aa.AffinityAssistantPerWorkspace,
pr: testPRWithPVC,
expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
pipeline.PipelineRunLabelKey: testPRWithPVC.Name,
workspace.LabelInstance: "affinity-assistant-ac9f8fc5ee",
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Volumes: []corev1.Volume{{
Expand All @@ -214,6 +270,14 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerWorkspaceOrDisabled(t *testin
pr: testPRWithVolumeClaimTemplate,
expectedPVCName: "pvc-b9eea16dce",
expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
pipeline.PipelineRunLabelKey: testPRWithVolumeClaimTemplate.Name,
workspace.LabelInstance: "affinity-assistant-4cf1a1c468",
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Volumes: []corev1.Volume{{
Expand All @@ -236,6 +300,14 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerWorkspaceOrDisabled(t *testin
pr: testPRWithVolumeClaimTemplateAndPVC,
expectedPVCName: "pvc-b9eea16dce",
expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
pipeline.PipelineRunLabelKey: testPRWithVolumeClaimTemplateAndPVC.Name,
workspace.LabelInstance: "affinity-assistant-6c87e714a0",
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Volumes: []corev1.Volume{{
Expand All @@ -246,6 +318,14 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerWorkspaceOrDisabled(t *testin
}},
},
}}, {
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
pipeline.PipelineRunLabelKey: testPRWithVolumeClaimTemplateAndPVC.Name,
workspace.LabelInstance: "affinity-assistant-6399c93362",
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Volumes: []corev1.Volume{{
Expand Down Expand Up @@ -880,53 +960,6 @@ func TestThatCleanupIsAvoidedIfAssistantIsDisabled(t *testing.T) {
}
}

func TestDisableAffinityAssistant(t *testing.T) {
for _, tc := range []struct {
description string
configMap *corev1.ConfigMap
expected bool
}{{
description: "Default behaviour: A missing disable-affinity-assistant flag should result in false",
configMap: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()},
Data: map[string]string{},
},
expected: false,
}, {
description: "Setting disable-affinity-assistant to false should result in false",
configMap: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()},
Data: map[string]string{
featureFlagDisableAffinityAssistantKey: "false",
},
},
expected: false,
}, {
description: "Setting disable-affinity-assistant to true should result in true",
configMap: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()},
Data: map[string]string{
featureFlagDisableAffinityAssistantKey: "true",
},
},
expected: true,
}} {
t.Run(tc.description, func(t *testing.T) {
c := Reconciler{
KubeClientSet: fakek8s.NewSimpleClientset(
tc.configMap,
),
Images: pipeline.Images{},
}
store := config.NewStore(logtesting.TestLogger(t))
store.OnConfigChanged(tc.configMap)
if result := c.isAffinityAssistantDisabled(store.ToContext(context.Background())); result != tc.expected {
t.Errorf("Expected %t Received %t", tc.expected, result)
}
})
}
}

func TestGetAssistantAffinityMergedWithPodTemplateAffinity(t *testing.T) {
labelSelector := &metav1.LabelSelector{
MatchLabels: map[string]string{
Expand Down Expand Up @@ -1101,6 +1134,48 @@ spec:
}
}

func TestGetAffinityAssistantAnnotationVal(t *testing.T) {
tcs := []struct {
name string
aaBehavior aa.AffinityAssistantBehavior
wsName, prName, expectAffinityAssistantAnnotationVal string
}{{
name: "per workspace",
aaBehavior: aa.AffinityAssistantPerWorkspace,
wsName: "my-ws",
prName: "my-pipelinerun",
expectAffinityAssistantAnnotationVal: "affinity-assistant-315f58d30d",
}, {
name: "per workspace - empty pipeline workspace name",
aaBehavior: aa.AffinityAssistantPerWorkspace,
prName: "my-pipelinerun",
}, {
name: "per pipelinerun",
aaBehavior: aa.AffinityAssistantPerPipelineRun,
wsName: "my-ws",
prName: "my-pipelinerun",
expectAffinityAssistantAnnotationVal: "affinity-assistant-0b79942a50",
}, {
name: "isolate pipelinerun",
aaBehavior: aa.AffinityAssistantPerPipelineRunWithIsolation,
wsName: "my-ws",
prName: "my-pipelinerun",
expectAffinityAssistantAnnotationVal: "affinity-assistant-0b79942a50",
}, {
name: "disabled",
aaBehavior: aa.AffinityAssistantDisabled,
wsName: "my-ws",
prName: "my-pipelinerun",
}}

for _, tc := range tcs {
aaAnnotationVal := getAffinityAssistantAnnotationVal(tc.aaBehavior, tc.wsName, tc.prName)
if diff := cmp.Diff(tc.expectAffinityAssistantAnnotationVal, aaAnnotationVal); diff != "" {
t.Errorf("Affinity Assistant Annotation Val mismatch: %v", diff)
}
}
}

type Data struct {
StatefulSets []*appsv1.StatefulSet
Nodes []*corev1.Node
Expand Down Expand Up @@ -1139,7 +1214,7 @@ func validateStatefulSetSpec(t *testing.T, ctx context.Context, c Reconciler, ex
if err != nil {
t.Fatalf("unexpected error when retrieving StatefulSet: %v", err)
}
if d := cmp.Diff(expectStatefulSetSpec, &aa.Spec, statefulSetSpecFilter, podSpecFilter, podTemplateSpecFilter); d != "" {
if d := cmp.Diff(expectStatefulSetSpec, &aa.Spec, podSpecFilter, podTemplateSpecFilter); d != "" {
t.Errorf("StatefulSetSpec diff: %s", diff.PrintWantGot(d))
}
} else if !apierrors.IsNotFound(err) {
Expand Down
Loading

0 comments on commit 224cba3

Please sign in to comment.