diff --git a/config/samples/tcproute/server.yaml b/config/samples/tcproute/server.yaml index cc0942f2..cebc923b 100644 --- a/config/samples/tcproute/server.yaml +++ b/config/samples/tcproute/server.yaml @@ -16,11 +16,9 @@ spec: spec: containers: - name: server - image: ghcr.io/shaneutt/malutki + image: istio/tcp-echo-server:1.1 imagePullPolicy: IfNotPresent - env: - - name: LISTEN_PORT - value: "8080" + args: [ "8080", "blixt-tcproute-sample:" ] ports: - containerPort: 8080 protocol: TCP diff --git a/config/tests/tcproute-rr/kustomization.yaml b/config/tests/tcproute-rr/kustomization.yaml new file mode 100644 index 00000000..30ff7751 --- /dev/null +++ b/config/tests/tcproute-rr/kustomization.yaml @@ -0,0 +1,10 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +images: +- name: ghcr.io/kong/blixt-udp-test-server + newTag: integration-tests +resources: +- ../../samples/tcproute +- server.yaml +patches: +- path: patch.yaml diff --git a/config/tests/tcproute-rr/patch.yaml b/config/tests/tcproute-rr/patch.yaml new file mode 100644 index 00000000..31b102e7 --- /dev/null +++ b/config/tests/tcproute-rr/patch.yaml @@ -0,0 +1,18 @@ +apiVersion: gateway.networking.k8s.io/v1alpha2 +kind: TCPRoute +metadata: + name: blixt-tcproute-sample +spec: + parentRefs: + - name: blixt-tcproute-sample + port: 8080 + rules: + - backendRefs: + - name: blixt-tcproute-sample + port: 8080 + - backendRefs: + - name: tcproute-rr-v1 + port: 8080 + - backendRefs: + - name: tcproute-rr-v2 + port: 8080 diff --git a/config/tests/tcproute-rr/server.yaml b/config/tests/tcproute-rr/server.yaml new file mode 100644 index 00000000..5c323dd5 --- /dev/null +++ b/config/tests/tcproute-rr/server.yaml @@ -0,0 +1,76 @@ +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tcproute-rr-v1 + labels: + app: tcproute-rr-v1 +spec: + selector: + matchLabels: + app: tcproute-rr-v1 + template: + metadata: + labels: + app: tcproute-rr-v1 + spec: + containers: + - name: tcp-echo + image: istio/tcp-echo-server:1.1 + imagePullPolicy: IfNotPresent + args: [ "8080", "tcproute-rr-v1:" ] + ports: + - containerPort: 8080 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tcproute-rr-v2 + labels: + app: tcproute-rr-v2 +spec: + selector: + matchLabels: + app: tcproute-rr-v2 + template: + metadata: + labels: + app: tcproute-rr-v2 + spec: + containers: + - name: tcp-echo + image: istio/tcp-echo-server:1.1 + imagePullPolicy: IfNotPresent + args: [ "8080", "tcproute-rr-v2:" ] + ports: + - containerPort: 8080 +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app: tcproute-rr-v1 + name: tcproute-rr-v1 +spec: + ports: + - name: tcp + port: 8080 + protocol: TCP + selector: + app: tcproute-rr-v1 + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app: tcproute-rr-v2 + name: tcproute-rr-v2 +spec: + ports: + - name: tcp + port: 8080 + protocol: TCP + selector: + app: tcproute-rr-v2 + type: ClusterIP diff --git a/test/integration/tcproute_test.go b/test/integration/tcproute_test.go index 0ac88143..a59ee722 100644 --- a/test/integration/tcproute_test.go +++ b/test/integration/tcproute_test.go @@ -20,9 +20,10 @@ limitations under the License. package integration import ( + "bufio" "context" "fmt" - "net/http" + "net" "strings" "testing" "time" @@ -38,9 +39,12 @@ import ( const ( tcprouteSampleKustomize = "../../config/tests/tcproute" + tcprouteRRKustomize = "../../config/tests/tcproute-rr" tcprouteSampleName = "blixt-tcproute-sample" ) +var tcpServerNames = []string{"blixt-tcproute-sample", "tcproute-rr-v1", "tcproute-rr-v2"} + func TestTCPRouteBasics(t *testing.T) { tcpRouteBasicsCleanupKey := "tcproutebasics" defer func() { @@ -69,38 +73,156 @@ func TestTCPRouteBasics(t *testing.T) { require.Equal(t, gatewayv1beta1.IPAddressType, *gw.Status.Addresses[0].Type) gwaddr := fmt.Sprintf("%s:8080", gw.Status.Addresses[0].Value) - t.Log("waiting for HTTP server to be available") + t.Log("waiting for TCP server to be available") require.Eventually(t, func() bool { server, err := env.Cluster().Client().AppsV1().Deployments(corev1.NamespaceDefault).Get(ctx, tcprouteSampleName, metav1.GetOptions{}) require.NoError(t, err) return server.Status.AvailableReplicas > 0 }, time.Minute, time.Second) - t.Log("verifying HTTP connectivity to the server") - httpc := http.Client{Timeout: time.Second * 30} + t.Log("verifying TCP connectivity to the server") + var conn net.Conn require.Eventually(t, func() bool { - resp, err := httpc.Get(fmt.Sprintf("http://%s/status/%d", gwaddr, http.StatusTeapot)) + var err error + conn, err = net.Dial("tcp", gwaddr) if err != nil { - t.Logf("received error checking HTTP server: [%s], retrying...", err) + t.Logf("received error connecting to TCP server: [%s], retrying...", err) return false } - defer resp.Body.Close() - return resp.StatusCode == http.StatusTeapot + return true }, time.Minute*5, time.Second) - t.Log("deleting the TCPRoute and verifying that HTTP traffic stops") + writeAndReadTCP(t, conn, tcpServerNames[0]) + + t.Log("deleting the TCPRoute and verifying that TCP connection is closed") require.NoError(t, gwclient.GatewayV1alpha2().TCPRoutes(corev1.NamespaceDefault).Delete(ctx, tcprouteSampleName, metav1.DeleteOptions{})) - httpc = http.Client{Timeout: time.Second * 3} require.Eventually(t, func() bool { - resp, err := httpc.Get(fmt.Sprintf("http://%s/status/%d", gwaddr, http.StatusTeapot)) + _, err := conn.Write([]byte("blahhh\n")) + require.NoError(t, err) + + err = conn.SetReadDeadline(time.Now().Add(time.Second * 3)) + require.NoError(t, err) + reader := bufio.NewReader(conn) + _, err = reader.ReadBytes(byte('\n')) if err != nil { - if strings.Contains(err.Error(), "context deadline exceeded") { + if strings.Contains(err.Error(), "i/o timeout") { return true } t.Logf("received unexpected error waiting for TCPRoute to decomission: %s", err) return false } - defer resp.Body.Close() return false }, time.Minute, time.Second) } + +func TestTCPRouteRoundRobin(t *testing.T) { + tcpRouteRRCleanupKey := "tcprouterr" + defer func() { + testutils.DumpDiagnosticsIfFailed(ctx, t, env.Cluster()) + if err := runCleanup(tcpRouteRRCleanupKey); err != nil { + t.Errorf("cleanup failed: %s", err) + } + }() + + t.Log("deploying config/samples/tcproute-rr kustomize") + require.NoError(t, clusters.KustomizeDeployForCluster(ctx, env.Cluster(), tcprouteRRKustomize)) + addCleanup(tcpRouteRRCleanupKey, func(ctx context.Context) error { + cleanupLog("cleaning up config/samples/tcproute-rr kustomize") + return clusters.KustomizeDeleteForCluster(ctx, env.Cluster(), tcprouteRRKustomize, "--ignore-not-found=true") + }) + + t.Log("waiting for Gateway to have an address") + var gw *gatewayv1beta1.Gateway + require.Eventually(t, func() bool { + var err error + gw, err = gwclient.GatewayV1beta1().Gateways(corev1.NamespaceDefault).Get(ctx, tcprouteSampleName, metav1.GetOptions{}) + require.NoError(t, err) + return len(gw.Status.Addresses) > 0 + }, time.Minute, time.Second) + require.NotNil(t, gw.Status.Addresses[0].Type) + require.Equal(t, gatewayv1beta1.IPAddressType, *gw.Status.Addresses[0].Type) + gwaddr := fmt.Sprintf("%s:8080", gw.Status.Addresses[0].Value) + + t.Log("waiting for TCP servers to be available") + labelSelector := metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "app", + Operator: metav1.LabelSelectorOpIn, + Values: tcpServerNames, + }, + }, + } + require.Eventually(t, func() bool { + servers, err := env.Cluster().Client().AppsV1().Deployments(corev1.NamespaceDefault).List(ctx, metav1.ListOptions{ + LabelSelector: metav1.FormatLabelSelector(&labelSelector), + }) + require.NoError(t, err) + for _, server := range servers.Items { + if server.Status.AvailableReplicas <= 0 { + return false + } + } + return true + }, time.Minute, time.Second) + + t.Log("verifying TCP connectivity to the servers") + var conn1 net.Conn + require.Eventually(t, func() bool { + var err error + conn1, err = net.Dial("tcp", gwaddr) + if err != nil { + t.Logf("received error connecting to TCP server: [%s], retrying...", err) + return false + } + return true + }, time.Minute*5, time.Second) + conn2, err := net.Dial("tcp", gwaddr) + require.NoError(t, err) + conn3, err := net.Dial("tcp", gwaddr) + require.NoError(t, err) + conns := []net.Conn{conn1, conn2, conn3} + + for c := 0; c < 2; c++ { + for i, conn := range conns { + writeAndReadTCP(t, conn, tcpServerNames[i]) + } + } + + t.Log("deleting the TCPRoute and verifying that all TCP connections are closed") + require.NoError(t, gwclient.GatewayV1alpha2().TCPRoutes(corev1.NamespaceDefault).Delete(ctx, tcprouteSampleName, metav1.DeleteOptions{})) + require.Eventually(t, func() bool { + for _, conn := range conns { + _, err := conn.Write([]byte("blahhh\n")) + require.NoError(t, err) + err = conn.SetReadDeadline(time.Now().Add(time.Second * 3)) + require.NoError(t, err) + + reader := bufio.NewReader(conn) + _, err = reader.ReadBytes(byte('\n')) + if err != nil { + if strings.Contains(err.Error(), "i/o timeout") { + continue + } + t.Logf("received unexpected error waiting for TCPRoute to decomission: %s", err) + } + return false + } + return true + }, time.Minute, time.Second) +} + +func writeAndReadTCP(t *testing.T, conn net.Conn, prefix string) { + t.Helper() + + t.Logf("writing data to TCP connection with server %s", prefix) + request := "wazzzaaaa" + _, err := conn.Write([]byte(request + "\n")) + require.NoError(t, err) + + t.Logf("reading data from TCP connection with server %s", prefix) + reader := bufio.NewReader(conn) + response, err := reader.ReadBytes(byte('\n')) + require.NoError(t, err) + require.Contains(t, string(response), fmt.Sprintf("%s: %s", prefix, string(request))) +}