Skip to content

Commit

Permalink
coverage
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro committed May 27, 2022
1 parent b6bb0f8 commit 143db32
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 42 deletions.
16 changes: 8 additions & 8 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (c *Collector) Start(options *CollectorOptions) error {
MaxConnectionAgeGrace: options.GRPC.MaxConnectionAgeGrace,
})
if err != nil {
return fmt.Errorf("could not start gRPC collector: %w", err)
return fmt.Errorf("could not start gRPC server: %w", err)
}
c.grpcServer = grpcServer

Expand All @@ -123,7 +123,7 @@ func (c *Collector) Start(options *CollectorOptions) error {
Logger: c.logger,
})
if err != nil {
return fmt.Errorf("could not start the HTTP server: %w", err)
return fmt.Errorf("could not start HTTP server: %w", err)
}
c.hServer = httpServer

Expand All @@ -141,7 +141,7 @@ func (c *Collector) Start(options *CollectorOptions) error {
MetricsFactory: c.metricsFactory,
})
if err != nil {
return fmt.Errorf("could not start the Zipkin server: %w", err)
return fmt.Errorf("could not start Zipkin server: %w", err)
}
c.zkServer = zkServer

Expand All @@ -154,7 +154,7 @@ func (c *Collector) Start(options *CollectorOptions) error {
c.spanProcessor,
)
if err != nil {
return err
return fmt.Errorf("could not start OTLP server: %w", err)
}
c.otlpReceiver = otlpReceiver

Expand All @@ -171,12 +171,12 @@ func (c *Collector) publishOpts(cOpts *CollectorOptions) {

// Close the component and all its underlying dependencies
func (c *Collector) Close() error {
// gRPC server
// Stop gRPC server
if c.grpcServer != nil {
c.grpcServer.GracefulStop()
}

// HTTP server
// Stop HTTP server
if c.hServer != nil {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := c.hServer.Shutdown(timeout); err != nil {
Expand All @@ -185,7 +185,7 @@ func (c *Collector) Close() error {
defer cancel()
}

// Zipkin server
// Stop Zipkin server
if c.zkServer != nil {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := c.zkServer.Shutdown(timeout); err != nil {
Expand All @@ -194,7 +194,7 @@ func (c *Collector) Close() error {
defer cancel()
}

// OpenTelemetry OTLP receiver
// Stop OpenTelemetry OTLP receiver
if c.otlpReceiver != nil {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := c.otlpReceiver.Shutdown(timeout); err != nil {
Expand Down
99 changes: 73 additions & 26 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics/fork"
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/zap"
Expand All @@ -33,6 +34,15 @@ import (

var _ (io.Closer) = (*Collector)(nil)

func optionsForEphemeralPorts() *CollectorOptions {
collectorOpts := &CollectorOptions{}
collectorOpts.GRPC.HostPort = ":0"
collectorOpts.HTTP.HostPort = ":0"
collectorOpts.OTLP.GRPCHostPort = ":0"
collectorOpts.OTLP.HTTPHostPort = ":0"
return collectorOpts
}

func TestNewCollector(t *testing.T) {
// prepare
hc := healthcheck.New()
Expand All @@ -49,13 +59,51 @@ func TestNewCollector(t *testing.T) {
StrategyStore: strategyStore,
HealthCheck: hc,
})
collectorOpts := &CollectorOptions{}
collectorOpts := optionsForEphemeralPorts()
require.NoError(t, c.Start(collectorOpts))
assert.NoError(t, c.Close())
}

// test
c.Start(collectorOpts)
func TestCollector_StartErrors(t *testing.T) {
run := func(name string, options *CollectorOptions, expErr string) {
t.Run(name, func(t *testing.T) {
hc := healthcheck.New()
logger := zap.NewNop()
baseMetrics := metricstest.NewFactory(time.Hour)
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}

c := New(&CollectorParams{
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
})
err := c.Start(options)
require.Error(t, err)
assert.Contains(t, err.Error(), expErr)
})
}

// verify
assert.NoError(t, c.Close())
var options *CollectorOptions

options = optionsForEphemeralPorts()
options.GRPC.HostPort = ":-1"
run("gRPC", options, "could not start gRPC server")

options = optionsForEphemeralPorts()
options.HTTP.HostPort = ":-1"
run("HTTP", options, "could not start HTTP server")

options = optionsForEphemeralPorts()
options.Zipkin.HTTPHostPort = ":-1"
run("Zipkin", options, "could not start Zipkin server")

options = optionsForEphemeralPorts()
options.OTLP.HTTPHostPort = ":-1"
run("OTLP", options, "could not start OTLP server")
}

type mockStrategyStore struct {
Expand Down Expand Up @@ -83,12 +131,11 @@ func TestCollector_PublishOpts(t *testing.T) {
StrategyStore: strategyStore,
HealthCheck: hc,
})
collectorOpts := &CollectorOptions{
NumWorkers: 24,
QueueSize: 42,
}
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 24
collectorOpts.QueueSize = 42

c.Start(collectorOpts)
require.NoError(t, c.Start(collectorOpts))
defer c.Close()

forkFactory.AssertGaugeMetrics(t, metricstest.ExpectedMetric{
Expand Down Expand Up @@ -119,16 +166,13 @@ func TestAggregator(t *testing.T) {
HealthCheck: hc,
Aggregator: agg,
})
collectorOpts := &CollectorOptions{
QueueSize: 10,
NumWorkers: 10,
}

// test
c.Start(collectorOpts)
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 10
collectorOpts.QueueSize = 10
require.NoError(t, c.Start(collectorOpts))

// assert that aggregator was added to the collector
_, err := c.spanProcessor.ProcessSpans([]*model.Span{
spans := []*model.Span{
{
OperationName: "y",
Process: &model.Process{
Expand All @@ -145,15 +189,18 @@ func TestAggregator(t *testing.T) {
},
},
},
}, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})
}
_, err := c.spanProcessor.ProcessSpans(spans, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})
assert.NoError(t, err)

// verify
assert.NoError(t, c.Close())

// assert that aggregator was used
assert.Equal(t, 1, agg.callCount)

// assert that aggregator close was called
assert.Equal(t, 1, agg.closeCount)
// spans are processed by background workers, so we may need to wait
for i := 0; i < 1000; i++ {
if agg.callCount.Load() == 1 && agg.closeCount.Load() == 1 {
break
}
time.Sleep(10 * time.Millisecond)
}
assert.EqualValues(t, 1, agg.callCount.Load(), "aggregator was used")
assert.EqualValues(t, 1, agg.closeCount.Load(), "aggregator close was called")
}
17 changes: 9 additions & 8 deletions cmd/collector/app/root_span_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,23 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
)

type mockAggregator struct {
callCount int
closeCount int
callCount atomic.Int32
closeCount atomic.Int32
}

func (t *mockAggregator) RecordThroughput(service, operation, samplerType string, probability float64) {
t.callCount++
t.callCount.Inc()
}
func (t *mockAggregator) Start() {}
func (t *mockAggregator) Close() error {
t.closeCount++
t.closeCount.Inc()
return nil
}

Expand All @@ -44,26 +45,26 @@ func TestHandleRootSpan(t *testing.T) {
// Testing non-root span
span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}}
processor(span, "")
assert.Equal(t, 0, aggregator.callCount)
assert.EqualValues(t, 0, aggregator.callCount.Load())

// Testing span with service name but no operation
span.References = []model.SpanRef{}
span.Process = &model.Process{
ServiceName: "service",
}
processor(span, "")
assert.Equal(t, 0, aggregator.callCount)
assert.EqualValues(t, 0, aggregator.callCount.Load())

// Testing span with service name and operation but no probabilistic sampling tags
span.OperationName = "GET"
processor(span, "")
assert.Equal(t, 0, aggregator.callCount)
assert.EqualValues(t, 0, aggregator.callCount.Load())

// Testing span with service name, operation, and probabilistic sampling tags
span.Tags = model.KeyValues{
model.String("sampler.type", "probabilistic"),
model.String("sampler.param", "0.001"),
}
processor(span, "")
assert.Equal(t, 1, aggregator.callCount)
assert.EqualValues(t, 1, aggregator.callCount.Load())
}

0 comments on commit 143db32

Please sign in to comment.