Skip to content

Commit

Permalink
connectivity: Some package refactoring
Browse files Browse the repository at this point in the history
- add check & tests subpackages
- use shorter type names for each of the tests
  • Loading branch information
errordeveloper authored and tgraf committed Mar 12, 2021
1 parent 3e55a2a commit 08338b2
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 128 deletions.
54 changes: 18 additions & 36 deletions connectivity/check.go → connectivity/check/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package connectivity
package check

import (
"bytes"
Expand Down Expand Up @@ -42,17 +42,14 @@ import (
)

const (
clientDeploymentName = "client"
ClientDeploymentName = "client"

echoSameNodeDeploymentName = "echo-same-node"
echoOtherNodeDeploymentName = "echo-other-node"
kindEchoName = "echo"
kindClientName = "client"
)

func curlCommand(target string) []string {
return []string{"curl", "-sS", "--fail", "--connect-timeout", "5", "-o", "/dev/null", target}
}

var serviceLabels = map[string]string{
"kind": kindEchoName,
}
Expand Down Expand Up @@ -176,7 +173,7 @@ type k8sConnectivityImplementation interface {
type PodContext struct {
// K8sClient is the Kubernetes client of the cluster this pod is
// running in
k8sClient k8sConnectivityImplementation
K8sClient k8sConnectivityImplementation

// Pod is the Kubernetes Pod resource
Pod *corev1.Pod
Expand Down Expand Up @@ -356,7 +353,7 @@ func (t *TestRun) settleFlows(ctx context.Context) error {

// ValidateFlows retrieves the flow pods of the specified pod and validates
// that all filters find a match. On failure, t.Failure() is called.
func (t *TestRun) ValidateFlows(ctx context.Context, pod, podIP string, filter []FilterPair) {
func (t *TestRun) ValidateFlows(ctx context.Context, pod, podIP string, filterPairs []filters.Pair) {
hubbleClient := t.context.HubbleClient()
if hubbleClient == nil {
return
Expand Down Expand Up @@ -396,7 +393,7 @@ func (t *TestRun) ValidateFlows(ctx context.Context, pod, podIP string, filter [

var goodLog []string

for _, p := range filter {
for _, p := range filterPairs {
if flows.Contains(p.Filter) != p.Expect {
for _, g := range goodLog {
t.context.Log(g)
Expand Down Expand Up @@ -637,14 +634,8 @@ func (f *flowsSet) Contains(filter filters.FlowFilterImplementation) bool {
return false
}

type FilterPair struct {
Filter filters.FlowFilterImplementation
Msg string
Expect bool
}

func (k *K8sConnectivityCheck) Validate(pod string, f *flowsSet, filter []FilterPair) (success bool) {
for _, p := range filter {
func (k *K8sConnectivityCheck) Validate(pod string, f *flowsSet, filterPairs []filters.Pair) (success bool) {
for _, p := range filterPairs {
if f.Contains(p.Filter) != p.Expect {
k.Log("❌ %s in pod %s", p.Msg, pod)
success = false
Expand Down Expand Up @@ -751,7 +742,7 @@ func (k *K8sConnectivityCheck) deleteDeployments(ctx context.Context, client k8s
k.Log("🔥 [%s] Deleting connectivity check deployments...", client.ClusterName())
client.DeleteDeployment(ctx, k.params.TestNamespace, echoSameNodeDeploymentName, metav1.DeleteOptions{})
client.DeleteDeployment(ctx, k.params.TestNamespace, echoOtherNodeDeploymentName, metav1.DeleteOptions{})
client.DeleteDeployment(ctx, k.params.TestNamespace, clientDeploymentName, metav1.DeleteOptions{})
client.DeleteDeployment(ctx, k.params.TestNamespace, ClientDeploymentName, metav1.DeleteOptions{})
client.DeleteService(ctx, k.params.TestNamespace, echoSameNodeDeploymentName, metav1.DeleteOptions{})
client.DeleteService(ctx, k.params.TestNamespace, echoOtherNodeDeploymentName, metav1.DeleteOptions{})
client.DeleteNamespace(ctx, k.params.TestNamespace, metav1.DeleteOptions{})
Expand All @@ -769,7 +760,7 @@ func (k *K8sConnectivityCheck) deleteDeployments(ctx context.Context, client k8s
}

func (k *K8sConnectivityCheck) deploymentList() (srcList []string, dstList []string) {
srcList = []string{clientDeploymentName, echoSameNodeDeploymentName}
srcList = []string{ClientDeploymentName, echoSameNodeDeploymentName}

if k.params.MultiCluster != "" || !k.params.SingleNode {
dstList = append(dstList, echoOtherNodeDeploymentName)
Expand Down Expand Up @@ -897,7 +888,7 @@ func (k *K8sConnectivityCheck) deploy(ctx context.Context) error {
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{Key: "name", Operator: metav1.LabelSelectorOpIn, Values: []string{clientDeploymentName}},
{Key: "name", Operator: metav1.LabelSelectorOpIn, Values: []string{ClientDeploymentName}},
},
},
TopologyKey: "kubernetes.io/hostname",
Expand All @@ -914,10 +905,10 @@ func (k *K8sConnectivityCheck) deploy(ctx context.Context) error {
}

k.Log("✨ [%s] Deploying client service...", k.clients.src.ClusterName())
clientDeployment := newDeployment(deploymentParameters{Name: clientDeploymentName, Kind: kindClientName, Port: 8080, Image: "quay.io/cilium/alpine-curl:1.0", Command: []string{"/bin/ash", "-c", "sleep 10000000"}})
clientDeployment := newDeployment(deploymentParameters{Name: ClientDeploymentName, Kind: kindClientName, Port: 8080, Image: "quay.io/cilium/alpine-curl:1.0", Command: []string{"/bin/ash", "-c", "sleep 10000000"}})
_, err = k.clients.src.CreateDeployment(ctx, k.params.TestNamespace, clientDeployment, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("unable to create deployment %s: %s", clientDeploymentName, err)
return fmt.Errorf("unable to create deployment %s: %s", ClientDeploymentName, err)
}
}

Expand Down Expand Up @@ -947,7 +938,7 @@ func (k *K8sConnectivityCheck) deploy(ctx context.Context) error {
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{Key: "name", Operator: metav1.LabelSelectorOpIn, Values: []string{clientDeploymentName}},
{Key: "name", Operator: metav1.LabelSelectorOpIn, Values: []string{ClientDeploymentName}},
},
},
TopologyKey: "kubernetes.io/hostname",
Expand Down Expand Up @@ -1020,7 +1011,7 @@ func (k *K8sConnectivityCheck) waitForService(ctx context.Context, client k8sCon
}

retry:
if _, _, err := client.ExecInPodWithStderr(ctx, clientPod.Pod.Namespace, clientPod.Pod.Name, clientDeploymentName, []string{"nslookup", service}); err != nil {
if _, _, err := client.ExecInPodWithStderr(ctx, clientPod.Pod.Namespace, clientPod.Pod.Name, ClientDeploymentName, []string{"nslookup", service}); err != nil {
select {
case <-time.After(time.Second):
case <-ctx.Done():
Expand Down Expand Up @@ -1055,7 +1046,7 @@ func (k *K8sConnectivityCheck) validateDeployment(ctx context.Context) error {
}

k.clientPods[pod.Name] = PodContext{
k8sClient: k.client,
K8sClient: k.client,
Pod: pod.DeepCopy(),
}
}
Expand All @@ -1074,7 +1065,7 @@ func (k *K8sConnectivityCheck) validateDeployment(ctx context.Context) error {
}

k.echoPods[echoPod.Name] = PodContext{
k8sClient: client,
K8sClient: client,
Pod: echoPod.DeepCopy(),
}
}
Expand Down Expand Up @@ -1148,16 +1139,7 @@ func (k *K8sConnectivityCheck) Report(r TestResult) {
k.results[r.Name] = r
}

var tests = []ConnectivityTest{
&connectivityTestPodToPod{},
&connectivityTestPodToService{},
&connectivityTestPodToNodePort{},
&connectivityTestPodToLocalNodePort{},
&connectivityTestPodToWorld{},
&connectivityTestPodToHost{},
}

func (k *K8sConnectivityCheck) Run(ctx context.Context) error {
func (k *K8sConnectivityCheck) Run(ctx context.Context, tests ...ConnectivityTest) error {
c, err := k.initClients(ctx)
if err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions connectivity/filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ type FlowFilterImplementation interface {
String() string
}

type Pair struct {
Filter FlowFilterImplementation
Msg string
Expect bool
}

type andFilter struct {
filters []FlowFilterImplementation
}
Expand Down
33 changes: 33 additions & 0 deletions connectivity/suite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2020-2021 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package connectivity

import (
"context"

"github.com/cilium/cilium-cli/connectivity/check"
"github.com/cilium/cilium-cli/connectivity/tests"
)

func Run(ctx context.Context, k *check.K8sConnectivityCheck) error {
return k.Run(ctx,
&tests.PodToPod{},
&tests.PodToService{},
&tests.PodToNodePort{},
&tests.PodToLocalNodePort{},
&tests.PodToWorld{},
&tests.PodToHost{},
)
}
19 changes: 19 additions & 0 deletions connectivity/tests/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2020-2021 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tests

func curlCommand(target string) []string {
return []string{"curl", "--silent", "--fail", "--show-error", "--connect-timeout", "5", "--output", "/dev/null", target}
}
15 changes: 8 additions & 7 deletions connectivity/test_host.go → connectivity/tests/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package connectivity
package tests

import (
"context"

"github.com/cilium/cilium-cli/connectivity/check"
"github.com/cilium/cilium-cli/connectivity/filters"
)

type connectivityTestPodToHost struct{}
type PodToHost struct{}

func (p *connectivityTestPodToHost) Name() string {
func (t *PodToHost) Name() string {
return "pod-to-host"
}

func (p *connectivityTestPodToHost) Run(ctx context.Context, c TestContext) {
func (t *PodToHost) Run(ctx context.Context, c check.TestContext) {
// Construct a map of all unique host IPs where pods are running on.
// This will include:
// - The local host
Expand All @@ -43,14 +44,14 @@ func (p *connectivityTestPodToHost) Run(ctx context.Context, c TestContext) {
for _, client := range c.ClientPods() {
for hostIP := range hostIPs {
cmd := []string{"ping", "-c", "3", hostIP}
run := NewTestRun(p.Name(), c, client, NetworkEndpointContext{Peer: hostIP})
run := check.NewTestRun(t.Name(), c, client, check.NetworkEndpointContext{Peer: hostIP})

_, err := client.k8sClient.ExecInPod(ctx, client.Pod.Namespace, client.Pod.Name, clientDeploymentName, cmd)
_, err := client.K8sClient.ExecInPod(ctx, client.Pod.Namespace, client.Pod.Name, check.ClientDeploymentName, cmd)
if err != nil {
run.Failure("ping command failed: %s", err)
}

run.ValidateFlows(ctx, client.Name(), client.Pod.Status.PodIP, []FilterPair{
run.ValidateFlows(ctx, client.Name(), client.Pod.Status.PodIP, []filters.Pair{
{Filter: filters.Drop(), Expect: false, Msg: "Found drop"},
{Filter: filters.And(filters.IP(client.Pod.Status.PodIP, hostIP), filters.ICMP(8)), Expect: true, Msg: "ICMP request"},
{Filter: filters.And(filters.IP(hostIP, client.Pod.Status.PodIP), filters.ICMP(0)), Expect: true, Msg: "ICMP response"},
Expand Down
17 changes: 9 additions & 8 deletions connectivity/test_pod.go → connectivity/tests/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package connectivity
package tests

import (
"context"

"github.com/cilium/cilium-cli/connectivity/check"
"github.com/cilium/cilium-cli/connectivity/filters"
)

type connectivityTestPodToPod struct{}
type PodToPod struct{}

func (p *connectivityTestPodToPod) Name() string {
func (t *PodToPod) Name() string {
return "pod-to-pod"
}

func (p *connectivityTestPodToPod) Run(ctx context.Context, c TestContext) {
func (t *PodToPod) Run(ctx context.Context, c check.TestContext) {
for _, client := range c.ClientPods() {
for _, echo := range c.EchoPods() {
run := NewTestRun(p.Name(), c, client, echo)
run := check.NewTestRun(t.Name(), c, client, echo)

_, err := client.k8sClient.ExecInPod(ctx, client.Pod.Namespace, client.Pod.Name, clientDeploymentName, curlCommand(echo.Pod.Status.PodIP+":8080"))
_, err := client.K8sClient.ExecInPod(ctx, client.Pod.Namespace, client.Pod.Name, check.ClientDeploymentName, curlCommand(echo.Pod.Status.PodIP+":8080"))
if err != nil {
run.Failure("curl connectivity check command failed: %s", err)
}
Expand All @@ -41,14 +42,14 @@ func (p *connectivityTestPodToPod) Run(ctx context.Context, c TestContext) {
tcpRequest := filters.TCP(0, 8080) // request to port 8080
tcpResponse := filters.TCP(8080, 0) // response from port 8080

run.ValidateFlows(ctx, client.Name(), client.Pod.Status.PodIP, []FilterPair{
run.ValidateFlows(ctx, client.Name(), client.Pod.Status.PodIP, []filters.Pair{
{Filter: filters.Drop(), Expect: false, Msg: "Drop"},
{Filter: filters.RST(), Expect: false, Msg: "RST"},
{Filter: filters.And(echoToClient, tcpResponse, filters.SYNACK()), Expect: true, Msg: "SYN-ACK"},
{Filter: filters.And(echoToClient, tcpResponse, filters.FIN()), Expect: true, Msg: "FIN-ACK"},
})

run.ValidateFlows(ctx, echo.Name(), echo.Pod.Status.PodIP, []FilterPair{
run.ValidateFlows(ctx, echo.Name(), echo.Pod.Status.PodIP, []filters.Pair{
{Filter: filters.Drop(), Expect: false, Msg: "Drop"},
{Filter: filters.RST(), Expect: false, Msg: "RST"},
{Filter: filters.And(clientToEcho, tcpRequest, filters.SYN()), Expect: true, Msg: "SYN"},
Expand Down
Loading

0 comments on commit 08338b2

Please sign in to comment.