From 368b63866d3e8c96a3dd89cac5094c44bef2bcd8 Mon Sep 17 00:00:00 2001 From: Alex Leong Date: Fri, 11 Aug 2023 09:31:45 -0700 Subject: [PATCH] Add support for remote discovery (#11224) Adds support for remote discovery to the destination controller. When the destination controller gets a `Get` request for a Service with the `multicluster.linkerd.io/remote-discovery` label, this is an indication that the destination controller should discover the endpoints for this service from a remote cluster. The destination controller will look for a remote cluster which has been linked to it (using the `linkerd multicluster link` command) with that name. It will look at the `multicluster.linkerd.io/remote-discovery` label for the service name to look up in that cluster. It then streams back the endpoint data for that remote service. Since we now have multiple client-go informers for the same resource types (one for the local cluster and one for each linked remote cluster) we add a `cluster` label onto the prometheus metrics for the informers and EndpointWatchers to ensure that each of these components' metrics are correctly tracked and don't overwrite each other. 
--------- Signed-off-by: Alex Leong --- .../templates/destination-rbac.yaml | 39 ++++++++ ...install_controlplane_tracing_output.golden | 37 ++++++++ cli/cmd/testdata/install_custom_domain.golden | 37 ++++++++ .../testdata/install_custom_registry.golden | 37 ++++++++ cli/cmd/testdata/install_default.golden | 37 ++++++++ ...stall_default_override_dst_get_nets.golden | 37 ++++++++ cli/cmd/testdata/install_default_token.golden | 37 ++++++++ cli/cmd/testdata/install_ha_output.golden | 37 ++++++++ .../install_ha_with_overrides_output.golden | 37 ++++++++ .../install_heartbeat_disabled_output.golden | 37 ++++++++ .../install_helm_control_plane_output.golden | 37 ++++++++ ...nstall_helm_control_plane_output_ha.golden | 37 ++++++++ .../install_helm_output_ha_labels.golden | 37 ++++++++ ...l_helm_output_ha_namespace_selector.golden | 37 ++++++++ .../testdata/install_no_init_container.golden | 37 ++++++++ cli/cmd/testdata/install_output.golden | 37 ++++++++ cli/cmd/testdata/install_proxy_ignores.golden | 37 ++++++++ cli/cmd/testdata/install_values_file.golden | 37 ++++++++ controller/api/destination/server.go | 95 +++++++++++++++---- controller/api/destination/server_test.go | 34 +++++++ controller/api/destination/test_util.go | 92 +++++++++++++++++- .../api/destination/watcher/cluster_store.go | 45 +++++---- .../destination/watcher/cluster_store_test.go | 29 +----- .../destination/watcher/endpoints_watcher.go | 8 +- .../watcher/endpoints_watcher_test.go | 20 ++-- .../api/destination/watcher/prometheus.go | 5 +- .../api/destination/watcher/test_util.go | 19 ++++ controller/cmd/destination/main.go | 12 ++- controller/k8s/api.go | 65 +++++++------ controller/k8s/k8s.go | 2 +- controller/k8s/metadata_api.go | 21 ++-- controller/k8s/prometheus.go | 7 +- controller/k8s/test_helper.go | 2 + controller/webhook/launcher.go | 2 +- .../remote-access-service-mirror-rbac.yaml | 14 ++- multicluster/cmd/link.go | 10 +- multicluster/cmd/service-mirror/main.go | 1 + 
.../cmd/testdata/install_default.golden | 14 ++- multicluster/cmd/testdata/install_ha.golden | 14 ++- multicluster/cmd/testdata/install_psp.golden | 14 ++- .../service-mirror/cluster_watcher.go | 2 +- pkg/healthcheck/healthcheck.go | 2 +- pkg/k8s/fake.go | 11 ++- .../multicluster/testdata/allow.golden | 32 +++++-- viz/metrics-api/cmd/main.go | 1 + viz/tap/api/main.go | 1 + 46 files changed, 1097 insertions(+), 145 deletions(-) diff --git a/charts/linkerd-control-plane/templates/destination-rbac.yaml b/charts/linkerd-control-plane/templates/destination-rbac.yaml index 7da11806e10b6..6bbc4731b5745 100644 --- a/charts/linkerd-control-plane/templates/destination-rbac.yaml +++ b/charts/linkerd-control-plane/templates/destination-rbac.yaml @@ -264,3 +264,42 @@ subjects: - kind: ServiceAccount name: linkerd-destination namespace: {{.Release.Namespace}} +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: {{.Release.Namespace}} + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: {{.Release.Namespace}} + {{- with .Values.commonLabels }}{{ toYaml . | trim | nindent 4 }}{{- end }} +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: {{.Release.Namespace}} + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: {{.Release.Namespace}} + {{- with .Values.commonLabels }}{{ toYaml . 
| trim | nindent 4 }}{{- end }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: {{.Release.Namespace}} diff --git a/cli/cmd/testdata/install_controlplane_tracing_output.golden b/cli/cmd/testdata/install_controlplane_tracing_output.golden index c39773a66ff71..058c058258452 100644 --- a/cli/cmd/testdata/install_controlplane_tracing_output.golden +++ b/cli/cmd/testdata/install_controlplane_tracing_output.golden @@ -259,6 +259,43 @@ subjects: name: linkerd-destination namespace: linkerd --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: linkerd +--- ### ### Heartbeat RBAC ### diff --git a/cli/cmd/testdata/install_custom_domain.golden b/cli/cmd/testdata/install_custom_domain.golden index a544335ca603b..22f362526e475 100644 --- a/cli/cmd/testdata/install_custom_domain.golden +++ b/cli/cmd/testdata/install_custom_domain.golden @@ -259,6 +259,43 @@ subjects: name: linkerd-destination namespace: linkerd --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: 
linkerd +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: linkerd +--- ### ### Heartbeat RBAC ### diff --git a/cli/cmd/testdata/install_custom_registry.golden b/cli/cmd/testdata/install_custom_registry.golden index 4699524b09bad..1e9842f2c454a 100644 --- a/cli/cmd/testdata/install_custom_registry.golden +++ b/cli/cmd/testdata/install_custom_registry.golden @@ -259,6 +259,43 @@ subjects: name: linkerd-destination namespace: linkerd --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: linkerd +--- ### ### Heartbeat RBAC ### diff --git a/cli/cmd/testdata/install_default.golden b/cli/cmd/testdata/install_default.golden index a544335ca603b..22f362526e475 100644 --- a/cli/cmd/testdata/install_default.golden +++ b/cli/cmd/testdata/install_default.golden @@ -259,6 +259,43 @@ 
subjects: name: linkerd-destination namespace: linkerd --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: linkerd +--- ### ### Heartbeat RBAC ### diff --git a/cli/cmd/testdata/install_default_override_dst_get_nets.golden b/cli/cmd/testdata/install_default_override_dst_get_nets.golden index b9a95a5f12c11..9ac9f547b9084 100644 --- a/cli/cmd/testdata/install_default_override_dst_get_nets.golden +++ b/cli/cmd/testdata/install_default_override_dst_get_nets.golden @@ -259,6 +259,43 @@ subjects: name: linkerd-destination namespace: linkerd --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: 
ServiceAccount + name: linkerd-destination + namespace: linkerd +--- ### ### Heartbeat RBAC ### diff --git a/cli/cmd/testdata/install_default_token.golden b/cli/cmd/testdata/install_default_token.golden index 4611edecac7c0..17a43bafce188 100644 --- a/cli/cmd/testdata/install_default_token.golden +++ b/cli/cmd/testdata/install_default_token.golden @@ -259,6 +259,43 @@ subjects: name: linkerd-destination namespace: linkerd --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: linkerd +--- ### ### Heartbeat RBAC ### diff --git a/cli/cmd/testdata/install_ha_output.golden b/cli/cmd/testdata/install_ha_output.golden index f62222b5a0b3d..a09c5e2078097 100644 --- a/cli/cmd/testdata/install_ha_output.golden +++ b/cli/cmd/testdata/install_ha_output.golden @@ -259,6 +259,43 @@ subjects: name: linkerd-destination namespace: linkerd --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: 
linkerd-destination-remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: linkerd +--- ### ### Heartbeat RBAC ### diff --git a/cli/cmd/testdata/install_ha_with_overrides_output.golden b/cli/cmd/testdata/install_ha_with_overrides_output.golden index cf923d7278a21..6d2c7c8eda825 100644 --- a/cli/cmd/testdata/install_ha_with_overrides_output.golden +++ b/cli/cmd/testdata/install_ha_with_overrides_output.golden @@ -259,6 +259,43 @@ subjects: name: linkerd-destination namespace: linkerd --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: linkerd +--- ### ### Heartbeat RBAC ### diff --git a/cli/cmd/testdata/install_heartbeat_disabled_output.golden b/cli/cmd/testdata/install_heartbeat_disabled_output.golden index ee62f5600b2ab..503c88e1fd66b 100644 --- a/cli/cmd/testdata/install_heartbeat_disabled_output.golden +++ b/cli/cmd/testdata/install_heartbeat_disabled_output.golden @@ -258,6 +258,43 @@ subjects: - kind: ServiceAccount name: linkerd-destination namespace: linkerd +--- 
+apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: linkerd --- diff --git a/cli/cmd/testdata/install_helm_control_plane_output.golden b/cli/cmd/testdata/install_helm_control_plane_output.golden index f09332165001f..02d45bffb74e6 100644 --- a/cli/cmd/testdata/install_helm_control_plane_output.golden +++ b/cli/cmd/testdata/install_helm_control_plane_output.golden @@ -250,6 +250,43 @@ subjects: name: linkerd-destination namespace: linkerd-dev --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd-dev + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd-dev +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: linkerd-dev + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd-dev +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: linkerd-dev +--- # Source: 
linkerd-control-plane/templates/heartbeat-rbac.yaml --- ### diff --git a/cli/cmd/testdata/install_helm_control_plane_output_ha.golden b/cli/cmd/testdata/install_helm_control_plane_output_ha.golden index a86405796be14..96d51141dc167 100644 --- a/cli/cmd/testdata/install_helm_control_plane_output_ha.golden +++ b/cli/cmd/testdata/install_helm_control_plane_output_ha.golden @@ -250,6 +250,43 @@ subjects: name: linkerd-destination namespace: linkerd-dev --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd-dev + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd-dev +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: linkerd-dev + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd-dev +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: linkerd-dev +--- # Source: linkerd-control-plane/templates/heartbeat-rbac.yaml --- ### diff --git a/cli/cmd/testdata/install_helm_output_ha_labels.golden b/cli/cmd/testdata/install_helm_output_ha_labels.golden index 59cfa8165724c..80c37772b6ff3 100644 --- a/cli/cmd/testdata/install_helm_output_ha_labels.golden +++ b/cli/cmd/testdata/install_helm_output_ha_labels.golden @@ -250,6 +250,43 @@ subjects: name: linkerd-destination namespace: linkerd-dev --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd-dev + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd-dev +rules: + - apiGroups: + - "" + 
resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: linkerd-dev + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd-dev +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: linkerd-dev +--- # Source: linkerd-control-plane/templates/heartbeat-rbac.yaml --- ### diff --git a/cli/cmd/testdata/install_helm_output_ha_namespace_selector.golden b/cli/cmd/testdata/install_helm_output_ha_namespace_selector.golden index 36e7ee8bd5b0e..47e9945d69a51 100644 --- a/cli/cmd/testdata/install_helm_output_ha_namespace_selector.golden +++ b/cli/cmd/testdata/install_helm_output_ha_namespace_selector.golden @@ -250,6 +250,43 @@ subjects: name: linkerd-destination namespace: linkerd-dev --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd-dev + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd-dev +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: linkerd-dev + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd-dev +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: linkerd-dev +--- # Source: linkerd-control-plane/templates/heartbeat-rbac.yaml --- ### diff --git a/cli/cmd/testdata/install_no_init_container.golden 
b/cli/cmd/testdata/install_no_init_container.golden index 2c717f16878ed..e27edc99c8eac 100644 --- a/cli/cmd/testdata/install_no_init_container.golden +++ b/cli/cmd/testdata/install_no_init_container.golden @@ -259,6 +259,43 @@ subjects: name: linkerd-destination namespace: linkerd --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: linkerd +--- ### ### Heartbeat RBAC ### diff --git a/cli/cmd/testdata/install_output.golden b/cli/cmd/testdata/install_output.golden index b16dfc354616b..5a5022bd18d44 100644 --- a/cli/cmd/testdata/install_output.golden +++ b/cli/cmd/testdata/install_output.golden @@ -256,6 +256,43 @@ subjects: name: linkerd-destination namespace: linkerd --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + 
linkerd.io/control-plane-ns: linkerd +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: linkerd +--- ### ### Heartbeat RBAC ### diff --git a/cli/cmd/testdata/install_proxy_ignores.golden b/cli/cmd/testdata/install_proxy_ignores.golden index cd7c73c17999f..6e28381da7fcf 100644 --- a/cli/cmd/testdata/install_proxy_ignores.golden +++ b/cli/cmd/testdata/install_proxy_ignores.golden @@ -259,6 +259,43 @@ subjects: name: linkerd-destination namespace: linkerd --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: linkerd +--- ### ### Heartbeat RBAC ### diff --git a/cli/cmd/testdata/install_values_file.golden b/cli/cmd/testdata/install_values_file.golden index 4dcb9b7e3817f..3d389a3500911 100644 --- a/cli/cmd/testdata/install_values_file.golden +++ b/cli/cmd/testdata/install_values_file.golden @@ -259,6 +259,43 @@ subjects: name: linkerd-destination namespace: linkerd --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +rules: + - apiGroups: + - "" + 
resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: linkerd-destination-remote-discovery + namespace: linkerd + labels: + app.kubernetes.io/part-of: Linkerd + linkerd.io/control-plane-component: destination + linkerd.io/control-plane-ns: linkerd +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: remote-discovery +subjects: + - kind: ServiceAccount + name: linkerd-destination + namespace: linkerd +--- ### ### Heartbeat RBAC ### diff --git a/controller/api/destination/server.go b/controller/api/destination/server.go index a220270e0de63..47cb0f8ca4591 100644 --- a/controller/api/destination/server.go +++ b/controller/api/destination/server.go @@ -22,6 +22,7 @@ import ( "google.golang.org/grpc/peer" "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" ) type ( @@ -33,6 +34,8 @@ type ( profiles *watcher.ProfileWatcher servers *watcher.ServerWatcher + clusterStore *watcher.ClusterStore + enableH2Upgrade bool controllerNS string identityTrustDomain string @@ -66,6 +69,7 @@ func NewServer( enableEndpointSlices bool, k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, + clusterStore *watcher.ClusterStore, clusterDomain string, defaultOpaquePorts map[uint32]struct{}, shutdown <-chan struct{}, @@ -81,7 +85,7 @@ func NewServer( return nil, err } - endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, log, enableEndpointSlices) + endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, log, enableEndpointSlices, "local") if err != nil { return nil, err } @@ -104,6 +108,7 @@ func NewServer( opaquePorts, profiles, servers, + clusterStore, enableH2Upgrade, controllerNS, identityTrustDomain, @@ -135,18 +140,6 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e log.Debugf("Dest token: %v", token) } - translator := newEndpointTranslator( - s.controllerNS, - 
s.identityTrustDomain, - s.enableH2Upgrade, - dest.GetPath(), - token.NodeName, - s.defaultOpaquePorts, - s.metadataAPI, - stream, - log, - ) - // The host must be fully-qualified or be an IP address. host, port, err := getHostAndPort(dest.GetPath()) if err != nil { @@ -165,17 +158,77 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath()) } - err = s.endpoints.Subscribe(service, port, instanceID, translator) + svc, err := s.k8sAPI.Svc().Lister().Services(service.Namespace).Get(service.Name) if err != nil { - var ise watcher.InvalidService - if errors.As(err, &ise) { - log.Debugf("Invalid service %s", dest.GetPath()) - return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath()) + if kerrors.IsNotFound(err) { + log.Debugf("Service not found %s", service) + return status.Errorf(codes.NotFound, "Service %s.%s not found", service.Name, service.Namespace) } - log.Errorf("Failed to subscribe to %s: %s", dest.GetPath(), err) - return err + log.Debugf("Failed to get service %s: %v", service, err) + return status.Errorf(codes.Internal, "Failed to get service %s", dest.GetPath()) + } + + if cluster, found := svc.Labels[labels.RemoteDiscoveryLabel]; found { + // Remote discovery + remoteSvc, found := svc.Labels[labels.RemoteServiceLabel] + if !found { + log.Debugf("Remote discovery service missing remote service name %s", service) + return status.Errorf(codes.FailedPrecondition, "Remote discovery service missing remote service name %s", dest.GetPath()) + } + remoteWatcher, remoteConfig, found := s.clusterStore.Get(cluster) + if !found { + log.Errorf("Failed to get remote cluster %s", cluster) + return status.Errorf(codes.NotFound, "Remote cluster not found: %s", cluster) + } + translator := newEndpointTranslator( + s.controllerNS, + remoteConfig.TrustDomain, + s.enableH2Upgrade, + fmt.Sprintf("%s.%s.svc.%s:%d", remoteSvc, service.Namespace, 
remoteConfig.ClusterDomain, port), + token.NodeName, + s.defaultOpaquePorts, + s.metadataAPI, + stream, + log, + ) + err = remoteWatcher.Subscribe(watcher.ServiceID{Namespace: service.Namespace, Name: remoteSvc}, port, instanceID, translator) + if err != nil { + var ise watcher.InvalidService + if errors.As(err, &ise) { + log.Debugf("Invalid remote discovery service %s", dest.GetPath()) + return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath()) + } + log.Errorf("Failed to subscribe to remote disocvery service %q in cluster %s: %s", dest.GetPath(), cluster, err) + return err + } + defer remoteWatcher.Unsubscribe(watcher.ServiceID{Namespace: service.Namespace, Name: remoteSvc}, port, instanceID, translator) + + } else { + // Local discovery + translator := newEndpointTranslator( + s.controllerNS, + s.identityTrustDomain, + s.enableH2Upgrade, + dest.GetPath(), + token.NodeName, + s.defaultOpaquePorts, + s.metadataAPI, + stream, + log, + ) + + err = s.endpoints.Subscribe(service, port, instanceID, translator) + if err != nil { + var ise watcher.InvalidService + if errors.As(err, &ise) { + log.Debugf("Invalid service %s", dest.GetPath()) + return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath()) + } + log.Errorf("Failed to subscribe to %s: %s", dest.GetPath(), err) + return err + } + defer s.endpoints.Unsubscribe(service, port, instanceID, translator) } - defer s.endpoints.Unsubscribe(service, port, instanceID, translator) select { case <-s.shutdown: diff --git a/controller/api/destination/server_test.go b/controller/api/destination/server_test.go index b9e2edb8e7f06..ee423643697f4 100644 --- a/controller/api/destination/server_test.go +++ b/controller/api/destination/server_test.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" "testing" + "time" pb "github.com/linkerd/linkerd2-proxy-api/go/destination" "github.com/linkerd/linkerd2-proxy-api/go/net" @@ -109,6 +110,39 @@ func TestGet(t *testing.T) { 
t.Fatalf("Expected TLS identity for %s to be nil but got %+v", path, addrs[0].TlsIdentity) } }) + + t.Run("Remote discovery", func(t *testing.T) { + server := makeServer(t) + + // Wait for cluster store to be synced. + time.Sleep(50 * time.Millisecond) + + stream := &bufferingGetStream{ + updates: []*pb.Update{}, + MockServerStream: util.NewMockServerStream(), + } + + // We cancel the stream before even sending the request so that we don't + // need to call server.Get in a separate goroutine. By preemptively + // cancelling, the behavior of Get becomes effectively synchronous and + // we will get only the initial update, which is what we want for this + // test. + stream.Cancel() + + err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: fmt.Sprintf("%s:%d", "foo-target.ns.svc.mycluster.local", 80)}, stream) + if err != nil { + t.Fatalf("Got error: %s", err) + } + + if len(stream.updates) != 1 { + t.Fatalf("Expected 1 update but got %d: %v", len(stream.updates), stream.updates) + } + + if updateAddAddress(t, stream.updates[0])[0] != fmt.Sprintf("%s:%d", "172.17.55.1", 80) { + t.Fatalf("Expected %s but got %s", fmt.Sprintf("%s:%d", podIP1, port), updateAddAddress(t, stream.updates[0])[0]) + } + + }) } func TestGetProfiles(t *testing.T) { diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go index d96975497aa6e..77566eee88a25 100644 --- a/controller/api/destination/test_util.go +++ b/controller/api/destination/test_util.go @@ -352,6 +352,87 @@ spec: name: nginx-7777`, } + exportedServiceResources := []string{` +apiVersion: v1 +kind: Namespace +metadata: + name: ns`, + ` +apiVersion: v1 +kind: Service +metadata: + name: foo + namespace: ns +spec: + type: LoadBalancer + ports: + - port: 80`, + ` +apiVersion: v1 +kind: Endpoints +metadata: + name: foo + namespace: ns +subsets: +- addresses: + - ip: 172.17.55.1 + targetRef: + kind: Pod + name: foo-1 + namespace: ns + ports: + - port: 80`, + ` +apiVersion: v1 +kind: Pod 
+metadata: + labels: + linkerd.io/control-plane-ns: linkerd + name: foo-1 + namespace: ns +status: + phase: Running + podIP: 172.17.55.1 + podIPs: + - ip: 172.17.55.1 +spec: + containers: + - env: + - name: LINKERD2_PROXY_INBOUND_LISTEN_ADDR + value: 0.0.0.0:4143 + name: linkerd-proxy`, + } + + destinationCredentialsResources := []string{` +apiVersion: v1 +data: + kubeconfig: V2UncmUgbm8gc3RyYW5nZXJzIHRvIGxvdmUKWW91IGtub3cgdGhlIHJ1bGVzIGFuZCBzbyBkbyBJIChkbyBJKQpBIGZ1bGwgY29tbWl0bWVudCdzIHdoYXQgSSdtIHRoaW5raW5nIG9mCllvdSB3b3VsZG4ndCBnZXQgdGhpcyBmcm9tIGFueSBvdGhlciBndXkKSSBqdXN0IHdhbm5hIHRlbGwgeW91IGhvdyBJJ20gZmVlbGluZwpHb3R0YSBtYWtlIHlvdSB1bmRlcnN0YW5kCk5ldmVyIGdvbm5hIGdpdmUgeW91IHVwCk5ldmVyIGdvbm5hIGxldCB5b3UgZG93bgpOZXZlciBnb25uYSBydW4gYXJvdW5kIGFuZCBkZXNlcnQgeW91Ck5ldmVyIGdvbm5hIG1ha2UgeW91IGNyeQpOZXZlciBnb25uYSBzYXkgZ29vZGJ5ZQpOZXZlciBnb25uYSB0ZWxsIGEgbGllIGFuZCBodXJ0IHlvdQpXZSd2ZSBrbm93biBlYWNoIG90aGVyIGZvciBzbyBsb25nCllvdXIgaGVhcnQncyBiZWVuIGFjaGluZywgYnV0IHlvdSdyZSB0b28gc2h5IHRvIHNheSBpdCAoc2F5IGl0KQpJbnNpZGUsIHdlIGJvdGgga25vdyB3aGF0J3MgYmVlbiBnb2luZyBvbiAoZ29pbmcgb24pCldlIGtub3cgdGhlIGdhbWUgYW5kIHdlJ3JlIGdvbm5hIHBsYXkgaXQKQW5kIGlmIHlvdSBhc2sgbWUgaG93IEknbSBmZWVsaW5nCkRvbid0IHRlbGwgbWUgeW91J3JlIHRvbyBibGluZCB0byBzZWUKTmV2ZXIgZ29ubmEgZ2l2ZSB5b3UgdXAKTmV2ZXIgZ29ubmEgbGV0IHlvdSBkb3duCk5ldmVyIGdvbm5hIHJ1biBhcm91bmQgYW5kIGRlc2VydCB5b3UKTmV2ZXIgZ29ubmEgbWFrZSB5b3UgY3J5Ck5ldmVyIGdvbm5hIHNheSBnb29kYnllCk5ldmVyIGdvbm5hIHRlbGwgYSBsaWUgYW5kIGh1cnQgeW91 +kind: Secret +metadata: + annotations: + multicluster.linkerd.io/cluster-domain: cluster.local + multicluster.linkerd.io/trust-domain: cluster.local + labels: + multicluster.linkerd.io/cluster-name: target + name: cluster-credentials-target + namespace: linkerd +type: mirror.linkerd.io/remote-kubeconfig`} + + mirrorServiceResources := []string{` +apiVersion: v1 +kind: Service +metadata: + name: foo-target + namespace: ns + labels: + multicluster.linkerd.io/remote-discovery: target + 
multicluster.linkerd.io/remote-service: foo +spec: + type: LoadBalancer + ports: + - port: 80`, + } + res := append(meshedPodResources, clientSP...) res = append(res, unmeshedPod) res = append(res, meshedOpaquePodResources...) @@ -360,6 +441,8 @@ spec: res = append(res, meshedStatefulSetPodResource...) res = append(res, policyResources...) res = append(res, hostPortMapping...) + res = append(res, mirrorServiceResources...) + res = append(res, destinationCredentialsResources...) k8sAPI, err := k8s.NewFakeAPI(res...) if err != nil { t.Fatalf("NewFakeAPI returned an error: %s", err) @@ -384,7 +467,7 @@ spec: t.Fatalf("initializeIndexers returned an error: %s", err) } - endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, log, false) + endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, log, false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -401,10 +484,16 @@ spec: t.Fatalf("can't create Server watcher: %s", err) } + clusterStore, err := watcher.NewClusterStoreWithDecoder(k8sAPI.Client, "linkerd", false, watcher.CreateMockDecoder(exportedServiceResources...)) + if err != nil { + t.Fatalf("can't create cluster store: %s", err) + } + // Sync after creating watchers so that the indexers added get updated // properly k8sAPI.Sync(nil) metadataAPI.Sync(nil) + clusterStore.Sync(nil) return &server{ pb.UnimplementedDestinationServer{}, @@ -412,6 +501,7 @@ spec: opaquePorts, profiles, servers, + clusterStore, true, "linkerd", "trust.domain", diff --git a/controller/api/destination/watcher/cluster_store.go b/controller/api/destination/watcher/cluster_store.go index ed1145d1cd015..43581701c9743 100644 --- a/controller/api/destination/watcher/cluster_store.go +++ b/controller/api/destination/watcher/cluster_store.go @@ -7,12 +7,12 @@ import ( "sync" "github.com/linkerd/linkerd2/controller/k8s" + pkgK8s "github.com/linkerd/linkerd2/pkg/k8s" logging "github.com/sirupsen/logrus" v1
"k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" - - consts "github.com/linkerd/linkerd2/pkg/k8s" ) type ( @@ -25,7 +25,7 @@ type ( // Protects against illegal accesses sync.RWMutex - k8sAPI *k8s.API + api *k8s.API store map[string]remoteCluster enableEndpointSlices bool log *logging.Entry @@ -53,7 +53,7 @@ type ( // configDecoder is the type of a function that given a byte buffer, returns // a pair of API Server clients. The cache uses this function to dynamically // create clients after discovering a Secret. - configDecoder = func(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) + configDecoder = func(data []byte, cluster string, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) ) const ( @@ -68,24 +68,30 @@ const ( // When created, a pair of event handlers are registered for the local cluster's // Secret informer. The event handlers are responsible for driving the discovery // of remote clusters and their configuration -func NewClusterStore(k8sAPI *k8s.API, enableEndpointSlices bool) (*ClusterStore, error) { - return newClusterStoreWithDecoder(k8sAPI, enableEndpointSlices, decodeK8sConfigFromSecret) +func NewClusterStore(client kubernetes.Interface, namespace string, enableEndpointSlices bool) (*ClusterStore, error) { + return NewClusterStoreWithDecoder(client, namespace, enableEndpointSlices, decodeK8sConfigFromSecret) +} + +func (cs *ClusterStore) Sync(stopCh <-chan struct{}) { + cs.api.Sync(stopCh) } // newClusterStoreWithDecoder is a helper function that allows the creation of a // store with an arbitrary `configDecoder` function. 
-func newClusterStoreWithDecoder(k8sAPI *k8s.API, enableEndpointSlices bool, decodeFn configDecoder) (*ClusterStore, error) { +func NewClusterStoreWithDecoder(client kubernetes.Interface, namespace string, enableEndpointSlices bool, decodeFn configDecoder) (*ClusterStore, error) { + api := k8s.NewNamespacedAPI(client, nil, nil, namespace, "local", k8s.Secret) + cs := &ClusterStore{ store: make(map[string]remoteCluster), log: logging.WithFields(logging.Fields{ "component": "cluster-store", }), enableEndpointSlices: enableEndpointSlices, - k8sAPI: k8sAPI, + api: api, decodeFn: decodeFn, } - _, err := cs.k8sAPI.Secret().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + _, err := cs.api.Secret().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { secret, ok := obj.(*v1.Secret) if !ok { @@ -93,7 +99,7 @@ func newClusterStoreWithDecoder(k8sAPI *k8s.API, enableEndpointSlices bool, deco return } - if secret.Type != consts.MirrorSecretType { + if secret.Type != pkgK8s.MirrorSecretType { cs.log.Tracef("Skipping Add event for 'Secret' object %s/%s: invalid type %s", secret.Namespace, secret.Name, secret.Type) return @@ -169,7 +175,7 @@ func (cs *ClusterStore) removeCluster(clusterName string) { r.watcher.removeHandlers() close(r.stopCh) delete(cs.store, clusterName) - cs.log.Tracef("Removed cluster %s from ClusterStore", clusterName) + cs.log.Infof("Removed cluster %s from ClusterStore", clusterName) } // addCluster is triggered by the cache's Secret informer when a secret is @@ -177,7 +183,7 @@ func (cs *ClusterStore) removeCluster(clusterName string) { // object, it creates an EndpointsWatcher for a remote cluster and syncs its // informers before returning. 
func (cs *ClusterStore) addCluster(clusterName string, secret *v1.Secret) error { - data, found := secret.Data[consts.ConfigKeyName] + data, found := secret.Data[pkgK8s.ConfigKeyName] if !found { return errors.New("missing kubeconfig file") } @@ -192,7 +198,7 @@ func (cs *ClusterStore) addCluster(clusterName string, secret *v1.Secret) error return fmt.Errorf("missing \"%s\" annotation", trustDomainAnnotation) } - remoteAPI, metadataAPI, err := cs.decodeFn(data, cs.enableEndpointSlices) + remoteAPI, metadataAPI, err := cs.decodeFn(data, clusterName, cs.enableEndpointSlices) if err != nil { return err } @@ -205,6 +211,7 @@ func (cs *ClusterStore) addCluster(clusterName string, secret *v1.Secret) error "remote-cluster": clusterName, }), cs.enableEndpointSlices, + clusterName, ) if err != nil { return err @@ -224,7 +231,7 @@ func (cs *ClusterStore) addCluster(clusterName string, secret *v1.Secret) error go remoteAPI.Sync(stopCh) go metadataAPI.Sync(stopCh) - cs.log.Tracef("Added cluster %s to ClusterStore", clusterName) + cs.log.Infof("Added cluster %s to ClusterStore", clusterName) return nil } @@ -232,7 +239,7 @@ func (cs *ClusterStore) addCluster(clusterName string, secret *v1.Secret) error // decodeK8sConfigFromSecret implements the decoder function type. Given a byte // buffer, it attempts to parse it as a kubeconfig file. If successful, returns // a pair of API Server clients. 
-func decodeK8sConfigFromSecret(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) { +func decodeK8sConfigFromSecret(data []byte, cluster string, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) { cfg, err := clientcmd.RESTConfigFromKubeConfig(data) if err != nil { return nil, nil, err @@ -245,21 +252,23 @@ func decodeK8sConfigFromSecret(data []byte, enableEndpointSlices bool) (*k8s.API ctx, cfg, true, - k8s.ES, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv, + cluster, + k8s.ES, k8s.Pod, k8s.Svc, k8s.Srv, ) } else { remoteAPI, err = k8s.InitializeAPIForConfig( ctx, cfg, true, - k8s.Endpoint, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv, + cluster, + k8s.Endpoint, k8s.Pod, k8s.Svc, k8s.Srv, ) } if err != nil { return nil, nil, err } - metadataAPI, err := k8s.InitializeMetadataAPIForConfig(cfg, k8s.Node, k8s.RS) + metadataAPI, err := k8s.InitializeMetadataAPIForConfig(cfg, cluster, k8s.RS) if err != nil { return nil, nil, err } diff --git a/controller/api/destination/watcher/cluster_store_test.go b/controller/api/destination/watcher/cluster_store_test.go index dfda4b7ac71ac..92f7079f60334 100644 --- a/controller/api/destination/watcher/cluster_store_test.go +++ b/controller/api/destination/watcher/cluster_store_test.go @@ -7,24 +7,6 @@ import ( "github.com/linkerd/linkerd2/controller/k8s" ) -func CreateMockDecoder() configDecoder { - // Create a mock decoder with some random objs to satisfy client creation - return func(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) { - remoteAPI, err := k8s.NewFakeAPI([]string{}...) 
- if err != nil { - return nil, nil, err - } - - metadataAPI, err := k8s.NewFakeMetadataAPI(nil) - if err != nil { - return nil, nil, err - } - - return remoteAPI, metadataAPI, nil - } - -} - func TestClusterStoreHandlers(t *testing.T) { for _, tt := range []struct { name string @@ -93,24 +75,17 @@ func TestClusterStoreHandlers(t *testing.T) { } { tt := tt // Pin t.Run(tt.name, func(t *testing.T) { - // TODO (matei): use namespace scoped API here k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...) if err != nil { t.Fatalf("NewFakeAPI returned an error: %s", err) } - metadataAPI, err := k8s.NewFakeMetadataAPI(nil) - if err != nil { - t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) - } - - cs, err := newClusterStoreWithDecoder(k8sAPI, tt.enableEndpointSlices, CreateMockDecoder()) + cs, err := NewClusterStoreWithDecoder(k8sAPI.Client, "linkerd", tt.enableEndpointSlices, CreateMockDecoder()) if err != nil { t.Fatalf("Unexpected error when starting watcher cache: %s", err) } - k8sAPI.Sync(nil) - metadataAPI.Sync(nil) + cs.Sync(nil) // Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on time.Sleep(50 * time.Millisecond) diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go index 05bca4c4e0932..454d63542bb29 100644 --- a/controller/api/destination/watcher/endpoints_watcher.go +++ b/controller/api/destination/watcher/endpoints_watcher.go @@ -75,6 +75,7 @@ type ( k8sAPI *k8s.API metadataAPI *k8s.MetadataAPI + cluster string log *logging.Entry enableEndpointSlices bool sync.RWMutex // This mutex protects modification of the map itself. @@ -108,6 +109,7 @@ type ( metadataAPI *k8s.MetadataAPI enableEndpointSlices bool localTrafficPolicy bool + cluster string ports map[portAndHostname]*portPublisher // All access to the servicePublisher and its portPublishers is explicitly synchronized by // this mutex. 
@@ -150,12 +152,13 @@ var undefinedEndpointPort = Port(0) // NewEndpointsWatcher creates an EndpointsWatcher and begins watching the // k8sAPI for pod, service, and endpoint changes. An EndpointsWatcher will // watch on Endpoints or EndpointSlice resources, depending on cluster configuration. -func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, enableEndpointSlices bool) (*EndpointsWatcher, error) { +func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, enableEndpointSlices bool, cluster string) (*EndpointsWatcher, error) { ew := &EndpointsWatcher{ publishers: make(map[ServiceID]*servicePublisher), k8sAPI: k8sAPI, metadataAPI: metadataAPI, enableEndpointSlices: enableEndpointSlices, + cluster: cluster, log: log.WithFields(logging.Fields{ "component": "endpoints-watcher", }), @@ -438,6 +441,7 @@ func (ew *EndpointsWatcher) getOrNewServicePublisher(id ServiceID) *servicePubli }), k8sAPI: ew.k8sAPI, metadataAPI: ew.metadataAPI, + cluster: ew.cluster, ports: make(map[portAndHostname]*portPublisher), enableEndpointSlices: ew.enableEndpointSlices, } @@ -634,7 +638,7 @@ func (sp *servicePublisher) newPortPublisher(srcPort Port, hostname string) *por } func (sp *servicePublisher) metricsLabels(port Port, hostname string) prometheus.Labels { - return endpointsLabels(sp.id.Namespace, sp.id.Name, strconv.Itoa(int(port)), hostname) + return endpointsLabels(sp.cluster, sp.id.Namespace, sp.id.Name, strconv.Itoa(int(port)), hostname) } func (sp *servicePublisher) updateServer(server *v1beta1.Server, isAdd bool) { diff --git a/controller/api/destination/watcher/endpoints_watcher_test.go b/controller/api/destination/watcher/endpoints_watcher_test.go index 240fc0bad0f60..afc3af6fbe08c 100644 --- a/controller/api/destination/watcher/endpoints_watcher_test.go +++ b/controller/api/destination/watcher/endpoints_watcher_test.go @@ -675,7 +675,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", 
err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false) + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -1303,7 +1303,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true) + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -1433,7 +1433,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false) + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -1607,7 +1607,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true) + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -1844,7 +1844,7 @@ subsets: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices) + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -2013,7 +2013,7 @@ subsets: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, 
logging.WithField("test", t.Name()), false) + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -2145,7 +2145,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false) + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -2247,7 +2247,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true) + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -2384,7 +2384,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true) + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } @@ -2504,7 +2504,7 @@ status: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true) + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local") if err != nil { t.Fatalf("can't create Endpoints watcher: %s", err) } diff --git a/controller/api/destination/watcher/prometheus.go b/controller/api/destination/watcher/prometheus.go index 102329662a829..602134014303f 100644 --- a/controller/api/destination/watcher/prometheus.go +++ b/controller/api/destination/watcher/prometheus.go @@ -58,8 +58,9 
@@ func newMetricsVecs(name string, labels []string) metricsVecs { } } -func endpointsLabels(namespace, service, port string, hostname string) prometheus.Labels { +func endpointsLabels(cluster, namespace, service, port string, hostname string) prometheus.Labels { return prometheus.Labels{ + "cluster": cluster, "namespace": namespace, "service": service, "port": port, @@ -76,7 +77,7 @@ func labelNames(labels prometheus.Labels) []string { } func newEndpointsMetricsVecs() endpointsMetricsVecs { - labels := labelNames(endpointsLabels("", "", "", "")) + labels := labelNames(endpointsLabels("", "", "", "", "")) vecs := newMetricsVecs("endpoints", labels) pods := promauto.NewGaugeVec( diff --git a/controller/api/destination/watcher/test_util.go b/controller/api/destination/watcher/test_util.go index a23c3b009bfbc..4070d411ce2dc 100644 --- a/controller/api/destination/watcher/test_util.go +++ b/controller/api/destination/watcher/test_util.go @@ -6,6 +6,7 @@ import ( "github.com/go-test/deep" sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2" + "github.com/linkerd/linkerd2/controller/k8s" ) // DeletingProfileListener implements ProfileUpdateListener and registers @@ -42,6 +43,24 @@ func NewBufferingProfileListener() *BufferingProfileListener { } } +func CreateMockDecoder(configs ...string) configDecoder { + // Create a mock decoder with some random objs to satisfy client creation + return func(data []byte, cluster string, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) { + remoteAPI, err := k8s.NewFakeAPI(configs...) + if err != nil { + return nil, nil, err + } + + metadataAPI, err := k8s.NewFakeMetadataAPI(nil) + if err != nil { + return nil, nil, err + } + + return remoteAPI, metadataAPI, nil + } + +} + // Update stores the update in the internal buffer. 
func (bpl *BufferingProfileListener) Update(profile *sp.ServiceProfile) { bpl.mu.Lock() diff --git a/controller/cmd/destination/main.go b/controller/cmd/destination/main.go index 77ae50a63b2d3..68bb191f47575 100644 --- a/controller/cmd/destination/main.go +++ b/controller/cmd/destination/main.go @@ -9,6 +9,7 @@ import ( "syscall" "github.com/linkerd/linkerd2/controller/api/destination" + "github.com/linkerd/linkerd2/controller/api/destination/watcher" "github.com/linkerd/linkerd2/controller/k8s" "github.com/linkerd/linkerd2/pkg/admin" "github.com/linkerd/linkerd2/pkg/flags" @@ -100,6 +101,7 @@ func Main(args []string) { ctx, *kubeConfigPath, true, + "local", k8s.Endpoint, k8s.ES, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv, ) } else { @@ -107,6 +109,7 @@ func Main(args []string) { ctx, *kubeConfigPath, true, + "local", k8s.Endpoint, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv, ) } @@ -114,11 +117,16 @@ func Main(args []string) { log.Fatalf("Failed to initialize K8s API: %s", err) } - metadataAPI, err := k8s.InitializeMetadataAPI(*kubeConfigPath, k8s.Node, k8s.RS) + metadataAPI, err := k8s.InitializeMetadataAPI(*kubeConfigPath, "local", k8s.Node, k8s.RS) if err != nil { log.Fatalf("Failed to initialize Kubernetes metadata API: %s", err) } + clusterStore, err := watcher.NewClusterStore(k8Client, *controllerNamespace, *enableEndpointSlices) + if err != nil { + log.Fatalf("Failed to initialize Cluster Store: %s", err) + } + server, err := destination.NewServer( *addr, *controllerNamespace, @@ -127,6 +135,7 @@ func Main(args []string) { *enableEndpointSlices, k8sAPI, metadataAPI, + clusterStore, *clusterDomain, opaquePorts, done, @@ -139,6 +148,7 @@ func Main(args []string) { // blocks until caches are synced k8sAPI.Sync(nil) metadataAPI.Sync(nil) + clusterStore.Sync(nil) go func() { log.Infof("starting gRPC server on %s", *addr) diff --git a/controller/k8s/api.go b/controller/k8s/api.go index 6ed38e6b98432..cdd123a70890b 100644 --- 
a/controller/k8s/api.go +++ b/controller/k8s/api.go @@ -68,7 +68,7 @@ type API struct { } // InitializeAPI creates Kubernetes clients and returns an initialized API wrapper. -func InitializeAPI(ctx context.Context, kubeConfig string, ensureClusterWideAccess bool, resources ...APIResource) (*API, error) { +func InitializeAPI(ctx context.Context, kubeConfig string, ensureClusterWideAccess bool, cluster string, resources ...APIResource) (*API, error) { config, err := k8s.GetConfig(kubeConfig, "") if err != nil { return nil, fmt.Errorf("error configuring Kubernetes API client: %w", err) @@ -84,20 +84,20 @@ func InitializeAPI(ctx context.Context, kubeConfig string, ensureClusterWideAcce return nil, err } - return initAPI(ctx, k8sClient, dynamicClient, config, ensureClusterWideAccess, resources...) + return initAPI(ctx, k8sClient, dynamicClient, config, ensureClusterWideAccess, cluster, resources...) } // InitializeAPIForConfig creates Kubernetes clients and returns an initialized API wrapper. -func InitializeAPIForConfig(ctx context.Context, kubeConfig *rest.Config, ensureClusterWideAccess bool, resources ...APIResource) (*API, error) { +func InitializeAPIForConfig(ctx context.Context, kubeConfig *rest.Config, ensureClusterWideAccess bool, cluster string, resources ...APIResource) (*API, error) { k8sClient, err := k8s.NewAPIForConfig(kubeConfig, "", []string{}, 0) if err != nil { return nil, err } - return initAPI(ctx, k8sClient, nil, kubeConfig, ensureClusterWideAccess, resources...) + return initAPI(ctx, k8sClient, nil, kubeConfig, ensureClusterWideAccess, cluster, resources...) 
} -func initAPI(ctx context.Context, k8sClient *k8s.KubernetesAPI, dynamicClient dynamic.Interface, kubeConfig *rest.Config, ensureClusterWideAccess bool, resources ...APIResource) (*API, error) { +func initAPI(ctx context.Context, k8sClient *k8s.KubernetesAPI, dynamicClient dynamic.Interface, kubeConfig *rest.Config, ensureClusterWideAccess bool, cluster string, resources ...APIResource) (*API, error) { // check for cluster-wide access var err error @@ -132,7 +132,7 @@ func initAPI(ctx context.Context, k8sClient *k8s.KubernetesAPI, dynamicClient dy break } - api := NewClusterScopedAPI(k8sClient, dynamicClient, l5dCrdClient, resources...) + api := NewClusterScopedAPI(k8sClient, dynamicClient, l5dCrdClient, cluster, resources...) for _, gauge := range api.gauges { if err := prometheus.Register(gauge); err != nil { log.Warnf("failed to register Prometheus gauge %s: %s", gauge.Desc().String(), err) @@ -146,10 +146,11 @@ func NewClusterScopedAPI( k8sClient kubernetes.Interface, dynamicClient dynamic.Interface, l5dCrdClient l5dcrdclient.Interface, + cluster string, resources ...APIResource, ) *API { - sharedInformers := informers.NewSharedInformerFactory(k8sClient, resyncTime) - return newAPI(k8sClient, dynamicClient, l5dCrdClient, sharedInformers, resources...) + sharedInformers := informers.NewSharedInformerFactory(k8sClient, ResyncTime) + return newAPI(k8sClient, dynamicClient, l5dCrdClient, sharedInformers, cluster, resources...) } // NewNamespacedAPI takes a Kubernetes client and returns an initialized API scoped to namespace. @@ -158,10 +159,11 @@ func NewNamespacedAPI( dynamicClient dynamic.Interface, l5dCrdClient l5dcrdclient.Interface, namespace string, + cluster string, resources ...APIResource, ) *API { - sharedInformers := informers.NewSharedInformerFactoryWithOptions(k8sClient, resyncTime, informers.WithNamespace(namespace)) - return newAPI(k8sClient, dynamicClient, l5dCrdClient, sharedInformers, resources...) 
+ sharedInformers := informers.NewSharedInformerFactoryWithOptions(k8sClient, ResyncTime, informers.WithNamespace(namespace)) + return newAPI(k8sClient, dynamicClient, l5dCrdClient, sharedInformers, cluster, resources...) } // newAPI takes a Kubernetes client and returns an initialized API. @@ -170,11 +172,12 @@ func newAPI( dynamicClient dynamic.Interface, l5dCrdClient l5dcrdclient.Interface, sharedInformers informers.SharedInformerFactory, + cluster string, resources ...APIResource, ) *API { var l5dCrdSharedInformers l5dcrdinformer.SharedInformerFactory if l5dCrdClient != nil { - l5dCrdSharedInformers = l5dcrdinformer.NewSharedInformerFactory(l5dCrdClient, resyncTime) + l5dCrdSharedInformers = l5dcrdinformer.NewSharedInformerFactory(l5dCrdClient, ResyncTime) } api := &API{ @@ -185,86 +188,90 @@ func newAPI( l5dCrdSharedInformers: l5dCrdSharedInformers, } + informerLabels := prometheus.Labels{ + "cluster": cluster, + } + for _, resource := range resources { switch resource { case CJ: api.cj = sharedInformers.Batch().V1().CronJobs() api.syncChecks = append(api.syncChecks, api.cj.Informer().HasSynced) - api.promGauges.addInformerSize(k8s.CronJob, api.cj.Informer()) + api.promGauges.addInformerSize(k8s.CronJob, informerLabels, api.cj.Informer()) case CM: api.cm = sharedInformers.Core().V1().ConfigMaps() api.syncChecks = append(api.syncChecks, api.cm.Informer().HasSynced) - api.promGauges.addInformerSize(k8s.ConfigMap, api.cm.Informer()) + api.promGauges.addInformerSize(k8s.ConfigMap, informerLabels, api.cm.Informer()) case Deploy: api.deploy = sharedInformers.Apps().V1().Deployments() api.syncChecks = append(api.syncChecks, api.deploy.Informer().HasSynced) - api.promGauges.addInformerSize(k8s.Deployment, api.deploy.Informer()) + api.promGauges.addInformerSize(k8s.Deployment, informerLabels, api.deploy.Informer()) case DS: api.ds = sharedInformers.Apps().V1().DaemonSets() api.syncChecks = append(api.syncChecks, api.ds.Informer().HasSynced) - 
api.promGauges.addInformerSize(k8s.DaemonSet, api.ds.Informer()) + api.promGauges.addInformerSize(k8s.DaemonSet, informerLabels, api.ds.Informer()) case Endpoint: api.endpoint = sharedInformers.Core().V1().Endpoints() api.syncChecks = append(api.syncChecks, api.endpoint.Informer().HasSynced) - api.promGauges.addInformerSize(k8s.Endpoints, api.endpoint.Informer()) + api.promGauges.addInformerSize(k8s.Endpoints, informerLabels, api.endpoint.Informer()) case ES: api.es = sharedInformers.Discovery().V1().EndpointSlices() api.syncChecks = append(api.syncChecks, api.es.Informer().HasSynced) - api.promGauges.addInformerSize(k8s.EndpointSlices, api.es.Informer()) + api.promGauges.addInformerSize(k8s.EndpointSlices, informerLabels, api.es.Informer()) case Job: api.job = sharedInformers.Batch().V1().Jobs() api.syncChecks = append(api.syncChecks, api.job.Informer().HasSynced) - api.promGauges.addInformerSize(k8s.Job, api.job.Informer()) + api.promGauges.addInformerSize(k8s.Job, informerLabels, api.job.Informer()) case MWC: api.mwc = sharedInformers.Admissionregistration().V1().MutatingWebhookConfigurations() api.syncChecks = append(api.syncChecks, api.mwc.Informer().HasSynced) - api.promGauges.addInformerSize(k8s.MutatingWebhookConfig, api.mwc.Informer()) + api.promGauges.addInformerSize(k8s.MutatingWebhookConfig, informerLabels, api.mwc.Informer()) case NS: api.ns = sharedInformers.Core().V1().Namespaces() api.syncChecks = append(api.syncChecks, api.ns.Informer().HasSynced) - api.promGauges.addInformerSize(k8s.Namespace, api.ns.Informer()) + api.promGauges.addInformerSize(k8s.Namespace, informerLabels, api.ns.Informer()) case Pod: api.pod = sharedInformers.Core().V1().Pods() api.syncChecks = append(api.syncChecks, api.pod.Informer().HasSynced) - api.promGauges.addInformerSize(k8s.Pod, api.pod.Informer()) + api.promGauges.addInformerSize(k8s.Pod, informerLabels, api.pod.Informer()) case RC: api.rc = sharedInformers.Core().V1().ReplicationControllers() api.syncChecks = 
append(api.syncChecks, api.rc.Informer().HasSynced) - api.promGauges.addInformerSize(k8s.ReplicationController, api.rc.Informer()) + api.promGauges.addInformerSize(k8s.ReplicationController, informerLabels, api.rc.Informer()) case RS: api.rs = sharedInformers.Apps().V1().ReplicaSets() api.syncChecks = append(api.syncChecks, api.rs.Informer().HasSynced) - api.promGauges.addInformerSize(k8s.ReplicaSet, api.rs.Informer()) + api.promGauges.addInformerSize(k8s.ReplicaSet, informerLabels, api.rs.Informer()) case SP: if l5dCrdSharedInformers == nil { panic("Linkerd CRD shared informer not configured") } api.sp = l5dCrdSharedInformers.Linkerd().V1alpha2().ServiceProfiles() api.syncChecks = append(api.syncChecks, api.sp.Informer().HasSynced) - api.promGauges.addInformerSize(k8s.ServiceProfile, api.sp.Informer()) + api.promGauges.addInformerSize(k8s.ServiceProfile, informerLabels, api.sp.Informer()) case Srv: if l5dCrdSharedInformers == nil { panic("Linkerd CRD shared informer not configured") } api.srv = l5dCrdSharedInformers.Server().V1beta1().Servers() api.syncChecks = append(api.syncChecks, api.srv.Informer().HasSynced) - api.promGauges.addInformerSize(k8s.Server, api.srv.Informer()) + api.promGauges.addInformerSize(k8s.Server, informerLabels, api.srv.Informer()) case SS: api.ss = sharedInformers.Apps().V1().StatefulSets() api.syncChecks = append(api.syncChecks, api.ss.Informer().HasSynced) - api.promGauges.addInformerSize(k8s.StatefulSet, api.ss.Informer()) + api.promGauges.addInformerSize(k8s.StatefulSet, informerLabels, api.ss.Informer()) case Svc: api.svc = sharedInformers.Core().V1().Services() api.syncChecks = append(api.syncChecks, api.svc.Informer().HasSynced) - api.promGauges.addInformerSize(k8s.Service, api.svc.Informer()) + api.promGauges.addInformerSize(k8s.Service, informerLabels, api.svc.Informer()) case Node: api.node = sharedInformers.Core().V1().Nodes() api.syncChecks = append(api.syncChecks, api.node.Informer().HasSynced) - 
api.promGauges.addInformerSize(k8s.Node, api.node.Informer()) + api.promGauges.addInformerSize(k8s.Node, informerLabels, api.node.Informer()) case Secret: api.secret = sharedInformers.Core().V1().Secrets() api.syncChecks = append(api.syncChecks, api.secret.Informer().HasSynced) - api.promGauges.addInformerSize(k8s.Secret, api.secret.Informer()) + api.promGauges.addInformerSize(k8s.Secret, informerLabels, api.secret.Informer()) } } return api diff --git a/controller/k8s/k8s.go b/controller/k8s/k8s.go index 196d8fda8c045..5954cea829524 100644 --- a/controller/k8s/k8s.go +++ b/controller/k8s/k8s.go @@ -11,7 +11,7 @@ import ( "k8s.io/client-go/tools/cache" ) -const resyncTime = 10 * time.Minute +const ResyncTime = 10 * time.Minute func waitForCacheSync(syncChecks []cache.InformerSynced) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) diff --git a/controller/k8s/metadata_api.go b/controller/k8s/metadata_api.go index 3fbae843bfe9b..ed76a54f3c340 100644 --- a/controller/k8s/metadata_api.go +++ b/controller/k8s/metadata_api.go @@ -34,21 +34,21 @@ type MetadataAPI struct { // InitializeMetadataAPI returns an instance of MetadataAPI with metadata // informers for the provided resources -func InitializeMetadataAPI(kubeConfig string, resources ...APIResource) (*MetadataAPI, error) { +func InitializeMetadataAPI(kubeConfig string, cluster string, resources ...APIResource) (*MetadataAPI, error) { config, err := k8s.GetConfig(kubeConfig, "") if err != nil { return nil, fmt.Errorf("error configuring Kubernetes API client: %w", err) } - return InitializeMetadataAPIForConfig(config, resources...) + return InitializeMetadataAPIForConfig(config, cluster, resources...) 
} -func InitializeMetadataAPIForConfig(kubeConfig *rest.Config, resources ...APIResource) (*MetadataAPI, error) { +func InitializeMetadataAPIForConfig(kubeConfig *rest.Config, cluster string, resources ...APIResource) (*MetadataAPI, error) { client, err := metadata.NewForConfig(kubeConfig) if err != nil { return nil, err } - api, err := newClusterScopedMetadataAPI(client, resources...) + api, err := newClusterScopedMetadataAPI(client, cluster, resources...) if err != nil { return nil, err } @@ -64,11 +64,12 @@ func InitializeMetadataAPIForConfig(kubeConfig *rest.Config, resources ...APIRes func newClusterScopedMetadataAPI( metadataClient metadata.Interface, + cluster string, resources ...APIResource, ) (*MetadataAPI, error) { sharedInformers := metadatainformer.NewFilteredSharedInformerFactory( metadataClient, - resyncTime, + ResyncTime, metav1.NamespaceAll, nil, ) @@ -80,8 +81,12 @@ func newClusterScopedMetadataAPI( sharedInformers: sharedInformers, } + informerLabels := prometheus.Labels{ + "cluster": cluster, + } + for _, resource := range resources { - if err := api.addInformer(resource); err != nil { + if err := api.addInformer(resource, informerLabels); err != nil { return nil, err } } @@ -262,7 +267,7 @@ func (api *MetadataAPI) GetOwnerKindAndName(ctx context.Context, pod *corev1.Pod return strings.ToLower(parent.Kind), parent.Name, nil } -func (api *MetadataAPI) addInformer(res APIResource) error { +func (api *MetadataAPI) addInformer(res APIResource, informerLabels prometheus.Labels) error { gvk, err := res.GVK() if err != nil { return err @@ -270,7 +275,7 @@ func (api *MetadataAPI) addInformer(res APIResource) error { gvr, _ := meta.UnsafeGuessKindToResource(gvk) inf := api.sharedInformers.ForResource(gvr) api.syncChecks = append(api.syncChecks, inf.Informer().HasSynced) - api.promGauges.addInformerSize(strings.ToLower(gvk.Kind), inf.Informer()) + api.promGauges.addInformerSize(strings.ToLower(gvk.Kind), informerLabels, inf.Informer()) api.inf[res] = inf 
return nil diff --git a/controller/k8s/prometheus.go b/controller/k8s/prometheus.go index e77a09f1862c9..aed1739db8628 100644 --- a/controller/k8s/prometheus.go +++ b/controller/k8s/prometheus.go @@ -11,10 +11,11 @@ type promGauges struct { gauges []prometheus.GaugeFunc } -func (p *promGauges) addInformerSize(kind string, inf cache.SharedIndexInformer) { +func (p *promGauges) addInformerSize(kind string, labels prometheus.Labels, inf cache.SharedIndexInformer) { p.gauges = append(p.gauges, prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: fmt.Sprintf("%s_cache_size", kind), - Help: fmt.Sprintf("Number of items in the client-go %s cache", kind), + Name: fmt.Sprintf("%s_cache_size", kind), + Help: fmt.Sprintf("Number of items in the client-go %s cache", kind), + ConstLabels: labels, }, func() float64 { return float64(len(inf.GetStore().ListKeys())) })) diff --git a/controller/k8s/test_helper.go b/controller/k8s/test_helper.go index 2b320080ceae3..7337cc99cacc9 100644 --- a/controller/k8s/test_helper.go +++ b/controller/k8s/test_helper.go @@ -20,6 +20,7 @@ func NewFakeAPI(configs ...string) (*API, error) { clientSet, nil, spClientSet, + "fake", CJ, CM, Deploy, @@ -63,6 +64,7 @@ func NewFakeMetadataAPI(configs []string) (*MetadataAPI, error) { return newClusterScopedMetadataAPI( metadataClient, + "fake", CJ, CM, Deploy, diff --git a/controller/webhook/launcher.go b/controller/webhook/launcher.go index 6a6cba619be0c..c653cb62b7122 100644 --- a/controller/webhook/launcher.go +++ b/controller/webhook/launcher.go @@ -50,7 +50,7 @@ func Launch( log.Fatalf("error configuring Kubernetes API client: %s", err) } - metadataAPI, err := k8s.InitializeMetadataAPI(kubeconfig, apiresources...) + metadataAPI, err := k8s.InitializeMetadataAPI(kubeconfig, "local", apiresources...) 
if err != nil { //nolint:gocritic log.Fatalf("failed to initialize Kubernetes API: %s", err) diff --git a/multicluster/charts/linkerd-multicluster/templates/remote-access-service-mirror-rbac.yaml b/multicluster/charts/linkerd-multicluster/templates/remote-access-service-mirror-rbac.yaml index 83f7782580d87..4fb6311c500db 100644 --- a/multicluster/charts/linkerd-multicluster/templates/remote-access-service-mirror-rbac.yaml +++ b/multicluster/charts/linkerd-multicluster/templates/remote-access-service-mirror-rbac.yaml @@ -15,8 +15,20 @@ metadata: annotations: {{ include "partials.annotations.created-by" $ }} rules: +- apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["list", "get", "watch"] +- apiGroups: ["batch"] + resources: ["jobs"] + verbs: ["list", "get", "watch"] - apiGroups: [""] - resources: ["services", "endpoints"] + resources: ["pods", "endpoints", "services"] + verbs: ["list", "get", "watch"] +- apiGroups: ["discovery.k8s.io"] + resources: ["endpointslices"] + verbs: ["list", "get", "watch"] +- apiGroups: ["policy.linkerd.io"] + resources: ["servers"] verbs: ["list", "get", "watch"] - apiGroups: [""] resources: ["configmaps"] diff --git a/multicluster/cmd/link.go b/multicluster/cmd/link.go index c24ec1a3b5cc5..57d3e692d2735 100644 --- a/multicluster/cmd/link.go +++ b/multicluster/cmd/link.go @@ -31,7 +31,11 @@ import ( "sigs.k8s.io/yaml" ) -const clusterNameLabel = "multicluster.linkerd.io/cluster-name" +const ( + clusterNameLabel = "multicluster.linkerd.io/cluster-name" + trustDomainAnnotation = "multicluster.linkerd.io/trust-domain" + clusterDomainAnnotation = "multicluster.linkerd.io/cluster-domain" +) type ( linkOptions struct { @@ -190,6 +194,10 @@ A full list of configurable values can be found at https://github.com/linkerd/li Labels: map[string]string{ clusterNameLabel: opts.clusterName, }, + Annotations: map[string]string{ + trustDomainAnnotation: configMap.IdentityTrustDomain, + clusterDomainAnnotation: configMap.ClusterDomain, + 
}, }, Data: map[string][]byte{ k8s.ConfigKeyName: kubeconfig, diff --git a/multicluster/cmd/service-mirror/main.go b/multicluster/cmd/service-mirror/main.go index db3081d7c8a41..ed3e1fb611bc6 100644 --- a/multicluster/cmd/service-mirror/main.go +++ b/multicluster/cmd/service-mirror/main.go @@ -94,6 +94,7 @@ func Main(args []string) { rootCtx, *kubeConfigPath, false, + "local", controllerK8s.NS, controllerK8s.Svc, controllerK8s.Endpoint, diff --git a/multicluster/cmd/testdata/install_default.golden b/multicluster/cmd/testdata/install_default.golden index 3d96ce2dcdfbf..a135f52a8ca1d 100644 --- a/multicluster/cmd/testdata/install_default.golden +++ b/multicluster/cmd/testdata/install_default.golden @@ -172,8 +172,20 @@ metadata: annotations: linkerd.io/created-by: linkerd/helm linkerdVersionValue rules: +- apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["list", "get", "watch"] +- apiGroups: ["batch"] + resources: ["jobs"] + verbs: ["list", "get", "watch"] - apiGroups: [""] - resources: ["services", "endpoints"] + resources: ["pods", "endpoints", "services"] + verbs: ["list", "get", "watch"] +- apiGroups: ["discovery.k8s.io"] + resources: ["endpointslices"] + verbs: ["list", "get", "watch"] +- apiGroups: ["policy.linkerd.io"] + resources: ["servers"] verbs: ["list", "get", "watch"] - apiGroups: [""] resources: ["configmaps"] diff --git a/multicluster/cmd/testdata/install_ha.golden b/multicluster/cmd/testdata/install_ha.golden index 1b7a3074ab243..59d38a7f7c8ed 100644 --- a/multicluster/cmd/testdata/install_ha.golden +++ b/multicluster/cmd/testdata/install_ha.golden @@ -244,8 +244,20 @@ metadata: annotations: linkerd.io/created-by: linkerd/helm linkerdVersionValue rules: +- apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["list", "get", "watch"] +- apiGroups: ["batch"] + resources: ["jobs"] + verbs: ["list", "get", "watch"] - apiGroups: [""] - resources: ["services", "endpoints"] + resources: ["pods", "endpoints", "services"] + verbs: ["list", 
"get", "watch"] +- apiGroups: ["discovery.k8s.io"] + resources: ["endpointslices"] + verbs: ["list", "get", "watch"] +- apiGroups: ["policy.linkerd.io"] + resources: ["servers"] verbs: ["list", "get", "watch"] - apiGroups: [""] resources: ["configmaps"] diff --git a/multicluster/cmd/testdata/install_psp.golden b/multicluster/cmd/testdata/install_psp.golden index 9c5e2c3c5c959..ae4e894b1953e 100644 --- a/multicluster/cmd/testdata/install_psp.golden +++ b/multicluster/cmd/testdata/install_psp.golden @@ -206,8 +206,20 @@ metadata: annotations: linkerd.io/created-by: linkerd/helm linkerdVersionValue rules: +- apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["list", "get", "watch"] +- apiGroups: ["batch"] + resources: ["jobs"] + verbs: ["list", "get", "watch"] - apiGroups: [""] - resources: ["services", "endpoints"] + resources: ["pods", "endpoints", "services"] + verbs: ["list", "get", "watch"] +- apiGroups: ["discovery.k8s.io"] + resources: ["endpointslices"] + verbs: ["list", "get", "watch"] +- apiGroups: ["policy.linkerd.io"] + resources: ["servers"] verbs: ["list", "get", "watch"] - apiGroups: [""] resources: ["configmaps"] diff --git a/multicluster/service-mirror/cluster_watcher.go b/multicluster/service-mirror/cluster_watcher.go index 76a85d6f87140..bfd6cefd467f1 100644 --- a/multicluster/service-mirror/cluster_watcher.go +++ b/multicluster/service-mirror/cluster_watcher.go @@ -168,7 +168,7 @@ func NewRemoteClusterServiceWatcher( liveness chan bool, enableHeadlessSvc bool, ) (*RemoteClusterServiceWatcher, error) { - remoteAPI, err := k8s.InitializeAPIForConfig(ctx, cfg, false, k8s.Svc, k8s.Endpoint) + remoteAPI, err := k8s.InitializeAPIForConfig(ctx, cfg, false, clusterName, k8s.Svc, k8s.Endpoint) if err != nil { return nil, fmt.Errorf("cannot initialize api for target cluster %s: %w", clusterName, err) } diff --git a/pkg/healthcheck/healthcheck.go b/pkg/healthcheck/healthcheck.go index 3022da97e2d08..6705f96a87649 100644 --- 
a/pkg/healthcheck/healthcheck.go +++ b/pkg/healthcheck/healthcheck.go @@ -2369,7 +2369,7 @@ func (hc *HealthChecker) checkMisconfiguredOpaquePortAnnotations(ctx context.Con // This is used instead of `hc.kubeAPI` to limit multiple k8s API requests // and use the caching logic in the shared informers // TODO: move the shared informer code out of `controller/`, and into `pkg` to simplify the dependency tree. - kubeAPI := controllerK8s.NewClusterScopedAPI(hc.kubeAPI, nil, nil, controllerK8s.Endpoint, controllerK8s.Pod, controllerK8s.Svc) + kubeAPI := controllerK8s.NewClusterScopedAPI(hc.kubeAPI, nil, nil, "local", controllerK8s.Endpoint, controllerK8s.Pod, controllerK8s.Svc) kubeAPI.Sync(ctx.Done()) services, err := kubeAPI.Svc().Lister().Services(hc.DataPlaneNamespace).List(labels.Everything()) diff --git a/pkg/k8s/fake.go b/pkg/k8s/fake.go index a242c9d0a83b4..6ca91fcec4c9d 100644 --- a/pkg/k8s/fake.go +++ b/pkg/k8s/fake.go @@ -32,6 +32,13 @@ import ( "sigs.k8s.io/yaml" ) +func init() { + apiextensionsv1beta1.AddToScheme(scheme.Scheme) + apiextensionsv1.AddToScheme(scheme.Scheme) + apiregistrationv1.AddToScheme(scheme.Scheme) + spscheme.AddToScheme(scheme.Scheme) +} + // NewFakeAPI provides a mock KubernetesAPI backed by hard-coded resources func NewFakeAPI(configs ...string) (*KubernetesAPI, error) { client, apiextClient, apiregClient, _, err := NewFakeClientSets(configs...) 
@@ -193,10 +200,6 @@ func newFakeClientSetsFromManifests(readers []io.Reader) ( // ToRuntimeObject deserializes Kubernetes YAML into a Runtime Object func ToRuntimeObject(config string) (runtime.Object, error) { - apiextensionsv1beta1.AddToScheme(scheme.Scheme) - apiextensionsv1.AddToScheme(scheme.Scheme) - apiregistrationv1.AddToScheme(scheme.Scheme) - spscheme.AddToScheme(scheme.Scheme) decode := scheme.Codecs.UniversalDeserializer().Decode obj, _, err := decode([]byte(config), nil, nil) return obj, err diff --git a/test/integration/multicluster/testdata/allow.golden b/test/integration/multicluster/testdata/allow.golden index eda209c36ff48..24360e59c1932 100644 --- a/test/integration/multicluster/testdata/allow.golden +++ b/test/integration/multicluster/testdata/allow.golden @@ -15,16 +15,28 @@ metadata: annotations: linkerd.io/created-by: linkerd/cli {{ .Version }} rules: - - apiGroups: [""] - resources: ["services", "endpoints"] - verbs: ["list", "get", "watch"] - - apiGroups: [""] - resources: ["configmaps"] - verbs: ["get"] - resourceNames: ["linkerd-config"] - - apiGroups: [""] - resources: ["events"] - verbs: ["create", "patch"] +- apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["list", "get", "watch"] +- apiGroups: ["batch"] + resources: ["jobs"] + verbs: ["list", "get", "watch"] +- apiGroups: [""] + resources: ["pods", "endpoints", "services"] + verbs: ["list", "get", "watch"] +- apiGroups: ["discovery.k8s.io"] + resources: ["endpointslices"] + verbs: ["list", "get", "watch"] +- apiGroups: ["policy.linkerd.io"] + resources: ["servers"] + verbs: ["list", "get", "watch"] +- apiGroups: [""] + resources: ["configmaps"] + verbs: ["get"] + resourceNames: ["linkerd-config"] +- apiGroups: [""] + resources: ["events"] + verbs: ["create", "patch"] --- apiVersion: v1 kind: ServiceAccount diff --git a/viz/metrics-api/cmd/main.go b/viz/metrics-api/cmd/main.go index 7d6bb67e59cb3..7a72a6692354c 100644 --- a/viz/metrics-api/cmd/main.go +++ 
b/viz/metrics-api/cmd/main.go @@ -54,6 +54,7 @@ func main() { ctx, *kubeConfigPath, true, + "local", k8s.CJ, k8s.DS, k8s.Deploy, k8s.Job, k8s.NS, k8s.Pod, k8s.RC, k8s.RS, k8s.Svc, k8s.SS, k8s.SP, ) if err != nil { diff --git a/viz/tap/api/main.go b/viz/tap/api/main.go index 129d51613b4f0..248a7427a9404 100644 --- a/viz/tap/api/main.go +++ b/viz/tap/api/main.go @@ -53,6 +53,7 @@ func Main(args []string) { ctx, *kubeConfigPath, true, + "local", k8s.CJ, k8s.DS, k8s.SS,