diff --git a/.chloggen/otelarrow-admission.yaml b/.chloggen/otelarrow-admission.yaml
new file mode 100644
index 000000000000..8bd92566d66c
--- /dev/null
+++ b/.chloggen/otelarrow-admission.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: otelarrowreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Admission control improvements (LIFO); admission.waiter_limit is deprecated, replaced with admission.waiting_limit_mib.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [36074]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/internal/otelarrow/admission/README.md b/internal/otelarrow/admission/README.md
index 053ad05a8fcf..9076096e2ec5 100644
--- a/internal/otelarrow/admission/README.md
+++ b/internal/otelarrow/admission/README.md
@@ -2,19 +2,31 @@
 
 ## Overview
 
-The admission package provides a BoundedQueue object which is a semaphore implementation that limits the number of bytes admitted into a collector pipeline. Additionally the BoundedQueue limits the number of waiters that can block on a call to `bq.Acquire(sz int64)`.
+The admission package provides a BoundedQueue object. This object
+implements a semaphore for limiting the number of bytes admitted into
+a collector pipeline. Additionally, the BoundedQueue limits the
+number of bytes allowed to wait on a call to `Acquire(ctx, pending uint64)`.
 
-This package is an experiment to improve the behavior of Collector pipelines having their `exporterhelper` configured to apply backpressure. This package is meant to be used in receivers, via an interceptor or custom logic. Therefore, the BoundedQueue helps limit memory within the entire collector pipeline by limiting two dimensions that cause memory issues:
-1. bytes: large requests that enter the collector pipeline can require large allocations even if downstream components will eventually limit or ratelimit the request.
-2. waiters: limiting on bytes alone is not enough because requests that enter the pipeline and block on `bq.Acquire()` can still consume memory within the receiver. If there are enough waiters this can be a significant contribution to memory usage.
+There are two error conditions generated within this code:
+
+- `rejecting request, too much pending data`: When the limit on waiting bytes is reached, this error is returned to bound the total amount of data waiting for admission.
+- `rejecting request, request is too large`: When an individual request exceeds the configured limit, this error is returned immediately, without acquiring or waiting.
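+
+For example, a caller can distinguish these two rejections by their
+gRPC status codes (a hedged sketch, not part of this package; it
+assumes the caller has `bq`, `ctx`, and `pending` in scope and imports
+`google.golang.org/grpc/codes` and `google.golang.org/grpc/status`):
+
+```go
+release, err := bq.Acquire(ctx, pending)
+switch status.Code(err) {
+case codes.OK:
+	defer release() // admitted: release when finished processing
+case codes.ResourceExhausted:
+	// too much pending data: apply backpressure to the caller
+case codes.InvalidArgument:
+	// the request alone exceeds the limit: split or reject it
+}
+```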
+
+The BoundedQueue implements LIFO semantics. See this
+[article](https://medium.com/swlh/fifo-considered-harmful-793b76f98374)
+for an explanation of why LIFO is preferred to FIFO semantics.
 
 ## Usage
 
-Create a new BoundedQueue by calling `bq := admission.NewBoundedQueue(maxLimitBytes, maxLimitWaiters)`
+Create a new BoundedQueue by calling `bq := admission.NewBoundedQueue(telemetrySettings, maxLimitAdmit, maxLimitWait)`
+
+Within the component call `bq.Acquire(ctx, requestSize)` which will:
 
-Within the component call `bq.Acquire(ctx, requestSize)` which will either
-1. succeed immediately if there is enough available memory
-2. fail immediately if there are too many waiters
-3. block until context cancelation or enough bytes becomes available
+1. succeed immediately if there is enough available memory,
+2. fail immediately if there is too much data already waiting for admission, or
+3. block until context cancelation or enough bytes become available.
 
-Once a request has finished processing and is sent downstream call `bq.Release(requestSize)` to allow waiters to be admitted for processing. Release should only fail if releasing more bytes than previously acquired.
\ No newline at end of file
+When the resources have been acquired successfully, a closure is
+returned that, when called, will release the semaphore. When the
+semaphore is released, pending waiters that can be satisfied will
+acquire the resource and become unblocked.
diff --git a/internal/otelarrow/admission/boundedqueue.go b/internal/otelarrow/admission/boundedqueue.go
index ea3f255db551..e6e77142069a 100644
--- a/internal/otelarrow/admission/boundedqueue.go
+++ b/internal/otelarrow/admission/boundedqueue.go
@@ -4,157 +4,174 @@
 package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"
 
 import (
+	"container/list"
 	"context"
-	"fmt"
 	"sync"
 
-	"github.com/google/uuid"
-	orderedmap "github.com/wk8/go-ordered-map/v2"
+	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/codes"
 	"go.opentelemetry.io/otel/trace"
+	grpccodes "google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
-var ErrTooManyWaiters = fmt.Errorf("rejecting request, too many waiters")
+var ErrTooMuchWaiting = status.Error(grpccodes.ResourceExhausted, "rejecting request, too much pending data")
+var ErrRequestTooLarge = status.Error(grpccodes.InvalidArgument, "rejecting request, request is too large")
 
+// BoundedQueue is a LIFO-oriented admission-controlled Queue.
 type BoundedQueue struct {
-	maxLimitBytes   int64
-	maxLimitWaiters int64
-	currentBytes    int64
-	currentWaiters  int64
+	maxLimitAdmit uint64
+	maxLimitWait  uint64
+	tracer        trace.Tracer
+
+	// lock protects currentAdmitted, currentWaiting, and waiters
 	lock sync.Mutex
-	waiters *orderedmap.OrderedMap[uuid.UUID, waiter]
-	tracer  trace.Tracer
+
+	currentAdmitted uint64
+	currentWaiting  uint64
+	waiters         *list.List // of *waiter
 }
 
+var _ Queue = &BoundedQueue{}
+
+// waiter is an item in the BoundedQueue waiters list.
 type waiter struct {
-	readyCh      chan struct{}
-	pendingBytes int64
-	ID           uuid.UUID
+	notify  N
+	pending uint64
 }
 
-func NewBoundedQueue(tp trace.TracerProvider, maxLimitBytes, maxLimitWaiters int64) *BoundedQueue {
+// NewBoundedQueue returns a LIFO-oriented Queue implementation which
+// admits `maxLimitAdmit` bytes concurrently and allows up to
+// `maxLimitWait` bytes to wait for admission.
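+//
+// A minimal usage sketch (the caller-side names `ts`, `ctx`, and
+// `requestBytes` are assumed here, not defined by this package):
+//
+//	bq := NewBoundedQueue(ts, 128<<20, 32<<20) // 128 MiB admitted, 32 MiB waiting
+//	release, err := bq.Acquire(ctx, requestBytes)
+//	if err != nil {
+//		return err // ResourceExhausted, InvalidArgument, or Canceled
+//	}
+//	defer release()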
+func NewBoundedQueue(ts component.TelemetrySettings, maxLimitAdmit, maxLimitWait uint64) Queue {
 	return &BoundedQueue{
-		maxLimitBytes:   maxLimitBytes,
-		maxLimitWaiters: maxLimitWaiters,
-		waiters:         orderedmap.New[uuid.UUID, waiter](),
-		tracer:          tp.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"),
+		maxLimitAdmit: maxLimitAdmit,
+		maxLimitWait:  maxLimitWait,
+		waiters:       list.New(),
+		tracer:        ts.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"),
 	}
 }
 
-func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) {
+// acquireOrGetWaiter returns with three distinct conditions depending
+// on whether it was accepted, rejected, or asked to wait.
+//
+// - element=nil, error=nil: the fast success path
+// - element=nil, error=non-nil: the fast failure path
+// - element=non-nil, error=nil: the slow success path
+func (bq *BoundedQueue) acquireOrGetWaiter(pending uint64) (*list.Element, error) {
 	bq.lock.Lock()
 	defer bq.lock.Unlock()
 
-	if pendingBytes > bq.maxLimitBytes { // will never succeed
-		return false, fmt.Errorf("rejecting request, request size larger than configured limit")
+	if pending > bq.maxLimitAdmit {
+		// when the request will never succeed because it is
+		// individually over the total limit, fail fast.
+		return nil, ErrRequestTooLarge
 	}
 
-	if bq.currentBytes+pendingBytes <= bq.maxLimitBytes { // no need to wait to admit
-		bq.currentBytes += pendingBytes
-		return true, nil
+	if bq.currentAdmitted+pending <= bq.maxLimitAdmit {
+		// the fast success path.
+		bq.currentAdmitted += pending
+		return nil, nil
 	}
 
 	// since we were unable to admit, check if we can wait.
-	if bq.currentWaiters+1 > bq.maxLimitWaiters { // too many waiters
-		return false, ErrTooManyWaiters
+	if bq.currentWaiting+pending > bq.maxLimitWait {
+		return nil, ErrTooMuchWaiting
 	}
 
-	// if we got to this point we need to wait to acquire bytes, so update currentWaiters before releasing mutex.
-	bq.currentWaiters++
-	return false, nil
+	// otherwise we need to wait
+	return bq.addWaiterLocked(pending), nil
 }
 
-func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error {
-	success, err := bq.admit(pendingBytes)
-	if err != nil || success {
-		return err
+// Acquire implements Queue.
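+//
+// On success (fast or slow path) the returned ReleaseFunc must be
+// called exactly once to return the admitted bytes; on failure the
+// returned ReleaseFunc is a no-op that is safe to call.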
+func (bq *BoundedQueue) Acquire(ctx context.Context, pending uint64) (ReleaseFunc, error) {
+	element, err := bq.acquireOrGetWaiter(pending)
+	parentSpan := trace.SpanFromContext(ctx)
+	pendingAttr := trace.WithAttributes(attribute.Int64("pending", int64(pending)))
+
+	if err != nil {
+		parentSpan.AddEvent("admission rejected (fast path)", pendingAttr)
+		return noopRelease, err
+	} else if element == nil {
+		parentSpan.AddEvent("admission accepted (fast path)", pendingAttr)
+		return bq.releaseFunc(pending), nil
 	}
 
-	// otherwise we need to wait for bytes to be released
-	curWaiter := waiter{
-		pendingBytes: pendingBytes,
-		readyCh:      make(chan struct{}),
-	}
+	parentSpan.AddEvent("enter admission queue")
 
-	bq.lock.Lock()
-
-	// generate unique key
-	for {
-		id := uuid.New()
-		_, keyExists := bq.waiters.Get(id)
-		if keyExists {
-			continue
-		}
-		bq.waiters.Set(id, curWaiter)
-		curWaiter.ID = id
-		break
-	}
-
-	bq.lock.Unlock()
-	ctx, span := bq.tracer.Start(ctx, "admission_blocked",
-		trace.WithAttributes(attribute.Int64("pending", pendingBytes)))
+	ctx, span := bq.tracer.Start(ctx, "admission_blocked", pendingAttr)
 	defer span.End()
 
+	waiter := element.Value.(*waiter)
+
 	select {
-	case <-curWaiter.readyCh:
-		return nil
+	case <-waiter.notify.Chan():
+		parentSpan.AddEvent("admission accepted (slow path)", pendingAttr)
+		return bq.releaseFunc(pending), nil
+
 	case <-ctx.Done():
-		// canceled before acquired so remove waiter.
 		bq.lock.Lock()
 		defer bq.lock.Unlock()
-		err = fmt.Errorf("context canceled: %w ", ctx.Err())
-		span.SetStatus(codes.Error, "context canceled")
 
-		_, found := bq.waiters.Delete(curWaiter.ID)
-		if !found {
-			return err
+		if waiter.notify.HasBeenNotified() {
+			// We were also admitted, which can happen
+			// concurrently with cancellation. Make sure
+			// to release since no one else will do it.
+			bq.releaseLocked(pending)
+		} else {
+			// Remove ourselves from the list of waiters
+			// so that we can't be admitted in the future.
+			bq.removeWaiterLocked(pending, element)
+			bq.admitWaitersLocked()
 		}
 
-		bq.currentWaiters--
-		return err
+		parentSpan.AddEvent("admission rejected (canceled)", pendingAttr)
+		return noopRelease, status.Error(grpccodes.Canceled, context.Cause(ctx).Error())
 	}
 }
 
-func (bq *BoundedQueue) Release(pendingBytes int64) error {
-	bq.lock.Lock()
-	defer bq.lock.Unlock()
+func (bq *BoundedQueue) admitWaitersLocked() {
+	for bq.waiters.Len() != 0 {
+		// Ensure there is enough room to admit the next waiter.
+		element := bq.waiters.Back()
+		waiter := element.Value.(*waiter)
+		if bq.currentAdmitted+waiter.pending > bq.maxLimitAdmit {
+			// Returning here leaves the most recent arrival
+			// waiting for a future release to make room.
+			return
+		}
 
-	bq.currentBytes -= pendingBytes
+		// Remove the next waiter from the list, mark it
+		// admitted, and notify it.
+		bq.removeWaiterLocked(waiter.pending, element)
+		bq.currentAdmitted += waiter.pending
 
-	if bq.currentBytes < 0 {
-		return fmt.Errorf("released more bytes than acquired")
+		waiter.notify.Notify()
 	}
+}
 
-	for {
-		if bq.waiters.Len() == 0 {
-			return nil
-		}
-		next := bq.waiters.Oldest()
-		nextWaiter := next.Value
-		nextKey := next.Key
-		if bq.currentBytes+nextWaiter.pendingBytes <= bq.maxLimitBytes {
-			bq.currentBytes += nextWaiter.pendingBytes
-			bq.currentWaiters--
-			close(nextWaiter.readyCh)
-			_, found := bq.waiters.Delete(nextKey)
-			if !found {
-				return fmt.Errorf("deleting waiter that doesn't exist")
-			}
-			continue
-		}
-		break
-	}
+func (bq *BoundedQueue) addWaiterLocked(pending uint64) *list.Element {
+	bq.currentWaiting += pending
+	return bq.waiters.PushBack(&waiter{
+		pending: pending,
+		notify:  newNotification(),
+	})
+}
 
-	return nil
+func (bq *BoundedQueue) removeWaiterLocked(pending uint64, element *list.Element) {
+	bq.currentWaiting -= pending
+	bq.waiters.Remove(element)
 }
 
-func (bq *BoundedQueue) TryAcquire(pendingBytes int64) bool {
-	bq.lock.Lock()
-	defer bq.lock.Unlock()
-	if bq.currentBytes+pendingBytes <= bq.maxLimitBytes {
-		bq.currentBytes += pendingBytes
-		return true
+func (bq *BoundedQueue) releaseLocked(pending uint64) {
+	bq.currentAdmitted -= pending
+	bq.admitWaitersLocked()
+}
+
+func (bq *BoundedQueue) releaseFunc(pending uint64) ReleaseFunc {
+	return func() {
+		bq.lock.Lock()
+		defer bq.lock.Unlock()
+
+		bq.releaseLocked(pending)
 	}
-	return false
 }
diff --git a/internal/otelarrow/admission/boundedqueue_test.go b/internal/otelarrow/admission/boundedqueue_test.go
index e0c4ac471f10..5812d07724d4 100644
--- a/internal/otelarrow/admission/boundedqueue_test.go
+++ b/internal/otelarrow/admission/boundedqueue_test.go
@@ -5,209 +5,330 @@
 package admission
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"go.opentelemetry.io/otel/codes"
-	"go.opentelemetry.io/otel/sdk/trace"
-	"go.opentelemetry.io/otel/sdk/trace/tracetest"
-	"go.opentelemetry.io/otel/trace/noop"
-	"go.uber.org/multierr"
+	"go.opentelemetry.io/collector/component/componenttest"
+	"google.golang.org/grpc/codes"
+	grpccodes "google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
-func min(x, y int64) int64 {
-	if x <= y {
-		return x
-	}
-	return y
+type bqTest struct {
+	t *testing.T
+	*BoundedQueue
 }
 
-func max(x, y int64) int64 {
-	if x >= y {
-		return x
-	}
-	return y
-}
+var noopTelemetry = componenttest.NewNopTelemetrySettings()
 
-func abs(x int64) int64 {
-	if x < 0 {
-		return -x
+func newBQTest(t *testing.T, maxAdmit, maxWait uint64) bqTest {
+	return bqTest{
+		t:            t,
+		BoundedQueue: NewBoundedQueue(noopTelemetry, maxAdmit, maxWait).(*BoundedQueue),
 	}
-	return x
 }
 
-var noopTraces = noop.NewTracerProvider()
-
-func TestAcquireSimpleNoWaiters(t *testing.T) {
-	maxLimitBytes := 1000
-	maxLimitWaiters := 10
-	numRequests := 40
-	requestSize := 21
+func (bq *bqTest) startWaiter(ctx context.Context, size uint64, relp *ReleaseFunc) N {
+	n := newNotification()
+	go func() {
+		var err error
+		*relp, err = bq.Acquire(ctx, size)
+		require.NoError(bq.t, err)
+		n.Notify()
+	}()
+	return n
+}
 
-	bq := NewBoundedQueue(noopTraces, int64(maxLimitBytes), int64(maxLimitWaiters))
+func (bq *bqTest) waitForPending(admitted, waiting uint64) {
+	require.Eventually(bq.t, func() bool {
+		bq.lock.Lock()
+		defer bq.lock.Unlock()
+		return bq.currentAdmitted == admitted && bq.currentWaiting == waiting
+	}, time.Second, 20*time.Millisecond)
+}
 
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
-	for i := 0; i < numRequests; i++ {
-		go func() {
-			err := bq.Acquire(ctx, int64(requestSize))
-			assert.NoError(t, err)
-		}()
+func mkRepeat(x uint64, n int) []uint64 {
+	if n == 0 {
+		return nil
 	}
+	return append(mkRepeat(x, n-1), x)
+}
 
-	require.Never(t, func() bool {
-		return bq.waiters.Len() > 0
-	}, 2*time.Second, 10*time.Millisecond)
-
-	for i := 0; i < numRequests; i++ {
-		assert.NoError(t, bq.Release(int64(requestSize)))
-		assert.Equal(t, int64(0), bq.currentWaiters)
+func mkRange(from, to uint64) []uint64 {
+	if from > to {
+		return nil
 	}
-
-	assert.ErrorContains(t, bq.Release(int64(1)), "released more bytes than acquired")
-	assert.NoError(t, bq.Acquire(ctx, int64(maxLimitBytes)))
+	return append([]uint64{from}, mkRange(from+1, to)...)
 }
 
-func TestAcquireBoundedWithWaiters(t *testing.T) {
-	tests := []struct {
-		name            string
-		maxLimitBytes   int64
-		maxLimitWaiters int64
-		numRequests     int64
-		requestSize     int64
-		timeout         time.Duration
+func TestBoundedQueueLimits(t *testing.T) {
+	for _, test := range []struct {
+		name          string
+		maxLimitAdmit uint64
+		maxLimitWait  uint64
+		requestSizes  []uint64
+		timeout       time.Duration
+		expectErrs    map[string]int
 	}{
 		{
-			name:            "below max waiters above max bytes",
-			maxLimitBytes:   1000,
-			maxLimitWaiters: 100,
-			numRequests:     100,
-			requestSize:     21,
-			timeout:         5 * time.Second,
+			name:          "simple_no_waiters_25",
+			maxLimitAdmit: 1000,
+			maxLimitWait:  0,
+			requestSizes:  mkRepeat(25, 40),
+			timeout:       0,
+			expectErrs:    map[string]int{},
 		},
 		{
-			name:            "above max waiters above max bytes",
-			maxLimitBytes:   1000,
-			maxLimitWaiters: 100,
-			numRequests:     200,
-			requestSize:     21,
-			timeout:         5 * time.Second,
+			name:          "simple_no_waiters_1",
+			maxLimitAdmit: 1000,
+			maxLimitWait:  0,
+			requestSizes:  mkRepeat(1, 1000),
+			timeout:       0,
+			expectErrs:    map[string]int{},
 		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			bq := NewBoundedQueue(noopTraces, tt.maxLimitBytes, tt.maxLimitWaiters)
-			var blockedRequests int64
-			numReqsUntilBlocked := tt.maxLimitBytes / tt.requestSize
-			requestsAboveLimit := abs(tt.numRequests - numReqsUntilBlocked)
-			tooManyWaiters := requestsAboveLimit > tt.maxLimitWaiters
-			numRejected := max(requestsAboveLimit-tt.maxLimitWaiters, int64(0))
-
-			// There should never be more blocked requests than maxLimitWaiters.
-			blockedRequests = min(tt.maxLimitWaiters, requestsAboveLimit)
-
-			ctx, cancel := context.WithTimeout(context.Background(), tt.timeout)
-			defer cancel()
-			var errs error
-			for i := 0; i < int(tt.numRequests); i++ {
+		{
+			name:          "without_waiting_remainder",
+			maxLimitAdmit: 1000,
+			maxLimitWait:  0,
+			requestSizes:  mkRepeat(30, 40),
+			timeout:       0,
+			expectErrs: map[string]int{
+				// 7 failures with a remainder of 10
+				// 30 * (40 - 7) = 990
+				ErrTooMuchWaiting.Error(): 7,
+			},
+		},
+		{
+			name:          "without_waiting_complete",
+			maxLimitAdmit: 1000,
+			maxLimitWait:  0,
+			requestSizes:  append(mkRepeat(30, 40), 10),
+			timeout:       0,
+			expectErrs: map[string]int{
+				// 30*33+10 succeed, 7 failures (as above)
+				ErrTooMuchWaiting.Error(): 7,
+			},
+		},
+		{
+			name:          "with_waiters_timeout",
+			maxLimitAdmit: 1000,
+			maxLimitWait:  1000,
+			requestSizes:  mkRepeat(20, 100),
+			timeout:       time.Second,
+			expectErrs: map[string]int{
+				// 20*50=1000: half of the requests are admitted,
				// the other half time out
+				status.Error(grpccodes.Canceled, context.DeadlineExceeded.Error()).Error(): 50,
+			},
+		},
+		{
+			name:          "with_size_exceeded",
+			maxLimitAdmit: 1000,
+			maxLimitWait:  2000,
+			requestSizes:  []uint64{1001},
+			timeout:       0,
+			expectErrs: map[string]int{
+				ErrRequestTooLarge.Error(): 1,
+			},
+		},
+		{
+			name:          "mixed_sizes",
+			maxLimitAdmit: 45, // 45 is the exact sum of request sizes
+			maxLimitWait:  0,
+			requestSizes:  mkRange(1, 9),
+			timeout:       0,
+			expectErrs:    map[string]int{},
+		},
+		{
+			name:          "too_many_mixed_sizes",
+			maxLimitAdmit: 44, // all but one request will succeed
+			maxLimitWait:  0,
+			requestSizes:  mkRange(1, 9),
+			timeout:       0,
+			expectErrs: map[string]int{
+				ErrTooMuchWaiting.Error(): 1,
+			},
+		},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+			bq := newBQTest(t, test.maxLimitAdmit, test.maxLimitWait)
+			ctx := context.Background()
+
+			if test.timeout != 0 {
+				var cancel context.CancelFunc
+				ctx, cancel = context.WithTimeout(ctx, test.timeout)
+				defer cancel()
+			}
+
+			numRequests := len(test.requestSizes)
+			allErrors := make(chan error, numRequests)
+
+			releaseChan := make(chan struct{})
+			var wait1 sync.WaitGroup
+			var wait2 sync.WaitGroup
+
+			wait1.Add(numRequests)
+			wait2.Add(numRequests)
+
+			for _, requestSize := range test.requestSizes {
 				go func() {
-					err := bq.Acquire(ctx, tt.requestSize)
-					bq.lock.Lock()
-					defer bq.lock.Unlock()
-					errs = multierr.Append(errs, err)
+					release, err := bq.Acquire(ctx, requestSize)
+					allErrors <- err
+
+					wait1.Done()
+
+					<-releaseChan
+
+					release()
+
+					wait2.Done()
 				}()
 			}
 
-			require.Eventually(t, func() bool {
-				bq.lock.Lock()
-				defer bq.lock.Unlock()
-				return bq.waiters.Len() == int(blockedRequests)
-			}, 3*time.Second, 10*time.Millisecond)
+			wait1.Wait()
 
-			assert.NoError(t, bq.Release(tt.requestSize))
-			assert.Equal(t, bq.waiters.Len(), int(blockedRequests)-1)
+			close(releaseChan)
 
-			for i := 0; i < int(tt.numRequests-numRejected)-1; i++ {
-				assert.NoError(t, bq.Release(tt.requestSize))
-			}
+			wait2.Wait()
+
+			close(allErrors)
+
+			errCounts := map[string]int{}
 
-			bq.lock.Lock()
-			if tooManyWaiters {
-				assert.ErrorContains(t, errs, ErrTooManyWaiters.Error())
-			} else {
-				assert.NoError(t, errs)
+			for err := range allErrors {
+				if err == nil {
+					continue
+				}
+				errCounts[err.Error()]++
 			}
-			bq.lock.Unlock()
 
-			// confirm all bytes were released by acquiring maxLimitBytes.
-			assert.True(t, bq.TryAcquire(tt.maxLimitBytes))
+			require.Equal(t, test.expectErrs, errCounts)
+
+			// Make sure we can allocate the whole limit at end-of-test.
+			release, err := bq.Acquire(ctx, test.maxLimitAdmit)
+			assert.NoError(t, err)
+			release()
+
+			// and the final state is all 0.
+			bq.waitForPending(0, 0)
 		})
 	}
 }
 
-func TestAcquireContextCanceled(t *testing.T) {
-	maxLimitBytes := 1000
-	maxLimitWaiters := 100
-	numRequests := 100
-	requestSize := 21
-	numReqsUntilBlocked := maxLimitBytes / requestSize
-	requestsAboveLimit := abs(int64(numRequests) - int64(numReqsUntilBlocked))
-
-	blockedRequests := min(int64(maxLimitWaiters), requestsAboveLimit)
-
-	exp := tracetest.NewInMemoryExporter()
-	tp := trace.NewTracerProvider(trace.WithSyncer(exp))
-
-	bq := NewBoundedQueue(tp, int64(maxLimitBytes), int64(maxLimitWaiters))
-
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	var errs error
-	var wg sync.WaitGroup
-	for i := 0; i < numRequests; i++ {
-		wg.Add(1)
-		go func() {
-			err := bq.Acquire(ctx, int64(requestSize))
-			bq.lock.Lock()
-			defer bq.lock.Unlock()
-			errs = multierr.Append(errs, err)
-			wg.Done()
-		}()
+func TestBoundedQueueLIFO(t *testing.T) {
+	const maxAdmit = 10
+
+	for _, firstAcquire := range mkRange(2, 8) {
+		for _, firstWait := range mkRange(2, 8) {
+			t.Run(fmt.Sprint(firstAcquire, ",", firstWait), func(t *testing.T) {
+				t.Parallel()
+
+				bq := newBQTest(t, maxAdmit, maxAdmit)
+				ctx := context.Background()
+
+				// Fill the queue
+				relFirst, err := bq.Acquire(ctx, firstAcquire)
+				require.NoError(t, err)
+				bq.waitForPending(firstAcquire, 0)
+
+				relSecond, err := bq.Acquire(ctx, maxAdmit-firstAcquire-1)
+				require.NoError(t, err)
+				bq.waitForPending(maxAdmit-1, 0)
+
+				relOne, err := bq.Acquire(ctx, 1)
+				require.NoError(t, err)
+				bq.waitForPending(maxAdmit, 0)
+
+				// Create two half-size waiters
+				var relW0 ReleaseFunc
+				notW0 := bq.startWaiter(ctx, firstWait, &relW0)
+				bq.waitForPending(maxAdmit, firstWait)
+
+				var relW1 ReleaseFunc
+				secondWait := maxAdmit - firstWait
+				notW1 := bq.startWaiter(ctx, secondWait, &relW1)
+				bq.waitForPending(maxAdmit, maxAdmit)
+
+				relFirst()
+
+				// early is true when releasing the first acquired
+				// will not make enough room for the first waiter
+				early := firstAcquire < secondWait
+				if early {
+					relSecond()
+				}
+
+				// Expect notifications in LIFO order, i.e., W1 before W0.
+				select {
+				case <-notW0.Chan():
+					t.Fatalf("FIFO order -- incorrect")
+				case <-notW1.Chan():
+					if !early {
+						relSecond()
+					}
+				}
+				relOne()
+
+				<-notW0.Chan()
+
+				relW0()
+				relW1()
+
+				bq.waitForPending(0, 0)
+			})
+		}
 	}
+}
 
-	// Wait until all calls to Acquire() happen and we have the expected number of waiters.
-	require.Eventually(t, func() bool {
-		bq.lock.Lock()
-		defer bq.lock.Unlock()
-		return bq.waiters.Len() == int(blockedRequests)
-	}, 3*time.Second, 10*time.Millisecond)
-
-	cancel()
-	wg.Wait()
-	assert.ErrorContains(t, errs, "context canceled")
-
-	// Expect spans named admission_blocked w/ context canceled.
-	spans := exp.GetSpans()
-	exp.Reset()
-	assert.NotEmpty(t, spans)
-	for _, span := range spans {
-		assert.Equal(t, "admission_blocked", span.Name)
-		assert.Equal(t, codes.Error, span.Status.Code)
-		assert.Equal(t, "context canceled", span.Status.Description)
-	}
+func TestBoundedQueueCancelation(t *testing.T) {
+	// this test attempts to exercise the race condition between
+	// the Acquire slow path and context cancelation.
+	const (
+		repetition = 100
+		maxAdmit   = 10
+	)
+	bq := newBQTest(t, maxAdmit, maxAdmit)
+
+	for number := range repetition {
+		ctx, cancel := context.WithCancel(context.Background())
+
+		tester := func() {
+			// This acquire either succeeds or is canceled.
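+			// Admission can race with cancelation: even if the
+			// waiter is notified and canceled concurrently,
+			// Acquire releases the admitted bytes internally,
+			// so the queue stays balanced either way.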
+			testrel, err := bq.Acquire(ctx, maxAdmit)
+			defer testrel()
+			if err == nil {
+				return
+			}
+			serr, ok := status.FromError(err)
+			require.True(t, ok, "has gRPC status")
+			require.Equal(t, codes.Canceled, serr.Code())
+		}
+
+		release, err := bq.Acquire(ctx, maxAdmit)
+		require.NoError(t, err)
 
-	// Now all waiters should have returned and been removed.
-	assert.Equal(t, 0, bq.waiters.Len())
+		go tester()
 
-	for i := 0; i < numReqsUntilBlocked; i++ {
-		assert.NoError(t, bq.Release(int64(requestSize)))
-		assert.Equal(t, int64(0), bq.currentWaiters)
+		if number%2 == 0 {
+			go cancel()
+			go release()
+		} else {
+			go release()
+			go cancel()
+		}
+
+		bq.waitForPending(0, 0)
 	}
-	assert.True(t, bq.TryAcquire(int64(maxLimitBytes)))
+}
 
-	// Expect no more spans, because admission was not blocked.
-	spans = exp.GetSpans()
-	require.Empty(t, spans)
+func TestBoundedQueueNoop(t *testing.T) {
+	nq := NewUnboundedQueue()
+	for _, i := range mkRange(1, 100) {
+		rel, err := nq.Acquire(context.Background(), i<<20)
+		require.NoError(t, err)
+		defer rel()
+	}
 }
diff --git a/internal/otelarrow/admission/controller.go b/internal/otelarrow/admission/controller.go
new file mode 100644
index 000000000000..0970834811a9
--- /dev/null
+++ b/internal/otelarrow/admission/controller.go
@@ -0,0 +1,56 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"
+
+import (
+	"context"
+)
+
+// Queue is a weighted admission queue interface.
+type Queue interface {
+	// Acquire asks the controller to admit the caller.
+	//
+	// The weight parameter specifies how large of an admission to make.
+	// This might be used on the bytes of request (for example) to differentiate
+	// between large and small requests.
+	//
+	// Acquire will return when one of the following events occurs:
+	//
+	//   (1) admission is allowed, or
+	//   (2) the provided ctx becomes canceled, or
+	//   (3) there are so many existing waiters that the
+	//       controller decides to reject this caller without
+	//       admitting it.
+	//
+	// In case (1), the return value will be a non-nil
+	// ReleaseFunc. The caller must invoke it after it is finished
+	// with the resource being guarded by the admission
+	// controller.
+	//
+	// In case (2), the return value will be a Canceled or
+	// DeadlineExceeded error.
+	//
+	// In case (3), the return value will be a ResourceExhausted
+	// error.
+	Acquire(ctx context.Context, weight uint64) (ReleaseFunc, error)
+}
+
+// ReleaseFunc is returned by Acquire when the Acquire() was admitted.
+type ReleaseFunc func()
+
+type noopController struct{}
+
+var _ Queue = noopController{}
+
+// NewUnboundedQueue returns a no-op implementation of the Queue interface.
+func NewUnboundedQueue() Queue {
+	return noopController{}
+}
+
+func noopRelease() {}
+
+// Acquire implements Queue.
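+//
+// The unbounded queue admits every caller immediately, returning a
+// shared no-op ReleaseFunc.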
+func (noopController) Acquire(_ context.Context, _ uint64) (ReleaseFunc, error) {
+	return noopRelease, nil
+}
diff --git a/internal/otelarrow/admission/notification.go b/internal/otelarrow/admission/notification.go
new file mode 100644
index 000000000000..bf8cd9b2884b
--- /dev/null
+++ b/internal/otelarrow/admission/notification.go
@@ -0,0 +1,42 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"
+
+// notification.N is a minimal Go version of absl::Notification:
+//
+//	https://github.com/abseil/abseil-cpp/blob/master/absl/synchronization/notification.h
+//
+// Use newNotification() to construct a notification object (the zero
+// value is not usable).
+type N struct {
+	c chan struct{}
+}
+
+func newNotification() N {
+	return N{c: make(chan struct{})}
+}
+
+func (n *N) Notify() {
+	close(n.c)
+}
+
+func (n *N) HasBeenNotified() bool {
+	select {
+	case <-n.c:
+		return true
+	default:
+		return false
+	}
+}
+
+func (n *N) WaitForNotification() {
+	<-n.c
+}
+
+// Chan allows a caller to wait for the notification as part of a
+// select statement. Outside of a select statement, prefer writing
+// WaitForNotification().
+func (n *N) Chan() <-chan struct{} {
+	return n.c
+}
diff --git a/internal/otelarrow/admission/notification_test.go b/internal/otelarrow/admission/notification_test.go
new file mode 100644
index 000000000000..90fe61defb06
--- /dev/null
+++ b/internal/otelarrow/admission/notification_test.go
@@ -0,0 +1,43 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package admission
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestNotification(t *testing.T) {
+	require := require.New(t)
+
+	start := newNotification()
+	require.False(start.HasBeenNotified())
+
+	done := make([]N, 5)
+	for i := 0; i < 5; i++ {
+		done[i] = newNotification()
+		go func(i int) {
+			start.WaitForNotification()
+			done[i].Notify()
+		}(i)
+	}
+
+	// None of the goroutines can make progress until we notify
+	// start.
+	for now := time.Now(); time.Now().Before(now.Add(100 * time.Millisecond)); {
+		for _, n := range done {
+			require.False(n.HasBeenNotified())
+		}
+	}
+
+	start.Notify()
+
+	// Now the goroutines can finish.
+	for _, n := range done {
+		n.WaitForNotification()
+		require.True(n.HasBeenNotified())
+	}
+}
diff --git a/internal/otelarrow/test/e2e_test.go b/internal/otelarrow/test/e2e_test.go
index d02ec733917f..506c044a0fef 100644
--- a/internal/otelarrow/test/e2e_test.go
+++ b/internal/otelarrow/test/e2e_test.go
@@ -46,8 +46,8 @@ import (
 )
 
 type testParams struct {
-	threadCount  int
-	requestUntil func(*testConsumer) bool
+	threadCount      int
+	requestWhileTrue func(*testConsumer) bool
 
 	// missingDeadline is configured so the zero value implies a deadline,
 	// which is the default.
@@ -100,7 +100,7 @@ func (tc *testConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) err
 
 func testLoggerSettings(_ *testing.T) (component.TelemetrySettings, *observer.ObservedLogs, *tracetest.InMemoryExporter) {
 	tset := componenttest.NewNopTelemetrySettings()
 
-	core, obslogs := observer.New(zapcore.InfoLevel)
+	core, obslogs := observer.New(zapcore.DebugLevel)
 
 	exp := tracetest.NewInMemoryExporter()
 
@@ -221,7 +221,7 @@ func testIntegrationTraces(ctx context.Context, t *testing.T, tp testParams, cfg
 		go func(num int) {
 			defer clientDoneWG.Done()
 			generator := mkgen()
-			for i := 0; tp.requestUntil(testCon); i++ {
+			for i := 0; tp.requestWhileTrue(testCon); i++ {
 				td := generator(i)
 
 				errf(t, exporter.ConsumeTraces(ctx, td))
@@ -388,18 +388,43 @@ func failureMemoryLimitEnding(t *testing.T, _ testParams, testCon *testConsumer,
 	eSigs, eMsgs := logSigs(testCon.expLogs)
 	rSigs, rMsgs := logSigs(testCon.recvLogs)
 
-	// Test for arrow receiver stream errors on both sides.
+	// Test for arrow stream errors on both sides.
 	require.Positive(t, eSigs["arrow stream error|||code///message///where"], "should have exporter arrow stream errors: %v", eMsgs)
 	require.Positive(t, rSigs["arrow stream error|||code///message///where"], "should have receiver arrow stream errors: %v", rSigs)
 
-	// Ensure both side's error logs include memory limit errors
-	// one way or another.
+	// Ensure both sides' error logs include admission control errors.
 	require.Positive(t, countMemoryLimitErrors(rMsgs), "should have memory limit errors: %v", rMsgs)
 	require.Positive(t, countMemoryLimitErrors(eMsgs), "should have memory limit errors: %v", eMsgs)
 
 	return nil, nil
 }
 
+var admissionRegexp = regexp.MustCompile(`too much pending data`)
+
+func countAdmissionLimitErrors(msgs []string) (cnt int) {
+	for _, msg := range msgs {
+		if admissionRegexp.MatchString(msg) {
+			cnt++
+		}
+	}
+	return
+}
+
+func failureAdmissionLimitEnding(t *testing.T, _ testParams, testCon *testConsumer, _ [][]ptrace.Traces) (rops, eops map[string]int) {
+	eSigs, eMsgs := logSigs(testCon.expLogs)
+	rSigs, rMsgs := logSigs(testCon.recvLogs)
+
+	// Test for arrow stream errors on both sides.
+	require.Positive(t, eSigs["arrow stream error|||code///message///where"], "should have exporter arrow stream errors: %v", eMsgs)
+	require.Positive(t, rSigs["arrow stream error|||code///message///where"], "should have receiver arrow stream errors: %v", rSigs)
+
+	// Ensure both sides' error logs include admission limit errors.
+	require.Positive(t, countAdmissionLimitErrors(rMsgs), "should have admission limit errors: %v", rMsgs)
+	require.Positive(t, countAdmissionLimitErrors(eMsgs), "should have admission limit errors: %v", eMsgs)
+
+	return nil, nil
+}
+
 func consumerSuccess(t *testing.T, err error) {
 	require.NoError(t, err)
 }
@@ -434,7 +459,7 @@ func TestIntegrationTracesSimple(t *testing.T) {
 			// until 10 threads can write 1000 spans
 			var params = testParams{
 				threadCount: 10,
-				requestUntil: func(test *testConsumer) bool {
+				requestWhileTrue: func(test *testConsumer) bool {
 					return test.sink.SpanCount() < 1000
 				},
 			}
@@ -455,7 +480,7 @@ func TestIntegrationDeadlinePropagation(t *testing.T) {
 			// Until at least one span is written.
 			var params = testParams{
 				threadCount: 1,
-				requestUntil: func(test *testConsumer) bool {
+				requestWhileTrue: func(test *testConsumer) bool {
 					return test.sink.SpanCount() < 1
 				},
 				missingDeadline: !hasDeadline,
@@ -480,7 +505,7 @@ func TestIntegrationMemoryLimited(t *testing.T) {
 
 	// until exporter and receiver finish at least one ArrowTraces span.
 	params := testParams{
 		threadCount: 10,
-		requestUntil: func(test *testConsumer) bool {
+		requestWhileTrue: func(test *testConsumer) bool {
 			cf := func(spans tracetest.SpanStubs) (cnt int) {
 				for _, span := range spans {
 					if span.Name == "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces" {
@@ -568,7 +593,7 @@ func TestIntegrationSelfTracing(t *testing.T) {
 	// until 2 Arrow stream spans are received from self instrumentation
 	var params = testParams{
 		threadCount: 10,
-		requestUntil: func(test *testConsumer) bool {
+		requestWhileTrue: func(test *testConsumer) bool {
 
 			cnt := 0
 			for _, span := range test.expSpans.GetSpans() {
@@ -589,3 +614,80 @@ func TestIntegrationSelfTracing(t *testing.T) {
 		}
 	}, func() GenFunc { return makeTestTraces }, consumerSuccess, multiStreamEnding)
 }
+
+func nearLimitGenFunc() MkGen {
+	var sizer ptrace.ProtoMarshaler
+	const nearLimit = 900 << 10 // close to 1 MiB
+	const hardLimit = 1 << 20   // 1 MiB
+
+	return func() GenFunc {
+		entropy := datagen.NewTestEntropy(int64(rand.Uint64())) //nolint:gosec // only used for testing
+
+		tracesGen := datagen.NewTracesGenerator(
+			entropy,
+			entropy.NewStandardResourceAttributes(),
+			entropy.NewStandardInstrumentationScopes(),
+		)
+
+		return func(int) ptrace.Traces {
+			size := 100
+			for {
+				td := tracesGen.Generate(size, time.Minute)
+				uncomp := sizer.TracesSize(td)
+				if uncomp > nearLimit && uncomp < hardLimit {
+					return td
+				}
+				switch {
+				case uncomp > hardLimit:
+					size -= 10
+				case uncomp < nearLimit/2:
+					size *= 2
+				default:
+					size += 10
+				}
+			}
+		}
+	}
+}
+
+func TestIntegrationAdmissionLimited(t *testing.T) {
+	for _, allowWait := range []bool{false, true} {
+		t.Run(fmt.Sprint("allow_wait=", allowWait), func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			// until the sink receives 10000 spans.
+			params := testParams{
+				threadCount: 10,
+				requestWhileTrue: func(test *testConsumer) bool {
+					return test.sink.SpanCount() < 10000
+				},
+			}
+
+			var ending func(*testing.T, testParams, *testConsumer, [][]ptrace.Traces) (_, _ map[string]int)
+			var waitingLimit uint64
+			if allowWait {
+				ending = standardEnding
+				waitingLimit = uint64(params.threadCount)
+			} else {
+				ending = failureAdmissionLimitEnding
+				waitingLimit = 0
+			}
+
+			testIntegrationTraces(ctx, t, params, func(ecfg *ExpConfig, rcfg *RecvConfig) {
+				rcfg.Admission.RequestLimitMiB = 1
+				// either allow, or don't allow, all threads to wait
+				rcfg.Admission.WaitingLimitMiB = waitingLimit
+				ecfg.Arrow.NumStreams = 10
+
+				// Shorten timeouts for this test, because we intend
+				// for it to fail and don't want to wait for retries.
+				ecfg.TimeoutSettings.Timeout = 5 * time.Second
+				ecfg.RetryConfig.InitialInterval = 1 * time.Second
+				ecfg.RetryConfig.MaxInterval = 2 * time.Second
+				ecfg.RetryConfig.MaxElapsedTime = 10 * time.Second
+				ecfg.Arrow.MaxStreamLifetime = 5 * time.Second
+			}, nearLimitGenFunc(), consumerFailure, ending)
		})
+	}
+}
diff --git a/receiver/otelarrowreceiver/README.md b/receiver/otelarrowreceiver/README.md
index ef126ccb60b2..e43742fbd0ac 100644
--- a/receiver/otelarrowreceiver/README.md
+++ b/receiver/otelarrowreceiver/README.md
@@ -82,11 +82,11 @@ Several common configuration structures provide additional capabilities automati
 
 In the `admission` configuration block the following settings are available:
 
-- `request_limit_mib` (default: 128): limits the number of requests that are received by the stream in terms of *uncompressed request size*. This should not be confused with `arrow.memory_limit_mib` which limits allocations made by the consumer when translating arrow records into pdata objects. i.e. request size is used to control how much traffic we admit, but does not control how much memory is used during request processing.
+- `request_limit_mib` (default: 128): limits the total *uncompressed request size*, in MiB, admitted by the stream at one time. This should not be confused with `arrow.memory_limit_mib`, which limits allocations made by the consumer when translating arrow records into pdata objects. The `request_limit_mib` value is used to control how much traffic we admit, but does not control how much memory is used during request processing.
 
-- `waiter_limit` (default: 1000): limits the number of requests waiting on admission once `admission_limit_mib` is reached. This is another dimension of memory limiting that ensures waiters are not holding onto a significant amount of memory while waiting to be processed.
+- `waiting_limit_mib` (default: 32): limits the total uncompressed size, in MiB, of requests waiting on admission after `request_limit_mib` is reached. This is another dimension of memory limiting that ensures waiters are not holding onto a significant amount of memory while waiting to be processed.
 
-`request_limit_mib` and `waiter_limit` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/otelarrow/admission). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline.
+`request_limit_mib` and `waiting_limit_mib` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/otelarrow/admission). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline.
 
 ### Arrow-specific Configuration
diff --git a/receiver/otelarrowreceiver/config.go b/receiver/otelarrowreceiver/config.go
index 220b0fc45124..a5aafccfb87b 100644
--- a/receiver/otelarrowreceiver/config.go
+++ b/receiver/otelarrowreceiver/config.go
@@ -20,15 +20,20 @@ type Protocols struct {
 }
 
 type AdmissionConfig struct {
-	// RequestLimitMiB limits the number of requests that are received by the stream based on
-	// uncompressed request size. Request size is used to control how much traffic we admit
-	// for processing.
+	// RequestLimitMiB limits the number of requests that are
+	// received by the stream and admitted to the pipeline, based
+	// on uncompressed request size.
 	RequestLimitMiB uint64 `mapstructure:"request_limit_mib"`
 
-	// WaiterLimit is the limit on the number of waiters waiting to be processed and consumed.
-	// This is a dimension of memory limiting to ensure waiters are not consuming an
-	// unexpectedly large amount of memory in the arrow receiver.
-	WaiterLimit int64 `mapstructure:"waiter_limit"`
+	// WaitingLimitMiB limits the number of requests that are
+	// received by the stream and allowed to wait for admission,
+	// based on uncompressed request size.
+	WaitingLimitMiB uint64 `mapstructure:"waiting_limit_mib"`
+
+	// DeprecatedWaiterLimit is no longer supported. It was once
+	// a limit on the number of waiting requests. Use
+	// waiting_limit_mib instead.
+	DeprecatedWaiterLimit int64 `mapstructure:"waiter_limit"`
 }
 
 // ArrowConfig support configuring the Arrow receiver.
@@ -84,8 +89,5 @@ func (cfg *Config) Unmarshal(conf *confmap.Conf) error {
 	if cfg.Admission.RequestLimitMiB == 0 && cfg.Arrow.DeprecatedAdmissionLimitMiB != 0 {
 		cfg.Admission.RequestLimitMiB = cfg.Arrow.DeprecatedAdmissionLimitMiB
 	}
-	if cfg.Admission.WaiterLimit == 0 && cfg.Arrow.DeprecatedWaiterLimit != 0 {
-		cfg.Admission.WaiterLimit = cfg.Arrow.DeprecatedWaiterLimit
-	}
 	return nil
 }
diff --git a/receiver/otelarrowreceiver/config_test.go b/receiver/otelarrowreceiver/config_test.go
index 60edaf00cf61..3acddb3a07c3 100644
--- a/receiver/otelarrowreceiver/config_test.go
+++ b/receiver/otelarrowreceiver/config_test.go
@@ -82,7 +82,7 @@ func TestUnmarshalConfig(t *testing.T) {
 			},
 			Admission: AdmissionConfig{
 				RequestLimitMiB: 80,
-				WaiterLimit:     100,
+				WaitingLimitMiB: 10,
 			},
 		}, cfg)
 
@@ -105,9 +105,8 @@ func TestValidateDeprecatedConfig(t *testing.T) {
 			},
 		},
 		Admission: AdmissionConfig{
-			// cfg.Validate should now set these fields.
+			// cfg.Validate sets RequestLimitMiB from DeprecatedAdmissionLimitMiB
 			RequestLimitMiB: 80,
-			WaiterLimit:     100,
 		},
 	}, cfg)
 }
@@ -134,7 +133,7 @@ func TestUnmarshalConfigUnix(t *testing.T) {
 		},
 		Admission: AdmissionConfig{
 			RequestLimitMiB: defaultRequestLimitMiB,
-			WaiterLimit:     defaultWaiterLimit,
+			WaitingLimitMiB: defaultWaitingLimitMiB,
 		},
 	}, cfg)
 }
diff --git a/receiver/otelarrowreceiver/factory.go b/receiver/otelarrowreceiver/factory.go
index 07ccc1bbbb1a..c440a8de925b 100644
--- a/receiver/otelarrowreceiver/factory.go
+++ b/receiver/otelarrowreceiver/factory.go
@@ -21,7 +21,7 @@ const (
 	defaultMemoryLimitMiB = 128
 
 	defaultRequestLimitMiB = 128
-	defaultWaiterLimit     = 1000
+	defaultWaitingLimitMiB = 32
 )
 
 // NewFactory creates a new OTel-Arrow receiver factory.
@@ -52,7 +52,7 @@ func createDefaultConfig() component.Config {
 		},
 		Admission: AdmissionConfig{
 			RequestLimitMiB: defaultRequestLimitMiB,
-			WaiterLimit:     defaultWaiterLimit,
+			WaitingLimitMiB: defaultWaitingLimitMiB,
 		},
 	}
 }
diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow.go b/receiver/otelarrowreceiver/internal/arrow/arrow.go
index 717dbed42f0c..228fe92b895c 100644
--- a/receiver/otelarrowreceiver/internal/arrow/arrow.go
+++ b/receiver/otelarrowreceiver/internal/arrow/arrow.go
@@ -8,9 +8,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"net"
 	"runtime"
-	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -39,7 +37,6 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/status"
-	"google.golang.org/protobuf/proto"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"
@@ -80,7 +77,7 @@ type Receiver struct {
 	newConsumer      func() arrowRecord.ConsumerAPI
 	netReporter      netstats.Interface
 	telemetryBuilder *internalmetadata.TelemetryBuilder
-	boundedQueue     *admission.BoundedQueue
+	boundedQueue     admission.Queue
 }
 
 // receiverStream holds the inFlightWG for a single stream.
@@ -97,7 +94,7 @@ func New(
 	gsettings configgrpc.ServerConfig,
 	authServer auth.Server,
 	newConsumer func() arrowRecord.ConsumerAPI,
-	bq *admission.BoundedQueue,
+	bq admission.Queue,
 	netReporter netstats.Interface,
 ) (*Receiver, error) {
 	tracer := set.TelemetrySettings.TracerProvider.Tracer("otel-arrow-receiver")
@@ -448,15 +445,15 @@ type inFlightData struct {
 	batchID   int64
 	pendingCh chan<- batchResp
 	span      trace.Span
+	releaser  admission.ReleaseFunc
 
 	// refs counts the number of goroutines holding this object.
 	// initially the recvOne() body, on success the
 	// consumeAndRespond() function.
 	refs atomic.Int32
 
-	numAcquired int64 // how many bytes held in the semaphore
-	numItems    int   // how many items
-	uncompSize  int64 // uncompressed data size
+	numItems   int    // how many items
+	uncompSize uint64 // uncompressed data size
 }
 
 func (id *inFlightData) recvDone(ctx context.Context, recvErrPtr *error) {
@@ -503,16 +500,13 @@ func (id *inFlightData) anyDone(ctx context.Context) {
 		return
 	}
 
-	id.span.End()
-
-	if id.numAcquired != 0 {
-		if err := id.boundedQueue.Release(id.numAcquired); err != nil {
-			id.telemetry.Logger.Error("release error", zap.Error(err))
-		}
+	if id.releaser != nil {
+		id.releaser()
 	}
+	id.span.End()
 
 	if id.uncompSize != 0 {
-		id.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(ctx, -id.uncompSize)
+		id.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(ctx, -int64(id.uncompSize))
 	}
 	if id.numItems != 0 {
 		id.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(ctx, int64(-id.numItems))
@@ -524,7 +518,7 @@ func (id *inFlightData) anyDone(ctx context.Context) {
 	// is instrumented this way.
 	var sized netstats.SizesStruct
 	sized.Method = id.method
-	sized.Length = id.uncompSize
+	sized.Length = int64(id.uncompSize)
 	id.netReporter.CountReceive(ctx, sized)
 
 	id.telemetryBuilder.OtelArrowReceiverInFlightRequests.Add(ctx, -1)
@@ -606,19 +600,6 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
 		}
 	}
 
-	var prevAcquiredBytes int64
-	uncompSizeHeaderStr, uncompSizeHeaderFound := authHdrs["otlp-pdata-size"]
-	if !uncompSizeHeaderFound || len(uncompSizeHeaderStr) == 0 {
-		// This is a compressed size so make sure to acquire the difference when request is decompressed.
-		prevAcquiredBytes = int64(proto.Size(req))
-	} else {
-		var parseErr error
-		prevAcquiredBytes, parseErr = strconv.ParseInt(uncompSizeHeaderStr[0], 10, 64)
-		if parseErr != nil {
-			return status.Errorf(codes.Internal, "failed to convert string to request size: %v", parseErr)
-		}
-	}
-
 	var callerCancel context.CancelFunc
 	if encodedTimeout, has := authHdrs["grpc-timeout"]; has && len(encodedTimeout) == 1 {
 		if timeout, decodeErr := grpcutil.DecodeTimeout(encodedTimeout[0]); decodeErr != nil {
@@ -638,17 +619,6 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
 		}
 	}
 
-	// Use the bounded queue to memory limit based on incoming
-	// uncompressed request size and waiters. Acquire will fail
-	// immediately if there are too many waiters, or will
-	// otherwise block until timeout or enough memory becomes
-	// available.
-	acquireErr := r.boundedQueue.Acquire(inflightCtx, prevAcquiredBytes)
-	if acquireErr != nil {
-		return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue: %v", acquireErr)
-	}
-	flight.numAcquired = prevAcquiredBytes
-
 	data, numItems, uncompSize, consumeErr := r.consumeBatch(ac, req)
 
 	if consumeErr != nil {
@@ -658,19 +628,24 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
 		return status.Errorf(codes.Internal, "otel-arrow decode: %v", consumeErr)
 	}
 
+	// Use the bounded queue to memory limit based on incoming
+	// uncompressed request size. Acquire will fail immediately
+	// if there is too much pending data, or will otherwise block
+	// until timeout or enough memory becomes available.
+	if releaser, acquireErr := r.boundedQueue.Acquire(inflightCtx, uncompSize); acquireErr == nil {
+		flight.releaser = releaser
+	} else {
+		// Note that the queue returns a gRPC ResourceExhausted or InvalidArgument status code.
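+		// It can be returned to the gRPC caller without
+		// additional wrapping.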
+		return acquireErr
+	}
+
 	flight.uncompSize = uncompSize
 	flight.numItems = numItems
 
-	r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(inflightCtx, uncompSize)
+	r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(inflightCtx, int64(uncompSize))
 	r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(inflightCtx, int64(numItems))
 
-	numAcquired, secondAcquireErr := r.acquireAdditionalBytes(inflightCtx, prevAcquiredBytes, uncompSize, hrcv.connInfo.Addr, uncompSizeHeaderFound)
-
-	flight.numAcquired = numAcquired
-	if secondAcquireErr != nil {
-		return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue re-acquire: %v", secondAcquireErr)
-	}
-
 	// Recognize that the request is still in-flight via consumeAndRespond()
 	flight.refs.Add(1)
@@ -798,7 +773,7 @@ func (r *receiverStream) srvSendLoop(ctx context.Context, serverStream anyStream
 // consumeBatch applies the batch to the Arrow Consumer, returns a
 // slice of pdata objects of the corresponding data type as `any`.
 // along with the number of items and true uncompressed size.
-func (r *Receiver) consumeBatch(arrowConsumer arrowRecord.ConsumerAPI, records *arrowpb.BatchArrowRecords) (retData any, numItems int, uncompSize int64, retErr error) {
+func (r *Receiver) consumeBatch(arrowConsumer arrowRecord.ConsumerAPI, records *arrowpb.BatchArrowRecords) (retData any, numItems int, uncompSize uint64, retErr error) {
 	payloads := records.GetArrowPayloads()
 
 	if len(payloads) == 0 {
@@ -816,7 +791,7 @@ func (r *Receiver) consumeBatch(arrowConsumer arrowRecord.ConsumerAPI, records *
 			if err == nil {
 				for _, metrics := range data {
 					numItems += metrics.DataPointCount()
-					uncompSize += int64(sizer.MetricsSize(metrics))
+					uncompSize += uint64(sizer.MetricsSize(metrics))
 				}
 			}
 			retData = data
@@ -832,7 +807,7 @@ func (r *Receiver) consumeBatch(arrowConsumer arrowRecord.ConsumerAPI, records *
 			if err == nil {
 				for _, logs := range data {
 					numItems += logs.LogRecordCount()
-					uncompSize += int64(sizer.LogsSize(logs))
+					uncompSize += uint64(sizer.LogsSize(logs))
 				}
 			}
 			retData = data
@@ -848,7 +823,7 @@ func (r *Receiver) consumeBatch(arrowConsumer arrowRecord.ConsumerAPI, records *
 			if err == nil {
 				for _, traces := range data {
 					numItems += traces.SpanCount()
-					uncompSize += int64(sizer.TracesSize(traces))
+					uncompSize += uint64(sizer.TracesSize(traces))
 				}
 			}
 			retData = data
@@ -901,47 +876,3 @@ func (r *Receiver) consumeData(ctx context.Context, data any, flight *inFlightDa
 	}
 	return retErr
 }
-
-func (r *Receiver) acquireAdditionalBytes(ctx context.Context, prevAcquired, uncompSize int64, addr net.Addr, uncompSizeHeaderFound bool) (int64, error) {
-	diff := uncompSize - prevAcquired
-
-	if diff == 0 {
-		return uncompSize, nil
-	}
-
-	if uncompSizeHeaderFound {
-		var clientAddr string
-		if addr != nil {
-			clientAddr = addr.String()
-		}
-		// a mismatch between header set by exporter and the uncompSize just calculated.
-		r.telemetry.Logger.Debug("mismatch between uncompressed size in receiver and otlp-pdata-size header",
-			zap.String("client-address", clientAddr),
-			zap.Int("uncompsize", int(uncompSize)),
-			zap.Int("otlp-pdata-size", int(prevAcquired)),
-		)
-	} else if diff < 0 {
-		// proto.Size() on compressed request was greater than pdata uncompressed size.
-		r.telemetry.Logger.Debug("uncompressed size is less than compressed size",
-			zap.Int("uncompressed", int(uncompSize)),
-			zap.Int("compressed", int(prevAcquired)),
-		)
-	}
-
-	if diff < 0 {
-		// If the difference is negative, release the overage.
-		if err := r.boundedQueue.Release(-diff); err != nil {
-			return 0, err
-		}
-	} else {
-		// Release previously acquired bytes to prevent deadlock and
-		// reacquire the uncompressed size we just calculated.
-		if err := r.boundedQueue.Release(prevAcquired); err != nil {
-			return 0, err
-		}
-		if err := r.boundedQueue.Acquire(ctx, uncompSize); err != nil {
-			return 0, err
-		}
-	}
-	return uncompSize, nil
-}
diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go
index 9a11185ee24a..5f3e19ed2a8c 100644
--- a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go
+++ b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go
@@ -9,7 +9,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
-	"strconv"
 	"strings"
 	"sync"
 	"testing"
@@ -36,7 +35,6 @@ import (
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/propagation"
 	"go.opentelemetry.io/otel/trace"
-	"go.opentelemetry.io/otel/trace/noop"
 	"go.uber.org/mock/gomock"
 	"go.uber.org/zap/zaptest"
 	"golang.org/x/net/http2/hpack"
@@ -50,10 +48,10 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/arrow/mock"
 )
 
-var noopTraces = noop.NewTracerProvider()
+var noopTelemetry = componenttest.NewNopTelemetrySettings()
 
-func defaultBQ() *admission.BoundedQueue {
-	return admission.NewBoundedQueue(noopTraces, int64(100000), int64(10))
+func defaultBQ() admission.Queue {
+	return admission.NewBoundedQueue(noopTelemetry, 100000, 10)
 }
 
 type compareJSONTraces struct{ ptrace.Traces }
@@ -360,7 +358,7 @@ func (ctc *commonTestCase) newOOMConsumer() arrowRecord.ConsumerAPI {
 	return mock
 }
 
-func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, bq *admission.BoundedQueue, opts ...func(*configgrpc.ServerConfig, *auth.Server)) {
+func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, bq admission.Queue, opts ...func(*configgrpc.ServerConfig, *auth.Server)) {
 	var authServer auth.Server
 	var gsettings configgrpc.ServerConfig
 	for _, gf := range opts {
@@ -409,6 +407,10 @@ func requireExhaustedStatus(t *testing.T, err error) {
 	requireStatus(t, codes.ResourceExhausted, err)
 }
 
+func requireInvalidArgumentStatus(t *testing.T, err error) {
+	requireStatus(t, codes.InvalidArgument, err)
+}
+
 func requireStatus(t *testing.T, code codes.Code, err error) {
 	require.Error(t, err)
 	status, ok := status.FromError(err)
@@ -416,55 +418,26 @@ func requireStatus(t *testing.T, code codes.Code, err error) {
 	require.Equal(t, code, status.Code())
 }
 
-func TestBoundedQueueWithPdataHeaders(t *testing.T) {
+func TestBoundedQueueLimits(t *testing.T) {
 	var sizer ptrace.ProtoMarshaler
 	stdTesting := otelAssert.NewStdUnitTest(t)
-	pdataSizeTenTraces := sizer.TracesSize(testdata.GenerateTraces(10))
-	defaultBoundedQueueLimit := int64(100000)
+	td := testdata.GenerateTraces(10)
+	tdSize := uint64(sizer.TracesSize(td))
+
 	tests := []struct {
-		name               string
-		numTraces          int
-		includePdataHeader bool
-		pdataSize          string
-		rejected           bool
+		name       string
+		admitLimit uint64
+		expectErr  bool
 	}{
 		{
-			name:      "no header compressed greater than uncompressed",
-			numTraces: 10,
-		},
-		{
-			name:      "no header compressed less than uncompressed",
-			numTraces: 100,
-		},
-		{
-			name:               "pdata header less than uncompressedSize",
-			numTraces:          10,
-			pdataSize:          strconv.Itoa(pdataSizeTenTraces / 2),
-			includePdataHeader: true,
-		},
-		{
-			name:               "pdata header equal uncompressedSize",
-			numTraces:          10,
-			pdataSize:          strconv.Itoa(pdataSizeTenTraces),
-			includePdataHeader: true,
"admit request", + admitLimit: tdSize * 2, + expectErr: false, }, { - name: "pdata header greater than uncompressedSize", - numTraces: 10, - pdataSize: strconv.Itoa(pdataSizeTenTraces * 2), - includePdataHeader: true, - }, - { - name: "no header compressed accepted uncompressed rejected", - numTraces: 100, - rejected: true, - }, - { - name: "pdata header accepted uncompressed rejected", - numTraces: 100, - rejected: true, - pdataSize: strconv.Itoa(pdataSizeTenTraces), - includePdataHeader: true, + name: "reject request", + admitLimit: tdSize / 2, + expectErr: true, }, } for _, tt := range tests { @@ -472,35 +445,23 @@ func TestBoundedQueueWithPdataHeaders(t *testing.T) { tc := newHealthyTestChannel(t) ctc := newCommonTestCase(t, tc) - td := testdata.GenerateTraces(tt.numTraces) batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td) require.NoError(t, err) - if tt.includePdataHeader { - var hpb bytes.Buffer - hpe := hpack.NewEncoder(&hpb) - err = hpe.WriteField(hpack.HeaderField{ - Name: "otlp-pdata-size", - Value: tt.pdataSize, - }) - assert.NoError(t, err) - batch.Headers = make([]byte, hpb.Len()) - copy(batch.Headers, hpb.Bytes()) - } - var bq *admission.BoundedQueue - if tt.rejected { + var bq admission.Queue + if tt.expectErr { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(0) - bq = admission.NewBoundedQueue(noopTraces, int64(sizer.TracesSize(td)-100), 10) + bq = admission.NewBoundedQueue(noopTelemetry, tt.admitLimit, 0) } else { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - bq = admission.NewBoundedQueue(noopTraces, defaultBoundedQueueLimit, 10) + bq = admission.NewBoundedQueue(noopTelemetry, tt.admitLimit, 0) } ctc.start(ctc.newRealConsumer, bq) ctc.putBatch(batch, nil) - if tt.rejected { - requireExhaustedStatus(t, ctc.wait()) + if tt.expectErr { + requireInvalidArgumentStatus(t, ctc.wait()) } else { data := <-ctc.consume actualTD := data.Data.(ptrace.Traces) diff --git a/receiver/otelarrowreceiver/internal/logs/otlp.go b/receiver/otelarrowreceiver/internal/logs/otlp.go index 23ec4c96fbbc..75152dc42def 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp.go @@ -22,13 +22,13 @@ type Receiver struct { plogotlp.UnimplementedGRPCServer nextConsumer consumer.Logs obsrecv *receiverhelper.ObsReport - boundedQueue *admission.BoundedQueue + boundedQueue admission.Queue sizer *plog.ProtoMarshaler logger *zap.Logger } // New creates a new Receiver reference. -func New(logger *zap.Logger, nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver { +func New(logger *zap.Logger, nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport, bq admission.Queue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, @@ -41,26 +41,23 @@ func New(logger *zap.Logger, nextConsumer consumer.Logs, obsrecv *receiverhelper // Export implements the service Export logs func. 
func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plogotlp.ExportResponse, error) { ld := req.Logs() - numSpans := ld.LogRecordCount() - if numSpans == 0 { + numRecords := ld.LogRecordCount() + if numRecords == 0 { return plogotlp.NewExportResponse(), nil } ctx = r.obsrecv.StartLogsOp(ctx) - sizeBytes := int64(r.sizer.LogsSize(req.Logs())) - err := r.boundedQueue.Acquire(ctx, sizeBytes) - if err != nil { - return plogotlp.NewExportResponse(), err + var err error + sizeBytes := uint64(r.sizer.LogsSize(req.Logs())) + if release, acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { + err = r.nextConsumer.ConsumeLogs(ctx, ld) + release() // immediate release + } else { + err = acqErr } - defer func() { - if releaseErr := r.boundedQueue.Release(sizeBytes); releaseErr != nil { - r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) - } - }() - err = r.nextConsumer.ConsumeLogs(ctx, ld) - r.obsrecv.EndLogsOp(ctx, dataFormatProtobuf, numSpans, err) + r.obsrecv.EndLogsOp(ctx, dataFormatProtobuf, numRecords, err) return plogotlp.NewExportResponse(), err } diff --git a/receiver/otelarrowreceiver/internal/logs/otlp_test.go b/receiver/otelarrowreceiver/internal/logs/otlp_test.go index bf58f738c34e..a0bbb056f24c 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp_test.go @@ -8,134 +8,181 @@ import ( "errors" "net" "sync" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" - "go.opentelemetry.io/otel/trace/noop" - "go.uber.org/multierr" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/testconsumer" ) const ( - maxWaiters = 10 - maxBytes = int64(250) + maxBytes = 250 ) -func TestExport(t *testing.T) { +type testSink struct { + consumertest.LogsSink + context.Context + context.CancelFunc +} + +func newTestSink() *testSink { + ctx, cancel := context.WithCancel(context.Background()) + return &testSink{ + Context: ctx, + CancelFunc: cancel, + } +} + +func (ts *testSink) unblock() { + time.Sleep(10 * time.Millisecond) + ts.CancelFunc() +} + +func (ts *testSink) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + <-ts.Context.Done() + return ts.LogsSink.ConsumeLogs(ctx, ld) +} + +func TestExport_Success(t *testing.T) { ld := testdata.GenerateLogs(1) req := plogotlp.NewExportRequestFromLogs(ld) - logSink := new(consumertest.LogsSink) - logClient := makeLogsServiceClient(t, logSink) - resp, err := logClient.Export(context.Background(), req) + logSink := newTestSink() + logsClient, selfExp, selfProv := makeLogsServiceClient(t, logSink) + + go logSink.unblock() + 
resp, err := logsClient.Export(context.Background(), req) require.NoError(t, err, "Failed to export trace: %v", err) require.NotNil(t, resp, "The response is missing") - lds := logSink.AllLogs() - require.Len(t, lds, 1) - assert.EqualValues(t, ld, lds[0]) + require.Len(t, logSink.AllLogs(), 1) + assert.EqualValues(t, ld, logSink.AllLogs()[0]) + + // One self-tracing span is issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) } func TestExport_EmptyRequest(t *testing.T) { - logSink := new(consumertest.LogsSink) + logSink := newTestSink() + logsClient, selfExp, selfProv := makeLogsServiceClient(t, logSink) + empty := plogotlp.NewExportRequest() - logClient := makeLogsServiceClient(t, logSink) - resp, err := logClient.Export(context.Background(), plogotlp.NewExportRequest()) + go logSink.unblock() + resp, err := logsClient.Export(context.Background(), empty) assert.NoError(t, err, "Failed to export trace: %v", err) assert.NotNil(t, resp, "The response is missing") + + require.Empty(t, logSink.AllLogs()) + + // No self-tracing spans are issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Empty(t, selfExp.GetSpans()) } func TestExport_ErrorConsumer(t *testing.T) { ld := testdata.GenerateLogs(1) req := plogotlp.NewExportRequestFromLogs(ld) - logClient := makeLogsServiceClient(t, consumertest.NewErr(errors.New("my error"))) - resp, err := logClient.Export(context.Background(), req) + logsClient, selfExp, selfProv := makeLogsServiceClient(t, consumertest.NewErr(errors.New("my error"))) + resp, err := logsClient.Export(context.Background(), req) assert.EqualError(t, err, "rpc error: code = Unknown desc = my error") assert.Equal(t, plogotlp.ExportResponse{}, resp) + + // One self-tracing span is issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) } -func TestExport_AdmissionLimitBytesExceeded(t *testing.T) { +func TestExport_AdmissionRequestTooLarge(t *testing.T) { ld := testdata.GenerateLogs(10) - logSink := new(consumertest.LogsSink) + logSink := newTestSink() req := plogotlp.NewExportRequestFromLogs(ld) + logsClient, selfExp, selfProv := makeLogsServiceClient(t, logSink) - logClient := makeLogsServiceClient(t, logSink) - resp, err := logClient.Export(context.Background(), req) - assert.EqualError(t, err, "rpc error: code = Unknown desc = rejecting request, request size larger than configured limit") + go logSink.unblock() + resp, err := logsClient.Export(context.Background(), req) + assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = rejecting request, request is too large") assert.Equal(t, plogotlp.ExportResponse{}, resp) -} -func TestExport_TooManyWaiters(t *testing.T) { - bc := testconsumer.NewBlockingConsumer() + // One self-tracing span is issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) +} - logsClient := makeLogsServiceClient(t, bc) - bg := context.Background() - var errs, err error +func TestExport_AdmissionLimitExceeded(t *testing.T) { ld := testdata.GenerateLogs(1) + logSink := newTestSink() req := plogotlp.NewExportRequestFromLogs(ld) - var mtx sync.Mutex - numResponses := 0 - // Send request that will acquire all of the semaphores bytes and block. 
- go func() { - _, err = logsClient.Export(bg, req) - mtx.Lock() - errs = multierr.Append(errs, err) - numResponses++ - mtx.Unlock() - }() - for i := 0; i < maxWaiters+1; i++ { + logsClient, selfExp, selfProv := makeLogsServiceClient(t, logSink) + + var wait sync.WaitGroup + wait.Add(10) + + var expectSuccess atomic.Int32 + + for i := 0; i < 10; i++ { go func() { - _, err := logsClient.Export(bg, req) - mtx.Lock() - errs = multierr.Append(errs, err) - numResponses++ - mtx.Unlock() + defer wait.Done() + _, err := logsClient.Export(context.Background(), req) + if err == nil { + // some succeed! + expectSuccess.Add(1) + return + } + assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = rejecting request, too much pending data") }() } - // sleep so all async requests are blocked on semaphore Acquire. - time.Sleep(1 * time.Second) + logSink.unblock() + wait.Wait() - // unblock and wait for errors to be returned and written. - bc.Unblock() - assert.Eventually(t, func() bool { - mtx.Lock() - defer mtx.Unlock() - errSlice := multierr.Errors(errs) - return numResponses == maxWaiters+2 && len(errSlice) == 1 - }, 3*time.Second, 10*time.Millisecond) + // 10 self-tracing spans are issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 10) - assert.ErrorContains(t, errs, "too many waiters") + // Expect the correct number of successes and failures. + testSuccess := 0 + for _, span := range selfExp.GetSpans() { + switch span.Status.Code { + case codes.Ok, codes.Unset: + testSuccess++ + } + } + require.Equal(t, int(expectSuccess.Load()), testSuccess) } -func makeLogsServiceClient(t *testing.T, lc consumer.Logs) plogotlp.GRPCClient { - addr := otlpReceiverOnGRPCServer(t, lc) +func makeLogsServiceClient(t *testing.T, lc consumer.Logs) (plogotlp.GRPCClient, *tracetest.InMemoryExporter, *trace.TracerProvider) { + addr, exp, tp := otlpReceiverOnGRPCServer(t, lc) cc, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err) t.Cleanup(func() { require.NoError(t, cc.Close()) }) - return plogotlp.NewGRPCClient(cc) + return plogotlp.NewGRPCClient(cc), exp, tp } -func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) net.Addr { +func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) (net.Addr, *tracetest.InMemoryExporter, *trace.TracerProvider) { ln, err := net.Listen("tcp", "localhost:") require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) @@ -143,16 +190,23 @@ func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) net.Addr { require.NoError(t, ln.Close()) }) + exp := tracetest.NewInMemoryExporter() + + tp := trace.NewTracerProvider(trace.WithSyncer(exp)) + telset := componenttest.NewNopTelemetrySettings() + telset.TracerProvider = tp + set := receivertest.NewNopSettings() - set.ID = component.NewIDWithName(component.MustNewType("otlp"), "log") + set.TelemetrySettings = telset + + set.ID = component.NewIDWithName(component.MustNewType("otlp"), "logs") obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, Transport: "grpc", ReceiverCreateSettings: set, }) require.NoError(t, err) - - bq := admission.NewBoundedQueue(noop.NewTracerProvider(), maxBytes, maxWaiters) + bq := admission.NewBoundedQueue(telset, maxBytes, 0) r := New(zap.NewNop(), lc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() @@ -161,5 +215,5 @@ func 
otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) net.Addr { _ = srv.Serve(ln) }() - return ln.Addr() + return ln.Addr(), exp, tp } diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp.go b/receiver/otelarrowreceiver/internal/metrics/otlp.go index d038d63bef3d..adb75cf2c7f2 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp.go @@ -22,13 +22,13 @@ type Receiver struct { pmetricotlp.UnimplementedGRPCServer nextConsumer consumer.Metrics obsrecv *receiverhelper.ObsReport - boundedQueue *admission.BoundedQueue + boundedQueue admission.Queue sizer *pmetric.ProtoMarshaler logger *zap.Logger } // New creates a new Receiver reference. -func New(logger *zap.Logger, nextConsumer consumer.Metrics, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver { +func New(logger *zap.Logger, nextConsumer consumer.Metrics, obsrecv *receiverhelper.ObsReport, bq admission.Queue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, @@ -48,18 +48,15 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p ctx = r.obsrecv.StartMetricsOp(ctx) - sizeBytes := int64(r.sizer.MetricsSize(req.Metrics())) - err := r.boundedQueue.Acquire(ctx, sizeBytes) - if err != nil { - return pmetricotlp.NewExportResponse(), err + var err error + sizeBytes := uint64(r.sizer.MetricsSize(req.Metrics())) + if release, acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { + err = r.nextConsumer.ConsumeMetrics(ctx, md) + release() // immediate release + } else { + err = acqErr } - defer func() { - if releaseErr := r.boundedQueue.Release(sizeBytes); releaseErr != nil { - r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) - } - }() - err = r.nextConsumer.ConsumeMetrics(ctx, md) r.obsrecv.EndMetricsOp(ctx, dataFormatProtobuf, dataPointCount, err) return pmetricotlp.NewExportResponse(), err diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go index 9bd0b9911e57..77a690963bac 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go @@ -8,135 +8,181 @@ import ( "errors" "net" "sync" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" - "go.opentelemetry.io/otel/trace/noop" - "go.uber.org/multierr" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/testconsumer" ) const ( - maxWaiters = 10 - maxBytes = int64(250) + maxBytes = 250 ) -func TestExport(t *testing.T) { +type testSink struct { 
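+	// Embedded sink plus a cancellable context: ConsumeMetrics blocks until the test cancels the context via unblock().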
+ consumertest.MetricsSink + context.Context + context.CancelFunc +} + +func newTestSink() *testSink { + ctx, cancel := context.WithCancel(context.Background()) + return &testSink{ + Context: ctx, + CancelFunc: cancel, + } +} + +func (ts *testSink) unblock() { + time.Sleep(10 * time.Millisecond) + ts.CancelFunc() +} + +func (ts *testSink) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + <-ts.Context.Done() + return ts.MetricsSink.ConsumeMetrics(ctx, md) +} + +func TestExport_Success(t *testing.T) { md := testdata.GenerateMetrics(1) req := pmetricotlp.NewExportRequestFromMetrics(md) - metricSink := new(consumertest.MetricsSink) - metricsClient := makeMetricsServiceClient(t, metricSink) - resp, err := metricsClient.Export(context.Background(), req) + metricsSink := newTestSink() + metricsClient, selfExp, selfProv := makeMetricsServiceClient(t, metricsSink) - require.NoError(t, err, "Failed to export metrics: %v", err) + go metricsSink.unblock() + resp, err := metricsClient.Export(context.Background(), req) + require.NoError(t, err, "Failed to export metrics: %v", err) require.NotNil(t, resp, "The response is missing") - mds := metricSink.AllMetrics() - require.Len(t, mds, 1) - assert.EqualValues(t, md, mds[0]) + require.Len(t, metricsSink.AllMetrics(), 1) + assert.EqualValues(t, md, metricsSink.AllMetrics()[0]) + + // One self-tracing span is issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) } func TestExport_EmptyRequest(t *testing.T) { - metricSink := new(consumertest.MetricsSink) - metricsClient := makeMetricsServiceClient(t, metricSink) - resp, err := metricsClient.Export(context.Background(), pmetricotlp.NewExportRequest()) - require.NoError(t, err) - require.NotNil(t, resp) + metricsSink := newTestSink() + metricsClient, selfExp, selfProv := makeMetricsServiceClient(t, metricsSink) + empty := pmetricotlp.NewExportRequest() + + go metricsSink.unblock() + resp, err := metricsClient.Export(context.Background(), empty) + assert.NoError(t, err, "Failed to export metrics: %v", err) + assert.NotNil(t, resp, "The response is missing") + + require.Empty(t, metricsSink.AllMetrics()) + + // No self-tracing spans are issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Empty(t, selfExp.GetSpans()) } func TestExport_ErrorConsumer(t *testing.T) { md := testdata.GenerateMetrics(1) req := pmetricotlp.NewExportRequestFromMetrics(md) - metricsClient := makeMetricsServiceClient(t, consumertest.NewErr(errors.New("my error"))) + metricsClient, selfExp, selfProv := makeMetricsServiceClient(t, consumertest.NewErr(errors.New("my error"))) resp, err := metricsClient.Export(context.Background(), req) assert.EqualError(t, err, "rpc error: code = Unknown desc = my error") assert.Equal(t, pmetricotlp.ExportResponse{}, resp) + + // One self-tracing span is issued. 
+ require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) } -func TestExport_AdmissionLimitBytesExceeded(t *testing.T) { +func TestExport_AdmissionRequestTooLarge(t *testing.T) { md := testdata.GenerateMetrics(10) - metricSink := new(consumertest.MetricsSink) + metricsSink := newTestSink() req := pmetricotlp.NewExportRequestFromMetrics(md) + metricsClient, selfExp, selfProv := makeMetricsServiceClient(t, metricsSink) - metricsClient := makeMetricsServiceClient(t, metricSink) + go metricsSink.unblock() resp, err := metricsClient.Export(context.Background(), req) - assert.EqualError(t, err, "rpc error: code = Unknown desc = rejecting request, request size larger than configured limit") + assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = rejecting request, request is too large") assert.Equal(t, pmetricotlp.ExportResponse{}, resp) -} -func TestExport_TooManyWaiters(t *testing.T) { - bc := testconsumer.NewBlockingConsumer() + // One self-tracing span is issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) +} - metricsClient := makeMetricsServiceClient(t, bc) - bg := context.Background() - var errs, err error +func TestExport_AdmissionLimitExceeded(t *testing.T) { md := testdata.GenerateMetrics(1) + metricsSink := newTestSink() req := pmetricotlp.NewExportRequestFromMetrics(md) - var mtx sync.Mutex - numResponses := 0 - // Send request that will acquire all of the semaphores bytes and block. - go func() { - _, err = metricsClient.Export(bg, req) - mtx.Lock() - errs = multierr.Append(errs, err) - numResponses++ - mtx.Unlock() - }() - for i := 0; i < maxWaiters+1; i++ { + metricsClient, selfExp, selfProv := makeMetricsServiceClient(t, metricsSink) + + var wait sync.WaitGroup + wait.Add(10) + + var expectSuccess atomic.Int32 + + for i := 0; i < 10; i++ { go func() { - _, err := metricsClient.Export(bg, req) - mtx.Lock() - errs = multierr.Append(errs, err) - numResponses++ - mtx.Unlock() + defer wait.Done() + _, err := metricsClient.Export(context.Background(), req) + if err == nil { + // some succeed! + expectSuccess.Add(1) + return + } + assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = rejecting request, too much pending data") }() } - // sleep so all async requests are blocked on semaphore Acquire. - time.Sleep(1 * time.Second) + metricsSink.unblock() + wait.Wait() - // unblock and wait for errors to be returned and written. - bc.Unblock() - assert.Eventually(t, func() bool { - mtx.Lock() - defer mtx.Unlock() - errSlice := multierr.Errors(errs) - return numResponses == maxWaiters+2 && len(errSlice) == 1 - }, 3*time.Second, 10*time.Millisecond) + // 10 self-tracing spans are issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 10) - assert.ErrorContains(t, errs, "too many waiters") + // Expect the correct number of successes and failures. 
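+	// Spans with an OK or Unset status code correspond to requests that were admitted; the tally must match the client-side success count.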
+ testSuccess := 0 + for _, span := range selfExp.GetSpans() { + switch span.Status.Code { + case codes.Ok, codes.Unset: + testSuccess++ + } + } + require.Equal(t, int(expectSuccess.Load()), testSuccess) } -func makeMetricsServiceClient(t *testing.T, mc consumer.Metrics) pmetricotlp.GRPCClient { - addr := otlpReceiverOnGRPCServer(t, mc) - +func makeMetricsServiceClient(t *testing.T, mc consumer.Metrics) (pmetricotlp.GRPCClient, *tracetest.InMemoryExporter, *trace.TracerProvider) { + addr, exp, tp := otlpReceiverOnGRPCServer(t, mc) cc, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err, "Failed to create the MetricsServiceClient: %v", err) t.Cleanup(func() { require.NoError(t, cc.Close()) }) - return pmetricotlp.NewGRPCClient(cc) + return pmetricotlp.NewGRPCClient(cc), exp, tp } -func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) net.Addr { +func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) (net.Addr, *tracetest.InMemoryExporter, *trace.TracerProvider) { ln, err := net.Listen("tcp", "localhost:") require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) @@ -144,7 +190,15 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) net.Addr { require.NoError(t, ln.Close()) }) + exp := tracetest.NewInMemoryExporter() + + tp := trace.NewTracerProvider(trace.WithSyncer(exp)) + telset := componenttest.NewNopTelemetrySettings() + telset.TracerProvider = tp + set := receivertest.NewNopSettings() + set.TelemetrySettings = telset + set.ID = component.NewIDWithName(component.MustNewType("otlp"), "metrics") obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, @@ -152,8 +206,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) net.Addr { ReceiverCreateSettings: set, }) require.NoError(t, err) - - bq := admission.NewBoundedQueue(noop.NewTracerProvider(), maxBytes, maxWaiters) + bq := admission.NewBoundedQueue(telset, maxBytes, 0) r := New(zap.NewNop(), mc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() @@ -162,5 +215,5 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) net.Addr { _ = srv.Serve(ln) }() - return ln.Addr() + return ln.Addr(), exp, tp } diff --git a/receiver/otelarrowreceiver/internal/trace/otlp.go b/receiver/otelarrowreceiver/internal/trace/otlp.go index af9bc335ea19..da3eb1b28bce 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp.go @@ -22,13 +22,13 @@ type Receiver struct { ptraceotlp.UnimplementedGRPCServer nextConsumer consumer.Traces obsrecv *receiverhelper.ObsReport - boundedQueue *admission.BoundedQueue + boundedQueue admission.Queue sizer *ptrace.ProtoMarshaler logger *zap.Logger } // New creates a new Receiver reference. -func New(logger *zap.Logger, nextConsumer consumer.Traces, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver { +func New(logger *zap.Logger, nextConsumer consumer.Traces, obsrecv *receiverhelper.ObsReport, bq admission.Queue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, @@ -41,25 +41,22 @@ func New(logger *zap.Logger, nextConsumer consumer.Traces, obsrecv *receiverhelp // Export implements the service Export traces func. 
func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) { td := req.Traces() - // We need to ensure that it propagates the receiver name as a tag numSpans := td.SpanCount() if numSpans == 0 { return ptraceotlp.NewExportResponse(), nil } + ctx = r.obsrecv.StartTracesOp(ctx) - sizeBytes := int64(r.sizer.TracesSize(req.Traces())) - err := r.boundedQueue.Acquire(ctx, sizeBytes) - if err != nil { - return ptraceotlp.NewExportResponse(), err + var err error + sizeBytes := uint64(r.sizer.TracesSize(req.Traces())) + if release, acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { + err = r.nextConsumer.ConsumeTraces(ctx, td) + release() // immediate release + } else { + err = acqErr } - defer func() { - if releaseErr := r.boundedQueue.Release(sizeBytes); releaseErr != nil { - r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) - } - }() - err = r.nextConsumer.ConsumeTraces(ctx, td) r.obsrecv.EndTracesOp(ctx, dataFormatProtobuf, numSpans, err) return ptraceotlp.NewExportResponse(), err diff --git a/receiver/otelarrowreceiver/internal/trace/otlp_test.go b/receiver/otelarrowreceiver/internal/trace/otlp_test.go index b968b79d20d8..ce769f5451c4 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp_test.go @@ -8,133 +8,181 @@ import ( "errors" "net" "sync" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" - "go.opentelemetry.io/otel/trace/noop" - "go.uber.org/multierr" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/testconsumer" ) const ( - maxWaiters = 10 - maxBytes = int64(250) + maxBytes = 250 ) -func TestExport(t *testing.T) { +type testSink struct { + consumertest.TracesSink + context.Context + context.CancelFunc +} + +func newTestSink() *testSink { + ctx, cancel := context.WithCancel(context.Background()) + return &testSink{ + Context: ctx, + CancelFunc: cancel, + } +} + +func (ts *testSink) unblock() { + time.Sleep(10 * time.Millisecond) + ts.CancelFunc() +} + +func (ts *testSink) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + <-ts.Context.Done() + return ts.TracesSink.ConsumeTraces(ctx, td) +} + +func TestExport_Success(t *testing.T) { td := testdata.GenerateTraces(1) req := ptraceotlp.NewExportRequestFromTraces(td) - traceSink := new(consumertest.TracesSink) - traceClient := makeTraceServiceClient(t, traceSink) + traceSink := newTestSink() + traceClient, selfExp, selfProv := makeTraceServiceClient(t, traceSink) + + go traceSink.unblock() resp, err := traceClient.Export(context.Background(), 
req) require.NoError(t, err, "Failed to export trace: %v", err) require.NotNil(t, resp, "The response is missing") require.Len(t, traceSink.AllTraces(), 1) assert.EqualValues(t, td, traceSink.AllTraces()[0]) + + // One self-tracing span is issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) } func TestExport_EmptyRequest(t *testing.T) { - traceSink := new(consumertest.TracesSink) - traceClient := makeTraceServiceClient(t, traceSink) - resp, err := traceClient.Export(context.Background(), ptraceotlp.NewExportRequest()) + traceSink := newTestSink() + traceClient, selfExp, selfProv := makeTraceServiceClient(t, traceSink) + empty := ptraceotlp.NewExportRequest() + + go traceSink.unblock() + resp, err := traceClient.Export(context.Background(), empty) assert.NoError(t, err, "Failed to export trace: %v", err) assert.NotNil(t, resp, "The response is missing") + + require.Empty(t, traceSink.AllTraces()) + + // No self-tracing spans are issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Empty(t, selfExp.GetSpans()) } func TestExport_ErrorConsumer(t *testing.T) { td := testdata.GenerateTraces(1) req := ptraceotlp.NewExportRequestFromTraces(td) - traceClient := makeTraceServiceClient(t, consumertest.NewErr(errors.New("my error"))) + traceClient, selfExp, selfProv := makeTraceServiceClient(t, consumertest.NewErr(errors.New("my error"))) resp, err := traceClient.Export(context.Background(), req) assert.EqualError(t, err, "rpc error: code = Unknown desc = my error") assert.Equal(t, ptraceotlp.ExportResponse{}, resp) + + // One self-tracing span is issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) } -func TestExport_AdmissionLimitBytesExceeded(t *testing.T) { +func TestExport_AdmissionRequestTooLarge(t *testing.T) { td := testdata.GenerateTraces(10) - traceSink := new(consumertest.TracesSink) + traceSink := newTestSink() req := ptraceotlp.NewExportRequestFromTraces(td) + traceClient, selfExp, selfProv := makeTraceServiceClient(t, traceSink) - traceClient := makeTraceServiceClient(t, traceSink) - + go traceSink.unblock() resp, err := traceClient.Export(context.Background(), req) - assert.EqualError(t, err, "rpc error: code = Unknown desc = rejecting request, request size larger than configured limit") + assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = rejecting request, request is too large") assert.Equal(t, ptraceotlp.ExportResponse{}, resp) -} -func TestExport_TooManyWaiters(t *testing.T) { - bc := testconsumer.NewBlockingConsumer() + // One self-tracing span is issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) +} - traceClient := makeTraceServiceClient(t, bc) - bg := context.Background() - var errs, err error +func TestExport_AdmissionLimitExceeded(t *testing.T) { td := testdata.GenerateTraces(1) + traceSink := newTestSink() req := ptraceotlp.NewExportRequestFromTraces(td) - var mtx sync.Mutex - numResponses := 0 - // Send request that will acquire all of the semaphores bytes and block. 
- go func() { - _, err = traceClient.Export(bg, req) - mtx.Lock() - errs = multierr.Append(errs, err) - numResponses++ - mtx.Unlock() - }() - for i := 0; i < maxWaiters+1; i++ { + traceClient, selfExp, selfProv := makeTraceServiceClient(t, traceSink) + + var wait sync.WaitGroup + wait.Add(10) + + var expectSuccess atomic.Int32 + + for i := 0; i < 10; i++ { go func() { - _, err := traceClient.Export(bg, req) - mtx.Lock() - errs = multierr.Append(errs, err) - numResponses++ - mtx.Unlock() + defer wait.Done() + _, err := traceClient.Export(context.Background(), req) + if err == nil { + // some succeed! + expectSuccess.Add(1) + return + } + assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = rejecting request, too much pending data") }() } - // sleep so all async requests are blocked on semaphore Acquire. - time.Sleep(1 * time.Second) + traceSink.unblock() + wait.Wait() - // unblock and wait for errors to be returned and written. - bc.Unblock() - assert.Eventually(t, func() bool { - mtx.Lock() - defer mtx.Unlock() - errSlice := multierr.Errors(errs) - return numResponses == maxWaiters+2 && len(errSlice) == 1 - }, 3*time.Second, 10*time.Millisecond) + // 10 self-tracing spans are issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 10) - assert.ErrorContains(t, errs, "too many waiters") + // Expect the correct number of successes and failures. + testSuccess := 0 + for _, span := range selfExp.GetSpans() { + switch span.Status.Code { + case codes.Ok, codes.Unset: + testSuccess++ + } + } + require.Equal(t, int(expectSuccess.Load()), testSuccess) } -func makeTraceServiceClient(t *testing.T, tc consumer.Traces) ptraceotlp.GRPCClient { - addr := otlpReceiverOnGRPCServer(t, tc) +func makeTraceServiceClient(t *testing.T, tc consumer.Traces) (ptraceotlp.GRPCClient, *tracetest.InMemoryExporter, *trace.TracerProvider) { + addr, exp, tp := otlpReceiverOnGRPCServer(t, tc) cc, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err) t.Cleanup(func() { require.NoError(t, cc.Close()) }) - return ptraceotlp.NewGRPCClient(cc) + return ptraceotlp.NewGRPCClient(cc), exp, tp } -func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) net.Addr { +func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) (net.Addr, *tracetest.InMemoryExporter, *trace.TracerProvider) { ln, err := net.Listen("tcp", "localhost:") require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) @@ -142,7 +190,15 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) net.Addr { require.NoError(t, ln.Close()) }) + exp := tracetest.NewInMemoryExporter() + + tp := trace.NewTracerProvider(trace.WithSyncer(exp)) + telset := componenttest.NewNopTelemetrySettings() + telset.TracerProvider = tp + set := receivertest.NewNopSettings() + set.TelemetrySettings = telset + set.ID = component.NewIDWithName(component.MustNewType("otlp"), "trace") obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, @@ -150,7 +206,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) net.Addr { ReceiverCreateSettings: set, }) require.NoError(t, err) - bq := admission.NewBoundedQueue(noop.NewTracerProvider(), maxBytes, maxWaiters) + bq := admission.NewBoundedQueue(telset, maxBytes, 0) r := New(zap.NewNop(), tc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() 
@@ -159,5 +215,5 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) net.Addr { _ = srv.Serve(ln) }() - return ln.Addr() + return ln.Addr(), exp, tp } diff --git a/receiver/otelarrowreceiver/otelarrow.go b/receiver/otelarrowreceiver/otelarrow.go index 587c59ee448b..56fa4e198832 100644 --- a/receiver/otelarrowreceiver/otelarrow.go +++ b/receiver/otelarrowreceiver/otelarrow.go @@ -45,7 +45,7 @@ type otelArrowReceiver struct { obsrepGRPC *receiverhelper.ObsReport netReporter *netstats.NetworkReporter - boundedQueue *admission.BoundedQueue + boundedQueue admission.Queue settings receiver.Settings } @@ -59,14 +59,19 @@ func newOTelArrowReceiver(cfg *Config, set receiver.Settings) (*otelArrowReceive } if cfg.Arrow.DeprecatedWaiterLimit != 0 { - set.Logger.Warn("arrow.waiter_limit is deprecated, using admission.waiter_limit instead.") + set.Logger.Warn("arrow.waiter_limit is deprecated, using admission.waiting_limit_mib instead.") } netReporter, err := netstats.NewReceiverNetworkReporter(set) if err != nil { return nil, err } - bq := admission.NewBoundedQueue(set.TracerProvider, int64(cfg.Admission.RequestLimitMiB<<20), cfg.Admission.WaiterLimit) + var bq admission.Queue + if cfg.Admission.RequestLimitMiB > 0 { + bq = admission.NewBoundedQueue(set.TelemetrySettings, cfg.Admission.RequestLimitMiB<<20, cfg.Admission.WaitingLimitMiB<<20) + } else { + bq = admission.NewUnboundedQueue() + } r := &otelArrowReceiver{ cfg: cfg, settings: set, diff --git a/receiver/otelarrowreceiver/testdata/config.yaml b/receiver/otelarrowreceiver/testdata/config.yaml index e911cafdd0c5..d92e8bf8740d 100644 --- a/receiver/otelarrowreceiver/testdata/config.yaml +++ b/receiver/otelarrowreceiver/testdata/config.yaml @@ -29,4 +29,4 @@ protocols: memory_limit_mib: 123 admission: request_limit_mib: 80 - waiter_limit: 100 \ No newline at end of file + waiting_limit_mib: 10
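For orientation, here is a minimal sketch (not part of the patch) of the admission pattern the three receivers above now share: `Acquire` hands back a release closure on success, and a zero `request_limit_mib` selects the unbounded queue. The `admission.Queue` interface, both constructors, and the two rejection codes are taken from this diff; the helper names (`newQueue`, `admitAndConsume`) and the exact type of the release closure are assumptions made for illustration.

```go
package admissionexample

import (
	"context"

	"go.opentelemetry.io/collector/component/componenttest"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"
)

// newQueue mirrors the constructor logic in otelarrow.go: a zero
// request limit selects the unbounded queue.
func newQueue(requestLimitMiB, waitingLimitMiB uint64) admission.Queue {
	telset := componenttest.NewNopTelemetrySettings()
	if requestLimitMiB > 0 {
		return admission.NewBoundedQueue(telset, requestLimitMiB<<20, waitingLimitMiB<<20)
	}
	return admission.NewUnboundedQueue()
}

// admitAndConsume sketches the flow used by the logs, metrics, and
// traces receivers: acquire before handing data to the next consumer,
// then release as soon as the consumer returns.
func admitAndConsume(ctx context.Context, bq admission.Queue, sizeBytes uint64, consume func(context.Context) error) error {
	release, err := bq.Acquire(ctx, sizeBytes)
	if err != nil {
		// InvalidArgument if this request alone exceeds the admit
		// limit; ResourceExhausted if too much data is already pending.
		return err
	}
	err = consume(ctx)
	release() // immediate release, as in the receivers above
	return err
}
```

Releasing as soon as the consumer returns, rather than deferring past the ObsReport bookkeeping, keeps admitted bytes scoped to active pipeline work, which is why the receivers call `release()` inline.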