Skip to content

Commit

Permalink
TEP-0135: Refactor Affinity Assistant PVC creation
Browse files Browse the repository at this point in the history
Part of [tektoncd#6740][tektoncd#6740]. [TEP-0135][tep-0135] introduces a feature that allows a cluster operator
to ensure that all of a PipelineRun's pods are scheduled to the same node.

Before this commit, the PipelineRun reconciler creates PVC for each `VolumeClaimTemplate` backed workspace,
and mount the PVCs to the AA to avoid PV availability zone conflict.
This implementation works for `AffinityAssistantPerWorkspace` but introduces availability zone conflict
issue in the `AffinityAssistantPerPipelineRun` mode since we cannot enforce all the PVC are created in the same availability zone.

Instead of directly creating a PVC for each PipelineRun workspace backed by a VolumeClaimTemplate,
this commit sets one VolumeClaimTemplate per PVC workspace in the affinity assistant StatefulSet spec,
which enforces all VolumeClaimTemplates in StatefulSets are all provisioned on the same node/availability zone.

This commit just refactors the current implementation in favor of the `AffinityAssistantPerPipelineRun` feature.
There is no functionality change. The `AffinityAssistantPerPipelineRun` feature will be added in the follow up PRs.

[tektoncd#6740]: tektoncd#6740
[tep-0135]: https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md
  • Loading branch information
QuanZhang-William committed Jun 5, 2023
1 parent 264476b commit 243d002
Show file tree
Hide file tree
Showing 8 changed files with 400 additions and 155 deletions.
172 changes: 101 additions & 71 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,73 +56,74 @@ func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, wb []
var errs []error
var unschedulableNodes sets.Set[string] = nil
for _, w := range wb {
if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil {
affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name)
a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
claimName := getClaimName(w, *kmeta.NewControllerRef(pr))
switch {
// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
case apierrors.IsNotFound(err):
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimName, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
_, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
if w.PersistentVolumeClaim == nil && w.VolumeClaimTemplate == nil {
continue
}

var claimTemplates []corev1.PersistentVolumeClaim
var claims []corev1.PersistentVolumeClaimVolumeSource
if w.PersistentVolumeClaim != nil {
claims = append(claims, *w.PersistentVolumeClaim.DeepCopy())
} else if w.VolumeClaimTemplate != nil {
claimTemplate := w.VolumeClaimTemplate.DeepCopy()
claimTemplate.Name = volumeclaim.GetPersistentVolumeClaimNameWithoutAffinityAssistant(w.VolumeClaimTemplate.Name, w, *kmeta.NewControllerRef(pr))
claimTemplates = append(claimTemplates, *claimTemplate)
}

affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name)
a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
switch {
// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
case apierrors.IsNotFound(err):
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
_, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
}
if err == nil {
logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace)
}
// check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created
// this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation
// and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation
// this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586
case err == nil && a != nil && a.Status.ReadyReplicas == 1:
if unschedulableNodes == nil {
ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{
FieldSelector: "spec.unschedulable=true",
})
if err != nil {
errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err))
}
if err == nil {
logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace)
unschedulableNodes = sets.Set[string]{}
// maintain the list of nodes which are unschedulable
for _, n := range ns.Items {
unschedulableNodes.Insert(n.Name)
}
// check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created
// this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation
// and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation
// this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586
case err == nil && a != nil && a.Status.ReadyReplicas == 1:
if unschedulableNodes == nil {
ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{
FieldSelector: "spec.unschedulable=true",
})
if err != nil {
errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err))
}
unschedulableNodes = sets.Set[string]{}
// maintain the list of nodes which are unschedulable
for _, n := range ns.Items {
unschedulableNodes.Insert(n.Name)
}
}
if unschedulableNodes.Len() > 0 {
// get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1
p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{})
// ignore instead of failing if the affinity assistant pod was not found
if err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err))
}
if unschedulableNodes.Len() > 0 {
// get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1
p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{})
// ignore instead of failing if the affinity assistant pod was not found
if err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err))
}
// check the node which hosts the affinity assistant pod if it is unschedulable or cordoned
if p != nil && unschedulableNodes.Has(p.Spec.NodeName) {
// if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node
err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err))
}
// check the node which hosts the affinity assistant pod if it is unschedulable or cordoned
if p != nil && unschedulableNodes.Has(p.Spec.NodeName) {
// if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node
err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err))
}
}
case err != nil:
errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err))
}
case err != nil:
errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err))
}
}
return errorutils.NewAggregate(errs)
}

func getClaimName(w v1beta1.WorkspaceBinding, ownerReference metav1.OwnerReference) string {
if w.PersistentVolumeClaim != nil {
return w.PersistentVolumeClaim.ClaimName
} else if w.VolumeClaimTemplate != nil {
return volumeclaim.GetPersistentVolumeClaimName(w.VolumeClaimTemplate, w, ownerReference)
}

return ""
}

func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1.PipelineRun) error {
// omit cleanup if the feature is disabled
if c.isAffinityAssistantDisabled(ctx) {
Expand All @@ -136,11 +137,28 @@ func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1.
if err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Delete(ctx, affinityAssistantStsName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("failed to delete StatefulSet %s: %w", affinityAssistantStsName, err))
}

// cleanup PVCs created by Affinity Assistants
if w.VolumeClaimTemplate != nil {
pvcName := getPersistentVolumeClaimNameWithAffinityAssistant(w.Name, pr.Name, w, *kmeta.NewControllerRef(pr))
if err := c.KubeClientSet.CoreV1().PersistentVolumeClaims(pr.Namespace).Delete(ctx, pvcName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("failed to delete PersistentVolumeClaim %s: %w", pvcName, err))
}
}
}
}
return errorutils.NewAggregate(errs)
}

// getPersistentVolumeClaimNameWithAffinityAssistant returns the PersistentVolumeClaim name that is
// created by the Affinity Assistant StatefulSet VolumeClaimTemplate when Affinity Assistant is enabled.
// The PVCs created by StatefulSet VolumeClaimTemplates follow the format `<pvcName>-<affinityAssistantName>-0`
func getPersistentVolumeClaimNameWithAffinityAssistant(pipelineWorkspaceName, prName string, wb v1beta1.WorkspaceBinding, owner metav1.OwnerReference) string {
pvcName := volumeclaim.GetPersistentVolumeClaimNameWithoutAffinityAssistant(wb.VolumeClaimTemplate.Name, wb, owner)
affinityAssistantName := getAffinityAssistantName(pipelineWorkspaceName, prName)
return fmt.Sprintf("%s-%s-0", pvcName, affinityAssistantName)
}

func getAffinityAssistantName(pipelineWorkspaceName string, pipelineRunName string) string {
hashBytes := sha256.Sum256([]byte(pipelineWorkspaceName + pipelineRunName))
hashString := fmt.Sprintf("%x", hashBytes)
Expand All @@ -162,7 +180,7 @@ func getStatefulSetLabels(pr *v1beta1.PipelineRun, affinityAssistantName string)
return labels
}

func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimName string, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet {
func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet {
// We want a singleton pod
replicas := int32(1)

Expand All @@ -172,6 +190,11 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam
tpl = pod.MergeAAPodTemplateWithDefault(pr.Spec.PodTemplate.ToAffinityAssistantTemplate(), defaultAATpl)
}

var mounts []corev1.VolumeMount
for _, claimTemplate := range claimTemplates {
mounts = append(mounts, corev1.VolumeMount{Name: claimTemplate.Name, MountPath: claimTemplate.Name})
}

containers := []corev1.Container{{
Name: "affinity-assistant",
Image: affinityAssistantImage,
Expand All @@ -190,8 +213,27 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam
"memory": resource.MustParse("100Mi"),
},
},
VolumeMounts: mounts,
}}

var volumes []corev1.Volume
for i, claim := range claims {
volumes = append(volumes, corev1.Volume{
Name: fmt.Sprintf("workspace-%d", i),
VolumeSource: corev1.VolumeSource{
// A Pod mounting a PersistentVolumeClaim that has a StorageClass with
// volumeBindingMode: Immediate
// the PV is allocated on a Node first, and then the pod need to be
// scheduled to that node.
// To support those PVCs, the Affinity Assistant must also mount the
// same PersistentVolumeClaim - to be sure that the Affinity Assistant
// pod is scheduled to the same Availability Zone as the PV, when using
// a regional cluster. This is called VolumeScheduling.
PersistentVolumeClaim: claim.DeepCopy(),
},
})
}

return &appsv1.StatefulSet{
TypeMeta: metav1.TypeMeta{
Kind: "StatefulSet",
Expand All @@ -207,6 +249,8 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam
Selector: &metav1.LabelSelector{
MatchLabels: getStatefulSetLabels(pr, name),
},
// by setting VolumeClaimTemplates from StatefulSet, all the PVs are scheduled to the same Availability Zone as the StatefulSet
VolumeClaimTemplates: claimTemplates,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: getStatefulSetLabels(pr, name),
Expand All @@ -219,21 +263,7 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam
ImagePullSecrets: tpl.ImagePullSecrets,

Affinity: getAssistantAffinityMergedWithPodTemplateAffinity(pr),
Volumes: []corev1.Volume{{
Name: "workspace",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
// A Pod mounting a PersistentVolumeClaim that has a StorageClass with
// volumeBindingMode: Immediate
// the PV is allocated on a Node first, and then the pod need to be
// scheduled to that node.
// To support those PVCs, the Affinity Assistant must also mount the
// same PersistentVolumeClaim - to be sure that the Affinity Assistant
// pod is scheduled to the same Availability Zone as the PV, when using
// a regional cluster. This is called VolumeScheduling.
ClaimName: claimName,
}},
}},
Volumes: volumes,
},
},
},
Expand Down
Loading

0 comments on commit 243d002

Please sign in to comment.