Skip to content

Commit

Permalink
Merge pull request #2743 from weaveworks/2738-prefer-full-reports
Browse files Browse the repository at this point in the history
new full reports are more important than old and shortcut reports

Fixes #2738.
  • Loading branch information
rade authored Jul 25, 2017
2 parents ea85296 + 3c6ae97 commit a50c290
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 11 deletions.
16 changes: 14 additions & 2 deletions probe/appclient/app_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type AppClient interface {
ControlConnection()
PipeConnection(string, xfer.Pipe)
PipeClose(string) error
Publish(r io.Reader) error
Publish(io.Reader, bool) error
Target() url.URL
ReTarget(url.URL)
Stop()
Expand Down Expand Up @@ -311,13 +311,25 @@ func (c *appClient) startPublishing() {
}

// Publish implements Publisher
func (c *appClient) Publish(r io.Reader) error {
func (c *appClient) Publish(r io.Reader, shortcut bool) error {
// Lazily start the background publishing loop.
c.publishLoop.Do(c.startPublishing)
// enqueue report
select {
case c.readers <- r:
default:
log.Errorf("Dropping report to %s", c.hostname)
if shortcut {
return nil
}
// drop an old report to make way for new one
c.mtx.Lock()
defer c.mtx.Unlock()
select {
case <-c.readers:
default:
}
c.readers <- r
}
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions probe/appclient/multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type clientTuple struct {
// Publisher is something which can send a stream of data somewhere, probably
// to a remote collector.
type Publisher interface {
Publish(io.Reader) error
Publish(io.Reader, bool) error
Stop()
}

Expand All @@ -51,7 +51,7 @@ type MultiAppClient interface {
PipeConnection(appID, pipeID string, pipe xfer.Pipe) error
PipeClose(appID, pipeID string) error
Stop()
Publish(io.Reader) error
Publish(io.Reader, bool) error
}

// NewMultiAppClient creates a new MultiAppClient.
Expand Down Expand Up @@ -165,13 +165,13 @@ func (c *multiClient) Stop() {
// underlying publishers sequentially. To do that, it needs to drain the
// reader, and recreate new readers for each publisher. Note that it will
// publish to one endpoint for each unique ID. Failed publishes don't count.
func (c *multiClient) Publish(r io.Reader) error {
func (c *multiClient) Publish(r io.Reader, shortcut bool) error {
c.mtx.Lock()
defer c.mtx.Unlock()

if len(c.clients) <= 1 { // optimisation
for _, c := range c.clients {
return c.Publish(r)
return c.Publish(r, shortcut)
}
return nil
}
Expand All @@ -183,7 +183,7 @@ func (c *multiClient) Publish(r io.Reader) error {

errs := []string{}
for _, c := range c.clients {
if err := c.Publish(bytes.NewReader(buf)); err != nil {
if err := c.Publish(bytes.NewReader(buf), shortcut); err != nil {
errs = append(errs, err.Error())
}
}
Expand Down
4 changes: 2 additions & 2 deletions probe/appclient/multi_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (c *mockClient) Stop() {
c.stopped++
}

func (c *mockClient) Publish(io.Reader) error {
func (c *mockClient) Publish(io.Reader, bool) error {
c.publish++
return nil
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestMultiClientPublish(t *testing.T) {
mp.Set("b", []url.URL{{Host: "b2"}, {Host: "b3"}})

for i := 1; i < 10; i++ {
if err := mp.Publish(&bytes.Buffer{}); err != nil {
if err := mp.Publish(&bytes.Buffer{}, false); err != nil {
t.Error(err)
}
if want, have := 3*i, sum(); want != have {
Expand Down
2 changes: 1 addition & 1 deletion probe/appclient/report_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ func (p *ReportPublisher) Publish(r report.Report) error {
}
buf := &bytes.Buffer{}
r.WriteBinary(buf, gzip.DefaultCompression)
return p.publisher.Publish(buf)
return p.publisher.Publish(buf, r.Shortcut)
}
2 changes: 1 addition & 1 deletion probe/probe_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type mockPublisher struct {
have chan report.Report
}

func (m mockPublisher) Publish(in io.Reader) error {
func (m mockPublisher) Publish(in io.Reader, shortcut bool) error {
var r report.Report
if reader, err := gzip.NewReader(in); err != nil {
return err
Expand Down

0 comments on commit a50c290

Please sign in to comment.