Skip to content

Commit

Permalink
connectivity: extend encryption tests to validate both directions
Browse files Browse the repository at this point in the history
Currently, the encryption tests validate that no unencrypted packets are
leaked by capturing them on the source only. Since we can capture only
egressing packets (as the XFRM stack recirculates the ingressing ones
during decryption), this means that we are checking only one direction.

Let's extend this check to validate both directions, starting a tcpdump
capture both on the source and destination hosts. We leverage the
bidirectional validation for the pod-to-pod encryption case though,
as with the node-to-node one it is particularly tricky to construct
the correct filter without additional information (as packets might
be masquerated at the source, and in that case they should be sniffed
on a different interface).

Signed-off-by: Marco Iorio <[email protected]>
  • Loading branch information
giorio94 committed Oct 16, 2023
1 parent 0fe965c commit 33b2591
Showing 1 changed file with 132 additions and 74 deletions.
206 changes: 132 additions & 74 deletions connectivity/tests/encryption.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,124 +130,182 @@ func (s *podToPodEncryption) Run(ctx context.Context, t *check.Test) {
// clientHost is a pod running on the same node as the client pod, just in
// the host netns.
clientHost := ct.HostNetNSPodsByNode()[client.Pod.Spec.NodeName]
// serverHost is a pod running in a remote node's host netns.
serverHost := t.Context().HostNetNSPodsByNode()[server.Pod.Spec.NodeName]
assertNoLeaks, _ := ct.Features.MatchRequirements(s.reqs...)

if !assertNoLeaks {
t.Debugf("%s test running in sanity mode, expecting unencrypted packets", s.Name())
}

t.ForEachIPFamily(func(ipFam features.IPFamily) {
testNoTrafficLeak(ctx, t, s, client, &server, &clientHost, requestHTTP, ipFam, assertNoLeaks)
testNoTrafficLeak(ctx, t, s, client, &server, &clientHost, &serverHost, requestHTTP, ipFam, assertNoLeaks)
})
}

func testNoTrafficLeak(ctx context.Context, t *check.Test, s check.Scenario,
client, server, clientHost *check.Pod, reqType requestType, ipFam features.IPFamily,
assertNoLeaks bool,
) {
dstAddr := server.Address(ipFam)
iface := getInterNodeIface(ctx, t, clientHost, ipFam, client.Address(ipFam), dstAddr)
srcFilter := getSourceAddressFilter(ctx, t, client, clientHost, ipFam, dstAddr)

bgStdout := &safeBuffer{}
bgStderr := &safeBuffer{}
bgExited := make(chan struct{})
killCmdCtx, killCmd := context.WithCancel(context.Background())
// Start kubectl exec in bg (=goroutine)
type leakSniffer struct {
host *check.Pod
dumpPath string

stdout safeBuffer
cancel context.CancelFunc
exited chan error
}

func startLeakSniffer(ctx context.Context, t *check.Test, host *check.Pod,
iface string, filter string,
) (*leakSniffer, error) {
cmdctx, cancel := context.WithCancel(ctx)
sniffer := &leakSniffer{
host: host,
dumpPath: fmt.Sprintf("/tmp/%s-%s.pcap", t.Name(), host.Pod.Name),
cancel: cancel,
exited: make(chan error, 1),
}

go func() {
protoFilter := ""
switch reqType {
case requestHTTP:
protoFilter = "tcp"
case requestICMPEcho:
protoFilter = "icmp"
if ipFam == features.IPFamilyV6 {
protoFilter = "icmp6"
}
}
// Run tcpdump with -w instead of directly printing captured pkts. This
// is to avoid a race after sending ^C (triggered by bgCancel()) which
// might terminate the tcpdump process before it gets a chance to dump
// its captures.
cmd := []string{
"tcpdump", "-i", iface, "--immediate-mode", "-w", fmt.Sprintf("/tmp/%s.pcap", t.Name()),
// Capture egress traffic.
// Unfortunately, we cannot use "host %s and host %s" filter here,
// as IPsec recirculates replies to the iface netdev, which would
// make tcpdump to capture the pkts (false positive).
fmt.Sprintf("%s and dst host %s and %s", srcFilter, dstAddr, protoFilter),
"tcpdump", "-i", iface, "--immediate-mode",
"-w", sniffer.dumpPath, filter,
}

t.Debugf("Running in bg: %s", strings.Join(cmd, " "))
err := clientHost.K8sClient.ExecInPodWithWriters(ctx, killCmdCtx,
clientHost.Pod.Namespace, clientHost.Pod.Name, "", cmd, bgStdout, bgStderr)
err := host.K8sClient.ExecInPodWithWriters(ctx, cmdctx,
host.Pod.Namespace, host.Pod.Name, "", cmd, &sniffer.stdout, io.Discard)
if err != nil && !errors.Is(err, context.Canceled) {
t.Fatalf("Failed to execute tcpdump: %s", err)
sniffer.exited <- err
}
close(bgExited)

close(sniffer.exited)
}()

// Wait until tcpdump is ready to capture pkts
timeout := time.After(5 * time.Second)
for found := false; !found; {
wctx, wcancel := context.WithTimeout(ctx, 5*time.Second)
defer wcancel()
for {
select {
case <-timeout:
t.Fatalf("Failed to wait for tcpdump to be ready")
default:
line, err := bgStdout.ReadString('\n')
case <-wctx.Done():
return nil, fmt.Errorf("Failed to wait for tcpdump to be ready")
case err := <-sniffer.exited:
return nil, fmt.Errorf("Failed to execute tcpdump: %w", err)
case <-time.After(100 * time.Millisecond):
line, err := sniffer.stdout.ReadString('\n')
if err != nil && !errors.Is(err, io.EOF) {
t.Fatalf("Failed to read kubectl exec's stdout: %s", err)
return nil, fmt.Errorf("Failed to read kubectl exec's stdout: %w", err)
}
if strings.Contains(line, fmt.Sprintf("listening on %s", iface)) {
found = true
break
return sniffer, nil
}
time.Sleep(100 * time.Millisecond)
}
}
}

switch reqType {
case requestHTTP:
// Curl the server from the client to generate some traffic
t.NewAction(s, fmt.Sprintf("curl-%s", ipFam), client, server, ipFam).Run(func(a *check.Action) {
a.ExecInPod(ctx, t.Context().CurlCommand(server, ipFam))
})
case requestICMPEcho:
// Ping the server from the client to generate some traffic
t.NewAction(s, fmt.Sprintf("ping-%s", ipFam), client, server, ipFam).Run(func(a *check.Action) {
a.ExecInPod(ctx, t.Context().PingCommand(server, ipFam))
})
default:
t.Fatalf("Invalid request type: %d", reqType)
}

func (sniffer *leakSniffer) validate(ctx context.Context, a *check.Action, assertNoLeaks, debug bool) {
// Wait until tcpdump has exited
killCmd()
<-bgExited
sniffer.cancel()
if err := <-sniffer.exited; err != nil {
a.Fatalf("Failed to execute tcpdump: %w", err)
}

// Redirect stderr to /dev/null, as tcpdump logs to stderr, and ExecInPod
// will return an error if any char is written to stderr. Anyway, the count
// is written to stdout.
cmd := []string{"/bin/sh", "-c", fmt.Sprintf("tcpdump -r /tmp/%s.pcap --count 2>/dev/null", t.Name())}
count, err := clientHost.K8sClient.ExecInPod(ctx, clientHost.Pod.Namespace, clientHost.Pod.Name, "", cmd)
cmd := []string{"/bin/sh", "-c", fmt.Sprintf("tcpdump -r %s --count 2>/dev/null", sniffer.dumpPath)}
count, err := sniffer.host.K8sClient.ExecInPod(ctx, sniffer.host.Pod.Namespace, sniffer.host.Pod.Name, "", cmd)
if err != nil {
t.Fatalf("Failed to retrieve tcpdump pkt count: %s", err)
a.Fatalf("Failed to retrieve tcpdump pkt count: %s", err)
}

if !strings.Contains(count.String(), "packet") {
a.Fatalf("tcpdump output doesn't look correct: %s", count.String())
}

if !strings.HasPrefix(count.String(), "0 packets") && assertNoLeaks {
t.Failf("Captured unencrypted pkt (count=%s)", strings.TrimRight(count.String(), "\n\r"))
a.Failf("Captured unencrypted pkt (count=%s)", strings.TrimRight(count.String(), "\n\r"))

// If debug mode is enabled, dump the captured pkts
if t.Context().Params().Debug {
cmd := []string{"/bin/sh", "-c", fmt.Sprintf("tcpdump -r /tmp/%s.pcap 2>/dev/null", t.Name())}
out, err := clientHost.K8sClient.ExecInPod(ctx, clientHost.Pod.Namespace, clientHost.Pod.Name, "", cmd)
if debug {
cmd := []string{"/bin/sh", "-c", fmt.Sprintf("tcpdump -r %s 2>/dev/null", sniffer.dumpPath)}
out, err := sniffer.host.K8sClient.ExecInPod(ctx, sniffer.host.Pod.Namespace, sniffer.host.Pod.Name, "", cmd)
if err != nil {
t.Fatalf("Failed to retrieve tcpdump output: %s", err)
a.Fatalf("Failed to retrieve tcpdump output: %s", err)
}
t.Debugf("Captured pkts:\n%s", out.String())
a.Debugf("Captured pkts:\n%s", out.String())
}
}

if strings.HasPrefix(count.String(), "0 packets") && !assertNoLeaks {
t.Failf("Expected to see unencrypted packets, but none found. This check might be broken")
a.Failf("Expected to see unencrypted packets, but none found. This check might be broken")
}
}

func testNoTrafficLeak(ctx context.Context, t *check.Test, s check.Scenario,
client, server, clientHost *check.Pod, serverHost *check.Pod, /* serverHost=nil disables the bidirectional check */
reqType requestType, ipFam features.IPFamily, assertNoLeaks bool,
) {
protoFilter := ""
switch reqType {
case requestHTTP:
protoFilter = "tcp"
case requestICMPEcho:
protoFilter = "icmp"
if ipFam == features.IPFamilyV6 {
protoFilter = "icmp6"
}
default:
t.Fatalf("Invalid request type: %d", reqType)
}

srcAddr, dstAddr := client.Address(ipFam), server.Address(ipFam)
srcAddrFilter := getSourceAddressFilter(ctx, t, client, clientHost, ipFam, dstAddr)
srcIface := getInterNodeIface(ctx, t, clientHost, ipFam, client.Address(ipFam), dstAddr)

// Capture egress traffic.
// Unfortunately, we cannot use "host %s and host %s" filter here,
// as IPsec recirculates replies to the iface netdev, which would
// make tcpdump to capture the pkts (false positive).
srcFilter := fmt.Sprintf("%s and dst host %s and %s", srcAddrFilter, dstAddr, protoFilter)

srcSniffer, err := startLeakSniffer(ctx, t, clientHost, srcIface, srcFilter)
if err != nil {
t.Fatal(err)
}

var dstSniffer *leakSniffer
if serverHost != nil {
dstAddrFilter := strings.ReplaceAll(srcAddrFilter, "src", "dst")
dstIface := getInterNodeIface(ctx, t, serverHost, ipFam, server.Address(ipFam), srcAddr)
dstFilter := fmt.Sprintf("src host %s and %s and %s", dstAddr, dstAddrFilter, protoFilter)

dstSniffer, err = startLeakSniffer(ctx, t, serverHost, dstIface, dstFilter)
if err != nil {
t.Fatal(err)
}
}

switch reqType {
case requestHTTP:
// Curl the server from the client to generate some traffic
t.NewAction(s, fmt.Sprintf("curl-%s", ipFam), client, server, ipFam).Run(func(a *check.Action) {
a.ExecInPod(ctx, t.Context().CurlCommand(server, ipFam))
srcSniffer.validate(ctx, a, assertNoLeaks, t.Context().Params().Debug)
if dstSniffer != nil {
dstSniffer.validate(ctx, a, assertNoLeaks, t.Context().Params().Debug)
}
})
case requestICMPEcho:
// Ping the server from the client to generate some traffic
t.NewAction(s, fmt.Sprintf("ping-%s", ipFam), client, server, ipFam).Run(func(a *check.Action) {
a.ExecInPod(ctx, t.Context().PingCommand(server, ipFam))
srcSniffer.validate(ctx, a, assertNoLeaks, t.Context().Params().Debug)
if dstSniffer != nil {
dstSniffer.validate(ctx, a, assertNoLeaks, t.Context().Params().Debug)
}
})
}
}

Expand Down Expand Up @@ -320,10 +378,10 @@ func (s *nodeToNodeEncryption) Run(ctx context.Context, t *check.Test) {
t.ForEachIPFamily(func(ipFam features.IPFamily) {
// Test pod-to-remote-host (ICMP Echo instead of HTTP because a remote host
// does not have a HTTP server running)
testNoTrafficLeak(ctx, t, s, client, &serverHost, &clientHost, requestICMPEcho, ipFam, assertNoLeaks)
testNoTrafficLeak(ctx, t, s, client, &serverHost, &clientHost, nil, requestICMPEcho, ipFam, assertNoLeaks)
// Test host-to-remote-host
testNoTrafficLeak(ctx, t, s, &clientHost, &serverHost, &clientHost, requestICMPEcho, ipFam, assertNoLeaks)
testNoTrafficLeak(ctx, t, s, &clientHost, &serverHost, &clientHost, nil, requestICMPEcho, ipFam, assertNoLeaks)
// Test host-to-remote-pod
testNoTrafficLeak(ctx, t, s, &clientHost, &server, &clientHost, requestHTTP, ipFam, assertNoLeaks)
testNoTrafficLeak(ctx, t, s, &clientHost, &server, &clientHost, nil, requestHTTP, ipFam, assertNoLeaks)
})
}

0 comments on commit 33b2591

Please sign in to comment.