-
Notifications
You must be signed in to change notification settings - Fork 32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SLT-141] feat(metrics): multiple exports #3099
Changes from 6 commits
7ba8a43
9632578
c762c4c
3c15801
829812b
e831fe9
68def3c
51284b9
6bcd3ab
38e924b
f8a2048
c163704
89e996b
780b4c5
1d82704
08fa2a4
1c63c19
d08315e
1d670a0
d6ee535
48f2afc
4db816f
3923d42
bad4071
f8e240d
a19300c
bfd1d7b
d5e69f7
ceeddbc
4c6825a
732f5fa
8a04f9e
0e3a5ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,46 @@ | ||||||||||||||||||||||||||||||||||||||||||||||
package metrics | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
import ( | ||||||||||||||||||||||||||||||||||||||||||||||
"context" | ||||||||||||||||||||||||||||||||||||||||||||||
"fmt" | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace" | ||||||||||||||||||||||||||||||||||||||||||||||
tracesdk "go.opentelemetry.io/otel/sdk/trace" | ||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
type multiExporter struct { | ||||||||||||||||||||||||||||||||||||||||||||||
exporters []*otlptrace.Exporter | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// NewMultiExporter creates a new multi exporter that forwards spans to multiple OTLP trace exporters. | ||||||||||||||||||||||||||||||||||||||||||||||
// It takes in one or more otlptrace.Exporter instances and ensures that spans are sent to all of them. | ||||||||||||||||||||||||||||||||||||||||||||||
// This is useful when you need to send trace data to multiple backends or endpoints. | ||||||||||||||||||||||||||||||||||||||||||||||
func NewMultiExporter(exporters ...*otlptrace.Exporter) tracesdk.SpanExporter { | ||||||||||||||||||||||||||||||||||||||||||||||
return &multiExporter{ | ||||||||||||||||||||||||||||||||||||||||||||||
exporters: exporters, | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// ExportSpans exports a batch of spans. | ||||||||||||||||||||||||||||||||||||||||||||||
func (m *multiExporter) ExportSpans(ctx context.Context, ss []tracesdk.ReadOnlySpan) error { | ||||||||||||||||||||||||||||||||||||||||||||||
for _, exporter := range m.exporters { | ||||||||||||||||||||||||||||||||||||||||||||||
err := exporter.ExportSpans(ctx, ss) | ||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
return fmt.Errorf("could not export spans: %w", err) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Method Implementation Review: ExportSpans The Consider modifying the error handling to allow all exporters a chance to receive spans before returning an error. This could be achieved by collecting all errors and returning a combined error if necessary. - if err != nil {
- return fmt.Errorf("could not export spans: %w", err)
- }
+ if err != nil {
+ // Collect errors from all exporters
+ allErrors = append(allErrors, err)
+ }
+ }
+ if len(allErrors) > 0 {
+ return fmt.Errorf("could not export spans to some exporters: %v", allErrors)
+ }
ToolsGitHub Check: codecov/patch
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @golangisfun123 thoughts in desirable behavior? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider collecting errors from all exporters before returning. The current implementation of As suggested in the existing review comments, it would be better to collect errors from all exporters and return a combined error. This way, all exporters will have a chance to receive the spans, and the caller will be informed of all the errors that occurred. Apply this diff to collect errors from all exporters: func (m *multiExporter) ExportSpans(ctx context.Context, ss []tracesdk.ReadOnlySpan) error {
+ var errs []error
for _, exporter := range m.exporters {
err := exporter.ExportSpans(ctx, ss)
if err != nil {
- return fmt.Errorf("could not export spans: %w", err)
+ errs = append(errs, err)
}
}
+ if len(errs) > 0 {
+ return fmt.Errorf("could not export spans to some exporters: %v", errs)
+ }
return nil
} Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// Shutdown notifies the exporter of a pending halt to operations. | ||||||||||||||||||||||||||||||||||||||||||||||
func (m *multiExporter) Shutdown(ctx context.Context) error { | ||||||||||||||||||||||||||||||||||||||||||||||
for _, exporter := range m.exporters { | ||||||||||||||||||||||||||||||||||||||||||||||
err := exporter.Shutdown(ctx) | ||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
return fmt.Errorf("could not stop exporter: %w", err) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Method Implementation Review: Shutdown Similar to Consider implementing a similar error aggregation strategy as suggested for - if err != nil {
- return fmt.Errorf("could not stop exporter: %w", err)
- }
+ if err != nil {
+ // Collect errors from all exporters
+ allErrors = append(allErrors, err)
+ }
+ }
+ if len(allErrors) > 0 {
+ return fmt.Errorf("could not stop some exporters: %v", allErrors)
+ }
ToolsGitHub Check: codecov/patch
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider collecting errors from all exporters before returning. Similar to the As suggested in the existing review comments, it would be better to collect errors from all exporters and return a combined error. This way, all exporters will be attempted to be shut down, and the caller will be informed of all the errors that occurred. Apply this diff to collect errors from all exporters: func (m *multiExporter) Shutdown(ctx context.Context) error {
+ var errs []error
for _, exporter := range m.exporters {
err := exporter.Shutdown(ctx)
if err != nil {
- return fmt.Errorf("could not stop exporter: %w", err)
+ errs = append(errs, err)
}
}
+ if len(errs) > 0 {
+ return fmt.Errorf("could not stop some exporters: %v", errs)
+ }
return nil
} Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
var _ tracesdk.SpanExporter = &multiExporter{} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,15 +3,15 @@ | |
import ( | ||
"context" | ||
"fmt" | ||
"strings" | ||
"time" | ||
|
||
"github.com/synapsecns/sanguine/core" | ||
"github.com/synapsecns/sanguine/core/config" | ||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace" | ||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" | ||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" | ||
tracesdk "go.opentelemetry.io/otel/sdk/trace" | ||
"os" | ||
"strings" | ||
"time" | ||
) | ||
|
||
type otlpHandler struct { | ||
|
@@ -28,23 +28,48 @@ | |
} | ||
|
||
func (n *otlpHandler) Start(ctx context.Context) (err error) { | ||
var client otlptrace.Client | ||
transport := transportFromString(core.GetEnv(otlpTransportEnv, otlpTransportGRPC.String())) | ||
switch transport { | ||
case otlpTransportHTTP: | ||
client = otlptracehttp.NewClient() | ||
case otlpTransportGRPC: | ||
client = otlptracegrpc.NewClient() | ||
default: | ||
return fmt.Errorf("unknown transport type: %s", os.Getenv(otlpTransportEnv)) | ||
// TODO: generalize this to allow for more than two exporters. | ||
client, err := buildClientFromTransport( | ||
transportFromString( | ||
core.GetEnv(otlpTransportEnv, otlpTransportGRPC.String()), | ||
), | ||
core.GetEnv(otlpTransportEnv, "localhost:4317"), | ||
) | ||
if err != nil { | ||
return fmt.Errorf("could not create client: %w", err) | ||
} | ||
|
||
secondaryClient, err := buildClientFromTransport( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should be able to tolerate cases where user doesn't define a secondary transport |
||
transportFromString( | ||
core.GetEnv(otlpTransportEnvSecondary, otlpTransportGRPC.String()), | ||
), | ||
core.GetEnv(otlpTransportEnvSecondary, "localhost:4317"), | ||
) | ||
if err != nil { | ||
return fmt.Errorf("could not create secondary client: %w", err) | ||
} | ||
|
||
exporter, err := otlptrace.New(ctx, client) | ||
if err != nil { | ||
return fmt.Errorf("failed to create otlp exporter: %w", err) | ||
} | ||
|
||
n.baseHandler = newBaseHandler(n.buildInfo, tracesdk.WithBatcher(exporter, tracesdk.WithMaxQueueSize(1000000), tracesdk.WithMaxExportBatchSize(2000)), tracesdk.WithSampler(tracesdk.AlwaysSample())) | ||
secondaryExporter, err := otlptrace.New(ctx, secondaryClient) | ||
if err != nil { | ||
return fmt.Errorf("failed to create secondary otlp exporter: %w", err) | ||
} | ||
|
||
multiExporter := NewMultiExporter(exporter, secondaryExporter) | ||
|
||
n.baseHandler = newBaseHandler( | ||
n.buildInfo, | ||
tracesdk.WithBatcher( | ||
multiExporter, | ||
tracesdk.WithMaxQueueSize(defaultMaxQueueSize), | ||
tracesdk.WithMaxExportBatchSize(defaultMaxExportBatch), | ||
), | ||
tracesdk.WithSampler(tracesdk.AlwaysSample()), | ||
) | ||
|
||
// start the new parent | ||
err = n.baseHandler.Start(ctx) | ||
|
@@ -90,7 +115,8 @@ | |
} | ||
|
||
const ( | ||
otlpTransportEnv = "OTEL_EXPORTER_OTLP_TRANSPORT" | ||
otlpTransportEnv = "OTEL_EXPORTER_OTLP_TRANSPORT_PRIMARY" | ||
otlpTransportEnvSecondary = "OTEL_EXPORTER_OTLP_TRANSPORT_SECONDARY" | ||
) | ||
|
||
//go:generate go run golang.org/x/tools/cmd/stringer -type=otlpTransportType -linecomment | ||
|
@@ -101,6 +127,17 @@ | |
otlpTransportGRPC // grpc | ||
) | ||
|
||
func buildClientFromTransport(transport otlpTransportType, url string) (otlptrace.Client, error) { | ||
switch transport { | ||
case otlpTransportHTTP: | ||
return otlptracehttp.NewClient(otlptracehttp.WithEndpointURL(url)), nil | ||
case otlpTransportGRPC: | ||
return otlptracegrpc.NewClient(otlptracegrpc.WithEndpointURL(url)), nil | ||
default: | ||
return nil, fmt.Errorf("unknown transport type: %s", transport.String()) | ||
Comment on lines
+194
to
+195
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tip Codebase Verification Add unit tests for error handling in The error handling for unknown transport types in the
Analysis chainImprove error handling in the Static analysis tools have reported that the error handling for unknown transport types (lines 198-199) is not covered by tests. It's important to add unit tests to cover this scenario and ensure the robustness of the function. Consider adding a test case that provides an invalid transport type and verifies that an error is returned. Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Description: Verify test coverage for the error handling of unknown transport types.
# Test: Search for tests covering the error handling of unknown transport types. Expect: At least one test.
rg --type go $'func Test.*UnknownTransportType'
Length of output: 48 Script: #!/bin/bash
# Description: Search for test cases involving the buildClientFromTransport function to check error handling coverage.
# Find the definition of the buildClientFromTransport function to understand its location and context.
ast-grep --lang go --pattern 'func buildClientFromTransport($_) $_ { $$$ }'
# Search for test functions that call buildClientFromTransport.
rg --type go 'buildClientFromTransport'
Length of output: 428 Script: #!/bin/bash
# Description: Search for Go test files that might indirectly test the buildClientFromTransport function.
# List all Go test files in the repository.
fd --type f --extension go --exec basename {} \; | grep '_test.go'
# Search for any references to the buildClientFromTransport function in these test files.
rg --type go 'buildClientFromTransport' --glob '*_test.go'
Length of output: 5242 ToolsGitHub Check: codecov/patch
|
||
} | ||
} | ||
trajan0x marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// transportFromString converts a string to a transport type. | ||
// Defaults to http if the string is not recognized. | ||
func transportFromString(transport string) otlpTransportType { | ||
|
@@ -114,3 +151,8 @@ | |
// (see uber's go stye guide for details) | ||
return otlpTransportType(0) | ||
} | ||
|
||
const ( | ||
defaultMaxQueueSize = 1000000 | ||
defaultMaxExportBatch = 2000 | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test Coverage Issue
The static analysis tool has flagged several lines in the
ExportSpans
andShutdown
methods as not covered by tests. This lack of coverage could lead to undetected issues in production.It is crucial to add unit tests covering these methods to ensure their functionality under various scenarios, especially error handling.
Would you like assistance in writing these tests or perhaps a suggestion on how to structure them?
Also applies to: 38-38, 46-47
Tools
GitHub Check: codecov/patch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resolved w/ https://github.com/synapsecns/sanguine/pull/3099/files#diff-c442c650f2d04184c303955213570f9cab4ebd616939cb9633b9034ae8f25ed6
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@trajan0x, I've reviewed the changes, and the test coverage issue has been resolved. The
ExportSpans
andShutdown
methods are now covered in theTestMultiExporter
function, ensuring that their functionality is tested. Great work on addressing this!Analysis chain
Scripts executed
The following scripts were executed for the analysis:
Script:
Length of output: 411
Script:
Length of output: 162
Script:
Length of output: 92
Script:
Length of output: 1604