From 3c4833fe97786d8f4803349bf00bdeab74aaf8e5 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 26 Aug 2016 11:50:59 +0100 Subject: [PATCH] use go-dockerclient's Client.Stats ...instead of rolling our own. This also fixes #1799 - a race condition that could result in multiple stats gatherers for a container. --- probe/docker/container.go | 123 ++++++++++----------------------- probe/docker/container_test.go | 52 +++++--------- probe/docker/registry.go | 3 +- probe/docker/registry_test.go | 6 +- 4 files changed, 59 insertions(+), 125 deletions(-) diff --git a/probe/docker/container.go b/probe/docker/container.go index c6ed853915..7427b7180e 100644 --- a/probe/docker/container.go +++ b/probe/docker/container.go @@ -1,13 +1,9 @@ package docker import ( - "bufio" "fmt" "io" "net" - "net/http" - "net/http/httputil" - "net/url" "strconv" "strings" "sync" @@ -15,7 +11,6 @@ import ( log "github.com/Sirupsen/logrus" docker "github.com/fsouza/go-dockerclient" - "github.com/ugorji/go/codec" "github.com/weaveworks/scope/common/mtime" "github.com/weaveworks/scope/report" @@ -77,20 +72,9 @@ var ( StateDeleted = "deleted" ) -// Exported for testing -var ( - DialStub = net.Dial - NewClientConnStub = newClientConn -) - -func newClientConn(c net.Conn, r *bufio.Reader) ClientConn { - return httputil.NewClientConn(c, r) -} - -// ClientConn is exported for testing -type ClientConn interface { - Do(req *http.Request) (resp *http.Response, err error) - Close() error +// StatsGatherer gathers container stats +type StatsGatherer interface { + Stats(docker.StatsOptions) error } // Container represents a Docker container @@ -106,7 +90,7 @@ type Container interface { StateString() string HasTTY() bool Container() *docker.Container - StartGatheringStats() error + StartGatheringStats(StatsGatherer) error StopGatheringStats() NetworkMode() (string, bool) NetworkInfo([]net.IP) report.Sets @@ -115,7 +99,7 @@ type Container interface { type container struct { sync.RWMutex container *docker.Container - statsConn ClientConn + stopStats chan<- bool latestStats docker.Stats pendingStats [60]docker.Stats numPending int @@ -176,83 +160,50 @@ func (c *container) Container() *docker.Container { return c.container } -func (c *container) StartGatheringStats() error { +func (c *container) StartGatheringStats(client StatsGatherer) error { c.Lock() defer c.Unlock() - if c.statsConn != nil { + if c.stopStats != nil { return nil } + done := make(chan bool) + c.stopStats = done + + stats := make(chan *docker.Stats) + opts := docker.StatsOptions{ + ID: c.container.ID, + Stats: stats, + Stream: true, + Done: done, + } - go func() { - log.Debugf("docker container: collecting stats for %s", c.container.ID) - req, err := http.NewRequest("GET", fmt.Sprintf("/containers/%s/stats", c.container.ID), nil) - if err != nil { - log.Errorf("docker container: %v", err) - return - } - req.Header.Set("User-Agent", "weavescope") - - url, err := url.Parse(endpoint) - if err != nil { - log.Errorf("docker container: %v", err) - return - } + log.Debugf("docker container: collecting stats for %s", c.container.ID) - dial, err := DialStub(url.Scheme, url.Path) - if err != nil { - log.Errorf("docker container: %v", err) - return - } - - conn := NewClientConnStub(dial, nil) - resp, err := conn.Do(req) - if err != nil { - log.Errorf("docker container: %v", err) - return + go func() { + if err := client.Stats(opts); err != nil && err != io.EOF && err != io.ErrClosedPipe { + log.Errorf("docker container: error collecting stats for %s: %v", c.container.ID, err) } - defer resp.Body.Close() - - c.Lock() - c.statsConn = conn - c.Unlock() + }() - defer func() { - c.Lock() - defer c.Unlock() - - log.Debugf("docker container: stopped collecting stats for %s", c.container.ID) - c.statsConn = nil - }() - - // Use a buffer since the codec library doesn't implicitly do it - bufReader := bufio.NewReader(resp.Body) - decoder := codec.NewDecoder(bufReader, &codec.JsonHandle{}) - for { - var stats docker.Stats - if err := decoder.Decode(&stats); err != nil { - if err == io.EOF { - break - } - // Unfortunately we typically get a different error - // than io.EOF. Yes, this is really the best we can do - // in go - https://github.com/golang/go/issues/4373 - if opErr, ok := err.(*net.OpError); ok && opErr.Err.Error() == "use of closed network connection" { - break - } - log.Errorf("docker container: error reading event for %s: %v", c.container.ID, err) - break - } + go func() { + for s := range stats { c.Lock() if c.numPending >= len(c.pendingStats) { log.Warnf("docker container: dropping stats for %s", c.container.ID) } else { - c.latestStats = stats - c.pendingStats[c.numPending] = stats + c.latestStats = *s + c.pendingStats[c.numPending] = *s c.numPending++ } c.Unlock() } + log.Debugf("docker container: stopped collecting stats for %s", c.container.ID) + c.Lock() + if c.stopStats == done { + c.stopStats = nil + } + c.Unlock() }() return nil @@ -261,14 +212,10 @@ func (c *container) StartGatheringStats() error { func (c *container) StopGatheringStats() { c.Lock() defer c.Unlock() - - if c.statsConn == nil { - return + if c.stopStats != nil { + close(c.stopStats) + c.stopStats = nil } - - c.statsConn.Close() - c.statsConn = nil - return } func (c *container) ports(localAddrs []net.IP) report.StringSet { diff --git a/probe/docker/container_test.go b/probe/docker/container_test.go index 28c5c54fdf..75f3266f89 100644 --- a/probe/docker/container_test.go +++ b/probe/docker/container_test.go @@ -1,17 +1,11 @@ package docker_test import ( - "bufio" - "io" - "io/ioutil" "net" - "net/http" "testing" "time" - log "github.com/Sirupsen/logrus" client "github.com/fsouza/go-dockerclient" - "github.com/ugorji/go/codec" "github.com/weaveworks/scope/common/mtime" "github.com/weaveworks/scope/probe/docker" @@ -20,44 +14,35 @@ import ( "github.com/weaveworks/scope/test/reflect" ) -type mockConnection struct { - reader *io.PipeReader +type mockStatsGatherer struct { + opts client.StatsOptions + ready chan bool } -func (c *mockConnection) Do(req *http.Request) (resp *http.Response, err error) { - return &http.Response{ - Body: c.reader, - }, nil +func NewMockStatsGatherer() *mockStatsGatherer { + return &mockStatsGatherer{ready: make(chan bool)} } -func (c *mockConnection) Close() error { - return c.reader.Close() +func (s *mockStatsGatherer) Stats(opts client.StatsOptions) error { + s.opts = opts + close(s.ready) + return nil } -func TestContainer(t *testing.T) { - log.SetOutput(ioutil.Discard) - - oldDialStub, oldNewClientConnStub := docker.DialStub, docker.NewClientConnStub - defer func() { docker.DialStub, docker.NewClientConnStub = oldDialStub, oldNewClientConnStub }() - - docker.DialStub = func(network, address string) (net.Conn, error) { - return nil, nil - } - - reader, writer := io.Pipe() - connection := &mockConnection{reader} - - docker.NewClientConnStub = func(c net.Conn, r *bufio.Reader) docker.ClientConn { - return connection - } +func (s *mockStatsGatherer) Send(stats *client.Stats) { + <-s.ready + s.opts.Stats <- stats +} +func TestContainer(t *testing.T) { now := time.Unix(12345, 67890).UTC() mtime.NowForce(now) defer mtime.NowReset() const hostID = "scope" c := docker.NewContainer(container1, hostID) - err := c.StartGatheringStats() + s := NewMockStatsGatherer() + err := c.StartGatheringStats(s) if err != nil { t.Errorf("%v", err) } @@ -68,10 +53,7 @@ func TestContainer(t *testing.T) { stats.Read = now stats.MemoryStats.Usage = 12345 stats.MemoryStats.Limit = 45678 - encoder := codec.NewEncoder(writer, &codec.JsonHandle{}) - if err = encoder.Encode(&stats); err != nil { - t.Error(err) - } + s.Send(stats) // Now see if we go them { diff --git a/probe/docker/registry.go b/probe/docker/registry.go index 995aca8584..1f533806b8 100644 --- a/probe/docker/registry.go +++ b/probe/docker/registry.go @@ -85,6 +85,7 @@ type Client interface { AttachToContainerNonBlocking(docker_client.AttachToContainerOptions) (docker_client.CloseWaiter, error) CreateExec(docker_client.CreateExecOptions) (*docker_client.Exec, error) StartExecNonBlocking(string, docker_client.StartExecOptions) (docker_client.CloseWaiter, error) + Stats(docker_client.StatsOptions) error } func newDockerClient(endpoint string) (Client, error) { @@ -361,7 +362,7 @@ func (r *registry) updateContainerState(containerID string, intendedState *strin // And finally, ensure we gather stats for it if r.collectStats { if dockerContainer.State.Running { - if err := c.StartGatheringStats(); err != nil { + if err := c.StartGatheringStats(r.client); err != nil { log.Errorf("Error gathering stats for container %s: %s", containerID, err) return } diff --git a/probe/docker/registry_test.go b/probe/docker/registry_test.go index afeaf29b92..cb57628910 100644 --- a/probe/docker/registry_test.go +++ b/probe/docker/registry_test.go @@ -55,7 +55,7 @@ func (c *mockContainer) StateString() string { return docker.StateRunning } -func (c *mockContainer) StartGatheringStats() error { +func (c *mockContainer) StartGatheringStats(docker.StatsGatherer) error { return nil } @@ -163,6 +163,10 @@ func (m *mockDockerClient) RemoveContainer(_ client.RemoveContainerOptions) erro return fmt.Errorf("remove") } +func (m *mockDockerClient) Stats(_ client.StatsOptions) error { + return fmt.Errorf("stats") +} + type mockCloseWaiter struct{} func (mockCloseWaiter) Close() error { return nil }