Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use caching walker in procspy #741

Merged
merged 3 commits into from
Dec 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 70 additions & 2 deletions common/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,78 @@ package fs

import (
"io"
"io/ioutil"
"os"
"syscall"
)

// Open is a mockable version of os.Open
var Open = func(path string) (io.ReadWriteCloser, error) {
// Interface is the filesystem interface type.
type Interface interface {
ReadDir(string) ([]os.FileInfo, error)
ReadFile(string) ([]byte, error)
Lstat(string, *syscall.Stat_t) error
Stat(string, *syscall.Stat_t) error
Open(string) (io.ReadWriteCloser, error)
}

type realFS struct{}

// FS is the way you should access the filesystem.
var fs Interface = realFS{}

func (realFS) ReadDir(path string) ([]os.FileInfo, error) {
return ioutil.ReadDir(path)
}

func (realFS) ReadFile(path string) ([]byte, error) {
return ioutil.ReadFile(path)
}

func (realFS) Lstat(path string, stat *syscall.Stat_t) error {
return syscall.Lstat(path, stat)
}

func (realFS) Stat(path string, stat *syscall.Stat_t) error {
return syscall.Stat(path, stat)
}

func (realFS) Open(path string) (io.ReadWriteCloser, error) {
return os.Open(path)
}

// trampolines here to allow users to do fs.ReadDir etc

// ReadDir see ioutil.ReadDir
func ReadDir(path string) ([]os.FileInfo, error) {
return fs.ReadDir(path)
}

// ReadFile see ioutil.ReadFile
func ReadFile(path string) ([]byte, error) {
return fs.ReadFile(path)
}

// Lstat see syscall.Lstat
func Lstat(path string, stat *syscall.Stat_t) error {
return fs.Lstat(path, stat)
}

// Stat see syscall.Stat
func Stat(path string, stat *syscall.Stat_t) error {
return fs.Stat(path, stat)
}

// Open see os.Open
func Open(path string) (io.ReadWriteCloser, error) {
return fs.Open(path)
}

// Mock is used to switch out the filesystem for a mock.
func Mock(mock Interface) {
fs = mock
}

// Restore puts back the real filesystem.
func Restore() {
fs = realFS{}
}
2 changes: 1 addition & 1 deletion probe/endpoint/procspy/benchmark_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func benchmarkConnections(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
cbConnections(false)
cbConnections(false, nil)
}
}

Expand Down
6 changes: 5 additions & 1 deletion probe/endpoint/procspy/fixture.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package procspy

import (
"github.com/weaveworks/scope/probe/process"
)

// SetFixtures declares constant Connection and ConnectionProcs which will
// always be returned by the package-level Connections and Processes
// functions. It's designed to be used in tests.
Expand All @@ -19,7 +23,7 @@ func (f *fixedConnIter) Next() *Connection {

// SetFixtures is used in test scenarios to have known output.
func SetFixtures(c []Connection) {
cbConnections = func(bool) (ConnIter, error) {
cbConnections = func(bool, process.Walker) (ConnIter, error) {
f := fixedConnIter(c)
return &f, nil
}
Expand Down
75 changes: 13 additions & 62 deletions probe/endpoint/procspy/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ package procspy

import (
"bytes"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"syscall"

"github.com/weaveworks/scope/common/fs"
"github.com/weaveworks/scope/probe/process"
)

var (
Expand All @@ -22,48 +21,30 @@ func SetProcRoot(root string) {
procRoot = root
}

// made variables for mocking
var (
readDir = ioutil.ReadDir
lstat = syscall.Lstat
stat = syscall.Stat
open = fs.Open
)

// walkProcPid walks over all numerical (PID) /proc entries, and sees if their
// ./fd/* files are symlink to sockets. Returns a map from socket ID (inode)
// to PID. Will return an error if /proc isn't there.
func walkProcPid(buf *bytes.Buffer) (map[uint64]Proc, error) {
dirNames, err := readDir(procRoot)
if err != nil {
return nil, err
}

func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]Proc, error) {
var (
res = map[uint64]Proc{}
namespaces = map[uint64]struct{}{}
statT syscall.Stat_t
)
for _, entry := range dirNames {
dirName := entry.Name()
pid, err := strconv.ParseUint(dirName, 10, 0)
if err != nil {
// Not a number, so not a PID subdir.
continue
}

walker.Walk(func(p process.Process) {
dirName := strconv.Itoa(p.PID)
fdBase := filepath.Join(procRoot, dirName, "fd")
fds, err := readDir(fdBase)
fds, err := fs.ReadDir(fdBase)
if err != nil {
// Process is be gone by now, or we don't have access.
continue
return
}

// Read network namespace, and if we haven't seen it before,
// read /proc/<pid>/net/tcp
err = lstat(filepath.Join(procRoot, dirName, "/ns/net"), &statT)
err = fs.Lstat(filepath.Join(procRoot, dirName, "/ns/net"), &statT)
if err != nil {
continue
return
}

if _, ok := namespaces[statT.Ino]; !ok {
Expand All @@ -72,10 +53,9 @@ func walkProcPid(buf *bytes.Buffer) (map[uint64]Proc, error) {
readFile(filepath.Join(procRoot, dirName, "/net/tcp6"), buf)
}

var name string
for _, fd := range fds {
// Direct use of syscall.Stat() to save garbage.
err = stat(filepath.Join(fdBase, fd.Name()), &statT)
err = fs.Stat(filepath.Join(fdBase, fd.Name()), &statT)
if err != nil {
continue
}
Expand All @@ -85,50 +65,21 @@ func walkProcPid(buf *bytes.Buffer) (map[uint64]Proc, error) {
continue
}

if name == "" {
if name = procName(filepath.Join(procRoot, dirName)); name == "" {
// Process might be gone by now
break
}
}

res[statT.Ino] = Proc{
PID: uint(pid),
Name: name,
PID: uint(p.PID),
Name: p.Comm,
}
}
}
})

return res, nil
}

// procName does a pid->name lookup.
func procName(base string) string {
fh, err := open(filepath.Join(base, "/comm"))
if err != nil {
return ""
}

name := make([]byte, 64)
l, err := fh.Read(name)
fh.Close()
if err != nil {
return ""
}

if l < 2 {
return ""
}

// drop trailing "\n"
return string(name[:l-1])
}

// readFile reads an arbitrary file into a buffer. It's a variable so it can
// be overwritten for benchmarks. That's bad practice and we should change it
// to be a dependency.
var readFile = func(filename string, buf *bytes.Buffer) error {
f, err := os.Open(filename)
f, err := fs.Open(filename)
if err != nil {
return err
}
Expand Down
13 changes: 9 additions & 4 deletions probe/endpoint/procspy/proc_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"syscall"
"testing"

fs_hook "github.com/weaveworks/scope/common/fs"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/test/fs"
)

Expand All @@ -31,17 +33,20 @@ var mockFS = fs.Dir("",
FStat: syscall.Stat_t{},
},
),
fs.File{
FName: "stat",
FContents: "1 na R 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1",
},
),
),
)

func TestWalkProcPid(t *testing.T) {
oldReadDir, oldLstat, oldStat, oldOpen := readDir, lstat, stat, open
defer func() { readDir, lstat, stat, open = oldReadDir, oldLstat, oldStat, oldOpen }()
readDir, lstat, stat, open = mockFS.ReadDir, mockFS.Lstat, mockFS.Stat, mockFS.Open
fs_hook.Mock(mockFS)
defer fs_hook.Restore()

buf := bytes.Buffer{}
have, err := walkProcPid(&buf)
have, err := walkProcPid(&buf, process.NewWalker(procRoot))
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 4 additions & 2 deletions probe/endpoint/procspy/spy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package procspy

import (
"net"

"github.com/weaveworks/scope/probe/process"
)

const (
Expand Down Expand Up @@ -38,6 +40,6 @@ type ConnIter interface {
// If processes is true it'll additionally try to lookup the process owning the
// connection, filling in the Proc field. You will need to run this as root to
// find all processes.
func Connections(processes bool) (ConnIter, error) {
return cbConnections(processes)
func Connections(processes bool, walker process.Walker) (ConnIter, error) {
return cbConnections(processes, walker)
}
4 changes: 3 additions & 1 deletion probe/endpoint/procspy/spy_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"net"
"os/exec"
"strconv"

"github.com/weaveworks/scope/probe/process"
)

const (
Expand All @@ -14,7 +16,7 @@ const (
// Connections returns all established (TCP) connections. No need to be root
// to run this. If processes is true it also tries to fill in the process
// fields of the connection. You need to be root to find all processes.
var cbConnections = func(processes bool) (ConnIter, error) {
var cbConnections = func(processes bool, walker process.Walker) (ConnIter, error) {
out, err := exec.Command(
netstatBinary,
"-n", // no number resolving
Expand Down
6 changes: 4 additions & 2 deletions probe/endpoint/procspy/spy_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package procspy
import (
"bytes"
"sync"

"github.com/weaveworks/scope/probe/process"
)

var bufPool = sync.Pool{
Expand Down Expand Up @@ -31,15 +33,15 @@ func (c *pnConnIter) Next() *Connection {
}

// cbConnections sets Connections()
var cbConnections = func(processes bool) (ConnIter, error) {
var cbConnections = func(processes bool, walker process.Walker) (ConnIter, error) {
// buffer for contents of /proc/<pid>/net/tcp
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()

var procs map[uint64]Proc
if processes {
var err error
if procs, err = walkProcPid(buf); err != nil {
if procs, err = walkProcPid(buf, walker); err != nil {
return nil, err
}
}
Expand Down
6 changes: 4 additions & 2 deletions probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Reporter struct {
includeProcesses bool
includeNAT bool
flowWalker flowWalker // interface
procWalker process.Walker
natMapper natMapper
reverseResolver *reverseResolver
}
Expand All @@ -47,14 +48,15 @@ var SpyDuration = prometheus.NewSummaryVec(
// on the host machine, at the granularity of host and port. That information
// is stored in the Endpoint topology. It optionally enriches that topology
// with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter {
func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool, procWalker process.Walker) *Reporter {
return &Reporter{
hostID: hostID,
hostName: hostName,
includeProcesses: includeProcesses,
flowWalker: newConntrackFlowWalker(useConntrack),
natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, "--any-nat")),
reverseResolver: newReverseResolver(),
procWalker: procWalker,
}
}

Expand All @@ -78,7 +80,7 @@ func (r *Reporter) Report() (report.Report, error) {
rpt := report.MakeReport()

{
conns, err := procspy.Connections(r.includeProcesses)
conns, err := procspy.Connections(r.includeProcesses, r.procWalker)
if err != nil {
return rpt, err
}
Expand Down
4 changes: 2 additions & 2 deletions probe/endpoint/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestSpyNoProcesses(t *testing.T) {
nodeName = "frenchs-since-1904" // TODO rename to hostNmae
)

reporter := endpoint.NewReporter(nodeID, nodeName, false, false)
reporter := endpoint.NewReporter(nodeID, nodeName, false, false, nil)
r, _ := reporter.Report()
//buf, _ := json.MarshalIndent(r, "", " ")
//t.Logf("\n%s\n", buf)
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestSpyWithProcesses(t *testing.T) {
nodeName = "fishermans-friend" // TODO rename to hostNmae
)

reporter := endpoint.NewReporter(nodeID, nodeName, true, false)
reporter := endpoint.NewReporter(nodeID, nodeName, true, false, nil)
r, _ := reporter.Report()
// buf, _ := json.MarshalIndent(r, "", " ") ; t.Logf("\n%s\n", buf)

Expand Down
Loading