Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove support for gRPC-Plugin #5388

Merged
merged 12 commits into from
May 11, 2024
349 changes: 169 additions & 180 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,31 @@
package config

import (
"context"
"fmt"
"os/exec"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
)

var pluginHealthCheckInterval = time.Second * 60

// Configuration describes the options to customize the storage behavior.
type Configuration struct {
PluginBinary string `yaml:"binary" mapstructure:"binary"`
PluginConfigurationFile string `yaml:"configuration-file" mapstructure:"configuration_file"`
PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"`
RemoteServerAddr string `yaml:"server" mapstructure:"server"`
RemoteTLS tlscfg.Options
RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"`
TenancyOpts tenancy.Options

pluginHealthCheck *time.Ticker
pluginHealthCheckDone chan bool
pluginRPCClient plugin.ClientProtocol
remoteConn *grpc.ClientConn
}
// type Configuration struct {
// PluginBinary string `yaml:"binary" mapstructure:"binary"`
// PluginConfigurationFile string `yaml:"configuration-file" mapstructure:"configuration_file"`
// PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"`
// RemoteServerAddr string `yaml:"server" mapstructure:"server"`
// RemoteTLS tlscfg.Options
// RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"`
// TenancyOpts tenancy.Options
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

// pluginHealthCheck *time.Ticker
// pluginHealthCheckDone chan bool
// pluginRPCClient plugin.ClientProtocol
// remoteConn *grpc.ClientConn
// }

// ClientPluginServices defines services plugin can expose and its capabilities
type ClientPluginServices struct {
Expand All @@ -73,158 +62,158 @@ type PluginBuilder interface {
}

// Build instantiates a PluginServices
func (c *Configuration) Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) {
if c.PluginBinary != "" {
return c.buildPlugin(logger, tracerProvider)
} else {
return c.buildRemote(logger, tracerProvider)
}
}

func (c *Configuration) Close() error {
if c.pluginHealthCheck != nil {
c.pluginHealthCheck.Stop()
c.pluginHealthCheckDone <- true
}
if c.remoteConn != nil {
c.remoteConn.Close()
}

return c.RemoteTLS.Close()
}

func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) {
opts := []grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tracerProvider))),
grpc.WithBlock(),
}
if c.RemoteTLS.Enabled {
tlsCfg, err := c.RemoteTLS.Config(logger)
if err != nil {
return nil, err
}
creds := credentials.NewTLS(tlsCfg)
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

ctx, cancel := context.WithTimeout(context.Background(), c.RemoteConnectTimeout)
defer cancel()

tenancyMgr := tenancy.NewManager(&c.TenancyOpts)
if tenancyMgr.Enabled {
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}
var err error
// TODO: Need to replace grpc.DialContext with grpc.NewClient and pass test
c.remoteConn, err = grpc.DialContext(ctx, c.RemoteServerAddr, opts...)
if err != nil {
return nil, fmt.Errorf("error connecting to remote storage: %w", err)
}

grpcClient := shared.NewGRPCClient(c.remoteConn)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
ArchiveStore: grpcClient,
StreamingSpanWriter: grpcClient,
},
Capabilities: grpcClient,
}, nil
}

func (c *Configuration) buildPlugin(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) {
opts := []grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tracerProvider))),
}

tenancyMgr := tenancy.NewManager(&c.TenancyOpts)
if tenancyMgr.Enabled {
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}

// #nosec G204
cmd := exec.Command(c.PluginBinary, "--config", c.PluginConfigurationFile)
client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: shared.Handshake,
VersionedPlugins: map[int]plugin.PluginSet{
1: shared.PluginMap,
},
Cmd: cmd,
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
Logger: hclog.New(&hclog.LoggerOptions{
Level: hclog.LevelFromString(c.PluginLogLevel),
}),
GRPCDialOptions: opts,
})

rpcClient, err := client.Client()
if err != nil {
return nil, fmt.Errorf("error attempting to connect to plugin rpc client: %w", err)
}

raw, err := rpcClient.Dispense(shared.StoragePluginIdentifier)
if err != nil {
return nil, fmt.Errorf("unable to retrieve storage plugin instance: %w", err)
}

// in practice, the type of `raw` is *shared.grpcClient, and type casts below cannot fail
storagePlugin, ok := raw.(shared.StoragePlugin)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.StoragePlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
archiveStoragePlugin, ok := raw.(shared.ArchiveStoragePlugin)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.ArchiveStoragePlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
streamingSpanWriterPlugin, ok := raw.(shared.StreamingSpanWriterPlugin)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.StreamingSpanWriterPlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
capabilities, ok := raw.(shared.PluginCapabilities)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.PluginCapabilities for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}

if err := c.startPluginHealthCheck(rpcClient, logger); err != nil {
return nil, fmt.Errorf("initial plugin health check failed: %w", err)
}

return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: storagePlugin,
ArchiveStore: archiveStoragePlugin,
StreamingSpanWriter: streamingSpanWriterPlugin,
},
Capabilities: capabilities,
killPluginClient: client.Kill,
}, nil
}

func (c *Configuration) startPluginHealthCheck(rpcClient plugin.ClientProtocol, logger *zap.Logger) error {
c.pluginRPCClient = rpcClient
c.pluginHealthCheckDone = make(chan bool)
c.pluginHealthCheck = time.NewTicker(pluginHealthCheckInterval)

go func() {
for {
select {
case <-c.pluginHealthCheckDone:
return
case <-c.pluginHealthCheck.C:
if err := c.pluginRPCClient.Ping(); err != nil {
logger.Fatal("plugin health check failed", zap.Error(err))
}
}
}
}()

return c.pluginRPCClient.Ping()
}
// func (c *Configuration) Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) {
// if c.PluginBinary != "" {
// return c.buildPlugin(logger, tracerProvider)
// } else {
// return c.buildRemote(logger, tracerProvider)
// }
// }

// func (c *Configuration) Close() error {
// if c.pluginHealthCheck != nil {
// c.pluginHealthCheck.Stop()
// c.pluginHealthCheckDone <- true
// }
// if c.remoteConn != nil {
// c.remoteConn.Close()
// }

// return c.RemoteTLS.Close()
// }

// func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) {
// opts := []grpc.DialOption{
// grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tracerProvider))),
// grpc.WithBlock(),
// }
// if c.RemoteTLS.Enabled {
// tlsCfg, err := c.RemoteTLS.Config(logger)
// if err != nil {
// return nil, err
// }
// creds := credentials.NewTLS(tlsCfg)
// opts = append(opts, grpc.WithTransportCredentials(creds))
// } else {
// opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
// }

// ctx, cancel := context.WithTimeout(context.Background(), c.RemoteConnectTimeout)
// defer cancel()

// tenancyMgr := tenancy.NewManager(&c.TenancyOpts)
// if tenancyMgr.Enabled {
// opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
// opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
// }
// var err error
// // TODO: Need to replace grpc.DialContext with grpc.NewClient and pass test
// c.remoteConn, err = grpc.DialContext(ctx, c.RemoteServerAddr, opts...)
// if err != nil {
// return nil, fmt.Errorf("error connecting to remote storage: %w", err)
// }

// grpcClient := shared.NewGRPCClient(c.remoteConn)
// return &ClientPluginServices{
// PluginServices: shared.PluginServices{
// Store: grpcClient,
// ArchiveStore: grpcClient,
// StreamingSpanWriter: grpcClient,
// },
// Capabilities: grpcClient,
// }, nil
// }

// func (c *Configuration) buildPlugin(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) {
// opts := []grpc.DialOption{
// grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tracerProvider))),
// }

// tenancyMgr := tenancy.NewManager(&c.TenancyOpts)
// if tenancyMgr.Enabled {
// opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
// opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
// }

// // #nosec G204
// cmd := exec.Command(c.PluginBinary, "--config", c.PluginConfigurationFile)
// client := plugin.NewClient(&plugin.ClientConfig{
// HandshakeConfig: shared.Handshake,
// VersionedPlugins: map[int]plugin.PluginSet{
// 1: shared.PluginMap,
// },
// Cmd: cmd,
// AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
// Logger: hclog.New(&hclog.LoggerOptions{
// Level: hclog.LevelFromString(c.PluginLogLevel),
// }),
// GRPCDialOptions: opts,
// })

// rpcClient, err := client.Client()
// if err != nil {
// return nil, fmt.Errorf("error attempting to connect to plugin rpc client: %w", err)
// }

// raw, err := rpcClient.Dispense(shared.StoragePluginIdentifier)
// if err != nil {
// return nil, fmt.Errorf("unable to retrieve storage plugin instance: %w", err)
// }

// // in practice, the type of `raw` is *shared.grpcClient, and type casts below cannot fail
// storagePlugin, ok := raw.(shared.StoragePlugin)
// if !ok {
// return nil, fmt.Errorf("unable to cast %T to shared.StoragePlugin for plugin \"%s\"",
// raw, shared.StoragePluginIdentifier)
// }
// archiveStoragePlugin, ok := raw.(shared.ArchiveStoragePlugin)
// if !ok {
// return nil, fmt.Errorf("unable to cast %T to shared.ArchiveStoragePlugin for plugin \"%s\"",
// raw, shared.StoragePluginIdentifier)
// }
// streamingSpanWriterPlugin, ok := raw.(shared.StreamingSpanWriterPlugin)
// if !ok {
// return nil, fmt.Errorf("unable to cast %T to shared.StreamingSpanWriterPlugin for plugin \"%s\"",
// raw, shared.StoragePluginIdentifier)
// }
// capabilities, ok := raw.(shared.PluginCapabilities)
// if !ok {
// return nil, fmt.Errorf("unable to cast %T to shared.PluginCapabilities for plugin \"%s\"",
// raw, shared.StoragePluginIdentifier)
// }

// if err := c.startPluginHealthCheck(rpcClient, logger); err != nil {
// return nil, fmt.Errorf("initial plugin health check failed: %w", err)
// }

// return &ClientPluginServices{
// PluginServices: shared.PluginServices{
// Store: storagePlugin,
// ArchiveStore: archiveStoragePlugin,
// StreamingSpanWriter: streamingSpanWriterPlugin,
// },
// Capabilities: capabilities,
// killPluginClient: client.Kill,
// }, nil
// }

// func (c *Configuration) startPluginHealthCheck(rpcClient plugin.ClientProtocol, logger *zap.Logger) error {
// c.pluginRPCClient = rpcClient
// c.pluginHealthCheckDone = make(chan bool)
// c.pluginHealthCheck = time.NewTicker(pluginHealthCheckInterval)

// go func() {
// for {
// select {
// case <-c.pluginHealthCheckDone:
// return
// case <-c.pluginHealthCheck.C:
// if err := c.pluginRPCClient.Ping(); err != nil {
// logger.Fatal("plugin health check failed", zap.Error(err))
// }
// }
// }
// }()

// return c.pluginRPCClient.Ping()
// }
Loading
Loading