From 9c92f2de3da97a660fe63c210241d02786dda729 Mon Sep 17 00:00:00 2001 From: Alex Leong Date: Tue, 1 Aug 2023 22:08:05 +0000 Subject: [PATCH 1/2] WIP Signed-off-by: Alex Leong --- .../templates/link-crd.yaml | 28 ++++++ multicluster/cmd/link.go | 9 ++ .../service-mirror/cluster_watcher.go | 98 ++++++++++++++----- pkg/k8s/labels.go | 11 +++ pkg/multicluster/link.go | 14 +++ 5 files changed, 134 insertions(+), 26 deletions(-) diff --git a/multicluster/charts/linkerd-multicluster/templates/link-crd.yaml b/multicluster/charts/linkerd-multicluster/templates/link-crd.yaml index c345a412d9e2b..4ff9946daf204 100644 --- a/multicluster/charts/linkerd-multicluster/templates/link-crd.yaml +++ b/multicluster/charts/linkerd-multicluster/templates/link-crd.yaml @@ -77,6 +77,34 @@ spec: type: array items: type: string + remoteDiscoverySelector: + description: Selector for Services to mirror in remote discovery mode + type: object + properties: + matchLabels: + type: object + x-kubernetes-preserve-unknown-fields: true + matchExpressions: + description: List of selector requirements + type: array + items: + description: A selector item requires a key and an operator + type: object + required: + - key + - operator + properties: + key: + description: Label key that selector should apply to + type: string + operator: + description: Evaluation of a label in relation to set + type: string + enum: [In, NotIn, Exists, DoesNotExist] + values: + type: array + items: + type: string targetClusterName: description: Name of target cluster to link to type: string diff --git a/multicluster/cmd/link.go b/multicluster/cmd/link.go index f433ac9b741ad..7ca57a3833640 100644 --- a/multicluster/cmd/link.go +++ b/multicluster/cmd/link.go @@ -45,6 +45,7 @@ type ( controlPlaneVersion string dockerRegistry string selector string + remoteDiscoverySelector string gatewayAddresses string gatewayPort uint32 ha bool @@ -229,6 +230,11 @@ A full list of configurable values can be found at https://github.com/linkerd/li return err } + remoteDiscoverySelector, err := metav1.ParseToLabelSelector(opts.remoteDiscoverySelector) + if err != nil { + return err + } + link := mc.Link{ Name: opts.clusterName, Namespace: opts.namespace, @@ -241,6 +247,7 @@ A full list of configurable values can be found at https://github.com/linkerd/li GatewayIdentity: gatewayIdentity, ProbeSpec: probeSpec, Selector: *selector, + RemoteDiscoverySelector: *remoteDiscoverySelector, } obj, err := link.ToUnstructured() @@ -303,6 +310,7 @@ A full list of configurable values can be found at https://github.com/linkerd/li cmd.Flags().StringVar(&opts.dockerRegistry, "registry", opts.dockerRegistry, fmt.Sprintf("Docker registry to pull service mirror controller image from ($%s)", flags.EnvOverrideDockerRegistry)) cmd.Flags().StringVarP(&opts.selector, "selector", "l", opts.selector, "Selector (label query) to filter which services in the target cluster to mirror") + cmd.Flags().StringVar(&opts.remoteDiscoverySelector, "remote-discovery-selector", opts.remoteDiscoverySelector, "Selector (label query) to filter which services in the target cluster to mirror in remote discovery mode") cmd.Flags().StringVar(&opts.gatewayAddresses, "gateway-addresses", opts.gatewayAddresses, "If specified, overwrites gateway addresses when gateway service is not type LoadBalancer (comma separated list)") cmd.Flags().Uint32Var(&opts.gatewayPort, "gateway-port", opts.gatewayPort, "If specified, overwrites gateway port when gateway service is not type LoadBalancer") cmd.Flags().BoolVar(&opts.ha, "ha", opts.ha, "Enable HA configuration for the service-mirror deployment (default false)") @@ -401,6 +409,7 @@ func newLinkOptionsWithDefault() (*linkOptions, error) { logLevel: defaults.LogLevel, logFormat: defaults.LogFormat, selector: fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "true"), + remoteDiscoverySelector: fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "remote-discovery"), gatewayAddresses: "", gatewayPort: 0, ha: false, diff --git a/multicluster/service-mirror/cluster_watcher.go b/multicluster/service-mirror/cluster_watcher.go index d55200f1d6eaf..5545b0ae92901 100644 --- a/multicluster/service-mirror/cluster_watcher.go +++ b/multicluster/service-mirror/cluster_watcher.go @@ -235,6 +235,11 @@ func (rcsw *RemoteClusterServiceWatcher) getMirroredServiceLabels(remoteService return labels } + if rcsw.isRemoteDiscovery(remoteService) { + labels[consts.RemoteDiscoveryLabel] = rcsw.link.TargetClusterName + labels[consts.RemoteServiceLabel] = remoteService.GetName() + } + for key, value := range remoteService.ObjectMeta.Labels { if strings.HasPrefix(key, consts.SvcMirrorPrefix) { continue @@ -430,27 +435,46 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceDeleted(ctx context. // new gateway being assigned or additional ports exposed. This method takes care of that. func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ctx context.Context, ev *RemoteServiceUpdated) error { rcsw.log.Infof("Updating mirror service %s/%s", ev.localService.Namespace, ev.localService.Name) - gatewayAddresses, err := rcsw.resolveGatewayAddress() - if err != nil { - return err - } - copiedEndpoints := ev.localEndpoints.DeepCopy() - copiedEndpoints.Subsets = []corev1.EndpointSubset{ - { - Addresses: gatewayAddresses, - Ports: rcsw.getEndpointsPorts(ev.remoteUpdate), - }, + if rcsw.isRemoteDiscovery(ev.remoteUpdate) { + if ev.localEndpoints != nil { + err := rcsw.localAPIClient.Client.CoreV1().Endpoints(ev.localService.Namespace).Delete(ctx, ev.localService.Name, metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("failed to delete mirror endpoints for %s/%s: %w", ev.localService.Namespace, ev.localService.Name, err) + } + return RetryableError{[]error{err}} + } + return nil } - if copiedEndpoints.Annotations == nil { - copiedEndpoints.Annotations = make(map[string]string) - } - copiedEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity + if ev.localEndpoints == nil { + err := rcsw.createGatewayEndpoints(ctx, ev.remoteUpdate) + if err != nil { + return err + } + } else { + gatewayAddresses, err := rcsw.resolveGatewayAddress() + if err != nil { + return err + } - err = rcsw.updateMirrorEndpoints(ctx, copiedEndpoints) - if err != nil { - return RetryableError{[]error{err}} + copiedEndpoints := ev.localEndpoints.DeepCopy() + copiedEndpoints.Subsets = []corev1.EndpointSubset{ + { + Addresses: gatewayAddresses, + Ports: rcsw.getEndpointsPorts(ev.remoteUpdate), + }, + } + + if copiedEndpoints.Annotations == nil { + copiedEndpoints.Annotations = make(map[string]string) + } + copiedEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity + + err = rcsw.updateMirrorEndpoints(ctx, copiedEndpoints) + if err != nil { + return RetryableError{[]error{err}} + } } ev.localService.Labels = rcsw.getMirroredServiceLabels(ev.remoteUpdate) @@ -518,6 +542,10 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context. } } + if rcsw.isRemoteDiscovery(remoteService) { + // For remote discovery services, skip creating gateway endpoints. + return nil + } return rcsw.createGatewayEndpoints(ctx, remoteService) } @@ -657,15 +685,19 @@ func (rcsw *RemoteClusterServiceWatcher) createOrUpdateService(service *corev1.S lastMirroredRemoteVersion, ok := localService.Annotations[consts.RemoteResourceVersionAnnotation] if ok && lastMirroredRemoteVersion != service.ResourceVersion { endpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(service.Namespace).Get(localName) - if err == nil { - rcsw.eventsQueue.Add(&RemoteServiceUpdated{ - localService: localService, - localEndpoints: endpoints, - remoteUpdate: service, - }) - return nil + if err != nil { + if kerrors.IsNotFound(err) { + endpoints = nil + } else { + return RetryableError{[]error{err}} + } } - return RetryableError{[]error{err}} + rcsw.eventsQueue.Add(&RemoteServiceUpdated{ + localService: localService, + localEndpoints: endpoints, + remoteUpdate: service, + }) + return nil } return nil @@ -1179,5 +1211,19 @@ func (rcsw *RemoteClusterServiceWatcher) isExported(l map[string]string) bool { rcsw.log.Errorf("Invalid selector: %s", err) return false } - return selector.Matches(labels.Set(l)) + remoteDiscoverySelector, err := metav1.LabelSelectorAsSelector(&rcsw.link.RemoteDiscoverySelector) + if err != nil { + rcsw.log.Errorf("Invalid selector: %s", err) + return false + } + return selector.Matches(labels.Set(l)) || remoteDiscoverySelector.Matches(labels.Set(l)) +} + +func (rcsw *RemoteClusterServiceWatcher) isRemoteDiscovery(svc *corev1.Service) bool { + remoteDiscoverySelector, err := metav1.LabelSelectorAsSelector(&rcsw.link.RemoteDiscoverySelector) + if err != nil { + rcsw.log.Errorf("Invalid selector: %s", err) + return false + } + return remoteDiscoverySelector.Matches(labels.Set(svc.Labels)) } diff --git a/pkg/k8s/labels.go b/pkg/k8s/labels.go index 13205f983b327..64c62b0132460 100644 --- a/pkg/k8s/labels.go +++ b/pkg/k8s/labels.go @@ -394,6 +394,10 @@ const ( // and types used by the service mirror component SvcMirrorPrefix = "mirror.linkerd.io" + // MulticlusterPrefix is the prefix common to all labels and annotations + // used for multicluster services. + MulticlusterPrefix = "multicluster.linkerd.io" + // MirrorSecretType is the type of secret that is supposed to contain // the access information for remote clusters. MirrorSecretType = SvcMirrorPrefix + "/remote-kubeconfig" @@ -417,6 +421,13 @@ const ( // allows us to associate a mirrored service with a remote cluster RemoteClusterNameLabel = SvcMirrorPrefix + "/cluster-name" + // RemoteDiscoveryLabel indicates that this service is a remote discovery + // service and the value of this label is the name of the remote cluster. + RemoteDiscoveryLabel = MulticlusterPrefix + "/remote-discovery" + + // RemoteServiceLabel is the name of the service in the remote cluster. + RemoteServiceLabel = MulticlusterPrefix + "/remote-service" + // RemoteResourceVersionAnnotation is the last observed remote resource // version of a mirrored resource. Useful when doing updates RemoteResourceVersionAnnotation = SvcMirrorPrefix + "/remote-resource-version" diff --git a/pkg/multicluster/link.go b/pkg/multicluster/link.go index c683a63ca2d83..9626c52c2d33c 100644 --- a/pkg/multicluster/link.go +++ b/pkg/multicluster/link.go @@ -43,6 +43,7 @@ type ( GatewayIdentity string ProbeSpec ProbeSpec Selector metav1.LabelSelector + RemoteDiscoverySelector metav1.LabelSelector } ) @@ -135,6 +136,18 @@ func NewLink(u unstructured.Unstructured) (Link, error) { } } + remoteDiscoverySelector := metav1.LabelSelector{} + if selectorObj, ok := specObj["remoteDiscoverySelector"]; ok { + bytes, err := json.Marshal(selectorObj) + if err != nil { + return Link{}, err + } + err = json.Unmarshal(bytes, &remoteDiscoverySelector) + if err != nil { + return Link{}, err + } + } + return Link{ Name: u.GetName(), Namespace: u.GetNamespace(), @@ -147,6 +160,7 @@ func NewLink(u unstructured.Unstructured) (Link, error) { GatewayIdentity: gatewayIdentity, ProbeSpec: probeSpec, Selector: selector, + RemoteDiscoverySelector: remoteDiscoverySelector, }, nil } From d6b8daa699caf4f2bf552fd4bb5683a8402beb23 Mon Sep 17 00:00:00 2001 From: Alex Leong Date: Thu, 3 Aug 2023 23:42:21 +0000 Subject: [PATCH 2/2] Add test Signed-off-by: Alex Leong --- .../cmd/testdata/install_default.golden | 28 +++ multicluster/cmd/testdata/install_ha.golden | 28 +++ multicluster/cmd/testdata/install_psp.golden | 28 +++ .../service-mirror/cluster_watcher.go | 27 ++- .../cluster_watcher_mirroring_test.go | 68 ++++-- .../cluster_watcher_test_util.go | 220 ++++++++++++------ pkg/multicluster/link.go | 11 + 7 files changed, 311 insertions(+), 99 deletions(-) diff --git a/multicluster/cmd/testdata/install_default.golden b/multicluster/cmd/testdata/install_default.golden index 499658a159300..3d96ce2dcdfbf 100644 --- a/multicluster/cmd/testdata/install_default.golden +++ b/multicluster/cmd/testdata/install_default.golden @@ -299,6 +299,34 @@ spec: type: array items: type: string + remoteDiscoverySelector: + description: Selector for Services to mirror in remote discovery mode + type: object + properties: + matchLabels: + type: object + x-kubernetes-preserve-unknown-fields: true + matchExpressions: + description: List of selector requirements + type: array + items: + description: A selector item requires a key and an operator + type: object + required: + - key + - operator + properties: + key: + description: Label key that selector should apply to + type: string + operator: + description: Evaluation of a label in relation to set + type: string + enum: [In, NotIn, Exists, DoesNotExist] + values: + type: array + items: + type: string targetClusterName: description: Name of target cluster to link to type: string diff --git a/multicluster/cmd/testdata/install_ha.golden b/multicluster/cmd/testdata/install_ha.golden index cbcd33f5f34c9..a909aec4be378 100644 --- a/multicluster/cmd/testdata/install_ha.golden +++ b/multicluster/cmd/testdata/install_ha.golden @@ -371,6 +371,34 @@ spec: type: array items: type: string + remoteDiscoverySelector: + description: Selector for Services to mirror in remote discovery mode + type: object + properties: + matchLabels: + type: object + x-kubernetes-preserve-unknown-fields: true + matchExpressions: + description: List of selector requirements + type: array + items: + description: A selector item requires a key and an operator + type: object + required: + - key + - operator + properties: + key: + description: Label key that selector should apply to + type: string + operator: + description: Evaluation of a label in relation to set + type: string + enum: [In, NotIn, Exists, DoesNotExist] + values: + type: array + items: + type: string targetClusterName: description: Name of target cluster to link to type: string diff --git a/multicluster/cmd/testdata/install_psp.golden b/multicluster/cmd/testdata/install_psp.golden index b9d0e9ebeb704..9c5e2c3c5c959 100644 --- a/multicluster/cmd/testdata/install_psp.golden +++ b/multicluster/cmd/testdata/install_psp.golden @@ -333,6 +333,34 @@ spec: type: array items: type: string + remoteDiscoverySelector: + description: Selector for Services to mirror in remote discovery mode + type: object + properties: + matchLabels: + type: object + x-kubernetes-preserve-unknown-fields: true + matchExpressions: + description: List of selector requirements + type: array + items: + description: A selector item requires a key and an operator + type: object + required: + - key + - operator + properties: + key: + description: Label key that selector should apply to + type: string + operator: + description: Evaluation of a label in relation to set + type: string + enum: [In, NotIn, Exists, DoesNotExist] + values: + type: array + items: + type: string targetClusterName: description: Name of target cluster to link to type: string diff --git a/multicluster/service-mirror/cluster_watcher.go b/multicluster/service-mirror/cluster_watcher.go index 5545b0ae92901..b24e5b60a5262 100644 --- a/multicluster/service-mirror/cluster_watcher.go +++ b/multicluster/service-mirror/cluster_watcher.go @@ -437,22 +437,26 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ctx context. rcsw.log.Infof("Updating mirror service %s/%s", ev.localService.Namespace, ev.localService.Name) if rcsw.isRemoteDiscovery(ev.remoteUpdate) { + // The service is mirrored in remote discovery mode and any local + // endpoints for it should be deleted if they exist. if ev.localEndpoints != nil { err := rcsw.localAPIClient.Client.CoreV1().Endpoints(ev.localService.Namespace).Delete(ctx, ev.localService.Name, metav1.DeleteOptions{}) if err != nil { - return fmt.Errorf("failed to delete mirror endpoints for %s/%s: %w", ev.localService.Namespace, ev.localService.Name, err) + return RetryableError{[]error{ + fmt.Errorf("failed to delete mirror endpoints for %s/%s: %w", ev.localService.Namespace, ev.localService.Name, err), + }} } - return RetryableError{[]error{err}} } - return nil - } - - if ev.localEndpoints == nil { + } else if ev.localEndpoints == nil { + // The service is mirrored in gateway mode and gateway endpoints should + // be created for it. err := rcsw.createGatewayEndpoints(ctx, ev.remoteUpdate) if err != nil { return err } } else { + // The service is mirrored in gateway mode and gateway endpoints already + // exist for it but may need to be updated. gatewayAddresses, err := rcsw.resolveGatewayAddress() if err != nil { return err @@ -1206,6 +1210,11 @@ func (rcsw *RemoteClusterServiceWatcher) updateReadiness(endpoints *corev1.Endpo } func (rcsw *RemoteClusterServiceWatcher) isExported(l map[string]string) bool { + // Treat an empty selector as "Nothing" instead of "Everything" so that + // when the selector field is unset, we don't export all Services. + if len(rcsw.link.Selector.MatchExpressions)+len(rcsw.link.Selector.MatchLabels) == 0 { + return false + } selector, err := metav1.LabelSelectorAsSelector(&rcsw.link.Selector) if err != nil { rcsw.log.Errorf("Invalid selector: %s", err) @@ -1220,6 +1229,12 @@ func (rcsw *RemoteClusterServiceWatcher) isExported(l map[string]string) bool { } func (rcsw *RemoteClusterServiceWatcher) isRemoteDiscovery(svc *corev1.Service) bool { + // Treat an empty remoteDisocverySelector as "Nothing" instead of + // "Everything" so that when the remoteDiscoverySelector field is unset, we + // don't export all Services. + if len(rcsw.link.RemoteDiscoverySelector.MatchExpressions)+len(rcsw.link.RemoteDiscoverySelector.MatchLabels) == 0 { + return false + } remoteDiscoverySelector, err := metav1.LabelSelectorAsSelector(&rcsw.link.RemoteDiscoverySelector) if err != nil { rcsw.log.Errorf("Invalid selector: %s", err) diff --git a/multicluster/service-mirror/cluster_watcher_mirroring_test.go b/multicluster/service-mirror/cluster_watcher_mirroring_test.go index eb7a58226f675..bdb9c8371ebc2 100644 --- a/multicluster/service-mirror/cluster_watcher_mirroring_test.go +++ b/multicluster/service-mirror/cluster_watcher_mirroring_test.go @@ -199,6 +199,29 @@ func TestRemoteServiceCreatedMirroring(t *testing.T) { }), }, }, + { + description: "remote discovery mirroring", + environment: createRemoteDiscoveryService, + expectedLocalServices: []*corev1.Service{ + remoteDiscoveryMirrorService( + "service-one", + "ns1", + "111", + []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + }, + expectedLocalEndpoints: []*corev1.Endpoints{}, + }, } { tc := tt // pin tc.run(t) @@ -226,13 +249,14 @@ func TestLocalNamespaceCreatedAfterServiceExport(t *testing.T) { watcher := RemoteClusterServiceWatcher{ link: &multicluster.Link{ - TargetClusterName: clusterName, - TargetClusterDomain: clusterDomain, - GatewayIdentity: "gateway-identity", - GatewayAddress: "192.0.2.127", - GatewayPort: 888, - ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.2.127", + GatewayPort: 888, + ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, + RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, }, remoteAPIClient: remoteAPI, localAPIClient: localAPI, @@ -314,13 +338,14 @@ func TestServiceCreatedGatewayAlive(t *testing.T) { events := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) watcher := RemoteClusterServiceWatcher{ link: &multicluster.Link{ - TargetClusterName: clusterName, - TargetClusterDomain: clusterDomain, - GatewayIdentity: "gateway-identity", - GatewayAddress: "192.0.0.1", - GatewayPort: 888, - ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.0.1", + GatewayPort: 888, + ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, + RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, }, remoteAPIClient: remoteAPI, localAPIClient: localAPI, @@ -461,13 +486,14 @@ func TestServiceCreatedGatewayDown(t *testing.T) { events := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) watcher := RemoteClusterServiceWatcher{ link: &multicluster.Link{ - TargetClusterName: clusterName, - TargetClusterDomain: clusterDomain, - GatewayIdentity: "gateway-identity", - GatewayAddress: "192.0.0.1", - GatewayPort: 888, - ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.0.1", + GatewayPort: 888, + ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, + RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, }, remoteAPIClient: remoteAPI, localAPIClient: localAPI, diff --git a/multicluster/service-mirror/cluster_watcher_test_util.go b/multicluster/service-mirror/cluster_watcher_test_util.go index ac3a6de402e8f..4ba6bb99f7d1d 100644 --- a/multicluster/service-mirror/cluster_watcher_test_util.go +++ b/multicluster/service-mirror/cluster_watcher_test_util.go @@ -32,7 +32,8 @@ var ( Port: defaultProbePort, Period: time.Duration(defaultProbePeriod) * time.Second, } - defaultSelector, _ = metav1.ParseToLabelSelector(consts.DefaultExportedServiceSelector) + defaultSelector, _ = metav1.ParseToLabelSelector(consts.DefaultExportedServiceSelector + "=true") + defaultRemoteDiscoverySelector, _ = metav1.ParseToLabelSelector(consts.DefaultExportedServiceSelector + "=remote-discovery") ) type testEnvironment struct { @@ -107,13 +108,51 @@ var createExportedService = &testEnvironment{ namespaceAsYaml("ns1"), }, link: multicluster.Link{ - TargetClusterName: clusterName, - TargetClusterDomain: clusterDomain, - GatewayIdentity: "gateway-identity", - GatewayAddress: "192.0.2.127", - GatewayPort: 888, - ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.2.127", + GatewayPort: 888, + ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, + RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, + }, +} + +var createRemoteDiscoveryService = &testEnvironment{ + events: []interface{}{ + &RemoteServiceCreated{ + service: remoteService("service-one", "ns1", "111", map[string]string{ + consts.DefaultExportedServiceSelector: "remote-discovery", + }, []corev1.ServicePort{ + { + Name: "port1", + Protocol: "TCP", + Port: 555, + }, + { + Name: "port2", + Protocol: "TCP", + Port: 666, + }, + }), + }, + }, + remoteResources: []string{ + endpointsAsYaml("service-one", "ns1", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{}), + }, + localResources: []string{ + namespaceAsYaml("ns1"), + }, + link: multicluster.Link{ + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.2.127", + GatewayPort: 888, + ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, + RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, }, } @@ -192,7 +231,8 @@ var createExportedHeadlessService = &testEnvironment{ Path: "/probe1", Period: 120, }, - Selector: *defaultSelector, + Selector: *defaultSelector, + RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, }, } @@ -208,13 +248,14 @@ var deleteMirrorService = &testEnvironment{ endpointsAsYaml("test-service-remote-to-delete-remote", "test-namespace-to-delete", "", "gateway-identity", nil), }, link: multicluster.Link{ - TargetClusterName: clusterName, - TargetClusterDomain: clusterDomain, - GatewayIdentity: "gateway-identity", - GatewayAddress: "192.0.2.127", - GatewayPort: 888, - ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.2.127", + GatewayPort: 888, + ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, + RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, }, } @@ -301,13 +342,14 @@ var updateServiceWithChangedPorts = &testEnvironment{ }), }, link: multicluster.Link{ - TargetClusterName: clusterName, - TargetClusterDomain: clusterDomain, - GatewayIdentity: "gateway-identity", - GatewayAddress: "192.0.2.127", - GatewayPort: 888, - ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.2.127", + GatewayPort: 888, + ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, + RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, }, } @@ -409,13 +451,14 @@ var updateEndpointsWithChangedHosts = &testEnvironment{ }), }, link: multicluster.Link{ - TargetClusterName: clusterName, - TargetClusterDomain: clusterDomain, - GatewayIdentity: "gateway-identity", - GatewayAddress: "192.0.2.127", - GatewayPort: 888, - ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.2.127", + GatewayPort: 888, + ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, + RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, }, } var clusterUnregistered = &testEnvironment{ @@ -459,13 +502,14 @@ func onAddOrUpdateExportedSvc(isAdd bool) *testEnvironment { }, nil)), }, link: multicluster.Link{ - TargetClusterName: clusterName, - TargetClusterDomain: clusterDomain, - GatewayIdentity: "gateway-identity", - GatewayAddress: "192.0.2.127", - GatewayPort: 888, - ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.2.127", + GatewayPort: 888, + ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, + RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, }, } @@ -483,13 +527,14 @@ func onAddOrUpdateRemoteServiceUpdated(isAdd bool) *testEnvironment { endpointsAsYaml("test-service-remote", "test-namespace", "0.0.0.0", "", nil), }, link: multicluster.Link{ - TargetClusterName: clusterName, - TargetClusterDomain: clusterDomain, - GatewayIdentity: "gateway-identity", - GatewayAddress: "192.0.2.127", - GatewayPort: 888, - ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.2.127", + GatewayPort: 888, + ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, + RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, }, } } @@ -506,13 +551,14 @@ func onAddOrUpdateSameResVersion(isAdd bool) *testEnvironment { endpointsAsYaml("test-service-remote", "test-namespace", "0.0.0.0", "", nil), }, link: multicluster.Link{ - TargetClusterName: clusterName, - TargetClusterDomain: clusterDomain, - GatewayIdentity: "gateway-identity", - GatewayAddress: "192.0.2.127", - GatewayPort: 888, - ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.2.127", + GatewayPort: 888, + ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, + RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, }, } } @@ -527,13 +573,14 @@ func serviceNotExportedAnymore(isAdd bool) *testEnvironment { endpointsAsYaml("test-service-remote", "test-namespace", "0.0.0.0", "", nil), }, link: multicluster.Link{ - TargetClusterName: clusterName, - TargetClusterDomain: clusterDomain, - GatewayIdentity: "gateway-identity", - GatewayAddress: "192.0.2.127", - GatewayPort: 888, - ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.2.127", + GatewayPort: 888, + ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, + RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, }, } } @@ -547,13 +594,14 @@ var onDeleteExportedService = &testEnvironment{ }, }, link: multicluster.Link{ - TargetClusterName: clusterName, - TargetClusterDomain: clusterDomain, - GatewayIdentity: "gateway-identity", - GatewayAddress: "192.0.2.127", - GatewayPort: 888, - ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.2.127", + GatewayPort: 888, + ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, + RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, }, } @@ -564,13 +612,14 @@ var onDeleteNonExportedService = &testEnvironment{ }, }, link: multicluster.Link{ - TargetClusterName: clusterName, - TargetClusterDomain: clusterDomain, - GatewayIdentity: "gateway-identity", - GatewayAddress: "192.0.2.127", - GatewayPort: 888, - ProbeSpec: defaultProbeSpec, - Selector: *defaultSelector, + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.2.127", + GatewayPort: 888, + ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, + RemoteDiscoverySelector: *defaultRemoteDiscoverySelector, }, } @@ -828,6 +877,33 @@ func endpointMirrorService(hostname, rootName, namespace, resourceVersion string } } +func remoteDiscoveryMirrorService(name, namespace, resourceVersion string, ports []corev1.ServicePort) *corev1.Service { + annotations := make(map[string]string) + annotations[consts.RemoteResourceVersionAnnotation] = resourceVersion + annotations[consts.RemoteServiceFqName] = fmt.Sprintf("%s.%s.svc.cluster.local", name, namespace) + + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", name, clusterName), + Namespace: namespace, + Labels: map[string]string{ + consts.RemoteClusterNameLabel: clusterName, + consts.MirroredResourceLabel: "true", + consts.RemoteDiscoveryLabel: clusterName, + consts.RemoteServiceLabel: name, + }, + Annotations: annotations, + }, + Spec: corev1.ServiceSpec{ + Ports: ports, + }, + } +} + func mirrorServiceAsYaml(name, namespace, resourceVersion string, ports []corev1.ServicePort) string { svc := mirrorService(name, namespace, resourceVersion, ports) diff --git a/pkg/multicluster/link.go b/pkg/multicluster/link.go index 9626c52c2d33c..49b36846de220 100644 --- a/pkg/multicluster/link.go +++ b/pkg/multicluster/link.go @@ -193,6 +193,17 @@ func (l Link) ToUnstructured() (unstructured.Unstructured, error) { } spec["selector"] = selector + data, err = json.Marshal(l.RemoteDiscoverySelector) + if err != nil { + return unstructured.Unstructured{}, err + } + remoteDiscoverySelector := make(map[string]interface{}) + err = json.Unmarshal(data, &remoteDiscoverySelector) + if err != nil { + return unstructured.Unstructured{}, err + } + spec["remoteDiscoverySelector"] = remoteDiscoverySelector + return unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": k8s.LinkAPIGroupVersion,