-
Notifications
You must be signed in to change notification settings - Fork 32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SLT-141] feat(metrics): multiple exports #3099
Changes from all commits
7ba8a43
9632578
c762c4c
3c15801
829812b
e831fe9
68def3c
51284b9
6bcd3ab
38e924b
f8a2048
c163704
89e996b
780b4c5
1d82704
08fa2a4
1c63c19
d08315e
1d670a0
d6ee535
48f2afc
4db816f
3923d42
bad4071
f8e240d
a19300c
bfd1d7b
d5e69f7
ceeddbc
4c6825a
732f5fa
8a04f9e
0e3a5ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
package metrics | ||
|
||
// HeadersToMap converts a string of headers to a map. | ||
func HeadersToMap(val string) map[string]string { | ||
return headersToMap(val) | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,83 @@ | ||||||||||||||||||||||||||||||||||||||||
package metrics | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
import ( | ||||||||||||||||||||||||||||||||||||||||
"context" | ||||||||||||||||||||||||||||||||||||||||
"fmt" | ||||||||||||||||||||||||||||||||||||||||
"go.uber.org/multierr" | ||||||||||||||||||||||||||||||||||||||||
"sync" | ||||||||||||||||||||||||||||||||||||||||
"time" | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
"go.opentelemetry.io/otel/sdk/trace" | ||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// MultiExporter is an interface that allows exporting spans to multiple OTLP trace exporters. | ||||||||||||||||||||||||||||||||||||||||
type MultiExporter interface { | ||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should have a test |
||||||||||||||||||||||||||||||||||||||||
trace.SpanExporter | ||||||||||||||||||||||||||||||||||||||||
AddExporter(exporter trace.SpanExporter) | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
type multiExporter struct { | ||||||||||||||||||||||||||||||||||||||||
exporters []trace.SpanExporter | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// NewMultiExporter creates a new multi exporter that forwards spans to multiple OTLP trace exporters. | ||||||||||||||||||||||||||||||||||||||||
// It takes in one or more trace.SpanExporter instances and ensures that spans are sent to all of them. | ||||||||||||||||||||||||||||||||||||||||
// This is useful when you need to send trace data to multiple backends or endpoints. | ||||||||||||||||||||||||||||||||||||||||
func NewMultiExporter(exporters ...trace.SpanExporter) MultiExporter { | ||||||||||||||||||||||||||||||||||||||||
return &multiExporter{ | ||||||||||||||||||||||||||||||||||||||||
exporters: exporters, | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
const defaultTimeout = 30 * time.Second | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// ExportSpans exports a batch of spans. | ||||||||||||||||||||||||||||||||||||||||
func (m *multiExporter) ExportSpans(parentCtx context.Context, ss []trace.ReadOnlySpan) error { | ||||||||||||||||||||||||||||||||||||||||
return m.doParallel(parentCtx, func(ctx context.Context, exporter trace.SpanExporter) error { | ||||||||||||||||||||||||||||||||||||||||
return exporter.ExportSpans(ctx, ss) | ||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
Comment on lines
+35
to
+39
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider collecting errors from all exporters before returning. The current implementation returns immediately if an error occurs while exporting to one of the exporters. This means that if one exporter fails, the remaining exporters will not receive the spans. It would be better to collect errors from all exporters and return a combined error. This way, all exporters will have a chance to receive the spans, and the caller will be informed of all the errors that occurred. Apply this diff to collect errors from all exporters: func (m *multiExporter) ExportSpans(parentCtx context.Context, ss []trace.ReadOnlySpan) error {
- return m.doParallel(parentCtx, func(ctx context.Context, exporter trace.SpanExporter) error {
- return exporter.ExportSpans(ctx, ss)
- })
+ var errs []error
+ m.doParallel(parentCtx, func(ctx context.Context, exporter trace.SpanExporter) error {
+ err := exporter.ExportSpans(ctx, ss)
+ if err != nil {
+ errs = append(errs, err)
+ }
+ return nil
+ })
+ if len(errs) > 0 {
+ return fmt.Errorf("could not export spans to some exporters: %v", errs)
+ }
+ return nil
} Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
func (m *multiExporter) doParallel(parentCtx context.Context, fn func(context.Context, trace.SpanExporter) error) error { | ||||||||||||||||||||||||||||||||||||||||
ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) | ||||||||||||||||||||||||||||||||||||||||
defer cancel() | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
var wg sync.WaitGroup | ||||||||||||||||||||||||||||||||||||||||
var errors []error | ||||||||||||||||||||||||||||||||||||||||
var mu sync.Mutex | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
wg.Add(len(m.exporters)) | ||||||||||||||||||||||||||||||||||||||||
for _, exporter := range m.exporters { | ||||||||||||||||||||||||||||||||||||||||
go func(exporter trace.SpanExporter) { | ||||||||||||||||||||||||||||||||||||||||
defer wg.Done() | ||||||||||||||||||||||||||||||||||||||||
err := fn(ctx, exporter) | ||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||
mu.Lock() | ||||||||||||||||||||||||||||||||||||||||
errors = append(errors, fmt.Errorf("error in doMultiple: %w", err)) | ||||||||||||||||||||||||||||||||||||||||
mu.Unlock() | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
}(exporter) | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
wg.Wait() | ||||||||||||||||||||||||||||||||||||||||
if len(errors) > 0 { | ||||||||||||||||||||||||||||||||||||||||
// nolint: wrapcheck | ||||||||||||||||||||||||||||||||||||||||
return multierr.Combine(errors...) | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
Comment on lines
+41
to
+69
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure test coverage for the The static analysis hints indicate that some lines in the Given that Do you want me to generate the unit testing code or open a GitHub issue to track this task? ToolsGitHub Check: codecov/patch
|
||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// Shutdown notifies the exporter of a pending halt to operations. | ||||||||||||||||||||||||||||||||||||||||
func (m *multiExporter) Shutdown(ctx context.Context) error { | ||||||||||||||||||||||||||||||||||||||||
return m.doParallel(ctx, func(ctx context.Context, exporter trace.SpanExporter) error { | ||||||||||||||||||||||||||||||||||||||||
return exporter.Shutdown(ctx) | ||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
Comment on lines
+72
to
+76
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider collecting errors from all exporters before returning. Similar to the As suggested earlier, it would be better to collect errors from all exporters and return a combined error. This way, all exporters will be attempted to be shut down, and the caller will be informed of all the errors that occurred. Apply this diff to collect errors from all exporters: func (m *multiExporter) Shutdown(ctx context.Context) error {
- return m.doParallel(ctx, func(ctx context.Context, exporter trace.SpanExporter) error {
- return exporter.Shutdown(ctx)
- })
+ var errs []error
+ m.doParallel(ctx, func(ctx context.Context, exporter trace.SpanExporter) error {
+ err := exporter.Shutdown(ctx)
+ if err != nil {
+ errs = append(errs, err)
+ }
+ return nil
+ })
+ if len(errs) > 0 {
+ return fmt.Errorf("could not stop some exporters: %v", errs)
+ }
+ return nil
} Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
// AddExporter adds an exporter to the multi exporter. | ||||||||||||||||||||||||||||||||||||||||
func (m *multiExporter) AddExporter(exporter trace.SpanExporter) { | ||||||||||||||||||||||||||||||||||||||||
m.exporters = append(m.exporters, exporter) | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
var _ trace.SpanExporter = &multiExporter{} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package metrics_test | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"github.com/synapsecns/sanguine/core/metrics" | ||
sdktrace "go.opentelemetry.io/otel/sdk/trace" | ||
"go.opentelemetry.io/otel/sdk/trace/tracetest" | ||
) | ||
|
||
func TestMultiExporter(t *testing.T) { | ||
// Create in-memory exporters | ||
exporter1 := tracetest.NewInMemoryExporter() | ||
exporter2 := tracetest.NewInMemoryExporter() | ||
|
||
// Create multi-exporter | ||
multiExporter := metrics.NewMultiExporter(exporter1, exporter2) | ||
|
||
// Create test spans | ||
spans := []sdktrace.ReadOnlySpan{ | ||
tracetest.SpanStub{}.Snapshot(), | ||
tracetest.SpanStub{}.Snapshot(), | ||
} | ||
|
||
// Test ExportSpans | ||
err := multiExporter.ExportSpans(context.Background(), spans) | ||
require.NoError(t, err) | ||
|
||
// Verify that spans were exported to both exporters | ||
assert.Equal(t, 2, len(exporter1.GetSpans())) | ||
assert.Equal(t, 2, len(exporter2.GetSpans())) | ||
|
||
// Test Shutdown | ||
err = multiExporter.Shutdown(context.Background()) | ||
require.NoError(t, err) | ||
|
||
// Verify that both exporters were shut down | ||
// Note: InMemoryExporter doesn't have a Stopped() method, so we can't check this directly | ||
// Instead, we can try to export spans again and check for an error | ||
err = multiExporter.ExportSpans(context.Background(), spans) | ||
assert.NoError(t, err, "Expected no error after shutdown") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Address the TODO comment.
Please ensure that the documentation is fully updated before merging the PR, as indicated by the TODO comment.
If you need any assistance with completing the documentation, feel free to let me know.