From d1b8d1f84488c2dd6fac6cd9185f7cf6dac52eb2 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Fri, 29 Jan 2016 18:34:22 +0000 Subject: [PATCH] Dynamically adjust the ratelimit of the PID walker --- probe/endpoint/procspy/backgound_reader.go | 40 +++++++++++++++++++--- probe/endpoint/procspy/proc.go | 3 ++ 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/probe/endpoint/procspy/backgound_reader.go b/probe/endpoint/procspy/backgound_reader.go index 06da76f609..cee956f560 100644 --- a/probe/endpoint/procspy/backgound_reader.go +++ b/probe/endpoint/procspy/backgound_reader.go @@ -3,6 +3,8 @@ package procspy import ( "bytes" "fmt" + "log" + "math" "sync" "time" @@ -10,7 +12,12 @@ import ( ) const ( - ratelimit = 20 * time.Millisecond // read 50 namespaces per second + initialRateLimit = 50 * time.Millisecond // read 20 namespaces per second + maxRateLimit = 100 * time.Millisecond // read 10 namespaces per second + targetWalkTime = 15 * time.Second + + maxRateLimitF = float64(maxRateLimit) + targetWalkTimeF = float64(targetWalkTime) ) type backgroundReader struct { @@ -21,7 +28,7 @@ type backgroundReader struct { readySockets map[uint64]*Proc } -// HACK: Pretty ugly singleton interface (particularly the part part of passing +// HACK: Pretty ugly singleton interface (particularly the part of passing // the walker to StartBackgroundReader() and ignoring it in in Connections() ) // experimenting with this for now. var singleton *backgroundReader @@ -49,13 +56,35 @@ func StartBackgroundReader(walker process.Walker) { } func (br *backgroundReader) loop() { - namespaceTicker := time.Tick(ratelimit) + rateLimit := initialRateLimit + + namespaceTicker := time.Tick(rateLimit) + for { + start := time.Now() sockets, err := walkProcPid(br.walkingBuf, br.walker, namespaceTicker) if err != nil { - fmt.Printf("background reader: error reading walking /proc: %s\n", err) + log.Printf("background reader: error walking /proc: %s\n", err) continue } + walkTime := time.Now().Sub(start) + walkTimeF := float64(walkTime) + + log.Printf("debug: background reader: full pass took %s\n", walkTime) + if walkTimeF/targetWalkTimeF > 1.5 { + log.Printf( + "warn: background reader: full pass took %s: 50%% more than expected (%s)\n", + walkTime, + targetWalkTime, + ) + } + + // Adjust rate limit to more-accurately meet the target walk time in next iteration + scaledRateLimit := targetWalkTimeF / walkTimeF * float64(rateLimit) + rateLimit = time.Duration(math.Min(scaledRateLimit, maxRateLimitF)) + log.Printf("debug: background reader: new rate limit %s\n", rateLimit) + + namespaceTicker = time.Tick(rateLimit) // Swap buffers br.mtx.Lock() @@ -64,6 +93,9 @@ func (br *backgroundReader) loop() { br.mtx.Unlock() br.walkingBuf.Reset() + + // Sleep during spare time + time.Sleep(targetWalkTime - walkTime) } } diff --git a/probe/endpoint/procspy/proc.go b/probe/endpoint/procspy/proc.go index a71c71974e..0725b61d9d 100644 --- a/probe/endpoint/procspy/proc.go +++ b/probe/endpoint/procspy/proc.go @@ -4,6 +4,7 @@ package procspy import ( "bytes" + "log" "path/filepath" "strconv" "syscall" @@ -55,6 +56,8 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker, namespaceTicker <-cha } }) + log.Printf("debug: walkProcPid: found %d namespaces\n", len(namespaces)) + for _, procs := range namespaces { <-namespaceTicker