Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
ref(tcp): support tcp-server-first inside the mesh
Browse files Browse the repository at this point in the history
This change adds the capability for OSM to handle ports with
`appProtocol: tcp-server-first` on services within the mesh.
Specifically, the main change is to add handling for `tcp-server-first`
ports in the mesh when we calculate the predicate function used to
disable the TLS and HTTP inspector listener filters. Previously, this
was only done for `tcp-server-first` ports specified in an Egress
policy resource.

Fixes #4313

Signed-off-by: Jon Huhn <[email protected]>
  • Loading branch information
nojnhuh committed Nov 3, 2021
1 parent 2ce701c commit f6c14d6
Show file tree
Hide file tree
Showing 11 changed files with 183 additions and 67 deletions.
2 changes: 1 addition & 1 deletion demo/deploy-mysql.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ spec:
- port: 3306
targetPort: 3306
name: client
appProtocol: tcp
appProtocol: tcp-server-first
selector:
app: mysql
clusterIP: None
Expand Down
2 changes: 1 addition & 1 deletion pkg/catalog/inbound_traffic_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (mc *MeshCatalog) GetInboundMeshTrafficPolicy(upstreamIdentity identity.Ser

// Build the HTTP route configs for this service and port combination.
// If the port's protocol corresponds to TCP, we can skip this step
if upstreamSvc.Protocol == constants.ProtocolTCP {
if upstreamSvc.Protocol == constants.ProtocolTCP || upstreamSvc.Protocol == constants.ProtocolTCPServerFirst {
continue
}
// ---
Expand Down
15 changes: 14 additions & 1 deletion pkg/catalog/inbound_traffic_policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ func TestGetInboundMeshTrafficPolicy(t *testing.T) {
},
{
name: "multiple services with different protocol, SMI mode, 1 TrafficTarget, 1 HTTPRouteGroup, 0 TrafficSplit",
// Port ns1/s2:90 uses TCP, so HTTP route configs for it should not be built
// Ports ns1/s2:90 and ns1/s3:91 use TCP, so HTTP route configs for them should not be built
upstreamIdentity: upstreamSvcAccount.ToServiceIdentity(),
upstreamServices: []service.MeshService{
{
Expand All @@ -895,6 +895,13 @@ func TestGetInboundMeshTrafficPolicy(t *testing.T) {
TargetPort: 90,
Protocol: "tcp",
},
{
Name: "s3",
Namespace: "ns1",
Port: 91,
TargetPort: 91,
Protocol: "tcp-server-first",
},
},
permissiveMode: false,
trafficTargets: []*access.TrafficTarget{
Expand Down Expand Up @@ -1009,6 +1016,12 @@ func TestGetInboundMeshTrafficPolicy(t *testing.T) {
Address: "127.0.0.1",
Port: 90,
},
{
Name: "ns1/s3|91|local",
Service: service.MeshService{Namespace: "ns1", Name: "s3", Port: 91, TargetPort: 91, Protocol: "tcp-server-first"},
Address: "127.0.0.1",
Port: 91,
},
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/catalog/outbound_traffic_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (mc *MeshCatalog) GetOutboundMeshTrafficPolicy(downstreamIdentity identity.

// Build the HTTP route configs for this service and port combination.
// If the port's protocol corresponds to TCP, we can skip this step
if meshSvc.Protocol == constants.ProtocolTCP {
if meshSvc.Protocol == constants.ProtocolTCP || meshSvc.Protocol == constants.ProtocolTCPServerFirst {
continue
}
// Create a route to access the upstream service via it's hostnames and upstream weighted clusters
Expand Down
30 changes: 28 additions & 2 deletions pkg/catalog/outbound_traffic_policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ func TestGetOutboundMeshTrafficPolicy(t *testing.T) {
meshSvc3V2 := service.MeshService{Name: "s3-v2", Namespace: "ns3", Port: 8080, TargetPort: 80, Protocol: "http"}
// MeshService for k8s service ns3/s4 with 1 port
meshSvc4 := service.MeshService{Name: "s4", Namespace: "ns3", Port: 9090, TargetPort: 90, Protocol: "tcp"}
// MeshService for k8s service ns3/s5 with 1 port
meshSvc5 := service.MeshService{Name: "s5", Namespace: "ns3", Port: 9091, TargetPort: 91, Protocol: "tcp-server-first"}

allMeshServices := []service.MeshService{meshSvc1P1, meshSvc1P2, meshSvc2, meshSvc3, meshSvc3V1, meshSvc3V2, meshSvc4}
allMeshServices := []service.MeshService{meshSvc1P1, meshSvc1P2, meshSvc2, meshSvc3, meshSvc3V1, meshSvc3V2, meshSvc4, meshSvc5}

svcToEndpointsMap := map[string][]endpoint.Endpoint{
meshSvc1P1.String(): {
Expand All @@ -66,12 +68,15 @@ func TestGetOutboundMeshTrafficPolicy(t *testing.T) {
meshSvc4.String(): {
{IP: net.ParseIP("10.0.4.1")},
},
meshSvc5.String(): {
{IP: net.ParseIP("10.0.5.1")},
},
}

svcIdentityToSvcMapping := map[string][]service.MeshService{
"sa1.ns1.cluster.local": {meshSvc1P1, meshSvc1P2},
"sa2.ns2.cluster.local": {meshSvc2}, // Client `downstreamIdentity` cannot access this upstream
"sa3.ns3.cluster.local": {meshSvc3, meshSvc3V1, meshSvc3V2, meshSvc4},
"sa3.ns3.cluster.local": {meshSvc3, meshSvc3V1, meshSvc3V2, meshSvc4, meshSvc5},
}

downstreamIdentity := identity.ServiceIdentity("sa-x.ns1.cluster.local")
Expand Down Expand Up @@ -241,6 +246,19 @@ func TestGetOutboundMeshTrafficPolicy(t *testing.T) {
},
},
},
{
// To match ns3/s5 on port 9091
Name: "ns3/s5_9091_tcp-server-first",
DestinationPort: 9091,
DestinationProtocol: "tcp-server-first",
DestinationIPRanges: []string{"10.0.5.1/32"},
WeightedClusters: []service.WeightedCluster{
{
ClusterName: "ns3/s5|91",
Weight: 100,
},
},
},
},
ClustersConfigs: []*trafficpolicy.MeshClusterConfig{
{
Expand All @@ -267,6 +285,10 @@ func TestGetOutboundMeshTrafficPolicy(t *testing.T) {
Name: "ns3/s4|90",
Service: meshSvc4,
},
{
Name: "ns3/s5|91",
Service: meshSvc5,
},
},
HTTPRouteConfigsPerPort: map[int][]*trafficpolicy.OutboundTrafficPolicy{
8080: {
Expand Down Expand Up @@ -523,6 +545,10 @@ func TestGetOutboundMeshTrafficPolicy(t *testing.T) {
Name: "ns3/s4|90",
Service: meshSvc4,
},
{
Name: "ns3/s5|91",
Service: meshSvc5,
},
},
},
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/envoy/lds/inmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (lb *listenerBuilder) getInboundMeshFilterChains(proxyService service.MeshS
}
filterChains = append(filterChains, filterChainForPort)

case constants.ProtocolTCP:
case constants.ProtocolTCP, constants.ProtocolTCPServerFirst:
filterChainForPort, err := lb.getInboundMeshTCPFilterChain(proxyService, uint32(proxyService.TargetPort))
if err != nil {
log.Error().Err(err).Msgf("Error building inbound TCP filter chain for proxy:port %s:%d", proxyService, proxyService.TargetPort)
Expand Down Expand Up @@ -407,7 +407,7 @@ func (lb *listenerBuilder) getOutboundFilterChainPerUpstream() []*xds_listener.F
filterChains = append(filterChains, httpFilterChain)
}

case constants.ProtocolTCP:
case constants.ProtocolTCP, constants.ProtocolTCPServerFirst:
// Construct TCP filter chain
if tcpFilterChain, err := lb.getOutboundTCPFilterChainForService(*trafficMatch); err != nil {
log.Error().Err(err).Msgf("Error constructing outbound TCP filter chain for traffic match %s on proxy with identity %s", trafficMatch.Name, lb.serviceIdentity)
Expand Down
5 changes: 4 additions & 1 deletion pkg/envoy/lds/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,18 @@ func (lb *listenerBuilder) newOutboundListener() (*xds_listener.Listener, error)
}

if featureflags := lb.cfg.GetFeatureFlags(); featureflags.EnableEgressPolicy {
var trafficMatches []*trafficpolicy.TrafficMatch
var filterDisableMatchPredicate *xds_listener.ListenerFilterChainMatchPredicate
// Create filter chains for egress based on policies
if egressTrafficPolicy, err := lb.meshCatalog.GetEgressTrafficPolicy(lb.serviceIdentity); err != nil {
log.Error().Err(err).Msgf("Error retrieving egress policies for proxy with identity %s, skipping egress filters", lb.serviceIdentity)
} else if egressTrafficPolicy != nil {
egressFilterChains := lb.getEgressFilterChainsForMatches(egressTrafficPolicy.TrafficMatches)
listener.FilterChains = append(listener.FilterChains, egressFilterChains...)
filterDisableMatchPredicate = getFilterMatchPredicateForTrafficMatches(egressTrafficPolicy.TrafficMatches)
trafficMatches = append(trafficMatches, egressTrafficPolicy.TrafficMatches...)
}
trafficMatches = append(trafficMatches, lb.meshCatalog.GetOutboundMeshTrafficPolicy(lb.serviceIdentity).TrafficMatches...)
filterDisableMatchPredicate = getFilterMatchPredicateForTrafficMatches(trafficMatches)
additionalListenerFilters := []*xds_listener.ListenerFilter{
// Configure match predicate for ports serving server-first protocols (ex. mySQL, postgreSQL etc.).
// Ports corresponding to server-first protocols, where the server initiates the first byte of a connection, will
Expand Down
47 changes: 47 additions & 0 deletions pkg/envoy/lds/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ import (
. "github.com/onsi/gomega"
tassert "github.com/stretchr/testify/assert"

configv1alpha1 "github.com/openservicemesh/osm/pkg/apis/config/v1alpha1"
"github.com/openservicemesh/osm/pkg/catalog"
"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/trafficpolicy"
)

Expand Down Expand Up @@ -166,3 +170,46 @@ func TestGetFilterMatchPredicateForTrafficMatches(t *testing.T) {
})
}
}

func TestNewOutboundListener(t *testing.T) {
mockCtrl := gomock.NewController(t)
identity := identity.K8sServiceAccount{}.ToServiceIdentity()
meshCatalog := catalog.NewMockMeshCataloger(mockCtrl)
meshCatalog.EXPECT().GetEgressTrafficPolicy(gomock.Any()).Return(nil, nil).Times(1)
meshCatalog.EXPECT().GetOutboundMeshTrafficPolicy(identity).Return(&trafficpolicy.OutboundMeshTrafficPolicy{
TrafficMatches: []*trafficpolicy.TrafficMatch{
{
WeightedClusters: []service.WeightedCluster{{}},
DestinationIPRanges: []string{
"0.0.0.0/0",
},
DestinationPort: 1,
DestinationProtocol: constants.ProtocolTCPServerFirst,
},
},
}).Times(2)
cfg := configurator.NewMockConfigurator(mockCtrl)
cfg.EXPECT().IsEgressEnabled().Return(false).Times(1)
cfg.EXPECT().GetFeatureFlags().Return(configv1alpha1.FeatureFlags{
EnableEgressPolicy: true,
}).Times(1)

lb := newListenerBuilder(meshCatalog, identity, cfg, nil)

assert := tassert.New(t)
listener, err := lb.newOutboundListener()
assert.NoError(err)

assert.Len(listener.ListenerFilters, 3) // OriginalDst, TlsInspector, HttpInspector
assert.Equal(wellknown.TlsInspector, listener.ListenerFilters[1].Name)
assert.Equal(&xds_listener.ListenerFilterChainMatchPredicate{
Rule: &xds_listener.ListenerFilterChainMatchPredicate_DestinationPortRange{
DestinationPortRange: &xds_type.Int32Range{
Start: 1,
End: 2,
},
},
}, listener.ListenerFilters[1].FilterDisabled)
assert.Equal(wellknown.HttpInspector, listener.ListenerFilters[2].Name)
assert.Equal(listener.ListenerFilters[1].FilterDisabled, listener.ListenerFilters[2].FilterDisabled)
}
20 changes: 0 additions & 20 deletions pkg/k8s/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"

"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/service"
)

Expand Down Expand Up @@ -49,25 +48,6 @@ func GetServiceFromHostname(host string) string {
return strings.Split(service, ":")[0]
}

// GetAppProtocolFromPortName returns the port's application protocol from its name, defaults to 'http' if not specified.
func GetAppProtocolFromPortName(portName string) string {
portName = strings.ToLower(portName)

switch {
case strings.HasPrefix(portName, "http-"):
return "http"

case strings.HasPrefix(portName, "tcp-"):
return "tcp"

case strings.HasPrefix(portName, "grpc-"):
return "grpc"

default:
return constants.ProtocolHTTP
}
}

// GetKubernetesServerVersionNumber returns the Kubernetes server version number in chunks, ex. v1.19.3 => [1, 19, 3]
func GetKubernetesServerVersionNumber(kubeClient kubernetes.Interface) ([]int, error) {
if kubeClient == nil {
Expand Down
38 changes: 0 additions & 38 deletions pkg/k8s/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,44 +103,6 @@ func TestGetServiceFromHostname(t *testing.T) {
}
}

func TestGetAppProtocolFromPortName(t *testing.T) {
testCases := []struct {
name string
portName string
expectedProtocal string
}{
{
name: "tcp protocol",
portName: "tcp-port-test",
expectedProtocal: "tcp",
},
{
name: "http protocol",
portName: "http-port-test",
expectedProtocal: "http",
},
{
name: "grpc protocol",
portName: "grpc-port-test",
expectedProtocal: "grpc",
},
{
name: "default protocol",
portName: "port-test",
expectedProtocal: "http",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
assert := tassert.New(t)

actual := GetAppProtocolFromPortName(tc.portName)
assert.Equal(tc.expectedProtocal, actual)
})
}
}

func TestGetKubernetesServerVersionNumber(t *testing.T) {
testCases := []struct {
name string
Expand Down
85 changes: 85 additions & 0 deletions tests/e2e/e2e_tcp_server_first_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package e2e

import (
"strconv"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/tests/framework"
. "github.com/openservicemesh/osm/tests/framework"
)

var _ = OSMDescribe("TCP server-first traffic",
OSMDescribeInfo{
Tier: 1,
Bucket: 1,
},
func() {
var (
sourceNs = framework.RandomNameWithPrefix("client")
destNs = framework.RandomNameWithPrefix("server")
ns = []string{sourceNs, destNs}
)

It("TCP server-first traffic", func() {
// Install OSM
installOpts := Td.GetOSMInstallOpts()
installOpts.EnablePermissiveMode = true
Expect(Td.InstallOSM(installOpts)).To(Succeed())

// Create Test NS
for _, n := range ns {
Expect(Td.CreateNs(n, nil)).To(Succeed())
Expect(Td.AddNsToMesh(true, n)).To(Succeed())
}

destinationPort := 80

// Get simple pod definitions for the TCP server
svcAccDef, podDef, svcDef, err := Td.SimplePodApp(
SimplePodAppDef{
PodName: framework.RandomNameWithPrefix("server"),
Namespace: destNs,
Image: "busybox",
Command: []string{"nc", "-lkp", strconv.Itoa(destinationPort), "-e", "sh", "-c", "while yes; do :; done"},
Ports: []int{destinationPort},
AppProtocol: constants.ProtocolTCPServerFirst,
OS: Td.ClusterOS,
},
)

Expect(err).NotTo(HaveOccurred())

_, err = Td.CreateServiceAccount(destNs, &svcAccDef)
Expect(err).NotTo(HaveOccurred())
_, err = Td.CreatePod(destNs, podDef)
Expect(err).NotTo(HaveOccurred())
dstSvc, err := Td.CreateService(destNs, svcDef)
Expect(err).NotTo(HaveOccurred())

// Expect it to be up and running in it's receiver namespace
Expect(Td.WaitForPodsRunningReady(destNs, 120*time.Second, 1, nil)).To(Succeed())

svcAccDef, podDef, _, err = Td.SimplePodApp(SimplePodAppDef{
PodName: framework.RandomNameWithPrefix("client"),
Namespace: sourceNs,
Command: []string{"nc", dstSvc.Name + "." + dstSvc.Namespace, strconv.Itoa(destinationPort)},
Image: "busybox",
OS: Td.ClusterOS,
})
Expect(err).NotTo(HaveOccurred())
_, err = Td.CreateServiceAccount(sourceNs, &svcAccDef)
Expect(err).NotTo(HaveOccurred())
_, err = Td.CreatePod(sourceNs, podDef)
Expect(err).NotTo(HaveOccurred())

Expect(Td.WaitForPodsRunningReady(sourceNs, 120*time.Second, 1, nil)).To(Succeed())

Eventually(func() (string, error) {
return getPodLogs(sourceNs, podDef.Name, podDef.Name)
}, 5*time.Second).Should(ContainSubstring("\ny\n"), "Didn't get expected response from server")
})
})

0 comments on commit f6c14d6

Please sign in to comment.