-
Notifications
You must be signed in to change notification settings - Fork 32
/
multiexporter.go
83 lines (68 loc) · 2.21 KB
/
multiexporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package metrics
import (
"context"
"fmt"
"go.uber.org/multierr"
"sync"
"time"
"go.opentelemetry.io/otel/sdk/trace"
)
// MultiExporter is a trace.SpanExporter that forwards to a set of underlying
// span exporters and allows further exporters to be attached after creation.
type MultiExporter interface {
// SpanExporter is embedded so a MultiExporter can be used wherever a single
// trace.SpanExporter is expected.
trace.SpanExporter
// AddExporter registers an additional exporter that will receive spans.
AddExporter(exporter trace.SpanExporter)
}
// multiExporter is the MultiExporter implementation returned by NewMultiExporter.
type multiExporter struct {
// exporters is the list of exporters every call is fanned out to.
// It is not guarded by a lock.
exporters []trace.SpanExporter
}
// NewMultiExporter creates a new multi exporter that forwards spans to multiple OTLP trace exporters.
// It takes in zero or more trace.SpanExporter instances and ensures that spans are sent to all of them.
// This is useful when you need to send trace data to multiple backends or endpoints.
//
// The exporters are copied into a slice owned by the returned value, so a caller
// that passes an existing slice (NewMultiExporter(list...)) does not share its
// backing array with later AddExporter calls. Nil exporters are dropped because
// invoking a method on a nil interface would panic during the first fan-out.
// Only untyped nil values are detected; an interface wrapping a nil pointer is kept.
func NewMultiExporter(exporters ...trace.SpanExporter) MultiExporter {
	var owned []trace.SpanExporter
	if len(exporters) > 0 {
		owned = make([]trace.SpanExporter, 0, len(exporters))
	}

	for _, exporter := range exporters {
		if exporter == nil {
			continue
		}
		owned = append(owned, exporter)
	}

	return &multiExporter{exporters: owned}
}
// defaultTimeout is the timeout doParallel applies, via context.WithTimeout on
// the caller's context, to each fan-out across the exporters.
const defaultTimeout = 30 * time.Second
// ExportSpans hands the same batch of spans to every registered exporter.
// The exporters are invoked concurrently through doParallel, and any errors
// they report are returned together as a single combined error.
func (m *multiExporter) ExportSpans(parentCtx context.Context, ss []trace.ReadOnlySpan) error {
	// The closure captures ss so each exporter receives the identical batch.
	exportBatch := func(ctx context.Context, exporter trace.SpanExporter) error {
		return exporter.ExportSpans(ctx, ss)
	}

	return m.doParallel(parentCtx, exportBatch)
}
// doParallel runs fn once per registered exporter, each call in its own
// goroutine, and waits for all of them to return.
//
// Every call receives a context derived from parentCtx with defaultTimeout
// applied. The context is cancelled when doParallel returns.
//
// The returned error is nil when no call failed. Otherwise it is the
// multierr combination of the wrapped failures, ordered by the exporter's
// position in the list rather than by which goroutine finished first, so the
// combined error is deterministic.
func (m *multiExporter) doParallel(parentCtx context.Context, fn func(context.Context, trace.SpanExporter) error) error {
	ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
	defer cancel()

	// Read the slice header once so the WaitGroup count, the result slots and
	// the loop all agree on the same set of exporters.
	exporters := m.exporters

	// One slot per exporter: each goroutine writes only its own index, so no
	// mutex is needed and results keep the exporters' order.
	errs := make([]error, len(exporters))

	var wg sync.WaitGroup
	wg.Add(len(exporters))
	for i, exporter := range exporters {
		go func(i int, exporter trace.SpanExporter) {
			defer wg.Done()
			if err := fn(ctx, exporter); err != nil {
				// NOTE(review): the message says "doMultiple" although this
				// function is doParallel; the text is left unchanged so
				// anything matching on it keeps working.
				errs[i] = fmt.Errorf("error in doMultiple: %w", err)
			}
		}(i, exporter)
	}
	wg.Wait()

	// multierr.Combine skips nil entries and returns nil when all are nil.
	// nolint: wrapcheck
	return multierr.Combine(errs...)
}
// Shutdown calls Shutdown on every registered exporter.
// The calls run concurrently through doParallel, and any errors they report
// are returned together as a single combined error.
func (m *multiExporter) Shutdown(ctx context.Context) error {
	shutdownOne := func(ctx context.Context, exporter trace.SpanExporter) error {
		return exporter.Shutdown(ctx)
	}

	return m.doParallel(ctx, shutdownOne)
}
// AddExporter adds an exporter to the multi exporter.
//
// A nil exporter is ignored: appending it would make the next ExportSpans or
// Shutdown call a method on a nil interface and panic. Only an untyped nil is
// detected; an interface wrapping a nil pointer is still appended.
//
// The exporter list is not protected by a lock, so AddExporter must not be
// called concurrently with itself, ExportSpans or Shutdown.
func (m *multiExporter) AddExporter(exporter trace.SpanExporter) {
	if exporter == nil {
		return
	}

	m.exporters = append(m.exporters, exporter)
}
// Compile-time check that *multiExporter implements trace.SpanExporter.
var _ trace.SpanExporter = (*multiExporter)(nil)