Skip to content

Commit

Permalink
Merge pull request #890 from fabiolb/feature/make_grpc_sizes_configur…
Browse files Browse the repository at this point in the history
…able

add configurable grpc message sizes to #632
  • Loading branch information
nathanejohnson authored Jul 15, 2022
2 parents 03f0889 + 48d1757 commit 0422e81
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 8 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type Proxy struct {
RequestID string
STSHeader STSHeader
AuthSchemes map[string]AuthScheme
GRPCMaxRxMsgSize int
GRPCMaxTxMsgSize int
}

type STSHeader struct {
Expand Down
2 changes: 2 additions & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ var defaultConfig = &Config{
LocalIP: LocalIPString(),
AuthSchemes: map[string]AuthScheme{},
IdleConnTimeout: 15 * time.Second,
GRPCMaxRxMsgSize: 4 * 1024 * 1024, // 4M
GRPCMaxTxMsgSize: 4 * 1024 * 1024, // 4M
},
Registry: Registry{
Backend: "consul",
Expand Down
2 changes: 2 additions & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
f.IntVar(&cfg.Proxy.STSHeader.MaxAge, "proxy.header.sts.maxage", defaultConfig.Proxy.STSHeader.MaxAge, "enable and set the max-age value for HSTS")
f.BoolVar(&cfg.Proxy.STSHeader.Subdomains, "proxy.header.sts.subdomains", defaultConfig.Proxy.STSHeader.Subdomains, "direct HSTS to include subdomains")
f.BoolVar(&cfg.Proxy.STSHeader.Preload, "proxy.header.sts.preload", defaultConfig.Proxy.STSHeader.Preload, "direct HSTS to pass the preload directive")
f.IntVar(&cfg.Proxy.GRPCMaxRxMsgSize, "proxy.grpcmaxrxmsgsize", defaultConfig.Proxy.GRPCMaxRxMsgSize, "max grpc receive message size (in bytes)")
f.IntVar(&cfg.Proxy.GRPCMaxTxMsgSize, "proxy.grpcmaxtxmsgsize", defaultConfig.Proxy.GRPCMaxTxMsgSize, "max grpc transmit message size (in bytes)")
f.StringVar(&gzipContentTypesValue, "proxy.gzip.contenttype", defaultValues.GZIPContentTypesValue, "regexp of content types to compress")
f.StringVar(&listenerValue, "proxy.addr", defaultValues.ListenerValue, "listener config")
f.StringVar(&certSourcesValue, "proxy.cs", defaultValues.CertSourcesValue, "certificate sources")
Expand Down
11 changes: 10 additions & 1 deletion fabio.properties
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,16 @@
#
# proxy.auth = name=mybasicauth;type=basic;file=p/creds.htpasswd
# name=myotherauth;type=basic;file=p/other-creds.htpasswd;realm=myrealm

#
#
# proxy.grpcmaxrxmsgsize configures the grpc max receive message size in bytes.
# The default is
# proxy.grpcmaxrxmsgsize = 4194304
#
# proxy.grpcmaxtxmsgsize configures the grpc max transmit messsage size in bytes
# The default is
# proxy.grpcmaxtxmsgsize = 4194304
#

# log.access.format configures the format of the access log.
#
Expand Down
6 changes: 3 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config, statsHandler *proxy.Gr
GlobCache: globCache,
}

handler := grpc_proxy.TransparentHandler(proxy.GetGRPCDirector(tlscfg))
handler := grpc_proxy.TransparentHandler(proxy.GetGRPCDirector(tlscfg, cfg))

return []grpc.ServerOption{
grpc.CustomCodec(grpc_proxy.Codec()),
grpc.UnknownServiceHandler(handler),
grpc.StreamInterceptor(proxyInterceptor.Stream),
grpc.StatsHandler(statsHandler),
grpc.MaxRecvMsgSize(1024 * 1024 * 1024),
grpc.MaxSendMsgSize(1024 * 1024 * 1024),
grpc.MaxRecvMsgSize(cfg.Proxy.GRPCMaxRxMsgSize),
grpc.MaxSendMsgSize(cfg.Proxy.GRPCMaxTxMsgSize),
}
}

Expand Down
10 changes: 6 additions & 4 deletions proxy/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ func (s *gRPCServer) Serve(lis net.Listener) error {
return s.server.Serve(lis)
}

func GetGRPCDirector(tlscfg *tls.Config) func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
func GetGRPCDirector(tlscfg *tls.Config, cfg *config.Config) func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {

connectionPool := newGrpcConnectionPool(tlscfg)
connectionPool := newGrpcConnectionPool(tlscfg, cfg)

return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
md, ok := metadata.FromIncomingContext(ctx)
Expand Down Expand Up @@ -230,14 +230,16 @@ type grpcConnectionPool struct {
lock sync.RWMutex
cleanupInterval time.Duration
tlscfg *tls.Config
cfg *config.Config
}

func newGrpcConnectionPool(tlscfg *tls.Config) *grpcConnectionPool {
func newGrpcConnectionPool(tlscfg *tls.Config, cfg *config.Config) *grpcConnectionPool {
cp := &grpcConnectionPool{
connections: make(map[string]*grpc.ClientConn),
lock: sync.RWMutex{},
cleanupInterval: time.Second * 5,
tlscfg: tlscfg,
cfg: cfg,
}

go cp.cleanup()
Expand All @@ -259,7 +261,7 @@ func (p *grpcConnectionPool) Get(ctx context.Context, target *route.Target) (*gr

func (p *grpcConnectionPool) newConnection(ctx context.Context, target *route.Target) (*grpc.ClientConn, error) {
opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.CallCustomCodec(grpc_proxy.Codec()), grpc.MaxCallRecvMsgSize(1024*1024*1024)),
grpc.WithDefaultCallOptions(grpc.CallCustomCodec(grpc_proxy.Codec()), grpc.MaxCallRecvMsgSize(p.cfg.Proxy.GRPCMaxRxMsgSize)),
}

if target.URL.Scheme == "grpcs" && p.tlscfg != nil {
Expand Down

0 comments on commit 0422e81

Please sign in to comment.