Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't miss, or fail to forget, initial connections #2704

Merged
merged 2 commits
Jul 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions integration/330_process_edge_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
# shellcheck disable=SC1091
. ./config.sh

start_suite "Test long connections (procspy) between processes"
start_suite "Test long connections between processes"

weave_on "$HOST1" launch
scope_on "$HOST1" launch --probe.ebpf.connections=false --probe.conntrack=false
scope_on "$HOST1" launch

server_on "$HOST1"
weave_proxy_on "$HOST1" run -dti --name client alpine /bin/sh -c "while true; do \
Expand Down
23 changes: 23 additions & 0 deletions integration/331_process_edge_without_ebpf_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#! /bin/bash

# Integration test: verify that long-lived process-to-process connection
# edges are still reported when the probe's eBPF connection tracking is
# disabled, exercising the /proc-scanning (procspy) fallback path.

# shellcheck disable=SC1091
. ./config.sh

start_suite "Test long connections (procspy) between processes"

weave_on "$HOST1" launch
# Disable eBPF connection tracking so the fallback tracker is used.
# NOTE(review): unlike the original 330 test, --probe.conntrack=false is not
# passed here, so conntrack remains enabled — confirm that is intended.
scope_on "$HOST1" launch --probe.ebpf.connections=false

server_on "$HOST1"
# Long-lived client: repeatedly hold a connection open to the nginx server.
weave_proxy_on "$HOST1" run -dti --name client alpine /bin/sh -c "while true; do \
nc nginx.weave.local 80 || true; \
sleep 1; \
done"

# Wait (up to 60s) for both connection endpoints to appear in the report.
wait_for processes "$HOST1" 60 "nginx: worker process" nc

# Assert both processes and the edge between them are present.
has processes "$HOST1" "nginx: worker process"
has processes "$HOST1" nc
has_connection processes "$HOST1" nc "nginx: worker process"

scope_end_suite
6 changes: 3 additions & 3 deletions integration/340_process_edge_across_host_2_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
# shellcheck disable=SC1091
. ./config.sh

start_suite "Test long connections (procspy) between processes on different hosts"
start_suite "Test long connections between processes on different hosts"

weave_on "$HOST1" launch "$HOST1" "$HOST2"
weave_on "$HOST2" launch "$HOST1" "$HOST2"

scope_on "$HOST1" launch --probe.ebpf.connections=false --probe.conntrack=false
scope_on "$HOST2" launch --probe.ebpf.connections=false --probe.conntrack=false
scope_on "$HOST1" launch
scope_on "$HOST2" launch

server_on "$HOST1"
weave_proxy_on "$HOST2" run -dti --name client alpine /bin/sh -c "while true; do \
Expand Down
31 changes: 31 additions & 0 deletions integration/341_process_edge_across_host_without_ebpf_2_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#! /bin/bash

# Integration test: verify that long-lived connection edges between
# processes on *different* hosts are reported when eBPF connection
# tracking is disabled, exercising the /proc-scanning (procspy) fallback.

# shellcheck disable=SC1091
. ./config.sh

start_suite "Test long connections (procspy) between processes on different hosts"

# Form a two-host weave network.
weave_on "$HOST1" launch "$HOST1" "$HOST2"
weave_on "$HOST2" launch "$HOST1" "$HOST2"

# Disable eBPF connection tracking on both probes to force the fallback path.
scope_on "$HOST1" launch --probe.ebpf.connections=false
scope_on "$HOST2" launch --probe.ebpf.connections=false

# Server on HOST1; long-lived client on HOST2 repeatedly connecting to it.
server_on "$HOST1"
weave_proxy_on "$HOST2" run -dti --name client alpine /bin/sh -c "while true; do \
nc nginx.weave.local 80 || true; \
sleep 1; \
done"

sleep 30 # need to allow the scopes to poll dns, resolve the other app ids, and send them reports

# Both apps should show the two processes and the cross-host edge.
check() {
has processes "$1" "nginx: worker process"
has processes "$1" nc
has_connection processes "$1" nc "nginx: worker process"
}

check "$HOST1"
check "$HOST2"

scope_end_suite
40 changes: 21 additions & 19 deletions probe/endpoint/ebpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ type ebpfConnection struct {
// Closed connections are kept in the `closedConnections` slice for one iteration of `walkConnections`.
type EbpfTracker struct {
sync.Mutex
// tracer is the underlying tcptracer-bpf tracer feeding TCP events.
tracer *tracer.Tracer
// ready becomes true once feedInitialConnections has seeded
// openConnections from the initial /proc scan; until then, close
// events are additionally recorded in closedDuringInit.
ready bool
// dead marks the tracker as unusable (see isDead; set when event
// timestamps are invalid — TODO confirm all setters).
dead bool
// lastTimestampV4 is the timestamp of the last IPv4 event seen, used to
// detect out-of-order/invalid events.
lastTimestampV4 uint64

// openConnections maps each live connection's four-tuple to its details.
openConnections map[fourTuple]ebpfConnection
// closedConnections holds connections closed since the last walk.
closedConnections []ebpfConnection
// closedDuringInit records tuples that closed before the tracker became
// ready, so feedInitialConnections can skip (i.e. forget) them instead
// of resurrecting stale entries; it is set to nil once init completes.
closedDuringInit map[fourTuple]struct{}
}

var releaseRegex = regexp.MustCompile(`^(\d+)\.(\d+).*$`)
Expand Down Expand Up @@ -77,7 +78,8 @@ func newEbpfTracker() (*EbpfTracker, error) {
}

tracker := &EbpfTracker{
openConnections: map[fourTuple]ebpfConnection{},
openConnections: map[fourTuple]ebpfConnection{},
closedDuringInit: map[fourTuple]struct{}{},
}

tracer, err := tracer.NewTracer(tracker.tcpEventCbV4, tracker.tcpEventCbV6, tracker.lostCb)
Expand Down Expand Up @@ -194,10 +196,6 @@ func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid
t.Lock()
defer t.Unlock()

if !t.isReadyToHandleConnections() {
return
}

log.Debugf("handleConnection(%v, [%v:%v --> %v:%v], pid=%v, netNS=%v)",
ev, tuple.fromAddr, tuple.fromPort, tuple.toAddr, tuple.toPort, pid, networkNamespace)

Expand All @@ -217,6 +215,9 @@ func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid
networkNamespace: networkNamespace,
}
case tracer.EventClose:
if !t.ready {
t.closedDuringInit[tuple] = struct{}{}
}
if deadConn, ok := t.openConnections[tuple]; ok {
delete(t.openConnections, tuple)
t.closedConnections = append(t.closedConnections, deadConn)
Expand Down Expand Up @@ -247,14 +248,19 @@ func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples
t.Lock()
for conn := conns.Next(); conn != nil; conn = conns.Next() {
tuple, namespaceID, incoming := connectionTuple(conn, seenTuples)
t.openConnections[tuple] = ebpfConnection{
incoming: incoming,
tuple: tuple,
pid: int(conn.Proc.PID),
networkNamespace: namespaceID,
if _, ok := t.closedDuringInit[tuple]; !ok {
if _, ok := t.openConnections[tuple]; !ok {
t.openConnections[tuple] = ebpfConnection{
incoming: incoming,
tuple: tuple,
pid: int(conn.Proc.PID),
networkNamespace: namespaceID,
}
}
}
}
t.readyToHandleConnections = true
t.closedDuringInit = nil
t.ready = true
t.Unlock()

for _, p := range processesWaitingInAccept {
Expand All @@ -263,10 +269,6 @@ func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples
}
}

func (t *EbpfTracker) isReadyToHandleConnections() bool {
return t.readyToHandleConnections
}

func (t *EbpfTracker) isDead() bool {
t.Lock()
defer t.Unlock()
Expand Down
63 changes: 25 additions & 38 deletions probe/endpoint/ebpf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ import (
"github.com/weaveworks/tcptracer-bpf/pkg/tracer"
)

// newMockEbpfTracker builds a minimal EbpfTracker for tests: marked ready,
// not dead, with an empty table of open connections.
func newMockEbpfTracker() *EbpfTracker {
	t := &EbpfTracker{
		openConnections: make(map[fourTuple]ebpfConnection),
	}
	t.ready = true
	t.dead = false
	return t
}

func TestHandleConnection(t *testing.T) {
var (
ServerPid uint32 = 42
Expand Down Expand Up @@ -92,13 +101,7 @@ func TestHandleConnection(t *testing.T) {
}
)

mockEbpfTracker := &EbpfTracker{
readyToHandleConnections: true,
dead: false,

openConnections: map[fourTuple]ebpfConnection{},
closedConnections: []ebpfConnection{},
}
mockEbpfTracker := newMockEbpfTracker()

tuple := fourTuple{IPv4ConnectEvent.SAddr.String(), IPv4ConnectEvent.DAddr.String(), uint16(IPv4ConnectEvent.SPort), uint16(IPv4ConnectEvent.DPort)}
mockEbpfTracker.handleConnection(IPv4ConnectEvent.Type, tuple, int(IPv4ConnectEvent.Pid), strconv.FormatUint(uint64(IPv4ConnectEvent.NetNS), 10))
Expand All @@ -114,13 +117,7 @@ func TestHandleConnection(t *testing.T) {
mockEbpfTracker.openConnections[tuple])
}

mockEbpfTracker = &EbpfTracker{
readyToHandleConnections: true,
dead: false,

openConnections: map[fourTuple]ebpfConnection{},
closedConnections: []ebpfConnection{},
}
mockEbpfTracker = newMockEbpfTracker()

tuple = fourTuple{IPv4AcceptEvent.SAddr.String(), IPv4AcceptEvent.DAddr.String(), uint16(IPv4AcceptEvent.SPort), uint16(IPv4AcceptEvent.DPort)}
mockEbpfTracker.handleConnection(IPv4AcceptEvent.Type, tuple, int(IPv4AcceptEvent.Pid), strconv.FormatUint(uint64(IPv4AcceptEvent.NetNS), 10))
Expand Down Expand Up @@ -155,26 +152,20 @@ func TestWalkConnections(t *testing.T) {
toPort: 0,
}
)
mockEbpfTracker := &EbpfTracker{
readyToHandleConnections: true,
dead: false,
openConnections: map[fourTuple]ebpfConnection{
activeTuple: {
tuple: activeTuple,
networkNamespace: "12345",
incoming: true,
pid: 0,
},
},
closedConnections: []ebpfConnection{
{
tuple: inactiveTuple,
networkNamespace: "12345",
incoming: false,
pid: 0,
},
},
mockEbpfTracker := newMockEbpfTracker()
mockEbpfTracker.openConnections[activeTuple] = ebpfConnection{
tuple: activeTuple,
networkNamespace: "12345",
incoming: true,
pid: 0,
}
mockEbpfTracker.closedConnections = append(mockEbpfTracker.closedConnections,
ebpfConnection{
tuple: inactiveTuple,
networkNamespace: "12345",
incoming: false,
pid: 0,
})
mockEbpfTracker.walkConnections(func(e ebpfConnection) {
cnt++
})
Expand Down Expand Up @@ -204,11 +195,7 @@ func TestInvalidTimeStampDead(t *testing.T) {
NetNS: NetNS,
}
)
mockEbpfTracker := &EbpfTracker{
readyToHandleConnections: true,
dead: false,
openConnections: map[fourTuple]ebpfConnection{},
}
mockEbpfTracker := newMockEbpfTracker()
event.Timestamp = 0
mockEbpfTracker.tcpEventCbV4(event)
event2 := event
Expand Down