Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
incr signal type after batch processing
Browse files Browse the repository at this point in the history
rubvs committed Dec 6, 2024

Verified

This commit was signed with the committer’s verified signature.
rubvs Ruben van Staden
1 parent a57bcc1 commit e857f20
Showing 2 changed files with 19 additions and 12 deletions.
23 changes: 13 additions & 10 deletions input/elasticapm/processor.go
Original file line number Diff line number Diff line change
@@ -198,16 +198,12 @@ func (p *Processor) readBatch(
err = v2.DecodeNestedError(reader, &input, batch)
case metricsetEventType:
err = v2.DecodeNestedMetricset(reader, &input, batch)
result.MetricAccepted++
case spanEventType:
err = v2.DecodeNestedSpan(reader, &input, batch)
result.SpanAccepted++
case transactionEventType:
err = v2.DecodeNestedTransaction(reader, &input, batch)
result.TransactionAccepted++
case logEventType:
err = v2.DecodeNestedLog(reader, &input, batch)
result.LogAccepted++
case rumv3ErrorEventType:
err = rumv3.DecodeNestedError(reader, &input, batch)
case rumv3TransactionEventType:
@@ -300,18 +296,25 @@ func (p *Processor) handleStream(
if n == 0 {
return readErr
}
if err := p.processBatch(ctx, processor, batch); err != nil {
if err := processor.ProcessBatch(ctx, batch); err != nil {
return fmt.Errorf("cannot process batch: %w", err)
}
for _, v := range *batch {
switch v.Type() {
case modelpb.SpanEventType:
result.SpanAccepted++
case modelpb.TransactionEventType:
result.TransactionAccepted++
case modelpb.MetricEventType:
result.MetricAccepted++
case modelpb.LogEventType:
result.LogAccepted++
}
}
result.Accepted += n
return readErr
}

// processBatch processes the batch and returns the events to the pool after it's been processed.
func (p *Processor) processBatch(ctx context.Context, processor modelpb.BatchProcessor, batch *modelpb.Batch) error {
return processor.ProcessBatch(ctx, batch)
}

// getStreamReader returns a streamReader that reads ND-JSON lines from r.
func (p *Processor) getStreamReader(r io.Reader) *streamReader {
return &streamReader{
8 changes: 6 additions & 2 deletions input/elasticapm/processor_test.go
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@ import (

func TestHandleStreamReaderError(t *testing.T) {
readErr := errors.New("read failed")
cnt := 5
var calls int
var reader readerFunc = func(p []byte) (int, error) {
calls++
@@ -43,7 +44,7 @@ func TestHandleStreamReaderError(t *testing.T) {
}
buf := bytes.NewBuffer(nil)
buf.WriteString(validMetadata + "\n")
for i := 0; i < 5; i++ {
for i := 0; i < cnt; i++ {
buf.WriteString(validTransaction + "\n")
}
return copy(p, buf.Bytes()), nil
@@ -60,7 +61,10 @@ func TestHandleStreamReaderError(t *testing.T) {
reader, 10, nopBatchProcessor{}, &actualResult,
)
assert.ErrorIs(t, err, readErr)
assert.Equal(t, Result{Accepted: 5}, actualResult)
assert.Equal(t, Result{
Accepted: cnt,
TransactionAccepted: cnt,
}, actualResult)
}

type readerFunc func([]byte) (int, error)

0 comments on commit e857f20

Please sign in to comment.