Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Alfonso Acosta committed Feb 8, 2016
1 parent 39e11c8 commit c20707a
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 73 deletions.
40 changes: 22 additions & 18 deletions probe/endpoint/procspy/background_reader_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,32 @@ func (br *backgroundReader) loop() {
)

rateLimit := initialRateLimit
ticker := time.Tick(rateLimit)
ticker := time.NewTicker(rateLimit)
for {
start := time.Now()
sockets, err := walkProcPid(br.walkingBuf, br.walker, ticker, fdBlockSize)
sockets, err := walkProcPid(br.walkingBuf, br.walker, ticker.C, fdBlockSize)
if err != nil {
log.Printf("background reader: error walking /proc: %s\n", err)
continue
}

br.mtx.Lock()

// Should we stop?
if br.pleaseStop {
br.pleaseStop = false
br.running = false
ticker.Stop()
br.mtx.Unlock()
return
}

// Swap buffers
br.readyBuf, br.walkingBuf = br.walkingBuf, br.readyBuf
br.readySockets = sockets

br.mtx.Unlock()

walkTime := time.Now().Sub(start)
walkTimeF := float64(walkTime)

Expand All @@ -92,22 +110,8 @@ func (br *backgroundReader) loop() {
rateLimit = time.Duration(math.Min(scaledRateLimit, maxRateLimitF))
log.Printf("debug: background reader: new rate limit %s\n", rateLimit)

ticker = time.Tick(rateLimit)

br.mtx.Lock()

// Should we stop?
if br.pleaseStop {
br.pleaseStop = false
br.running = false
br.mtx.Unlock()
return
}

// Swap buffers
br.readyBuf, br.walkingBuf = br.walkingBuf, br.readyBuf
br.readySockets = sockets
br.mtx.Unlock()
ticker.Stop()
ticker = time.NewTicker(rateLimit)

br.walkingBuf.Reset()

Expand Down
36 changes: 0 additions & 36 deletions probe/endpoint/procspy/benchmark_internal_test.go

This file was deleted.

8 changes: 4 additions & 4 deletions probe/endpoint/procspy/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ func (f *fixedConnIter) Next() *Connection {
return &car
}

// fixedScanner implements ConnectionScanner and uses constant Connection and
// FixedScanner implements ConnectionScanner and uses constant Connection and
// ConnectionProcs. It's designed to be used in tests.
type fixedScanner []Connection
type FixedScanner []Connection

func (s fixedScanner) Connections() (ConnIter, error) {
func (s FixedScanner) Connections(_ bool) (ConnIter, error) {
iter := fixedConnIter(s)
return &iter, nil
}

func (s fixedScanner) Stop() {}
func (s FixedScanner) Stop() {}
7 changes: 6 additions & 1 deletion probe/endpoint/procspy/proc_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"reflect"
"syscall"
"testing"
"time"

fs_hook "github.com/weaveworks/scope/common/fs"
"github.com/weaveworks/scope/probe/process"
Expand Down Expand Up @@ -57,7 +58,11 @@ func TestWalkProcPid(t *testing.T) {
defer fs_hook.Restore()

buf := bytes.Buffer{}
have, err := walkProcPid(&buf, process.NewWalker(procRoot))
walker := process.NewWalker(procRoot)
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()
fdBlockSize := 1
have, err := walkProcPid(&buf, walker, ticker.C, fdBlockSize)
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 2 additions & 4 deletions probe/endpoint/procspy/proc_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,8 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker, ticker <-chan time.Ti
return sockets, nil
}

// readFile reads an arbitrary file into a buffer. It's a variable so it can
// be overwritten for benchmarks. That's bad practice and we should change it
// to be a dependency.
var readFile = func(filename string, buf *bytes.Buffer) (int64, error) {
// readFile reads an arbitrary file into a buffer.
func readFile(filename string, buf *bytes.Buffer) (int64, error) {
f, err := fs.Open(filename)
if err != nil {
return -1, err
Expand Down
9 changes: 8 additions & 1 deletion probe/endpoint/procspy/spy_linux_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net"
"reflect"
"testing"
"time"

fs_hook "github.com/weaveworks/scope/common/fs"
"github.com/weaveworks/scope/probe/process"
Expand All @@ -13,8 +14,13 @@ import (
func TestLinuxConnections(t *testing.T) {
fs_hook.Mock(mockFS)
defer fs_hook.Restore()
scanner := NewConnectionScanner(process.NewWalker("/proc"))
defer scanner.Stop()

iter, err := cbConnections(true, process.NewWalker("/proc"))
// let the background scanner finish its first pass
time.Sleep(1 * time.Second)

iter, err := scanner.Connections(true)
if err != nil {
t.Fatal(err)
}
Expand All @@ -37,4 +43,5 @@ func TestLinuxConnections(t *testing.T) {
if have := iter.Next(); have != nil {
t.Fatal(have)
}

}
4 changes: 2 additions & 2 deletions probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ var SpyDuration = prometheus.NewSummaryVec(
// on the host machine, at the granularity of host and port. That information
// is stored in the Endpoint topology. It optionally enriches that topology
// with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool, procWalker process.Walker) *Reporter {
func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool, scanner procspy.ConnectionScanner) *Reporter {
return &Reporter{
hostID: hostID,
hostName: hostName,
includeProcesses: includeProcesses,
flowWalker: newConntrackFlowWalker(useConntrack),
natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, "--any-nat")),
reverseResolver: newReverseResolver(),
scanner: procspy.NewConnectionScanner(procWalker),
scanner: scanner,
}
}

Expand Down
10 changes: 4 additions & 6 deletions probe/endpoint/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,13 @@ var (
)

func TestSpyNoProcesses(t *testing.T) {
procspy.SetFixtures(fixConnections)

const (
nodeID = "heinz-tomato-ketchup" // TODO rename to hostID
nodeName = "frenchs-since-1904" // TODO rename to hostNmae
)

reporter := endpoint.NewReporter(nodeID, nodeName, false, false, nil)
scanner := procspy.FixedScanner(fixConnections)
reporter := endpoint.NewReporter(nodeID, nodeName, false, false, scanner)
r, _ := reporter.Report()
//buf, _ := json.MarshalIndent(r, "", " ")
//t.Logf("\n%s\n", buf)
Expand Down Expand Up @@ -101,14 +100,13 @@ func TestSpyNoProcesses(t *testing.T) {
}

func TestSpyWithProcesses(t *testing.T) {
procspy.SetFixtures(fixConnectionsWithProcesses)

const (
nodeID = "nikon" // TODO rename to hostID
nodeName = "fishermans-friend" // TODO rename to hostNmae
)

reporter := endpoint.NewReporter(nodeID, nodeName, true, false, nil)
scanner := procspy.FixedScanner(fixConnectionsWithProcesses)
reporter := endpoint.NewReporter(nodeID, nodeName, true, false, scanner)
r, _ := reporter.Report()
// buf, _ := json.MarshalIndent(r, "", " ") ; t.Logf("\n%s\n", buf)

Expand Down
3 changes: 2 additions & 1 deletion prog/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,9 @@ func probeMain() {
defer resolver.Stop()

processCache := process.NewCachingWalker(process.NewWalker(*procRoot))
scanner := procspy.NewConnectionScanner(processCache)

endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack, processCache)
endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack, scanner)
defer endpointReporter.Stop()

p := probe.New(*spyInterval, *publishInterval, clients)
Expand Down

0 comments on commit c20707a

Please sign in to comment.