-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[jaeger-v2] Add support for artificial jaeger storage receiver #5242
Changes from 5 commits
840bb4e
ceeab3f
da4451e
a33c866
c8d3b84
84e5845
2f5f2e0
ffd73ab
063fdcd
80ca79a
3d6cfe7
14224ef
2aaf536
efcf080
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
// Copyright (c) 2024 The Jaeger Authors. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package storagereceiver | ||
|
||
import ( | ||
"github.com/asaskevich/govalidator" | ||
) | ||
|
||
type Config struct { | ||
TraceStorage string `valid:"required" mapstructure:"trace_storage"` | ||
} | ||
|
||
func (cfg *Config) Validate() error { | ||
_, err := govalidator.ValidateStruct(cfg) | ||
return err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
// Copyright (c) 2024 The Jaeger Authors. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package storagereceiver | ||
|
||
import ( | ||
"errors" | ||
"path/filepath" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/confmap/confmaptest" | ||
) | ||
|
||
func TestLoadConfig(t *testing.T) { | ||
t.Parallel() | ||
|
||
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) | ||
require.NoError(t, err) | ||
|
||
tests := []struct { | ||
id component.ID | ||
expected component.Config | ||
expectedErr error | ||
}{ | ||
{ | ||
id: component.NewIDWithName(componentType, ""), | ||
expectedErr: errors.New("non zero value required"), | ||
}, | ||
{ | ||
id: component.NewIDWithName(componentType, "external-storage"), | ||
expected: &Config{ | ||
TraceStorage: "external-storage", | ||
}, | ||
}, | ||
} | ||
|
||
for _, tt := range tests { | ||
t.Run(tt.id.String(), func(t *testing.T) { | ||
factory := NewFactory() | ||
cfg := factory.CreateDefaultConfig() | ||
|
||
sub, err := cm.Sub(tt.id.String()) | ||
require.NoError(t, err) | ||
require.NoError(t, component.UnmarshalConfig(sub, cfg)) | ||
|
||
if tt.expectedErr != nil { | ||
require.ErrorContains(t, component.ValidateConfig(cfg), tt.expectedErr.Error()) | ||
} else { | ||
require.NoError(t, component.ValidateConfig(cfg)) | ||
assert.Equal(t, tt.expected, cfg) | ||
} | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
// Copyright (c) 2024 The Jaeger Authors. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package storagereceiver | ||
|
||
import ( | ||
"context" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/consumer" | ||
"go.opentelemetry.io/collector/receiver" | ||
) | ||
|
||
// componentType is the name of this extension in configuration. | ||
const componentType = component.Type("jaeger_storage_receiver") | ||
|
||
func NewFactory() receiver.Factory { | ||
return receiver.NewFactory( | ||
componentType, | ||
createDefaultConfig, | ||
receiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment), | ||
) | ||
} | ||
|
||
func createDefaultConfig() component.Config { | ||
return &Config{} | ||
} | ||
|
||
func createTracesReceiver(ctx context.Context, set receiver.CreateSettings, config component.Config, nextConsumer consumer.Traces) (receiver.Traces, error) { | ||
cfg := config.(*Config) | ||
|
||
return newTracesReceiver(cfg, set, nextConsumer) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
// Copyright (c) 2024 The Jaeger Authors. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package storagereceiver | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/collector/component/componenttest" | ||
"go.opentelemetry.io/collector/receiver/receivertest" | ||
) | ||
|
||
func TestCreateDefaultConfig(t *testing.T) { | ||
cfg := createDefaultConfig().(*Config) | ||
require.NotNil(t, cfg, "failed to create default config") | ||
require.NoError(t, componenttest.CheckConfigStruct(cfg)) | ||
} | ||
|
||
func TestCreateTracesReceiver(t *testing.T) { | ||
cfg := createDefaultConfig().(*Config) | ||
f := NewFactory() | ||
r, err := f.CreateTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) | ||
require.NoError(t, err) | ||
assert.NotNil(t, r) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
// Copyright (c) 2024 The Jaeger Authors. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package storagereceiver | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/jaegertracing/jaeger/pkg/testutils" | ||
) | ||
|
||
func TestMain(m *testing.M) { | ||
testutils.VerifyGoLeaks(m) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
// Copyright (c) 2024 The Jaeger Authors. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package storagereceiver | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" | ||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/consumer" | ||
"go.opentelemetry.io/collector/receiver" | ||
"go.uber.org/zap" | ||
|
||
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" | ||
"github.com/jaegertracing/jaeger/model" | ||
"github.com/jaegertracing/jaeger/storage/spanstore" | ||
) | ||
|
||
type storageReceiver struct { | ||
cancelConsumeLoop context.CancelFunc | ||
config *Config | ||
settings receiver.CreateSettings | ||
consumedTraces map[model.TraceID]*consumedTrace | ||
nextConsumer consumer.Traces | ||
spanReader spanstore.Reader | ||
} | ||
|
||
type consumedTrace struct { | ||
spanIDs map[model.SpanID]struct{} | ||
} | ||
|
||
func newTracesReceiver(config *Config, set receiver.CreateSettings, nextConsumer consumer.Traces) (*storageReceiver, error) { | ||
return &storageReceiver{ | ||
config: config, | ||
settings: set, | ||
consumedTraces: make(map[model.TraceID]*consumedTrace), | ||
nextConsumer: nextConsumer, | ||
}, nil | ||
} | ||
|
||
func (r *storageReceiver) Start(_ context.Context, host component.Host) error { | ||
f, err := jaegerstorage.GetStorageFactory(r.config.TraceStorage, host) | ||
if err != nil { | ||
return fmt.Errorf("cannot find storage factory: %w", err) | ||
} | ||
|
||
if r.spanReader, err = f.CreateSpanReader(); err != nil { | ||
return fmt.Errorf("cannot create span reader: %w", err) | ||
} | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
r.cancelConsumeLoop = cancel | ||
|
||
go func() { | ||
if err := r.consumeLoop(ctx); err != nil { | ||
r.settings.ReportStatus(component.NewFatalErrorEvent(err)) | ||
} | ||
}() | ||
|
||
return nil | ||
} | ||
|
||
func (r *storageReceiver) consumeLoop(ctx context.Context) error { | ||
for { | ||
services, err := r.spanReader.GetServices(ctx) | ||
if err != nil { | ||
r.settings.Logger.Error("Failed to get services from consumer", zap.Error(err)) | ||
return err | ||
} | ||
|
||
for _, svc := range services { | ||
if err := r.consumeTraces(ctx, svc); err != nil { | ||
r.settings.Logger.Error("Failed to consume traces from consumer", zap.Error(err)) | ||
} | ||
} | ||
|
||
if ctx.Err() != nil { | ||
r.settings.Logger.Error("Consumer stopped", zap.Error(ctx.Err())) | ||
return ctx.Err() | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I actually follow kafkareceiver from otelcol-contrib since they're both a "pull" strategy to retrieve traces. https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/3652b444802fad4819b50fbda41e0957000d8279/receiver/kafkareceiver/kafka_receiver.go#L171-L185 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's wrong to treat orderly shutdown as error and log all kinds of error messages. You can check Done instead select {
case <-ctx.Done():
return
default:
time.Sleep(time.Second)
} |
||
} | ||
} | ||
|
||
func (r *storageReceiver) consumeTraces(ctx context.Context, serviceName string) error { | ||
traces, err := r.spanReader.FindTraces(ctx, &spanstore.TraceQueryParameters{ | ||
ServiceName: serviceName, | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, trace := range traces { | ||
traceID := trace.Spans[0].TraceID | ||
if _, ok := r.consumedTraces[traceID]; !ok { | ||
r.consumedTraces[traceID] = &consumedTrace{ | ||
spanIDs: make(map[model.SpanID]struct{}), | ||
} | ||
} | ||
if len(trace.Spans) > len(r.consumedTraces[traceID].spanIDs) { | ||
yurishkuro marked this conversation as resolved.
Show resolved
Hide resolved
|
||
r.consumeSpans(ctx, r.consumedTraces[traceID], trace.Spans) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (r *storageReceiver) consumeSpans(ctx context.Context, tc *consumedTrace, spans []*model.Span) error { | ||
// Spans are consumed one at a time because we don't know whether all spans | ||
// in a trace have been completely exported | ||
for _, span := range spans { | ||
if _, ok := tc.spanIDs[span.SpanID]; !ok { | ||
tc.spanIDs[span.SpanID] = struct{}{} | ||
td, err := jaeger2otlp.ProtoToTraces([]*model.Batch{ | ||
{ | ||
Spans: []*model.Span{span}, | ||
Process: span.Process, | ||
}, | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
r.nextConsumer.ConsumeTraces(ctx, td) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (r *storageReceiver) Shutdown(_ context.Context) error { | ||
r.cancelConsumeLoop() | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the
r.cancelConsumeLoop()
is called fromShutdown
, we can capture that withctx.Err()
. Is it better to check withctx.Done
? I followed kafkareceiver implementation from otelcol-contrib actually https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/3652b444802fad4819b50fbda41e0957000d8279/receiver/kafkareceiver/kafka_receiver.go#L171-L185