Skip to content

Commit

Permalink
fix: crash with resources loaded twice (#338)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored May 26, 2023
1 parent cebf126 commit 520f570
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 95 deletions.
2 changes: 2 additions & 0 deletions components/ledger/libs/logging/logrus.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logging

import (
"context"
"flag"
"io"
"os"
"testing"
Expand Down Expand Up @@ -63,6 +64,7 @@ func NewLogrus(logger *logrus.Logger) *logrusLogger {
func Testing() *logrusLogger {
logger := logrus.New()
logger.SetOutput(io.Discard)
flag.Parse()
if testing.Verbose() {
logger.SetOutput(os.Stdout)
logger.SetLevel(logrus.DebugLevel)
Expand Down
35 changes: 33 additions & 2 deletions components/ledger/libs/otlp/cli.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package otlp

import (
"fmt"
"strings"
"sync"

flag "github.com/spf13/pflag"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"
"go.uber.org/fx"
)

var (
once sync.Once
onceInitOTLPFlags sync.Once
onceLoadResources sync.Once
)

const (
Expand All @@ -16,8 +22,33 @@ const (
)

func InitOTLPFlags(flags *flag.FlagSet) {
once.Do(func() {
onceInitOTLPFlags.Do(func() {
flags.String(OtelServiceName, "", "OpenTelemetry service name")
flags.StringSlice(OtelResourceAttributes, []string{}, "Additional OTLP resource attributes")
})
}

func LoadResource(serviceName string, resourceAttributes []string) fx.Option {
options := make([]fx.Option, 0)
onceLoadResources.Do(func() {
options = append(options,
fx.Provide(func() (*resource.Resource, error) {
defaultResource := resource.Default()
attributes := make([]attribute.KeyValue, 0)
if serviceName != "" {
attributes = append(attributes, attribute.String("service.name", serviceName))
}
for _, ra := range resourceAttributes {
parts := strings.SplitN(ra, "=", 2)
if len(parts) < 2 {
return nil, fmt.Errorf("malformed otlp attribute: %s", ra)
}
attributes = append(attributes, attribute.String(parts[0], parts[1]))
}
return resource.Merge(defaultResource, resource.NewSchemaless(attributes...))
}),
)
})

return fx.Options(options...)
}
24 changes: 1 addition & 23 deletions components/ledger/libs/otlp/otlpmetrics/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,19 @@ package otlpmetrics

import (
"context"
"fmt"
"strings"
"time"

"github.com/formancehq/stack/libs/go-libs/otlp"
"go.opentelemetry.io/contrib/instrumentation/host"
"go.opentelemetry.io/contrib/instrumentation/runtime"
"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/propagation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.uber.org/fx"
)

Expand Down Expand Up @@ -61,28 +56,11 @@ func ProvideRuntimeMetricsOption(v any, annotations ...fx.Annotation) fx.Option

}

func loadResource(cfg ModuleConfig) (*resource.Resource, error) {
defaultResource := resource.Default()
attributes := make([]attribute.KeyValue, 0)
if cfg.ServiceName != "" {
attributes = append(attributes, semconv.ServiceNameKey.String(cfg.ServiceName))
attributes = append(attributes, semconv.ServiceVersionKey.String(cfg.ServiceVersion))
}
for _, ra := range cfg.ResourceAttributes {
parts := strings.SplitN(ra, "=", 2)
if len(parts) < 2 {
return nil, fmt.Errorf("malformed otlp attribute: %s", ra)
}
attributes = append(attributes, attribute.String(parts[0], parts[1]))
}
return resource.Merge(defaultResource, resource.NewSchemaless(attributes...))
}

func MetricsModule(cfg ModuleConfig) fx.Option {
options := make([]fx.Option, 0)
options = append(options,
fx.Supply(cfg),
fx.Provide(loadResource),
otlp.LoadResource(cfg.ServiceName, cfg.ResourceAttributes),
fx.Decorate(fx.Annotate(func(mp *sdkmetric.MeterProvider) metric.MeterProvider { return mp }, fx.As(new(metric.MeterProvider)))),
fx.Provide(fx.Annotate(func(options ...sdkmetric.Option) *sdkmetric.MeterProvider {
return sdkmetric.NewMeterProvider(options...)
Expand Down
22 changes: 1 addition & 21 deletions components/ledger/libs/otlp/otlptraces/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,14 @@ package otlptraces

import (
"context"
"fmt"
"strings"

"github.com/formancehq/stack/libs/go-libs/otlp"
"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/fx"
Expand Down Expand Up @@ -53,27 +49,11 @@ func ProvideTracerProviderOption(v any, annotations ...fx.Annotation) fx.Option
return fx.Provide(fx.Annotate(v, annotations...))
}

func loadResource(cfg ModuleConfig) (*resource.Resource, error) {
defaultResource := resource.Default()
attributes := make([]attribute.KeyValue, 0)
if cfg.ServiceName != "" {
attributes = append(attributes, attribute.String("service.name", cfg.ServiceName))
}
for _, ra := range cfg.ResourceAttributes {
parts := strings.SplitN(ra, "=", 2)
if len(parts) < 2 {
return nil, fmt.Errorf("malformed otlp attribute: %s", ra)
}
attributes = append(attributes, attribute.String(parts[0], parts[1]))
}
return resource.Merge(defaultResource, resource.NewSchemaless(attributes...))
}

func TracesModule(cfg ModuleConfig) fx.Option {
options := make([]fx.Option, 0)
options = append(options,
fx.Supply(cfg),
fx.Provide(loadResource),
otlp.LoadResource(cfg.ServiceName, cfg.ResourceAttributes),
fx.Provide(func(tp *tracesdk.TracerProvider) trace.TracerProvider { return tp }),
fx.Provide(fx.Annotate(func(options ...tracesdk.TracerProviderOption) *tracesdk.TracerProvider {
return tracesdk.NewTracerProvider(options...)
Expand Down
5 changes: 2 additions & 3 deletions components/stargate/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,8 @@ func NewRootCommand() *cobra.Command {
}

root.PersistentFlags().Bool(service.DebugFlag, false, "Debug mode")

otlptraces.InitOTLPTracesFlags(root.Flags())
otlpmetrics.InitOTLPMetricsFlags(root.Flags())
otlptraces.InitOTLPTracesFlags(root.PersistentFlags())
otlpmetrics.InitOTLPMetricsFlags(root.PersistentFlags())

if err := viper.BindPFlags(root.PersistentFlags()); err != nil {
panic(err)
Expand Down
35 changes: 33 additions & 2 deletions libs/go-libs/otlp/cli.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package otlp

import (
"fmt"
"strings"
"sync"

flag "github.com/spf13/pflag"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"
"go.uber.org/fx"
)

var (
once sync.Once
onceInitOTLPFlags sync.Once
onceLoadResources sync.Once
)

const (
Expand All @@ -16,8 +22,33 @@ const (
)

func InitOTLPFlags(flags *flag.FlagSet) {
once.Do(func() {
onceInitOTLPFlags.Do(func() {
flags.String(OtelServiceName, "", "OpenTelemetry service name")
flags.StringSlice(OtelResourceAttributes, []string{}, "Additional OTLP resource attributes")
})
}

func LoadResource(serviceName string, resourceAttributes []string) fx.Option {
options := make([]fx.Option, 0)
onceLoadResources.Do(func() {
options = append(options,
fx.Provide(func() (*resource.Resource, error) {
defaultResource := resource.Default()
attributes := make([]attribute.KeyValue, 0)
if serviceName != "" {
attributes = append(attributes, attribute.String("service.name", serviceName))
}
for _, ra := range resourceAttributes {
parts := strings.SplitN(ra, "=", 2)
if len(parts) < 2 {
return nil, fmt.Errorf("malformed otlp attribute: %s", ra)
}
attributes = append(attributes, attribute.String(parts[0], parts[1]))
}
return resource.Merge(defaultResource, resource.NewSchemaless(attributes...))
}),
)
})

return fx.Options(options...)
}
24 changes: 1 addition & 23 deletions libs/go-libs/otlp/otlpmetrics/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,19 @@ package otlpmetrics

import (
"context"
"fmt"
"strings"
"time"

"github.com/formancehq/stack/libs/go-libs/otlp"
"go.opentelemetry.io/contrib/instrumentation/host"
"go.opentelemetry.io/contrib/instrumentation/runtime"
"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/propagation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.uber.org/fx"
)

Expand Down Expand Up @@ -61,28 +56,11 @@ func ProvideRuntimeMetricsOption(v any, annotations ...fx.Annotation) fx.Option

}

func loadResource(cfg ModuleConfig) (*resource.Resource, error) {
defaultResource := resource.Default()
attributes := make([]attribute.KeyValue, 0)
if cfg.ServiceName != "" {
attributes = append(attributes, semconv.ServiceNameKey.String(cfg.ServiceName))
attributes = append(attributes, semconv.ServiceVersionKey.String(cfg.ServiceVersion))
}
for _, ra := range cfg.ResourceAttributes {
parts := strings.SplitN(ra, "=", 2)
if len(parts) < 2 {
return nil, fmt.Errorf("malformed otlp attribute: %s", ra)
}
attributes = append(attributes, attribute.String(parts[0], parts[1]))
}
return resource.Merge(defaultResource, resource.NewSchemaless(attributes...))
}

func MetricsModule(cfg ModuleConfig) fx.Option {
options := make([]fx.Option, 0)
options = append(options,
fx.Supply(cfg),
fx.Provide(loadResource),
otlp.LoadResource(cfg.ServiceName, cfg.ResourceAttributes),
fx.Decorate(fx.Annotate(func(mp *sdkmetric.MeterProvider) metric.MeterProvider { return mp }, fx.As(new(metric.MeterProvider)))),
fx.Provide(fx.Annotate(func(options ...sdkmetric.Option) *sdkmetric.MeterProvider {
return sdkmetric.NewMeterProvider(options...)
Expand Down
22 changes: 1 addition & 21 deletions libs/go-libs/otlp/otlptraces/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,14 @@ package otlptraces

import (
"context"
"fmt"
"strings"

"github.com/formancehq/stack/libs/go-libs/otlp"
"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/fx"
Expand Down Expand Up @@ -53,27 +49,11 @@ func ProvideTracerProviderOption(v any, annotations ...fx.Annotation) fx.Option
return fx.Provide(fx.Annotate(v, annotations...))
}

func loadResource(cfg ModuleConfig) (*resource.Resource, error) {
defaultResource := resource.Default()
attributes := make([]attribute.KeyValue, 0)
if cfg.ServiceName != "" {
attributes = append(attributes, attribute.String("service.name", cfg.ServiceName))
}
for _, ra := range cfg.ResourceAttributes {
parts := strings.SplitN(ra, "=", 2)
if len(parts) < 2 {
return nil, fmt.Errorf("malformed otlp attribute: %s", ra)
}
attributes = append(attributes, attribute.String(parts[0], parts[1]))
}
return resource.Merge(defaultResource, resource.NewSchemaless(attributes...))
}

func TracesModule(cfg ModuleConfig) fx.Option {
options := make([]fx.Option, 0)
options = append(options,
fx.Supply(cfg),
fx.Provide(loadResource),
otlp.LoadResource(cfg.ServiceName, cfg.ResourceAttributes),
fx.Provide(func(tp *tracesdk.TracerProvider) trace.TracerProvider { return tp }),
fx.Provide(fx.Annotate(func(options ...tracesdk.TracerProviderOption) *tracesdk.TracerProvider {
return tracesdk.NewTracerProvider(options...)
Expand Down

0 comments on commit 520f570

Please sign in to comment.