Skip to content

Commit

Permalink
Improve probe docker code quality & test coverage.
Browse files Browse the repository at this point in the history
- Move docker probe code into it's own module
- Put PIDTree behind and interface for mocking
- Disaggregate dockerTagger into a registry, tagger and reporter
- Similarly disaggregate tests
- Add mocks for docker container and registry
- Add test for docker events & stats
  • Loading branch information
Tom Wilkie committed Jun 18, 2015
1 parent a6ef295 commit 0e6f22d
Show file tree
Hide file tree
Showing 20 changed files with 1,041 additions and 590 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ $(SCOPE_EXPORT): $(APP_EXE) $(PROBE_EXE) docker/*

$(APP_EXE): app/*.go render/*.go report/*.go xfer/*.go

$(PROBE_EXE): probe/*.go probe/tag/*.go report/*.go xfer/*.go
$(PROBE_EXE): probe/*.go probe/tag/*.go probe/docker/*.go report/*.go xfer/*.go

$(APP_EXE) $(PROBE_EXE):
go get -tags netgo ./$(@D)
Expand Down
134 changes: 98 additions & 36 deletions probe/tag/docker_container.go → probe/docker/container.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tag
package docker

import (
"bufio"
"encoding/json"
"fmt"
"io"
Expand All @@ -10,9 +11,12 @@ import (
"net/http/httputil"
"net/url"
"strconv"
"strings"
"sync"

docker "github.com/fsouza/go-dockerclient"

"github.com/weaveworks/scope/report"
)

// These constants are keys used in node metadata
Expand All @@ -39,51 +43,102 @@ const (
CPUSystemCPUUsage = "cpu_system_cpu_usage"
)

type dockerContainer struct {
sync.RWMutex
*docker.Container
// Exported for testing
var (
DialStub = net.Dial
NewClientConnStub = newClientConn
)

func newClientConn(c net.Conn, r *bufio.Reader) ClientConn {
return httputil.NewClientConn(c, r)
}

// ClientConn is exported for testing
type ClientConn interface {
Do(req *http.Request) (resp *http.Response, err error)
Close() error
}

// Container represents a docker container
type Container interface {
ID() string
Image() string
PID() int
GetNodeMetadata() report.NodeMetadata

StartGatheringStats() error
StopGatheringStats()
}

statsConn *httputil.ClientConn
type container struct {
sync.RWMutex
container *docker.Container
statsConn ClientConn
latestStats *docker.Stats
}

// called whilst holding t.Lock() for writes
func (c *dockerContainer) startGatheringStats(containerID string) error {
// NewContainer creates a new Container
func NewContainer(c *docker.Container) Container {
return &container{container: c}
}

func (c *container) ID() string {
return c.container.ID
}

func (c *container) Image() string {
return c.container.Image
}

func (c *container) PID() int {
return c.container.State.Pid
}

func (c *container) StartGatheringStats() error {
c.Lock()
defer c.Unlock()

if c.statsConn != nil {
return fmt.Errorf("already gather stats for container %s", containerID)
return fmt.Errorf("already gather stats for container %s", c.container.ID)
}

log.Printf("docker mapper: collecting stats for %s", containerID)
req, err := http.NewRequest("GET", fmt.Sprintf("/containers/%s/stats", containerID), nil)
if err != nil {
return err
}
req.Header.Set("User-Agent", "weavescope")
go func() {
log.Printf("docker container: collecting stats for %s", c.container.ID)
req, err := http.NewRequest("GET", fmt.Sprintf("/containers/%s/stats", c.container.ID), nil)
if err != nil {
log.Printf("docker container: %v", err)
return
}
req.Header.Set("User-Agent", "weavescope")

url, err := url.Parse(endpoint)
if err != nil {
return err
}
url, err := url.Parse(endpoint)
if err != nil {
log.Printf("docker container: %v", err)
return
}

dial, err := net.Dial(url.Scheme, url.Path)
if err != nil {
return err
}
dial, err := net.Dial(url.Scheme, url.Path)
if err != nil {
log.Printf("docker container: %v", err)
return
}

conn := httputil.NewClientConn(dial, nil)
resp, err := conn.Do(req)
if err != nil {
return err
}
conn := NewClientConnStub(dial, nil)
resp, err := conn.Do(req)
if err != nil {
log.Printf("docker container: %v", err)
return
}

c.statsConn = conn
c.Lock()
c.statsConn = conn
c.Unlock()

go func() {
defer func() {
c.Lock()
defer c.Unlock()

log.Printf("docker mapper: stopped collecting stats for %s", containerID)
log.Printf("docker container: stopped collecting stats for %s", c.container.ID)
c.statsConn = nil
c.latestStats = nil
}()
Expand All @@ -93,7 +148,7 @@ func (c *dockerContainer) startGatheringStats(containerID string) error {

for err := decoder.Decode(&stats); err != io.EOF; err = decoder.Decode(&stats) {
if err != nil {
log.Printf("docker mapper: error reading event %v", err)
log.Printf("docker container: error reading event %v", err)
return
}

Expand All @@ -109,7 +164,7 @@ func (c *dockerContainer) startGatheringStats(containerID string) error {
}

// called whilst holding t.Lock()
func (c *dockerContainer) stopGatheringStats(containerID string) {
func (c *container) StopGatheringStats() {
c.Lock()
defer c.Unlock()

Expand All @@ -124,15 +179,21 @@ func (c *dockerContainer) stopGatheringStats(containerID string) {
}

// called whilst holding t.RLock()
func (c *dockerContainer) getStats() map[string]string {
func (c *container) GetNodeMetadata() report.NodeMetadata {
c.RLock()
defer c.RUnlock()

result := report.NodeMetadata{
ContainerID: c.ID(),
ContainerName: strings.TrimPrefix(c.container.Name, "/"),
ImageID: c.container.Image,
}

if c.latestStats == nil {
return map[string]string{}
return result
}

return map[string]string{
result.Merge(report.NodeMetadata{
NetworkRxDropped: strconv.FormatUint(c.latestStats.Network.RxDropped, 10),
NetworkRxBytes: strconv.FormatUint(c.latestStats.Network.RxBytes, 10),
NetworkRxErrors: strconv.FormatUint(c.latestStats.Network.RxErrors, 10),
Expand All @@ -152,5 +213,6 @@ func (c *dockerContainer) getStats() map[string]string {
CPUTotalUsage: strconv.FormatUint(c.latestStats.CPUStats.CPUUsage.TotalUsage, 10),
CPUUsageInKernelmode: strconv.FormatUint(c.latestStats.CPUStats.CPUUsage.UsageInKernelmode, 10),
CPUSystemCPUUsage: strconv.FormatUint(c.latestStats.CPUStats.SystemCPUUsage, 10),
}
})
return result
}
67 changes: 67 additions & 0 deletions probe/docker/container_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package docker_test

import (
"bufio"
"encoding/json"
"io"
"net"
"net/http"
"runtime"
"testing"

client "github.com/fsouza/go-dockerclient"
"github.com/weaveworks/scope/probe/docker"
)

type mockConnection struct {
reader *io.PipeReader
}

func (c *mockConnection) Do(req *http.Request) (resp *http.Response, err error) {
return &http.Response{
Body: c.reader,
}, nil
}

func (c *mockConnection) Close() error {
return c.reader.Close()
}

func TestContainer(t *testing.T) {
oldDialStub, oldNewClientConnStub := docker.DialStub, docker.NewClientConnStub
defer func() { docker.DialStub, docker.NewClientConnStub = oldDialStub, oldNewClientConnStub }()

docker.DialStub = func(network, address string) (net.Conn, error) {
return nil, nil
}

reader, writer := io.Pipe()
connection := &mockConnection{reader}

docker.NewClientConnStub = func(c net.Conn, r *bufio.Reader) docker.ClientConn {
return connection
}

c := docker.NewContainer(container1)
err := c.StartGatheringStats()
if err != nil {
t.Errorf("%v", err)
}
defer c.StopGatheringStats()
runtime.Gosched() // wait for StartGatheringStats goroutine to call connection.Do

// Send some stats to the docker container
stats := &client.Stats{}
stats.MemoryStats.Usage = 12345
err = json.NewEncoder(writer).Encode(&stats)
if err != nil {
t.Errorf("%v", err)
}
runtime.Gosched() // wait for StartGatheringStats goroutine to receive the stats

// Now see if we go them
nmd := c.GetNodeMetadata()
if nmd[docker.MemoryUsage] != "12345" {
t.Errorf("want 12345, got %s", nmd[docker.MemoryUsage])
}
}
Loading

0 comments on commit 0e6f22d

Please sign in to comment.