Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
provider: move kube endpoint provider client
Browse files Browse the repository at this point in the history
Signed-off-by: Whitney Griffith <[email protected]>
  • Loading branch information
whitneygriffith committed Jun 21, 2021
1 parent 548facb commit cef48f2
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 24 deletions.
4 changes: 2 additions & 2 deletions cmd/osm-controller/osm-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/debugger"
"github.com/openservicemesh/osm/pkg/endpoint"
"github.com/openservicemesh/osm/pkg/endpoint/providers/kube"
"github.com/openservicemesh/osm/pkg/envoy/ads"
"github.com/openservicemesh/osm/pkg/envoy/registry"
"github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned"
Expand All @@ -43,6 +42,7 @@ import (
"github.com/openservicemesh/osm/pkg/logger"
"github.com/openservicemesh/osm/pkg/metricsstore"
"github.com/openservicemesh/osm/pkg/policy"
"github.com/openservicemesh/osm/pkg/providers/kube"
"github.com/openservicemesh/osm/pkg/signals"
"github.com/openservicemesh/osm/pkg/smi"
"github.com/openservicemesh/osm/pkg/validator"
Expand Down Expand Up @@ -183,7 +183,7 @@ func main() {
}
}

kubeProvider, err := kube.NewProvider(kubernetesClient, constants.KubeProviderName, cfg)
kubeProvider, err := kube.NewClient(kubernetesClient, constants.KubeProviderName, cfg)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating Kubernetes endpoints provider")
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/catalog/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,14 @@ func TestListAllowedEndpointsForService(t *testing.T) {
mockConfigurator := configurator.NewMockConfigurator(mockCtrl)
mockKubeController := k8s.NewMockController(mockCtrl)
mockEndpointProvider := endpoint.NewMockProvider(mockCtrl)
mockServiceProvider := service.NewMockProvider(mockCtrl)
mockMeshSpec := smi.NewMockMeshSpec(mockCtrl)

mc := MeshCatalog{
kubeController: mockKubeController,
meshSpec: mockMeshSpec,
endpointsProviders: []endpoint.Provider{mockEndpointProvider},
serviceProviders: []service.Provider{mockServiceProvider},
}

mockConfigurator.EXPECT().IsPermissiveTrafficPolicyMode().Return(false).AnyTimes()
Expand Down
2 changes: 1 addition & 1 deletion pkg/catalog/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (
"github.com/openservicemesh/osm/pkg/certificate/providers/tresor"
"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/endpoint"
"github.com/openservicemesh/osm/pkg/endpoint/providers/kube"
"github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned"
configFake "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned/fake"
"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/ingress"
k8s "github.com/openservicemesh/osm/pkg/kubernetes"
"github.com/openservicemesh/osm/pkg/policy"
"github.com/openservicemesh/osm/pkg/providers/kube"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/smi"
"github.com/openservicemesh/osm/pkg/tests"
Expand Down
2 changes: 1 addition & 1 deletion pkg/catalog/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
"github.com/openservicemesh/osm/pkg/certificate/providers/tresor"
"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/endpoint"
"github.com/openservicemesh/osm/pkg/endpoint/providers/kube"
"github.com/openservicemesh/osm/pkg/ingress"
k8s "github.com/openservicemesh/osm/pkg/kubernetes"
"github.com/openservicemesh/osm/pkg/policy"
"github.com/openservicemesh/osm/pkg/providers/kube"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/smi"
"github.com/openservicemesh/osm/pkg/tests"
Expand Down
4 changes: 4 additions & 0 deletions pkg/catalog/inbound_traffic_policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,11 +1256,13 @@ func TestBuildInboundPolicies(t *testing.T) {
mockKubeController := k8s.NewMockController(mockCtrl)
mockMeshSpec := smi.NewMockMeshSpec(mockCtrl)
mockEndpointProvider := endpoint.NewMockProvider(mockCtrl)
mockServiceProvider := service.NewMockProvider(mockCtrl)

mc := MeshCatalog{
kubeController: mockKubeController,
meshSpec: mockMeshSpec,
endpointsProviders: []endpoint.Provider{mockEndpointProvider},
serviceProviders: []service.Provider{mockServiceProvider},
}

destK8sService := tests.NewServiceFixture(tc.inboundService.Name, tc.inboundService.Namespace, map[string]string{})
Expand Down Expand Up @@ -1571,11 +1573,13 @@ func TestListInboundPoliciesFromTrafficTargets(t *testing.T) {
mockKubeController := k8s.NewMockController(mockCtrl)
mockMeshSpec := smi.NewMockMeshSpec(mockCtrl)
mockEndpointProvider := endpoint.NewMockProvider(mockCtrl)
mockServiceProvider := service.NewMockProvider(mockCtrl)

mc := MeshCatalog{
kubeController: mockKubeController,
meshSpec: mockMeshSpec,
endpointsProviders: []endpoint.Provider{mockEndpointProvider},
serviceProviders: []service.Provider{mockServiceProvider},
}

for _, destMeshSvc := range tc.upstreamServices {
Expand Down
4 changes: 4 additions & 0 deletions pkg/catalog/outbound_traffic_policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,11 +928,13 @@ func TestBuildOutboundPolicies(t *testing.T) {
mockKubeController := k8s.NewMockController(mockCtrl)
mockMeshSpec := smi.NewMockMeshSpec(mockCtrl)
mockEndpointProvider := endpoint.NewMockProvider(mockCtrl)
mockServiceProvider := service.NewMockProvider(mockCtrl)

mc := MeshCatalog{
kubeController: mockKubeController,
meshSpec: mockMeshSpec,
endpointsProviders: []endpoint.Provider{mockEndpointProvider},
serviceProviders: []service.Provider{mockServiceProvider},
}

destK8sService := tests.NewServiceFixture(tc.destMeshService.Name, tc.destMeshService.Namespace, map[string]string{})
Expand Down Expand Up @@ -1074,10 +1076,12 @@ func TestGetDestinationServicesFromTrafficTarget(t *testing.T) {

mockKubeController := k8s.NewMockController(mockCtrl)
mockEndpointProvider := endpoint.NewMockProvider(mockCtrl)
mockServiceProvider := service.NewMockProvider(mockCtrl)

mc := MeshCatalog{
kubeController: mockKubeController,
endpointsProviders: []endpoint.Provider{mockEndpointProvider},
serviceProviders: []service.Provider{mockServiceProvider},
}

destSA := identity.K8sServiceAccount{
Expand Down
1 change: 1 addition & 0 deletions pkg/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
// MeshCatalog is the struct for the service catalog
type MeshCatalog struct {
endpointsProviders []endpoint.Provider
serviceProviders []service.Provider
meshSpec smi.MeshSpec
certManager certificate.Manager
ingressMonitor ingress.Monitor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import (
"github.com/openservicemesh/osm/pkg/identity"
k8s "github.com/openservicemesh/osm/pkg/kubernetes"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/utils"
)

// NewProvider implements mesh.EndpointsProvider, which creates a new Kubernetes cluster/compute provider.
func NewProvider(kubeController k8s.Controller, providerIdent string, cfg configurator.Configurator) (endpoint.Provider, error) {
// NewClient returns a client that has all components necessary to connect to and maintain state of a Kubernetes cluster.
func NewClient(kubeController k8s.Controller, providerIdent string, cfg configurator.Configurator) (*Client, error) {
client := Client{
providerIdent: providerIdent,
kubeController: kubeController,
Expand All @@ -26,7 +27,7 @@ func NewProvider(kubeController k8s.Controller, providerIdent string, cfg config
}

// GetID returns a string descriptor / identifier of the compute provider.
// Required by interface: EndpointsProvider
// Required by interfaces: EndpointsProvider, ServiceProvider
func (c *Client) GetID() string {
return c.providerIdent
}
Expand Down Expand Up @@ -234,3 +235,90 @@ func (c *Client) GetResolvableEndpointsForService(svc service.MeshService) ([]en

return endpoints, err
}

// ListServices returns a list of services that are part of monitored namespaces
func (c *Client) ListServices() ([]service.MeshService, error) {
var services []service.MeshService
for _, svc := range c.kubeController.ListServices() {
services = append(services, utils.K8sSvcToMeshSvc(svc))
}
return services, nil
}

// ListServiceIdentitiesForService lists the service identities associated with the given mesh service.
func (c *Client) ListServiceIdentitiesForService(svc service.MeshService) ([]identity.ServiceIdentity, error) {
serviceAccounts, err := c.kubeController.ListServiceIdentitiesForService(svc)
if err != nil {
log.Err(err).Msgf("Error getting ServiceAccounts for Service %s", svc)
return nil, err
}

var serviceIdentities []identity.ServiceIdentity
for _, svcAccount := range serviceAccounts {
serviceIdentity := svcAccount.ToServiceIdentity()
serviceIdentities = append(serviceIdentities, serviceIdentity)
}

return serviceIdentities, nil
}

// GetPortToProtocolMappingForService returns a mapping of the service's ports to their corresponding application protocol,
// where the ports returned are the ones used by downstream clients in their requests. This can be different from the ports
// actually exposed by the application binary, ie. 'spec.ports[].port' instead of 'spec.ports[].targetPort' for a Kubernetes service.
func (c *Client) GetPortToProtocolMappingForService(svc service.MeshService) (map[uint32]string, error) {
portToProtocolMap := make(map[uint32]string)

k8sSvc := c.kubeController.GetService(svc)
if k8sSvc == nil {
return nil, errors.Wrapf(errServiceNotFound, "Error retrieving k8s service %s", svc)
}

for _, portSpec := range k8sSvc.Spec.Ports {
var appProtocol string
if portSpec.AppProtocol != nil {
appProtocol = *portSpec.AppProtocol
} else {
appProtocol = k8s.GetAppProtocolFromPortName(portSpec.Name)
}
portToProtocolMap[uint32(portSpec.Port)] = appProtocol
}

return portToProtocolMap, nil
}

// GetHostnamesForService returns a list of hostnames over which the service can be accessed within the local cluster.
func (c *Client) GetHostnamesForService(svc service.MeshService, locality service.Locality) ([]string, error) {
k8svc := c.kubeController.GetService(svc)
if k8svc == nil {
return nil, errors.Errorf("Error fetching service %q", svc)
}

hostnames := k8s.GetHostnamesForService(k8svc, locality)
return hostnames, nil
}

// GetServicesByLabels gets Kubernetes services whose selectors match the given labels
func (c *Client) GetServicesByLabels(podLabels map[string]string, namespace string) ([]service.MeshService, error) {
var finalList []service.MeshService
serviceList := c.kubeController.ListServices()

for _, svc := range serviceList {
// TODO: #1684 Introduce APIs to dynamically allow applying selectors, instead of callers implementing
// filtering themselves
if svc.Namespace != namespace {
continue
}

svcRawSelector := svc.Spec.Selector
// service has no selectors, we do not need to match against the pod label
if len(svcRawSelector) == 0 {
continue
}
selector := labels.Set(svcRawSelector).AsSelector()
if selector.Matches(labels.Set(podLabels)) {
finalList = append(finalList, utils.K8sSvcToMeshSvc(svc))
}
}

return finalList, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ var _ = Describe("Test Kube Client Provider (w/o kubecontroller)", func() {
mockKubeController *k8s.MockController
mockConfigurator *configurator.MockConfigurator

provider endpoint.Provider
err error
client *Client
err error
)

mockCtrl = gomock.NewController(GinkgoT())
Expand All @@ -47,12 +47,12 @@ var _ = Describe("Test Kube Client Provider (w/o kubecontroller)", func() {
mockKubeController.EXPECT().IsMonitoredNamespace(tests.BookbuyerService.Namespace).Return(true).AnyTimes()

BeforeEach(func() {
provider, err = NewProvider(mockKubeController, providerID, mockConfigurator)
client, err = NewClient(mockKubeController, providerID, mockConfigurator)
Expect(err).ToNot(HaveOccurred())
})

It("tests GetID", func() {
Expect(provider.GetID()).To(Equal(providerID))
Expect(client.GetID()).To(Equal(providerID))
})

It("should correctly return a list of endpoints for a service", func() {
Expand All @@ -77,7 +77,7 @@ var _ = Describe("Test Kube Client Provider (w/o kubecontroller)", func() {
},
}, nil)

Expect(provider.ListEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{
Expect(client.ListEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{
{
IP: net.IPv4(8, 8, 8, 8),
Port: 88,
Expand Down Expand Up @@ -105,7 +105,7 @@ var _ = Describe("Test Kube Client Provider (w/o kubecontroller)", func() {
},
})

Expect(provider.GetResolvableEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{
Expect(client.GetResolvableEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{
{
IP: net.IPv4(192, 168, 0, 1),
Port: tests.ServicePort,
Expand Down Expand Up @@ -154,7 +154,7 @@ var _ = Describe("Test Kube Client Provider (w/o kubecontroller)", func() {
},
}, nil)

Expect(provider.GetResolvableEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{
Expect(client.GetResolvableEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{
{
IP: net.IPv4(8, 8, 8, 8),
Port: 88,
Expand Down Expand Up @@ -205,7 +205,7 @@ var _ = Describe("Test Kube Client Provider (w/o kubecontroller)", func() {
},
}, nil)

Expect(provider.GetResolvableEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{
Expect(client.GetResolvableEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{
{
IP: net.IPv4(8, 8, 8, 8),
Port: 88,
Expand Down Expand Up @@ -273,7 +273,7 @@ var _ = Describe("Test Kube Client Provider (w/o kubecontroller)", func() {
},
}, nil)

portToProtocolMap, err := provider.GetTargetPortToProtocolMappingForService(tests.BookbuyerService)
portToProtocolMap, err := client.GetTargetPortToProtocolMappingForService(tests.BookbuyerService)
Expect(err).To(BeNil())

expectedPortToProtocolMap := map[uint32]string{70: "tcp", 80: "http", 90: "http", 100: "tcp", 110: "grpc", 120: "http", 130: "tcp"}
Expand All @@ -287,7 +287,7 @@ var _ = Describe("Test Kube Client Provider (/w kubecontroller)", func() {
kubeController k8s.Controller
mockConfigurator *configurator.MockConfigurator
fakeClientSet *testclient.Clientset
provider endpoint.Provider
client *Client
err error
)
mockCtrl = gomock.NewController(GinkgoT())
Expand Down Expand Up @@ -317,7 +317,7 @@ var _ = Describe("Test Kube Client Provider (/w kubecontroller)", func() {
}, 3*time.Second).Should(BeTrue())

Expect(err).ToNot(HaveOccurred())
provider, err = NewProvider(kubeController, providerID, mockConfigurator)
client, err = NewClient(kubeController, providerID, mockConfigurator)
Expect(err).ToNot(HaveOccurred())
})

Expand Down Expand Up @@ -346,7 +346,7 @@ var _ = Describe("Test Kube Client Provider (/w kubecontroller)", func() {
_, err := fakeClientSet.CoreV1().Services(testNamespace).Create(context.TODO(), svc, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())

services, err := provider.GetServicesForServiceAccount(tests.BookbuyerServiceAccount)
services, err := client.GetServicesForServiceAccount(tests.BookbuyerServiceAccount)
Expect(err).To(HaveOccurred())
Expect(services).To(BeNil())

Expand Down Expand Up @@ -425,7 +425,7 @@ var _ = Describe("Test Kube Client Provider (/w kubecontroller)", func() {
// Expect a MeshService that corresponds to a Service that matches the Pod spec labels
expectedMeshSvc := utils.K8sSvcToMeshSvc(svc)

meshSvcs, err := provider.GetServicesForServiceAccount(givenSvcAccount)
meshSvcs, err := client.GetServicesForServiceAccount(givenSvcAccount)
Expect(err).ToNot(HaveOccurred())
expectedMeshSvcs := []service.MeshService{expectedMeshSvc}
Expect(meshSvcs).To(Equal(expectedMeshSvcs))
Expand Down Expand Up @@ -498,7 +498,7 @@ var _ = Describe("Test Kube Client Provider (/w kubecontroller)", func() {
}

// Expect a MeshService that corresponds to a Service that matches the Deployment spec labels
svcs, err := provider.GetServicesForServiceAccount(givenSvcAccount)
svcs, err := client.GetServicesForServiceAccount(givenSvcAccount)
Expect(err).To(HaveOccurred())
Expect(svcs).To(BeNil())

Expand Down Expand Up @@ -566,7 +566,7 @@ var _ = Describe("Test Kube Client Provider (/w kubecontroller)", func() {
}

// Expect a MeshService that corresponds to a Service that matches the Deployment spec labels
svcs, err := provider.GetServicesForServiceAccount(givenSvcAccount)
svcs, err := client.GetServicesForServiceAccount(givenSvcAccount)
Expect(err).To(HaveOccurred())
Expect(svcs).To(BeNil())

Expand Down Expand Up @@ -652,7 +652,7 @@ var _ = Describe("Test Kube Client Provider (/w kubecontroller)", func() {
Name: "test-service-account", // Should match the service account in the Deployment spec above
}

meshServices, err := provider.GetServicesForServiceAccount(givenSvcAccount)
meshServices, err := client.GetServicesForServiceAccount(givenSvcAccount)
Expect(err).ToNot(HaveOccurred())
expectedServices := []service.MeshService{
{Name: "test-1", Namespace: testNamespace},
Expand Down Expand Up @@ -721,7 +721,7 @@ func TestListEndpointsForIdentity(t *testing.T) {
mockConfigurator := configurator.NewMockConfigurator(mockCtrl)
providerID := "provider"

provider, err := NewProvider(mockKubeController, providerID, mockConfigurator)
provider, err := NewClient(mockKubeController, providerID, mockConfigurator)
assert.Nil(err)

var pods []*corev1.Pod
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit cef48f2

Please sign in to comment.