Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle fdinstall events from tcptracer-bpf (aka "accept before kretprobe" issue) #2518

Merged
merged 4 commits into from
May 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions integration/314_container_accept_before_kretprobe_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#! /bin/bash

# shellcheck disable=SC1091
. ./config.sh

start_suite "Test accept before kretprobe, see https://github.com/weaveworks/tcptracer-bpf/issues/10"

weave_on "$HOST1" launch

# Launch the server before Scope to make sure it calls accept() before Scope's
# kretprobe on the accept function is installed. We use busybox' nc instead of
# Alpine's nc so that it blocks on the accept() syscall.
weave_on "$HOST1" run -d --name server busybox /bin/sh -c "while true; do \
date ;
sleep 1 ;
done | nc -l -p 8080"

scope_on "$HOST1" launch --probe.ebpf.connections=true
wait_for_containers "$HOST1" 60 server
has_container "$HOST1" server

weave_on "$HOST1" run -d --name client busybox /bin/sh -c "ping -c 5 server.weave.local; \
while true; do \
date ;
sleep 1 ;
done | nc server.weave.local 8080"

wait_for_containers "$HOST1" 60 server client

has_container "$HOST1" client

list_containers "$HOST1"
list_connections "$HOST1"

has_connection containers "$HOST1" client server

This comment was marked as abuse.

This comment was marked as abuse.

This comment was marked as abuse.


endpoints_have_ebpf "$HOST1"

scope_end_suite
7 changes: 4 additions & 3 deletions integration/config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ scope_end_suite() {
list_containers() {
local host=$1
echo "Listing containers on ${host}:"
curl -s "http://${host}:4040/api/topology/containers?system=show" | jq -r '.nodes[] | select(has("metadata")) | .metadata[] | select(.id == "docker_image_name") | .value'
curl -s "http://${host}:4040/api/topology/containers?system=show" | jq -r '.nodes[] | select(has("metadata")) | { "image": .metadata[] | select(.id == "docker_image_name") | .value, "label": .label, "id": .id} | .id + " (" + .image + ", " + .label + ")"'

This comment was marked as abuse.

echo
}

list_connections() {
local host=$1
echo "Listing connections on ${host}:"
curl -s "http://${host}:4040/api/topology/containers?system=show" | jq -r '.nodes[] | select(has("adjacency")) | { "from": .id, "to": .adjacency[]} | .from + " -> " + .to'

curl -s "http://${host}:4040/api/topology/containers?system=show" | jq -r '.nodes[] | select(has("adjacency")) | { "from_name": .label, "from_id": .id, "to": .adjacency[]} | .from_id + " (" + .from_name+ ") -> " + .to'
echo
}

# this checks we have a named node in the given view
Expand Down
1 change: 1 addition & 0 deletions integration/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ setup_host() {
echo Prefetching Images on "$HOST"
docker_on "$HOST" pull peterbourgon/tns-db
docker_on "$HOST" pull alpine
docker_on "$HOST" pull busybox
docker_on "$HOST" pull nginx
}

Expand Down
12 changes: 10 additions & 2 deletions probe/endpoint/connection_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ func (t *connectionTracker) performWalkProc(rpt *report.Report, hostNodeID strin
// once to initialize ebpfTracker
func (t *connectionTracker) getInitialState() {
var processCache *process.CachingWalker
processCache = process.NewCachingWalker(process.NewWalker(t.conf.ProcRoot))
walker := process.NewWalker(t.conf.ProcRoot, true)
processCache = process.NewCachingWalker(walker)
processCache.Tick()

scanner := procspy.NewSyncConnectionScanner(processCache)
Expand All @@ -194,7 +195,14 @@ func (t *connectionTracker) getInitialState() {
}
scanner.Stop()

t.ebpfTracker.feedInitialConnections(conns, seenTuples, report.MakeHostNodeID(t.conf.HostID))
processesWaitingInAccept := []int{}
processCache.Walk(func(p, prev process.Process) {
if p.IsWaitingInAccept {
processesWaitingInAccept = append(processesWaitingInAccept, p.PID)
}
})

t.ebpfTracker.feedInitialConnections(conns, seenTuples, processesWaitingInAccept, report.MakeHostNodeID(t.conf.HostID))
}

func (t *connectionTracker) performEbpfTrack(rpt *report.Report, hostNodeID string) error {
Expand Down
89 changes: 85 additions & 4 deletions probe/endpoint/ebpf.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package endpoint

import (
"bytes"
"fmt"
"regexp"
"strconv"
"sync"
"syscall"

log "github.com/Sirupsen/logrus"
"github.com/weaveworks/common/fs"
"github.com/weaveworks/scope/probe/endpoint/procspy"
"github.com/weaveworks/scope/probe/host"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/tcptracer-bpf/pkg/tracer"
)

Expand All @@ -23,7 +27,7 @@ type ebpfConnection struct {
type eventTracker interface {
handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string)
walkConnections(f func(ebpfConnection))
feedInitialConnections(ci procspy.ConnIter, seenTuples map[string]fourTuple, hostNodeID string)
feedInitialConnections(ci procspy.ConnIter, seenTuples map[string]fourTuple, processesWaitingInAccept []int, hostNodeID string)
isReadyToHandleConnections() bool
isDead() bool
stop()
Expand Down Expand Up @@ -111,8 +115,12 @@ func tcpEventCbV4(e tracer.TcpV4) {

lastTimestampV4 = e.Timestamp

tuple := fourTuple{e.SAddr.String(), e.DAddr.String(), e.SPort, e.DPort}
ebpfTracker.handleConnection(e.Type, tuple, int(e.Pid), strconv.Itoa(int(e.NetNS)))
if e.Type == tracer.EventFdInstall {
ebpfTracker.handleFdInstall(e.Type, int(e.Pid), int(e.Fd))
} else {
tuple := fourTuple{e.SAddr.String(), e.DAddr.String(), e.SPort, e.DPort}
ebpfTracker.handleConnection(e.Type, tuple, int(e.Pid), strconv.Itoa(int(e.NetNS)))
}
}

func tcpEventCbV6(e tracer.TcpV6) {
Expand All @@ -125,6 +133,73 @@ func lostCb(count uint64) {
ebpfTracker.stop()
}

func tupleFromPidFd(pid int, fd int) (tuple fourTuple, netns string, ok bool) {

This comment was marked as abuse.

This comment was marked as abuse.

This comment was marked as abuse.

// read /proc/$pid/ns/net
//
// probe/endpoint/procspy/proc_linux.go supports Linux < 3.8 but we
// don't need that here since ebpf-enabled kernels will be > 3.8
netnsIno, err := procspy.ReadNetnsFromPID(pid)
if err != nil {
log.Debugf("netns proc file for pid %d disappeared before we could read it: %v", pid, err)
return fourTuple{}, "", false
}
netns = fmt.Sprintf("%d", netnsIno)

// find /proc/$pid/fd/$fd's ino
fdFilename := fmt.Sprintf("/proc/%d/fd/%d", pid, fd)
var statFdFile syscall.Stat_t
if err := fs.Stat(fdFilename, &statFdFile); err != nil {
log.Debugf("proc file %q disappeared before we could read it", fdFilename)
return fourTuple{}, "", false
}

if statFdFile.Mode&syscall.S_IFMT != syscall.S_IFSOCK {
log.Errorf("file %q is not a socket", fdFilename)
return fourTuple{}, "", false
}
ino := statFdFile.Ino

// read both /proc/pid/net/{tcp,tcp6}
buf := bytes.NewBuffer(make([]byte, 0, 5000))
if _, err := procspy.ReadTCPFiles(pid, buf); err != nil {
log.Debugf("TCP proc file for pid %d disappeared before we could read it: %v", pid, err)
return fourTuple{}, "", false
}

// find /proc/$pid/fd/$fd's ino in /proc/pid/net/tcp
pn := procspy.NewProcNet(buf.Bytes())
for {
n := pn.Next()
if n == nil {
log.Debugf("connection for proc file %q not found. buf=%q", fdFilename, buf.String())
break
}
if n.Inode == ino {
return fourTuple{n.LocalAddress.String(), n.RemoteAddress.String(), n.LocalPort, n.RemotePort}, netns, true
}
}

return fourTuple{}, "", false
}

func (t *EbpfTracker) handleFdInstall(ev tracer.EventType, pid int, fd int) {
tuple, netns, ok := tupleFromPidFd(pid, fd)
log.Debugf("EbpfTracker: got fd-install event: pid=%d fd=%d -> tuple=%s netns=%s ok=%v", pid, fd, tuple, netns, ok)
if !ok {
return
}
conn := ebpfConnection{
incoming: true,
tuple: tuple,
pid: pid,
networkNamespace: netns,
}
t.openConnections[tuple.String()] = conn
if !process.IsProcInAccept("/proc", strconv.Itoa(pid)) {
t.tracer.RemoveFdInstallWatcher(uint32(pid))
}
}

func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string) {
t.Lock()
defer t.Unlock()
Expand Down Expand Up @@ -160,6 +235,8 @@ func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid
} else {
log.Debugf("EbpfTracker: unmatched close event: %s pid=%d netns=%s", tuple.String(), pid, networkNamespace)
}
default:
log.Debugf("EbpfTracker: unknown event: %s (%d)", ev, ev)
}
}

Expand All @@ -178,7 +255,7 @@ func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) {
t.closedConnections = t.closedConnections[:0]
}

func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples map[string]fourTuple, hostNodeID string) {
func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples map[string]fourTuple, processesWaitingInAccept []int, hostNodeID string) {
t.readyToHandleConnections = true
for conn := conns.Next(); conn != nil; conn = conns.Next() {
var (
Expand All @@ -204,6 +281,10 @@ func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples
t.handleConnection(tracer.EventAccept, tuple, int(conn.Proc.PID), namespaceID)
}
}
for _, p := range processesWaitingInAccept {
t.tracer.AddFdInstallWatcher(uint32(p))
log.Debugf("EbpfTracker: install fd-install watcher: pid=%d", p)
}
}

func (t *EbpfTracker) isReadyToHandleConnections() bool {
Expand Down
16 changes: 16 additions & 0 deletions probe/endpoint/procspy/proc_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package procspy

import (
"bytes"
"fmt"
)

// ReadTCPFiles reads the proc files tcp and tcp6 for a pid
func ReadTCPFiles(pid int, buf *bytes.Buffer) (int64, error) {
return 0, fmt.Errorf("not supported on non-Linux systems")
}

// ReadNetnsFromPID gets the netns inode of the specified pid
func ReadNetnsFromPID(pid int) (uint64, error) {
return 0, fmt.Errorf("not supported on non-Linux systems")
}
2 changes: 1 addition & 1 deletion probe/endpoint/procspy/proc_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestWalkProcPid(t *testing.T) {
defer fs_hook.Restore()

buf := bytes.Buffer{}
walker := process.NewWalker(procRoot)
walker := process.NewWalker(procRoot, false)
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()
pWalker := newPidWalker(walker, ticker.C, 1)
Expand Down
Loading