Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete stale EndpointSlices on restart #1430

Merged
merged 1 commit into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pkg/agent/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
k8slabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
validations "k8s.io/apimachinery/pkg/util/validation"
Expand Down Expand Up @@ -113,14 +114,18 @@ func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, syncerMetricN
localSyncer: agentController.serviceExportSyncer,
}

agentController.endpointSliceController, err = newEndpointSliceController(spec, syncerConf, agentController.serviceExportClient)
agentController.endpointSliceController, err = newEndpointSliceController(spec, syncerConf, agentController.serviceExportClient,
agentController.serviceSyncer)
if err != nil {
return nil, err
}

agentController.serviceImportController, err = newServiceImportController(spec, syncerMetricNames, syncerConf,
agentController.endpointSliceController.syncer.GetBrokerClient(),
agentController.endpointSliceController.syncer.GetBrokerNamespace(), agentController.serviceExportClient)
agentController.endpointSliceController.syncer.GetBrokerNamespace(), agentController.serviceExportClient,
func(selector k8slabels.Selector) []runtime.Object {
return agentController.endpointSliceController.syncer.ListLocalResourcesBySelector(&discovery.EndpointSlice{}, selector)
})
if err != nil {
return nil, err
}
Expand Down
29 changes: 27 additions & 2 deletions pkg/agent/controller/endpoint_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/submariner-io/lighthouse/pkg/constants"
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8slabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -43,11 +44,12 @@ import (

//nolint:gocritic // (hugeParam) This function modifies syncerConf so we don't want to pass by pointer.
func newEndpointSliceController(spec *AgentSpecification, syncerConfig broker.SyncerConfig,
serviceExportClient *ServiceExportClient,
serviceExportClient *ServiceExportClient, serviceSyncer syncer.Interface,
) (*EndpointSliceController, error) {
c := &EndpointSliceController{
clusterID: spec.ClusterID,
serviceExportClient: serviceExportClient,
serviceSyncer: serviceSyncer,
conflictCheckWorkQueue: workqueue.New("ConflictChecker"),
}

Expand Down Expand Up @@ -116,8 +118,31 @@ func (c *EndpointSliceController) onLocalEndpointSlice(obj runtime.Object, _ int
return nil, false
}

serviceName := endpointSlice.Labels[mcsv1a1.LabelServiceName]

logger.V(log.DEBUG).Infof("Local EndpointSlice \"%s/%s\" for service %q %sd",
endpointSlice.Namespace, endpointSlice.Name, endpointSlice.Labels[mcsv1a1.LabelServiceName], op)
endpointSlice.Namespace, endpointSlice.Name, serviceName, op)

// Check if the associated Service exists and, if not, delete the EndpointSlice. On restart, it's possible the Service could've been
// deleted.
if op == syncer.Create {
_, found, _ := c.serviceSyncer.GetResource(serviceName, endpointSlice.Namespace)
if !found {
logger.Infof("The service %q for EndpointSlice \"%s/%s\" does not exist - deleting it",
serviceName, endpointSlice.Namespace, endpointSlice.Name)

err := c.syncer.GetLocalFederator().Delete(ctx, endpointSlice)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope this deletes the EndpointSlice from the Broker as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup - the admiral syncer does that.

if apierrors.IsNotFound(err) {
err = nil
}

if err != nil {
logger.Errorf(err, "Error deleting EndpointSlice %s/%s", endpointSlice.Namespace, endpointSlice.Name)
}

return nil, err != nil
}
}

return obj, false
}
Expand Down
86 changes: 85 additions & 1 deletion pkg/agent/controller/reconciliation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/federate"
"github.com/submariner-io/admiral/pkg/syncer/test"
testutil "github.com/submariner-io/admiral/pkg/test"
"github.com/submariner-io/lighthouse/pkg/constants"
Expand Down Expand Up @@ -59,7 +60,11 @@ var _ = Describe("Reconciliation", func() {
t.cluster1.createService()
t.cluster1.createServiceExport()

t.awaitNonHeadlessServiceExported(&t.cluster1)
if t.cluster1.service.Spec.ClusterIP == corev1.ClusterIPNone {
t.awaitHeadlessServiceExported(&t.cluster1)
} else {
t.awaitNonHeadlessServiceExported(&t.cluster1)
}

var err error

Expand Down Expand Up @@ -144,6 +149,7 @@ var _ = Describe("Reconciliation", func() {
test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport)
test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice)
t.cluster1.createService()
t.cluster1.createServiceEndpointSlices()
t.cluster1.start(t, *t.syncerConfig)

t.awaitServiceUnexported(&t.cluster1)
Expand All @@ -157,6 +163,7 @@ var _ = Describe("Reconciliation", func() {

restoreBrokerResources()
test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport)
test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice)
t.cluster1.createServiceExport()
t.cluster1.start(t, *t.syncerConfig)

Expand Down Expand Up @@ -231,6 +238,83 @@ var _ = Describe("Reconciliation", func() {
t.cluster1.service.Name, t.cluster1.clusterID)
})
})

When("a local EndpointSlice is stale on startup", func() {
Context("because the service no longer exists", func() {
It("should delete it from the local datastore", func() {
t.afterEach()
t = newTestDiver()

By("Restarting controllers")

restoreBrokerResources()
test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice)
t.cluster1.start(t, *t.syncerConfig)

t.awaitServiceUnexported(&t.cluster1)
})
})

Context("because the K8s EndpointSlice no longer exists", func() {
BeforeEach(func() {
t.cluster1.service.Spec.ClusterIP = corev1.ClusterIPNone
})

It("should delete it from the local datastore", func() {
t.afterEach()
t = newTestDiver()

t.cluster1.service.Spec.ClusterIP = corev1.ClusterIPNone

By("Restarting controllers")

restoreBrokerResources()
test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport)
test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice)
test.CreateResource(t.cluster1.localServiceExportClient, serviceExport)
t.cluster1.createService()

// Create a remote EPS for the same service and ensure it's not deleted.
remoteEndpointSlice := localEndpointSlice.DeepCopy()
remoteEndpointSlice.Name = "remote-eps"
remoteEndpointSlice.Labels[constants.MCSLabelSourceCluster] = t.cluster2.clusterID
remoteEndpointSlice.Labels[federate.ClusterIDLabelKey] = t.cluster2.clusterID
test.CreateResource(t.cluster1.localEndpointSliceClient, remoteEndpointSlice)

remoteEndpointSlice.Namespace = test.RemoteNamespace
test.CreateResource(t.brokerEndpointSliceClient, remoteEndpointSlice)

// Create an EPS for a service in another namespace and ensure it's not deleted.
otherNS := "other-ns"
otherNSEndpointSlice := localEndpointSlice.DeepCopy()
otherNSEndpointSlice.Name = "other-ns-eps"
otherNSEndpointSlice.Namespace = otherNS
otherNSEndpointSlice.Labels[constants.LabelSourceNamespace] = otherNS
test.CreateResource(endpointSliceClientFor(t.cluster1.localDynClient, otherNS), otherNSEndpointSlice)

test.CreateResource(t.cluster1.dynamicServiceClientFor().Namespace(otherNS), &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: t.cluster1.service.Name,
Namespace: otherNS,
},
})

t.cluster1.start(t, *t.syncerConfig)

t.awaitNoEndpointSlice(&t.cluster1)

Consistently(func() bool {
test.AwaitResource(t.cluster1.localEndpointSliceClient, remoteEndpointSlice.Name)
return true
}).Should(BeTrue())

Consistently(func() bool {
test.AwaitResource(endpointSliceClientFor(t.cluster1.localDynClient, otherNS), otherNSEndpointSlice.Name)
return true
}).Should(BeTrue())
})
})
})
})

var _ = Describe("EndpointSlice migration", func() {
Expand Down
24 changes: 24 additions & 0 deletions pkg/agent/controller/service_endpoint_slices.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (

func startEndpointSliceController(localClient dynamic.Interface, restMapper meta.RESTMapper, scheme *runtime.Scheme,
serviceImport *mcsv1a1.ServiceImport, clusterID string, globalIngressIPCache *globalIngressIPCache,
localLHEndpointSliceLister EndpointSliceListerFn,
) (*ServiceEndpointSliceController, error) {
serviceNamespace := serviceImport.Labels[constants.LabelSourceNamespace]
serviceName := serviceImportSourceName(serviceImport)
Expand Down Expand Up @@ -89,6 +90,29 @@ func startEndpointSliceController(localClient dynamic.Interface, restMapper meta
return nil, errors.Wrap(err, "error starting Endpoints syncer")
}

if controller.isHeadless() {
controller.epsSyncer.Reconcile(func() []runtime.Object {
list := localLHEndpointSliceLister(k8slabels.SelectorFromSet(map[string]string{
constants.LabelSourceNamespace: serviceNamespace,
mcsv1a1.LabelServiceName: serviceName,
constants.MCSLabelSourceCluster: clusterID,
}))

retList := make([]runtime.Object, 0, len(list))
for _, o := range list {
eps := o.(*discovery.EndpointSlice)
retList = append(retList, &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: eps.Labels[constants.LabelSourceName],
Namespace: serviceNamespace,
},
})
}

return retList
})
}

return controller, nil
}

Expand Down
18 changes: 10 additions & 8 deletions pkg/agent/controller/service_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ import (
//nolint:gocritic // (hugeParam) This function modifies syncerConf so we don't want to pass by pointer.
func newServiceImportController(spec *AgentSpecification, syncerMetricNames AgentConfig, syncerConfig broker.SyncerConfig,
brokerClient dynamic.Interface, brokerNamespace string, serviceExportClient *ServiceExportClient,
localLHEndpointSliceLister EndpointSliceListerFn,
) (*ServiceImportController, error) {
controller := &ServiceImportController{
localClient: syncerConfig.LocalClient,
restMapper: syncerConfig.RestMapper,
clusterID: spec.ClusterID,
localNamespace: spec.Namespace,
converter: converter{scheme: syncerConfig.Scheme},
serviceImportAggregator: newServiceImportAggregator(brokerClient, brokerNamespace, spec.ClusterID, syncerConfig.Scheme),
serviceExportClient: serviceExportClient,
localClient: syncerConfig.LocalClient,
restMapper: syncerConfig.RestMapper,
clusterID: spec.ClusterID,
localNamespace: spec.Namespace,
converter: converter{scheme: syncerConfig.Scheme},
serviceImportAggregator: newServiceImportAggregator(brokerClient, brokerNamespace, spec.ClusterID, syncerConfig.Scheme),
serviceExportClient: serviceExportClient,
localLHEndpointSliceLister: localLHEndpointSliceLister,
}

var err error
Expand Down Expand Up @@ -222,7 +224,7 @@ func (c *ServiceImportController) startEndpointsController(serviceImport *mcsv1a
}

endpointController, err := startEndpointSliceController(c.localClient, c.restMapper, c.converter.scheme,
serviceImport, c.clusterID, c.globalIngressIPCache)
serviceImport, c.clusterID, c.globalIngressIPCache, c.localLHEndpointSliceLister)
if err != nil {
return errors.Wrapf(err, "failed to start endpoints controller for %q", key)
}
Expand Down
29 changes: 17 additions & 12 deletions pkg/agent/controller/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/submariner-io/admiral/pkg/watcher"
"github.com/submariner-io/admiral/pkg/workqueue"
"k8s.io/apimachinery/pkg/api/meta"
k8slabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
Expand All @@ -41,6 +42,8 @@ const (

var BrokerResyncPeriod = time.Minute * 2

type EndpointSliceListerFn func(selector k8slabels.Selector) []runtime.Object

type converter struct {
scheme *runtime.Scheme
}
Expand Down Expand Up @@ -78,18 +81,19 @@ type ServiceImportAggregator struct {
// from the submariner namespace and creates/updates the aggregated ServiceImport on the broker; the other that syncs
// aggregated ServiceImports from the broker to the local service namespace. It also creates a ServiceEndpointSliceController.
type ServiceImportController struct {
localClient dynamic.Interface
restMapper meta.RESTMapper
serviceImportAggregator *ServiceImportAggregator
serviceImportMigrator *ServiceImportMigrator
serviceExportClient *ServiceExportClient
localSyncer syncer.Interface
remoteSyncer syncer.Interface
endpointControllers sync.Map
clusterID string
localNamespace string
converter converter
globalIngressIPCache *globalIngressIPCache
localClient dynamic.Interface
restMapper meta.RESTMapper
serviceImportAggregator *ServiceImportAggregator
serviceImportMigrator *ServiceImportMigrator
serviceExportClient *ServiceExportClient
localSyncer syncer.Interface
remoteSyncer syncer.Interface
endpointControllers sync.Map
clusterID string
localNamespace string
converter converter
globalIngressIPCache *globalIngressIPCache
localLHEndpointSliceLister EndpointSliceListerFn
}

// Each ServiceEndpointSliceController watches for the EndpointSlices that backs a Service and have a ServiceImport.
Expand All @@ -115,6 +119,7 @@ type EndpointSliceController struct {
syncer *broker.Syncer
serviceImportAggregator *ServiceImportAggregator
serviceExportClient *ServiceExportClient
serviceSyncer syncer.Interface
conflictCheckWorkQueue workqueue.Interface
}

Expand Down
Loading