Skip to content

Commit

Permalink
Tie in the strategy storages
Browse files Browse the repository at this point in the history
This change adds support for the strategy stores, previously referenced as client config managers.

This implements both the local file strategy store and the remote (gRPC) store.

Fixes #6695

Signed-off-by: Juraci Paixão Kröhling <[email protected]>
  • Loading branch information
jpkrohling committed Apr 5, 2022
1 parent 637d773 commit 2ad8d4a
Show file tree
Hide file tree
Showing 11 changed files with 715 additions and 145 deletions.
4 changes: 3 additions & 1 deletion extension/jaegerremotesampling/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package jaegerremotesampling
import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -60,7 +61,8 @@ func TestLoadConfig(t *testing.T) {
HTTPServerSettings: &confighttp.HTTPServerSettings{Endpoint: ":5778"},
GRPCServerSettings: &configgrpc.GRPCServerSettings{NetAddr: confignet.NetAddr{Endpoint: ":14250"}},
Source: Source{
File: "/etc/otel/sampling_strategies.json",
ReloadInterval: time.Second,
File: "/etc/otelcol/sampling_strategies.json",
},
},
ext1)
Expand Down
86 changes: 70 additions & 16 deletions extension/jaegerremotesampling/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,102 @@ package jaegerremotesampling // import "github.com/open-telemetry/opentelemetry-

import (
"context"
"fmt"

grpcStore "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal"
)

var _ component.Extension = (*jrsExtension)(nil)

type jrsExtension struct {
httpServer component.Component
cfg *Config
telemetry component.TelemetrySettings

httpServer component.Component
samplingStore strategystore.StrategyStore

closers []func() error
}

func newExtension(cfg *Config, telemetry component.TelemetrySettings) (*jrsExtension, error) {
// TODO(jpkroehling): get a proper instance
cfgMgr := internal.NewClientConfigManager()
ext := &jrsExtension{}
func newExtension(cfg *Config, telemetry component.TelemetrySettings) *jrsExtension {
jrse := &jrsExtension{
cfg: cfg,
telemetry: telemetry,
}
return jrse
}

if cfg.HTTPServerSettings != nil {
httpServer, err := internal.NewHTTP(telemetry, *cfg.HTTPServerSettings, cfgMgr)
func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error {
// the config validation will take care of ensuring we have one and only one of the following about the
// source of the sampling config:
// - remote (gRPC)
// - local file
// we can then use a simplified logic here to assign the appropriate store
if jrse.cfg.Source.File != "" {
opts := static.Options{
StrategiesFile: jrse.cfg.Source.File,
ReloadInterval: jrse.cfg.Source.ReloadInterval,
}
ss, err := static.NewStrategyStore(opts, jrse.telemetry.Logger)
if err != nil {
return nil, err
return fmt.Errorf("failed to create the local file strategy store: %v", err)
}
ext.httpServer = httpServer

// there's a Close function on the concrete type, which is not visible to us...
// how can we close it then?
jrse.samplingStore = ss
}

return ext, nil
}
if jrse.cfg.Source.Remote != nil {
opts, err := jrse.cfg.Source.Remote.ToDialOptions(host, jrse.telemetry)
if err != nil {
return fmt.Errorf("error while setting up the remote sampling source: %v", err)
}
conn, err := grpc.Dial(jrse.cfg.Source.Remote.Endpoint, opts...)
if err != nil {
return fmt.Errorf("error while connecting to the remote sampling source: %v", err)
}

jrse.samplingStore = grpcStore.NewConfigManager(conn)
jrse.closers = append(jrse.closers, func() error {
return conn.Close()
})
}

if jrse.cfg.HTTPServerSettings != nil {
httpServer, err := internal.NewHTTP(jrse.telemetry, *jrse.cfg.HTTPServerSettings, jrse.samplingStore)
if err != nil {
return fmt.Errorf("error while creating the HTTP server: %v", err)
}
jrse.httpServer = httpServer
}

func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error {
// then we start our own server interfaces, starting with the HTTP one
err := jrse.httpServer.Start(ctx, host)
if err != nil {
return err
return fmt.Errorf("error while starting the HTTP server: %v", err)
}

return nil
}

func (jrse *jrsExtension) Shutdown(ctx context.Context) error {
err := jrse.httpServer.Shutdown(ctx)
if err != nil {
return err
// we probably don't want to break whenever an error occurs, we want to continue and close the other resources
if err := jrse.httpServer.Shutdown(ctx); err != nil {
jrse.telemetry.Logger.Error("error while shutting down the HTTP server", zap.Error(err))
}

for _, closer := range jrse.closers {
if err := closer(); err != nil {
jrse.telemetry.Logger.Error("error while shutting down the sampling store", zap.Error(err))
}
}

return nil
Expand Down
96 changes: 43 additions & 53 deletions extension/jaegerremotesampling/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,89 +16,79 @@ package jaegerremotesampling

import (
"context"
"errors"
"fmt"
"net"
"path/filepath"
"testing"

"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"google.golang.org/grpc"
)

func TestNewExtension(t *testing.T) {
// test
e, err := newExtension(createDefaultConfig().(*Config), componenttest.NewNopTelemetrySettings())
require.NoError(t, err)
cfg := createDefaultConfig().(*Config)
cfg.Source.File = filepath.Join("testdata", "strategy.json")
e := newExtension(cfg, componenttest.NewNopTelemetrySettings())

// verify
assert.NotNil(t, e)
}

func TestStartAndShutdown(t *testing.T) {
func TestStartAndShutdownLocalFile(t *testing.T) {
// prepare
e, err := newExtension(createDefaultConfig().(*Config), componenttest.NewNopTelemetrySettings())
cfg := createDefaultConfig().(*Config)
cfg.Source.File = filepath.Join("testdata", "strategy.json")

e := newExtension(cfg, componenttest.NewNopTelemetrySettings())
require.NotNil(t, e)
require.NoError(t, err)
require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost()))

// test and verify
assert.NoError(t, e.Shutdown(context.Background()))
}

func TestFailedToStartHTTPServer(t *testing.T) {
// prepare
errBooBoo := errors.New("the server made a boo boo")

e, err := newExtension(createDefaultConfig().(*Config), componenttest.NewNopTelemetrySettings())
require.NotNil(t, e)
func TestStartAndShutdownRemote(t *testing.T) {
// prepare the socket the mock server will listen at
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

e.httpServer = &mockComponent{
StartFunc: func(_ context.Context, _ component.Host) error {
return errBooBoo
},
// create the mock server
server := grpc.NewServer()
go func() {
err = server.Serve(lis)
require.NoError(t, err)
}()

// register the service
api_v2.RegisterSamplingManagerServer(server, &samplingServer{})

// create the config, pointing to the mock server
cfg := createDefaultConfig().(*Config)
cfg.Source.Remote = &configgrpc.GRPCClientSettings{
Endpoint: fmt.Sprintf("localhost:%d", lis.Addr().(*net.TCPAddr).Port),
WaitForReady: true,
}

// test and verify
assert.Equal(t, errBooBoo, e.Start(context.Background(), componenttest.NewNopHost()))
}

func TestFailedToShutdownHTTPServer(t *testing.T) {
// prepare
errBooBoo := errors.New("the server made a boo boo")

e, err := newExtension(createDefaultConfig().(*Config), componenttest.NewNopTelemetrySettings())
// create the extension
e := newExtension(cfg, componenttest.NewNopTelemetrySettings())
require.NotNil(t, e)
require.NoError(t, err)

e.httpServer = &mockComponent{
ShutdownFunc: func(_ context.Context) error {
return errBooBoo
},
}
require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost()))

// test and verify
assert.Equal(t, errBooBoo, e.Shutdown(context.Background()))
}

type mockComponent struct {
StartFunc func(_ context.Context, _ component.Host) error
ShutdownFunc func(_ context.Context) error
// test
assert.NoError(t, e.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, e.Shutdown(context.Background()))
}

func (s *mockComponent) Start(ctx context.Context, host component.Host) error {
if s.StartFunc == nil {
return nil
}

return s.StartFunc(ctx, host)
type samplingServer struct {
api_v2.UnimplementedSamplingManagerServer
}

func (s *mockComponent) Shutdown(ctx context.Context) error {
if s.ShutdownFunc == nil {
return nil
}

return s.ShutdownFunc(ctx)
func (s samplingServer) GetSamplingStrategy(ctx context.Context, param *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) {
return &api_v2.SamplingStrategyResponse{
StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC,
}, nil
}
3 changes: 2 additions & 1 deletion extension/jaegerremotesampling/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ func createDefaultConfig() config.Extension {
Endpoint: ":14250",
},
},
Source: Source{},
}
}

func createExtension(_ context.Context, set component.ExtensionCreateSettings, cfg config.Extension) (component.Extension, error) {
return newExtension(cfg.(*Config), set.TelemetrySettings)
return newExtension(cfg.(*Config), set.TelemetrySettings), nil
}
48 changes: 44 additions & 4 deletions extension/jaegerremotesampling/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,46 +6,86 @@ require (
github.com/jaegertracing/jaeger v1.32.0
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/collector v0.48.0
go.uber.org/zap v1.21.0
google.golang.org/grpc v1.45.0
)

require (
cloud.google.com/go/compute v1.5.0 // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.2 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/go-kit/log v0.1.0 // indirect
github.com/go-logfmt/logfmt v0.5.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/gogo/googleapis v1.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/knadh/koanf v1.4.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/mostynb/go-grpc-compression v1.1.16 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.12.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/rs/cors v1.8.2 // indirect
github.com/shirou/gopsutil/v3 v3.22.2 // indirect
github.com/spf13/afero v1.6.0 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/cobra v1.4.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.10.1 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.9 // indirect
github.com/tklauser/numcpus v0.3.0 // indirect
github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/collector/model v0.48.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.31.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.31.0 // indirect
go.opentelemetry.io/contrib/zpages v0.31.0 // indirect
go.opentelemetry.io/otel v1.6.1 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.28.0 // indirect
go.opentelemetry.io/otel/metric v0.28.0 // indirect
go.opentelemetry.io/otel/sdk v1.6.1 // indirect
go.opentelemetry.io/otel/sdk/metric v0.28.0 // indirect
go.opentelemetry.io/otel/trace v1.6.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf // indirect
google.golang.org/grpc v1.45.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/ini.v1 v1.66.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
Loading

0 comments on commit 2ad8d4a

Please sign in to comment.