Skip to content

Commit

Permalink
[jaeger-v2] Add support for artificial jaeger storage receiver (#5242)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Part of #4843
- Separate Jaeger storage receiver PR from and will be used by jaeger-v2
Kafka PR #4971

## Description of the changes
- Implement Jaeger storage receiver to be used by Jaeger-v2 Kafka
integration test.

## How was this change tested?
- Added some unit tests.

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: James Ryans <[email protected]>
  • Loading branch information
james-ryans authored Mar 4, 2024
1 parent 286b94a commit 4fd7e71
Show file tree
Hide file tree
Showing 12 changed files with 629 additions and 3 deletions.
23 changes: 23 additions & 0 deletions cmd/jaeger/integration/receivers/storagereceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Storage Receiver

`storagereceiver` is a fake receiver that creates an artificial stream of traces by:

- repeatedly querying one of Jaeger storage backends for all traces (by service).
- tracking new traces / spans and passing them to the next component in the pipeline.

# Getting Started

The following settings are required:

- `trace_storage` (no default): name of a storage backend defined in `jaegerstorage` extension

The following settings can be optionally configured:

- `pull_interval` (default = 0s): The delay between each iteration of pulling traces.

```yaml
receivers:
jaeger_storage_receiver:
trace_storage: external-storage
pull_interval: 0s
```
20 changes: 20 additions & 0 deletions cmd/jaeger/integration/receivers/storagereceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package storagereceiver

import (
"time"

"github.com/asaskevich/govalidator"
)

type Config struct {
TraceStorage string `valid:"required" mapstructure:"trace_storage"`
PullInterval time.Duration `mapstructure:"pull_interval"`
}

func (cfg *Config) Validate() error {
_, err := govalidator.ValidateStruct(cfg)
return err
}
66 changes: 66 additions & 0 deletions cmd/jaeger/integration/receivers/storagereceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package storagereceiver

import (
"errors"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
)

func TestLoadConfig(t *testing.T) {
t.Parallel()

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

tests := []struct {
id component.ID
expected component.Config
expectedErr error
}{
{
id: component.NewIDWithName(componentType, ""),
expectedErr: errors.New("non zero value required"),
},
{
id: component.NewIDWithName(componentType, "defaults"),
expected: &Config{
TraceStorage: "storage",
PullInterval: 0,
},
},
{
id: component.NewIDWithName(componentType, "filled"),
expected: &Config{
TraceStorage: "storage",
PullInterval: 2 * time.Second,
},
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, component.UnmarshalConfig(sub, cfg))

if tt.expectedErr != nil {
require.ErrorContains(t, component.ValidateConfig(cfg), tt.expectedErr.Error())
} else {
require.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg)
}
})
}
}
33 changes: 33 additions & 0 deletions cmd/jaeger/integration/receivers/storagereceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package storagereceiver

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)

// componentType is the name of this extension in configuration.
const componentType = component.Type("jaeger_storage_receiver")

func NewFactory() receiver.Factory {
return receiver.NewFactory(
componentType,
createDefaultConfig,
receiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment),
)
}

func createDefaultConfig() component.Config {
return &Config{}
}

func createTracesReceiver(ctx context.Context, set receiver.CreateSettings, config component.Config, nextConsumer consumer.Traces) (receiver.Traces, error) {
cfg := config.(*Config)

return newTracesReceiver(cfg, set, nextConsumer)
}
28 changes: 28 additions & 0 deletions cmd/jaeger/integration/receivers/storagereceiver/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package storagereceiver

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/receiver/receivertest"
)

func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig().(*Config)
require.NotNil(t, cfg, "failed to create default config")
require.NoError(t, componenttest.CheckConfigStruct(cfg))
}

func TestCreateTracesReceiver(t *testing.T) {
cfg := createDefaultConfig().(*Config)
f := NewFactory()
r, err := f.CreateTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil)
require.NoError(t, err)
assert.NotNil(t, r)
}
14 changes: 14 additions & 0 deletions cmd/jaeger/integration/receivers/storagereceiver/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package storagereceiver

import (
"testing"

"github.com/jaegertracing/jaeger/pkg/testutils"
)

func TestMain(m *testing.M) {
testutils.VerifyGoLeaks(m)
}
138 changes: 138 additions & 0 deletions cmd/jaeger/integration/receivers/storagereceiver/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package storagereceiver

import (
"context"
"fmt"
"time"

jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

type storageReceiver struct {
cancelConsumeLoop context.CancelFunc
config *Config
settings receiver.CreateSettings
consumedTraces map[model.TraceID]*consumedTrace
nextConsumer consumer.Traces
spanReader spanstore.Reader
}

type consumedTrace struct {
spanIDs map[model.SpanID]struct{}
}

func newTracesReceiver(config *Config, set receiver.CreateSettings, nextConsumer consumer.Traces) (*storageReceiver, error) {
return &storageReceiver{
config: config,
settings: set,
consumedTraces: make(map[model.TraceID]*consumedTrace),
nextConsumer: nextConsumer,
}, nil
}

func (r *storageReceiver) Start(ctx context.Context, host component.Host) error {
f, err := jaegerstorage.GetStorageFactory(r.config.TraceStorage, host)
if err != nil {
return fmt.Errorf("cannot find storage factory: %w", err)
}

if r.spanReader, err = f.CreateSpanReader(); err != nil {
return fmt.Errorf("cannot create span reader: %w", err)
}

ctx, cancel := context.WithCancel(ctx)
r.cancelConsumeLoop = cancel

go func() {
if err := r.consumeLoop(ctx); err != nil {
r.settings.ReportStatus(component.NewFatalErrorEvent(err))
}
}()

return nil
}

func (r *storageReceiver) consumeLoop(ctx context.Context) error {
for {
services, err := r.spanReader.GetServices(ctx)
if err != nil {
r.settings.Logger.Error("Failed to get services from consumer", zap.Error(err))
return err
}

for _, svc := range services {
if err := r.consumeTraces(ctx, svc); err != nil {
r.settings.Logger.Error("Failed to consume traces from consumer", zap.Error(err))
}
}

select {
case <-ctx.Done():
r.settings.Logger.Info("Consumer stopped")
return nil
default:
time.Sleep(r.config.PullInterval)
}
}
}

func (r *storageReceiver) consumeTraces(ctx context.Context, serviceName string) error {
traces, err := r.spanReader.FindTraces(ctx, &spanstore.TraceQueryParameters{
ServiceName: serviceName,
})
if err != nil {
return err
}

for _, trace := range traces {
traceID := trace.Spans[0].TraceID
if _, ok := r.consumedTraces[traceID]; !ok {
r.consumedTraces[traceID] = &consumedTrace{
spanIDs: make(map[model.SpanID]struct{}),
}
}
r.consumeSpans(ctx, r.consumedTraces[traceID], trace.Spans)
}

return nil
}

func (r *storageReceiver) consumeSpans(ctx context.Context, tc *consumedTrace, spans []*model.Span) error {
// Spans are consumed one at a time because we don't know whether all spans
// in a trace have been completely exported
for _, span := range spans {
if _, ok := tc.spanIDs[span.SpanID]; !ok {
tc.spanIDs[span.SpanID] = struct{}{}
td, err := jaeger2otlp.ProtoToTraces([]*model.Batch{
{
Spans: []*model.Span{span},
Process: span.Process,
},
})
if err != nil {
return err
}
r.nextConsumer.ConsumeTraces(ctx, td)
}
}

return nil
}

func (r *storageReceiver) Shutdown(_ context.Context) error {
if r.cancelConsumeLoop != nil {
r.cancelConsumeLoop()
}
return nil
}
Loading

0 comments on commit 4fd7e71

Please sign in to comment.