Skip to content

Commit

Permalink
Merge pull request #1833 from weaveworks/simplify-container-stats
Browse files Browse the repository at this point in the history
use go-dockerclient's Client.Stats

Fixes #1799
  • Loading branch information
rade authored Aug 26, 2016
2 parents 7c2e3c8 + 3c4833f commit a5b491d
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 125 deletions.
123 changes: 35 additions & 88 deletions probe/docker/container.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
package docker

import (
"bufio"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"strings"
"sync"
"time"

log "github.com/Sirupsen/logrus"
docker "github.com/fsouza/go-dockerclient"
"github.com/ugorji/go/codec"

"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/report"
Expand Down Expand Up @@ -77,20 +72,9 @@ var (
StateDeleted = "deleted"
)

// Exported for testing
var (
DialStub = net.Dial
NewClientConnStub = newClientConn
)

func newClientConn(c net.Conn, r *bufio.Reader) ClientConn {
return httputil.NewClientConn(c, r)
}

// ClientConn is exported for testing
type ClientConn interface {
Do(req *http.Request) (resp *http.Response, err error)
Close() error
// StatsGatherer gathers container stats
type StatsGatherer interface {
Stats(docker.StatsOptions) error
}

// Container represents a Docker container
Expand All @@ -106,7 +90,7 @@ type Container interface {
StateString() string
HasTTY() bool
Container() *docker.Container
StartGatheringStats() error
StartGatheringStats(StatsGatherer) error
StopGatheringStats()
NetworkMode() (string, bool)
NetworkInfo([]net.IP) report.Sets
Expand All @@ -115,7 +99,7 @@ type Container interface {
type container struct {
sync.RWMutex
container *docker.Container
statsConn ClientConn
stopStats chan<- bool
latestStats docker.Stats
pendingStats [60]docker.Stats
numPending int
Expand Down Expand Up @@ -176,83 +160,50 @@ func (c *container) Container() *docker.Container {
return c.container
}

func (c *container) StartGatheringStats() error {
func (c *container) StartGatheringStats(client StatsGatherer) error {
c.Lock()
defer c.Unlock()

if c.statsConn != nil {
if c.stopStats != nil {
return nil
}
done := make(chan bool)
c.stopStats = done

stats := make(chan *docker.Stats)
opts := docker.StatsOptions{
ID: c.container.ID,
Stats: stats,
Stream: true,
Done: done,
}

go func() {
log.Debugf("docker container: collecting stats for %s", c.container.ID)
req, err := http.NewRequest("GET", fmt.Sprintf("/containers/%s/stats", c.container.ID), nil)
if err != nil {
log.Errorf("docker container: %v", err)
return
}
req.Header.Set("User-Agent", "weavescope")

url, err := url.Parse(endpoint)
if err != nil {
log.Errorf("docker container: %v", err)
return
}
log.Debugf("docker container: collecting stats for %s", c.container.ID)

dial, err := DialStub(url.Scheme, url.Path)
if err != nil {
log.Errorf("docker container: %v", err)
return
}

conn := NewClientConnStub(dial, nil)
resp, err := conn.Do(req)
if err != nil {
log.Errorf("docker container: %v", err)
return
go func() {
if err := client.Stats(opts); err != nil && err != io.EOF && err != io.ErrClosedPipe {
log.Errorf("docker container: error collecting stats for %s: %v", c.container.ID, err)
}
defer resp.Body.Close()

c.Lock()
c.statsConn = conn
c.Unlock()
}()

defer func() {
c.Lock()
defer c.Unlock()

log.Debugf("docker container: stopped collecting stats for %s", c.container.ID)
c.statsConn = nil
}()

// Use a buffer since the codec library doesn't implicitly do it
bufReader := bufio.NewReader(resp.Body)
decoder := codec.NewDecoder(bufReader, &codec.JsonHandle{})
for {
var stats docker.Stats
if err := decoder.Decode(&stats); err != nil {
if err == io.EOF {
break
}
// Unfortunately we typically get a different error
// than io.EOF. Yes, this is really the best we can do
// in go - https://github.com/golang/go/issues/4373
if opErr, ok := err.(*net.OpError); ok && opErr.Err.Error() == "use of closed network connection" {
break
}
log.Errorf("docker container: error reading event for %s: %v", c.container.ID, err)
break
}
go func() {
for s := range stats {
c.Lock()
if c.numPending >= len(c.pendingStats) {
log.Warnf("docker container: dropping stats for %s", c.container.ID)
} else {
c.latestStats = stats
c.pendingStats[c.numPending] = stats
c.latestStats = *s
c.pendingStats[c.numPending] = *s
c.numPending++
}
c.Unlock()
}
log.Debugf("docker container: stopped collecting stats for %s", c.container.ID)
c.Lock()
if c.stopStats == done {
c.stopStats = nil
}
c.Unlock()
}()

return nil
Expand All @@ -261,14 +212,10 @@ func (c *container) StartGatheringStats() error {
func (c *container) StopGatheringStats() {
c.Lock()
defer c.Unlock()

if c.statsConn == nil {
return
if c.stopStats != nil {
close(c.stopStats)
c.stopStats = nil
}

c.statsConn.Close()
c.statsConn = nil
return
}

func (c *container) ports(localAddrs []net.IP) report.StringSet {
Expand Down
52 changes: 17 additions & 35 deletions probe/docker/container_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
package docker_test

import (
"bufio"
"io"
"io/ioutil"
"net"
"net/http"
"testing"
"time"

log "github.com/Sirupsen/logrus"
client "github.com/fsouza/go-dockerclient"
"github.com/ugorji/go/codec"

"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/probe/docker"
Expand All @@ -20,44 +14,35 @@ import (
"github.com/weaveworks/scope/test/reflect"
)

type mockConnection struct {
reader *io.PipeReader
type mockStatsGatherer struct {
opts client.StatsOptions
ready chan bool
}

func (c *mockConnection) Do(req *http.Request) (resp *http.Response, err error) {
return &http.Response{
Body: c.reader,
}, nil
func NewMockStatsGatherer() *mockStatsGatherer {
return &mockStatsGatherer{ready: make(chan bool)}
}

func (c *mockConnection) Close() error {
return c.reader.Close()
func (s *mockStatsGatherer) Stats(opts client.StatsOptions) error {
s.opts = opts
close(s.ready)
return nil
}

func TestContainer(t *testing.T) {
log.SetOutput(ioutil.Discard)

oldDialStub, oldNewClientConnStub := docker.DialStub, docker.NewClientConnStub
defer func() { docker.DialStub, docker.NewClientConnStub = oldDialStub, oldNewClientConnStub }()

docker.DialStub = func(network, address string) (net.Conn, error) {
return nil, nil
}

reader, writer := io.Pipe()
connection := &mockConnection{reader}

docker.NewClientConnStub = func(c net.Conn, r *bufio.Reader) docker.ClientConn {
return connection
}
func (s *mockStatsGatherer) Send(stats *client.Stats) {
<-s.ready
s.opts.Stats <- stats
}

func TestContainer(t *testing.T) {
now := time.Unix(12345, 67890).UTC()
mtime.NowForce(now)
defer mtime.NowReset()

const hostID = "scope"
c := docker.NewContainer(container1, hostID)
err := c.StartGatheringStats()
s := NewMockStatsGatherer()
err := c.StartGatheringStats(s)
if err != nil {
t.Errorf("%v", err)
}
Expand All @@ -68,10 +53,7 @@ func TestContainer(t *testing.T) {
stats.Read = now
stats.MemoryStats.Usage = 12345
stats.MemoryStats.Limit = 45678
encoder := codec.NewEncoder(writer, &codec.JsonHandle{})
if err = encoder.Encode(&stats); err != nil {
t.Error(err)
}
s.Send(stats)

// Now see if we go them
{
Expand Down
3 changes: 2 additions & 1 deletion probe/docker/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type Client interface {
AttachToContainerNonBlocking(docker_client.AttachToContainerOptions) (docker_client.CloseWaiter, error)
CreateExec(docker_client.CreateExecOptions) (*docker_client.Exec, error)
StartExecNonBlocking(string, docker_client.StartExecOptions) (docker_client.CloseWaiter, error)
Stats(docker_client.StatsOptions) error
}

func newDockerClient(endpoint string) (Client, error) {
Expand Down Expand Up @@ -361,7 +362,7 @@ func (r *registry) updateContainerState(containerID string, intendedState *strin
// And finally, ensure we gather stats for it
if r.collectStats {
if dockerContainer.State.Running {
if err := c.StartGatheringStats(); err != nil {
if err := c.StartGatheringStats(r.client); err != nil {
log.Errorf("Error gathering stats for container %s: %s", containerID, err)
return
}
Expand Down
6 changes: 5 additions & 1 deletion probe/docker/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (c *mockContainer) StateString() string {
return docker.StateRunning
}

func (c *mockContainer) StartGatheringStats() error {
func (c *mockContainer) StartGatheringStats(docker.StatsGatherer) error {
return nil
}

Expand Down Expand Up @@ -163,6 +163,10 @@ func (m *mockDockerClient) RemoveContainer(_ client.RemoveContainerOptions) erro
return fmt.Errorf("remove")
}

func (m *mockDockerClient) Stats(_ client.StatsOptions) error {
return fmt.Errorf("stats")
}

type mockCloseWaiter struct{}

func (mockCloseWaiter) Close() error { return nil }
Expand Down

0 comments on commit a5b491d

Please sign in to comment.