Skip to content

Commit

Permalink
WIP: ebpf: handle fd_install
Browse files Browse the repository at this point in the history
  • Loading branch information
alban committed May 15, 2017
1 parent 96af9f1 commit d438e74
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 31 deletions.
12 changes: 10 additions & 2 deletions probe/endpoint/connection_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ func (t *connectionTracker) performWalkProc(rpt *report.Report, hostNodeID strin
// once to initialize ebpfTracker
func (t *connectionTracker) getInitialState() {
var processCache *process.CachingWalker
processCache = process.NewCachingWalker(process.NewWalker(t.conf.ProcRoot))
walker := process.NewWalker(t.conf.ProcRoot, true)
processCache = process.NewCachingWalker(walker)
processCache.Tick()

scanner := procspy.NewSyncConnectionScanner(processCache)
Expand All @@ -194,7 +195,14 @@ func (t *connectionTracker) getInitialState() {
}
scanner.Stop()

t.ebpfTracker.feedInitialConnections(conns, seenTuples, report.MakeHostNodeID(t.conf.HostID))
processesWaitingInAccept := []int{}
processCache.Walk(func(p, prev process.Process) {
if p.IsWaitingInAccept {
processesWaitingInAccept = append(processesWaitingInAccept, p.PID)
}
})

t.ebpfTracker.feedInitialConnections(conns, seenTuples, processesWaitingInAccept, report.MakeHostNodeID(t.conf.HostID))
}

func (t *connectionTracker) performEbpfTrack(rpt *report.Report, hostNodeID string) error {
Expand Down
28 changes: 26 additions & 2 deletions probe/endpoint/ebpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/weaveworks/scope/probe/endpoint/procspy"
"github.com/weaveworks/scope/probe/host"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/tcptracer-bpf/pkg/tracer"
)

Expand All @@ -23,7 +24,7 @@ type ebpfConnection struct {
type eventTracker interface {
handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string)
walkConnections(f func(ebpfConnection))
feedInitialConnections(ci procspy.ConnIter, seenTuples map[string]fourTuple, hostNodeID string)
feedInitialConnections(ci procspy.ConnIter, seenTuples map[string]fourTuple, processesWaitingInAccept []int, hostNodeID string)
isReadyToHandleConnections() bool
isDead() bool
stop()
Expand Down Expand Up @@ -125,6 +126,11 @@ func lostCb(count uint64) {
ebpfTracker.stop()
}

func tupleFromPidFd(pid int, fd int) fourTuple {
// TODO: read in /proc/pid/net/tcp...
return fourTuple{"192.168.35.216", "192.168.35.216", 8181, 8181}
}

func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string) {
t.Lock()
defer t.Unlock()
Expand Down Expand Up @@ -160,6 +166,20 @@ func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid
} else {
log.Debugf("EbpfTracker: unmatched close event: %s pid=%d netns=%s", tuple.String(), pid, networkNamespace)
}
case tracer.EventFdInstall:
fd, _ := strconv.Atoi(networkNamespace) // FIXME
log.Debugf("EbpfTracker: got fd-install event: pid=%d fd=%d", pid, fd)
tuple := tupleFromPidFd(pid, fd)
conn := ebpfConnection{
incoming: true,
tuple: tuple,
pid: pid,
networkNamespace: "4026531969", // TODO: read /proc/$pid/ns/net
}
t.openConnections[tuple.String()] = conn
if !process.IsProcInAccept("/proc", strconv.Itoa(pid)) {
t.tracer.RemoveFdInstallWatcher(uint32(pid))
}
}
}

Expand All @@ -178,7 +198,7 @@ func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) {
t.closedConnections = t.closedConnections[:0]
}

func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples map[string]fourTuple, hostNodeID string) {
func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples map[string]fourTuple, processesWaitingInAccept []int, hostNodeID string) {
t.readyToHandleConnections = true
for conn := conns.Next(); conn != nil; conn = conns.Next() {
var (
Expand All @@ -204,6 +224,10 @@ func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples
t.handleConnection(tracer.EventAccept, tuple, int(conn.Proc.PID), namespaceID)
}
}
for _, p := range processesWaitingInAccept {
t.tracer.AddFdInstallWatcher(uint32(p))
log.Debugf("EbpfTracker: install fd-install watcher: pid=%d", p)
}
}

func (t *EbpfTracker) isReadyToHandleConnections() bool {
Expand Down
2 changes: 1 addition & 1 deletion probe/endpoint/procspy/proc_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestWalkProcPid(t *testing.T) {
defer fs_hook.Restore()

buf := bytes.Buffer{}
walker := process.NewWalker(procRoot)
walker := process.NewWalker(procRoot, false)
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()
pWalker := newPidWalker(walker, ticker.C, 1)
Expand Down
2 changes: 1 addition & 1 deletion probe/endpoint/procspy/spy_linux_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func TestLinuxConnections(t *testing.T) {
fs_hook.Mock(mockFS)
defer fs_hook.Restore()
scanner := NewConnectionScanner(process.NewWalker("/proc"))
scanner := NewConnectionScanner(process.NewWalker("/proc", false))
defer scanner.Stop()

// let the background scanner finish its first pass
Expand Down
19 changes: 10 additions & 9 deletions probe/process/walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ import "sync"

// Process represents a single process.
type Process struct {
PID, PPID int
Name string
Cmdline string
Threads int
Jiffies uint64
RSSBytes uint64
RSSBytesLimit uint64
OpenFilesCount int
OpenFilesLimit uint64
PID, PPID int
Name string
Cmdline string
Threads int
Jiffies uint64
RSSBytes uint64
RSSBytesLimit uint64
OpenFilesCount int
OpenFilesLimit uint64
IsWaitingInAccept bool
}

// Walker is something that walks the /proc directory
Expand Down
60 changes: 47 additions & 13 deletions probe/process/walker_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
)

type walker struct {
procRoot string
procRoot string
gatheringWaitingInAccept bool
}

var (
Expand All @@ -38,8 +39,11 @@ const (
)

// NewWalker creates a new process Walker.
func NewWalker(procRoot string) Walker {
return &walker{procRoot: procRoot}
func NewWalker(procRoot string, gatheringWaitingInAccept bool) Walker {
return &walker{
procRoot: procRoot,
gatheringWaitingInAccept: gatheringWaitingInAccept,
}
}

// skipNSpaces skips nSpaces in buf and updates the cursor 'pos'
Expand Down Expand Up @@ -166,6 +170,30 @@ func (w *walker) readCmdline(filename string) (cmdline, name string) {
return
}

// IsProcInAccept returns true if the process has a at least one thread
// blocked on the accept() system call
func IsProcInAccept(procRoot, pid string) (ret bool) {
tasks, err := fs.ReadDirNames(path.Join(procRoot, pid, "task"))
if err != nil {
// if the process has terminated, it is obviously not blocking
// on the accept system call
return false
}

for _, tid := range tasks {
buf, err := fs.ReadFile(path.Join(procRoot, pid, "task", tid, "wchan"))
if err != nil {
// if a thread has terminated, it is obviously not
// blocking on the accept system call
continue
}
if strings.TrimSpace(string(buf)) == "inet_csk_accept" {
return true
}
}
return false
}

// Walk walks the supplied directory (expecting it to look like /proc)
// and marshalls the files into instances of Process, which it then
// passes one-by-one to the supplied function. Walk is only made public
Expand Down Expand Up @@ -215,17 +243,23 @@ func (w *walker) Walk(f func(Process, Process)) error {
cmdlineCache.Set([]byte(filename), []byte(fmt.Sprintf("%s\x00%s", cmdline, name)), cmdlineCacheTimeout)
}

isWaitingInAccept := false
if w.gatheringWaitingInAccept {
isWaitingInAccept = IsProcInAccept(w.procRoot, filename)
}

f(Process{
PID: pid,
PPID: ppid,
Name: name,
Cmdline: cmdline,
Threads: threads,
Jiffies: jiffies,
RSSBytes: rss,
RSSBytesLimit: rssLimit,
OpenFilesCount: openFilesCount,
OpenFilesLimit: openFilesLimit,
PID: pid,
PPID: ppid,
Name: name,
Cmdline: cmdline,
Threads: threads,
Jiffies: jiffies,
RSSBytes: rss,
RSSBytesLimit: rssLimit,
OpenFilesCount: openFilesCount,
OpenFilesLimit: openFilesLimit,
IsWaitingInAccept: isWaitingInAccept,
}, Process{})
}

Expand Down
2 changes: 1 addition & 1 deletion probe/process/walker_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestWalker(t *testing.T) {
}

have := map[int]process.Process{}
walker := process.NewWalker("/proc")
walker := process.NewWalker("/proc", false)
err := walker.Walk(func(p, _ process.Process) {
have[p.PID] = p
})
Expand Down
2 changes: 1 addition & 1 deletion probe/process/walker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestBasicWalk(t *testing.T) {
procRoot = "/proc"
procFunc = func(process.Process, process.Process) {}
)
if err := process.NewWalker(procRoot).Walk(procFunc); err != nil {
if err := process.NewWalker(procRoot, false).Walk(procFunc); err != nil {
t.Fatal(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion prog/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) {

var processCache *process.CachingWalker
if flags.procEnabled {
processCache = process.NewCachingWalker(process.NewWalker(flags.procRoot))
processCache = process.NewCachingWalker(process.NewWalker(flags.procRoot, false))
p.AddTicker(processCache)
p.AddReporter(process.NewReporter(processCache, hostID, process.GetDeltaTotalJiffies, flags.noCommandLineArguments))
}
Expand Down

0 comments on commit d438e74

Please sign in to comment.