Skip to content

Commit

Permalink
Merge pull request #320 from weaveworks/process-names-on-darwin
Browse files Browse the repository at this point in the history
Naïve process walker for Darwin
  • Loading branch information
peterbourgon committed Jul 20, 2015
2 parents 69265d2 + b585a36 commit fb7eed1
Show file tree
Hide file tree
Showing 11 changed files with 300 additions and 204 deletions.
40 changes: 16 additions & 24 deletions probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
"strconv"
"syscall"
"time"
Expand All @@ -24,8 +23,6 @@ import (

var version = "dev" // set at build time

const linux = "linux" // runtime.GOOS

func main() {
var (
httpListen = flag.String("http.listen", "", "listen address for HTTP profiling and instrumentation server")
Expand All @@ -34,7 +31,7 @@ func main() {
listen = flag.String("listen", ":"+strconv.Itoa(xfer.ProbePort), "listen address")
prometheusEndpoint = flag.String("prometheus.endpoint", "/metrics", "Prometheus metrics exposition endpoint (requires -http.listen)")
spyProcs = flag.Bool("processes", true, "report processes (needs root)")
dockerEnabled = flag.Bool("docker", true, "collect Docker-related attributes for processes")
dockerEnabled = flag.Bool("docker", false, "collect Docker-related attributes for processes")
dockerInterval = flag.Duration("docker.interval", 10*time.Second, "how often to update Docker attributes")
dockerBridge = flag.String("docker.bridge", "docker0", "the docker bridge name")
weaveRouterAddr = flag.String("weave.router.addr", "", "IP address or FQDN of the Weave router")
Expand Down Expand Up @@ -82,25 +79,22 @@ func main() {
processCache *process.CachingWalker
)

// TODO provide an alternate implementation for Darwin.
if runtime.GOOS == linux {
processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
reporters = append(reporters, process.NewReporter(processCache, hostID))

if *dockerEnabled {
if err = report.AddLocalBridge(*dockerBridge); err != nil {
log.Fatalf("failed to get docker bridge address: %v", err)
}
processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
reporters = append(reporters, process.NewReporter(processCache, hostID))

dockerRegistry, err := docker.NewRegistry(*dockerInterval)
if err != nil {
log.Fatalf("failed to start docker registry: %v", err)
}
defer dockerRegistry.Stop()
if *dockerEnabled {
if err := report.AddLocalBridge(*dockerBridge); err != nil {
log.Fatalf("failed to get docker bridge address: %v", err)
}

taggers = append(taggers, docker.NewTagger(dockerRegistry, processCache))
reporters = append(reporters, docker.NewReporter(dockerRegistry, hostID))
dockerRegistry, err := docker.NewRegistry(*dockerInterval)
if err != nil {
log.Fatalf("failed to start docker registry: %v", err)
}
defer dockerRegistry.Stop()

taggers = append(taggers, docker.NewTagger(dockerRegistry, processCache))
reporters = append(reporters, docker.NewReporter(dockerRegistry, hostID))
}

if *weaveRouterAddr != "" {
Expand Down Expand Up @@ -131,10 +125,8 @@ func main() {
r = report.MakeReport()

case <-spyTick:
if processCache != nil {
if err := processCache.Update(); err != nil {
log.Printf("error reading processes: %v", err)
}
if err := processCache.Update(); err != nil {
log.Printf("error reading processes: %v", err)
}

for _, reporter := range reporters {
Expand Down
2 changes: 1 addition & 1 deletion probe/process/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (r *Reporter) Report() (report.Report, error) {

func (r *Reporter) processTopology() (report.Topology, error) {
t := report.NewTopology()
err := r.walker.Walk(func(p *Process) {
err := r.walker.Walk(func(p Process) {
pidstr := strconv.Itoa(p.PID)
nodeID := report.MakeProcessNodeID(r.scope, pidstr)
t.NodeMetadatas[nodeID] = report.NewNodeMetadata(map[string]string{
Expand Down
6 changes: 3 additions & 3 deletions probe/process/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
)

type mockWalker struct {
processes []*process.Process
processes []process.Process
}

func (m *mockWalker) Walk(f func(*process.Process)) error {
func (m *mockWalker) Walk(f func(process.Process)) error {
for _, p := range m.processes {
f(p)
}
Expand All @@ -22,7 +22,7 @@ func (m *mockWalker) Walk(f func(*process.Process)) error {

func TestReporter(t *testing.T) {
walker := &mockWalker{
processes: []*process.Process{
processes: []process.Process{
{PID: 1, PPID: 0, Comm: "init"},
{PID: 2, PPID: 1, Comm: "bash"},
{PID: 3, PPID: 1, Comm: "apache", Threads: 2},
Expand Down
6 changes: 3 additions & 3 deletions probe/process/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ type Tree interface {
}

type tree struct {
processes map[int]*Process
processes map[int]Process
}

// NewTree returns a new Tree that can be polled.
func NewTree(walker Walker) (Tree, error) {
pt := tree{processes: map[int]*Process{}}
err := walker.Walk(func(p *Process) {
pt := tree{processes: map[int]Process{}}
err := walker.Walk(func(p Process) {
pt.processes[p.PID] = p
})

Expand Down
2 changes: 1 addition & 1 deletion probe/process/tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func TestTree(t *testing.T) {
walker := &mockWalker{
processes: []*process.Process{
processes: []process.Process{
{PID: 1, PPID: 0, Comm: "init"},
{PID: 2, PPID: 1, Comm: "bash"},
{PID: 3, PPID: 1, Comm: "apache", Threads: 2},
Expand Down
89 changes: 6 additions & 83 deletions probe/process/walker.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
package process

import (
"bytes"
"io/ioutil"
"path"
"strconv"
"strings"
"sync"
)
import "sync"

// Process represents a single process.
type Process struct {
Expand All @@ -19,83 +12,13 @@ type Process struct {

// Walker is something that walks the /proc directory
type Walker interface {
Walk(func(*Process)) error
}

// Hooks exposed for mocking
var (
ReadDir = ioutil.ReadDir
ReadFile = ioutil.ReadFile
)

type walker struct {
procRoot string
}

// NewWalker creates a new process Walker
func NewWalker(procRoot string) Walker {
return &walker{procRoot: procRoot}
}

// Walk walks the supplied directory (expecting it to look like /proc)
// and marshalls the files into instances of Process, which it then
// passes one-by-one to the supplied function. Walk is only made public
// so that is can be tested.
func (w *walker) Walk(f func(*Process)) error {
dirEntries, err := ReadDir(w.procRoot)
if err != nil {
return err
}

for _, dirEntry := range dirEntries {
filename := dirEntry.Name()
pid, err := strconv.Atoi(filename)
if err != nil {
continue
}

stat, err := ReadFile(path.Join(w.procRoot, filename, "stat"))
if err != nil {
continue
}
splits := strings.Fields(string(stat))
ppid, err := strconv.Atoi(splits[3])
if err != nil {
return err
}

threads, err := strconv.Atoi(splits[19])
if err != nil {
return err
}

cmdline := ""
if cmdlineBuf, err := ReadFile(path.Join(w.procRoot, filename, "cmdline")); err == nil {
cmdlineBuf = bytes.Replace(cmdlineBuf, []byte{'\000'}, []byte{' '}, -1)
cmdline = string(cmdlineBuf)
}

comm := "(unknown)"
if commBuf, err := ReadFile(path.Join(w.procRoot, filename, "comm")); err == nil {
comm = strings.TrimSpace(string(commBuf))
}

f(&Process{
PID: pid,
PPID: ppid,
Comm: comm,
Cmdline: cmdline,
Threads: threads,
})
}

return nil
Walk(func(Process)) error
}

// CachingWalker is a walker than caches a copy of the output from another
// Walker, and then allows other concurrent readers to Walk that copy.
type CachingWalker struct {
cache []*Process
cache []Process
cacheLock sync.RWMutex
source Walker
}
Expand All @@ -106,7 +29,7 @@ func NewCachingWalker(source Walker) *CachingWalker {
}

// Walk walks a cached copy of process list
func (c *CachingWalker) Walk(f func(*Process)) error {
func (c *CachingWalker) Walk(f func(Process)) error {
c.cacheLock.RLock()
defer c.cacheLock.RUnlock()

Expand All @@ -118,8 +41,8 @@ func (c *CachingWalker) Walk(f func(*Process)) error {

// Update updates cached copy of process list
func (c *CachingWalker) Update() error {
newCache := []*Process{}
err := c.source.Walk(func(p *Process) {
newCache := []Process{}
err := c.source.Walk(func(p Process) {
newCache = append(newCache, p)
})
if err != nil {
Expand Down
93 changes: 93 additions & 0 deletions probe/process/walker_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package process

import (
"fmt"
"os/exec"
"strconv"
"strings"
)

// NewWalker returns a Darwin (lsof-based) walker.
func NewWalker(_ string) Walker {
return &walker{}
}

type walker struct{}

const (
lsofBinary = "lsof"
lsofFields = "cn" // parseLSOF() depends on the order
)

// These functions copied from procspy.

func (walker) Walk(f func(Process)) error {
output, err := exec.Command(
lsofBinary,
"-i", // only Internet files
"-n", "-P", // no number resolving
"-w", // no warnings
"-F", lsofFields, // \n based output of only the fields we want.
).CombinedOutput()
if err != nil {
return err
}

processes, err := parseLSOF(string(output))
if err != nil {
return err
}

for _, process := range processes {
f(process)
}
return nil
}

func parseLSOF(output string) (map[string]Process, error) {
var (
processes = map[string]Process{} // Local addr -> Proc
process Process
)
for _, line := range strings.Split(output, "\n") {
if len(line) <= 1 {
continue
}

var (
field = line[0]
value = line[1:]
)
switch field {
case 'p':
pid, err := strconv.Atoi(value)
if err != nil {
return nil, fmt.Errorf("invalid 'p' field in lsof output: %#v", value)
}
process.PID = pid

case 'c':
process.Comm = value

case 'n':
// 'n' is the last field, with '-F cn'
// format examples:
// "192.168.2.111:44013->54.229.241.196:80"
// "[2003:45:2b57:8900:1869:2947:f942:aba7]:55711->[2a00:1450:4008:c01::11]:443"
// "*:111" <- a listen
addresses := strings.SplitN(value, "->", 2)
if len(addresses) != 2 {
// That's a listen entry.
continue
}
processes[addresses[0]] = Process{
PID: process.PID,
Comm: process.Comm,
}

default:
return nil, fmt.Errorf("unexpected lsof field: %c in %#v", field, value)
}
}
return processes, nil
}
Loading

0 comments on commit fb7eed1

Please sign in to comment.