Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More tests for docker integration #257

Merged
merged 1 commit into from
Jun 18, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ $(SCOPE_EXPORT): $(APP_EXE) $(PROBE_EXE) docker/*

$(APP_EXE): app/*.go render/*.go report/*.go xfer/*.go

$(PROBE_EXE): probe/*.go probe/tag/*.go report/*.go xfer/*.go
$(PROBE_EXE): probe/*.go probe/tag/*.go probe/docker/*.go report/*.go xfer/*.go

$(APP_EXE) $(PROBE_EXE):
go get -tags netgo ./$(@D)
Expand Down
134 changes: 98 additions & 36 deletions probe/tag/docker_container.go → probe/docker/container.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tag
package docker

import (
"bufio"
"encoding/json"
"fmt"
"io"
Expand All @@ -10,9 +11,12 @@ import (
"net/http/httputil"
"net/url"
"strconv"
"strings"
"sync"

docker "github.com/fsouza/go-dockerclient"

"github.com/weaveworks/scope/report"
)

// These constants are keys used in node metadata
Expand All @@ -39,51 +43,102 @@ const (
CPUSystemCPUUsage = "cpu_system_cpu_usage"
)

type dockerContainer struct {
sync.RWMutex
*docker.Container
// Exported for testing
var (
DialStub = net.Dial
NewClientConnStub = newClientConn
)

func newClientConn(c net.Conn, r *bufio.Reader) ClientConn {
return httputil.NewClientConn(c, r)
}

// ClientConn is exported for testing
type ClientConn interface {
Do(req *http.Request) (resp *http.Response, err error)
Close() error
}

// Container represents a docker container
type Container interface {
ID() string
Image() string
PID() int
GetNodeMetadata() report.NodeMetadata

StartGatheringStats() error
StopGatheringStats()
}

statsConn *httputil.ClientConn
type container struct {
sync.RWMutex
container *docker.Container
statsConn ClientConn
latestStats *docker.Stats
}

// called whilst holding t.Lock() for writes
func (c *dockerContainer) startGatheringStats(containerID string) error {
// NewContainer creates a new Container
func NewContainer(c *docker.Container) Container {
return &container{container: c}
}

func (c *container) ID() string {
return c.container.ID
}

func (c *container) Image() string {
return c.container.Image
}

func (c *container) PID() int {
return c.container.State.Pid
}

func (c *container) StartGatheringStats() error {
c.Lock()
defer c.Unlock()

if c.statsConn != nil {
return fmt.Errorf("already gather stats for container %s", containerID)
return fmt.Errorf("already gather stats for container %s", c.container.ID)
}

log.Printf("docker mapper: collecting stats for %s", containerID)
req, err := http.NewRequest("GET", fmt.Sprintf("/containers/%s/stats", containerID), nil)
if err != nil {
return err
}
req.Header.Set("User-Agent", "weavescope")
go func() {
log.Printf("docker container: collecting stats for %s", c.container.ID)
req, err := http.NewRequest("GET", fmt.Sprintf("/containers/%s/stats", c.container.ID), nil)
if err != nil {
log.Printf("docker container: %v", err)
return
}
req.Header.Set("User-Agent", "weavescope")

url, err := url.Parse(endpoint)
if err != nil {
return err
}
url, err := url.Parse(endpoint)
if err != nil {
log.Printf("docker container: %v", err)
return
}

dial, err := net.Dial(url.Scheme, url.Path)
if err != nil {
return err
}
dial, err := net.Dial(url.Scheme, url.Path)
if err != nil {
log.Printf("docker container: %v", err)
return
}

conn := httputil.NewClientConn(dial, nil)
resp, err := conn.Do(req)
if err != nil {
return err
}
conn := NewClientConnStub(dial, nil)
resp, err := conn.Do(req)
if err != nil {
log.Printf("docker container: %v", err)
return
}

c.statsConn = conn
c.Lock()
c.statsConn = conn
c.Unlock()

go func() {
defer func() {
c.Lock()
defer c.Unlock()

log.Printf("docker mapper: stopped collecting stats for %s", containerID)
log.Printf("docker container: stopped collecting stats for %s", c.container.ID)
c.statsConn = nil
c.latestStats = nil
}()
Expand All @@ -93,7 +148,7 @@ func (c *dockerContainer) startGatheringStats(containerID string) error {

for err := decoder.Decode(&stats); err != io.EOF; err = decoder.Decode(&stats) {
if err != nil {
log.Printf("docker mapper: error reading event %v", err)
log.Printf("docker container: error reading event %v", err)
return
}

Expand All @@ -109,7 +164,7 @@ func (c *dockerContainer) startGatheringStats(containerID string) error {
}

// called whilst holding t.Lock()
func (c *dockerContainer) stopGatheringStats(containerID string) {
func (c *container) StopGatheringStats() {
c.Lock()
defer c.Unlock()

Expand All @@ -124,15 +179,21 @@ func (c *dockerContainer) stopGatheringStats(containerID string) {
}

// called whilst holding t.RLock()
func (c *dockerContainer) getStats() map[string]string {
func (c *container) GetNodeMetadata() report.NodeMetadata {
c.RLock()
defer c.RUnlock()

result := report.NodeMetadata{
ContainerID: c.ID(),
ContainerName: strings.TrimPrefix(c.container.Name, "/"),
ImageID: c.container.Image,
}

if c.latestStats == nil {
return map[string]string{}
return result
}

return map[string]string{
result.Merge(report.NodeMetadata{
NetworkRxDropped: strconv.FormatUint(c.latestStats.Network.RxDropped, 10),
NetworkRxBytes: strconv.FormatUint(c.latestStats.Network.RxBytes, 10),
NetworkRxErrors: strconv.FormatUint(c.latestStats.Network.RxErrors, 10),
Expand All @@ -152,5 +213,6 @@ func (c *dockerContainer) getStats() map[string]string {
CPUTotalUsage: strconv.FormatUint(c.latestStats.CPUStats.CPUUsage.TotalUsage, 10),
CPUUsageInKernelmode: strconv.FormatUint(c.latestStats.CPUStats.CPUUsage.UsageInKernelmode, 10),
CPUSystemCPUUsage: strconv.FormatUint(c.latestStats.CPUStats.SystemCPUUsage, 10),
}
})
return result
}
67 changes: 67 additions & 0 deletions probe/docker/container_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package docker_test

import (
"bufio"
"encoding/json"
"io"
"net"
"net/http"
"runtime"
"testing"

client "github.com/fsouza/go-dockerclient"
"github.com/weaveworks/scope/probe/docker"
)

type mockConnection struct {
reader *io.PipeReader
}

func (c *mockConnection) Do(req *http.Request) (resp *http.Response, err error) {
return &http.Response{
Body: c.reader,
}, nil
}

func (c *mockConnection) Close() error {
return c.reader.Close()
}

func TestContainer(t *testing.T) {
oldDialStub, oldNewClientConnStub := docker.DialStub, docker.NewClientConnStub
defer func() { docker.DialStub, docker.NewClientConnStub = oldDialStub, oldNewClientConnStub }()

docker.DialStub = func(network, address string) (net.Conn, error) {
return nil, nil
}

reader, writer := io.Pipe()
connection := &mockConnection{reader}

docker.NewClientConnStub = func(c net.Conn, r *bufio.Reader) docker.ClientConn {
return connection
}

c := docker.NewContainer(container1)
err := c.StartGatheringStats()
if err != nil {
t.Errorf("%v", err)
}
defer c.StopGatheringStats()
runtime.Gosched() // wait for StartGatheringStats goroutine to call connection.Do

// Send some stats to the docker container
stats := &client.Stats{}
stats.MemoryStats.Usage = 12345
err = json.NewEncoder(writer).Encode(&stats)
if err != nil {
t.Errorf("%v", err)
}
runtime.Gosched() // wait for StartGatheringStats goroutine to receive the stats

// Now see if we go them
nmd := c.GetNodeMetadata()
if nmd[docker.MemoryUsage] != "12345" {
t.Errorf("want 12345, got %s", nmd[docker.MemoryUsage])
}
}
Loading