Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro committed May 27, 2022
1 parent e573fea commit e107cd6
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 102 deletions.
9 changes: 8 additions & 1 deletion cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,14 @@ func (c *Collector) Start(options *CollectorOptions) error {
}
c.zkServer = zkServer

otlpReceiver, err := handler.StartOtelReceiver(c.logger, c.spanProcessor)
otlpReceiver, err := handler.StartOtelReceiver(
handler.OtelReceiverOptions{
GRPCHostPort: options.OTLP.GRPCHostPort,
HTTPHostPort: options.OTLP.HTTPHostPort,
},
c.logger,
c.spanProcessor,
)
if err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions cmd/collector/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ type CollectorOptions struct {
// See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace.
MaxConnectionAgeGrace time.Duration
}
// OTLP section defines options for servers accepting OpenTelemetry OTLP format
OTLP struct {
GRPCHostPort string
HTTPHostPort string
}
// Zipkin section defines options for Zipkin HTTP server
Zipkin struct {
// HTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests
Expand Down
44 changes: 33 additions & 11 deletions cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,40 +23,62 @@ import (
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

// GRPCHandler implements gRPC CollectorService.
type GRPCHandler struct {
logger *zap.Logger
spanProcessor processor.SpanProcessor
logger *zap.Logger
// spanProcessor processor.SpanProcessor
batchConsumer batchConsumer
}

// NewGRPCHandler registers routes for this handler on the given router.
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor) *GRPCHandler {
return &GRPCHandler{
logger: logger,
spanProcessor: spanProcessor,
logger: logger,
// spanProcessor: spanProcessor,
batchConsumer: batchConsumer{
logger: logger,
spanProcessor: spanProcessor,
spanOptions: processor.SpansOptions{
InboundTransport: processor.GRPCTransport,
SpanFormat: processor.ProtoSpanFormat,
},
},
}
}

// PostSpans implements gRPC CollectorService.
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
for _, span := range r.GetBatch().Spans {
batch := &r.Batch
err := g.batchConsumer.consume(batch)
return &api_v2.PostSpansResponse{}, err
}

type batchConsumer struct {
logger *zap.Logger
spanProcessor processor.SpanProcessor
spanOptions processor.SpansOptions
}

func (c *batchConsumer) consume(batch *model.Batch) error {
for _, span := range batch.Spans {
if span.GetProcess() == nil {
span.Process = r.Batch.Process
span.Process = batch.Process
}
}
_, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, processor.SpansOptions{
_, err := c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{
InboundTransport: processor.GRPCTransport,
SpanFormat: processor.ProtoSpanFormat,
})
if err != nil {
if err == processor.ErrBusy {
return nil, status.Errorf(codes.ResourceExhausted, err.Error())
return status.Errorf(codes.ResourceExhausted, err.Error())
}
g.logger.Error("cannot process spans", zap.Error(err))
return nil, err
c.logger.Error("cannot process spans", zap.Error(err))
return err
}
return &api_v2.PostSpansResponse{}, nil
return nil
}
70 changes: 47 additions & 23 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

Expand Down Expand Up @@ -123,32 +124,55 @@ func TestGRPCCompressionEnabled(t *testing.T) {
defer conn.Close()

// Do not use string constant imported from grpc, since we are actually testing that package is imported by the handler.
_, err := client.PostSpans(context.Background(), &api_v2.PostSpansRequest{},
grpc.UseCompressor("gzip"))
_, err := client.PostSpans(
context.Background(),
&api_v2.PostSpansRequest{},
grpc.UseCompressor("gzip"),
)
require.NoError(t, err)
}

func TestPostSpansWithError(t *testing.T) {
expectedError := errors.New("test-error")
processor := &mockSpanProcessor{expectedError: expectedError}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor)
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
client, conn := newClient(t, addr)
defer conn.Close()
r, err := client.PostSpans(context.Background(), &api_v2.PostSpansRequest{
Batch: model.Batch{
Spans: []*model.Span{
{
OperationName: "fake-operation",
},
},
testCases := []struct {
processorError error
expectedError string
expectedLog string
}{
{
processorError: errors.New("test-error"),
expectedError: "test-error",
expectedLog: "test-error",
},
})
require.Error(t, err)
require.Nil(t, r)
require.Contains(t, err.Error(), expectedError.Error())
require.Len(t, processor.getSpans(), 1)
{
processorError: processor.ErrBusy,
expectedError: "server busy",
},
}
for _, test := range testCases {
t.Run(test.expectedError, func(t *testing.T) {
processor := &mockSpanProcessor{expectedError: test.processorError}
logger, logBuf := testutils.NewLogger()
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(logger, processor)
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
client, conn := newClient(t, addr)
defer conn.Close()
r, err := client.PostSpans(context.Background(), &api_v2.PostSpansRequest{
Batch: model.Batch{
Spans: []*model.Span{
{
OperationName: "fake-operation",
},
},
},
})
require.Error(t, err)
require.Nil(t, r)
assert.Contains(t, err.Error(), test.expectedError)
assert.Contains(t, logBuf.String(), test.expectedLog)
assert.Len(t, processor.getSpans(), 1)
})
}
}
88 changes: 54 additions & 34 deletions cmd/collector/app/handler/otlp_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,66 +29,86 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

// A delegation function to assist in tests, because ProtoFromTraces never returns errors despite its API.
var protoFromTraces func(td ptrace.Traces) ([]*model.Batch, error) = otlp2jaeger.ProtoFromTraces
// // A delegation function to assist in tests, because ProtoFromTraces never returns errors despite its API.
// var protoFromTraces func(td ptrace.Traces) ([]*model.Batch, error) = otlp2jaeger.ProtoFromTraces

var _ component.Host = (*otelHost)(nil) // API check

// OtelReceiverOptions allows configuration of the receiver.
type OtelReceiverOptions struct {
GRPCAddress string
HTTPAddress string
GRPCHostPort string
HTTPHostPort string
}

// StartOtelReceiver starts OpenTelemetry OTLP receiver listening on gRPC and HTTP ports.
func StartOtelReceiver(logger *zap.Logger, spanProcessor processor.SpanProcessor) (component.TracesReceiver, error) {
func StartOtelReceiver(options OtelReceiverOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor) (component.TracesReceiver, error) {
otlpFactory := otlpreceiver.NewFactory()
otlpReceiverConfig := otlpFactory.CreateDefaultConfig()
otlpReceiverConfig := otlpFactory.CreateDefaultConfig().(*otlpreceiver.Config)
if options.GRPCHostPort != "" {
otlpReceiverConfig.GRPC.NetAddr.Endpoint = options.GRPCHostPort
}
if options.HTTPHostPort != "" {
otlpReceiverConfig.HTTP.Endpoint = options.HTTPHostPort
}
otlpReceiverSettings := component.ReceiverCreateSettings{
TelemetrySettings: component.TelemetrySettings{
Logger: logger,
TracerProvider: otel.GetTracerProvider(), // TODO we may always want no-op here, not the global default
},
}
// TODO re-implement the logic of NewGRPCHandler, it's fairly trivial
jaegerBatchHandler := NewGRPCHandler(logger, spanProcessor)
nextConsumer, err := consumer.NewTraces(consumer.ConsumeTracesFunc(func(ctx context.Context, ld ptrace.Traces) error {
batches, err := protoFromTraces(ld)
if err != nil {
return err
}
for _, batch := range batches {
// TODO generate metrics
_, err := jaegerBatchHandler.PostSpans(ctx, &api_v2.PostSpansRequest{
Batch: *batch,
})
if err != nil {
return err
}
}
return nil
}))
if err != nil {
return nil, fmt.Errorf("could not create the OTLP consumer: %w", err)
}
otlpReceiver, err := otlpFactory.CreateTracesReceiver(

otlpConsumer := newConsumerDelegate(logger, spanProcessor)
// the following two constructors never return errors given non-nil arguments, so we ignore errors
nextConsumer, _ := consumer.NewTraces(consumer.ConsumeTracesFunc(otlpConsumer.consume))
otlpReceiver, _ := otlpFactory.CreateTracesReceiver(
context.Background(),
otlpReceiverSettings,
otlpReceiverConfig,
nextConsumer,
)
if err != nil {
return nil, fmt.Errorf("could not create the OTLP receiver: %w", err)
}
err = otlpReceiver.Start(context.Background(), &otelHost{logger: logger})
if err != nil {
if err := otlpReceiver.Start(context.Background(), &otelHost{logger: logger}); err != nil {
return nil, fmt.Errorf("could not start the OTLP receiver: %w", err)
}
return otlpReceiver, nil
}

func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor) *consumerDelegate {
return &consumerDelegate{
logger: logger,
batchConsumer: batchConsumer{
logger: logger,
spanProcessor: spanProcessor,
spanOptions: processor.SpansOptions{
SpanFormat: processor.OTLPSpanFormat,
InboundTransport: processor.UnknownTransport, // could be gRPC or HTTP
},
},
protoFromTraces: otlp2jaeger.ProtoFromTraces,
}
}

type consumerDelegate struct {
logger *zap.Logger
batchConsumer batchConsumer
protoFromTraces func(td ptrace.Traces) ([]*model.Batch, error)
}

func (c *consumerDelegate) consume(ctx context.Context, ld ptrace.Traces) error {
batches, err := c.protoFromTraces(ld)
if err != nil {
return err
}
for _, batch := range batches {
err := c.batchConsumer.consume(batch)
if err != nil {
return err
}
}
return nil
}

// otelHost is a mostly no-op implementation of OTEL component.Host
type otelHost struct {
logger *zap.Logger
Expand Down
Loading

0 comments on commit e107cd6

Please sign in to comment.