Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OTLP receiver to collector #3701

Merged
merged 8 commits into from
May 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"time"

"github.com/uber/jaeger-lib/metrics"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/cmd/collector/app/server"
Expand All @@ -49,6 +51,7 @@ type Collector struct {
hServer *http.Server
zkServer *http.Server
grpcServer *grpc.Server
otlpReceiver component.TracesReceiver
tlsGRPCCertWatcherCloser io.Closer
tlsHTTPCertWatcherCloser io.Closer
tlsZipkinCertWatcherCloser io.Closer
Expand Down Expand Up @@ -106,7 +109,7 @@ func (c *Collector) Start(options *CollectorOptions) error {
MaxConnectionAgeGrace: options.GRPC.MaxConnectionAgeGrace,
})
if err != nil {
return fmt.Errorf("could not start gRPC collector %w", err)
return fmt.Errorf("could not start gRPC server: %w", err)
}
c.grpcServer = grpcServer

Expand All @@ -120,7 +123,7 @@ func (c *Collector) Start(options *CollectorOptions) error {
Logger: c.logger,
})
if err != nil {
return fmt.Errorf("could not start the HTTP server %w", err)
return fmt.Errorf("could not start HTTP server: %w", err)
}
c.hServer = httpServer

Expand All @@ -138,10 +141,23 @@ func (c *Collector) Start(options *CollectorOptions) error {
MetricsFactory: c.metricsFactory,
})
if err != nil {
return fmt.Errorf("could not start the Zipkin server %w", err)
return fmt.Errorf("could not start Zipkin server: %w", err)
}
c.zkServer = zkServer

otlpReceiver, err := handler.StartOtelReceiver(
handler.OtelReceiverOptions{
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
GRPCHostPort: options.OTLP.GRPCHostPort,
HTTPHostPort: options.OTLP.HTTPHostPort,
},
c.logger,
c.spanProcessor,
)
if err != nil {
return fmt.Errorf("could not start OTLP receiver: %w", err)
}
c.otlpReceiver = otlpReceiver

c.publishOpts(options)

return nil
Expand All @@ -155,12 +171,12 @@ func (c *Collector) publishOpts(cOpts *CollectorOptions) {

// Close the component and all its underlying dependencies
func (c *Collector) Close() error {
// gRPC server
// Stop gRPC server
if c.grpcServer != nil {
c.grpcServer.GracefulStop()
}

// HTTP server
// Stop HTTP server
if c.hServer != nil {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := c.hServer.Shutdown(timeout); err != nil {
Expand All @@ -169,7 +185,7 @@ func (c *Collector) Close() error {
defer cancel()
}

// Zipkin server
// Stop Zipkin server
if c.zkServer != nil {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := c.zkServer.Shutdown(timeout); err != nil {
Expand All @@ -178,6 +194,15 @@ func (c *Collector) Close() error {
defer cancel()
}

// Stop OpenTelemetry OTLP receiver
if c.otlpReceiver != nil {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := c.otlpReceiver.Shutdown(timeout); err != nil {
c.logger.Fatal("failed to stop the OTLP receiver", zap.Error(err))
}
defer cancel()
}

if err := c.spanProcessor.Close(); err != nil {
c.logger.Error("failed to close span processor.", zap.Error(err))
}
Expand Down
99 changes: 73 additions & 26 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics/fork"
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/zap"
Expand All @@ -33,6 +34,15 @@ import (

var _ (io.Closer) = (*Collector)(nil)

// optionsForEphemeralPorts builds collector options whose gRPC, HTTP,
// and OTLP endpoints all bind to port 0, letting the OS assign free
// ephemeral ports so concurrently running tests never collide on a
// fixed address.
func optionsForEphemeralPorts() *CollectorOptions {
	opts := new(CollectorOptions)
	opts.GRPC.HostPort = ":0"
	opts.HTTP.HostPort = ":0"
	opts.OTLP.GRPCHostPort = ":0"
	opts.OTLP.HTTPHostPort = ":0"
	return opts
}

func TestNewCollector(t *testing.T) {
// prepare
hc := healthcheck.New()
Expand All @@ -49,13 +59,51 @@ func TestNewCollector(t *testing.T) {
StrategyStore: strategyStore,
HealthCheck: hc,
})
collectorOpts := &CollectorOptions{}
collectorOpts := optionsForEphemeralPorts()
require.NoError(t, c.Start(collectorOpts))
assert.NoError(t, c.Close())
}

// test
c.Start(collectorOpts)
// TestCollector_StartErrors verifies that Collector.Start surfaces a
// descriptive, wrapped error when any one of its servers (gRPC, HTTP,
// Zipkin, OTLP) is configured with an unbindable host:port.
func TestCollector_StartErrors(t *testing.T) {
	// run starts a fresh collector with the given options and asserts
	// that Start fails with an error message containing expErr.
	run := func(name string, options *CollectorOptions, expErr string) {
		t.Run(name, func(t *testing.T) {
			hc := healthcheck.New()
			logger := zap.NewNop()
			baseMetrics := metricstest.NewFactory(time.Hour)
			spanWriter := &fakeSpanWriter{}
			strategyStore := &mockStrategyStore{}

			c := New(&CollectorParams{
				ServiceName:    "collector",
				Logger:         logger,
				MetricsFactory: baseMetrics,
				SpanWriter:     spanWriter,
				StrategyStore:  strategyStore,
				HealthCheck:    hc,
			})
			err := c.Start(options)
			require.Error(t, err)
			assert.Contains(t, err.Error(), expErr)
			// Servers that started before the failing one must still be
			// shut down cleanly.
			assert.NoError(t, c.Close())
		})
	}

	var options *CollectorOptions

	// Each case invalidates exactly one server's address (":-1"),
	// leaving all the others on ephemeral ports (":0").
	options = optionsForEphemeralPorts()
	options.GRPC.HostPort = ":-1"
	run("gRPC", options, "could not start gRPC server")

	options = optionsForEphemeralPorts()
	options.HTTP.HostPort = ":-1"
	run("HTTP", options, "could not start HTTP server")

	options = optionsForEphemeralPorts()
	options.Zipkin.HTTPHostPort = ":-1"
	run("Zipkin", options, "could not start Zipkin server")

	options = optionsForEphemeralPorts()
	options.OTLP.HTTPHostPort = ":-1"
	run("OTLP", options, "could not start OTLP receiver")
}

type mockStrategyStore struct {
Expand Down Expand Up @@ -83,12 +131,11 @@ func TestCollector_PublishOpts(t *testing.T) {
StrategyStore: strategyStore,
HealthCheck: hc,
})
collectorOpts := &CollectorOptions{
NumWorkers: 24,
QueueSize: 42,
}
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 24
collectorOpts.QueueSize = 42

c.Start(collectorOpts)
require.NoError(t, c.Start(collectorOpts))
defer c.Close()

forkFactory.AssertGaugeMetrics(t, metricstest.ExpectedMetric{
Expand Down Expand Up @@ -119,16 +166,13 @@ func TestAggregator(t *testing.T) {
HealthCheck: hc,
Aggregator: agg,
})
collectorOpts := &CollectorOptions{
QueueSize: 10,
NumWorkers: 10,
}

// test
c.Start(collectorOpts)
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 10
collectorOpts.QueueSize = 10
require.NoError(t, c.Start(collectorOpts))

// assert that aggregator was added to the collector
_, err := c.spanProcessor.ProcessSpans([]*model.Span{
spans := []*model.Span{
{
OperationName: "y",
Process: &model.Process{
Expand All @@ -145,15 +189,18 @@ func TestAggregator(t *testing.T) {
},
},
},
}, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})
}
_, err := c.spanProcessor.ProcessSpans(spans, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})
assert.NoError(t, err)

// verify
assert.NoError(t, c.Close())

// assert that aggregator was used
assert.Equal(t, 1, agg.callCount)

// assert that aggregator close was called
assert.Equal(t, 1, agg.closeCount)
// spans are processed by background workers, so we may need to wait
for i := 0; i < 1000; i++ {
if agg.callCount.Load() == 1 && agg.closeCount.Load() == 1 {
break
}
time.Sleep(10 * time.Millisecond)
}
assert.EqualValues(t, 1, agg.callCount.Load(), "aggregator was used")
assert.EqualValues(t, 1, agg.closeCount.Load(), "aggregator close was called")
}
5 changes: 5 additions & 0 deletions cmd/collector/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ type CollectorOptions struct {
// See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace.
MaxConnectionAgeGrace time.Duration
}
// OTLP section defines options for servers accepting OpenTelemetry OTLP format
OTLP struct {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored in #3710

GRPCHostPort string
HTTPHostPort string
}
// Zipkin section defines options for Zipkin HTTP server
Zipkin struct {
// HTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests
Expand Down
42 changes: 31 additions & 11 deletions cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,64 @@ import (

"go.uber.org/zap"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/encoding/gzip"
_ "google.golang.org/grpc/encoding/gzip" // register zip encoding
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

// GRPCHandler implements gRPC CollectorService.
type GRPCHandler struct {
logger *zap.Logger
spanProcessor processor.SpanProcessor
batchConsumer batchConsumer
}

// NewGRPCHandler registers routes for this handler on the given router.
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor) *GRPCHandler {
return &GRPCHandler{
logger: logger,
spanProcessor: spanProcessor,
logger: logger,
batchConsumer: batchConsumer{
logger: logger,
spanProcessor: spanProcessor,
spanOptions: processor.SpansOptions{
InboundTransport: processor.GRPCTransport,
SpanFormat: processor.ProtoSpanFormat,
},
},
}
}

// PostSpans implements gRPC CollectorService.
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
for _, span := range r.GetBatch().Spans {
batch := &r.Batch
err := g.batchConsumer.consume(batch)
return &api_v2.PostSpansResponse{}, err
}

// batchConsumer forwards the spans of a model.Batch to the collector's
// span processor, tagging them with a fixed set of span options.
type batchConsumer struct {
	logger        *zap.Logger
	spanProcessor processor.SpanProcessor
	// spanOptions describes the inbound transport/format for consumed
	// spans. NOTE(review): consume() below appears to construct its own
	// SpansOptions inline rather than reading this field — confirm which
	// is intended.
	spanOptions processor.SpansOptions
}

func (c *batchConsumer) consume(batch *model.Batch) error {
for _, span := range batch.Spans {
if span.GetProcess() == nil {
span.Process = r.Batch.Process
span.Process = batch.Process
}
}
_, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, processor.SpansOptions{
_, err := c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{
InboundTransport: processor.GRPCTransport,
SpanFormat: processor.ProtoSpanFormat,
})
if err != nil {
if err == processor.ErrBusy {
return nil, status.Errorf(codes.ResourceExhausted, err.Error())
return status.Errorf(codes.ResourceExhausted, err.Error())
}
g.logger.Error("cannot process spans", zap.Error(err))
return nil, err
c.logger.Error("cannot process spans", zap.Error(err))
return err
}
return &api_v2.PostSpansResponse{}, nil
return nil
}
Loading