Skip to content

Commit

Permalink
kvevent: refactor memory buffer to chunked linked list
Browse files Browse the repository at this point in the history
This change refactors kvevent/blocking_buffer.go to use
a chunked linked list instead of a regular linked list to
reduce pointer usage. Note that the underlying sync.Pool,
which is also a linked list, will use fewer pointers because
we pool chunks instead of individual events.

Release note: None

Release justification: This change significantly
improves performance by reducing pressure on the GC.
Consequently, it improves foreground SQL p99 latency.
GC has been causing severe issues in production
changefeeds. Merging this change in this release is
worth it for its potential to reduce incidents.
  • Loading branch information
jayshrivastava committed Aug 19, 2022
1 parent 6d8a9ca commit 5734c3d
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 120 deletions.
18 changes: 18 additions & 0 deletions pkg/ccl/changefeedccl/kvevent/alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ func (a *Alloc) Release(ctx context.Context) {
a.clear()
}

// Bytes returns the number of bytes accounted for by this alloc.
func (a *Alloc) Bytes() int64 {
	return a.bytes
}

// Events returns the number of events (entries) associated with this alloc.
func (a *Alloc) Events() int64 {
	return a.entries
}

// Merge merges other resources into this allocation.
func (a *Alloc) Merge(other *Alloc) {
defer other.clear()
Expand Down Expand Up @@ -84,6 +94,14 @@ func (a *Alloc) Merge(other *Alloc) {

// clear resets this alloc to its zero value, detaching it from its pool.
func (a *Alloc) clear() { *a = Alloc{} }
// isZero reports whether this alloc is uninitialized (not bound to a pool).
func (a *Alloc) isZero() bool { return a.ap == nil }
// init initializes this alloc to account for a single event of the given
// byte size, charged against pool p. It panics if the alloc is already
// bound to a pool.
func (a *Alloc) init(bytes int64, p pool) {
	if !a.isZero() {
		panic("cannot initialize already initialized alloc")
	}
	a.bytes = bytes
	a.entries = 1
	a.ap = p
}

// TestingMakeAlloc creates allocation for the specified number of bytes
// in a single message using allocation pool 'p'.
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/kvevent/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ package kvevent_test

import (
"context"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"math/rand"
"testing"
"time"
Expand All @@ -24,8 +21,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/require"
)

Expand Down
161 changes: 44 additions & 117 deletions pkg/ccl/changefeedccl/kvevent/blocking_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package kvevent

import (
"context"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
Expand All @@ -32,13 +31,18 @@ type blockingBuffer struct {
qp allocPool // Pool for memory allocations.
signalCh chan struct{} // Signal when new events are available.

req struct {
syncutil.Mutex
memRequest
}

mu struct {
syncutil.Mutex
closed bool // True when buffer closed.
reason error // Reason buffer is closed.
drainCh chan struct{} // Set when Drain request issued.
blocked bool // Set when event is blocked, waiting to acquire quota.
queue bufferEntryQueue // Queue of added events.
closed bool // True when buffer closed.
reason error // Reason buffer is closed.
drainCh chan struct{} // Set when Drain request issued.
blocked bool // Set when event is blocked, waiting to acquire quota.
queue *bufferEventChunkQueue // Queue of added events.
}
}

Expand Down Expand Up @@ -68,22 +72,23 @@ func NewMemBuffer(
AbstractPool: quotapool.New("changefeed", quota, opts...),
metrics: metrics,
}
b.mu.queue = &bufferEventChunkQueue{}

return b
}

var _ Buffer = (*blockingBuffer)(nil)

func (b *blockingBuffer) pop() (e *bufferEntry, err error) {
func (b *blockingBuffer) pop() (e Event, ok bool, err error) {
b.mu.Lock()
defer b.mu.Unlock()
if b.mu.closed {
return nil, ErrBufferClosed{reason: b.mu.reason}
return Event{}, false, ErrBufferClosed{reason: b.mu.reason}
}

e = b.mu.queue.dequeue()
e, ok = b.mu.queue.dequeue()

if e == nil && b.mu.blocked {
if !ok && b.mu.blocked {
// Here, we know that we are blocked, waiting for memory; yet we have nothing queued up
// (and thus, no resources that could be released by draining the queue).
// This means that all the previously added entries have been read by the consumer,
Expand All @@ -93,7 +98,8 @@ func (b *blockingBuffer) pop() (e *bufferEntry, err error) {
// If the batching event consumer does not have periodic flush configured,
// we may never be able to make forward progress.
// So, we issue the flush request to the consumer to ensure that we release some memory.
e = newBufferEntry(Event{flush: true})
e = Event{flush: true}
ok = true
// Ensure we notify only once.
b.mu.blocked = false
}
Expand All @@ -102,7 +108,7 @@ func (b *blockingBuffer) pop() (e *bufferEntry, err error) {
close(b.mu.drainCh)
b.mu.drainCh = nil
}
return e, nil
return e, ok, nil
}

// notifyOutOfQuota is invoked by memQuota to notify blocking buffer that
Expand All @@ -125,16 +131,14 @@ func (b *blockingBuffer) notifyOutOfQuota() {
// Get implements kvevent.Reader interface.
func (b *blockingBuffer) Get(ctx context.Context) (ev Event, err error) {
for {
got, err := b.pop()
got, ok, err := b.pop()
if err != nil {
return Event{}, err
}

if got != nil {
if ok {
b.metrics.BufferEntriesOut.Inc(1)
e := got.e
bufferEntryPool.Put(got)
return e, nil
return got, nil
}

select {
Expand All @@ -145,38 +149,19 @@ func (b *blockingBuffer) Get(ctx context.Context) (ev Event, err error) {
}
}

func (b *blockingBuffer) ensureOpened(ctx context.Context) error {
func (b *blockingBuffer) enqueue(ctx context.Context, e Event) (err error) {
// Enqueue message, and signal if anybody is waiting.
b.mu.Lock()
defer b.mu.Unlock()
return b.ensureOpenedLocked(ctx)
}

func (b *blockingBuffer) ensureOpenedLocked(ctx context.Context) error {
if b.mu.closed {
logcrash.ReportOrPanic(ctx, b.sv, "buffer unexpectedly closed")
return errors.AssertionFailedf("buffer unexpectedly closed")
}

return nil
}

func (b *blockingBuffer) enqueue(ctx context.Context, be *bufferEntry) (err error) {
// Enqueue message, and signal if anybody is waiting.
b.mu.Lock()
defer b.mu.Unlock()
defer func() {
if err != nil {
bufferEntryPool.Put(be)
}
}()

if err = b.ensureOpenedLocked(ctx); err != nil {
return err
}

b.metrics.BufferEntriesIn.Inc(1)
b.mu.blocked = false
b.mu.queue.enqueue(be)
b.mu.queue.enqueue(e)

select {
case b.signalCh <- struct{}{}:
Expand All @@ -187,34 +172,31 @@ func (b *blockingBuffer) enqueue(ctx context.Context, be *bufferEntry) (err erro

// Add implements Writer interface.
func (b *blockingBuffer) Add(ctx context.Context, e Event) error {
if err := b.ensureOpened(ctx); err != nil {
return err
}

if e.alloc.ap != nil {
// Use allocation associated with the event itself.
return b.enqueue(ctx, newBufferEntry(e))
return b.enqueue(ctx, e)
}

// Acquire the quota first.
alloc := int64(changefeedbase.EventMemoryMultiplier.Get(b.sv) * float64(e.approxSize))
if l := changefeedbase.PerChangefeedMemLimit.Get(b.sv); alloc > l {
return errors.Newf("event size %d exceeds per changefeed limit %d", alloc, l)
}
e.alloc = Alloc{
bytes: alloc,
entries: 1,
ap: &b.qp,
}
e.alloc.init(alloc, &b.qp)
e.bufferAddTimestamp = timeutil.Now()
be := newBufferEntry(e)

if err := b.qp.Acquire(ctx, be); err != nil {
bufferEntryPool.Put(be)
if err := func() error {
b.req.Lock()
defer b.req.Unlock()
b.req.memRequest = memRequest(alloc)
if err := b.qp.Acquire(ctx, &b.req); err != nil {
return err
}
return nil
}(); err != nil {
return err
}
b.metrics.BufferEntriesMemAcquired.Inc(alloc)
return b.enqueue(ctx, be)
return b.enqueue(ctx, e)
}

// tryDrain attempts to see if the buffer already empty.
Expand Down Expand Up @@ -284,11 +266,7 @@ func (b *blockingBuffer) CloseWithReason(ctx context.Context, reason error) erro
close(b.signalCh)

// Return all queued up entries to the buffer pool.
// Note: we do not need to release their resources since we are going to close
// bound account anyway.
for be := b.mu.queue.dequeue(); be != nil; be = b.mu.queue.dequeue() {
bufferEntryPool.Put(be)
}
b.mu.queue.purge()

return nil
}
Expand Down Expand Up @@ -319,44 +297,22 @@ type memQuota struct {

var _ quotapool.Resource = (*memQuota)(nil)

// bufferEntry is a node in the buffer's linked list of queued events.
// Entries are pooled (see bufferEntryPool) to eliminate per-event
// allocations. bufferEntry also implements the quotapool.Request
// interface so it can be used to acquire memory quota.
type bufferEntry struct {
	e    Event
	next *bufferEntry // linked-list element
}

// bufferEntryPool recycles bufferEntry objects so that adding an event
// to the buffer does not allocate a fresh node each time.
var bufferEntryPool = sync.Pool{
	New: func() interface{} {
		return new(bufferEntry)
	},
}

// newBufferEntry returns a pooled bufferEntry wrapping e, with its
// linked-list pointer cleared.
func newBufferEntry(e Event) *bufferEntry {
	entry := bufferEntryPool.Get().(*bufferEntry)
	entry.next = nil
	entry.e = e
	return entry
}

var _ quotapool.Request = (*bufferEntry)(nil)
// memRequest is a quotapool.Request that acquires the specified number
// of bytes from the buffer's memory quota.
type memRequest int64

// Acquire implements quotapool.Request interface.
func (be *bufferEntry) Acquire(
func (r *memRequest) Acquire(
ctx context.Context, resource quotapool.Resource,
) (fulfilled bool, tryAgainAfter time.Duration) {
quota := resource.(*memQuota)
fulfilled, tryAgainAfter = be.acquireQuota(ctx, quota)

fulfilled, tryAgainAfter = r.acquireQuota(ctx, quota)
if !fulfilled {
quota.notifyOutOfQuota()
}

return fulfilled, tryAgainAfter
}

func (be *bufferEntry) acquireQuota(
func (r *memRequest) acquireQuota(
ctx context.Context, quota *memQuota,
) (fulfilled bool, tryAgainAfter time.Duration) {
if quota.canAllocateBelow > 0 {
Expand All @@ -366,7 +322,7 @@ func (be *bufferEntry) acquireQuota(
quota.canAllocateBelow = 0
}

if err := quota.acc.Grow(ctx, be.e.alloc.bytes); err != nil {
if err := quota.acc.Grow(ctx, int64(*r)); err != nil {
if quota.allocated == 0 {
// We've failed but there's nothing outstanding. It seems that this request
// is doomed to fail forever. However, that's not the case since our memory
Expand All @@ -382,45 +338,16 @@ func (be *bufferEntry) acquireQuota(
return false, 0
}

quota.allocated += be.e.alloc.bytes
quota.allocated += int64(*r)
quota.canAllocateBelow = 0
return true, 0
}

// ShouldWait implements quotapool.Request interface.
func (be *bufferEntry) ShouldWait() bool {
func (r *memRequest) ShouldWait() bool {
return true
}

// bufferEntryQueue is a FIFO queue implemented as a singly linked list
// of bufferEntry nodes. Tracking both head and tail gives O(1) enqueue
// and dequeue.
type bufferEntryQueue struct {
	head, tail *bufferEntry
}

// enqueue appends be to the back of the queue.
func (l *bufferEntryQueue) enqueue(be *bufferEntry) {
	if l.tail != nil {
		// Non-empty queue: link behind the current tail.
		l.tail.next = be
		l.tail = be
		return
	}
	// Empty queue: be becomes both head and tail.
	l.head, l.tail = be, be
}

// empty reports whether the queue contains no entries.
func (l *bufferEntryQueue) empty() bool {
	return l.head == nil
}

// dequeue removes and returns the entry at the front of the queue, or
// nil if the queue is empty.
func (l *bufferEntryQueue) dequeue() *bufferEntry {
	be := l.head
	if be == nil {
		return nil
	}
	l.head = be.next
	if l.head == nil {
		// Queue drained; reset tail so enqueue re-initializes both ends.
		l.tail = nil
	}
	return be
}

type allocPool struct {
*quotapool.AbstractPool
metrics *Metrics
Expand Down

0 comments on commit 5734c3d

Please sign in to comment.