Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drop failed to exporter batches and return error when forcing flush a span processor #1860

Merged
merged 6 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Make `NewSplitDriver` from `go.opentelemetry.io/otel/exporters/otlp` take variadic arguments instead of a `SplitConfig` item.
`NewSplitDriver` now automatically implements an internal `noopDriver` for `SplitConfig` fields that are not initialized. (#1798)
- BatchSpanProcessor now report export failures when calling `ForceFlush()` method. (#1860)

### Deprecated

Expand All @@ -23,6 +24,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Only report errors from the `"go.opentelemetry.io/otel/sdk/resource".Environment` function when they are not `nil`. (#1850, #1851)
- The `Shutdown` method of the simple `SpanProcessor` in the `go.opentelemetry.io/otel/sdk/trace` package now honors the context deadline or cancellation. (#1616, #1856)
- BatchSpanProcessor now drops span batches that failed to be exported. (#1860)

### Security

Expand Down
23 changes: 14 additions & 9 deletions sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,14 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
var err error
if bsp.e != nil {
wait := make(chan struct{})
wait := make(chan error)
go func() {
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
wait <- bsp.exportSpans(ctx)
close(wait)
}()
// Wait until the export is finished or the context is cancelled/timed out
select {
case <-wait:
case err = <-wait:
case <-ctx.Done():
err = ctx.Err()
}
Expand Down Expand Up @@ -216,11 +214,18 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
defer cancel()
}

if len(bsp.batch) > 0 {
if err := bsp.e.ExportSpans(ctx, bsp.batch); err != nil {
if l := len(bsp.batch); l > 0 {
err := bsp.e.ExportSpans(ctx, bsp.batch)

// A new batch is always created after exporting, even if the batch failed to be exported.
//
// It is up to the exporter to implement any type of retry logic if a batch is failing
// to be exported, since it is specific to the protocol and backend being sent to.
bsp.batch = bsp.batch[:0]

if err != nil {
return err
}
bsp.batch = bsp.batch[:0]
}
return nil
}
Expand All @@ -244,7 +249,7 @@ func (bsp *batchSpanProcessor) processQueue() {
case sd := <-bsp.queue:
bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd)
shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
bsp.batchMutex.Unlock()
if shouldExport {
if !bsp.timer.Stop() {
Expand Down
77 changes: 71 additions & 6 deletions sdk/trace/batch_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,23 @@ type testBatchExporter struct {
batchCount int
shutdownCount int
delay time.Duration
errors []error
droppedCount int
idx int
err error
}

func (t *testBatchExporter) ExportSpans(ctx context.Context, ss []*sdktrace.SpanSnapshot) error {
t.mu.Lock()
defer t.mu.Unlock()

if t.idx < len(t.errors) {
t.droppedCount += len(ss)
err := t.errors[t.idx]
t.idx++
return err
}

time.Sleep(t.delay)

select {
Expand Down Expand Up @@ -338,12 +348,8 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) {
// Force flush any held span batches
err := ssp.ForceFlush(context.Background())

gotNumOfSpans := te.len()
spanDifference := option.wantNumSpans - gotNumOfSpans
if spanDifference > 10 || spanDifference < 0 {
t.Errorf("number of exported span not equal to or within 10 less than: got %+v, want %+v\n",
gotNumOfSpans, option.wantNumSpans)
}
assertMaxSpanDiff(t, te.len(), option.wantNumSpans, 10)

gotBatchCount := te.getBatchCount()
if gotBatchCount < option.wantBatchCount {
t.Errorf("number batches: got %+v, want >= %+v\n",
Expand All @@ -353,6 +359,65 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) {
assert.NoError(t, err)
}

func TestBatchSpanProcessorDropBatchIfFailed(t *testing.T) {
te := testBatchExporter{
errors: []error{errors.New("fail to export")},
}
tp := basicTracerProvider(t)
option := testOption{
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithMaxQueueSize(0),
sdktrace.WithMaxExportBatchSize(2000),
},
wantNumSpans: 1000,
wantBatchCount: 1,
genNumSpans: 1000,
}
ssp := createAndRegisterBatchSP(option, &te)
if ssp == nil {
t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
}
tp.RegisterSpanProcessor(ssp)
tr := tp.Tracer("BatchSpanProcessorWithOption")
generateSpan(t, option.parallel, tr, option)

// Force flush any held span batches
err := ssp.ForceFlush(context.Background())
assert.Error(t, err)
assert.EqualError(t, err, "fail to export")

// First flush will fail, nothing should be exported.
assertMaxSpanDiff(t, te.droppedCount, option.wantNumSpans, 10)
assert.Equal(t, 0, te.len())
assert.Equal(t, 0, te.getBatchCount())

// Generate a new batch, this will succeed
generateSpan(t, option.parallel, tr, option)

// Force flush any held span batches
err = ssp.ForceFlush(context.Background())
assert.NoError(t, err)

assertMaxSpanDiff(t, te.len(), option.wantNumSpans, 10)
gotBatchCount := te.getBatchCount()
if gotBatchCount < option.wantBatchCount {
t.Errorf("number batches: got %+v, want >= %+v\n",
gotBatchCount, option.wantBatchCount)
t.Errorf("Batches %v\n", te.sizes)
}
}

func assertMaxSpanDiff(t *testing.T, want, got, maxDif int) {
spanDifference := want - got
if spanDifference < 0 {
spanDifference = spanDifference * -1
}
if spanDifference > maxDif {
t.Errorf("number of exported span not equal to or within %d less than: got %+v, want %+v\n",
maxDif, got, want)
}
}

func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) {
var bp testBatchExporter
bsp := sdktrace.NewBatchSpanProcessor(&bp)
Expand Down