Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding SubdomainPolicy to support a service per replica #197

Merged
merged 17 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions api/leaderworkerset/v1/leaderworkerset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ const (

// Pods that are part of the same subgroup will have the same unique hash value.
SubGroupUniqueHashLabelKey string = "leaderworkerset.sigs.k8s.io/subgroup-key"

// Leader pods will have an annotation that determines what type of domain
// will be injected. Corresponds to LeaderWorkerSet.Spec.NetworkConfig.SubdomainPolicy
SubdomainPolicyAnnotationKey string = "leaderworkerset.sigs.k8s.io/subdomainPolicy"
)

// One group consists of a single leader and M workers, and the total number of pods in a group is M+1.
Expand Down Expand Up @@ -119,6 +123,10 @@ type LeaderWorkerSetSpec struct {
// +kubebuilder:validation:Enum={LeaderCreated,LeaderReady}
// +optional
StartupPolicy StartupPolicyType `json:"startupPolicy"`

// NetworkConfig defines the network configuration of the group
// +optional
NetworkConfig *NetworkConfig `json:"networkConfig,omitempty"`
}

// Template of the leader/worker pods, the group will include at least one leader pod.
Expand Down Expand Up @@ -180,6 +188,27 @@ type SubGroupPolicy struct {
SubGroupSize *int32 `json:"subGroupSize,omitempty"`
}

type NetworkConfig struct {
// SubdomainPolicy determines the policy that will be used when creating
// the headless service
SubdomainPolicy SubdomainPolicy `json:"subdomainPolicy"`
Edwinhr716 marked this conversation as resolved.
Show resolved Hide resolved
}

type SubdomainPolicy string

const (
// SubdomainShared will create a single headless service that all replicas
// will share. The host names look like:
// Replica 0: my-lws-0.my-lws, my-lws-0-1.my-lws
// Replica 1: my-lws-1.my-lws, my-lws-1-1.my-lws
SubdomainShared SubdomainPolicy = "Shared"
// UniquePerReplica will create a headless service per replica
// The pod host names look like:
// Replica 0: my-lws-0.my-lws-0,my-lws-0-1.my-lws-0, my-lws-0-2.my-lws-0
// Replica 1: my-lws-1.my-lws-1,my-lws-1-1.my-lws-1, my-lws-1-2.my-lws-1
SubdomainUniquePerReplica SubdomainPolicy = "UniquePerReplica"
)

// RollingUpdateConfiguration defines the parameters to be used for RollingUpdateStrategyType.
type RollingUpdateConfiguration struct {
// The maximum number of replicas that can be unavailable during the update.
Expand Down
20 changes: 20 additions & 0 deletions api/leaderworkerset/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions client-go/applyconfiguration/leaderworkerset/v1/networkconfig.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions client-go/applyconfiguration/utils.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions config/crd/bases/leaderworkerset.x-k8s.io_leaderworkersets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16105,6 +16105,18 @@ spec:
required:
- workerTemplate
type: object
networkConfig:
description: NetworkConfig defines the network configuration of the
group
properties:
subdomainPolicy:
description: |-
SubdomainPolicy determines the policy that will be used when creating
the headless service
type: string
required:
- subdomainPolicy
type: object
replicas:
default: 1
description: |-
Expand Down
101 changes: 93 additions & 8 deletions pkg/controllers/leaderworkerset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (r *LeaderWorkerSetReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}

// Create headless service if it does not exist.
if err := r.createHeadlessServiceIfNotExists(ctx, lws); err != nil {
if err := r.reconcileHeadlessServices(ctx, lws, replicas); err != nil {
log.Error(err, "Creating headless service.")
r.Record.Eventf(lws, corev1.EventTypeWarning, FailedCreate,
fmt.Sprintf("Failed to create headless service for error: %v", err))
Expand All @@ -122,27 +122,61 @@ func (r *LeaderWorkerSetReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, nil
}

func (r *LeaderWorkerSetReconciler) createHeadlessServiceIfNotExists(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) error {
func (r *LeaderWorkerSetReconciler) reconcileHeadlessServices(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, replicas int32) error {

if lws.Spec.NetworkConfig == nil || lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainShared {
if err := r.createHeadlessServiceIfNotExists(ctx, lws, lws.Name, map[string]string{leaderworkerset.SetNameLabelKey: lws.Name}); err != nil {
return err
}
return nil
}
// using replicas instead of lws.spec.replicas as needed to create services for burst replicas during maxSurge
for i := 0; i < int(replicas); i++ {
err := r.createHeadlessServiceIfNotExists(ctx, lws, fmt.Sprintf("%s-%s", lws.Name, strconv.Itoa(i)), map[string]string{leaderworkerset.GroupIndexLabelKey: strconv.Itoa(i)})
Edwinhr716 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
}

var headlessServiceList corev1.ServiceList
Edwinhr716 marked this conversation as resolved.
Show resolved Hide resolved
if err := r.List(ctx, &headlessServiceList, client.InNamespace(lws.Namespace)); err != nil {
return err
}

if len(headlessServiceList.Items) == int(replicas) {
return nil
}

// In case of scale down, need to delete the services of the replicas that will be deleted
for i := len(headlessServiceList.Items); i > int(replicas); i-- {
err := r.deleteHeadlessServiceIfExists(ctx, lws, fmt.Sprintf("%s-%s", lws.Name, strconv.Itoa(i-1)))
if err != nil {
return err
}
}
return nil
}

func (r *LeaderWorkerSetReconciler) createHeadlessServiceIfNotExists(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, serviceName string, serviceSelector map[string]string) error {
log := ctrl.LoggerFrom(ctx)
// If the headless service does not exist in the namespace, create it.
var headlessService corev1.Service
if err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &headlessService); err != nil {
if err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: lws.Namespace}, &headlessService); err != nil {
if client.IgnoreNotFound(err) != nil {
return err
}
headlessService := corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: lws.Name,
Name: serviceName,
Namespace: lws.Namespace,
},
Spec: corev1.ServiceSpec{
ClusterIP: "None", // defines service as headless
Selector: map[string]string{
leaderworkerset.SetNameLabelKey: lws.Name,
},
ClusterIP: "None", // defines service as headless
Selector: serviceSelector,
PublishNotReadyAddresses: true,
},
}

// Set the controller owner reference for garbage collection and reconciliation.
if err := ctrl.SetControllerReference(lws, &headlessService, r.Scheme); err != nil {
Edwinhr716 marked this conversation as resolved.
Show resolved Hide resolved
return err
Expand All @@ -156,6 +190,50 @@ func (r *LeaderWorkerSetReconciler) createHeadlessServiceIfNotExists(ctx context
return nil
}

func (r *LeaderWorkerSetReconciler) deleteMultipleHeadlessSevices(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) error {

podSelector := client.MatchingLabels(map[string]string{
leaderworkerset.SetNameLabelKey: lws.Name,
leaderworkerset.WorkerIndexLabelKey: "0",
})
var leaderPodList corev1.PodList
if err := r.List(ctx, &leaderPodList, podSelector, client.InNamespace(lws.Namespace)); err != nil {
return err
}

if len(leaderPodList.Items) == 0 {
return nil
}

subdomainPolicy, foundSubdomainPolicy := leaderPodList.Items[0].Annotations[leaderworkerset.SubdomainPolicyAnnotationKey]
if foundSubdomainPolicy && subdomainPolicy == string(leaderworkerset.SubdomainUniquePerReplica) {
return r.deleteHeadlessServiceIfExists(ctx, lws, lws.Name)
}

for i := 0; i < int(*lws.Spec.Replicas); i++ {
err := r.deleteHeadlessServiceIfExists(ctx, lws, fmt.Sprintf("%s-%s", lws.Name, strconv.Itoa(i)))
if err != nil {
return err
}
}
return nil
}

func (r *LeaderWorkerSetReconciler) deleteHeadlessServiceIfExists(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, serviceName string) error {
var headlessService corev1.Service
if err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: lws.Namespace}, &headlessService); err != nil {
if client.IgnoreNotFound(err) != nil {
return err
}
return nil
}

if err := r.Delete(ctx, &headlessService); err != nil {
return err
}
return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *LeaderWorkerSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down Expand Up @@ -262,6 +340,9 @@ func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context,
// Case 3:
// In normal cases, return the values directly.
if rollingUpdateCompleted {
if err := r.deleteMultipleHeadlessSevices(ctx, lws); err != nil {
Edwinhr716 marked this conversation as resolved.
Show resolved Hide resolved
return 0, 0, err
}
return 0, lwsReplicas, nil
}

Expand Down Expand Up @@ -582,6 +663,10 @@ func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWor
podAnnotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey]
}
}

if lws.Spec.NetworkConfig != nil && lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainUniquePerReplica {
ahg-g marked this conversation as resolved.
Show resolved Hide resolved
podAnnotations[leaderworkerset.SubdomainPolicyAnnotationKey] = string(leaderworkerset.SubdomainUniquePerReplica)
}
podTemplateApplyConfiguration.WithAnnotations(podAnnotations)

// construct statefulset apply configuration
Expand Down
6 changes: 5 additions & 1 deletion pkg/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,14 @@ func constructWorkerStatefulSetApplyConfiguration(leaderPod corev1.Pod, lws lead
}
acceleratorutils.AddTPUAnnotations(leaderPod, podAnnotations)
podTemplateApplyConfiguration.WithAnnotations(podAnnotations)
serviceName := leaderPod.Name
if lws.Spec.NetworkConfig == nil || lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainShared {
serviceName = lws.Name
}
// construct statefulset apply configuration
statefulSetConfig := appsapplyv1.StatefulSet(leaderPod.Name, leaderPod.Namespace).
WithSpec(appsapplyv1.StatefulSetSpec().
WithServiceName(lws.Name).
WithServiceName(serviceName).
WithReplicas(*lws.Spec.LeaderWorkerTemplate.Size - 1).
WithPodManagementPolicy(appsv1.ParallelPodManagement).
WithTemplate(&podTemplateApplyConfiguration).
Expand Down
4 changes: 1 addition & 3 deletions pkg/utils/pod/pod_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,9 @@ func AddLWSVariables(pod *corev1.Pod) error {
return fmt.Errorf("Failure constructing environment variables, no group index label found for pod %v", klog.KObj(pod))
}

// The headless service name is assumed to be the same as the LWS name.
// See function [createHeadlessServiceIfNotExists](sigs.k8s.io/lws/pkg/controllers/leaderworkerset_controller.go).
leaderAddressEnvVar := corev1.EnvVar{
Name: leaderworkerset.LwsLeaderAddress,
Value: fmt.Sprintf("%s-%s.%s.%s", lwsName, groupIndex, lwsName, pod.ObjectMeta.Namespace),
Value: fmt.Sprintf("%s-%s.%s.%s", lwsName, groupIndex, pod.Spec.Subdomain, pod.ObjectMeta.Namespace),
}

size, found := pod.Annotations[leaderworkerset.SizeAnnotationKey]
Expand Down
7 changes: 6 additions & 1 deletion pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,13 @@ func NonZeroValue(value int32) int32 {
}

func LeaderWorkerTemplateHash(lws *leaderworkerset.LeaderWorkerSet) string {
if lws.Spec.NetworkConfig == nil {
return Sha1Hash(lws.Spec.LeaderWorkerTemplate.LeaderTemplate.String() +
lws.Spec.LeaderWorkerTemplate.WorkerTemplate.String())
}

return Sha1Hash(lws.Spec.LeaderWorkerTemplate.LeaderTemplate.String() +
lws.Spec.LeaderWorkerTemplate.WorkerTemplate.String())
lws.Spec.LeaderWorkerTemplate.WorkerTemplate.String() + string(lws.Spec.NetworkConfig.SubdomainPolicy))
}

// SortByIndex returns an ascending list, the length of the list is always specified by the parameter.
Expand Down
6 changes: 6 additions & 0 deletions pkg/webhooks/leaderworkerset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ func (r *LeaderWorkerSetWebhook) Default(ctx context.Context, obj runtime.Object
MaxSurge: intstr.FromInt32(0),
}
}

if lws.Spec.NetworkConfig == nil {
Edwinhr716 marked this conversation as resolved.
Show resolved Hide resolved
lws.Spec.NetworkConfig = &v1.NetworkConfig{}
lws.Spec.NetworkConfig.SubdomainPolicy = v1.SubdomainShared
}
return nil
}

Expand Down Expand Up @@ -97,6 +102,7 @@ func (r *LeaderWorkerSetWebhook) ValidateUpdate(ctx context.Context, oldObj, new
if newLws.Spec.LeaderWorkerTemplate.SubGroupPolicy == nil && oldLws.Spec.LeaderWorkerTemplate.SubGroupPolicy != nil {
allErrs = append(allErrs, field.Invalid(specPath.Child("leaderWorkerTemplate", "SubGroupPolicy", "subGroupSize"), oldLws.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize, "cannot remove subGroupSize after enabled"))
}

ahg-g marked this conversation as resolved.
Show resolved Hide resolved
return nil, allErrs.ToAggregate()
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/webhooks/pod_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func (p *PodWebhook) Default(ctx context.Context, obj runtime.Object) error {
}
pod.Labels[leaderworkerset.GroupIndexLabelKey] = fmt.Sprint(groupIndex)
}
subdomainPolicy, foundSubdomainPolicy := pod.Annotations[leaderworkerset.SubdomainPolicyAnnotationKey]
if foundSubdomainPolicy && subdomainPolicy == string(leaderworkerset.SubdomainUniquePerReplica) {
pod.Spec.Subdomain = pod.Name
}
// add group unique key label for exclusive placement, and use it to check whether the node affinity has been applied
var groupUniqueKey string
if _, foundGroupKey := pod.Labels[leaderworkerset.GroupUniqueHashLabelKey]; !foundGroupKey {
Expand Down
Loading