diff --git a/cmd/query/main.go b/cmd/query/main.go index 51c374637d8..af6b6116378 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -163,6 +163,8 @@ func main() { storageFactory.AddFlags, app.AddFlags, metricsReaderFactory.AddFlags, + // add tenancy flags here to avoid panic caused by double registration in all-in-one + tenancy.AddFlags, ) if err := command.Execute(); err != nil { diff --git a/pkg/tenancy/grpc.go b/pkg/tenancy/grpc.go index 1bcf4705ed7..9b2ad64c4a0 100644 --- a/pkg/tenancy/grpc.go +++ b/pkg/tenancy/grpc.go @@ -130,3 +130,20 @@ func NewClientUnaryInterceptor(tc *Manager) grpc.UnaryClientInterceptor { return invoker(ctx, method, req, reply, cc, opts...) }) } + +// NewClientStreamInterceptor injects tenant header into gRPC request metadata. +func NewClientStreamInterceptor(tc *Manager) grpc.StreamClientInterceptor { + return grpc.StreamClientInterceptor(func( + ctx context.Context, + desc *grpc.StreamDesc, + cc *grpc.ClientConn, + method string, + streamer grpc.Streamer, + opts ...grpc.CallOption, + ) (grpc.ClientStream, error) { + if tenant := GetTenant(ctx); tenant != "" { + ctx = metadata.AppendToOutgoingContext(ctx, tc.Header, tenant) + } + return streamer(ctx, desc, cc, method, opts...) + }) +} diff --git a/pkg/tenancy/grpc_test.go b/pkg/tenancy/grpc_test.go index d2fb69622ce..bea0e66d60e 100644 --- a/pkg/tenancy/grpc_test.go +++ b/pkg/tenancy/grpc_test.go @@ -131,3 +131,23 @@ func TestClientUnaryInterceptor(t *testing.T) { assert.Equal(t, "acme", tenant) assert.Same(t, fakeErr, err) } + +func TestClientStreamInterceptor(t *testing.T) { + tm := NewManager(&Options{Enabled: true, Tenants: []string{"acme"}}) + interceptor := NewClientStreamInterceptor(tm) + var tenant string + fakeErr := errors.New("foo") + ctx := WithTenant(context.Background(), "acme") + streamer := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + md, ok := metadata.FromOutgoingContext(ctx) + assert.True(t, ok) + ten, err := tenantFromMetadata(md, tm.Header) + require.NoError(t, err) + tenant = ten + return nil, fakeErr + } + stream, err := interceptor(ctx, &grpc.StreamDesc{}, nil, "", streamer) + assert.Same(t, fakeErr, err) + require.Nil(t, stream) + assert.Equal(t, "acme", tenant) +} diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index b3a84873cf0..c5cc8154a4f 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" ) @@ -44,6 +45,7 @@ type Configuration struct { RemoteServerAddr string `yaml:"server" mapstructure:"server"` RemoteTLS tlscfg.Options RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"` + TenancyOpts tenancy.Options pluginHealthCheck *time.Ticker pluginHealthCheckDone chan bool @@ -99,6 +101,12 @@ func (c *Configuration) buildRemote(logger *zap.Logger) (*ClientPluginServices, ctx, cancel := context.WithTimeout(context.Background(), c.RemoteConnectTimeout) defer cancel() + + tenancyMgr := tenancy.NewManager(&c.TenancyOpts) + if tenancyMgr.Enabled { + opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr))) + opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr))) + } conn, err := grpc.DialContext(ctx, c.RemoteServerAddr, opts...) if err != nil { return nil, fmt.Errorf("error connecting to remote storage: %w", err) @@ -116,9 +124,19 @@ func (c *Configuration) buildRemote(logger *zap.Logger) (*ClientPluginServices, } func (c *Configuration) buildPlugin(logger *zap.Logger) (*ClientPluginServices, error) { + opts := []grpc.DialOption{ + grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())), + grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())), + } + + tenancyMgr := tenancy.NewManager(&c.TenancyOpts) + if tenancyMgr.Enabled { + opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr))) + opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr))) + } + // #nosec G204 cmd := exec.Command(c.PluginBinary, "--config", c.PluginConfigurationFile) - client := plugin.NewClient(&plugin.ClientConfig{ HandshakeConfig: shared.Handshake, VersionedPlugins: map[int]plugin.PluginSet{ @@ -129,10 +147,7 @@ func (c *Configuration) buildPlugin(logger *zap.Logger) (*ClientPluginServices, Logger: hclog.New(&hclog.LoggerOptions{ Level: hclog.LevelFromString(c.PluginLogLevel), }), - GRPCDialOptions: []grpc.DialOption{ - grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())), - grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())), - }, + GRPCDialOptions: opts, }) runtime.SetFinalizer(client, func(c *plugin.Client) { diff --git a/plugin/storage/grpc/options.go b/plugin/storage/grpc/options.go index e1be3eaf46a..f74f28bbc97 100644 --- a/plugin/storage/grpc/options.go +++ b/plugin/storage/grpc/options.go @@ -22,6 +22,7 @@ import ( "github.com/spf13/viper" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/storage/grpc/config" ) @@ -71,5 +72,6 @@ func (opt *Options) InitFromViper(v *viper.Viper) error { return fmt.Errorf("failed to parse gRPC storage TLS options: %w", err) } opt.Configuration.RemoteConnectTimeout = v.GetDuration(remoteConnectionTimeout) + opt.Configuration.TenancyOpts = tenancy.InitFromViper(v) return nil } diff --git a/plugin/storage/grpc/options_test.go b/plugin/storage/grpc/options_test.go index d68032e9022..a2c0112657b 100644 --- a/plugin/storage/grpc/options_test.go +++ b/plugin/storage/grpc/options_test.go @@ -22,15 +22,17 @@ import ( "github.com/stretchr/testify/require" "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/pkg/tenancy" ) func TestOptionsWithFlags(t *testing.T) { opts := &Options{} - v, command := config.Viperize(opts.AddFlags) + v, command := config.Viperize(opts.AddFlags, tenancy.AddFlags) err := command.ParseFlags([]string{ "--grpc-storage-plugin.binary=noop-grpc-plugin", "--grpc-storage-plugin.configuration-file=config.json", "--grpc-storage-plugin.log-level=debug", + "--multi-tenancy.header=x-scope-orgid", }) assert.NoError(t, err) opts.InitFromViper(v) @@ -38,6 +40,8 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, opts.Configuration.PluginBinary, "noop-grpc-plugin") assert.Equal(t, opts.Configuration.PluginConfigurationFile, "config.json") assert.Equal(t, opts.Configuration.PluginLogLevel, "debug") + assert.Equal(t, false, opts.Configuration.TenancyOpts.Enabled) + assert.Equal(t, "x-scope-orgid", opts.Configuration.TenancyOpts.Header) } func TestRemoteOptionsWithFlags(t *testing.T) {