diff --git a/pkg/ccl/changefeedccl/kvevent/alloc.go b/pkg/ccl/changefeedccl/kvevent/alloc.go index aaa4084e05c8..ba63a8b4d7b9 100644 --- a/pkg/ccl/changefeedccl/kvevent/alloc.go +++ b/pkg/ccl/changefeedccl/kvevent/alloc.go @@ -46,6 +46,16 @@ func (a *Alloc) Release(ctx context.Context) { a.clear() } +// Bytes returns the size of this alloc in bytes. +func (a *Alloc) Bytes() int64 { + return a.bytes +} + +// Events returns the number of items associated with this alloc. +func (a *Alloc) Events() int64 { + return a.entries +} + // Merge merges other resources into this allocation. func (a *Alloc) Merge(other *Alloc) { defer other.clear() @@ -84,6 +94,14 @@ func (a *Alloc) Merge(other *Alloc) { func (a *Alloc) clear() { *a = Alloc{} } func (a *Alloc) isZero() bool { return a.ap == nil } +func (a *Alloc) init(bytes int64, p pool) { + if !a.isZero() { + panic("cannot initialize already initialized alloc") + } + a.bytes = bytes + a.entries = 1 + a.ap = p +} // TestingMakeAlloc creates allocation for the specified number of bytes // in a single message using allocation pool 'p'. diff --git a/pkg/ccl/changefeedccl/kvevent/bench_test.go b/pkg/ccl/changefeedccl/kvevent/bench_test.go index 43687d888dfb..395881e1a86f 100644 --- a/pkg/ccl/changefeedccl/kvevent/bench_test.go +++ b/pkg/ccl/changefeedccl/kvevent/bench_test.go @@ -10,9 +10,6 @@ package kvevent_test import ( "context" - "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/randutil" "math/rand" "testing" "time" @@ -24,8 +21,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/stretchr/testify/require" ) diff --git a/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go b/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go index 41f93e8f7888..bb17da8a97ae 100644 --- a/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go +++ b/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go @@ -10,7 +10,6 @@ package kvevent import ( "context" - "sync" "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" @@ -32,13 +31,18 @@ type blockingBuffer struct { qp allocPool // Pool for memory allocations. signalCh chan struct{} // Signal when new events are available. + req struct { + syncutil.Mutex + memRequest + } + mu struct { syncutil.Mutex - closed bool // True when buffer closed. - reason error // Reason buffer is closed. - drainCh chan struct{} // Set when Drain request issued. - blocked bool // Set when event is blocked, waiting to acquire quota. - queue bufferEntryQueue // Queue of added events. + closed bool // True when buffer closed. + reason error // Reason buffer is closed. + drainCh chan struct{} // Set when Drain request issued. + blocked bool // Set when event is blocked, waiting to acquire quota. + queue *bufferEventChunkQueue // Queue of added events. } } @@ -68,22 +72,23 @@ func NewMemBuffer( AbstractPool: quotapool.New("changefeed", quota, opts...), metrics: metrics, } + b.mu.queue = &bufferEventChunkQueue{} return b } var _ Buffer = (*blockingBuffer)(nil) -func (b *blockingBuffer) pop() (e *bufferEntry, err error) { +func (b *blockingBuffer) pop() (e Event, ok bool, err error) { b.mu.Lock() defer b.mu.Unlock() if b.mu.closed { - return nil, ErrBufferClosed{reason: b.mu.reason} + return Event{}, false, ErrBufferClosed{reason: b.mu.reason} } - e = b.mu.queue.dequeue() + e, ok = b.mu.queue.dequeue() - if e == nil && b.mu.blocked { + if !ok && b.mu.blocked { // Here, we know that we are blocked, waiting for memory; yet we have nothing queued up // (and thus, no resources that could be released by draining the queue). // This means that all the previously added entries have been read by the consumer, @@ -93,7 +98,8 @@ func (b *blockingBuffer) pop() (e *bufferEntry, err error) { // If the batching event consumer does not have periodic flush configured, // we may never be able to make forward progress. // So, we issue the flush request to the consumer to ensure that we release some memory. - e = newBufferEntry(Event{flush: true}) + e = Event{flush: true} + ok = true // Ensure we notify only once. b.mu.blocked = false } @@ -102,7 +108,7 @@ func (b *blockingBuffer) pop() (e *bufferEntry, err error) { close(b.mu.drainCh) b.mu.drainCh = nil } - return e, nil + return e, ok, nil } // notifyOutOfQuota is invoked by memQuota to notify blocking buffer that @@ -125,16 +131,14 @@ func (b *blockingBuffer) notifyOutOfQuota() { // Get implements kvevent.Reader interface. func (b *blockingBuffer) Get(ctx context.Context) (ev Event, err error) { for { - got, err := b.pop() + got, ok, err := b.pop() if err != nil { return Event{}, err } - if got != nil { + if ok { b.metrics.BufferEntriesOut.Inc(1) - e := got.e - bufferEntryPool.Put(got) - return e, nil + return got, nil } select { @@ -145,38 +149,19 @@ func (b *blockingBuffer) Get(ctx context.Context) (ev Event, err error) { } } -func (b *blockingBuffer) ensureOpened(ctx context.Context) error { +func (b *blockingBuffer) enqueue(ctx context.Context, e Event) (err error) { + // Enqueue message, and signal if anybody is waiting. b.mu.Lock() defer b.mu.Unlock() - return b.ensureOpenedLocked(ctx) -} -func (b *blockingBuffer) ensureOpenedLocked(ctx context.Context) error { if b.mu.closed { logcrash.ReportOrPanic(ctx, b.sv, "buffer unexpectedly closed") return errors.AssertionFailedf("buffer unexpectedly closed") } - return nil -} - -func (b *blockingBuffer) enqueue(ctx context.Context, be *bufferEntry) (err error) { - // Enqueue message, and signal if anybody is waiting. - b.mu.Lock() - defer b.mu.Unlock() - defer func() { - if err != nil { - bufferEntryPool.Put(be) - } - }() - - if err = b.ensureOpenedLocked(ctx); err != nil { - return err - } - b.metrics.BufferEntriesIn.Inc(1) b.mu.blocked = false - b.mu.queue.enqueue(be) + b.mu.queue.enqueue(e) select { case b.signalCh <- struct{}{}: @@ -187,13 +172,9 @@ func (b *blockingBuffer) enqueue(ctx context.Context, be *bufferEntry) (err erro // Add implements Writer interface. func (b *blockingBuffer) Add(ctx context.Context, e Event) error { - if err := b.ensureOpened(ctx); err != nil { - return err - } - if e.alloc.ap != nil { // Use allocation associated with the event itself. - return b.enqueue(ctx, newBufferEntry(e)) + return b.enqueue(ctx, e) } // Acquire the quota first. @@ -201,20 +182,21 @@ func (b *blockingBuffer) Add(ctx context.Context, e Event) error { if l := changefeedbase.PerChangefeedMemLimit.Get(b.sv); alloc > l { return errors.Newf("event size %d exceeds per changefeed limit %d", alloc, l) } - e.alloc = Alloc{ - bytes: alloc, - entries: 1, - ap: &b.qp, - } + e.alloc.init(alloc, &b.qp) e.bufferAddTimestamp = timeutil.Now() - be := newBufferEntry(e) - - if err := b.qp.Acquire(ctx, be); err != nil { - bufferEntryPool.Put(be) + if err := func() error { + b.req.Lock() + defer b.req.Unlock() + b.req.memRequest = memRequest(alloc) + if err := b.qp.Acquire(ctx, &b.req); err != nil { + return err + } + return nil + }(); err != nil { return err } b.metrics.BufferEntriesMemAcquired.Inc(alloc) - return b.enqueue(ctx, be) + return b.enqueue(ctx, e) } // tryDrain attempts to see if the buffer already empty. @@ -284,11 +266,7 @@ func (b *blockingBuffer) CloseWithReason(ctx context.Context, reason error) erro close(b.signalCh) // Return all queued up entries to the buffer pool. - // Note: we do not need to release their resources since we are going to close - // bound account anyway. - for be := b.mu.queue.dequeue(); be != nil; be = b.mu.queue.dequeue() { - bufferEntryPool.Put(be) - } + b.mu.queue.purge() return nil } @@ -319,36 +297,14 @@ type memQuota struct { var _ quotapool.Resource = (*memQuota)(nil) -// bufferEntry forms a linked list of elements in the buffer. -// These entries are pooled to eliminate allocations. -// bufferEntry also implements quotapool.Request interface for resource acquisition. -type bufferEntry struct { - e Event - next *bufferEntry // linked-list element -} - -var bufferEntryPool = sync.Pool{ - New: func() interface{} { - return new(bufferEntry) - }, -} - -func newBufferEntry(e Event) *bufferEntry { - be := bufferEntryPool.Get().(*bufferEntry) - be.e = e - be.next = nil - return be -} - -var _ quotapool.Request = (*bufferEntry)(nil) +type memRequest int64 // Acquire implements quotapool.Request interface. -func (be *bufferEntry) Acquire( +func (r *memRequest) Acquire( ctx context.Context, resource quotapool.Resource, ) (fulfilled bool, tryAgainAfter time.Duration) { quota := resource.(*memQuota) - fulfilled, tryAgainAfter = be.acquireQuota(ctx, quota) - + fulfilled, tryAgainAfter = r.acquireQuota(ctx, quota) if !fulfilled { quota.notifyOutOfQuota() } @@ -356,7 +312,7 @@ func (be *bufferEntry) Acquire( return fulfilled, tryAgainAfter } -func (be *bufferEntry) acquireQuota( +func (r *memRequest) acquireQuota( ctx context.Context, quota *memQuota, ) (fulfilled bool, tryAgainAfter time.Duration) { if quota.canAllocateBelow > 0 { @@ -366,7 +322,7 @@ func (be *bufferEntry) acquireQuota( quota.canAllocateBelow = 0 } - if err := quota.acc.Grow(ctx, be.e.alloc.bytes); err != nil { + if err := quota.acc.Grow(ctx, int64(*r)); err != nil { if quota.allocated == 0 { // We've failed but there's nothing outstanding. It seems that this request // is doomed to fail forever. However, that's not the case since our memory @@ -382,45 +338,16 @@ func (be *bufferEntry) acquireQuota( return false, 0 } - quota.allocated += be.e.alloc.bytes + quota.allocated += int64(*r) quota.canAllocateBelow = 0 return true, 0 } // ShouldWait implements quotapool.Request interface. -func (be *bufferEntry) ShouldWait() bool { +func (r *memRequest) ShouldWait() bool { return true } -// bufferEntryQueue is a queue implemented as a linked-list of bufferEntry. -type bufferEntryQueue struct { - head, tail *bufferEntry -} - -func (l *bufferEntryQueue) enqueue(be *bufferEntry) { - if l.tail == nil { - l.head, l.tail = be, be - } else { - l.tail.next = be - l.tail = be - } -} - -func (l *bufferEntryQueue) empty() bool { - return l.head == nil -} - -func (l *bufferEntryQueue) dequeue() *bufferEntry { - if l.head == nil { - return nil - } - ret := l.head - if l.head = l.head.next; l.head == nil { - l.tail = nil - } - return ret -} - type allocPool struct { *quotapool.AbstractPool metrics *Metrics