Skip to content

Commit

Permalink
Fix scaling when using ingress-addressed nodes (#692)
Browse files Browse the repository at this point in the history
  • Loading branch information
HoustonPutman committed Apr 3, 2024
1 parent 8cbb5b0 commit e5c1271
Show file tree
Hide file tree
Showing 11 changed files with 341 additions and 46 deletions.
7 changes: 7 additions & 0 deletions api/v1beta1/solrcloud_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,13 @@ func (sc *SolrCloud) GetAllSolrPodNames() []string {
if sc.Spec.Replicas != nil {
replicas = int(*sc.Spec.Replicas)
}
if int(sc.Status.Replicas) > replicas {
replicas = int(sc.Status.Replicas)
}
return sc.GetSolrPodNames(replicas)
}

func (sc *SolrCloud) GetSolrPodNames(replicas int) []string {
podNames := make([]string, replicas)
statefulSetName := sc.StatefulSetName()
for i := range podNames {
Expand Down
22 changes: 19 additions & 3 deletions controllers/solr_cluster_ops_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func retryNextQueuedClusterOpWithQueue(statefulSet *appsv1.StatefulSet, clusterO
return hasOp, err
}

func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleDownOpIsQueued bool, podList []corev1.Pod, logger logr.Logger) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleDownOpIsQueued bool, podList []corev1.Pod, blockReconciliationOfStatefulSet bool, logger logr.Logger) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
desiredPods := int(*instance.Spec.Replicas)
configuredPods := int(*statefulSet.Spec.Replicas)
if desiredPods != configuredPods {
Expand All @@ -170,11 +170,26 @@ func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudRec
Metadata: strconv.Itoa(configuredPods - 1),
}
} else if desiredPods > configuredPods && (instance.Spec.Scaling.PopulatePodsOnScaleUp == nil || *instance.Spec.Scaling.PopulatePodsOnScaleUp) {
// We need to wait for all pods to become healthy to scale up in a managed fashion, otherwise
// the balancing will skip some pods
if len(podList) < configuredPods {
// There are not enough pods, the statefulSet controller has yet to create the previously desired pods.
// Do not start the scale up until these missing pods are created.
return nil, time.Second * 5, nil
}
// If Solr nodes are advertised by their individual node services (through an ingress)
// then make sure that the host aliases are set for all desired pods before starting a scale-up.
// If the host aliases do not already include the soon-to-be created pods, then Solr might not be able to balance
// replicas onto the new hosts.
// We need to make sure that the StatefulSet is updated with these new hostAliases before the scale up occurs.
if instance.UsesIndividualNodeServices() && instance.Spec.SolrAddressability.External.UseExternalAddress {
for _, pod := range podList {
if len(pod.Spec.HostAliases) < desiredPods {
return nil, time.Second * 5, nil
}
}
}

clusterOp = &SolrClusterOp{
Operation: ScaleUpLock,
Metadata: strconv.Itoa(desiredPods),
Expand Down Expand Up @@ -349,7 +364,8 @@ func handleManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler
}
operationComplete = true
// Only do a re-balancing for rolling restarts that migrated replicas
if updateMetadata.RequiresReplicaMigration {
// If a scale-up will occur afterwards, skip the re-balancing, because it will occur after the scale-up anyway
if updateMetadata.RequiresReplicaMigration && *instance.Spec.Replicas <= *statefulSet.Spec.Replicas {
nextClusterOp = &SolrClusterOp{
Operation: BalanceReplicasLock,
Metadata: "RollingUpdateComplete",
Expand All @@ -371,7 +387,7 @@ func handleManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler
// We won't kill pods that we need the cluster state for, but we can kill the pods that are already not running.
// This is important for scenarios where there is a bad pod config and nothing is running, but we need to do
// a restart to get a working pod config.
state, retryLater, apiError := util.GetNodeReplicaState(ctx, instance, hasReadyPod, logger)
state, retryLater, apiError := util.GetNodeReplicaState(ctx, instance, statefulSet, hasReadyPod, logger)
if apiError != nil {
return false, true, 0, nil, apiError
} else if !retryLater {
Expand Down
71 changes: 40 additions & 31 deletions controllers/solrcloud_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
hostNameIpMap := make(map[string]string)
// Generate a service for every Node
if instance.UsesIndividualNodeServices() {
// When updating the statefulSet below, the hostNameIpMap is just used to add new IPs or modify existing ones.
// When scaling down, the hostAliases that are no longer found here will not be removed from the hostAliases in the statefulSet pod spec.
// Therefore, it should be ok that we are not reconciling the node services that will be scaled down in the future.
// This is unfortunately the reality since we don't have the statefulSet yet to determine how many Solr pods are still running,
// we just have Spec.replicas which is the requested pod count.
for _, nodeName := range solrNodeNames {
err, ip := r.reconcileNodeService(ctx, logger, instance, nodeName)
if err != nil {
Expand All @@ -161,6 +166,7 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if instance.Spec.SolrAddressability.External.UseExternalAddress {
if ip == "" {
// If we are using this IP in the hostAliases of the statefulSet, it needs to be set for every service before trying to update the statefulSet
// TODO: Make an event here
blockReconciliationOfStatefulSet = true
} else {
hostNameIpMap[instance.AdvertisedNodeHost(nodeName)] = ip
Expand Down Expand Up @@ -319,36 +325,6 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}
}

extAddressabilityOpts := instance.Spec.SolrAddressability.External
if extAddressabilityOpts != nil && extAddressabilityOpts.Method == solrv1beta1.Ingress {
// Generate Ingress
ingress := util.GenerateIngress(instance, solrNodeNames)

// Check if the Ingress already exists
ingressLogger := logger.WithValues("ingress", ingress.Name)
foundIngress := &netv1.Ingress{}
err = r.Get(ctx, types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace}, foundIngress)
if err != nil && errors.IsNotFound(err) {
ingressLogger.Info("Creating Ingress")
if err = controllerutil.SetControllerReference(instance, ingress, r.Scheme); err == nil {
err = r.Create(ctx, ingress)
}
} else if err == nil {
var needsUpdate bool
needsUpdate, err = util.OvertakeControllerRef(instance, foundIngress, r.Scheme)
needsUpdate = util.CopyIngressFields(ingress, foundIngress, ingressLogger) || needsUpdate

// Update the found Ingress and write the result back if there are any changes
if needsUpdate && err == nil {
ingressLogger.Info("Updating Ingress")
err = r.Update(ctx, foundIngress)
}
}
if err != nil {
return requeueOrNot, err
}
}

var statefulSet *appsv1.StatefulSet

if !blockReconciliationOfStatefulSet {
Expand Down Expand Up @@ -416,6 +392,39 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if err != nil {
return requeueOrNot, err
}
if statefulSet != nil && statefulSet.Spec.Replicas != nil {
solrNodeNames = instance.GetSolrPodNames(int(*statefulSet.Spec.Replicas))
}

extAddressabilityOpts := instance.Spec.SolrAddressability.External
if extAddressabilityOpts != nil && extAddressabilityOpts.Method == solrv1beta1.Ingress {
// Generate Ingress
ingress := util.GenerateIngress(instance, solrNodeNames)

// Check if the Ingress already exists
ingressLogger := logger.WithValues("ingress", ingress.Name)
foundIngress := &netv1.Ingress{}
err = r.Get(ctx, types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace}, foundIngress)
if err != nil && errors.IsNotFound(err) {
ingressLogger.Info("Creating Ingress")
if err = controllerutil.SetControllerReference(instance, ingress, r.Scheme); err == nil {
err = r.Create(ctx, ingress)
}
} else if err == nil {
var needsUpdate bool
needsUpdate, err = util.OvertakeControllerRef(instance, foundIngress, r.Scheme)
needsUpdate = util.CopyIngressFields(ingress, foundIngress, ingressLogger) || needsUpdate

// Update the found Ingress and write the result back if there are any changes
if needsUpdate && err == nil {
ingressLogger.Info("Updating Ingress")
err = r.Update(ctx, foundIngress)
}
}
if err != nil {
return requeueOrNot, err
}
}

// *********************************************************
// The operations after this require a statefulSet to exist,
Expand Down Expand Up @@ -546,7 +555,7 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// a "locked" cluster operation
if clusterOp == nil {
_, scaleDownOpIsQueued := queuedRetryOps[ScaleDownLock]
clusterOp, retryLaterDuration, err = determineScaleClusterOpLockIfNecessary(ctx, r, instance, statefulSet, scaleDownOpIsQueued, podList, logger)
clusterOp, retryLaterDuration, err = determineScaleClusterOpLockIfNecessary(ctx, r, instance, statefulSet, scaleDownOpIsQueued, podList, blockReconciliationOfStatefulSet, logger)

// If the new clusterOperation is an update to a queued clusterOp, just change the operation that is already queued
if clusterOp != nil {
Expand Down
24 changes: 23 additions & 1 deletion controllers/util/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,29 @@ func CopyPodTemplates(from, to *corev1.PodTemplateSpec, basePath string, logger

if !DeepEqualWithNils(to.Spec.HostAliases, from.Spec.HostAliases) {
requireUpdate = true
to.Spec.HostAliases = from.Spec.HostAliases
if to.Spec.HostAliases == nil {
to.Spec.HostAliases = from.Spec.HostAliases
} else {
// Do not remove aliases that are no longer used.
// This is in case Solr is scaling down and we want to keep the old addresses for future use.
for _, fromAlias := range from.Spec.HostAliases {
found := false
for i, toAlias := range to.Spec.HostAliases {
if fromAlias.Hostnames[0] == toAlias.Hostnames[0] {
found = true
if !DeepEqualWithNils(toAlias, fromAlias) {
requireUpdate = true
to.Spec.HostAliases[i] = fromAlias
break
}
}
}
if !found {
requireUpdate = true
to.Spec.HostAliases = append(to.Spec.HostAliases, fromAlias)
}
}
}
logger.Info("Update required because field changed", "field", basePath+"Spec.HostAliases", "from", to.Spec.HostAliases, "to", from.Spec.HostAliases)
}

Expand Down
14 changes: 12 additions & 2 deletions controllers/util/solr_update_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/apache/solr-operator/controllers/util/solr_api"
"github.com/go-logr/logr"
"github.com/robfig/cron/v3"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"net/url"
Expand Down Expand Up @@ -119,7 +120,7 @@ func (state NodeReplicaState) PodHasReplicas(cloud *solr.SolrCloud, podName stri
return isInClusterState && contents.replicas > 0
}

func GetNodeReplicaState(ctx context.Context, cloud *solr.SolrCloud, hasReadyPod bool, logger logr.Logger) (state NodeReplicaState, retryLater bool, err error) {
func GetNodeReplicaState(ctx context.Context, cloud *solr.SolrCloud, statefulSet *appsv1.StatefulSet, hasReadyPod bool, logger logr.Logger) (state NodeReplicaState, retryLater bool, err error) {
clusterResp := &solr_api.SolrClusterStatusResponse{}
overseerResp := &solr_api.SolrOverseerStatusResponse{}

Expand All @@ -138,7 +139,7 @@ func GetNodeReplicaState(ctx context.Context, cloud *solr.SolrCloud, hasReadyPod
}
}
if err == nil {
state = findSolrNodeContents(clusterResp.ClusterStatus, overseerResp.Leader, GetAllManagedSolrNodeNames(cloud))
state = findSolrNodeContents(clusterResp.ClusterStatus, overseerResp.Leader, GetManagedSolrNodeNames(cloud, int(*statefulSet.Spec.Replicas)))
} else {
logger.Error(err, "Could not fetch cluster state information for cloud")
}
Expand Down Expand Up @@ -535,6 +536,15 @@ func GetAllManagedSolrNodeNames(solrCloud *solr.SolrCloud) map[string]bool {
return allNodeNames
}

func GetManagedSolrNodeNames(solrCloud *solr.SolrCloud, currentlyConfiguredPodCount int) map[string]bool {
podNames := solrCloud.GetSolrPodNames(currentlyConfiguredPodCount)
allNodeNames := make(map[string]bool, len(podNames))
for _, podName := range podNames {
allNodeNames[SolrNodeName(solrCloud, podName)] = true
}
return allNodeNames
}

// EvictReplicasForPodIfNecessary takes a solr Pod and migrates all replicas off of that Pod.
// For updates this will only be called for pods using ephemeral data.
// For scale-down operations, this can be called for pods using ephemeral or persistent data.
Expand Down
3 changes: 3 additions & 0 deletions controllers/util/solr_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,9 @@ func GenerateStatefulSet(solrCloud *solr.SolrCloud, solrCloudStatus *solr.SolrCl
VolumeClaimTemplates: pvcs,
},
}
if solrCloud.UsesHeadlessService() {
stateful.Spec.Template.Spec.Subdomain = solrCloud.HeadlessServiceName()
}

var imagePullSecrets []corev1.LocalObjectReference

Expand Down
8 changes: 7 additions & 1 deletion helm/solr-operator/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ annotations:
url: https://github.com/apache/solr-operator/issues/624
- name: Github PR
url: https://github.com/apache/solr-operator/pull/648
- kind: fixed
description: Avoid reset of security.json if get request fails
links:
- name: Github Issue
Expand All @@ -89,6 +88,13 @@ annotations:
url: https://github.com/apache/solr-operator/issues/693
- name: Github PR
url: https://github.com/apache/solr-operator/pull/694
- kind: fixed
description: SolrClouds addressed via an Ingress now scale up and down safely.
links:
- name: Github Issue
url: https://github.com/apache/solr-operator/issues/682
- name: Github PR
url: https://github.com/apache/solr-operator/pull/692
artifacthub.io/images: |
- name: solr-operator
image: apache/solr-operator:v0.8.1-prerelease
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/solrcloud_rolling_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ var _ = FDescribe("E2E - SolrCloud - Rolling Upgrades", func() {
}

By("waiting for the balanceReplicas to finish")
expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
expectStatefulSetWithChecksAndTimeout(ctx, solrCloud, solrCloud.StatefulSetName(), time.Second*30, time.Second, func(g Gomega, found *appsv1.StatefulSet) {
clusterOp, err := controllers.GetCurrentClusterOp(found)
g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
g.Expect(clusterOp).To(BeNil(), "StatefulSet should not have a balanceReplicas lock after balancing is complete.")
Expand Down
Loading

0 comments on commit e5c1271

Please sign in to comment.