From f1cccbdf6f765e44ed5c48ee09d3594e3a51aefb Mon Sep 17 00:00:00 2001 From: Alban Crequy Date: Thu, 11 May 2017 15:18:06 +0200 Subject: [PATCH] WIP: ebpf: handle fd_install --- probe/endpoint/connection_tracker.go | 12 ++++- probe/endpoint/ebpf.go | 28 ++++++++++- probe/endpoint/procspy/proc_internal_test.go | 2 +- .../procspy/spy_linux_internal_test.go | 2 +- probe/process/walker.go | 19 ++++---- probe/process/walker_linux.go | 46 +++++++++++++------ probe/process/walker_linux_test.go | 2 +- prog/probe.go | 2 +- 8 files changed, 83 insertions(+), 30 deletions(-) diff --git a/probe/endpoint/connection_tracker.go b/probe/endpoint/connection_tracker.go index 6b14de207d..dac8ff1acc 100644 --- a/probe/endpoint/connection_tracker.go +++ b/probe/endpoint/connection_tracker.go @@ -171,7 +171,8 @@ func (t *connectionTracker) performWalkProc(rpt *report.Report, hostNodeID strin // once to initialize ebpfTracker func (t *connectionTracker) getInitialState() { var processCache *process.CachingWalker - processCache = process.NewCachingWalker(process.NewWalker(t.conf.ProcRoot)) + walker := process.NewWalker(t.conf.ProcRoot, true) + processCache = process.NewCachingWalker(walker) processCache.Tick() scanner := procspy.NewSyncConnectionScanner(processCache) @@ -194,7 +195,14 @@ func (t *connectionTracker) getInitialState() { } scanner.Stop() - t.ebpfTracker.feedInitialConnections(conns, seenTuples, report.MakeHostNodeID(t.conf.HostID)) + processesWaitingInAccept := []int{} + processCache.Walk(func(p, prev process.Process) { + if p.IsWaitingInAccept { + processesWaitingInAccept = append(processesWaitingInAccept, p.PID) + } + }) + + t.ebpfTracker.feedInitialConnections(conns, seenTuples, processesWaitingInAccept, report.MakeHostNodeID(t.conf.HostID)) } func (t *connectionTracker) performEbpfTrack(rpt *report.Report, hostNodeID string) error { diff --git a/probe/endpoint/ebpf.go b/probe/endpoint/ebpf.go index 83726132ae..3eefee448e 100644 --- a/probe/endpoint/ebpf.go +++ b/probe/endpoint/ebpf.go @@ -9,6 +9,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/weaveworks/scope/probe/endpoint/procspy" "github.com/weaveworks/scope/probe/host" + "github.com/weaveworks/scope/probe/process" "github.com/weaveworks/tcptracer-bpf/pkg/tracer" ) @@ -23,7 +24,7 @@ type ebpfConnection struct { type eventTracker interface { handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string) walkConnections(f func(ebpfConnection)) - feedInitialConnections(ci procspy.ConnIter, seenTuples map[string]fourTuple, hostNodeID string) + feedInitialConnections(ci procspy.ConnIter, seenTuples map[string]fourTuple, processesWaitingInAccept []int, hostNodeID string) isReadyToHandleConnections() bool isDead() bool stop() @@ -125,6 +126,11 @@ func lostCb(count uint64) { ebpfTracker.stop() } +func tupleFromPidFd(pid int, fd int) fourTuple { + // TODO: read in /proc/pid/net/tcp... + return fourTuple{"192.168.35.216", "192.168.35.216", 8181, 8181} +} + func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string) { t.Lock() defer t.Unlock() @@ -160,6 +166,20 @@ func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid } else { log.Debugf("EbpfTracker: unmatched close event: %s pid=%d netns=%s", tuple.String(), pid, networkNamespace) } + case tracer.EventFdInstall: + fd, _ := strconv.Atoi(networkNamespace) // FIXME + log.Debugf("EbpfTracker: got fd-install event: pid=%d fd=%d", pid, fd) + tuple := tupleFromPidFd(pid, fd) + conn := ebpfConnection{ + incoming: true, + tuple: tuple, + pid: pid, + networkNamespace: "4026531969", // TODO: read /proc/$pid/ns/net + } + t.openConnections[tuple.String()] = conn + if !process.IsProcInAccept("/proc", strconv.Itoa(pid)) { + t.tracer.RemoveFdInstallWatcher(uint32(pid)) + } } } @@ -178,7 +198,7 @@ func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) { t.closedConnections = t.closedConnections[:0] } -func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples map[string]fourTuple, hostNodeID string) { +func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples map[string]fourTuple, processesWaitingInAccept []int, hostNodeID string) { t.readyToHandleConnections = true for conn := conns.Next(); conn != nil; conn = conns.Next() { var ( @@ -204,6 +224,10 @@ func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples t.handleConnection(tracer.EventAccept, tuple, int(conn.Proc.PID), namespaceID) } } + for _, p := range processesWaitingInAccept { + t.tracer.AddFdInstallWatcher(uint32(p)) + log.Debugf("EbpfTracker: install fd-install watcher: pid=%d", p) + } } func (t *EbpfTracker) isReadyToHandleConnections() bool { diff --git a/probe/endpoint/procspy/proc_internal_test.go b/probe/endpoint/procspy/proc_internal_test.go index aeba25cf4e..8dc510c21f 100644 --- a/probe/endpoint/procspy/proc_internal_test.go +++ b/probe/endpoint/procspy/proc_internal_test.go @@ -62,7 +62,7 @@ func TestWalkProcPid(t *testing.T) { defer fs_hook.Restore() buf := bytes.Buffer{} - walker := process.NewWalker(procRoot) + walker := process.NewWalker(procRoot, false) ticker := time.NewTicker(time.Millisecond) defer ticker.Stop() pWalker := newPidWalker(walker, ticker.C, 1) diff --git a/probe/endpoint/procspy/spy_linux_internal_test.go b/probe/endpoint/procspy/spy_linux_internal_test.go index 6f658c6f7a..da7924994b 100644 --- a/probe/endpoint/procspy/spy_linux_internal_test.go +++ b/probe/endpoint/procspy/spy_linux_internal_test.go @@ -14,7 +14,7 @@ import ( func TestLinuxConnections(t *testing.T) { fs_hook.Mock(mockFS) defer fs_hook.Restore() - scanner := NewConnectionScanner(process.NewWalker("/proc")) + scanner := NewConnectionScanner(process.NewWalker("/proc", false)) defer scanner.Stop() // let the background scanner finish its first pass diff --git a/probe/process/walker.go b/probe/process/walker.go index af77217be6..177c212056 100644 --- a/probe/process/walker.go +++ b/probe/process/walker.go @@ -4,15 +4,16 @@ import "sync" // Process represents a single process. type Process struct { - PID, PPID int - Name string - Cmdline string - Threads int - Jiffies uint64 - RSSBytes uint64 - RSSBytesLimit uint64 - OpenFilesCount int - OpenFilesLimit uint64 + PID, PPID int + Name string + Cmdline string + Threads int + Jiffies uint64 + RSSBytes uint64 + RSSBytesLimit uint64 + OpenFilesCount int + OpenFilesLimit uint64 + IsWaitingInAccept bool } // Walker is something that walks the /proc directory diff --git a/probe/process/walker_linux.go b/probe/process/walker_linux.go index 2191044e31..461ca7270d 100644 --- a/probe/process/walker_linux.go +++ b/probe/process/walker_linux.go @@ -17,7 +17,8 @@ import ( ) type walker struct { - procRoot string + procRoot string + gatheringWaitingInAccept bool } var ( @@ -38,8 +39,11 @@ const ( ) // NewWalker creates a new process Walker. -func NewWalker(procRoot string) Walker { - return &walker{procRoot: procRoot} +func NewWalker(procRoot string, gatheringWaitingInAccept bool) Walker { + return &walker{ + procRoot: procRoot, + gatheringWaitingInAccept: gatheringWaitingInAccept, + } } // skipNSpaces skips nSpaces in buf and updates the cursor 'pos' @@ -166,6 +170,16 @@ func (w *walker) readCmdline(filename string) (cmdline, name string) { return } +func IsProcInAccept(procRoot, filename string) (ret bool) { + // TODO: to support multithreaded apps, we need to look at /proc/*/tasks/*/wchan... + if buf, err := fs.ReadFile(path.Join(procRoot, filename, "wchan")); err == nil { + if strings.TrimSpace(string(buf)) == "inet_csk_accept" { + return true + } + } + return false +} + // Walk walks the supplied directory (expecting it to look like /proc) // and marshalls the files into instances of Process, which it then // passes one-by-one to the supplied function. Walk is only made public @@ -215,17 +229,23 @@ func (w *walker) Walk(f func(Process, Process)) error { cmdlineCache.Set([]byte(filename), []byte(fmt.Sprintf("%s\x00%s", cmdline, name)), cmdlineCacheTimeout) } + isWaitingInAccept := false + if w.gatheringWaitingInAccept { + isWaitingInAccept = IsProcInAccept(w.procRoot, filename) + } + f(Process{ - PID: pid, - PPID: ppid, - Name: name, - Cmdline: cmdline, - Threads: threads, - Jiffies: jiffies, - RSSBytes: rss, - RSSBytesLimit: rssLimit, - OpenFilesCount: openFilesCount, - OpenFilesLimit: openFilesLimit, + PID: pid, + PPID: ppid, + Name: name, + Cmdline: cmdline, + Threads: threads, + Jiffies: jiffies, + RSSBytes: rss, + RSSBytesLimit: rssLimit, + OpenFilesCount: openFilesCount, + OpenFilesLimit: openFilesLimit, + IsWaitingInAccept: isWaitingInAccept, }, Process{}) } diff --git a/probe/process/walker_linux_test.go b/probe/process/walker_linux_test.go index 1da50deb79..cfa18d8d97 100644 --- a/probe/process/walker_linux_test.go +++ b/probe/process/walker_linux_test.go @@ -88,7 +88,7 @@ func TestWalker(t *testing.T) { } have := map[int]process.Process{} - walker := process.NewWalker("/proc") + walker := process.NewWalker("/proc", false) err := walker.Walk(func(p, _ process.Process) { have[p.PID] = p }) diff --git a/prog/probe.go b/prog/probe.go index 33c579cc45..1fe6d95e57 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -158,7 +158,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { var processCache *process.CachingWalker if flags.procEnabled { - processCache = process.NewCachingWalker(process.NewWalker(flags.procRoot)) + processCache = process.NewCachingWalker(process.NewWalker(flags.procRoot, false)) p.AddTicker(processCache) p.AddReporter(process.NewReporter(processCache, hostID, process.GetDeltaTotalJiffies, flags.noCommandLineArguments)) }