Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace cassandra-spanstore tracing instrumentation withOTEL #4599

Merged
merged 6 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"io"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/cassandra"
Expand Down Expand Up @@ -57,6 +59,7 @@ type Factory struct {
primaryMetricsFactory metrics.Factory
archiveMetricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider

primaryConfig config.SessionBuilder
primarySession cassandra.Session
Expand All @@ -67,6 +70,7 @@ type Factory struct {
// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
tracer: otel.GetTracerProvider(),
Options: NewOptions(primaryStorageConfig, archiveStorageConfig),
}
}
Expand Down Expand Up @@ -120,7 +124,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger), nil
return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")), nil
}

// CreateSpanWriter implements storage.Factory
Expand All @@ -143,7 +147,7 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger), nil
return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
Expand Down
7 changes: 6 additions & 1 deletion plugin/storage/cassandra/savetracetest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/jaegertracing/jaeger/model"
cascfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/metrics"
cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand All @@ -44,8 +45,12 @@ func main() {
if err != nil {
logger.Fatal("Cannot create Cassandra session", zap.Error(err))
}
tracer, err := jtracer.New("savetracetest")
if err != nil {
logger.Fatal("Failed to initialize tracer", zap.Error(err))
}
spanStore := cSpanStore.NewSpanWriter(cqlSession, time.Hour*12, noScope, logger)
spanReader := cSpanStore.NewSpanReader(cqlSession, noScope, logger)
spanReader := cSpanStore.NewSpanReader(cqlSession, noScope, logger, tracer.OTEL.Tracer("cSpanStore.SpanReader"))
ctx := context.Background()
if err = spanStore.WriteSpan(ctx, getSomeSpan()); err != nil {
logger.Fatal("Failed to save", zap.Error(err))
Expand Down
70 changes: 39 additions & 31 deletions plugin/storage/cassandra/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
"fmt"
"time"

"github.com/opentracing/opentracing-go"
ottag "github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
Expand Down Expand Up @@ -110,13 +111,15 @@ type SpanReader struct {
operationNamesReader operationNamesReader
metrics spanReaderMetrics
logger *zap.Logger
tracer trace.Tracer
}

// NewSpanReader returns a new SpanReader.
func NewSpanReader(
session cassandra.Session,
metricsFactory metrics.Factory,
logger *zap.Logger,
tracer trace.Tracer,
) *SpanReader {
readFactory := metricsFactory.Namespace(metrics.NSOptions{Name: "read", Tags: nil})
serviceNamesStorage := NewServiceNamesStorage(session, 0, metricsFactory, logger)
Expand All @@ -134,6 +137,7 @@ func NewSpanReader(
queryServiceNameIndex: casMetrics.NewTable(readFactory, "service_name_index"),
},
logger: logger,
tracer: tracer,
}
}

Expand All @@ -151,9 +155,9 @@ func (s *SpanReader) GetOperations(
}

func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID) (*model.Trace, error) {
span, ctx := startSpanForQuery(ctx, "readTrace", querySpanByTraceID)
defer span.Finish()
span.LogFields(otlog.String("event", "searching"), otlog.Object("trace_id", traceID))
ctx, span := s.startSpanForQuery(ctx, "readTrace", querySpanByTraceID)
defer span.End()
span.SetAttributes(attribute.Key("trace_id").String(traceID.String()))

trace, err := s.readTraceInSpan(ctx, traceID)
logErrorToSpan(span, err)
Expand Down Expand Up @@ -305,13 +309,16 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra
}

func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, ctx := startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag)
defer span.Finish()
ctx, span := s.startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag)
defer span.End()

results := make([]dbmodel.UniqueTraceIDs, 0, len(tq.Tags))
for k, v := range tq.Tags {
childSpan, _ := opentracing.StartSpanFromContext(ctx, "queryByTag")
childSpan.LogFields(otlog.String("tag.key", k), otlog.String("tag.value", v))
_, childSpan := s.tracer.Start(ctx, "queryByTag")
childSpan.SetAttributes(
attribute.Key("tag.key").String(k),
attribute.Key("tag.value").String(v),
)
query := s.session.Query(
queryByTag,
tq.ServiceName,
Expand All @@ -322,7 +329,7 @@ func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.Trace
tq.NumTraces*limitMultiple,
).PageSize(0)
t, err := s.executeQuery(childSpan, query, s.metrics.queryTagIndex)
childSpan.Finish()
childSpan.End()
if err != nil {
return nil, err
}
Expand All @@ -332,8 +339,8 @@ func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.Trace
}

func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, ctx := startSpanForQuery(ctx, "queryByDuration", queryByDuration)
defer span.Finish()
ctx, span := s.startSpanForQuery(ctx, "queryByDuration", queryByDuration)
defer span.End()

results := dbmodel.UniqueTraceIDs{}

Expand All @@ -349,8 +356,8 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.
endTimeByHour := traceQuery.StartTimeMax.Round(durationBucketSize)

for timeBucket := endTimeByHour; timeBucket.After(startTimeByHour) || timeBucket.Equal(startTimeByHour); timeBucket = timeBucket.Add(-1 * durationBucketSize) {
childSpan, _ := opentracing.StartSpanFromContext(ctx, "queryForTimeBucket")
childSpan.LogFields(otlog.String("timeBucket", timeBucket.String()))
_, childSpan := s.tracer.Start(ctx, "queryForTimeBucket")
childSpan.SetAttributes(attribute.Key("timeBucket").String(timeBucket.String()))
query := s.session.Query(
queryByDuration,
timeBucket,
Expand All @@ -360,7 +367,7 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.
maxDurationMicros,
traceQuery.NumTraces*limitMultiple)
t, err := s.executeQuery(childSpan, query, s.metrics.queryDurationIndex)
childSpan.Finish()
childSpan.End()
if err != nil {
return nil, err
}
Expand All @@ -376,8 +383,8 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.
}

func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, _ := startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName)
defer span.Finish()
_, span := s.startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName)
defer span.End()
query := s.session.Query(
queryByServiceAndOperationName,
tq.ServiceName,
Expand All @@ -390,8 +397,8 @@ func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spa
}

func (s *SpanReader) queryByService(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, _ := startSpanForQuery(ctx, "queryByService", queryByServiceName)
defer span.Finish()
_, span := s.startSpanForQuery(ctx, "queryByService", queryByServiceAndOperationName)
defer span.End()
query := s.session.Query(
queryByServiceName,
tq.ServiceName,
Expand All @@ -402,7 +409,7 @@ func (s *SpanReader) queryByService(ctx context.Context, tq *spanstore.TraceQuer
return s.executeQuery(span, query, s.metrics.queryServiceNameIndex)
}

func (s *SpanReader) executeQuery(span opentracing.Span, query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) {
func (s *SpanReader) executeQuery(span trace.Span, query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) {
start := time.Now()
i := query.Iter()
retMe := dbmodel.UniqueTraceIDs{}
Expand All @@ -414,25 +421,26 @@ func (s *SpanReader) executeQuery(span opentracing.Span, query cassandra.Query,
tableMetrics.Emit(err, time.Since(start))
if err != nil {
logErrorToSpan(span, err)
span.LogFields(otlog.String("query", query.String()))
s.logger.Error("Failed to exec query", zap.Error(err), zap.String("query", query.String()))
return nil, err
}
return retMe, nil
}

func startSpanForQuery(ctx context.Context, name, query string) (opentracing.Span, context.Context) {
span, ctx := opentracing.StartSpanFromContext(ctx, name)
ottag.DBStatement.Set(span, query)
ottag.DBType.Set(span, "cassandra")
ottag.Component.Set(span, "gocql")
return span, ctx
func (s *SpanReader) startSpanForQuery(ctx context.Context, name, query string) (context.Context, trace.Span) {
ctx, span := s.tracer.Start(ctx, name)
span.SetAttributes(
attribute.Key(semconv.DBStatementKey).String(query),
attribute.Key(semconv.DBSystemKey).String("cassandra"),
attribute.Key("component").String("gocql"),
)
return ctx, span
}

func logErrorToSpan(span opentracing.Span, err error) {
func logErrorToSpan(span trace.Span, err error) {
if err == nil {
return
}
ottag.Error.Set(span, true)
span.LogFields(otlog.Error(err))
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
53 changes: 38 additions & 15 deletions plugin/storage/cassandra/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/metricstest"
Expand All @@ -37,13 +40,26 @@ import (
)

type spanReaderTest struct {
session *mocks.Session
logger *zap.Logger
logBuffer *testutils.Buffer
reader *SpanReader
session *mocks.Session
logger *zap.Logger
logBuffer *testutils.Buffer
traceBuffer *tracetest.InMemoryExporter
reader *SpanReader
}

func withSpanReader(fn func(r *spanReaderTest)) {
func tracerProvider(t *testing.T) (trace.TracerProvider, *tracetest.InMemoryExporter, func()) {
exporter := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithSyncer(exporter),
)
closer := func() {
assert.NoError(t, tp.Shutdown(context.Background()))
}
return tp, exporter, closer
}

func withSpanReader(t *testing.T, fn func(r *spanReaderTest)) {
session := &mocks.Session{}
query := &mocks.Query{}
session.On("Query",
Expand All @@ -52,19 +68,22 @@ func withSpanReader(fn func(r *spanReaderTest)) {
query.On("Exec").Return(nil)
logger, logBuffer := testutils.NewLogger()
metricsFactory := metricstest.NewFactory(0)
tracer, exp, closer := tracerProvider(t)
defer closer()
r := &spanReaderTest{
session: session,
logger: logger,
logBuffer: logBuffer,
reader: NewSpanReader(session, metricsFactory, logger),
session: session,
logger: logger,
logBuffer: logBuffer,
traceBuffer: exp,
reader: NewSpanReader(session, metricsFactory, logger, tracer.Tracer("test")),
}
fn(r)
}

var _ spanstore.Reader = &SpanReader{} // check API conformance

func TestSpanReaderGetServices(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
withSpanReader(t, func(r *spanReaderTest) {
r.reader.serviceNamesReader = func() ([]string, error) { return []string{"service-a"}, nil }
s, err := r.reader.GetServices(context.Background())
assert.NoError(t, err)
Expand All @@ -73,7 +92,7 @@ func TestSpanReaderGetServices(t *testing.T) {
}

func TestSpanReaderGetOperations(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
withSpanReader(t, func(r *spanReaderTest) {
expectedOperations := []spanstore.Operation{
{
Name: "operation-a",
Expand Down Expand Up @@ -121,7 +140,7 @@ func TestSpanReaderGetTrace(t *testing.T) {
for _, tc := range testCases {
testCase := tc // capture loop var
t.Run("expected err="+testCase.expectedErr, func(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
withSpanReader(t, func(r *spanReaderTest) {
iter := &mocks.Iterator{}
iter.On("Scan", testCase.scanner).Return(true)
iter.On("Scan", matchEverything()).Return(false)
Expand All @@ -135,6 +154,7 @@ func TestSpanReaderGetTrace(t *testing.T) {

trace, err := r.reader.GetTrace(context.Background(), model.TraceID{})
if testCase.expectedErr == "" {
require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded")
assert.NoError(t, err)
assert.NotNil(t, trace)
} else {
Expand All @@ -148,7 +168,7 @@ func TestSpanReaderGetTrace(t *testing.T) {
}

func TestSpanReaderGetTrace_TraceNotFound(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
withSpanReader(t, func(r *spanReaderTest) {
iter := &mocks.Iterator{}
iter.On("Scan", matchEverything()).Return(false)
iter.On("Close").Return(nil)
Expand All @@ -160,14 +180,16 @@ func TestSpanReaderGetTrace_TraceNotFound(t *testing.T) {
r.session.On("Query", mock.AnythingOfType("string"), matchEverything()).Return(query)

trace, err := r.reader.GetTrace(context.Background(), model.TraceID{})
require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded")
assert.Nil(t, trace)
assert.EqualError(t, err, "trace not found")
})
}

func TestSpanReaderFindTracesBadRequest(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
withSpanReader(t, func(r *spanReaderTest) {
_, err := r.reader.FindTraces(context.Background(), nil)
require.Empty(t, r.traceBuffer.GetSpans(), "Spans Not recorded")
assert.Error(t, err)
})
}
Expand Down Expand Up @@ -286,7 +308,7 @@ func TestSpanReaderFindTraces(t *testing.T) {
for _, tc := range testCases {
testCase := tc // capture loop var
t.Run(testCase.caption, func(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
withSpanReader(t, func(r *spanReaderTest) {
// scanMatcher can match Iter.Scan() parameters and set trace ID fields
scanMatcher := func(name string) interface{} {
traceIDs := []dbmodel.TraceID{
Expand Down Expand Up @@ -384,6 +406,7 @@ func TestSpanReaderFindTraces(t *testing.T) {
}
res, err := r.reader.FindTraces(context.Background(), queryParams)
if testCase.expectedError == "" {
require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded")
assert.NoError(t, err)
assert.Len(t, res, testCase.expectedCount, "expecting certain number of traces")
} else {
Expand Down