Skip to content

Commit

Permalink
test: assert that result details arr properly populated
Browse files Browse the repository at this point in the history
  • Loading branch information
rubvs committed Dec 10, 2024
1 parent e857f20 commit e82bbba
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 46 deletions.
10 changes: 6 additions & 4 deletions input/elasticapm/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,16 @@ func (p *Processor) handleStream(
}
for _, v := range *batch {
switch v.Type() {
case modelpb.ErrorEventType:
result.AcceptedDetails.Error++
case modelpb.SpanEventType:
result.SpanAccepted++
result.AcceptedDetails.Span++
case modelpb.TransactionEventType:
result.TransactionAccepted++
result.AcceptedDetails.Transaction++
case modelpb.MetricEventType:
result.MetricAccepted++
result.AcceptedDetails.Metric++
case modelpb.LogEventType:
result.LogAccepted++
result.AcceptedDetails.Log++
}
}
result.Accepted += n
Expand Down
102 changes: 75 additions & 27 deletions input/elasticapm/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ import (
"github.com/elastic/apm-data/model/modelpb"
)

const batchSize = 10

type readerFunc func([]byte) (int, error)

func (f readerFunc) Read(p []byte) (int, error) {
return f(p)
}

func TestHandleStreamReaderError(t *testing.T) {
readErr := errors.New("read failed")
cnt := 5
Expand All @@ -57,22 +65,22 @@ func TestHandleStreamReaderError(t *testing.T) {

var actualResult Result
err := sp.HandleStream(
context.Background(), &modelpb.APMEvent{},
reader, 10, nopBatchProcessor{}, &actualResult,
context.Background(),
&modelpb.APMEvent{},
reader,
batchSize,
nopBatchProcessor{},
&actualResult,
)
assert.ErrorIs(t, err, readErr)
assert.Equal(t, Result{
Accepted: cnt,
TransactionAccepted: cnt,
Accepted: cnt,
AcceptedDetails: ProcessedDetail{
Transaction: cnt,
},
}, actualResult)
}

type readerFunc func([]byte) (int, error)

func (f readerFunc) Read(p []byte) (int, error) {
return f(p)
}

func TestHandleStreamBatchProcessorError(t *testing.T) {
payload := validMetadata + "\n" + validTransaction + "\n"
for _, test := range []struct {
Expand All @@ -89,11 +97,14 @@ func TestHandleStreamBatchProcessorError(t *testing.T) {
processor := modelpb.ProcessBatchFunc(func(context.Context, *modelpb.Batch) error {
return test.err
})

var actualResult Result
err := sp.HandleStream(
context.Background(), &modelpb.APMEvent{},
strings.NewReader(payload), 10, processor, &actualResult,
context.Background(),
&modelpb.APMEvent{},
strings.NewReader(payload),
batchSize,
processor,
&actualResult,
)
assert.ErrorIs(t, err, test.err)
assert.Zero(t, actualResult)
Expand Down Expand Up @@ -190,9 +201,12 @@ func TestHandleStreamErrors(t *testing.T) {
Semaphore: semaphore.NewWeighted(1),
})
err := p.HandleStream(
context.Background(), &modelpb.APMEvent{},
strings.NewReader(test.payload), 10,
nopBatchProcessor{}, &actualResult,
context.Background(),
&modelpb.APMEvent{},
strings.NewReader(test.payload),
batchSize,
nopBatchProcessor{},
&actualResult,
)
assert.Equal(t, test.err, err)
assert.Zero(t, actualResult.Accepted)
Expand Down Expand Up @@ -224,13 +238,31 @@ func TestHandleStream(t *testing.T) {
MaxEventSize: 100 * 1024,
Semaphore: semaphore.NewWeighted(1),
})

var actualResult Result
err := p.HandleStream(
context.Background(), &modelpb.APMEvent{},
strings.NewReader(payload), 10, batchProcessor,
&Result{},
context.Background(),
&modelpb.APMEvent{},
strings.NewReader(payload),
batchSize,
batchProcessor,
&actualResult,
)
require.NoError(t, err)

// Assert that batch result is properly populated.
assert.Equal(t, Result{
Accepted: 5,
AcceptedDetails: ProcessedDetail{
Transaction: 1,
Span: 1,
Metric: 1,
Log: 1,
Error: 1,
},
}, actualResult)

// Assert that processor is properly executed.
processors := make([]modelpb.APMEventType, len(events))
for i, event := range events {
processors[i] = event.Type()
Expand Down Expand Up @@ -264,8 +296,11 @@ func TestHandleStreamRUMv3(t *testing.T) {
})
var result Result
err := p.HandleStream(
context.Background(), &modelpb.APMEvent{},
strings.NewReader(payload), 10, batchProcessor,
context.Background(),
&modelpb.APMEvent{},
strings.NewReader(payload),
batchSize,
batchProcessor,
&result,
)
require.NoError(t, err)
Expand Down Expand Up @@ -315,8 +350,11 @@ func TestHandleStreamBaseEvent(t *testing.T) {
Semaphore: semaphore.NewWeighted(1),
})
err := p.HandleStream(
context.Background(), &baseEvent,
strings.NewReader(payload), 10, batchProcessor,
context.Background(),
&baseEvent,
strings.NewReader(payload),
batchSize,
batchProcessor,
&Result{},
)
require.NoError(t, err)
Expand Down Expand Up @@ -351,12 +389,18 @@ func TestLabelLeak(t *testing.T) {
MaxEventSize: 100 * 1024,
Semaphore: semaphore.NewWeighted(1),
})
var actualResult Result
err := p.HandleStream(context.Background(), baseEvent, strings.NewReader(payload), 10, batchProcessor, &actualResult)
err := p.HandleStream(
context.Background(),
baseEvent,
strings.NewReader(payload),
batchSize,
batchProcessor,
&Result{})
require.NoError(t, err)

txs := processed
assert.Len(t, txs, 2)

// Assert first tx
assert.Equal(t, modelpb.NumericLabels{
"time_set": {Value: 1652185276},
Expand All @@ -369,8 +413,12 @@ func TestLabelLeak(t *testing.T) {
}, modelpb.Labels(txs[0].Labels))

// Assert second tx
assert.Equal(t, modelpb.NumericLabels{"numeric": {Global: true, Value: 1}}, modelpb.NumericLabels(txs[1].NumericLabels))
assert.Equal(t, modelpb.Labels{"ci_commit": {Global: true, Value: "unknown"}}, modelpb.Labels(txs[1].Labels))
assert.Equal(t, modelpb.NumericLabels{
"numeric": {Global: true, Value: 1},
}, modelpb.NumericLabels(txs[1].NumericLabels))
assert.Equal(t, modelpb.Labels{
"ci_commit": {Global: true, Value: "unknown"},
}, modelpb.Labels(txs[1].Labels))
}

type nopBatchProcessor struct{}
Expand Down
28 changes: 13 additions & 15 deletions input/elasticapm/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,25 @@ type Result struct {
Errors []error

// Accepted holds the number of valid events accepted.
Accepted int
SpanAccepted int
TransactionAccepted int
MetricAccepted int
LogAccepted int
Accepted int
AcceptedDetails ProcessedDetail

// TooLarge holds the number of events that were rejected due
// to exceeding the event size limit.
TooLarge int
SpanTooLarge int
TransactionTooLarge int
MetricTooLarge int
LogTooLarge int
TooLarge int

// Invalid holds the number of events that were rejected due
// to being invalid, excluding those that are counted by TooLarge.
Invalid int
SpanInvalid int
TransactionInvalid int
MetricInvalid int
LogInvalid int
Invalid int
}

// ProcessedDetail holds the number of events processed for each type.
type ProcessedDetail struct {
Transaction int
Span int
Metric int
Log int
Error int
}

func (r *Result) addError(err error) {
Expand Down

0 comments on commit e82bbba

Please sign in to comment.