Skip to content

Commit

Permalink
Flow tenant through processors as a string (#3661)
Browse files Browse the repository at this point in the history
* Flow tenant through processors as a string

Signed-off-by: Ed Snible <[email protected]>

* GetTenant()/WithTenant() functions

Signed-off-by: Ed Snible <[email protected]>

* Tests for WithTenant()/GetTenant()

Signed-off-by: Ed Snible <[email protected]>
  • Loading branch information
esnible authored May 10, 2022
1 parent 8fd5c0c commit 483cf1a
Show file tree
Hide file tree
Showing 12 changed files with 153 additions and 44 deletions.
8 changes: 4 additions & 4 deletions cmd/collector/app/model_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@ import (
)

// ProcessSpan processes a Domain Model Span
type ProcessSpan func(span *model.Span)
type ProcessSpan func(span *model.Span, tenant string)

// ProcessSpans processes a batch of Domain Model Spans
type ProcessSpans func(spans []*model.Span)
type ProcessSpans func(spans []*model.Span, tenant string)

// FilterSpan decides whether to allow or disallow a span
type FilterSpan func(span *model.Span) bool

// ChainedProcessSpan chains spanProcessors as a single ProcessSpan call
func ChainedProcessSpan(spanProcessors ...ProcessSpan) ProcessSpan {
return func(span *model.Span) {
return func(span *model.Span, tenant string) {
for _, processor := range spanProcessors {
processor(span)
processor(span, tenant)
}
}
}
6 changes: 3 additions & 3 deletions cmd/collector/app/model_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import (
func TestChainedProcessSpan(t *testing.T) {
happened1 := false
happened2 := false
func1 := func(span *model.Span) { happened1 = true }
func2 := func(span *model.Span) { happened2 = true }
func1 := func(span *model.Span, tenant string) { happened1 = true }
func2 := func(span *model.Span, tenant string) { happened2 = true }
chained := ChainedProcessSpan(func1, func2)
chained(&model.Span{})
chained(&model.Span{}, "")
assert.True(t, happened1)
assert.True(t, happened2)
}
4 changes: 2 additions & 2 deletions cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,13 @@ func (o options) apply(opts ...Option) options {
ret.hostMetrics = metrics.NullFactory
}
if ret.preProcessSpans == nil {
ret.preProcessSpans = func(spans []*model.Span) {}
ret.preProcessSpans = func(spans []*model.Span, tenant string) {}
}
if ret.sanitizer == nil {
ret.sanitizer = func(span *model.Span) *model.Span { return span }
}
if ret.preSave == nil {
ret.preSave = func(span *model.Span) {}
ret.preSave = func(span *model.Span, tenant string) {}
}
if ret.spanFilter == nil {
ret.spanFilter = func(span *model.Span) bool { return true }
Expand Down
8 changes: 4 additions & 4 deletions cmd/collector/app/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ func TestAllOptionSet(t *testing.T) {
Options.ServiceMetrics(metrics.NullFactory),
Options.Logger(zap.NewNop()),
Options.NumWorkers(5),
Options.PreProcessSpans(func(spans []*model.Span) {}),
Options.PreProcessSpans(func(spans []*model.Span, tenant string) {}),
Options.Sanitizer(func(span *model.Span) *model.Span { return span }),
Options.QueueSize(10),
Options.DynQueueSizeWarmup(1000),
Options.DynQueueSizeMemory(1024),
Options.PreSave(func(span *model.Span) {}),
Options.PreSave(func(span *model.Span, tenant string) {}),
Options.CollectorTags(map[string]string{"extra": "tags"}),
)
assert.EqualValues(t, 5, opts.numWorkers)
Expand All @@ -59,8 +59,8 @@ func TestNoOptionsSet(t *testing.T) {
assert.Nil(t, opts.collectorTags)
assert.False(t, opts.reportBusy)
assert.False(t, opts.blockingSubmit)
assert.NotPanics(t, func() { opts.preProcessSpans(nil) })
assert.NotPanics(t, func() { opts.preSave(nil) })
assert.NotPanics(t, func() { opts.preProcessSpans(nil, "") })
assert.NotPanics(t, func() { opts.preSave(nil, "") })
assert.True(t, opts.spanFilter(nil))
span := model.Span{}
assert.EqualValues(t, &span, opts.sanitizer(&span))
Expand Down
1 change: 1 addition & 0 deletions cmd/collector/app/processor/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var ErrBusy = errors.New("server busy")
type SpansOptions struct {
SpanFormat SpanFormat
InboundTransport InboundTransport
Tenant string
}

// SpanProcessor handles model spans
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/root_span_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

// handleRootSpan returns a function that records throughput for root spans
func handleRootSpan(aggregator strategystore.Aggregator, logger *zap.Logger) ProcessSpan {
return func(span *model.Span) {
return func(span *model.Span, tenant string) {
// TODO simply checking parentId to determine if a span is a root span is not sufficient. However,
// we can be sure that only a root span will have sampler tags.
if span.ParentSpanID() != model.NewSpanID(0) {
Expand Down
8 changes: 4 additions & 4 deletions cmd/collector/app/root_span_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,27 @@ func TestHandleRootSpan(t *testing.T) {

// Testing non-root span
span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}}
processor(span)
processor(span, "")
assert.Equal(t, 0, aggregator.callCount)

// Testing span with service name but no operation
span.References = []model.SpanRef{}
span.Process = &model.Process{
ServiceName: "service",
}
processor(span)
processor(span, "")
assert.Equal(t, 0, aggregator.callCount)

// Testing span with service name and operation but no probabilistic sampling tags
span.OperationName = "GET"
processor(span)
processor(span, "")
assert.Equal(t, 0, aggregator.callCount)

// Testing span with service name, operation, and probabilistic sampling tags
span.Tags = model.KeyValues{
model.String("sampler.type", "probabilistic"),
model.String("sampler.param", "0.001"),
}
processor(span)
processor(span, "")
assert.Equal(t, 1, aggregator.callCount)
}
22 changes: 14 additions & 8 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/queue"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand Down Expand Up @@ -61,6 +62,7 @@ type spanProcessor struct {
type queueItem struct {
queuedTime time.Time
span *model.Span
tenant string
}

// NewSpanProcessor returns a SpanProcessor that preProcesses, filters, queues, sanitizes, and processes spans.
Expand Down Expand Up @@ -141,16 +143,19 @@ func (sp *spanProcessor) Close() error {
return nil
}

func (sp *spanProcessor) saveSpan(span *model.Span) {
func (sp *spanProcessor) saveSpan(span *model.Span, tenant string) {
if nil == span.Process {
sp.logger.Error("process is empty for the span")
sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span)
return
}

startTime := time.Now()
// TODO context should be propagated from upstream components
if err := sp.spanWriter.WriteSpan(context.TODO(), span); err != nil {
// Since we save spans asynchronously from receiving them, we cannot reuse
// the inbound Context, as it may be cancelled by the time we reach this point,
// so we need to start a new Context.
ctx := storage.WithTenant(context.Background(), tenant)
if err := sp.spanWriter.WriteSpan(ctx, span); err != nil {
sp.logger.Error("Failed to save span", zap.Error(err))
sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span)
} else {
Expand All @@ -161,17 +166,17 @@ func (sp *spanProcessor) saveSpan(span *model.Span) {
sp.metrics.SaveLatency.Record(time.Since(startTime))
}

func (sp *spanProcessor) countSpan(span *model.Span) {
func (sp *spanProcessor) countSpan(span *model.Span, tenant string) {
sp.bytesProcessed.Add(uint64(span.Size()))
sp.spansProcessed.Inc()
}

func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options processor.SpansOptions) ([]bool, error) {
sp.preProcessSpans(mSpans)
sp.preProcessSpans(mSpans, options.Tenant)
sp.metrics.BatchSize.Update(int64(len(mSpans)))
retMe := make([]bool, len(mSpans))
for i, mSpan := range mSpans {
ok := sp.enqueueSpan(mSpan, options.SpanFormat, options.InboundTransport)
ok := sp.enqueueSpan(mSpan, options.SpanFormat, options.InboundTransport, options.Tenant)
if !ok && sp.reportBusy {
return nil, processor.ErrBusy
}
Expand All @@ -181,7 +186,7 @@ func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options processor.Sp
}

func (sp *spanProcessor) processItemFromQueue(item *queueItem) {
sp.processSpan(sp.sanitizer(item.span))
sp.processSpan(sp.sanitizer(item.span), item.tenant)
sp.metrics.InQueueLatency.Record(time.Since(item.queuedTime))
}

Expand All @@ -206,7 +211,7 @@ func (sp *spanProcessor) addCollectorTags(span *model.Span) {
typedTags.Sort()
}

func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat processor.SpanFormat, transport processor.InboundTransport) bool {
func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat processor.SpanFormat, transport processor.InboundTransport, tenant string) bool {
spanCounts := sp.metrics.GetCountsForFormat(originalFormat, transport)
spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span)

Expand All @@ -224,6 +229,7 @@ func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat processor.
item := &queueItem{
queuedTime: time.Now(),
span: span,
tenant: tenant,
}
return sp.queue.Produce(item)
}
Expand Down
42 changes: 39 additions & 3 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"io"
"reflect"
"sync"
"testing"
"time"
Expand All @@ -34,6 +35,7 @@ import (
zipkinSanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
zc "github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)
Expand Down Expand Up @@ -168,12 +170,21 @@ type fakeSpanWriter struct {
spansLock sync.Mutex
spans []*model.Span
err error
tenants map[string]bool
}

func (n *fakeSpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
n.spansLock.Lock()
defer n.spansLock.Unlock()
n.spans = append(n.spans, span)

// Record all unique tenants arriving in span Contexts
if n.tenants == nil {
n.tenants = make(map[string]bool)
}

n.tenants[storage.GetTenant(ctx)] = true

return n.err
}

Expand Down Expand Up @@ -331,7 +342,7 @@ func TestSpanProcessorWithNilProcess(t *testing.T) {
p := NewSpanProcessor(w, nil, Options.ServiceMetrics(serviceMetrics)).(*spanProcessor)
defer assert.NoError(t, p.Close())

p.saveSpan(&model.Span{})
p.saveSpan(&model.Span{}, "")

expected := []metricstest.ExpectedMetric{{
Name: "service.spans.saved-by-svc|debug=false|result=err|svc=__unknown", Value: 1,
Expand Down Expand Up @@ -395,7 +406,7 @@ func TestSpanProcessorCountSpan(t *testing.T) {
p := NewSpanProcessor(w, nil, Options.HostMetrics(m), Options.DynQueueSizeMemory(1000)).(*spanProcessor)
p.background(10*time.Millisecond, p.updateGauges)

p.processSpan(&model.Span{})
p.processSpan(&model.Span{}, "")
assert.NotEqual(t, uint64(0), p.bytesProcessed)

for i := 0; i < 15; i++ {
Expand Down Expand Up @@ -550,7 +561,7 @@ func TestAdditionalProcessors(t *testing.T) {

// additional processor is called
count := 0
f := func(s *model.Span) {
f := func(s *model.Span, t string) {
count++
}
p = NewSpanProcessor(w, []ProcessSpan{f}, Options.QueueSize(1))
Expand All @@ -566,3 +577,28 @@ func TestAdditionalProcessors(t *testing.T) {
assert.NoError(t, p.Close())
assert.Equal(t, 1, count)
}

func TestSpanProcessorContextPropagation(t *testing.T) {
w := &fakeSpanWriter{}
p := NewSpanProcessor(w, nil, Options.QueueSize(1))

dummyTenant := "context-prop-test-tenant"

res, err := p.ProcessSpans([]*model.Span{
{
Process: &model.Process{
ServiceName: "x",
},
},
}, processor.SpansOptions{
Tenant: dummyTenant,
})
assert.NoError(t, err)
assert.Equal(t, []bool{true}, res)
assert.NoError(t, p.Close())

// Verify that the dummy tenant from SpansOptions context made it to writer
assert.Equal(t, true, w.tenants[dummyTenant])
// Verify no other tenantKey context values made it to writer
assert.True(t, reflect.DeepEqual(w.tenants, map[string]bool{dummyTenant: true}))
}
15 changes: 0 additions & 15 deletions storage/empty_test.go

This file was deleted.

43 changes: 43 additions & 0 deletions storage/tenant.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2022 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import "context"

// tenantKeyType is a custom type for the key "tenant", following context.Context convention
type tenantKeyType string

const (
// tenantKey holds tenancy for spans
tenantKey = tenantKeyType("tenant")
)

// WithTenant creates a Context with a tenant association
func WithTenant(ctx context.Context, tenant string) context.Context {
return context.WithValue(context.Background(), tenantKey, tenant)
}

// GetTenant retrieves a tenant associated with a Context
func GetTenant(ctx context.Context) string {
tenant := ctx.Value(tenantKey)
if tenant == nil {
return ""
}

if s, ok := tenant.(string); ok {
return s
}
return ""
}
Loading

0 comments on commit 483cf1a

Please sign in to comment.