From c1a9de77c76402c1130ffa33664b841c94a2319c Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 14 Sep 2015 10:12:31 +0000 Subject: [PATCH] Can't share buffers like that! --- app/router.go | 10 ++++++-- xfer/buffer.go | 49 +++++++++++++++++++++++++++++++++------- xfer/publisher.go | 31 +++++++++++++------------ xfer/publisher_test.go | 9 ++++---- xfer/report_publisher.go | 7 +++--- 5 files changed, 75 insertions(+), 31 deletions(-) diff --git a/app/router.go b/app/router.go index ccdcadee49..3586074048 100644 --- a/app/router.go +++ b/app/router.go @@ -3,6 +3,7 @@ package main import ( "compress/gzip" "encoding/gob" + "log" "net/http" "net/url" "strings" @@ -76,6 +77,11 @@ func Router(c collector) *mux.Router { return router } +func badRequest(r *http.Request, w http.ResponseWriter, err error) { + http.Error(w, err.Error(), http.StatusBadRequest) + log.Printf("Error procressing request for %s: %v", r.URL.Path, err) +} + func makeReportPostHandler(a xfer.Adder) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var ( @@ -86,13 +92,13 @@ func makeReportPostHandler(a xfer.Adder) http.HandlerFunc { if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") { reader, err = gzip.NewReader(r.Body) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + badRequest(r, w, err) return } } if err := gob.NewDecoder(reader).Decode(&rpt); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + badRequest(r, w, err) return } a.Add(rpt) diff --git a/xfer/buffer.go b/xfer/buffer.go index 452411e52e..e9d597d92b 100644 --- a/xfer/buffer.go +++ b/xfer/buffer.go @@ -2,40 +2,73 @@ package xfer import ( "bytes" + "io" "sync" "sync/atomic" ) // A Buffer is a reference counted bytes.Buffer, which belongs // to a sync.Pool -type Buffer struct { +type Buffer interface { + io.Reader + io.Writer + + // Get returns a new buffer sharing the contents of this + // buffer, but with its own cursor. Get also increases + // the reference count. It is safe for concurrent calls. + Get() Buffer + + // Put decreases the reference count, and when it hits zero, puts the + // buffer back in the pool. + Put() +} + +type baseBuffer struct { bytes.Buffer pool *sync.Pool refs int32 } +type dependentBuffer struct { + *bytes.Buffer + buf *baseBuffer +} + // NewBuffer creates a new buffer -func NewBuffer(pool *sync.Pool) *Buffer { - return &Buffer{ +func NewBuffer(pool *sync.Pool) Buffer { + return &baseBuffer{ pool: pool, refs: 0, } } -// Get increases the reference count. It is safe for concurrent calls. -func (b *Buffer) Get() { +// Get implements Buffer +func (b *baseBuffer) Get() Buffer { atomic.AddInt32(&b.refs, 1) + return &dependentBuffer{ + Buffer: bytes.NewBuffer(b.Bytes()), + buf: b, + } } -// Put decreases the reference count, and when it hits zero, puts the -// buffer back in the pool. -func (b *Buffer) Put() { +// Put implements Buffer +func (b *baseBuffer) Put() { if atomic.AddInt32(&b.refs, -1) == 0 { b.Reset() b.pool.Put(b) } } +// Get implements Buffer +func (b *dependentBuffer) Get() Buffer { + return b.buf.Get() +} + +// Put implements Buffer +func (b *dependentBuffer) Put() { + b.buf.Put() +} + // NewBufferPool creates a new buffer pool. func NewBufferPool() *sync.Pool { result := &sync.Pool{} diff --git a/xfer/publisher.go b/xfer/publisher.go index d1612e78eb..c2f801447c 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -18,7 +18,7 @@ const ( // Publisher is something which can send a buffered set of data somewhere, // probably to a collector. type Publisher interface { - Publish(*Buffer) error + Publish(Buffer) error Stop() } @@ -59,9 +59,7 @@ func (p HTTPPublisher) String() string { } // Publish publishes the report to the URL. -func (p HTTPPublisher) Publish(buf *Buffer) error { - defer buf.Put() - +func (p HTTPPublisher) Publish(buf Buffer) error { req, err := http.NewRequest("POST", p.url, buf) if err != nil { return err @@ -98,7 +96,7 @@ func AuthorizationHeader(token string) string { // concurrent publishes are dropped. type BackgroundPublisher struct { publisher Publisher - reports chan *Buffer + reports chan Buffer quit chan struct{} } @@ -106,7 +104,7 @@ type BackgroundPublisher struct { func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { result := &BackgroundPublisher{ publisher: p, - reports: make(chan *Buffer), + reports: make(chan Buffer), quit: make(chan struct{}), } go result.loop() @@ -116,8 +114,9 @@ func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { func (b *BackgroundPublisher) loop() { backoff := initialBackoff - for r := range b.reports { - err := b.publisher.Publish(r) + for buf := range b.reports { + err := b.publisher.Publish(buf) + buf.Put() if err == nil { backoff = initialBackoff continue @@ -136,10 +135,13 @@ func (b *BackgroundPublisher) loop() { } // Publish implements Publisher -func (b *BackgroundPublisher) Publish(buf *Buffer) error { +func (b *BackgroundPublisher) Publish(buf Buffer) error { + buf = buf.Get() + select { case b.reports <- buf: default: + buf.Put() } return nil } @@ -188,14 +190,15 @@ func (p *MultiPublisher) Add(target string) { } // Publish implements Publisher by emitting the report to all publishers. -func (p *MultiPublisher) Publish(buf *Buffer) error { +func (p *MultiPublisher) Publish(buf Buffer) error { p.mtx.RLock() defer p.mtx.RUnlock() - // First take a reference for each publisher - for range p.m { - buf.Get() - } + // First take a reference for me, to prevent one of + // the many publishers returning before I get a chance + // to give references to the other publishers. + buf = buf.Get() + defer buf.Put() var errs []string for _, publisher := range p.m { diff --git a/xfer/publisher_test.go b/xfer/publisher_test.go index 195d0b0bf1..b6393f0b88 100644 --- a/xfer/publisher_test.go +++ b/xfer/publisher_test.go @@ -81,10 +81,11 @@ func TestMultiPublisher(t *testing.T) { p = &mockPublisher{} factory = func(string) (xfer.Publisher, error) { return p, nil } multiPublisher = xfer.NewMultiPublisher(factory) + buffers = xfer.NewBufferPool() ) multiPublisher.Add("first") - if err := multiPublisher.Publish(xfer.NewBuffer(nil)); err != nil { + if err := multiPublisher.Publish(buffers.Get().(xfer.Buffer)); err != nil { t.Error(err) } if want, have := 1, p.count; want != have { @@ -92,7 +93,7 @@ func TestMultiPublisher(t *testing.T) { } multiPublisher.Add("second") // but factory returns same mockPublisher - if err := multiPublisher.Publish(xfer.NewBuffer(nil)); err != nil { + if err := multiPublisher.Publish(buffers.Get().(xfer.Buffer)); err != nil { t.Error(err) } if want, have := 3, p.count; want != have { @@ -102,5 +103,5 @@ func TestMultiPublisher(t *testing.T) { type mockPublisher struct{ count int } -func (p *mockPublisher) Publish(*xfer.Buffer) error { p.count++; return nil } -func (p *mockPublisher) Stop() {} +func (p *mockPublisher) Publish(xfer.Buffer) error { p.count++; return nil } +func (p *mockPublisher) Stop() {} diff --git a/xfer/report_publisher.go b/xfer/report_publisher.go index cdf5a0101f..1b0eaaf1e1 100644 --- a/xfer/report_publisher.go +++ b/xfer/report_publisher.go @@ -25,11 +25,12 @@ func NewReportPublisher(publisher Publisher) *ReportPublisher { // Publish serialises and compresses a report, then passes it to a publisher func (p *ReportPublisher) Publish(r report.Report) error { - buf := p.buffers.Get().(*Buffer) + buf := p.buffers.Get().(Buffer) + buf.Get() + defer buf.Put() + gzwriter := gzip.NewWriter(buf) if err := gob.NewEncoder(gzwriter).Encode(r); err != nil { - buf.Reset() - p.buffers.Put(buf) return err } gzwriter.Close() // otherwise the content won't get flushed to the output stream