Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CLI Option for gRPC Max Receive Message Size #3214

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions cmd/collector/app/builder_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@ import (
)

const (
collectorDynQueueSizeMemory = "collector.queue-size-memory"
collectorGRPCHostPort = "collector.grpc-server.host-port"
collectorHTTPHostPort = "collector.http-server.host-port"
collectorNumWorkers = "collector.num-workers"
collectorQueueSize = "collector.queue-size"
collectorTags = "collector.tags"
collectorZipkinAllowedHeaders = "collector.zipkin.allowed-headers"
collectorZipkinAllowedOrigins = "collector.zipkin.allowed-origins"
collectorZipkinHTTPHostPort = "collector.zipkin.host-port"
collectorDynQueueSizeMemory = "collector.queue-size-memory"
collectorGRPCHostPort = "collector.grpc-server.host-port"
collectorHTTPHostPort = "collector.http-server.host-port"
collectorNumWorkers = "collector.num-workers"
collectorQueueSize = "collector.queue-size"
collectorTags = "collector.tags"
collectorZipkinAllowedHeaders = "collector.zipkin.allowed-headers"
collectorZipkinAllowedOrigins = "collector.zipkin.allowed-origins"
collectorZipkinHTTPHostPort = "collector.zipkin.host-port"
collectorGRPCMaxReceiveMessageLength = "collector.grpc-server.max-message-size"
)

var tlsGRPCFlagsConfig = tlscfg.ServerFlagsConfig{
Expand Down Expand Up @@ -69,12 +70,15 @@ type CollectorOptions struct {
CollectorZipkinAllowedOrigins string
// CollectorZipkinAllowedHeaders is a list of headers that the Zipkin collector service allowes the client to use with cross-domain requests
CollectorZipkinAllowedHeaders string
// CollectorGRPCMaxReceiveMessageLength is the maximum message size receivable by the gRPC Collector.
CollectorGRPCMaxReceiveMessageLength int
}

// AddFlags adds flags for CollectorOptions
func AddFlags(flags *flag.FlagSet) {
flags.Int(collectorNumWorkers, DefaultNumWorkers, "The number of workers pulling items from the queue")
flags.Int(collectorQueueSize, DefaultQueueSize, "The queue size of the collector")
flags.Int(collectorGRPCMaxReceiveMessageLength, DefaultGRPCMaxReceiveMessageLength, "The maximum receivable message size for the collector's GRPC server")
flags.String(collectorGRPCHostPort, ports.PortToHostPort(ports.CollectorGRPC), "The host:port (e.g. 127.0.0.1:14250 or :14250) of the collector's GRPC server")
flags.String(collectorHTTPHostPort, ports.PortToHostPort(ports.CollectorHTTP), "The host:port (e.g. 127.0.0.1:14268 or :14268) of the collector's HTTP server")
flags.String(collectorTags, "", "One or more tags to be added to the Process tags of all spans passing through this collector. Ex: key1=value1,key2=${envVar:defaultValue}")
Expand All @@ -100,6 +104,7 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions {
cOpts.QueueSize = v.GetInt(collectorQueueSize)
cOpts.TLSGRPC = tlsGRPCFlagsConfig.InitFromViper(v)
cOpts.TLSHTTP = tlsHTTPFlagsConfig.InitFromViper(v)
cOpts.CollectorGRPCMaxReceiveMessageLength = v.GetInt(collectorGRPCMaxReceiveMessageLength)

return cOpts
}
11 changes: 11 additions & 0 deletions cmd/collector/app/builder_flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,14 @@ func TestCollectorOptionsWithFlags_CheckFullHostPort(t *testing.T) {
assert.Equal(t, "127.0.0.1:1234", c.CollectorGRPCHostPort)
assert.Equal(t, "0.0.0.0:3456", c.CollectorZipkinHTTPHostPort)
}

func TestCollectorOptionsWithFlags_CheckMaxReceiveMessageLength(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--collector.grpc-server.max-message-size=8388608",
})
c.InitFromViper(v)

assert.Equal(t, 8388608, c.CollectorGRPCMaxReceiveMessageLength)
}
11 changes: 6 additions & 5 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,12 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor)

grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
HostPort: builderOpts.CollectorGRPCHostPort,
Handler: c.spanHandlers.GRPCHandler,
TLSConfig: builderOpts.TLSGRPC,
SamplingStore: c.strategyStore,
Logger: c.logger,
HostPort: builderOpts.CollectorGRPCHostPort,
Handler: c.spanHandlers.GRPCHandler,
TLSConfig: builderOpts.TLSGRPC,
SamplingStore: c.strategyStore,
Logger: c.logger,
MaxReceiveMessageLength: builderOpts.CollectorGRPCMaxReceiveMessageLength,
})
if err != nil {
return fmt.Errorf("could not start gRPC collector %w", err)
Expand Down
2 changes: 2 additions & 0 deletions cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const (
DefaultNumWorkers = 50
// DefaultQueueSize is the size of the processor's queue
DefaultQueueSize = 2000
// DefaultGRPCMaxReceiveMessageLength is the default max receivable message size for the gRPC Collector
DefaultGRPCMaxReceiveMessageLength = 4 * 1024 * 1024
)

type options struct {
Expand Down
22 changes: 14 additions & 8 deletions cmd/collector/app/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,21 @@ import (

// GRPCServerParams to construct a new Jaeger Collector gRPC Server
type GRPCServerParams struct {
TLSConfig tlscfg.Options
HostPort string
Handler *handler.GRPCHandler
SamplingStore strategystore.StrategyStore
Logger *zap.Logger
OnError func(error)
TLSConfig tlscfg.Options
HostPort string
Handler *handler.GRPCHandler
SamplingStore strategystore.StrategyStore
Logger *zap.Logger
OnError func(error)
MaxReceiveMessageLength int
}

// StartGRPCServer based on the given parameters
func StartGRPCServer(params *GRPCServerParams) (*grpc.Server, error) {
var server *grpc.Server
var grpcOpts []grpc.ServerOption

grpcOpts = append(grpcOpts, grpc.MaxRecvMsgSize(params.MaxReceiveMessageLength))

if params.TLSConfig.Enabled {
// user requested a server with TLS, setup creds
Expand All @@ -51,10 +55,12 @@ func StartGRPCServer(params *GRPCServerParams) (*grpc.Server, error) {
}

creds := credentials.NewTLS(tlsCfg)
server = grpc.NewServer(grpc.Creds(creds))
grpcOpts = append(grpcOpts, grpc.Creds(creds))

server = grpc.NewServer(grpcOpts...)
} else {
// server without TLS
server = grpc.NewServer()
server = grpc.NewServer(grpcOpts...)
}

listener, err := net.Listen("tcp", params.HostPort)
Expand Down
21 changes: 21 additions & 0 deletions cmd/collector/app/server/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/grpc/test/bufconn"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

Expand Down Expand Up @@ -92,3 +93,23 @@ func TestSpanCollector(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, response)
}

func TestCollectorStartWithTLS(t *testing.T) {
logger, _ := zap.NewDevelopment()
params := &GRPCServerParams{
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}),
SamplingStore: &mockSamplingStore{},
Logger: logger,
MaxReceiveMessageLength: 8 * 1024 * 1024,
TLSConfig: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem",
},
}

server, err := StartGRPCServer(params)
require.NoError(t, err)
defer server.Stop()
}