Skip to content

Commit

Permalink
Get the background reader to read /proc in a loop
Browse files Browse the repository at this point in the history
  • Loading branch information
Alfonso Acosta committed Jan 29, 2016
1 parent 2c32110 commit d89c886
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 70 deletions.
112 changes: 47 additions & 65 deletions probe/endpoint/procspy/backgound_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,95 +2,77 @@ package procspy

import (
"bytes"
"fmt"
"sync"
"time"

//"github.com/armon/go-metrics"
"github.com/coocood/freecache"

"github.com/weaveworks/scope/common/fs"
"github.com/weaveworks/scope/probe/process"
)

const (
timeout = 60 // keep contents for 60 seconds
size = 10 * 1024 * 1024 // keep upto 10MB worth
ratelimit = 20 * time.Millisecond // read 50 files per second
ratelimit = 20 * time.Millisecond // read 50 namespaces per second
)

type backgroundReader struct {
mtx sync.Mutex
cond *sync.Cond

queue []string // sorted list of files to fetch
queueIndex map[string]struct{} // entry here indicates file is already queued for fetching
cache *freecache.Cache
walker process.Walker
mtx sync.Mutex
walkingBuf *bytes.Buffer
readyBuf *bytes.Buffer
readySockets map[uint64]*Proc
}

// StartBackgroundReader starts a ratelimited background goroutine to
// read the expensive files from proc.
func StartBackgroundReader() {
br := &backgroundReader{
queueIndex: map[string]struct{}{},
cache: freecache.NewCache(size),
}
br.cond = sync.NewCond(&br.mtx)
go br.loop()
readFile = br.readFile
}
// HACK: Pretty ugly singleton interface (particularly the part part of passing
// the walker to StartBackgroundReader() and ignoring it in in Connections() )
// experimenting with this for now.
var singleton *backgroundReader

func (br *backgroundReader) next() string {
br.mtx.Lock()
defer br.mtx.Unlock()
for len(br.queue) == 0 {
br.cond.Wait()
func getBackgroundReader() (*backgroundReader, error) {
var err error
if singleton == nil {
err = fmt.Errorf("background reader hasn't yet been started")
}
filename := br.queue[0]
br.queue = br.queue[1:]
delete(br.queueIndex, filename)
return filename
return singleton, err
}

func (br *backgroundReader) enqueue(filename string) {
br.mtx.Lock()
defer br.mtx.Unlock()
if _, ok := br.queueIndex[filename]; !ok {
br.queue = append(br.queue, filename)
br.queueIndex[filename] = struct{}{}
br.cond.Broadcast()
// StartBackgroundReader starts a ratelimited background goroutine to
// read the expensive files from proc.
func StartBackgroundReader(walker process.Walker) {
if singleton != nil {
return
}
}

func (br *backgroundReader) readFileIntoCache(filename string) error {
contents, err := fs.ReadFile(filename)
if err != nil {
return err
singleton = &backgroundReader{
walker: walker,
walkingBuf: bytes.NewBuffer(make([]byte, 0, 5000)),
readyBuf: bytes.NewBuffer(make([]byte, 0, 5000)),
}
br.cache.Set([]byte(filename), contents, timeout)
return nil
go singleton.loop()
}

func (br *backgroundReader) loop() {
ticker := time.Tick(ratelimit)
namespaceTicker := time.Tick(ratelimit)
for {
err := br.readFileIntoCache(br.next())
// Only rate limit if we succesfully read a file
if err == nil {
<-ticker
sockets, err := walkProcPid(br.walkingBuf, br.walker, namespaceTicker)
if err != nil {
fmt.Printf("background reader: error reading walking /proc: %s\n", err)
continue
}

// Swap buffers
br.mtx.Lock()
br.readyBuf, br.walkingBuf = br.walkingBuf, br.readyBuf
br.readySockets = sockets
br.mtx.Unlock()

br.walkingBuf.Reset()
}
}

func (br *backgroundReader) readFile(filename string, buf *bytes.Buffer) (int64, error) {
// We always schedule the filename for reading, as on quiet systems
// we want to have a fresh as possible resuls
br.enqueue(filename)
func (br *backgroundReader) getWalkedProcPid(buf *bytes.Buffer) map[uint64]*Proc {
br.mtx.Lock()
defer br.mtx.Unlock()

v, err := br.cache.Get([]byte(filename))
if err != nil {
return 0, nil
}
// race!
br.cache.Del([]byte(filename))
n, err := buf.Write(v)
return int64(n), err
reader := bytes.NewReader(br.readyBuf.Bytes())
buf.ReadFrom(reader)

return br.readySockets
}
5 changes: 4 additions & 1 deletion probe/endpoint/procspy/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"strconv"
"syscall"
"time"

"github.com/weaveworks/scope/common/fs"
"github.com/weaveworks/scope/probe/process"
Expand All @@ -24,7 +25,7 @@ func SetProcRoot(root string) {
// walkProcPid walks over all numerical (PID) /proc entries, and sees if their
// ./fd/* files are symlink to sockets. Returns a map from socket ID (inode)
// to PID. Will return an error if /proc isn't there.
func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]*Proc, error) {
func walkProcPid(buf *bytes.Buffer, walker process.Walker, namespaceTicker <-chan time.Time) (map[uint64]*Proc, error) {
var (
res = map[uint64]*Proc{} // map socket inode -> process
namespaces = map[uint64][]*process.Process{} // map network namespace id -> processes
Expand Down Expand Up @@ -56,6 +57,8 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]*Proc, er

for _, procs := range namespaces {

<-namespaceTicker

// Read the namespace connections (i.e. read /proc/PID/net/tcp{,6} for
// any of the processes in the namespace)

Expand Down
7 changes: 4 additions & 3 deletions probe/endpoint/procspy/spy_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@ func (c *pnConnIter) Next() *Connection {
}

// cbConnections sets Connections()
var cbConnections = func(processes bool, walker process.Walker) (ConnIter, error) {
var cbConnections = func(processes bool, _ process.Walker) (ConnIter, error) {
// buffer for contents of /proc/<pid>/net/tcp
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()

var procs map[uint64]*Proc
if processes {
var err error
if procs, err = walkProcPid(buf, walker); err != nil {
br, err := getBackgroundReader()
if err != nil {
return nil, err
}
procs = br.getWalkedProcPid(buf)
}

if buf.Len() == 0 {
Expand Down
2 changes: 1 addition & 1 deletion probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var SpyDuration = prometheus.NewSummaryVec(
// is stored in the Endpoint topology. It optionally enriches that topology
// with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool, procWalker process.Walker) *Reporter {
procspy.StartBackgroundReader()
procspy.StartBackgroundReader(procWalker)
return &Reporter{
hostID: hostID,
hostName: hostName,
Expand Down

0 comments on commit d89c886

Please sign in to comment.