Skip to content

Commit

Permalink
Can't share buffers like that!
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Wilkie committed Sep 14, 2015
1 parent bf3e9a1 commit c1a9de7
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 31 deletions.
10 changes: 8 additions & 2 deletions app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"compress/gzip"
"encoding/gob"
"log"
"net/http"
"net/url"
"strings"
Expand Down Expand Up @@ -76,6 +77,11 @@ func Router(c collector) *mux.Router {
return router
}

func badRequest(r *http.Request, w http.ResponseWriter, err error) {
http.Error(w, err.Error(), http.StatusBadRequest)
log.Printf("Error procressing request for %s: %v", r.URL.Path, err)
}

func makeReportPostHandler(a xfer.Adder) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var (
Expand All @@ -86,13 +92,13 @@ func makeReportPostHandler(a xfer.Adder) http.HandlerFunc {
if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") {
reader, err = gzip.NewReader(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
badRequest(r, w, err)
return
}
}

if err := gob.NewDecoder(reader).Decode(&rpt); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
badRequest(r, w, err)
return
}
a.Add(rpt)
Expand Down
49 changes: 41 additions & 8 deletions xfer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,73 @@ package xfer

import (
"bytes"
"io"
"sync"
"sync/atomic"
)

// A Buffer is a reference counted bytes.Buffer, which belongs
// to a sync.Pool
type Buffer struct {
type Buffer interface {
io.Reader
io.Writer

// Get returns a new buffer sharing the contents of this
// buffer, but with its own cursor. Get also increases
// the reference count. It is safe for concurrent calls.
Get() Buffer

// Put decreases the reference count, and when it hits zero, puts the
// buffer back in the pool.
Put()
}

type baseBuffer struct {
bytes.Buffer
pool *sync.Pool
refs int32
}

type dependentBuffer struct {
*bytes.Buffer
buf *baseBuffer
}

// NewBuffer creates a new buffer
func NewBuffer(pool *sync.Pool) *Buffer {
return &Buffer{
func NewBuffer(pool *sync.Pool) Buffer {
return &baseBuffer{
pool: pool,
refs: 0,
}
}

// Get increases the reference count. It is safe for concurrent calls.
func (b *Buffer) Get() {
// Get implements Buffer
func (b *baseBuffer) Get() Buffer {
atomic.AddInt32(&b.refs, 1)
return &dependentBuffer{
Buffer: bytes.NewBuffer(b.Bytes()),
buf: b,
}
}

// Put decreases the reference count, and when it hits zero, puts the
// buffer back in the pool.
func (b *Buffer) Put() {
// Put implements Buffer
func (b *baseBuffer) Put() {
if atomic.AddInt32(&b.refs, -1) == 0 {
b.Reset()
b.pool.Put(b)
}
}

// Get implements Buffer
func (b *dependentBuffer) Get() Buffer {
return b.buf.Get()
}

// Put implements Buffer
func (b *dependentBuffer) Put() {
b.buf.Put()
}

// NewBufferPool creates a new buffer pool.
func NewBufferPool() *sync.Pool {
result := &sync.Pool{}
Expand Down
31 changes: 17 additions & 14 deletions xfer/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
// Publisher is something which can send a buffered set of data somewhere,
// probably to a collector.
type Publisher interface {
Publish(*Buffer) error
Publish(Buffer) error
Stop()
}

Expand Down Expand Up @@ -59,9 +59,7 @@ func (p HTTPPublisher) String() string {
}

// Publish publishes the report to the URL.
func (p HTTPPublisher) Publish(buf *Buffer) error {
defer buf.Put()

func (p HTTPPublisher) Publish(buf Buffer) error {
req, err := http.NewRequest("POST", p.url, buf)
if err != nil {
return err
Expand Down Expand Up @@ -98,15 +96,15 @@ func AuthorizationHeader(token string) string {
// concurrent publishes are dropped.
type BackgroundPublisher struct {
publisher Publisher
reports chan *Buffer
reports chan Buffer
quit chan struct{}
}

// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher
func NewBackgroundPublisher(p Publisher) *BackgroundPublisher {
result := &BackgroundPublisher{
publisher: p,
reports: make(chan *Buffer),
reports: make(chan Buffer),
quit: make(chan struct{}),
}
go result.loop()
Expand All @@ -116,8 +114,9 @@ func NewBackgroundPublisher(p Publisher) *BackgroundPublisher {
func (b *BackgroundPublisher) loop() {
backoff := initialBackoff

for r := range b.reports {
err := b.publisher.Publish(r)
for buf := range b.reports {
err := b.publisher.Publish(buf)
buf.Put()
if err == nil {
backoff = initialBackoff
continue
Expand All @@ -136,10 +135,13 @@ func (b *BackgroundPublisher) loop() {
}

// Publish implements Publisher
func (b *BackgroundPublisher) Publish(buf *Buffer) error {
func (b *BackgroundPublisher) Publish(buf Buffer) error {
buf = buf.Get()

select {
case b.reports <- buf:
default:
buf.Put()
}
return nil
}
Expand Down Expand Up @@ -188,14 +190,15 @@ func (p *MultiPublisher) Add(target string) {
}

// Publish implements Publisher by emitting the report to all publishers.
func (p *MultiPublisher) Publish(buf *Buffer) error {
func (p *MultiPublisher) Publish(buf Buffer) error {
p.mtx.RLock()
defer p.mtx.RUnlock()

// First take a reference for each publisher
for range p.m {
buf.Get()
}
// First take a reference for me, to prevent one of
// the many publishers returning before I get a chance
// to give references to the other publishers.
buf = buf.Get()
defer buf.Put()

var errs []string
for _, publisher := range p.m {
Expand Down
9 changes: 5 additions & 4 deletions xfer/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,19 @@ func TestMultiPublisher(t *testing.T) {
p = &mockPublisher{}
factory = func(string) (xfer.Publisher, error) { return p, nil }
multiPublisher = xfer.NewMultiPublisher(factory)
buffers = xfer.NewBufferPool()
)

multiPublisher.Add("first")
if err := multiPublisher.Publish(xfer.NewBuffer(nil)); err != nil {
if err := multiPublisher.Publish(buffers.Get().(xfer.Buffer)); err != nil {
t.Error(err)
}
if want, have := 1, p.count; want != have {
t.Errorf("want %d, have %d", want, have)
}

multiPublisher.Add("second") // but factory returns same mockPublisher
if err := multiPublisher.Publish(xfer.NewBuffer(nil)); err != nil {
if err := multiPublisher.Publish(buffers.Get().(xfer.Buffer)); err != nil {
t.Error(err)
}
if want, have := 3, p.count; want != have {
Expand All @@ -102,5 +103,5 @@ func TestMultiPublisher(t *testing.T) {

type mockPublisher struct{ count int }

func (p *mockPublisher) Publish(*xfer.Buffer) error { p.count++; return nil }
func (p *mockPublisher) Stop() {}
func (p *mockPublisher) Publish(xfer.Buffer) error { p.count++; return nil }
func (p *mockPublisher) Stop() {}
7 changes: 4 additions & 3 deletions xfer/report_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ func NewReportPublisher(publisher Publisher) *ReportPublisher {

// Publish serialises and compresses a report, then passes it to a publisher
func (p *ReportPublisher) Publish(r report.Report) error {
buf := p.buffers.Get().(*Buffer)
buf := p.buffers.Get().(Buffer)
buf.Get()
defer buf.Put()

gzwriter := gzip.NewWriter(buf)
if err := gob.NewEncoder(gzwriter).Encode(r); err != nil {
buf.Reset()
p.buffers.Put(buf)
return err
}
gzwriter.Close() // otherwise the content won't get flushed to the output stream
Expand Down

0 comments on commit c1a9de7

Please sign in to comment.