From cef48f2bbbb0a61be5b334a80c0a2197a50b4d40 Mon Sep 17 00:00:00 2001 From: Whitney Griffith Date: Wed, 16 Jun 2021 16:56:21 -0700 Subject: [PATCH] provider: move kube endpoint provider client Signed-off-by: Whitney Griffith --- cmd/osm-controller/osm-controller.go | 4 +- pkg/catalog/endpoint_test.go | 2 + pkg/catalog/fake.go | 2 +- pkg/catalog/helpers_test.go | 2 +- pkg/catalog/inbound_traffic_policies_test.go | 4 + pkg/catalog/outbound_traffic_policies_test.go | 4 + pkg/catalog/types.go | 1 + pkg/{endpoint => }/providers/kube/client.go | 94 ++++++++++++++++++- .../providers/kube/client_test.go | 34 +++---- pkg/{endpoint => }/providers/kube/errors.go | 0 pkg/{endpoint => }/providers/kube/fake.go | 0 .../providers/kube/suite_test.go | 0 pkg/{endpoint => }/providers/kube/types.go | 0 13 files changed, 123 insertions(+), 24 deletions(-) rename pkg/{endpoint => }/providers/kube/client.go (67%) rename pkg/{endpoint => }/providers/kube/client_test.go (94%) rename pkg/{endpoint => }/providers/kube/errors.go (100%) rename pkg/{endpoint => }/providers/kube/fake.go (100%) rename pkg/{endpoint => }/providers/kube/suite_test.go (100%) rename pkg/{endpoint => }/providers/kube/types.go (100%) diff --git a/cmd/osm-controller/osm-controller.go b/cmd/osm-controller/osm-controller.go index 6838d3b800..b695233f60 100644 --- a/cmd/osm-controller/osm-controller.go +++ b/cmd/osm-controller/osm-controller.go @@ -31,7 +31,6 @@ import ( "github.com/openservicemesh/osm/pkg/constants" "github.com/openservicemesh/osm/pkg/debugger" "github.com/openservicemesh/osm/pkg/endpoint" - "github.com/openservicemesh/osm/pkg/endpoint/providers/kube" "github.com/openservicemesh/osm/pkg/envoy/ads" "github.com/openservicemesh/osm/pkg/envoy/registry" "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned" @@ -43,6 +42,7 @@ import ( "github.com/openservicemesh/osm/pkg/logger" "github.com/openservicemesh/osm/pkg/metricsstore" "github.com/openservicemesh/osm/pkg/policy" + "github.com/openservicemesh/osm/pkg/providers/kube" "github.com/openservicemesh/osm/pkg/signals" "github.com/openservicemesh/osm/pkg/smi" "github.com/openservicemesh/osm/pkg/validator" @@ -183,7 +183,7 @@ func main() { } } - kubeProvider, err := kube.NewProvider(kubernetesClient, constants.KubeProviderName, cfg) + kubeProvider, err := kube.NewClient(kubernetesClient, constants.KubeProviderName, cfg) if err != nil { events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating Kubernetes endpoints provider") } diff --git a/pkg/catalog/endpoint_test.go b/pkg/catalog/endpoint_test.go index 143d676433..1fb2b5f559 100644 --- a/pkg/catalog/endpoint_test.go +++ b/pkg/catalog/endpoint_test.go @@ -139,12 +139,14 @@ func TestListAllowedEndpointsForService(t *testing.T) { mockConfigurator := configurator.NewMockConfigurator(mockCtrl) mockKubeController := k8s.NewMockController(mockCtrl) mockEndpointProvider := endpoint.NewMockProvider(mockCtrl) + mockServiceProvider := service.NewMockProvider(mockCtrl) mockMeshSpec := smi.NewMockMeshSpec(mockCtrl) mc := MeshCatalog{ kubeController: mockKubeController, meshSpec: mockMeshSpec, endpointsProviders: []endpoint.Provider{mockEndpointProvider}, + serviceProviders: []service.Provider{mockServiceProvider}, } mockConfigurator.EXPECT().IsPermissiveTrafficPolicyMode().Return(false).AnyTimes() diff --git a/pkg/catalog/fake.go b/pkg/catalog/fake.go index 5d47678310..46bcdf6293 100644 --- a/pkg/catalog/fake.go +++ b/pkg/catalog/fake.go @@ -13,13 +13,13 @@ import ( "github.com/openservicemesh/osm/pkg/certificate/providers/tresor" "github.com/openservicemesh/osm/pkg/configurator" "github.com/openservicemesh/osm/pkg/endpoint" - "github.com/openservicemesh/osm/pkg/endpoint/providers/kube" "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned" configFake "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned/fake" "github.com/openservicemesh/osm/pkg/identity" "github.com/openservicemesh/osm/pkg/ingress" k8s "github.com/openservicemesh/osm/pkg/kubernetes" "github.com/openservicemesh/osm/pkg/policy" + "github.com/openservicemesh/osm/pkg/providers/kube" "github.com/openservicemesh/osm/pkg/service" "github.com/openservicemesh/osm/pkg/smi" "github.com/openservicemesh/osm/pkg/tests" diff --git a/pkg/catalog/helpers_test.go b/pkg/catalog/helpers_test.go index 44e0665f81..e9e206e9c7 100644 --- a/pkg/catalog/helpers_test.go +++ b/pkg/catalog/helpers_test.go @@ -16,10 +16,10 @@ import ( "github.com/openservicemesh/osm/pkg/certificate/providers/tresor" "github.com/openservicemesh/osm/pkg/configurator" "github.com/openservicemesh/osm/pkg/endpoint" - "github.com/openservicemesh/osm/pkg/endpoint/providers/kube" "github.com/openservicemesh/osm/pkg/ingress" k8s "github.com/openservicemesh/osm/pkg/kubernetes" "github.com/openservicemesh/osm/pkg/policy" + "github.com/openservicemesh/osm/pkg/providers/kube" "github.com/openservicemesh/osm/pkg/service" "github.com/openservicemesh/osm/pkg/smi" "github.com/openservicemesh/osm/pkg/tests" diff --git a/pkg/catalog/inbound_traffic_policies_test.go b/pkg/catalog/inbound_traffic_policies_test.go index 172cb084a9..a4e0c0c1bc 100644 --- a/pkg/catalog/inbound_traffic_policies_test.go +++ b/pkg/catalog/inbound_traffic_policies_test.go @@ -1256,11 +1256,13 @@ func TestBuildInboundPolicies(t *testing.T) { mockKubeController := k8s.NewMockController(mockCtrl) mockMeshSpec := smi.NewMockMeshSpec(mockCtrl) mockEndpointProvider := endpoint.NewMockProvider(mockCtrl) + mockServiceProvider := service.NewMockProvider(mockCtrl) mc := MeshCatalog{ kubeController: mockKubeController, meshSpec: mockMeshSpec, endpointsProviders: []endpoint.Provider{mockEndpointProvider}, + serviceProviders: []service.Provider{mockServiceProvider}, } destK8sService := tests.NewServiceFixture(tc.inboundService.Name, tc.inboundService.Namespace, map[string]string{}) @@ -1571,11 +1573,13 @@ func TestListInboundPoliciesFromTrafficTargets(t *testing.T) { mockKubeController := k8s.NewMockController(mockCtrl) mockMeshSpec := smi.NewMockMeshSpec(mockCtrl) mockEndpointProvider := endpoint.NewMockProvider(mockCtrl) + mockServiceProvider := service.NewMockProvider(mockCtrl) mc := MeshCatalog{ kubeController: mockKubeController, meshSpec: mockMeshSpec, endpointsProviders: []endpoint.Provider{mockEndpointProvider}, + serviceProviders: []service.Provider{mockServiceProvider}, } for _, destMeshSvc := range tc.upstreamServices { diff --git a/pkg/catalog/outbound_traffic_policies_test.go b/pkg/catalog/outbound_traffic_policies_test.go index cd9d6d3f53..e27490415b 100644 --- a/pkg/catalog/outbound_traffic_policies_test.go +++ b/pkg/catalog/outbound_traffic_policies_test.go @@ -928,11 +928,13 @@ func TestBuildOutboundPolicies(t *testing.T) { mockKubeController := k8s.NewMockController(mockCtrl) mockMeshSpec := smi.NewMockMeshSpec(mockCtrl) mockEndpointProvider := endpoint.NewMockProvider(mockCtrl) + mockServiceProvider := service.NewMockProvider(mockCtrl) mc := MeshCatalog{ kubeController: mockKubeController, meshSpec: mockMeshSpec, endpointsProviders: []endpoint.Provider{mockEndpointProvider}, + serviceProviders: []service.Provider{mockServiceProvider}, } destK8sService := tests.NewServiceFixture(tc.destMeshService.Name, tc.destMeshService.Namespace, map[string]string{}) @@ -1074,10 +1076,12 @@ func TestGetDestinationServicesFromTrafficTarget(t *testing.T) { mockKubeController := k8s.NewMockController(mockCtrl) mockEndpointProvider := endpoint.NewMockProvider(mockCtrl) + mockServiceProvider := service.NewMockProvider(mockCtrl) mc := MeshCatalog{ kubeController: mockKubeController, endpointsProviders: []endpoint.Provider{mockEndpointProvider}, + serviceProviders: []service.Provider{mockServiceProvider}, } destSA := identity.K8sServiceAccount{ diff --git a/pkg/catalog/types.go b/pkg/catalog/types.go index 03430468a2..f18cf0e867 100644 --- a/pkg/catalog/types.go +++ b/pkg/catalog/types.go @@ -25,6 +25,7 @@ var ( // MeshCatalog is the struct for the service catalog type MeshCatalog struct { endpointsProviders []endpoint.Provider + serviceProviders []service.Provider meshSpec smi.MeshSpec certManager certificate.Manager ingressMonitor ingress.Monitor diff --git a/pkg/endpoint/providers/kube/client.go b/pkg/providers/kube/client.go similarity index 67% rename from pkg/endpoint/providers/kube/client.go rename to pkg/providers/kube/client.go index a660e5005f..79edf137d0 100644 --- a/pkg/endpoint/providers/kube/client.go +++ b/pkg/providers/kube/client.go @@ -13,10 +13,11 @@ import ( "github.com/openservicemesh/osm/pkg/identity" k8s "github.com/openservicemesh/osm/pkg/kubernetes" "github.com/openservicemesh/osm/pkg/service" + "github.com/openservicemesh/osm/pkg/utils" ) -// NewProvider implements mesh.EndpointsProvider, which creates a new Kubernetes cluster/compute provider. -func NewProvider(kubeController k8s.Controller, providerIdent string, cfg configurator.Configurator) (endpoint.Provider, error) { +// NewClient returns a client that has all components necessary to connect to and maintain state of a Kubernetes cluster. +func NewClient(kubeController k8s.Controller, providerIdent string, cfg configurator.Configurator) (*Client, error) { client := Client{ providerIdent: providerIdent, kubeController: kubeController, @@ -26,7 +27,7 @@ func NewProvider(kubeController k8s.Controller, providerIdent string, cfg config } // GetID returns a string descriptor / identifier of the compute provider. -// Required by interface: EndpointsProvider +// Required by interfaces: EndpointsProvider, ServiceProvider func (c *Client) GetID() string { return c.providerIdent } @@ -234,3 +235,90 @@ func (c *Client) GetResolvableEndpointsForService(svc service.MeshService) ([]en return endpoints, err } + +// ListServices returns a list of services that are part of monitored namespaces +func (c *Client) ListServices() ([]service.MeshService, error) { + var services []service.MeshService + for _, svc := range c.kubeController.ListServices() { + services = append(services, utils.K8sSvcToMeshSvc(svc)) + } + return services, nil +} + +// ListServiceIdentitiesForService lists the service identities associated with the given mesh service. +func (c *Client) ListServiceIdentitiesForService(svc service.MeshService) ([]identity.ServiceIdentity, error) { + serviceAccounts, err := c.kubeController.ListServiceIdentitiesForService(svc) + if err != nil { + log.Err(err).Msgf("Error getting ServiceAccounts for Service %s", svc) + return nil, err + } + + var serviceIdentities []identity.ServiceIdentity + for _, svcAccount := range serviceAccounts { + serviceIdentity := svcAccount.ToServiceIdentity() + serviceIdentities = append(serviceIdentities, serviceIdentity) + } + + return serviceIdentities, nil +} + +// GetPortToProtocolMappingForService returns a mapping of the service's ports to their corresponding application protocol, +// where the ports returned are the ones used by downstream clients in their requests. This can be different from the ports +// actually exposed by the application binary, ie. 'spec.ports[].port' instead of 'spec.ports[].targetPort' for a Kubernetes service. +func (c *Client) GetPortToProtocolMappingForService(svc service.MeshService) (map[uint32]string, error) { + portToProtocolMap := make(map[uint32]string) + + k8sSvc := c.kubeController.GetService(svc) + if k8sSvc == nil { + return nil, errors.Wrapf(errServiceNotFound, "Error retrieving k8s service %s", svc) + } + + for _, portSpec := range k8sSvc.Spec.Ports { + var appProtocol string + if portSpec.AppProtocol != nil { + appProtocol = *portSpec.AppProtocol + } else { + appProtocol = k8s.GetAppProtocolFromPortName(portSpec.Name) + } + portToProtocolMap[uint32(portSpec.Port)] = appProtocol + } + + return portToProtocolMap, nil +} + +// GetHostnamesForService returns a list of hostnames over which the service can be accessed within the local cluster. +func (c *Client) GetHostnamesForService(svc service.MeshService, locality service.Locality) ([]string, error) { + k8svc := c.kubeController.GetService(svc) + if k8svc == nil { + return nil, errors.Errorf("Error fetching service %q", svc) + } + + hostnames := k8s.GetHostnamesForService(k8svc, locality) + return hostnames, nil +} + +// GetServicesByLabels gets Kubernetes services whose selectors match the given labels +func (c *Client) GetServicesByLabels(podLabels map[string]string, namespace string) ([]service.MeshService, error) { + var finalList []service.MeshService + serviceList := c.kubeController.ListServices() + + for _, svc := range serviceList { + // TODO: #1684 Introduce APIs to dynamically allow applying selectors, instead of callers implementing + // filtering themselves + if svc.Namespace != namespace { + continue + } + + svcRawSelector := svc.Spec.Selector + // service has no selectors, we do not need to match against the pod label + if len(svcRawSelector) == 0 { + continue + } + selector := labels.Set(svcRawSelector).AsSelector() + if selector.Matches(labels.Set(podLabels)) { + finalList = append(finalList, utils.K8sSvcToMeshSvc(svc)) + } + } + + return finalList, nil +} diff --git a/pkg/endpoint/providers/kube/client_test.go b/pkg/providers/kube/client_test.go similarity index 94% rename from pkg/endpoint/providers/kube/client_test.go rename to pkg/providers/kube/client_test.go index a74ba4d47e..a014e71d19 100644 --- a/pkg/endpoint/providers/kube/client_test.go +++ b/pkg/providers/kube/client_test.go @@ -34,8 +34,8 @@ var _ = Describe("Test Kube Client Provider (w/o kubecontroller)", func() { mockKubeController *k8s.MockController mockConfigurator *configurator.MockConfigurator - provider endpoint.Provider - err error + client *Client + err error ) mockCtrl = gomock.NewController(GinkgoT()) @@ -47,12 +47,12 @@ var _ = Describe("Test Kube Client Provider (w/o kubecontroller)", func() { mockKubeController.EXPECT().IsMonitoredNamespace(tests.BookbuyerService.Namespace).Return(true).AnyTimes() BeforeEach(func() { - provider, err = NewProvider(mockKubeController, providerID, mockConfigurator) + client, err = NewClient(mockKubeController, providerID, mockConfigurator) Expect(err).ToNot(HaveOccurred()) }) It("tests GetID", func() { - Expect(provider.GetID()).To(Equal(providerID)) + Expect(client.GetID()).To(Equal(providerID)) }) It("should correctly return a list of endpoints for a service", func() { @@ -77,7 +77,7 @@ var _ = Describe("Test Kube Client Provider (w/o kubecontroller)", func() { }, }, nil) - Expect(provider.ListEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{ + Expect(client.ListEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{ { IP: net.IPv4(8, 8, 8, 8), Port: 88, @@ -105,7 +105,7 @@ var _ = Describe("Test Kube Client Provider (w/o kubecontroller)", func() { }, }) - Expect(provider.GetResolvableEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{ + Expect(client.GetResolvableEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{ { IP: net.IPv4(192, 168, 0, 1), Port: tests.ServicePort, @@ -154,7 +154,7 @@ var _ = Describe("Test Kube Client Provider (w/o kubecontroller)", func() { }, }, nil) - Expect(provider.GetResolvableEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{ + Expect(client.GetResolvableEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{ { IP: net.IPv4(8, 8, 8, 8), Port: 88, @@ -205,7 +205,7 @@ var _ = Describe("Test Kube Client Provider (w/o kubecontroller)", func() { }, }, nil) - Expect(provider.GetResolvableEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{ + Expect(client.GetResolvableEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{ { IP: net.IPv4(8, 8, 8, 8), Port: 88, @@ -273,7 +273,7 @@ var _ = Describe("Test Kube Client Provider (w/o kubecontroller)", func() { }, }, nil) - portToProtocolMap, err := provider.GetTargetPortToProtocolMappingForService(tests.BookbuyerService) + portToProtocolMap, err := client.GetTargetPortToProtocolMappingForService(tests.BookbuyerService) Expect(err).To(BeNil()) expectedPortToProtocolMap := map[uint32]string{70: "tcp", 80: "http", 90: "http", 100: "tcp", 110: "grpc", 120: "http", 130: "tcp"} @@ -287,7 +287,7 @@ var _ = Describe("Test Kube Client Provider (/w kubecontroller)", func() { kubeController k8s.Controller mockConfigurator *configurator.MockConfigurator fakeClientSet *testclient.Clientset - provider endpoint.Provider + client *Client err error ) mockCtrl = gomock.NewController(GinkgoT()) @@ -317,7 +317,7 @@ var _ = Describe("Test Kube Client Provider (/w kubecontroller)", func() { }, 3*time.Second).Should(BeTrue()) Expect(err).ToNot(HaveOccurred()) - provider, err = NewProvider(kubeController, providerID, mockConfigurator) + client, err = NewClient(kubeController, providerID, mockConfigurator) Expect(err).ToNot(HaveOccurred()) }) @@ -346,7 +346,7 @@ var _ = Describe("Test Kube Client Provider (/w kubecontroller)", func() { _, err := fakeClientSet.CoreV1().Services(testNamespace).Create(context.TODO(), svc, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) - services, err := provider.GetServicesForServiceAccount(tests.BookbuyerServiceAccount) + services, err := client.GetServicesForServiceAccount(tests.BookbuyerServiceAccount) Expect(err).To(HaveOccurred()) Expect(services).To(BeNil()) @@ -425,7 +425,7 @@ var _ = Describe("Test Kube Client Provider (/w kubecontroller)", func() { // Expect a MeshService that corresponds to a Service that matches the Pod spec labels expectedMeshSvc := utils.K8sSvcToMeshSvc(svc) - meshSvcs, err := provider.GetServicesForServiceAccount(givenSvcAccount) + meshSvcs, err := client.GetServicesForServiceAccount(givenSvcAccount) Expect(err).ToNot(HaveOccurred()) expectedMeshSvcs := []service.MeshService{expectedMeshSvc} Expect(meshSvcs).To(Equal(expectedMeshSvcs)) @@ -498,7 +498,7 @@ var _ = Describe("Test Kube Client Provider (/w kubecontroller)", func() { } // Expect a MeshService that corresponds to a Service that matches the Deployment spec labels - svcs, err := provider.GetServicesForServiceAccount(givenSvcAccount) + svcs, err := client.GetServicesForServiceAccount(givenSvcAccount) Expect(err).To(HaveOccurred()) Expect(svcs).To(BeNil()) @@ -566,7 +566,7 @@ var _ = Describe("Test Kube Client Provider (/w kubecontroller)", func() { } // Expect a MeshService that corresponds to a Service that matches the Deployment spec labels - svcs, err := provider.GetServicesForServiceAccount(givenSvcAccount) + svcs, err := client.GetServicesForServiceAccount(givenSvcAccount) Expect(err).To(HaveOccurred()) Expect(svcs).To(BeNil()) @@ -652,7 +652,7 @@ var _ = Describe("Test Kube Client Provider (/w kubecontroller)", func() { Name: "test-service-account", // Should match the service account in the Deployment spec above } - meshServices, err := provider.GetServicesForServiceAccount(givenSvcAccount) + meshServices, err := client.GetServicesForServiceAccount(givenSvcAccount) Expect(err).ToNot(HaveOccurred()) expectedServices := []service.MeshService{ {Name: "test-1", Namespace: testNamespace}, @@ -721,7 +721,7 @@ func TestListEndpointsForIdentity(t *testing.T) { mockConfigurator := configurator.NewMockConfigurator(mockCtrl) providerID := "provider" - provider, err := NewProvider(mockKubeController, providerID, mockConfigurator) + provider, err := NewClient(mockKubeController, providerID, mockConfigurator) assert.Nil(err) var pods []*corev1.Pod diff --git a/pkg/endpoint/providers/kube/errors.go b/pkg/providers/kube/errors.go similarity index 100% rename from pkg/endpoint/providers/kube/errors.go rename to pkg/providers/kube/errors.go diff --git a/pkg/endpoint/providers/kube/fake.go b/pkg/providers/kube/fake.go similarity index 100% rename from pkg/endpoint/providers/kube/fake.go rename to pkg/providers/kube/fake.go diff --git a/pkg/endpoint/providers/kube/suite_test.go b/pkg/providers/kube/suite_test.go similarity index 100% rename from pkg/endpoint/providers/kube/suite_test.go rename to pkg/providers/kube/suite_test.go diff --git a/pkg/endpoint/providers/kube/types.go b/pkg/providers/kube/types.go similarity index 100% rename from pkg/endpoint/providers/kube/types.go rename to pkg/providers/kube/types.go