-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathotlp.go
288 lines (239 loc) · 8.46 KB
/
otlp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
package metrics
import (
"context"
"crypto/tls"
"fmt"
"google.golang.org/grpc/credentials"
"strings"
"time"
"github.com/synapsecns/sanguine/core"
"github.com/synapsecns/sanguine/core/config"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
)
// otlpHandler is the metrics handler that exports traces over OTLP.
type otlpHandler struct {
// baseHandler is embedded so its methods are promoted onto otlpHandler.
// Start replaces it with a base handler wired to the OTLP exporters.
*baseHandler
// buildInfo is retained so Start can construct that replacement base handler.
buildInfo config.BuildInfo
}
// NewOTLPMetricsHandler creates a new OTLP metrics handler for the given build info.
// The handler is returned with a plain base handler; Start swaps in one that
// is wired to the configured OTLP exporters.
func NewOTLPMetricsHandler(buildInfo config.BuildInfo) Handler {
	handler := &otlpHandler{
		baseHandler: newBaseHandler(buildInfo),
		buildInfo:   buildInfo,
	}
	return handler
}
// Start creates one OTLP span exporter per configured endpoint, combines them
// into a multi-exporter behind a batching tracer provider, and starts the
// base handler built on top of it.
//
// The primary exporter is built from the unsuffixed environment variables.
// Additional exporters are built for the suffixes "_1", "_2", ...; discovery
// stops at the first index for which core.HasEnv reports no endpoint variable.
//
// On success a goroutine is started that runs handleShutdown, which waits for
// ctx to be done before flushing and shutting down the tracer provider.
// An error is returned if any exporter or the base handler cannot be started.
func (n *otlpHandler) Start(ctx context.Context) (err error) {
	var exporters []tracesdk.SpanExporter

	primaryExporter, err := makeOTLPExporter(ctx, "")
	if err != nil {
		return fmt.Errorf("could not create default client: %w", err)
	}
	exporters = append(exporters, primaryExporter)

	// Loop to create additional exporters.
	for i := 1; ; i++ {
		envSuffix := fmt.Sprintf("_%d", i)

		// A missing endpoint variable means there is no config for this index
		// at all, so there are no more transports to add.
		if !core.HasEnv(otelEndpointEnv + envSuffix) {
			break
		}

		exporter, exporterErr := makeOTLPExporter(ctx, envSuffix)
		if exporterErr != nil {
			// The exporters created so far have already been started by
			// otlptrace.New; shut them down so they are not leaked. This is
			// best-effort cleanup: the creation error is the one worth
			// reporting, so shutdown errors are deliberately ignored.
			for _, created := range exporters {
				_ = created.Shutdown(ctx)
			}
			return fmt.Errorf("could not create exporter %d: %w", i, exporterErr)
		}
		exporters = append(exporters, exporter)
	}

	// Create the multi-exporter with all the exporters.
	multiExporter := NewMultiExporter(exporters...)

	n.baseHandler = newBaseHandler(
		n.buildInfo,
		tracesdk.WithBatcher(
			multiExporter,
			tracesdk.WithMaxQueueSize(defaultMaxQueueSize),
			tracesdk.WithMaxExportBatchSize(defaultMaxExportBatch),
		),
		tracesdk.WithSampler(tracesdk.AlwaysSample()),
	)

	// Start the new parent.
	if err = n.baseHandler.Start(ctx); err != nil {
		return fmt.Errorf("could not start base handler: %w", err)
	}

	// The provider is read here, before the goroutine starts, so the
	// goroutine never touches n.baseHandler itself.
	go handleShutdown(ctx, n.baseHandler.unwrappedTP)

	return nil
}
// Type reports the handler type, which is always OTLP for this handler.
func (n *otlpHandler) Type() HandlerType {
	return OTLP
}
// handleShutdown blocks until ctx is canceled, then flushes the pending
// traces and shuts down the tracer provider. Flush and shutdown failures are
// logged as warnings rather than returned.
func handleShutdown(ctx context.Context, provider *tracesdk.TracerProvider) {
	const (
		spanWaitTime      = time.Millisecond
		shutdownAllowance = time.Second * 10
	)

	<-ctx.Done()

	// Allow only 10 seconds for graceful shutdown. WithoutCancel copies the
	// parent's values while making sure the derived context is not already
	// canceled along with the parent.
	shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), shutdownAllowance)
	defer cancel()

	// Don't shut down immediately; wait a moment so the last spans can be
	// handed over. This is in process and should be asymptotic to instant.
	time.Sleep(spanWaitTime)

	if flushErr := provider.ForceFlush(shutdownCtx); flushErr != nil {
		logger.Warnf("could not flush traces: %v", flushErr)
	}

	if stopErr := provider.Shutdown(shutdownCtx); stopErr != nil {
		logger.Warnf("could not shutdown traces: %v", stopErr)
	}
}
// Base names of the environment variables read by makeOTLPExporter.
// The primary exporter uses these names as-is; additional exporters use the
// same names with a numeric suffix appended ("_1", "_2", ...).
const (
// otelEndpointEnv names the variable holding the exporter endpoint URL.
otelEndpointEnv = "OTEL_EXPORTER_OTLP_ENDPOINT"
// otelTransportEnv names the variable selecting the transport; its value is
// parsed by transportFromString.
otelTransportEnv = "OTEL_EXPORTER_OTLP_TRANSPORT"
// otelInsecureEvn names the variable read as the boolean "secure" flag.
// NOTE(review): the identifier says "insecure" (and misspells "Env") while
// the variable name and its use both mean secure mode — consider renaming.
otelInsecureEvn = "OTEL_EXPORTER_OTLP_SECURE_MODE"
// otelHeadersEnv names the variable holding comma-separated key=value
// headers, parsed by headersToMap.
otelHeadersEnv = "OTEL_EXPORTER_OTLP_HEADERS"
)
// otlpTransportType identifies the transport used to build an OTLP client.
// The zero value is intentionally not assigned to any transport, so it can
// represent an unrecognized one.
//
//go:generate go run golang.org/x/tools/cmd/stringer -type=otlpTransportType -linecomment
type otlpTransportType uint8
// Supported transports. stringer is invoked with -linecomment, so the trailing
// line comments below are inputs to code generation; do not edit them as if
// they were ordinary comments.
const (
otlpTransportHTTP otlpTransportType = iota + 1 // http
otlpTransportGRPC // grpc
)
// getEnvSuffix returns the value of an environment variable with a suffix.
func getEnvSuffix(env, suffix, defaultRet string) string {
return core.GetEnv(makeEnv(env, suffix), defaultRet)
}
// makeEnv builds the name of a suffixed environment variable by appending
// suffix to env. An empty suffix yields env unchanged.
func makeEnv(env, suffix string) string {
	name := env + suffix
	return name
}
// makeOTLPExporter creates an OTLP trace exporter configured from the
// environment variables carrying the given suffix: "" for the primary
// exporter, "_1", "_2", ... for additional ones.
//
// The transport falls back to gRPC and secure mode falls back to false when
// the corresponding variable is not provided. Secure mode is honored for any
// exporter, primary or additional; it is not tied to the suffix.
//
// An error is returned when secure mode is combined with the http transport,
// when the transport is not recognized, or when the exporter cannot be created.
func makeOTLPExporter(ctx context.Context, envSuffix string) (*otlptrace.Exporter, error) {
	transport := transportFromString(getEnvSuffix(otelTransportEnv, envSuffix, otlpTransportGRPC.String()))
	url := getEnvSuffix(otelEndpointEnv, envSuffix, "")
	secure := core.GetEnvBool(makeEnv(otelInsecureEvn, envSuffix), false)
	headers := getEnvSuffix(otelHeadersEnv, envSuffix, "")

	// Known limitation: secure mode over the http transport has not been made
	// to work despite a couple of hours of debugging; the best guess is that
	// the problem lies in the TLS config. Until that is resolved the
	// combination is rejected up front instead of failing later at export time.
	if secure && transport == otlpTransportHTTP {
		return nil, fmt.Errorf("could not create exporter: http transport does not support secure mode")
	}

	oteltraceClient, err := buildClientFromTransport(
		transport,
		withURL(url),
		withSecure(secure),
		withHeaders(headers),
	)
	if err != nil {
		return nil, fmt.Errorf("could not create client from transport: %w", err)
	}

	exporter, err := otlptrace.New(ctx, oteltraceClient)
	if err != nil {
		return nil, fmt.Errorf("could not create exporter: %w", err)
	}

	return exporter, nil
}
// buildClientFromTransport applies the given options and then constructs the
// OTLP client matching the transport type. It fails if an option returns an
// error or if the transport is neither http nor grpc.
func buildClientFromTransport(transport otlpTransportType, options ...Option) (otlptrace.Client, error) {
	var opts transportOptions
	for _, apply := range options {
		err := apply(&opts)
		if err != nil {
			return nil, fmt.Errorf("could not apply option: %w", err)
		}
	}

	// TODO: make sure url is validated
	if transport == otlpTransportHTTP {
		return otlptracehttp.NewClient(opts.httpOptions...), nil
	}
	if transport == otlpTransportGRPC {
		return otlptracegrpc.NewClient(opts.grpcOptions...), nil
	}

	return nil, fmt.Errorf("unknown transport type: %s", transport.String())
}
// transportOptions collects client options for both supported transports.
// Options fill both lists; buildClientFromTransport then uses only the list
// matching the selected transport.
type transportOptions struct {
// httpOptions are the options for the http transport.
httpOptions []otlptracehttp.Option
// grpcOptions are the options for the grpc transport.
grpcOptions []otlptracegrpc.Option
}
// Option configures a transportOptions value. Each option appends the
// corresponding setting to both the http and the grpc option lists;
// only one of the two lists will be used in creating the actual client.
type Option func(*transportOptions) error
// withURL returns an Option that adds the endpoint URL to both the http and
// the grpc option lists.
func withURL(url string) Option {
	return func(opts *transportOptions) error {
		httpEndpoint := otlptracehttp.WithEndpointURL(url)
		grpcEndpoint := otlptracegrpc.WithEndpointURL(url)

		opts.httpOptions = append(opts.httpOptions, httpEndpoint)
		opts.grpcOptions = append(opts.grpcOptions, grpcEndpoint)
		return nil
	}
}
// withSecure returns an Option that configures transport security for both
// transports: TLS settings when secure is true, the insecure option otherwise.
func withSecure(secure bool) Option {
	return func(opts *transportOptions) error {
		if !secure {
			opts.httpOptions = append(opts.httpOptions, otlptracehttp.WithInsecure())
			opts.grpcOptions = append(opts.grpcOptions, otlptracegrpc.WithInsecure())
			return nil
		}

		// gRPC needs explicit TLS credentials; no cert pool is supplied here.
		grpcCreds := credentials.NewClientTLSFromCert(nil, "")
		opts.grpcOptions = append(opts.grpcOptions, otlptracegrpc.WithTLSCredentials(grpcCreds))

		// Note: for http the TLS creds reportedly do not need to be specified,
		// since this happens automatically when https:// is used as the
		// protocol scheme. A config is set anyway to pin the minimum version.
		// RootCAs is left nil, which means the default system root CAs are used.
		httpTLS := &tls.Config{
			MinVersion: tls.VersionTLS12,
		}
		opts.httpOptions = append(opts.httpOptions, otlptracehttp.WithTLSClientConfig(httpTLS))

		return nil
	}
}
// withHeaders returns an Option that parses the raw header string with
// headersToMap and adds the resulting headers to both the http and the grpc
// option lists.
func withHeaders(headers string) Option {
	return func(opts *transportOptions) error {
		parsed := headersToMap(headers)

		httpHeaders := otlptracehttp.WithHeaders(parsed)
		grpcHeaders := otlptracegrpc.WithHeaders(parsed)

		opts.httpOptions = append(opts.httpOptions, httpHeaders)
		opts.grpcOptions = append(opts.grpcOptions, grpcHeaders)
		return nil
	}
}
// headersToMap parses a comma-separated list of key=value pairs
// (for example "api-key=secret,x-team=infra") into a map.
//
// Each pair is split on the first '=' only, so values may themselves contain
// '='. Whitespace surrounding keys and values is trimmed, which makes
// "a=b, c=d" behave the same as "a=b,c=d". Entries without '=' and entries
// whose key is empty are skipped, since they cannot form a valid header.
// When the same key appears more than once the last occurrence wins.
//
// The returned map is never nil; an empty input yields an empty map.
func headersToMap(input string) map[string]string {
	result := make(map[string]string)

	for _, pair := range strings.Split(input, ",") {
		key, value, found := strings.Cut(pair, "=")
		if !found {
			continue
		}

		key = strings.TrimSpace(key)
		if key == "" {
			// A header with an empty name is never valid; drop it rather
			// than hand it to the exporter.
			continue
		}

		result[key] = strings.TrimSpace(value)
	}

	return result
}
// transportFromString converts a string to a transport type, ignoring case.
// A string that matches neither known transport yields the zero value,
// which no transport constant uses and which therefore means "unknown".
func transportFromString(transport string) otlpTransportType {
	normalized := strings.ToLower(transport)

	if normalized == otlpTransportHTTP.String() {
		return otlpTransportHTTP
	}
	if normalized == otlpTransportGRPC.String() {
		return otlpTransportGRPC
	}

	// Will be unknown since the constants start at iota+1
	// (see uber's go style guide for details).
	return otlpTransportType(0)
}
// Limits applied to the batching span processor configured in Start.
const (
// defaultMaxQueueSize is the value passed to tracesdk.WithMaxQueueSize.
defaultMaxQueueSize = 1000000
// defaultMaxExportBatch is the value passed to tracesdk.WithMaxExportBatchSize.
defaultMaxExportBatch = 2000
)