diff --git a/input/elasticapm/processor.go b/input/elasticapm/processor.go index 63e25e05..877d8388 100644 --- a/input/elasticapm/processor.go +++ b/input/elasticapm/processor.go @@ -296,19 +296,27 @@ func (p *Processor) handleStream( if n == 0 { return readErr } - - if err := p.processBatch(ctx, processor, batch); err != nil { + if err := processor.ProcessBatch(ctx, batch); err != nil { return fmt.Errorf("cannot process batch: %w", err) } + for _, v := range *batch { + switch v.Type() { + case modelpb.ErrorEventType: + result.AcceptedDetails.Error++ + case modelpb.SpanEventType: + result.AcceptedDetails.Span++ + case modelpb.TransactionEventType: + result.AcceptedDetails.Transaction++ + case modelpb.MetricEventType: + result.AcceptedDetails.Metric++ + case modelpb.LogEventType: + result.AcceptedDetails.Log++ + } + } result.Accepted += n return readErr } -// processBatch processes the batch and returns the events to the pool after it's been processed. -func (p *Processor) processBatch(ctx context.Context, processor modelpb.BatchProcessor, batch *modelpb.Batch) error { - return processor.ProcessBatch(ctx, batch) -} - // getStreamReader returns a streamReader that reads ND-JSON lines from r. func (p *Processor) getStreamReader(r io.Reader) *streamReader { return &streamReader{ diff --git a/input/elasticapm/processor_test.go b/input/elasticapm/processor_test.go index 3dcea907..fd343e53 100644 --- a/input/elasticapm/processor_test.go +++ b/input/elasticapm/processor_test.go @@ -33,8 +33,17 @@ import ( "github.com/elastic/apm-data/model/modelpb" ) +const batchSize = 10 + +type readerFunc func([]byte) (int, error) + +func (f readerFunc) Read(p []byte) (int, error) { + return f(p) +} + func TestHandleStreamReaderError(t *testing.T) { readErr := errors.New("read failed") + cnt := 5 var calls int var reader readerFunc = func(p []byte) (int, error) { calls++ @@ -43,7 +52,7 @@ func TestHandleStreamReaderError(t *testing.T) { } buf := bytes.NewBuffer(nil) buf.WriteString(validMetadata + "\n") - for i := 0; i < 5; i++ { + for i := 0; i < cnt; i++ { buf.WriteString(validTransaction + "\n") } return copy(p, buf.Bytes()), nil @@ -56,17 +65,20 @@ func TestHandleStreamReaderError(t *testing.T) { var actualResult Result err := sp.HandleStream( - context.Background(), &modelpb.APMEvent{}, - reader, 10, nopBatchProcessor{}, &actualResult, + context.Background(), + &modelpb.APMEvent{}, + reader, + batchSize, + nopBatchProcessor{}, + &actualResult, ) assert.ErrorIs(t, err, readErr) - assert.Equal(t, Result{Accepted: 5}, actualResult) -} - -type readerFunc func([]byte) (int, error) - -func (f readerFunc) Read(p []byte) (int, error) { - return f(p) + assert.Equal(t, Result{ + Accepted: cnt, + AcceptedDetails: AcceptedDetails{ + Transaction: cnt, + }, + }, actualResult) } func TestHandleStreamBatchProcessorError(t *testing.T) { @@ -85,11 +97,14 @@ func TestHandleStreamBatchProcessorError(t *testing.T) { processor := modelpb.ProcessBatchFunc(func(context.Context, *modelpb.Batch) error { return test.err }) - var actualResult Result err := sp.HandleStream( - context.Background(), &modelpb.APMEvent{}, - strings.NewReader(payload), 10, processor, &actualResult, + context.Background(), + &modelpb.APMEvent{}, + strings.NewReader(payload), + batchSize, + processor, + &actualResult, ) assert.ErrorIs(t, err, test.err) assert.Zero(t, actualResult) @@ -186,9 +201,12 @@ func TestHandleStreamErrors(t *testing.T) { Semaphore: semaphore.NewWeighted(1), }) err := p.HandleStream( - context.Background(), &modelpb.APMEvent{}, - strings.NewReader(test.payload), 10, - nopBatchProcessor{}, &actualResult, + context.Background(), + &modelpb.APMEvent{}, + strings.NewReader(test.payload), + batchSize, + nopBatchProcessor{}, + &actualResult, ) assert.Equal(t, test.err, err) assert.Zero(t, actualResult.Accepted) @@ -220,13 +238,31 @@ func TestHandleStream(t *testing.T) { MaxEventSize: 100 * 1024, Semaphore: semaphore.NewWeighted(1), }) + + var actualResult Result err := p.HandleStream( - context.Background(), &modelpb.APMEvent{}, - strings.NewReader(payload), 10, batchProcessor, - &Result{}, + context.Background(), + &modelpb.APMEvent{}, + strings.NewReader(payload), + batchSize, + batchProcessor, + &actualResult, ) require.NoError(t, err) + // Assert that batch result is properly populated. + assert.Equal(t, Result{ + Accepted: 5, + AcceptedDetails: AcceptedDetails{ + Transaction: 1, + Span: 1, + Metric: 1, + Log: 1, + Error: 1, + }, + }, actualResult) + + // Assert that processor is properly executed. processors := make([]modelpb.APMEventType, len(events)) for i, event := range events { processors[i] = event.Type() @@ -260,8 +296,11 @@ func TestHandleStreamRUMv3(t *testing.T) { }) var result Result err := p.HandleStream( - context.Background(), &modelpb.APMEvent{}, - strings.NewReader(payload), 10, batchProcessor, + context.Background(), + &modelpb.APMEvent{}, + strings.NewReader(payload), + batchSize, + batchProcessor, &result, ) require.NoError(t, err) @@ -311,8 +350,11 @@ func TestHandleStreamBaseEvent(t *testing.T) { Semaphore: semaphore.NewWeighted(1), }) err := p.HandleStream( - context.Background(), &baseEvent, - strings.NewReader(payload), 10, batchProcessor, + context.Background(), + &baseEvent, + strings.NewReader(payload), + batchSize, + batchProcessor, &Result{}, ) require.NoError(t, err) @@ -347,12 +389,18 @@ func TestLabelLeak(t *testing.T) { MaxEventSize: 100 * 1024, Semaphore: semaphore.NewWeighted(1), }) - var actualResult Result - err := p.HandleStream(context.Background(), baseEvent, strings.NewReader(payload), 10, batchProcessor, &actualResult) + err := p.HandleStream( + context.Background(), + baseEvent, + strings.NewReader(payload), + batchSize, + batchProcessor, + &Result{}) require.NoError(t, err) txs := processed assert.Len(t, txs, 2) + // Assert first tx assert.Equal(t, modelpb.NumericLabels{ "time_set": {Value: 1652185276}, @@ -365,8 +413,12 @@ func TestLabelLeak(t *testing.T) { }, modelpb.Labels(txs[0].Labels)) // Assert second tx - assert.Equal(t, modelpb.NumericLabels{"numeric": {Global: true, Value: 1}}, modelpb.NumericLabels(txs[1].NumericLabels)) - assert.Equal(t, modelpb.Labels{"ci_commit": {Global: true, Value: "unknown"}}, modelpb.Labels(txs[1].Labels)) + assert.Equal(t, modelpb.NumericLabels{ + "numeric": {Global: true, Value: 1}, + }, modelpb.NumericLabels(txs[1].NumericLabels)) + assert.Equal(t, modelpb.Labels{ + "ci_commit": {Global: true, Value: "unknown"}, + }, modelpb.Labels(txs[1].Labels)) } type nopBatchProcessor struct{} diff --git a/input/elasticapm/result.go b/input/elasticapm/result.go index 89e9df4c..191d2d2d 100644 --- a/input/elasticapm/result.go +++ b/input/elasticapm/result.go @@ -23,20 +23,37 @@ import ( type Result struct { errorsSpace [5]error + // Errors holds a limited number of errors that occurred while // processing the event stream. If the limit is reached, the // counters above are still incremented. Errors []error - // Accepted holds the number of valid events accepted. + + // Accepted holds the total number of valid events accepted. Accepted int + + // AcceptedDetails provides a detailed breakdown of the count + // of valid events categorized by signal type. + AcceptedDetails AcceptedDetails + // TooLarge holds the number of events that were rejected due // to exceeding the event size limit. TooLarge int + // Invalid holds the number of events that were rejected due // to being invalid, excluding those that are counted by TooLarge. Invalid int } +// ProcessedDetail holds the number of events processed for each type. +type AcceptedDetails struct { + Transaction int + Span int + Metric int + Log int + Error int +} + func (r *Result) addError(err error) { var invalid *InvalidInputError if errors.As(err, &invalid) {