Skip to content

Commit

Permalink
Merge pull request #2518 from kinvolk/alban/fdinstall
Browse files Browse the repository at this point in the history
handle fdinstall events from tcptracer-bpf (aka "accept before kretprobe" issue)
  • Loading branch information
Alfonso Acosta authored May 19, 2017
2 parents dbdb648 + b761b5e commit 708c089
Show file tree
Hide file tree
Showing 28 changed files with 419 additions and 92 deletions.
39 changes: 39 additions & 0 deletions integration/314_container_accept_before_kretprobe_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#! /bin/bash
#
# Integration test: a server that is already blocked in accept() when Scope
# starts must still have its incoming connection reported when eBPF connection
# tracking is enabled. The order of the steps below is what makes the test
# meaningful: server first, then Scope, then the client.

# shellcheck disable=SC1091
. ./config.sh

start_suite "Test accept before kretprobe, see https://github.com/weaveworks/tcptracer-bpf/issues/10"

weave_on "$HOST1" launch

# Launch the server before Scope to make sure it calls accept() before Scope's
# kretprobe on the accept function is installed. We use BusyBox's nc instead of
# Alpine's nc so that it blocks on the accept() syscall.
# The server streams the current date once per second to whoever connects on
# port 8080.
weave_on "$HOST1" run -d --name server busybox /bin/sh -c "while true; do \
date ;
sleep 1 ;
done | nc -l -p 8080"

# Start Scope with eBPF connection tracking enabled, then wait (60 is passed
# as the second argument; presumably a timeout in seconds) until the server
# container is reported.
scope_on "$HOST1" launch --probe.ebpf.connections=true
wait_for_containers "$HOST1" 60 server
has_container "$HOST1" server

# Only now start the client. It pings the server five times first (presumably
# to give weave DNS time to resolve server.weave.local -- TODO confirm) and
# then streams the date to the server's port 8080.
weave_on "$HOST1" run -d --name client busybox /bin/sh -c "ping -c 5 server.weave.local; \
while true; do \
date ;
sleep 1 ;
done | nc server.weave.local 8080"

wait_for_containers "$HOST1" 60 server client

has_container "$HOST1" client

# Dump what Scope reports; useful when diagnosing a failure of the check below.
list_containers "$HOST1"
list_connections "$HOST1"

# The actual assertion: a client -> server connection must be present in the
# containers view even though the server's accept() predates Scope's probes.
has_connection containers "$HOST1" client server

# NOTE(review): helper is defined outside this file; by its name it verifies
# that the reported endpoints were produced by the eBPF tracker -- confirm.
endpoints_have_ebpf "$HOST1"

scope_end_suite
7 changes: 4 additions & 3 deletions integration/config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ scope_end_suite() {
# list_containers HOST
#
# Print the container nodes returned by the Scope API on HOST (port 4040) for
# the containers topology, queried with system=show. Only nodes that carry a
# "metadata" field are printed. The first pipeline prints each node's
# docker_image_name metadata value; the second prints "id (image, label)".
# NOTE(review): the two curl pipelines look like the removed and added sides of
# the same diff hunk -- confirm that only the second exists in the real file.
list_containers() {
local host=$1
echo "Listing containers on ${host}:"
curl -s "http://${host}:4040/api/topology/containers?system=show" | jq -r '.nodes[] | select(has("metadata")) | .metadata[] | select(.id == "docker_image_name") | .value'
curl -s "http://${host}:4040/api/topology/containers?system=show" | jq -r '.nodes[] | select(has("metadata")) | { "image": .metadata[] | select(.id == "docker_image_name") | .value, "label": .label, "id": .id} | .id + " (" + .image + ", " + .label + ")"'
echo
}

# list_connections HOST
#
# Print one line per adjacency edge of the containers topology returned by the
# Scope API on HOST (port 4040), queried with system=show. Nodes without an
# "adjacency" field are skipped. The first pipeline prints "from_id -> to_id";
# the second also includes the source node's label: "from_id (label) -> to_id".
# NOTE(review): the two curl pipelines look like the removed and added sides of
# the same diff hunk -- confirm that only the second exists in the real file.
list_connections() {
local host=$1
echo "Listing connections on ${host}:"
curl -s "http://${host}:4040/api/topology/containers?system=show" | jq -r '.nodes[] | select(has("adjacency")) | { "from": .id, "to": .adjacency[]} | .from + " -> " + .to'

curl -s "http://${host}:4040/api/topology/containers?system=show" | jq -r '.nodes[] | select(has("adjacency")) | { "from_name": .label, "from_id": .id, "to": .adjacency[]} | .from_id + " (" + .from_name+ ") -> " + .to'
echo
}

# this checks we have a named node in the given view
Expand Down
1 change: 1 addition & 0 deletions integration/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ setup_host() {
echo Prefetching Images on "$HOST"
docker_on "$HOST" pull peterbourgon/tns-db
docker_on "$HOST" pull alpine
docker_on "$HOST" pull busybox
docker_on "$HOST" pull nginx
}

Expand Down
12 changes: 10 additions & 2 deletions probe/endpoint/connection_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ func (t *connectionTracker) performWalkProc(rpt *report.Report, hostNodeID strin
// once to initialize ebpfTracker
func (t *connectionTracker) getInitialState() {
var processCache *process.CachingWalker
processCache = process.NewCachingWalker(process.NewWalker(t.conf.ProcRoot))
walker := process.NewWalker(t.conf.ProcRoot, true)
processCache = process.NewCachingWalker(walker)
processCache.Tick()

scanner := procspy.NewSyncConnectionScanner(processCache)
Expand All @@ -194,7 +195,14 @@ func (t *connectionTracker) getInitialState() {
}
scanner.Stop()

t.ebpfTracker.feedInitialConnections(conns, seenTuples, report.MakeHostNodeID(t.conf.HostID))
processesWaitingInAccept := []int{}
processCache.Walk(func(p, prev process.Process) {
if p.IsWaitingInAccept {
processesWaitingInAccept = append(processesWaitingInAccept, p.PID)
}
})

t.ebpfTracker.feedInitialConnections(conns, seenTuples, processesWaitingInAccept, report.MakeHostNodeID(t.conf.HostID))
}

func (t *connectionTracker) performEbpfTrack(rpt *report.Report, hostNodeID string) error {
Expand Down
89 changes: 85 additions & 4 deletions probe/endpoint/ebpf.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package endpoint

import (
"bytes"
"fmt"
"regexp"
"strconv"
"sync"
"syscall"

log "github.com/Sirupsen/logrus"
"github.com/weaveworks/common/fs"
"github.com/weaveworks/scope/probe/endpoint/procspy"
"github.com/weaveworks/scope/probe/host"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/tcptracer-bpf/pkg/tracer"
)

Expand All @@ -23,7 +27,7 @@ type ebpfConnection struct {
type eventTracker interface {
handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string)
walkConnections(f func(ebpfConnection))
feedInitialConnections(ci procspy.ConnIter, seenTuples map[string]fourTuple, hostNodeID string)
feedInitialConnections(ci procspy.ConnIter, seenTuples map[string]fourTuple, processesWaitingInAccept []int, hostNodeID string)
isReadyToHandleConnections() bool
isDead() bool
stop()
Expand Down Expand Up @@ -111,8 +115,12 @@ func tcpEventCbV4(e tracer.TcpV4) {

lastTimestampV4 = e.Timestamp

tuple := fourTuple{e.SAddr.String(), e.DAddr.String(), e.SPort, e.DPort}
ebpfTracker.handleConnection(e.Type, tuple, int(e.Pid), strconv.Itoa(int(e.NetNS)))
if e.Type == tracer.EventFdInstall {
ebpfTracker.handleFdInstall(e.Type, int(e.Pid), int(e.Fd))
} else {
tuple := fourTuple{e.SAddr.String(), e.DAddr.String(), e.SPort, e.DPort}
ebpfTracker.handleConnection(e.Type, tuple, int(e.Pid), strconv.Itoa(int(e.NetNS)))
}
}

func tcpEventCbV6(e tracer.TcpV6) {
Expand All @@ -125,6 +133,73 @@ func lostCb(count uint64) {
ebpfTracker.stop()
}

// tupleFromPidFd resolves the socket behind /proc/<pid>/fd/<fd> to its TCP
// four-tuple and to the network namespace of the owning process.
//
// It reads the process's network namespace inode, stats the fd to obtain the
// socket inode, and then searches the process's TCP tables (as returned by
// procspy.ReadTCPFiles) for an entry with that inode.
//
// ok is false, and tuple and netns are zero values, when any of the proc
// files cannot be read, when the fd is not a socket, or when no TCP entry
// matches the socket inode.
func tupleFromPidFd(pid int, fd int) (tuple fourTuple, netns string, ok bool) {
	// read /proc/$pid/ns/net
	//
	// probe/endpoint/procspy/proc_linux.go supports Linux < 3.8 but we
	// don't need that here since ebpf-enabled kernels will be > 3.8
	netnsIno, err := procspy.ReadNetnsFromPID(pid)
	if err != nil {
		log.Debugf("netns proc file for pid %d disappeared before we could read it: %v", pid, err)
		return fourTuple{}, "", false
	}
	netns = strconv.FormatUint(netnsIno, 10)

	// find /proc/$pid/fd/$fd's ino
	fdFilename := fmt.Sprintf("/proc/%d/fd/%d", pid, fd)
	var statFdFile syscall.Stat_t
	if err := fs.Stat(fdFilename, &statFdFile); err != nil {
		// Report the underlying error like the other proc-file failures in
		// this function do; stat can fail for reasons other than the file
		// having gone away.
		log.Debugf("proc file %q disappeared before we could read it: %v", fdFilename, err)
		return fourTuple{}, "", false
	}

	if statFdFile.Mode&syscall.S_IFMT != syscall.S_IFSOCK {
		log.Errorf("file %q is not a socket", fdFilename)
		return fourTuple{}, "", false
	}
	ino := statFdFile.Ino

	// read both /proc/pid/net/{tcp,tcp6}
	buf := bytes.NewBuffer(make([]byte, 0, 5000))
	if _, err := procspy.ReadTCPFiles(pid, buf); err != nil {
		log.Debugf("TCP proc file for pid %d disappeared before we could read it: %v", pid, err)
		return fourTuple{}, "", false
	}

	// find /proc/$pid/fd/$fd's ino in /proc/pid/net/tcp
	pn := procspy.NewProcNet(buf.Bytes())
	for n := pn.Next(); n != nil; n = pn.Next() {
		if n.Inode == ino {
			return fourTuple{n.LocalAddress.String(), n.RemoteAddress.String(), n.LocalPort, n.RemotePort}, netns, true
		}
	}

	// Every entry was scanned without a match.
	log.Debugf("connection for proc file %q not found. buf=%q", fdFilename, buf.String())
	return fourTuple{}, "", false
}

// handleFdInstall processes an fd-install event for a watched process: it
// resolves the newly installed fd to a TCP four-tuple and, on success,
// records it as an open incoming connection owned by pid.
//
// Once the process is no longer found waiting in accept(), its fd-install
// watcher is removed. Events whose fd cannot be resolved to a TCP socket are
// dropped. ev is accepted for symmetry with handleConnection and is not used.
func (t *EbpfTracker) handleFdInstall(ev tracer.EventType, pid int, fd int) {
	tuple, netns, ok := tupleFromPidFd(pid, fd)
	log.Debugf("EbpfTracker: got fd-install event: pid=%d fd=%d -> tuple=%s netns=%s ok=%v", pid, fd, tuple, netns, ok)
	if !ok {
		return
	}

	// handleConnection mutates openConnections under the tracker lock, so
	// this write must take the same lock or the map is accessed racily. The
	// lock is released before the /proc lookup below to keep the critical
	// section short.
	t.Lock()
	t.openConnections[tuple.String()] = ebpfConnection{
		incoming:         true,
		tuple:            tuple,
		pid:              pid,
		networkNamespace: netns,
	}
	t.Unlock()

	if !process.IsProcInAccept("/proc", strconv.Itoa(pid)) {
		t.tracer.RemoveFdInstallWatcher(uint32(pid))
	}
}

func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string) {
t.Lock()
defer t.Unlock()
Expand Down Expand Up @@ -160,6 +235,8 @@ func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid
} else {
log.Debugf("EbpfTracker: unmatched close event: %s pid=%d netns=%s", tuple.String(), pid, networkNamespace)
}
default:
log.Debugf("EbpfTracker: unknown event: %s (%d)", ev, ev)
}
}

Expand All @@ -178,7 +255,7 @@ func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) {
t.closedConnections = t.closedConnections[:0]
}

func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples map[string]fourTuple, hostNodeID string) {
func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples map[string]fourTuple, processesWaitingInAccept []int, hostNodeID string) {
t.readyToHandleConnections = true
for conn := conns.Next(); conn != nil; conn = conns.Next() {
var (
Expand All @@ -204,6 +281,10 @@ func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples
t.handleConnection(tracer.EventAccept, tuple, int(conn.Proc.PID), namespaceID)
}
}
for _, p := range processesWaitingInAccept {
t.tracer.AddFdInstallWatcher(uint32(p))
log.Debugf("EbpfTracker: install fd-install watcher: pid=%d", p)
}
}

func (t *EbpfTracker) isReadyToHandleConnections() bool {
Expand Down
16 changes: 16 additions & 0 deletions probe/endpoint/procspy/proc_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package procspy

import (
"bytes"
"fmt"
)

// ReadTCPFiles is the stub for platforms other than Linux, where the per-pid
// tcp and tcp6 proc files do not exist. It never touches buf and always
// reports zero bytes read together with an error.
func ReadTCPFiles(pid int, buf *bytes.Buffer) (int64, error) {
	const bytesRead int64 = 0
	unsupported := fmt.Errorf("not supported on non-Linux systems")
	return bytesRead, unsupported
}

// ReadNetnsFromPID is the stub for platforms other than Linux, where a
// process's network namespace inode cannot be looked up. It always returns
// inode 0 together with an error.
func ReadNetnsFromPID(pid int) (uint64, error) {
	const noInode uint64 = 0
	unsupported := fmt.Errorf("not supported on non-Linux systems")
	return noInode, unsupported
}
2 changes: 1 addition & 1 deletion probe/endpoint/procspy/proc_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestWalkProcPid(t *testing.T) {
defer fs_hook.Restore()

buf := bytes.Buffer{}
walker := process.NewWalker(procRoot)
walker := process.NewWalker(procRoot, false)
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()
pWalker := newPidWalker(walker, ticker.C, 1)
Expand Down
Loading

0 comments on commit 708c089

Please sign in to comment.