Skip to content

Commit

Permalink
Introduce OTLP receiver configuration flags (jaegertracing#3710)
Browse files Browse the repository at this point in the history
Signed-off-by: Albert Teoh <[email protected]>
  • Loading branch information
yurishkuro authored and albertteoh committed Jul 13, 2022
1 parent 3625ea7 commit eea3bff
Show file tree
Hide file tree
Showing 13 changed files with 531 additions and 279 deletions.
5 changes: 3 additions & 2 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
agentGrpcRep "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc"
"github.com/jaegertracing/jaeger/cmd/all-in-one/setupcontext"
collectorApp "github.com/jaegertracing/jaeger/cmd/collector/app"
collectorFlags "github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/docs"
"github.com/jaegertracing/jaeger/cmd/env"
"github.com/jaegertracing/jaeger/cmd/flags"
Expand Down Expand Up @@ -146,7 +147,7 @@ by default uses only in-memory database.`,
if err != nil {
logger.Fatal("Failed to configure connection for grpc", zap.Error(err))
}
cOpts, err := new(collectorApp.CollectorOptions).InitFromViper(v)
cOpts, err := new(collectorFlags.CollectorOptions).InitFromViper(v, logger)
if err != nil {
logger.Fatal("Failed to initialize collector", zap.Error(err))
}
Expand Down Expand Up @@ -227,7 +228,7 @@ by default uses only in-memory database.`,
agentApp.AddFlags,
agentRep.AddFlags,
agentGrpcRep.AddFlags,
collectorApp.AddFlags,
collectorFlags.AddFlags,
queryApp.AddFlags,
strategyStoreFactory.AddFlags,
metricsReaderFactory.AddFlags,
Expand Down
33 changes: 17 additions & 16 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
Expand All @@ -34,6 +35,11 @@ import (
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
metricNumWorkers = "collector.num-workers"
metricQueueSize = "collector.queue-size"
)

// Collector returns the collector as a manageable unit of work
type Collector struct {
// required to start a new collector
Expand Down Expand Up @@ -82,10 +88,10 @@ func New(params *CollectorParams) *Collector {
}

// Start the component and underlying dependencies
func (c *Collector) Start(options *CollectorOptions) error {
func (c *Collector) Start(options *flags.CollectorOptions) error {
handlerBuilder := &SpanHandlerBuilder{
SpanWriter: c.spanWriter,
CollectorOpts: *options,
CollectorOpts: options,
Logger: c.logger,
MetricsFactory: c.metricsFactory,
}
Expand Down Expand Up @@ -145,28 +151,23 @@ func (c *Collector) Start(options *CollectorOptions) error {
}
c.zkServer = zkServer

otlpReceiver, err := handler.StartOtelReceiver(
handler.OtelReceiverOptions{
GRPCHostPort: options.OTLP.GRPCHostPort,
HTTPHostPort: options.OTLP.HTTPHostPort,
},
c.logger,
c.spanProcessor,
)
if err != nil {
return fmt.Errorf("could not start OTLP receiver: %w", err)
if options.OTLP.Enabled {
otlpReceiver, err := handler.StartOTLPReceiver(options, c.logger, c.spanProcessor)
if err != nil {
return fmt.Errorf("could not start OTLP receiver: %w", err)
}
c.otlpReceiver = otlpReceiver
}
c.otlpReceiver = otlpReceiver

c.publishOpts(options)

return nil
}

func (c *Collector) publishOpts(cOpts *CollectorOptions) {
func (c *Collector) publishOpts(cOpts *flags.CollectorOptions) {
internalFactory := c.metricsFactory.Namespace(metrics.NSOptions{Name: "internal"})
internalFactory.Gauge(metrics.Options{Name: collectorNumWorkers}).Update(int64(cOpts.NumWorkers))
internalFactory.Gauge(metrics.Options{Name: collectorQueueSize}).Update(int64(cOpts.QueueSize))
internalFactory.Gauge(metrics.Options{Name: metricNumWorkers}).Update(int64(cOpts.NumWorkers))
internalFactory.Gauge(metrics.Options{Name: metricQueueSize}).Update(int64(cOpts.QueueSize))
}

// Close the component and all its underlying dependencies
Expand Down
25 changes: 17 additions & 8 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
Expand All @@ -34,12 +35,14 @@ import (

var _ (io.Closer) = (*Collector)(nil)

func optionsForEphemeralPorts() *CollectorOptions {
collectorOpts := &CollectorOptions{}
func optionsForEphemeralPorts() *flags.CollectorOptions {
collectorOpts := &flags.CollectorOptions{}
collectorOpts.GRPC.HostPort = ":0"
collectorOpts.HTTP.HostPort = ":0"
collectorOpts.OTLP.GRPCHostPort = ":0"
collectorOpts.OTLP.HTTPHostPort = ":0"
collectorOpts.OTLP.Enabled = true
collectorOpts.OTLP.GRPC.HostPort = ":0"
collectorOpts.OTLP.HTTP.HostPort = ":0"
collectorOpts.Zipkin.HTTPHostPort = ":0"
return collectorOpts
}

Expand All @@ -59,13 +62,15 @@ func TestNewCollector(t *testing.T) {
StrategyStore: strategyStore,
HealthCheck: hc,
})

collectorOpts := optionsForEphemeralPorts()
require.NoError(t, c.Start(collectorOpts))
assert.NotNil(t, c.SpanHandlers())
assert.NoError(t, c.Close())
}

func TestCollector_StartErrors(t *testing.T) {
run := func(name string, options *CollectorOptions, expErr string) {
run := func(name string, options *flags.CollectorOptions, expErr string) {
t.Run(name, func(t *testing.T) {
hc := healthcheck.New()
logger := zap.NewNop()
Expand All @@ -87,7 +92,7 @@ func TestCollector_StartErrors(t *testing.T) {
})
}

var options *CollectorOptions
var options *flags.CollectorOptions

options = optionsForEphemeralPorts()
options.GRPC.HostPort = ":-1"
Expand All @@ -102,8 +107,12 @@ func TestCollector_StartErrors(t *testing.T) {
run("Zipkin", options, "could not start Zipkin server")

options = optionsForEphemeralPorts()
options.OTLP.HTTPHostPort = ":-1"
run("OTLP", options, "could not start OTLP receiver")
options.OTLP.GRPC.HostPort = ":-1"
run("OTLP/GRPC", options, "could not start OTLP receiver")

options = optionsForEphemeralPorts()
options.OTLP.HTTP.HostPort = ":-1"
run("OTLP/HTTP", options, "could not start OTLP receiver")
}

type mockStrategyStore struct {
Expand Down
158 changes: 0 additions & 158 deletions cmd/collector/app/flags.go

This file was deleted.

Loading

0 comments on commit eea3bff

Please sign in to comment.