From ebc3cddf01825cb36a8b4243dd5f580f451b6821 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 11 Jul 2017 20:20:11 +0100 Subject: [PATCH 1/2] don't miss, or fail to forget, initial connections ...when initialising eBPF-based connection tracking. Previously we were ignoring all eBPF events until we had gathered the existing connections. That means we could a) miss connections created during the gathering, and b) fail to forget connections that got closed during the gathering. The fix comprises the following changes: 1. pay attention to eBPF events immediately. That way we do not miss anything. 2. remember connections for which we received a Close event during the initialisation phase, and subsequently drop gathered existing connections that match these. That way we do not erroneously consider a gathered connection as open when it got closed since the gathering. 3. drop gathered existing connections which match connections detected through eBPF events. The latter typically have more / current metadata. In particular, PIDs can be missing from the former. Fixes #2689. Fixes #2700. --- probe/endpoint/ebpf.go | 40 ++++++++++++----------- probe/endpoint/ebpf_test.go | 63 +++++++++++++++---------------------- 2 files changed, 46 insertions(+), 57 deletions(-) diff --git a/probe/endpoint/ebpf.go b/probe/endpoint/ebpf.go index f9d57dcec2..f7bbd6861e 100644 --- a/probe/endpoint/ebpf.go +++ b/probe/endpoint/ebpf.go @@ -28,13 +28,14 @@ type ebpfConnection struct { // Closed connections are kept in the `closedConnections` slice for one iteration of `walkConnections`. 
type EbpfTracker struct { sync.Mutex - tracer *tracer.Tracer - readyToHandleConnections bool - dead bool - lastTimestampV4 uint64 + tracer *tracer.Tracer + ready bool + dead bool + lastTimestampV4 uint64 openConnections map[fourTuple]ebpfConnection closedConnections []ebpfConnection + closedDuringInit map[fourTuple]struct{} } var releaseRegex = regexp.MustCompile(`^(\d+)\.(\d+).*$`) @@ -77,7 +78,8 @@ func newEbpfTracker() (*EbpfTracker, error) { } tracker := &EbpfTracker{ - openConnections: map[fourTuple]ebpfConnection{}, + openConnections: map[fourTuple]ebpfConnection{}, + closedDuringInit: map[fourTuple]struct{}{}, } tracer, err := tracer.NewTracer(tracker.tcpEventCbV4, tracker.tcpEventCbV6, tracker.lostCb) @@ -194,10 +196,6 @@ func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid t.Lock() defer t.Unlock() - if !t.isReadyToHandleConnections() { - return - } - log.Debugf("handleConnection(%v, [%v:%v --> %v:%v], pid=%v, netNS=%v)", ev, tuple.fromAddr, tuple.fromPort, tuple.toAddr, tuple.toPort, pid, networkNamespace) @@ -217,6 +215,9 @@ func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid networkNamespace: networkNamespace, } case tracer.EventClose: + if !t.ready { + t.closedDuringInit[tuple] = struct{}{} + } if deadConn, ok := t.openConnections[tuple]; ok { delete(t.openConnections, tuple) t.closedConnections = append(t.closedConnections, deadConn) @@ -247,14 +248,19 @@ func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples t.Lock() for conn := conns.Next(); conn != nil; conn = conns.Next() { tuple, namespaceID, incoming := connectionTuple(conn, seenTuples) - t.openConnections[tuple] = ebpfConnection{ - incoming: incoming, - tuple: tuple, - pid: int(conn.Proc.PID), - networkNamespace: namespaceID, + if _, ok := t.closedDuringInit[tuple]; !ok { + if _, ok := t.openConnections[tuple]; !ok { + t.openConnections[tuple] = ebpfConnection{ + incoming: incoming, + tuple: tuple, + pid: 
int(conn.Proc.PID), + networkNamespace: namespaceID, + } + } } } - t.readyToHandleConnections = true + t.closedDuringInit = nil + t.ready = true t.Unlock() for _, p := range processesWaitingInAccept { @@ -263,10 +269,6 @@ func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples } } -func (t *EbpfTracker) isReadyToHandleConnections() bool { - return t.readyToHandleConnections -} - func (t *EbpfTracker) isDead() bool { t.Lock() defer t.Unlock() diff --git a/probe/endpoint/ebpf_test.go b/probe/endpoint/ebpf_test.go index 24d96aab55..6586cf9d8e 100644 --- a/probe/endpoint/ebpf_test.go +++ b/probe/endpoint/ebpf_test.go @@ -9,6 +9,15 @@ import ( "github.com/weaveworks/tcptracer-bpf/pkg/tracer" ) +func newMockEbpfTracker() *EbpfTracker { + return &EbpfTracker{ + ready: true, + dead: false, + + openConnections: map[fourTuple]ebpfConnection{}, + } +} + func TestHandleConnection(t *testing.T) { var ( ServerPid uint32 = 42 @@ -92,13 +101,7 @@ func TestHandleConnection(t *testing.T) { } ) - mockEbpfTracker := &EbpfTracker{ - readyToHandleConnections: true, - dead: false, - - openConnections: map[fourTuple]ebpfConnection{}, - closedConnections: []ebpfConnection{}, - } + mockEbpfTracker := newMockEbpfTracker() tuple := fourTuple{IPv4ConnectEvent.SAddr.String(), IPv4ConnectEvent.DAddr.String(), uint16(IPv4ConnectEvent.SPort), uint16(IPv4ConnectEvent.DPort)} mockEbpfTracker.handleConnection(IPv4ConnectEvent.Type, tuple, int(IPv4ConnectEvent.Pid), strconv.FormatUint(uint64(IPv4ConnectEvent.NetNS), 10)) @@ -114,13 +117,7 @@ func TestHandleConnection(t *testing.T) { mockEbpfTracker.openConnections[tuple]) } - mockEbpfTracker = &EbpfTracker{ - readyToHandleConnections: true, - dead: false, - - openConnections: map[fourTuple]ebpfConnection{}, - closedConnections: []ebpfConnection{}, - } + mockEbpfTracker = newMockEbpfTracker() tuple = fourTuple{IPv4AcceptEvent.SAddr.String(), IPv4AcceptEvent.DAddr.String(), uint16(IPv4AcceptEvent.SPort), 
uint16(IPv4AcceptEvent.DPort)} mockEbpfTracker.handleConnection(IPv4AcceptEvent.Type, tuple, int(IPv4AcceptEvent.Pid), strconv.FormatUint(uint64(IPv4AcceptEvent.NetNS), 10)) @@ -155,26 +152,20 @@ func TestWalkConnections(t *testing.T) { toPort: 0, } ) - mockEbpfTracker := &EbpfTracker{ - readyToHandleConnections: true, - dead: false, - openConnections: map[fourTuple]ebpfConnection{ - activeTuple: { - tuple: activeTuple, - networkNamespace: "12345", - incoming: true, - pid: 0, - }, - }, - closedConnections: []ebpfConnection{ - { - tuple: inactiveTuple, - networkNamespace: "12345", - incoming: false, - pid: 0, - }, - }, + mockEbpfTracker := newMockEbpfTracker() + mockEbpfTracker.openConnections[activeTuple] = ebpfConnection{ + tuple: activeTuple, + networkNamespace: "12345", + incoming: true, + pid: 0, } + mockEbpfTracker.closedConnections = append(mockEbpfTracker.closedConnections, + ebpfConnection{ + tuple: inactiveTuple, + networkNamespace: "12345", + incoming: false, + pid: 0, + }) mockEbpfTracker.walkConnections(func(e ebpfConnection) { cnt++ }) @@ -204,11 +195,7 @@ func TestInvalidTimeStampDead(t *testing.T) { NetNS: NetNS, } ) - mockEbpfTracker := &EbpfTracker{ - readyToHandleConnections: true, - dead: false, - openConnections: map[fourTuple]ebpfConnection{}, - } + mockEbpfTracker := newMockEbpfTracker() event.Timestamp = 0 mockEbpfTracker.tcpEventCbV4(event) event2 := event From b39b3e627409c2ef2698b698fcfb25f84e732670 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 11 Jul 2017 20:52:33 +0100 Subject: [PATCH 2/2] re-add tests that were previously failing when using ebpf --- integration/330_process_edge_test.sh | 4 +-- .../331_process_edge_without_ebpf_test.sh | 23 ++++++++++++++ .../340_process_edge_across_host_2_test.sh | 6 ++-- ...ss_edge_across_host_without_ebpf_2_test.sh | 31 +++++++++++++++++++ 4 files changed, 59 insertions(+), 5 deletions(-) create mode 100755 integration/331_process_edge_without_ebpf_test.sh create mode 100755 
integration/341_process_edge_across_host_without_ebpf_2_test.sh diff --git a/integration/330_process_edge_test.sh b/integration/330_process_edge_test.sh index e3f3ef8037..7fadbe94a6 100755 --- a/integration/330_process_edge_test.sh +++ b/integration/330_process_edge_test.sh @@ -3,10 +3,10 @@ # shellcheck disable=SC1091 . ./config.sh -start_suite "Test long connections (procspy) between processes" +start_suite "Test long connections between processes" weave_on "$HOST1" launch -scope_on "$HOST1" launch --probe.ebpf.connections=false --probe.conntrack=false +scope_on "$HOST1" launch server_on "$HOST1" weave_proxy_on "$HOST1" run -dti --name client alpine /bin/sh -c "while true; do \ diff --git a/integration/331_process_edge_without_ebpf_test.sh b/integration/331_process_edge_without_ebpf_test.sh new file mode 100755 index 0000000000..712820d4b5 --- /dev/null +++ b/integration/331_process_edge_without_ebpf_test.sh @@ -0,0 +1,23 @@ +#! /bin/bash + +# shellcheck disable=SC1091 +. ./config.sh + +start_suite "Test long connections (procspy) between processes" + +weave_on "$HOST1" launch +scope_on "$HOST1" launch --probe.ebpf.connections=false + +server_on "$HOST1" +weave_proxy_on "$HOST1" run -dti --name client alpine /bin/sh -c "while true; do \ + nc nginx.weave.local 80 || true; \ + sleep 1; \ +done" + +wait_for processes "$HOST1" 60 "nginx: worker process" nc + +has processes "$HOST1" "nginx: worker process" +has processes "$HOST1" nc +has_connection processes "$HOST1" nc "nginx: worker process" + +scope_end_suite diff --git a/integration/340_process_edge_across_host_2_test.sh b/integration/340_process_edge_across_host_2_test.sh index 161c249297..50d8e1006c 100755 --- a/integration/340_process_edge_across_host_2_test.sh +++ b/integration/340_process_edge_across_host_2_test.sh @@ -3,13 +3,13 @@ # shellcheck disable=SC1091 . 
./config.sh -start_suite "Test long connections (procspy) between processes on different hosts" +start_suite "Test long connections between processes on different hosts" weave_on "$HOST1" launch "$HOST1" "$HOST2" weave_on "$HOST2" launch "$HOST1" "$HOST2" -scope_on "$HOST1" launch --probe.ebpf.connections=false --probe.conntrack=false -scope_on "$HOST2" launch --probe.ebpf.connections=false --probe.conntrack=false +scope_on "$HOST1" launch +scope_on "$HOST2" launch server_on "$HOST1" weave_proxy_on "$HOST2" run -dti --name client alpine /bin/sh -c "while true; do \ diff --git a/integration/341_process_edge_across_host_without_ebpf_2_test.sh b/integration/341_process_edge_across_host_without_ebpf_2_test.sh new file mode 100755 index 0000000000..5dd41c507b --- /dev/null +++ b/integration/341_process_edge_across_host_without_ebpf_2_test.sh @@ -0,0 +1,31 @@ +#! /bin/bash + +# shellcheck disable=SC1091 +. ./config.sh + +start_suite "Test long connections (procspy) between processes on different hosts" + +weave_on "$HOST1" launch "$HOST1" "$HOST2" +weave_on "$HOST2" launch "$HOST1" "$HOST2" + +scope_on "$HOST1" launch --probe.ebpf.connections=false +scope_on "$HOST2" launch --probe.ebpf.connections=false + +server_on "$HOST1" +weave_proxy_on "$HOST2" run -dti --name client alpine /bin/sh -c "while true; do \ + nc nginx.weave.local 80 || true; \ + sleep 1; \ +done" + +sleep 30 # need to allow the scopes to poll dns, resolve the other app ids, and send them reports + +check() { + has processes "$1" "nginx: worker process" + has processes "$1" nc + has_connection processes "$1" nc "nginx: worker process" +} + +check "$HOST1" +check "$HOST2" + +scope_end_suite