From c20707abeafa6908c8bc0519c6e363168320bf96 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Mon, 8 Feb 2016 11:50:57 +0000 Subject: [PATCH] Fix tests --- .../procspy/background_reader_linux.go | 40 ++++++++++--------- .../procspy/benchmark_internal_test.go | 36 ----------------- probe/endpoint/procspy/fixture.go | 8 ++-- probe/endpoint/procspy/proc_internal_test.go | 7 +++- probe/endpoint/procspy/proc_linux.go | 6 +-- .../procspy/spy_linux_internal_test.go | 9 ++++- probe/endpoint/reporter.go | 4 +- probe/endpoint/reporter_test.go | 10 ++--- prog/probe.go | 3 +- 9 files changed, 50 insertions(+), 73 deletions(-) delete mode 100644 probe/endpoint/procspy/benchmark_internal_test.go diff --git a/probe/endpoint/procspy/background_reader_linux.go b/probe/endpoint/procspy/background_reader_linux.go index 51eb6b7e7c..f565662dde 100644 --- a/probe/endpoint/procspy/background_reader_linux.go +++ b/probe/endpoint/procspy/background_reader_linux.go @@ -67,14 +67,32 @@ func (br *backgroundReader) loop() { ) rateLimit := initialRateLimit - ticker := time.Tick(rateLimit) + ticker := time.NewTicker(rateLimit) for { start := time.Now() - sockets, err := walkProcPid(br.walkingBuf, br.walker, ticker, fdBlockSize) + sockets, err := walkProcPid(br.walkingBuf, br.walker, ticker.C, fdBlockSize) if err != nil { log.Printf("background reader: error walking /proc: %s\n", err) continue } + + br.mtx.Lock() + + // Should we stop? + if br.pleaseStop { + br.pleaseStop = false + br.running = false + ticker.Stop() + br.mtx.Unlock() + return + } + + // Swap buffers + br.readyBuf, br.walkingBuf = br.walkingBuf, br.readyBuf + br.readySockets = sockets + + br.mtx.Unlock() + walkTime := time.Now().Sub(start) walkTimeF := float64(walkTime) @@ -92,22 +110,8 @@ func (br *backgroundReader) loop() { rateLimit = time.Duration(math.Min(scaledRateLimit, maxRateLimitF)) log.Printf("debug: background reader: new rate limit %s\n", rateLimit) - ticker = time.Tick(rateLimit) - - br.mtx.Lock() - - // Should we stop? - if br.pleaseStop { - br.pleaseStop = false - br.running = false - br.mtx.Unlock() - return - } - - // Swap buffers - br.readyBuf, br.walkingBuf = br.walkingBuf, br.readyBuf - br.readySockets = sockets - br.mtx.Unlock() + ticker.Stop() + ticker = time.NewTicker(rateLimit) br.walkingBuf.Reset() diff --git a/probe/endpoint/procspy/benchmark_internal_test.go b/probe/endpoint/procspy/benchmark_internal_test.go deleted file mode 100644 index 1e48aab06e..0000000000 --- a/probe/endpoint/procspy/benchmark_internal_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package procspy - -import ( - "bytes" - "testing" -) - -func BenchmarkParseConnectionsBaseline(b *testing.B) { - readFile = func(string, *bytes.Buffer) (int64, error) { return 0, nil } - benchmarkConnections(b) - // 333 ns/op, 0 allocs/op -} - -func BenchmarkParseConnectionsFixture(b *testing.B) { - readFile = func(_ string, buf *bytes.Buffer) (int64, error) { - n, err := buf.Write(fixture) - return int64(n), err - } - benchmarkConnections(b) - // 15553 ns/op, 12 allocs/op -} - -func benchmarkConnections(b *testing.B) { - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - cbConnections(false, nil) - } -} - -var fixture = []byte(` sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode - 0: 00000000:A6C0 00000000:0000 01 00000000:00000000 00:00000000 00000000 105 0 5107 1 ffff8800a6aaf040 100 0 0 10 0 - 1: 00000000:006F 00000000:0000 01 00000000:00000000 00:00000000 00000000 0 0 5084 1 ffff8800a6aaf740 100 0 0 10 0 - 2: 0100007F:0019 00000000:0000 01 00000000:00000000 00:00000000 00000000 0 0 10550 1 ffff8800a729b780 100 0 0 10 0 - 3: A12CF62E:E4D7 57FC1EC0:01BB 01 00000000:00000000 02:000006FA 00000000 1000 0 639474 2 ffff88007e75a740 48 4 26 10 -1 -`) diff --git a/probe/endpoint/procspy/fixture.go b/probe/endpoint/procspy/fixture.go index b8d0ba02da..baf778fe4a 100644 --- a/probe/endpoint/procspy/fixture.go +++ b/probe/endpoint/procspy/fixture.go @@ -13,13 +13,13 @@ func (f *fixedConnIter) Next() *Connection { return &car } -// fixedScanner implements ConnectionScanner and uses constant Connection and +// FixedScanner implements ConnectionScanner and uses constant Connection and // ConnectionProcs. It's designed to be used in tests. -type fixedScanner []Connection +type FixedScanner []Connection -func (s fixedScanner) Connections() (ConnIter, error) { +func (s FixedScanner) Connections(_ bool) (ConnIter, error) { iter := fixedConnIter(s) return &iter, nil } -func (s fixedScanner) Stop() {} +func (s FixedScanner) Stop() {} diff --git a/probe/endpoint/procspy/proc_internal_test.go b/probe/endpoint/procspy/proc_internal_test.go index 1035e485ab..1e3bc74665 100644 --- a/probe/endpoint/procspy/proc_internal_test.go +++ b/probe/endpoint/procspy/proc_internal_test.go @@ -5,6 +5,7 @@ import ( "reflect" "syscall" "testing" + "time" fs_hook "github.com/weaveworks/scope/common/fs" "github.com/weaveworks/scope/probe/process" @@ -57,7 +58,11 @@ func TestWalkProcPid(t *testing.T) { defer fs_hook.Restore() buf := bytes.Buffer{} - have, err := walkProcPid(&buf, process.NewWalker(procRoot)) + walker := process.NewWalker(procRoot) + ticker := time.NewTicker(time.Millisecond) + defer ticker.Stop() + fdBlockSize := 1 + have, err := walkProcPid(&buf, walker, ticker.C, fdBlockSize) if err != nil { t.Fatal(err) } diff --git a/probe/endpoint/procspy/proc_linux.go b/probe/endpoint/procspy/proc_linux.go index 2af9b999de..0329fb8476 100644 --- a/probe/endpoint/procspy/proc_linux.go +++ b/probe/endpoint/procspy/proc_linux.go @@ -210,10 +210,8 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker, ticker <-chan time.Ti return sockets, nil } -// readFile reads an arbitrary file into a buffer. It's a variable so it can -// be overwritten for benchmarks. That's bad practice and we should change it -// to be a dependency. -var readFile = func(filename string, buf *bytes.Buffer) (int64, error) { +// readFile reads an arbitrary file into a buffer. +func readFile(filename string, buf *bytes.Buffer) (int64, error) { f, err := fs.Open(filename) if err != nil { return -1, err diff --git a/probe/endpoint/procspy/spy_linux_internal_test.go b/probe/endpoint/procspy/spy_linux_internal_test.go index fc5ed3890d..bdbf11c8e9 100644 --- a/probe/endpoint/procspy/spy_linux_internal_test.go +++ b/probe/endpoint/procspy/spy_linux_internal_test.go @@ -4,6 +4,7 @@ import ( "net" "reflect" "testing" + "time" fs_hook "github.com/weaveworks/scope/common/fs" "github.com/weaveworks/scope/probe/process" @@ -13,8 +14,13 @@ import ( func TestLinuxConnections(t *testing.T) { fs_hook.Mock(mockFS) defer fs_hook.Restore() + scanner := NewConnectionScanner(process.NewWalker("/proc")) + defer scanner.Stop() - iter, err := cbConnections(true, process.NewWalker("/proc")) + // let the background scanner finish its first pass + time.Sleep(1 * time.Second) + + iter, err := scanner.Connections(true) if err != nil { t.Fatal(err) } @@ -37,4 +43,5 @@ func TestLinuxConnections(t *testing.T) { if have := iter.Next(); have != nil { t.Fatal(have) } + } diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index b88b0c4051..5999312c6f 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -48,7 +48,7 @@ var SpyDuration = prometheus.NewSummaryVec( // on the host machine, at the granularity of host and port. That information // is stored in the Endpoint topology. It optionally enriches that topology // with process (PID) information. -func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool, procWalker process.Walker) *Reporter { +func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool, scanner procspy.ConnectionScanner) *Reporter { return &Reporter{ hostID: hostID, hostName: hostName, @@ -56,7 +56,7 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo flowWalker: newConntrackFlowWalker(useConntrack), natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, "--any-nat")), reverseResolver: newReverseResolver(), - scanner: procspy.NewConnectionScanner(procWalker), + scanner: scanner, } } diff --git a/probe/endpoint/reporter_test.go b/probe/endpoint/reporter_test.go index 4a03636e74..ba3c2e738e 100644 --- a/probe/endpoint/reporter_test.go +++ b/probe/endpoint/reporter_test.go @@ -64,14 +64,13 @@ var ( ) func TestSpyNoProcesses(t *testing.T) { - procspy.SetFixtures(fixConnections) - const ( nodeID = "heinz-tomato-ketchup" // TODO rename to hostID nodeName = "frenchs-since-1904" // TODO rename to hostNmae ) - reporter := endpoint.NewReporter(nodeID, nodeName, false, false, nil) + scanner := procspy.FixedScanner(fixConnections) + reporter := endpoint.NewReporter(nodeID, nodeName, false, false, scanner) r, _ := reporter.Report() //buf, _ := json.MarshalIndent(r, "", " ") //t.Logf("\n%s\n", buf) @@ -101,14 +100,13 @@ func TestSpyNoProcesses(t *testing.T) { } func TestSpyWithProcesses(t *testing.T) { - procspy.SetFixtures(fixConnectionsWithProcesses) - const ( nodeID = "nikon" // TODO rename to hostID nodeName = "fishermans-friend" // TODO rename to hostNmae ) - reporter := endpoint.NewReporter(nodeID, nodeName, true, false, nil) + scanner := procspy.FixedScanner(fixConnectionsWithProcesses) + reporter := endpoint.NewReporter(nodeID, nodeName, true, false, scanner) r, _ := reporter.Report() // buf, _ := json.MarshalIndent(r, "", " ") ; t.Logf("\n%s\n", buf) diff --git a/prog/probe.go b/prog/probe.go index 51081a6aab..a7a3d68663 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -139,8 +139,9 @@ func probeMain() { defer resolver.Stop() processCache := process.NewCachingWalker(process.NewWalker(*procRoot)) + scanner := procspy.NewConnectionScanner(processCache) - endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack, processCache) + endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack, scanner) defer endpointReporter.Stop() p := probe.New(*spyInterval, *publishInterval, clients)