diff --git a/controller/api/destination/server_test.go b/controller/api/destination/server_test.go
index dc6f83a8cc650..60a765c0968a8 100644
--- a/controller/api/destination/server_test.go
+++ b/controller/api/destination/server_test.go
@@ -29,6 +29,7 @@ const fullyQualifiedNameOpaque = "name3.ns.svc.mycluster.local"
 const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local"
 const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local"
 const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local"
+const fullyQualifiedNamePolicy = "policy-test.ns.svc.mycluster.local"
 const clusterIP = "172.17.12.0"
 const clusterIPv6 = "2001:db8::88"
 const clusterIPOpaque = "172.17.12.1"
@@ -158,6 +159,98 @@ func TestGet(t *testing.T) {
 		}
 	})
 
+	t.Run("Return endpoint opaque protocol controlled by a server", func(t *testing.T) {
+		server, client := getServerWithClient(t)
+		defer server.clusterStore.UnregisterGauges()
+
+		stream := &bufferingGetStream{
+			updates:          make(chan *pb.Update, 50),
+			MockServerStream: util.NewMockServerStream(),
+		}
+		defer stream.Cancel()
+		errs := make(chan error)
+
+		path := fmt.Sprintf("%s:%d", fullyQualifiedNamePolicy, 80)
+
+		// server.Get blocks until the grpc stream is complete so we call it
+		// in a goroutine and watch stream.updates for updates.
+		go func() {
+			err := server.Get(&pb.GetDestination{
+				Scheme: "k8s",
+				Path:   path,
+			}, stream)
+			if err != nil {
+				errs <- err
+			}
+		}()
+
+		select {
+		case err := <-errs:
+			t.Fatalf("Got error: %s", err)
+		case update := <-stream.updates:
+			addrs := update.GetAdd().Addrs
+			if len(addrs) == 0 {
+				t.Fatalf("Expected len(addrs) to be > 0")
+			}
+
+			if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
+				t.Fatalf("Expected opaque transport for %s but was nil", path)
+			}
+		}
+
+		// Update the Server's pod selector so that it no longer selects the
+		// pod. This should result in the proxy protocol no longer being marked
+		// as opaque.
+		srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), "srv", metav1.GetOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		// PodSelector is updated to NOT select the pod
+		srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"}
+		_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		select {
+		case update := <-stream.updates:
+			addrs := update.GetAdd().Addrs
+			if len(addrs) == 0 {
+				t.Fatalf("Expected len(addrs) to be > 0")
+			}
+
+			if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
+				t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport())
+			}
+		case err := <-errs:
+			t.Fatalf("Got error: %s", err)
+		}
+
+		// Update the Server's pod selector so that it once again selects the
+		// pod. This should result in the proxy protocol once again being marked
+		// as opaque.
+		srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "policy-test"}
+
+		_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		select {
+		case update := <-stream.updates:
+			addrs := update.GetAdd().Addrs
+			if len(addrs) == 0 {
+				t.Fatalf("Expected len(addrs) to be > 0")
+			}
+
+			if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
+				t.Fatalf("Expected opaque transport for %s but was nil", path)
+			}
+		case err := <-errs:
+			t.Fatalf("Got error: %s", err)
+		}
+	})
+
 	t.Run("Remote discovery", func(t *testing.T) {
 		server := makeServer(t)
 		defer server.clusterStore.UnregisterGauges()
@@ -863,7 +956,7 @@ func TestGetProfiles(t *testing.T) {
 	}
 
 	// Server is created, setting the port to opaque
-	(*l5dClient).ServerV1beta2().Servers("ns").Create(context.Background(), &v1beta2.Server{
+	l5dClient.ServerV1beta2().Servers("ns").Create(context.Background(), &v1beta2.Server{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      "srv-hostport-mapping-2",
 			Namespace: "ns",
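The new test leans on a pattern worth noting: `server.Get` blocks for the lifetime of the stream, so it runs in a goroutine while the test body multiplexes between updates and errors with `select`. A minimal, self-contained sketch of that pattern, with toy stand-in types rather than the real destination API:

```go
package main

import "fmt"

type update struct{ opaque bool }

// get stands in for server.Get: the real call blocks for the lifetime of the
// stream, pushing updates as the cluster changes and reporting any terminal
// error on errs.
func get(updates chan<- update, errs chan<- error) {
	updates <- update{opaque: true}
}

func main() {
	updates := make(chan update, 50) // buffered, like bufferingGetStream
	errs := make(chan error)

	go get(updates, errs)

	// Multiplex: fail fast on a stream error, otherwise assert on the update.
	select {
	case err := <-errs:
		fmt.Println("got error:", err)
	case u := <-updates:
		fmt.Println("got update, opaque =", u.opaque)
	}
}
```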
diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go
index 6aea49a9d5e19..769c06a72279b 100644
--- a/controller/api/destination/test_util.go
+++ b/controller/api/destination/test_util.go
@@ -17,7 +17,7 @@ func makeServer(t *testing.T) *server {
 	return srv
 }
 
-func getServerWithClient(t *testing.T) (*server, *l5dcrdclient.Interface) {
+func getServerWithClient(t *testing.T) (*server, l5dcrdclient.Interface) {
 	meshedPodResources := []string{`
 apiVersion: v1
 kind: Namespace
@@ -303,6 +303,32 @@ status:
 	policyResources := []string{
 		`
 apiVersion: v1
+kind: Service
+metadata:
+  name: policy-test
+  namespace: ns
+spec:
+  type: LoadBalancer
+  clusterIP: 172.17.12.2
+  ports:
+  - port: 80`,
+		`
+apiVersion: v1
+kind: Endpoints
+metadata:
+  name: policy-test
+  namespace: ns
+subsets:
+- addresses:
+  - ip: 172.17.0.16
+    targetRef:
+      kind: Pod
+      name: pod-policyResources
+      namespace: ns
+  ports:
+  - port: 80`,
+		`
+apiVersion: v1
 kind: Pod
 metadata:
   labels:
@@ -337,6 +363,15 @@ spec:
   podSelector:
     matchLabels:
       app: policy-test
+  port: 80
+  proxyProtocol: opaque`,
+		`
+apiVersion: policy.linkerd.io/v1beta2
+kind: Server
+metadata:
+  name: srv-external-workload
+  namespace: ns
+spec:
   externalWorkloadSelector:
     matchLabels:
       app: external-workload-policy-test
@@ -491,6 +526,32 @@ spec:
 status:
   conditions:
   ready: true`,
+		`
+apiVersion: v1
+kind: Service
+metadata:
+  name: policy-test-external-workload
+  namespace: ns
+spec:
+  type: LoadBalancer
+  clusterIP: 172.17.12.3
+  ports:
+  - port: 80`,
+		`
+apiVersion: v1
+kind: Endpoints
+metadata:
+  name: policy-test-external-workload
+  namespace: ns
+subsets:
+- addresses:
+  - ip: 200.1.1.2
+    targetRef:
+      kind: ExternalWorkload
+      name: policy-test-workload
+      namespace: ns
+  ports:
+  - port: 80`,
 	}
 	extenalNameResources := []string{
 		`
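These fixtures hinge on label selection: the `srv` Server's `podSelector` (`app: policy-test`) must match the policy-test pod for port 80 to be treated as opaque, which is exactly what the test flips by rewriting `MatchLabels`. A small sketch of that matching, using the same apimachinery helpers the watcher uses (the label values are taken from the fixtures above):

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// The Server's podSelector from the fixture.
	sel, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
		MatchLabels: map[string]string{"app": "policy-test"},
	})
	if err != nil {
		panic(err)
	}

	// Matches: the Server selects the pod, so port 80 is opaque.
	fmt.Println(sel.Matches(labels.Set{"app": "policy-test"})) // true

	// After the test rewrites MatchLabels, the pod no longer matches and
	// the opaqueness must be cleared.
	fmt.Println(sel.Matches(labels.Set{"app": "FOOBAR"})) // false
}
```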
diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go
index e5ac409dfe3c9..1bbd3c44d2f61 100644
--- a/controller/api/destination/watcher/endpoints_watcher.go
+++ b/controller/api/destination/watcher/endpoints_watcher.go
@@ -511,11 +511,14 @@ func (ew *EndpointsWatcher) addServer(obj interface{}) {
 	defer ew.Unlock()
 	server := obj.(*v1beta2.Server)
 	for _, sp := range ew.publishers {
-		sp.updateServer(server, true)
+		sp.updateServer(nil, server)
 	}
 }
 
 func (ew *EndpointsWatcher) updateServer(oldObj interface{}, newObj interface{}) {
+	ew.Lock()
+	defer ew.Unlock()
+
 	oldServer := oldObj.(*v1beta2.Server)
 	newServer := newObj.(*v1beta2.Server)
 	oldUpdated := latestUpdated(oldServer.ManagedFields)
@@ -525,7 +528,9 @@ func (ew *EndpointsWatcher) updateServer(oldObj interface{}, newObj interface{})
 		serverInformerLag.Observe(delta.Seconds())
 	}
 
-	ew.addServer(newObj)
+	for _, sp := range ew.publishers {
+		sp.updateServer(oldServer, newServer)
+	}
 }
 
 func (ew *EndpointsWatcher) deleteServer(obj interface{}) {
@@ -533,7 +538,7 @@ func (ew *EndpointsWatcher) deleteServer(obj interface{}) {
 	defer ew.Unlock()
 	server := obj.(*v1beta2.Server)
 	for _, sp := range ew.publishers {
-		sp.updateServer(server, false)
+		sp.updateServer(server, nil)
 	}
 }
 
@@ -703,12 +708,12 @@ func (sp *servicePublisher) metricsLabels(port Port, hostname string) prometheus.Labels {
 	return endpointsLabels(sp.cluster, sp.id.Namespace, sp.id.Name, strconv.Itoa(int(port)), hostname)
 }
 
-func (sp *servicePublisher) updateServer(server *v1beta2.Server, isAdd bool) {
+func (sp *servicePublisher) updateServer(oldServer, newServer *v1beta2.Server) {
 	sp.Lock()
 	defer sp.Unlock()
 
 	for _, pp := range sp.ports {
-		pp.updateServer(server, isAdd)
+		pp.updateServer(oldServer, newServer)
 	}
 }
 
@@ -1237,70 +1242,12 @@ func (pp *portPublisher) unsubscribe(listener EndpointUpdateListener) {
 	pp.metrics.setSubscribers(len(pp.listeners))
 }
 
-func (pp *portPublisher) updateServer(server *v1beta2.Server, isAdd bool) {
+func (pp *portPublisher) updateServer(oldServer, newServer *v1beta2.Server) {
 	updated := false
 	for id, address := range pp.addresses.Addresses {
-		portMatch := false
-		if address.Pod != nil {
-			selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
-			if err != nil {
-				pp.log.Errorf("failed to create Selector: %s", err)
-				return
-			}
-
-			if !selector.Matches(labels.Set(address.Pod.Labels)) {
-				continue
-			}
-
-			switch server.Spec.Port.Type {
-			case intstr.Int:
-				if server.Spec.Port.IntVal == int32(address.Port) {
-					portMatch = true
-				}
-			case intstr.String:
-				for _, c := range address.Pod.Spec.Containers {
-					for _, p := range c.Ports {
-						if p.ContainerPort == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
-							portMatch = true
-						}
-					}
-				}
-			default:
-				continue
-			}
-
-		} else if address.ExternalWorkload != nil {
-			selector, err := metav1.LabelSelectorAsSelector(server.Spec.ExternalWorkloadSelector)
-			if err != nil {
-				pp.log.Errorf("failed to create Selector: %s", err)
-				return
-			}
-
-			if !selector.Matches(labels.Set(address.ExternalWorkload.Labels)) {
-				continue
-			}
-			switch server.Spec.Port.Type {
-			case intstr.Int:
-				if server.Spec.Port.IntVal == int32(address.Port) {
-					portMatch = true
-				}
-			case intstr.String:
-				for _, p := range address.ExternalWorkload.Spec.Ports {
-					if p.Port == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
-						portMatch = true
-					}
-				}
-			default:
-				continue
-			}
-
-		} else {
-			continue
-		}
-
-		if portMatch {
-			if isAdd && server.Spec.ProxyProtocol == opaqueProtocol {
+		if pp.isAddressSelected(address, oldServer) || pp.isAddressSelected(address, newServer) {
+			if newServer != nil && pp.isAddressSelected(address, newServer) && newServer.Spec.ProxyProtocol == opaqueProtocol {
 				address.OpaqueProtocol = true
 			} else {
 				address.OpaqueProtocol = false
@@ -1319,6 +1266,64 @@ func (pp *portPublisher) updateServer(server *v1beta2.Server, isAdd bool) {
 	}
 }
 
+func (pp *portPublisher) isAddressSelected(address Address, server *v1beta2.Server) bool {
+	if server == nil {
+		return false
+	}
+
+	if address.Pod != nil {
+		selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
+		if err != nil {
+			pp.log.Errorf("failed to create Selector: %s", err)
+			return false
+		}
+
+		if !selector.Matches(labels.Set(address.Pod.Labels)) {
+			return false
+		}
+
+		switch server.Spec.Port.Type {
+		case intstr.Int:
+			if server.Spec.Port.IntVal == int32(address.Port) {
+				return true
+			}
+		case intstr.String:
+			for _, c := range address.Pod.Spec.Containers {
+				for _, p := range c.Ports {
+					if p.ContainerPort == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
+						return true
+					}
+				}
+			}
+		}
+
+	} else if address.ExternalWorkload != nil {
+		selector, err := metav1.LabelSelectorAsSelector(server.Spec.ExternalWorkloadSelector)
+		if err != nil {
+			pp.log.Errorf("failed to create Selector: %s", err)
+			return false
+		}
+
+		if !selector.Matches(labels.Set(address.ExternalWorkload.Labels)) {
+			return false
+		}
+
+		switch server.Spec.Port.Type {
+		case intstr.Int:
+			if server.Spec.Port.IntVal == int32(address.Port) {
+				return true
+			}
+		case intstr.String:
+			for _, p := range address.ExternalWorkload.Spec.Ports {
+				if p.Port == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
+					return true
+				}
+			}
+		}
+	}
+	return false
+}
+
 ////////////
 /// util ///
 ////////////
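The heart of the change: `updateServer` now receives both the old and the new Server, so opaqueness is recomputed for any address matched by either selector. If only the new selector were consulted, an update that deselects a pod would never reach the branch that clears `OpaqueProtocol`, leaving a stale opaque hint. A hedged, self-contained sketch of just that decision, with toy stand-in types (the real logic is `isAddressSelected` above):

```go
package main

import "fmt"

// Toy stand-ins for the watcher's types: a Server selects pods by label and
// may mark traffic opaque; an Address caches the computed opaqueness.
type server struct {
	selects map[string]bool // pod name -> selected
	opaque  bool
}

type address struct {
	pod    string
	opaque bool
}

// selected mirrors isAddressSelected: a nil Server selects nothing.
func selected(a address, s *server) bool {
	return s != nil && s.selects[a.pod]
}

// updateServer mirrors the new portPublisher logic: touch an address if the
// old OR new Server matches it, but mark it opaque only if the NEW one does.
func updateServer(a *address, oldSrv, newSrv *server) {
	if selected(*a, oldSrv) || selected(*a, newSrv) {
		a.opaque = selected(*a, newSrv) && newSrv.opaque
	}
}

func main() {
	addr := address{pod: "pod-policyResources", opaque: true}

	oldSrv := &server{selects: map[string]bool{"pod-policyResources": true}, opaque: true}
	newSrv := &server{selects: map[string]bool{}, opaque: true} // selector edited away

	// The old selector still matches, so the address is revisited and the
	// stale opaque hint is cleared -- the case the old (server, isAdd)
	// signature could not express.
	updateServer(&addr, oldSrv, newSrv)
	fmt.Println(addr.opaque) // false

	// Creation is (nil, new); a matching Server restores the hint.
	updateServer(&addr, nil, oldSrv)
	fmt.Println(addr.opaque) // true
}
```

Note how the `EndpointsWatcher` handlers map onto this contract: `addServer` passes `(nil, server)`, `updateServer` passes `(old, new)`, and `deleteServer` passes `(server, nil)`.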
diff --git a/controller/k8s/test_helper.go b/controller/k8s/test_helper.go
index 89e10def9f408..7345f20c576ca 100644
--- a/controller/k8s/test_helper.go
+++ b/controller/k8s/test_helper.go
@@ -34,13 +34,13 @@ func NewFakeAPIWithActions(configs ...string) (*API, func() []testing.Action, error) {
 
 // NewFakeAPIWithL5dClient provides a mock Kubernetes API for testing like
 // NewFakeAPI, but it also returns the mock client for linkerd CRDs
-func NewFakeAPIWithL5dClient(configs ...string) (*API, *l5dcrdclient.Interface, error) {
+func NewFakeAPIWithL5dClient(configs ...string) (*API, l5dcrdclient.Interface, error) {
 	clientSet, _, _, l5dClientSet, err := k8s.NewFakeClientSets(configs...)
 	if err != nil {
 		return nil, nil, err
 	}
 
-	return NewFakeClusterScopedAPI(clientSet, l5dClientSet), &l5dClientSet, nil
+	return NewFakeClusterScopedAPI(clientSet, l5dClientSet), l5dClientSet, nil
 }
 
 // NewFakeClusterScopedAPI provides a mock Kubernetes API for testing.
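This last change (and the matching `(*l5dClient).ServerV1beta2()` cleanup in `server_test.go`) drops a pointer-to-interface in favor of the interface value itself. A pointer to an interface adds a dereference at every call site and no capability, since an interface value already indirects to its concrete implementation. A small illustrative sketch, with a toy interface rather than the linkerd clientset:

```go
package main

import "fmt"

// Toy stand-in for l5dcrdclient.Interface.
type Clientset interface {
	Name() string
}

type fakeClientset struct{}

func (fakeClientset) Name() string { return "fake-l5d-client" }

// Before: returning *Clientset forces callers to write (*c).Name().
func newClientPtr() *Clientset {
	var c Clientset = fakeClientset{}
	return &c
}

// After: returning the interface value directly; any implementation
// (real or fake) still satisfies it.
func newClient() Clientset {
	return fakeClientset{}
}

func main() {
	p := newClientPtr()
	fmt.Println((*p).Name()) // awkward dereference, as in the old test code

	c := newClient()
	fmt.Println(c.Name()) // what the diff enables
}
```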