Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make batcher ordered #111

Merged
merged 1 commit into from
Aug 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 137 additions & 69 deletions batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,38 @@ package batcher

import (
"errors"
"sync"
"sync/atomic"
"time"
)

const (
batcherActive = uint32(0)
batcherDisposed = uint32(1)
)
// I honestly can't believe I'm doing this, but go's sync package doesn't
// have a TryLock function.
// Could probably do this with atomics
// mutex is a channel-backed lock that, unlike sync.Mutex, supports a
// non-blocking TryLock. It is effectively a binary semaphore: the channel
// holds at most one token, and holding the token means holding the lock.
type mutex struct {
	// sem is full while the mutex is held and empty while it is free.
	sem chan struct{}
}

// newMutex returns a new, unlocked mutex.
func newMutex() *mutex {
	return &mutex{sem: make(chan struct{}, 1)}
}

// Lock blocks until the mutex is acquired.
func (m *mutex) Lock() {
	m.sem <- struct{}{}
}

// Unlock releases the mutex. Unlocking a mutex that is not held blocks
// until someone locks it, so callers must pair Lock/Unlock correctly.
func (m *mutex) Unlock() {
	<-m.sem
}

// TryLock attempts to acquire the mutex without blocking and reports
// whether the acquisition succeeded.
func (m *mutex) TryLock() bool {
	select {
	case m.sem <- struct{}{}:
		return true
	default:
		return false
	}
}

// Batcher provides an API for accumulating items into a batch for processing.
type Batcher interface {
Expand Down Expand Up @@ -60,13 +83,11 @@ type basicBatcher struct {
maxItems uint
maxBytes uint
calculateBytes CalculateBytes
disposed uint32
disposed bool
items []interface{}
lock sync.RWMutex
batchChan chan []interface{}
disposeChan chan struct{}
availableBytes uint
waiting int32
lock *mutex
}

// New creates a new Batcher using the provided arguments.
Expand All @@ -76,6 +97,10 @@ type basicBatcher struct {
// - Maximum amount of time waiting for a batch
// Values of zero for one of these fields indicate they should not be
// taken into account when evaluating the readiness of a batch.
// This provides an ordering guarantee for any given thread such that if a
// thread places two items in the batcher, Get will guarantee the first
// item is returned before the second, whether it appears in the same
// batch as the second item or in an earlier batch.
func New(maxTime time.Duration, maxItems, maxBytes, queueLen uint, calculate CalculateBytes) (Batcher, error) {
if maxBytes > 0 && calculate == nil {
return nil, errors.New("batcher: must provide CalculateBytes function")
Expand All @@ -88,24 +113,27 @@ func New(maxTime time.Duration, maxItems, maxBytes, queueLen uint, calculate Cal
calculateBytes: calculate,
items: make([]interface{}, 0, maxItems),
batchChan: make(chan []interface{}, queueLen),
disposeChan: make(chan struct{}),
lock: newMutex(),
}, nil
}

// Put adds items to the batcher. If Put is continually called without calls to
// Get, an unbounded number of go-routines will be generated.
// Note: there is no order guarantee for items entering/leaving the batcher.
// Put adds items to the batcher.
func (b *basicBatcher) Put(item interface{}) error {
// Check to see if disposed before putting
if b.IsDisposed() {
b.lock.Lock()
if b.disposed {
b.lock.Unlock()
return ErrDisposed
}
b.lock.Lock()

b.items = append(b.items, item)
if b.calculateBytes != nil {
b.availableBytes += b.calculateBytes(item)
}
if b.ready() {
// To guarantee ordering this MUST be in the lock, otherwise multiple
// flush calls could be blocked at the same time, in which case
// there's no guarantee each batch is placed into the channel in
// the proper order
b.flush()
}

Expand All @@ -114,10 +142,7 @@ func (b *basicBatcher) Put(item interface{}) error {
}

// Get retrieves a batch from the batcher. This call will block until
// one of the conditions for a "complete" batch is reached. If Put is
// continually called without calls to Get, an unbounded number of
// go-routines will be generated.
// Note: there is no order guarantee for items entering/leaving the batcher.
// one of the conditions for a "complete" batch is reached.
func (b *basicBatcher) Get() ([]interface{}, error) {
// Don't check disposed yet so any items remaining in the queue
// will be returned properly.
Expand All @@ -127,86 +152,119 @@ func (b *basicBatcher) Get() ([]interface{}, error) {
timeout = time.After(b.maxTime)
}

// Check to see if disposed before blocking
if b.IsDisposed() {
return nil, ErrDisposed
}

select {
case items := <-b.batchChan:
return items, nil
case _, ok := <-b.disposeChan:
case items, ok := <-b.batchChan:
// If there's something on the batch channel, we definitely want that.
if !ok {
return nil, ErrDisposed
}
return nil, nil
return items, nil
case <-timeout:
// Check to see if disposed before getting lock
if b.IsDisposed() {
return nil, ErrDisposed
// It's possible something was added to the channel after something
// was received on the timeout channel, in which case that must
// be returned first to satisfy our ordering guarantees.
// We can't just grab the lock here in case the batch channel is full,
// in which case a Put or Flush will be blocked and holding
// onto the lock. In that case, there should be something on the
// batch channel
for {
if b.lock.TryLock() {
// We have a lock, try to read from channel first in case
// something snuck in
select {
case items, ok := <-b.batchChan:
b.lock.Unlock()
if !ok {
return nil, ErrDisposed
}
return items, nil
default:
}

// If that is unsuccessful, nothing was added to the channel,
// and the temp buffer can't have changed because of the lock,
// so grab that
items := b.items
b.items = make([]interface{}, 0, b.maxItems)
b.availableBytes = 0
b.lock.Unlock()
return items, nil
} else {
// If we didn't get a lock, there are two cases:
// 1) The batch chan is full.
// 2) A Put or Flush temporarily has the lock.
// In either case, try to read something off the batch chan,
// and go back to trying to get a lock if unsuccessful.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some typos in this comment.

"trying to read something" -> "try to read something"
"going back to trying"     -> "go back to trying"
"if unsuccessful works"    -> "if unsuccessful"

select {
case items, ok := <-b.batchChan:
if !ok {
return nil, ErrDisposed
}
return items, nil
default:
}
}
}
b.lock.Lock()
items := b.items
b.items = make([]interface{}, 0, b.maxItems)
b.availableBytes = 0
b.lock.Unlock()
return items, nil
}
}

// Flush forcibly completes the batch currently being built
func (b *basicBatcher) Flush() error {
if b.IsDisposed() {
// This is the same pattern as a Put
b.lock.Lock()
if b.disposed {
b.lock.Unlock()
return ErrDisposed
}
b.lock.Lock()
b.flush()
b.lock.Unlock()
return nil
}

// Dispose will dispose of the batcher. Any calls to Put or Flush
// will return ErrDisposed, calls to Get will return an error iff
// there are no more ready batches.
// there are no more ready batches. Any items not flushed and retrieved
// by a Get may or may not be retrievable after calling this.
func (b *basicBatcher) Dispose() {
// Check to see if disposed before attempting to dispose
if atomic.CompareAndSwapUint32(&b.disposed, batcherActive, batcherDisposed) {
return
}
b.lock.Lock()
b.flush()
b.items = nil
close(b.disposeChan)
for {
if b.lock.TryLock() {
// We've got a lock
if b.disposed {
b.lock.Unlock()
return
}

b.disposed = true
b.items = nil
b.drainBatchChan()
close(b.batchChan)
b.lock.Unlock()
} else {
// Two cases here:
// 1) Something is blocked and holding onto the lock
// 2) Something temporarily has a lock
// For case 1, we have to clear at least some space so the blocked
// Put/Flush can release the lock. For case 2, nothing bad
// will happen here
b.drainBatchChan()
}

// Drain the batch channel and all routines waiting to put on the channel
for len(b.batchChan) > 0 || atomic.LoadInt32(&b.waiting) > 0 {
<-b.batchChan
}
close(b.batchChan)
b.lock.Unlock()
}

// IsDisposed will determine if the batcher is disposed
func (b *basicBatcher) IsDisposed() bool {
return atomic.LoadUint32(&b.disposed) == batcherDisposed
b.lock.Lock()
disposed := b.disposed
b.lock.Unlock()
return disposed
}

// flush adds the batch currently being built to the queue of completed batches.
// flush is not threadsafe, so should be synchronized externally.
func (b *basicBatcher) flush() {
// Note: This needs to be in a go-routine to avoid locking out gets when
// the batch channel is full.
cpItems := make([]interface{}, len(b.items))
for i, val := range b.items {
cpItems[i] = val
}
// Signal one more waiter for the batch channel
atomic.AddInt32(&b.waiting, 1)
// Don't block on the channel put
go func() {
b.batchChan <- cpItems
atomic.AddInt32(&b.waiting, -1)
}()
b.batchChan <- b.items
b.items = make([]interface{}, 0, b.maxItems)
b.availableBytes = 0
}
Expand All @@ -220,3 +278,13 @@ func (b *basicBatcher) ready() bool {
}
return false
}

// drainBatchChan discards every batch currently buffered on the batch
// channel and returns as soon as the channel is empty. It never blocks:
// the non-blocking receive falls through to default once nothing is left.
func (b *basicBatcher) drainBatchChan() {
	for drained := false; !drained; {
		select {
		case <-b.batchChan:
			// Discard the batch and keep draining.
		default:
			drained = true
		}
	}
}
16 changes: 2 additions & 14 deletions batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,29 +145,17 @@ func TestDispose(t *testing.T) {
b.Put("b")
b.Put("c")

possibleBatches := [][]interface{}{
[]interface{}{"a", "b"},
[]interface{}{"c"},
}

// Wait for items to get to the channel
for len(b.(*basicBatcher).batchChan) == 0 {
time.Sleep(1 * time.Millisecond)
}
batch1, err := b.Get()
assert.Contains(possibleBatches, batch1)
assert.Equal([]interface{}{"a", "b"}, batch1)
assert.Nil(err)

batch2, err := b.Get()
assert.Contains(possibleBatches, batch2)
assert.Equal([]interface{}{"c"}, batch2)
assert.Nil(err)

b.Put("d")
b.Put("e")
b.Put("f")
b.Put("g")
b.Put("h")
b.Put("i")

b.Dispose()

Expand Down