Skip to content

Commit

Permalink
Only serialise and compress reports once per publish.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Wilkie committed Sep 9, 2015
1 parent d5570f2 commit bb20a81
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 29 deletions.
3 changes: 2 additions & 1 deletion experimental/demoprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ func main() {
if err != nil {
log.Fatal(err)
}
rp := xfer.NewReportPublisher(publisher)

rand.Seed(time.Now().UnixNano())
for range time.Tick(*publishInterval) {
if err := publisher.Publish(demoReport(*hostCount)); err != nil {
if err := rp.Publish(demoReport(*hostCount)); err != nil {
log.Print(err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion experimental/fixprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func main() {
log.Fatal(err)
}

rp := xfer.NewReportPublisher(publisher)
for range time.Tick(*publishInterval) {
publisher.Publish(fixedReport)
rp.Publish(fixedReport)
}
}
3 changes: 2 additions & 1 deletion probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,15 @@ func main() {
pubTick = time.Tick(*publishInterval)
spyTick = time.Tick(*spyInterval)
r = report.MakeReport()
p = xfer.NewReportPublisher(publishers)
)

for {
select {
case <-pubTick:
publishTicks.WithLabelValues().Add(1)
r.Window = *publishInterval
if err := publishers.Publish(r); err != nil {
if err := p.Publish(r); err != nil {
log.Printf("publish: %v", err)
}
r = report.MakeReport()
Expand Down
36 changes: 15 additions & 21 deletions xfer/publisher.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
package xfer

import (
"bytes"
"compress/gzip"
"encoding/gob"
"fmt"
"log"
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/weaveworks/scope/report"
)

const (
Expand All @@ -22,7 +17,7 @@ const (

// Publisher is something which can send a report to a remote collector.
type Publisher interface {
Publish(report.Report) error
Publish(*Buffer) error
Stop()
}

Expand Down Expand Up @@ -63,16 +58,10 @@ func (p HTTPPublisher) String() string {
}

// Publish publishes the report to the URL.
func (p HTTPPublisher) Publish(rpt report.Report) error {
gzbuf := bytes.Buffer{}
gzwriter := gzip.NewWriter(&gzbuf)

if err := gob.NewEncoder(gzwriter).Encode(rpt); err != nil {
return err
}
gzwriter.Close() // otherwise the content won't get flushed to the output stream
func (p HTTPPublisher) Publish(buf *Buffer) error {
defer buf.Put()

req, err := http.NewRequest("POST", p.url, &gzbuf)
req, err := http.NewRequest("POST", p.url, buf)
if err != nil {
return err
}
Expand Down Expand Up @@ -108,15 +97,15 @@ func AuthorizationHeader(token string) string {
// concurrent publishes are dropped.
type BackgroundPublisher struct {
publisher Publisher
reports chan report.Report
reports chan *Buffer
quit chan struct{}
}

// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher
func NewBackgroundPublisher(p Publisher) *BackgroundPublisher {
result := &BackgroundPublisher{
publisher: p,
reports: make(chan report.Report),
reports: make(chan *Buffer),
quit: make(chan struct{}),
}
go result.loop()
Expand Down Expand Up @@ -146,9 +135,9 @@ func (b *BackgroundPublisher) loop() {
}

// Publish implements Publisher
func (b *BackgroundPublisher) Publish(r report.Report) error {
func (b *BackgroundPublisher) Publish(buf *Buffer) error {
select {
case b.reports <- r:
case b.reports <- buf:
default:
}
return nil
Expand Down Expand Up @@ -198,13 +187,18 @@ func (p *MultiPublisher) Add(target string) {
}

// Publish implements Publisher by emitting the report to all publishers.
func (p *MultiPublisher) Publish(rpt report.Report) error {
func (p *MultiPublisher) Publish(buf *Buffer) error {
p.mtx.RLock()
defer p.mtx.RUnlock()

// First take a reference for each publisher
for range p.m {
buf.Get()
}

var errs []string
for _, publisher := range p.m {
if err := publisher.Publish(rpt); err != nil {
if err := publisher.Publish(buf); err != nil {
errs = append(errs, err.Error())
}
}
Expand Down
11 changes: 6 additions & 5 deletions xfer/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ func TestHTTPPublisher(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := p.Publish(rpt); err != nil {
rp := xfer.NewReportPublisher(p)
if err := rp.Publish(rpt); err != nil {
t.Error(err)
}

Expand All @@ -83,15 +84,15 @@ func TestMultiPublisher(t *testing.T) {
)

multiPublisher.Add("first")
if err := multiPublisher.Publish(report.MakeReport()); err != nil {
if err := multiPublisher.Publish(xfer.NewBuffer(nil)); err != nil {
t.Error(err)
}
if want, have := 1, p.count; want != have {
t.Errorf("want %d, have %d", want, have)
}

multiPublisher.Add("second") // but factory returns same mockPublisher
if err := multiPublisher.Publish(report.MakeReport()); err != nil {
if err := multiPublisher.Publish(xfer.NewBuffer(nil)); err != nil {
t.Error(err)
}
if want, have := 3, p.count; want != have {
Expand All @@ -101,5 +102,5 @@ func TestMultiPublisher(t *testing.T) {

type mockPublisher struct{ count int }

func (p *mockPublisher) Publish(report.Report) error { p.count++; return nil }
func (p *mockPublisher) Stop() {}
func (p *mockPublisher) Publish(*xfer.Buffer) error { p.count++; return nil }
func (p *mockPublisher) Stop() {}
79 changes: 79 additions & 0 deletions xfer/report_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package xfer

import (
"bytes"
"compress/gzip"
"encoding/gob"
"sync"
"sync/atomic"

"github.com/weaveworks/scope/report"
)

// A Buffer is a reference counted bytes.Buffer, which belongs
// to a sync.Pool
type Buffer struct {
bytes.Buffer
pool *sync.Pool
refs int32
}

// NewBuffer creates a new buffer
func NewBuffer(pool *sync.Pool) *Buffer {
return &Buffer{
pool: pool,
refs: 0,
}
}

// Get increases the reference count. It is safe for concurrent calls.
func (b *Buffer) Get() {
atomic.AddInt32(&b.refs, 1)
}

// Put decreases the reference count, and when it hits zero, puts the
// buffer back in the pool.
func (b *Buffer) Put() {
if atomic.AddInt32(&b.refs, -1) == 0 {
b.Reset()
b.pool.Put(b)
}
}

// NewBufferPool creates a new buffer pool.
func NewBufferPool() *sync.Pool {
result := &sync.Pool{}
result.New = func() interface{} {
return NewBuffer(result)
}
return result
}

// A ReportPublisher uses a buffer pool to serialise reports, which it
// then passes to a publisher
type ReportPublisher struct {
buffers *sync.Pool
publisher Publisher
}

// NewReportPublisher creates a new report publisher
func NewReportPublisher(publisher Publisher) *ReportPublisher {
return &ReportPublisher{
buffers: NewBufferPool(),
publisher: publisher,
}
}

// Publish serialises and compresses a report, then passes it to a publisher
func (p *ReportPublisher) Publish(r report.Report) error {
buf := p.buffers.Get().(*Buffer)
gzwriter := gzip.NewWriter(buf)
if err := gob.NewEncoder(gzwriter).Encode(r); err != nil {
buf.Reset()
p.buffers.Put(buf)
return err
}
gzwriter.Close() // otherwise the content won't get flushed to the output stream

return p.publisher.Publish(buf)
}

0 comments on commit bb20a81

Please sign in to comment.