Skip to content

Commit

Permalink
Apply review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
andrerun committed Apr 4, 2024
1 parent f50dda3 commit 2faea7b
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions pkg/ha/ha_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ package ha
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"time"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
ctlmgr "sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -69,7 +69,7 @@ func (ha *HAService) setEndpoints(ctx context.Context) error {
// Bypass client cache to avoid triggering a cluster wide list-watch for Endpoints - our RBAC does not allow it
err := ha.manager.GetAPIReader().Get(ctx, client.ObjectKey{Namespace: ha.namespace, Name: app.Name}, &endpoints)
if err != nil {
if !errors.IsNotFound(err) {
if !apierrors.IsNotFound(err) {
return fmt.Errorf("updating the service endpoint to point to the new leader: retrieving endpoints: %w", err)
}

Expand Down Expand Up @@ -99,7 +99,7 @@ func (ha *HAService) Start(ctx context.Context) error {

select {
case <-ctx.Done():
_ = ha.cleanUp()
_ = ha.cleanUpServiceEndpoints()
return fmt.Errorf("starting HA service: %w", ctx.Err())
case <-ha.testIsolation.TimeAfter(retryPeriod):
}
Expand All @@ -111,16 +111,16 @@ func (ha *HAService) Start(ctx context.Context) error {
}

<-ctx.Done()
err := ha.cleanUp()
err := ha.cleanUpServiceEndpoints()
if err == nil {
err = ctx.Err()
}
return err
}

// cleanUp is executed upon ending leadership. Its purpose is to remove the Endpoints object created upon acquiring
// cleanUpServiceEndpoints is executed upon ending leadership. Its purpose is to remove the Endpoints object created upon acquiring
// leadership.
func (ha *HAService) cleanUp() error {
func (ha *HAService) cleanUpServiceEndpoints() error {
// Use our own context. This function executes when the main application context is closed.
// Also, try to finish before a potential 15 seconds termination grace timeout.
ctx, cancel := context.WithTimeout(context.Background(), 14*time.Second)
Expand All @@ -130,8 +130,13 @@ func (ha *HAService) cleanUp() error {
attempt := 0
var err error
for {
endpoints := corev1.Endpoints{}
err = seedClient.Get(ctx, client.ObjectKey{Namespace: ha.namespace, Name: app.Name}, &endpoints)
endpoints := corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: app.Name,
Namespace: ha.namespace,
},
}
err = seedClient.Get(ctx, client.ObjectKeyFromObject(&endpoints), &endpoints)
if err != nil {
if apierrors.IsNotFound(err) {
ha.log.V(app.VerbosityVerbose).Info("The endpoints object cleanup succeeded: the object was missing")
Expand All @@ -141,14 +146,7 @@ func (ha *HAService) cleanUp() error {
ha.log.V(app.VerbosityInfo).Info("Failed to retrieve the endpoints object", "error", err.Error())
} else {
// Avoid data race. We don't want to delete the endpoint if it is sending traffic to a replica other than this one.
isEndpointStillPointingToOurReplica :=
len(endpoints.Subsets) == 1 &&
len(endpoints.Subsets[0].Addresses) == 1 &&
endpoints.Subsets[0].Addresses[0].IP == ha.servingIPAddress &&
len(endpoints.Subsets[0].Ports) == 1 &&
endpoints.Subsets[0].Ports[0].Port == int32(ha.servingPort) &&
endpoints.Subsets[0].Ports[0].Protocol == corev1.ProtocolTCP
if !isEndpointStillPointingToOurReplica {
if !ha.isEndpointStillPointingToOurReplica(&endpoints) {
// Someone else is using the endpoint. We can't perform safe cleanup. Abandon the object.
ha.log.V(app.VerbosityWarning).Info(
"Abandoning endpoints object because it was modified by an external actor")
Expand Down Expand Up @@ -177,3 +175,13 @@ func (ha *HAService) cleanUp() error {
ha.log.V(app.VerbosityError).Error(err, "All retries to delete the endpoints object failed. Abandoning object.")
return fmt.Errorf("HAService cleanup: deleting endponts object: retrying failed, last error: %w", err)
}

// isEndpointStillPointingToOurReplica reports whether the given Endpoints object still carries the values this
// replica is expected to have written to it: exactly one subset, holding exactly one address equal to
// ha.servingIPAddress and exactly one TCP port equal to ha.servingPort. Any other shape yields false.
func (ha *HAService) isEndpointStillPointingToOurReplica(endpoints *corev1.Endpoints) bool {
	// Anything other than a single subset does not match the expected layout.
	if len(endpoints.Subsets) != 1 {
		return false
	}

	subset := &endpoints.Subsets[0]
	// The lengths must be verified before indexing into the address and port slices below.
	if len(subset.Addresses) != 1 || len(subset.Ports) != 1 {
		return false
	}

	address, port := &subset.Addresses[0], &subset.Ports[0]
	return address.IP == ha.servingIPAddress &&
		port.Port == int32(ha.servingPort) &&
		port.Protocol == corev1.ProtocolTCP
}

0 comments on commit 2faea7b

Please sign in to comment.