Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow tenant through processors as a string #3661

Merged
merged 3 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/collector/app/model_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@ import (
)

// ProcessSpan processes a Domain Model Span
type ProcessSpan func(span *model.Span)
type ProcessSpan func(span *model.Span, tenant string)

// ProcessSpans processes a batch of Domain Model Spans
type ProcessSpans func(spans []*model.Span)
type ProcessSpans func(spans []*model.Span, tenant string)

// FilterSpan decides whether to allow or disallow a span
type FilterSpan func(span *model.Span) bool

// ChainedProcessSpan chains spanProcessors as a single ProcessSpan call
func ChainedProcessSpan(spanProcessors ...ProcessSpan) ProcessSpan {
return func(span *model.Span) {
return func(span *model.Span, tenant string) {
for _, processor := range spanProcessors {
processor(span)
processor(span, tenant)
}
}
}
6 changes: 3 additions & 3 deletions cmd/collector/app/model_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import (
func TestChainedProcessSpan(t *testing.T) {
happened1 := false
happened2 := false
func1 := func(span *model.Span) { happened1 = true }
func2 := func(span *model.Span) { happened2 = true }
func1 := func(span *model.Span, tenant string) { happened1 = true }
func2 := func(span *model.Span, tenant string) { happened2 = true }
chained := ChainedProcessSpan(func1, func2)
chained(&model.Span{})
chained(&model.Span{}, "")
assert.True(t, happened1)
assert.True(t, happened2)
}
4 changes: 2 additions & 2 deletions cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,13 @@ func (o options) apply(opts ...Option) options {
ret.hostMetrics = metrics.NullFactory
}
if ret.preProcessSpans == nil {
ret.preProcessSpans = func(spans []*model.Span) {}
ret.preProcessSpans = func(spans []*model.Span, tenant string) {}
}
if ret.sanitizer == nil {
ret.sanitizer = func(span *model.Span) *model.Span { return span }
}
if ret.preSave == nil {
ret.preSave = func(span *model.Span) {}
ret.preSave = func(span *model.Span, tenant string) {}
}
if ret.spanFilter == nil {
ret.spanFilter = func(span *model.Span) bool { return true }
Expand Down
8 changes: 4 additions & 4 deletions cmd/collector/app/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ func TestAllOptionSet(t *testing.T) {
Options.ServiceMetrics(metrics.NullFactory),
Options.Logger(zap.NewNop()),
Options.NumWorkers(5),
Options.PreProcessSpans(func(spans []*model.Span) {}),
Options.PreProcessSpans(func(spans []*model.Span, tenant string) {}),
Options.Sanitizer(func(span *model.Span) *model.Span { return span }),
Options.QueueSize(10),
Options.DynQueueSizeWarmup(1000),
Options.DynQueueSizeMemory(1024),
Options.PreSave(func(span *model.Span) {}),
Options.PreSave(func(span *model.Span, tenant string) {}),
Options.CollectorTags(map[string]string{"extra": "tags"}),
)
assert.EqualValues(t, 5, opts.numWorkers)
Expand All @@ -59,8 +59,8 @@ func TestNoOptionsSet(t *testing.T) {
assert.Nil(t, opts.collectorTags)
assert.False(t, opts.reportBusy)
assert.False(t, opts.blockingSubmit)
assert.NotPanics(t, func() { opts.preProcessSpans(nil) })
assert.NotPanics(t, func() { opts.preSave(nil) })
assert.NotPanics(t, func() { opts.preProcessSpans(nil, "") })
assert.NotPanics(t, func() { opts.preSave(nil, "") })
assert.True(t, opts.spanFilter(nil))
span := model.Span{}
assert.EqualValues(t, &span, opts.sanitizer(&span))
Expand Down
1 change: 1 addition & 0 deletions cmd/collector/app/processor/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var ErrBusy = errors.New("server busy")
type SpansOptions struct {
SpanFormat SpanFormat
InboundTransport InboundTransport
Tenant string
}

// SpanProcessor handles model spans
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/root_span_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

// handleRootSpan returns a function that records throughput for root spans
func handleRootSpan(aggregator strategystore.Aggregator, logger *zap.Logger) ProcessSpan {
return func(span *model.Span) {
return func(span *model.Span, tenant string) {
// TODO simply checking parentId to determine if a span is a root span is not sufficient. However,
// we can be sure that only a root span will have sampler tags.
if span.ParentSpanID() != model.NewSpanID(0) {
Expand Down
8 changes: 4 additions & 4 deletions cmd/collector/app/root_span_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,27 @@ func TestHandleRootSpan(t *testing.T) {

// Testing non-root span
span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}}
processor(span)
processor(span, "")
assert.Equal(t, 0, aggregator.callCount)

// Testing span with service name but no operation
span.References = []model.SpanRef{}
span.Process = &model.Process{
ServiceName: "service",
}
processor(span)
processor(span, "")
assert.Equal(t, 0, aggregator.callCount)

// Testing span with service name and operation but no probabilistic sampling tags
span.OperationName = "GET"
processor(span)
processor(span, "")
assert.Equal(t, 0, aggregator.callCount)

// Testing span with service name, operation, and probabilistic sampling tags
span.Tags = model.KeyValues{
model.String("sampler.type", "probabilistic"),
model.String("sampler.param", "0.001"),
}
processor(span)
processor(span, "")
assert.Equal(t, 1, aggregator.callCount)
}
22 changes: 14 additions & 8 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/queue"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand Down Expand Up @@ -61,6 +62,7 @@ type spanProcessor struct {
type queueItem struct {
queuedTime time.Time
span *model.Span
tenant string
}

// NewSpanProcessor returns a SpanProcessor that preProcesses, filters, queues, sanitizes, and processes spans.
Expand Down Expand Up @@ -141,16 +143,19 @@ func (sp *spanProcessor) Close() error {
return nil
}

func (sp *spanProcessor) saveSpan(span *model.Span) {
func (sp *spanProcessor) saveSpan(span *model.Span, tenant string) {
if nil == span.Process {
sp.logger.Error("process is empty for the span")
sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span)
return
}

startTime := time.Now()
// TODO context should be propagated from upstream components
if err := sp.spanWriter.WriteSpan(context.TODO(), span); err != nil {
// Since we save spans asynchronously from receiving them, we cannot reuse
// the inbound Context, as it may be cancelled by the time we reach this point,
// so we need to start a new Context.
ctx := storage.WithTenant(context.Background(), tenant)
if err := sp.spanWriter.WriteSpan(ctx, span); err != nil {
sp.logger.Error("Failed to save span", zap.Error(err))
sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span)
} else {
Expand All @@ -161,17 +166,17 @@ func (sp *spanProcessor) saveSpan(span *model.Span) {
sp.metrics.SaveLatency.Record(time.Since(startTime))
}

func (sp *spanProcessor) countSpan(span *model.Span) {
func (sp *spanProcessor) countSpan(span *model.Span, tenant string) {
sp.bytesProcessed.Add(uint64(span.Size()))
sp.spansProcessed.Inc()
}

func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options processor.SpansOptions) ([]bool, error) {
sp.preProcessSpans(mSpans)
sp.preProcessSpans(mSpans, options.Tenant)
sp.metrics.BatchSize.Update(int64(len(mSpans)))
retMe := make([]bool, len(mSpans))
for i, mSpan := range mSpans {
ok := sp.enqueueSpan(mSpan, options.SpanFormat, options.InboundTransport)
ok := sp.enqueueSpan(mSpan, options.SpanFormat, options.InboundTransport, options.Tenant)
if !ok && sp.reportBusy {
return nil, processor.ErrBusy
}
Expand All @@ -181,7 +186,7 @@ func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options processor.Sp
}

func (sp *spanProcessor) processItemFromQueue(item *queueItem) {
sp.processSpan(sp.sanitizer(item.span))
sp.processSpan(sp.sanitizer(item.span), item.tenant)
sp.metrics.InQueueLatency.Record(time.Since(item.queuedTime))
}

Expand All @@ -206,7 +211,7 @@ func (sp *spanProcessor) addCollectorTags(span *model.Span) {
typedTags.Sort()
}

func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat processor.SpanFormat, transport processor.InboundTransport) bool {
func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat processor.SpanFormat, transport processor.InboundTransport, tenant string) bool {
spanCounts := sp.metrics.GetCountsForFormat(originalFormat, transport)
spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span)

Expand All @@ -224,6 +229,7 @@ func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat processor.
item := &queueItem{
queuedTime: time.Now(),
span: span,
tenant: tenant,
}
return sp.queue.Produce(item)
}
Expand Down
42 changes: 39 additions & 3 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"io"
"reflect"
"sync"
"testing"
"time"
Expand All @@ -34,6 +35,7 @@ import (
zipkinSanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
zc "github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)
Expand Down Expand Up @@ -168,12 +170,21 @@ type fakeSpanWriter struct {
spansLock sync.Mutex
spans []*model.Span
err error
tenants map[string]bool
}

func (n *fakeSpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
n.spansLock.Lock()
defer n.spansLock.Unlock()
n.spans = append(n.spans, span)

// Record all unique tenants arriving in span Contexts
if n.tenants == nil {
n.tenants = make(map[string]bool)
}

n.tenants[storage.GetTenant(ctx)] = true

return n.err
}

Expand Down Expand Up @@ -331,7 +342,7 @@ func TestSpanProcessorWithNilProcess(t *testing.T) {
p := NewSpanProcessor(w, nil, Options.ServiceMetrics(serviceMetrics)).(*spanProcessor)
defer assert.NoError(t, p.Close())

p.saveSpan(&model.Span{})
p.saveSpan(&model.Span{}, "")

expected := []metricstest.ExpectedMetric{{
Name: "service.spans.saved-by-svc|debug=false|result=err|svc=__unknown", Value: 1,
Expand Down Expand Up @@ -395,7 +406,7 @@ func TestSpanProcessorCountSpan(t *testing.T) {
p := NewSpanProcessor(w, nil, Options.HostMetrics(m), Options.DynQueueSizeMemory(1000)).(*spanProcessor)
p.background(10*time.Millisecond, p.updateGauges)

p.processSpan(&model.Span{})
p.processSpan(&model.Span{}, "")
assert.NotEqual(t, uint64(0), p.bytesProcessed)

for i := 0; i < 15; i++ {
Expand Down Expand Up @@ -550,7 +561,7 @@ func TestAdditionalProcessors(t *testing.T) {

// additional processor is called
count := 0
f := func(s *model.Span) {
f := func(s *model.Span, t string) {
count++
}
p = NewSpanProcessor(w, []ProcessSpan{f}, Options.QueueSize(1))
Expand All @@ -566,3 +577,28 @@ func TestAdditionalProcessors(t *testing.T) {
assert.NoError(t, p.Close())
assert.Equal(t, 1, count)
}

func TestSpanProcessorContextPropagation(t *testing.T) {
w := &fakeSpanWriter{}
p := NewSpanProcessor(w, nil, Options.QueueSize(1))

dummyTenant := "context-prop-test-tenant"

res, err := p.ProcessSpans([]*model.Span{
{
Process: &model.Process{
ServiceName: "x",
},
},
}, processor.SpansOptions{
Tenant: dummyTenant,
})
assert.NoError(t, err)
assert.Equal(t, []bool{true}, res)
assert.NoError(t, p.Close())

// Verify that the dummy tenant from SpansOptions context made it to writer
assert.Equal(t, true, w.tenants[dummyTenant])
// Verify no other tenantKey context values made it to writer
assert.True(t, reflect.DeepEqual(w.tenants, map[string]bool{dummyTenant: true}))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can simply use assert.Equal(), it should compare the maps correctly iirc.

}
15 changes: 0 additions & 15 deletions storage/empty_test.go

This file was deleted.

43 changes: 43 additions & 0 deletions storage/tenant.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2022 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import "context"

// tenantKeyType is a custom type for the key "tenant", following context.Context convention
type tenantKeyType string

const (
// tenantKey holds tenancy for spans
tenantKey = tenantKeyType("tenant")
)

// WithTenant creates a Context with a tenant association
func WithTenant(ctx context.Context, tenant string) context.Context {
return context.WithValue(context.Background(), tenantKey, tenant)
}

// GetTenant retrieves a tenant associated with a Context
func GetTenant(ctx context.Context) string {
tenant := ctx.Value(tenantKey)
if tenant == nil {
return ""
}

if s, ok := tenant.(string); ok {
return s
}
return ""
}
Loading