Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
egress: add UpstreamTrafficSetting support (#4594)
Browse files Browse the repository at this point in the history
- Implements capability to apply UpstreamTrafficSetting
  configuration to an Egress destination.

- Refactors CDS to reuse code related to connection
  settings.

- Updates validation webhook for Egress API.

Part of #4500

Signed-off-by: Shashank Ram <[email protected]>
  • Loading branch information
shashankram authored Mar 17, 2022
1 parent caaa189 commit 7ecd8e9
Show file tree
Hide file tree
Showing 9 changed files with 467 additions and 101 deletions.
65 changes: 55 additions & 10 deletions pkg/catalog/egress.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,26 @@ import (
"strings"

mapset "github.com/deckarep/golang-set"
"github.com/pkg/errors"
smiSpecs "github.com/servicemeshinterface/smi-sdk-go/pkg/apis/specs/v1alpha4"
"k8s.io/apimachinery/pkg/types"

policyv1alpha1 "github.com/openservicemesh/osm/pkg/apis/policy/v1alpha1"

policyV1alpha1 "github.com/openservicemesh/osm/pkg/apis/policy/v1alpha1"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/errcode"
"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/policy"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/smi"
"github.com/openservicemesh/osm/pkg/trafficpolicy"
)

const (
// upstreamTrafficSettingKind is the upstreamTrafficSettingKind API kind
upstreamTrafficSettingKind = "UpstreamTrafficSetting"
)

// GetEgressTrafficPolicy returns the Egress traffic policy associated with the given service identity
func (mc *MeshCatalog) GetEgressTrafficPolicy(serviceIdentity identity.ServiceIdentity) (*trafficpolicy.EgressTrafficPolicy, error) {
if !mc.configurator.GetFeatureFlags().EnableEgressPolicy {
Expand All @@ -29,12 +38,18 @@ func (mc *MeshCatalog) GetEgressTrafficPolicy(serviceIdentity identity.ServiceId
egressResources := mc.policyController.ListEgressPoliciesForSourceIdentity(serviceIdentity.ToK8sServiceAccount())

for _, egress := range egressResources {
upstreamTrafficSetting, err := mc.getUpstreamTrafficSettingForEgress(egress)
if err != nil {
log.Error().Err(err).Msg("Ignoring invalid Egress policy")
continue
}

for _, portSpec := range egress.Spec.Ports {
switch strings.ToLower(portSpec.Protocol) {
case constants.ProtocolHTTP:
// ---
// Build the HTTP route configs for the given Egress policy
httpRouteConfigs, httpClusterConfigs := mc.buildHTTPRouteConfigs(egress, portSpec.Number)
httpRouteConfigs, httpClusterConfigs := mc.buildHTTPRouteConfigs(egress, portSpec.Number, upstreamTrafficSetting)
portToRouteConfigMap[portSpec.Number] = append(portToRouteConfigMap[portSpec.Number], httpRouteConfigs...)
clusterConfigs = append(clusterConfigs, httpClusterConfigs...)

Expand All @@ -48,8 +63,9 @@ func (mc *MeshCatalog) GetEgressTrafficPolicy(serviceIdentity identity.ServiceId
// ---
// Build the TCP cluster config for this port
clusterConfigs = append(clusterConfigs, &trafficpolicy.EgressClusterConfig{
Name: fmt.Sprintf("%d", portSpec.Number),
Port: portSpec.Number,
Name: fmt.Sprintf("%d", portSpec.Number),
Port: portSpec.Number,
UpstreamTrafficSetting: upstreamTrafficSetting,
})

// Configure port + IP range TrafficMatches
Expand All @@ -65,8 +81,9 @@ func (mc *MeshCatalog) GetEgressTrafficPolicy(serviceIdentity identity.ServiceId
// Build the HTTPS cluster config for this port
// HTTPS is TLS encrypted, so will be proxied as a TCP stream
clusterConfigs = append(clusterConfigs, &trafficpolicy.EgressClusterConfig{
Name: fmt.Sprintf("%d", portSpec.Number),
Port: portSpec.Number,
Name: fmt.Sprintf("%d", portSpec.Number),
Port: portSpec.Number,
UpstreamTrafficSetting: upstreamTrafficSetting,
})

// Configure port + IP range TrafficMatches
Expand Down Expand Up @@ -105,7 +122,34 @@ func (mc *MeshCatalog) GetEgressTrafficPolicy(serviceIdentity identity.ServiceId
}, nil
}

func (mc *MeshCatalog) buildHTTPRouteConfigs(egressPolicy *policyV1alpha1.Egress, port int) ([]*trafficpolicy.EgressHTTPRouteConfig, []*trafficpolicy.EgressClusterConfig) {
func (mc *MeshCatalog) getUpstreamTrafficSettingForEgress(egressPolicy *policyv1alpha1.Egress) (*policyv1alpha1.UpstreamTrafficSetting, error) {
if egressPolicy == nil {
return nil, nil
}

for _, match := range egressPolicy.Spec.Matches {
if match.APIGroup != nil && *match.APIGroup == policyv1alpha1.SchemeGroupVersion.String() && match.Kind == upstreamTrafficSettingKind {
namespacedName := types.NamespacedName{
Namespace: egressPolicy.Namespace,
Name: match.Name,
}
upstreamtrafficSetting := mc.policyController.GetUpstreamTrafficSetting(
policy.UpstreamTrafficSettingGetOpt{NamespacedName: &namespacedName})

if upstreamtrafficSetting == nil {
return nil, errors.Errorf("UpstreamTrafficSetting %s specified in Egress policy %s/%s could not be found, ignoring it",
namespacedName.String(), egressPolicy.Namespace, egressPolicy.Name)
}

return upstreamtrafficSetting, nil
}
}

return nil, nil
}

func (mc *MeshCatalog) buildHTTPRouteConfigs(egressPolicy *policyv1alpha1.Egress, port int,
upstreamTrafficSetting *policyv1alpha1.UpstreamTrafficSetting) ([]*trafficpolicy.EgressHTTPRouteConfig, []*trafficpolicy.EgressClusterConfig) {
if egressPolicy == nil {
return nil, nil
}
Expand Down Expand Up @@ -170,9 +214,10 @@ func (mc *MeshCatalog) buildHTTPRouteConfigs(egressPolicy *policyV1alpha1.Egress
// Create cluster config for this host and port combination
clusterName := hostnameWithPort
clusterConfig := &trafficpolicy.EgressClusterConfig{
Name: clusterName,
Host: host,
Port: port,
Name: clusterName,
Host: host,
Port: port,
UpstreamTrafficSetting: upstreamTrafficSetting,
}
clusterConfigs = append(clusterConfigs, clusterConfig)

Expand Down
185 changes: 178 additions & 7 deletions pkg/catalog/egress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,21 @@ func TestGetEgressTrafficPolicy(t *testing.T) {

defer mockCtrl.Finish()

upstreamTrafficSetting := &policyv1alpha1.UpstreamTrafficSetting{
ObjectMeta: metav1.ObjectMeta{
Name: "u1",
Namespace: "ns1",
},
}

testCases := []struct {
name string
egressPolicies []*policyv1alpha1.Egress
egressPort int
httpRouteGroups []*specs.HTTPRouteGroup
expectedEgressPolicy *trafficpolicy.EgressTrafficPolicy
expectError bool
name string
egressPolicies []*policyv1alpha1.Egress
egressPort int
httpRouteGroups []*specs.HTTPRouteGroup
upstreamTrafficSetting *policyv1alpha1.UpstreamTrafficSetting
expectedEgressPolicy *trafficpolicy.EgressTrafficPolicy
expectError bool
}{
{
name: "multiple egress policies for HTTP ports",
Expand Down Expand Up @@ -335,6 +343,88 @@ func TestGetEgressTrafficPolicy(t *testing.T) {
},
expectError: false,
},
{
name: "policy with valid UpstreamTrafficSetting match is processed",
egressPolicies: []*policyv1alpha1.Egress{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: upstreamTrafficSetting.Namespace,
},
Spec: policyv1alpha1.EgressSpec{
Hosts: []string{
"foo.com",
},
Ports: []policyv1alpha1.PortSpec{
{
Number: 100,
Protocol: "https",
},
},
Matches: []corev1.TypedLocalObjectReference{
{
APIGroup: pointer.StringPtr("policy.openservicemesh.io/v1alpha1"),
Kind: "UpstreamTrafficSetting",
Name: upstreamTrafficSetting.Name,
},
},
},
},
},
httpRouteGroups: nil, // no SMI HTTP route matches
upstreamTrafficSetting: upstreamTrafficSetting,
expectedEgressPolicy: &trafficpolicy.EgressTrafficPolicy{
TrafficMatches: []*trafficpolicy.TrafficMatch{
{
DestinationPort: 100,
DestinationProtocol: "https",
ServerNames: []string{"foo.com"},
Cluster: "100",
},
},
HTTPRouteConfigsPerPort: map[int][]*trafficpolicy.EgressHTTPRouteConfig{},
ClustersConfigs: []*trafficpolicy.EgressClusterConfig{
{
// Same cluster used for both HTTPS and TCP on port 100
Name: "100",
Port: 100,
UpstreamTrafficSetting: upstreamTrafficSetting,
},
},
},
expectError: false,
},
{
name: "policy with invalid UpstreamTrafficSetting match is ignored",
egressPolicies: []*policyv1alpha1.Egress{
{
Spec: policyv1alpha1.EgressSpec{
Hosts: []string{
"foo.com",
},
Ports: []policyv1alpha1.PortSpec{
{
Number: 100,
Protocol: "https",
},
},
Matches: []corev1.TypedLocalObjectReference{
{
APIGroup: pointer.StringPtr("policy.openservicemesh.io/v1alpha1"),
Kind: "UpstreamTrafficSetting",
Name: "invalid",
},
},
},
},
},
httpRouteGroups: nil, // no SMI HTTP route matches
expectedEgressPolicy: &trafficpolicy.EgressTrafficPolicy{
TrafficMatches: nil,
HTTPRouteConfigsPerPort: map[int][]*trafficpolicy.EgressHTTPRouteConfig{},
ClustersConfigs: nil,
},
expectError: false,
},
}

testSourceIdentity := identity.ServiceIdentity("foo.bar.cluster.local")
Expand All @@ -348,6 +438,7 @@ func TestGetEgressTrafficPolicy(t *testing.T) {
mockMeshSpec.EXPECT().GetHTTPRouteGroup(fmt.Sprintf("%s/%s", rg.Namespace, rg.Name)).Return(rg).AnyTimes()
}
mockPolicyController.EXPECT().ListEgressPoliciesForSourceIdentity(gomock.Any()).Return(tc.egressPolicies).Times(1)
mockPolicyController.EXPECT().GetUpstreamTrafficSetting(gomock.Any()).Return(tc.upstreamTrafficSetting).AnyTimes()

mc := &MeshCatalog{
meshSpec: mockMeshSpec,
Expand All @@ -371,11 +462,19 @@ func TestBuildHTTPRouteConfigs(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

upstreamTrafficSetting := &policyv1alpha1.UpstreamTrafficSetting{
ObjectMeta: metav1.ObjectMeta{
Name: "u1",
Namespace: "ns1",
},
}

testCases := []struct {
name string
egressPolicy *policyv1alpha1.Egress
egressPort int
httpRouteGroups []*specs.HTTPRouteGroup
upstreamTrafficSetting *policyv1alpha1.UpstreamTrafficSetting
expectedRouteConfigs []*trafficpolicy.EgressHTTPRouteConfig
expectedClusterConfigs []*trafficpolicy.EgressClusterConfig
}{
Expand Down Expand Up @@ -649,6 +748,78 @@ func TestBuildHTTPRouteConfigs(t *testing.T) {
},
},
},
{
name: "egress policy with UpstreamTrafficSetting match specified",
egressPolicy: &policyv1alpha1.Egress{
Spec: policyv1alpha1.EgressSpec{
Hosts: []string{
"foo.com",
"bar.com",
},
Ports: []policyv1alpha1.PortSpec{
{
Number: 80,
Protocol: "http",
},
},
},
},
egressPort: 80,
httpRouteGroups: nil, // no matches specified in the egress policy via Spec.Matches
upstreamTrafficSetting: upstreamTrafficSetting,
expectedRouteConfigs: []*trafficpolicy.EgressHTTPRouteConfig{
{
Name: "foo.com",
Hostnames: []string{
"foo.com",
"foo.com:80",
},
RoutingRules: []*trafficpolicy.EgressHTTPRoutingRule{
{
Route: trafficpolicy.RouteWeightedClusters{
HTTPRouteMatch: trafficpolicy.WildCardRouteMatch,
WeightedClusters: mapset.NewSetFromSlice([]interface{}{
service.WeightedCluster{ClusterName: service.ClusterName("foo.com:80"), Weight: 100},
}),
},
AllowedDestinationIPRanges: nil,
},
},
},
{
Name: "bar.com",
Hostnames: []string{
"bar.com",
"bar.com:80",
},
RoutingRules: []*trafficpolicy.EgressHTTPRoutingRule{
{
Route: trafficpolicy.RouteWeightedClusters{
HTTPRouteMatch: trafficpolicy.WildCardRouteMatch,
WeightedClusters: mapset.NewSetFromSlice([]interface{}{
service.WeightedCluster{ClusterName: service.ClusterName("bar.com:80"), Weight: 100},
}),
},
AllowedDestinationIPRanges: nil,
},
},
},
},
expectedClusterConfigs: []*trafficpolicy.EgressClusterConfig{
{
Name: "foo.com:80",
Host: "foo.com",
Port: 80,
UpstreamTrafficSetting: upstreamTrafficSetting,
},
{
Name: "bar.com:80",
Host: "bar.com",
Port: 80,
UpstreamTrafficSetting: upstreamTrafficSetting,
},
},
},
}

for i, tc := range testCases {
Expand All @@ -663,7 +834,7 @@ func TestBuildHTTPRouteConfigs(t *testing.T) {
meshSpec: mockMeshSpec,
}

routeConfigs, clusterConfigs := mc.buildHTTPRouteConfigs(tc.egressPolicy, tc.egressPort)
routeConfigs, clusterConfigs := mc.buildHTTPRouteConfigs(tc.egressPolicy, tc.egressPort, tc.upstreamTrafficSetting)
assert.ElementsMatch(tc.expectedRouteConfigs, routeConfigs)
assert.ElementsMatch(tc.expectedClusterConfigs, clusterConfigs)
})
Expand Down
Loading

0 comments on commit 7ecd8e9

Please sign in to comment.