From d89c88694639274bcd38e8bd1bfc039c64d8b00f Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 28 Jan 2016 20:08:23 +0000 Subject: [PATCH] Get the background reader to read /proc in a loop --- probe/endpoint/procspy/backgound_reader.go | 112 +++++++++------------ probe/endpoint/procspy/proc.go | 5 +- probe/endpoint/procspy/spy_linux.go | 7 +- probe/endpoint/reporter.go | 2 +- 4 files changed, 56 insertions(+), 70 deletions(-) diff --git a/probe/endpoint/procspy/backgound_reader.go b/probe/endpoint/procspy/backgound_reader.go index 7f3483e635..06da76f609 100644 --- a/probe/endpoint/procspy/backgound_reader.go +++ b/probe/endpoint/procspy/backgound_reader.go @@ -2,95 +2,77 @@ package procspy import ( "bytes" + "fmt" "sync" "time" - //"github.com/armon/go-metrics" - "github.com/coocood/freecache" - - "github.com/weaveworks/scope/common/fs" + "github.com/weaveworks/scope/probe/process" ) const ( - timeout = 60 // keep contents for 60 seconds - size = 10 * 1024 * 1024 // keep upto 10MB worth - ratelimit = 20 * time.Millisecond // read 50 files per second + ratelimit = 20 * time.Millisecond // read 50 namespaces per second ) type backgroundReader struct { - mtx sync.Mutex - cond *sync.Cond - - queue []string // sorted list of files to fetch - queueIndex map[string]struct{} // entry here indicates file is already queued for fetching - cache *freecache.Cache + walker process.Walker + mtx sync.Mutex + walkingBuf *bytes.Buffer + readyBuf *bytes.Buffer + readySockets map[uint64]*Proc } -// StartBackgroundReader starts a ratelimited background goroutine to -// read the expensive files from proc. -func StartBackgroundReader() { - br := &backgroundReader{ - queueIndex: map[string]struct{}{}, - cache: freecache.NewCache(size), - } - br.cond = sync.NewCond(&br.mtx) - go br.loop() - readFile = br.readFile -} +// HACK: Pretty ugly singleton interface (particularly the part part of passing +// the walker to StartBackgroundReader() and ignoring it in in Connections() ) +// experimenting with this for now. +var singleton *backgroundReader -func (br *backgroundReader) next() string { - br.mtx.Lock() - defer br.mtx.Unlock() - for len(br.queue) == 0 { - br.cond.Wait() +func getBackgroundReader() (*backgroundReader, error) { + var err error + if singleton == nil { + err = fmt.Errorf("background reader hasn't yet been started") } - filename := br.queue[0] - br.queue = br.queue[1:] - delete(br.queueIndex, filename) - return filename + return singleton, err } -func (br *backgroundReader) enqueue(filename string) { - br.mtx.Lock() - defer br.mtx.Unlock() - if _, ok := br.queueIndex[filename]; !ok { - br.queue = append(br.queue, filename) - br.queueIndex[filename] = struct{}{} - br.cond.Broadcast() +// StartBackgroundReader starts a ratelimited background goroutine to +// read the expensive files from proc. +func StartBackgroundReader(walker process.Walker) { + if singleton != nil { + return } -} - -func (br *backgroundReader) readFileIntoCache(filename string) error { - contents, err := fs.ReadFile(filename) - if err != nil { - return err + singleton = &backgroundReader{ + walker: walker, + walkingBuf: bytes.NewBuffer(make([]byte, 0, 5000)), + readyBuf: bytes.NewBuffer(make([]byte, 0, 5000)), } - br.cache.Set([]byte(filename), contents, timeout) - return nil + go singleton.loop() } func (br *backgroundReader) loop() { - ticker := time.Tick(ratelimit) + namespaceTicker := time.Tick(ratelimit) for { - err := br.readFileIntoCache(br.next()) - // Only rate limit if we succesfully read a file - if err == nil { - <-ticker + sockets, err := walkProcPid(br.walkingBuf, br.walker, namespaceTicker) + if err != nil { + fmt.Printf("background reader: error reading walking /proc: %s\n", err) + continue } + + // Swap buffers + br.mtx.Lock() + br.readyBuf, br.walkingBuf = br.walkingBuf, br.readyBuf + br.readySockets = sockets + br.mtx.Unlock() + + br.walkingBuf.Reset() } } -func (br *backgroundReader) readFile(filename string, buf *bytes.Buffer) (int64, error) { - // We always schedule the filename for reading, as on quiet systems - // we want to have a fresh as possible resuls - br.enqueue(filename) +func (br *backgroundReader) getWalkedProcPid(buf *bytes.Buffer) map[uint64]*Proc { + br.mtx.Lock() + defer br.mtx.Unlock() - v, err := br.cache.Get([]byte(filename)) - if err != nil { - return 0, nil - } - // race! - br.cache.Del([]byte(filename)) - n, err := buf.Write(v) - return int64(n), err + reader := bytes.NewReader(br.readyBuf.Bytes()) + buf.ReadFrom(reader) + + return br.readySockets } diff --git a/probe/endpoint/procspy/proc.go b/probe/endpoint/procspy/proc.go index 8b83de53bc..a71c71974e 100644 --- a/probe/endpoint/procspy/proc.go +++ b/probe/endpoint/procspy/proc.go @@ -7,6 +7,7 @@ import ( "path/filepath" "strconv" "syscall" + "time" "github.com/weaveworks/scope/common/fs" "github.com/weaveworks/scope/probe/process" @@ -24,7 +25,7 @@ func SetProcRoot(root string) { // walkProcPid walks over all numerical (PID) /proc entries, and sees if their // ./fd/* files are symlink to sockets. Returns a map from socket ID (inode) // to PID. Will return an error if /proc isn't there. -func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]*Proc, error) { +func walkProcPid(buf *bytes.Buffer, walker process.Walker, namespaceTicker <-chan time.Time) (map[uint64]*Proc, error) { var ( res = map[uint64]*Proc{} // map socket inode -> process namespaces = map[uint64][]*process.Process{} // map network namespace id -> processes @@ -56,6 +57,8 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]*Proc, er for _, procs := range namespaces { + <-namespaceTicker + // Read the namespace connections (i.e. read /proc/PID/net/tcp{,6} for // any of the processes in the namespace) diff --git a/probe/endpoint/procspy/spy_linux.go b/probe/endpoint/procspy/spy_linux.go index 68b0d7c0bc..4429aa2778 100644 --- a/probe/endpoint/procspy/spy_linux.go +++ b/probe/endpoint/procspy/spy_linux.go @@ -33,17 +33,18 @@ func (c *pnConnIter) Next() *Connection { } // cbConnections sets Connections() -var cbConnections = func(processes bool, walker process.Walker) (ConnIter, error) { +var cbConnections = func(processes bool, _ process.Walker) (ConnIter, error) { // buffer for contents of /proc//net/tcp buf := bufPool.Get().(*bytes.Buffer) buf.Reset() var procs map[uint64]*Proc if processes { - var err error - if procs, err = walkProcPid(buf, walker); err != nil { + br, err := getBackgroundReader() + if err != nil { return nil, err } + procs = br.getWalkedProcPid(buf) } if buf.Len() == 0 { diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index c177d19ea7..2adf1c9e20 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -49,7 +49,7 @@ var SpyDuration = prometheus.NewSummaryVec( // is stored in the Endpoint topology. It optionally enriches that topology // with process (PID) information. func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool, procWalker process.Walker) *Reporter { - procspy.StartBackgroundReader() + procspy.StartBackgroundReader(procWalker) return &Reporter{ hostID: hostID, hostName: hostName,