Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: configure server keepalive #8535

Merged
merged 3 commits into from
Sep 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 47 additions & 24 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@ const (
ClusterStateFlagNew = "new"
ClusterStateFlagExisting = "existing"

DefaultName = "default"
DefaultMaxSnapshots = 5
DefaultMaxWALs = 5
DefaultMaxTxnOps = uint(128)
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultName = "default"
DefaultMaxSnapshots = 5
DefaultMaxWALs = 5
DefaultMaxTxnOps = uint(128)
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
DefaultGRPCKeepAliveTimeout = 20 * time.Second

DefaultListenPeerURLs = "http://localhost:2380"
DefaultListenClientURLs = "http://localhost:2379"
Expand Down Expand Up @@ -93,6 +96,23 @@ type Config struct {
MaxTxnOps uint `json:"max-txn-ops"`
MaxRequestBytes uint `json:"max-request-bytes"`

// gRPC server options

// GRPCKeepAliveMinTime is the minimum interval that a client should
// wait before pinging server. When client pings "too fast", server
// sends goaway and closes the connection (errors: too_many_pings,
// http2.ErrCodeEnhanceYourCalm). When too slow, nothing happens.
// Server expects client pings only when there is any active streams
// (PermitWithoutStream is set false).
GRPCKeepAliveMinTime time.Duration `json:"grpc-keepalive-min-time"`
// GRPCKeepAliveInterval is the frequency of server-to-client ping
// to check if a connection is alive. Close a non-responsive connection
// after an additional duration of Timeout. 0 to disable.
GRPCKeepAliveInterval time.Duration `json:"grpc-keepalive-interval"`
// GRPCKeepAliveTimeout is the additional duration of wait
// before closing a non-responsive connection. 0 to disable.
GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"`

// clustering

APUrls, ACUrls []url.URL
Expand Down Expand Up @@ -181,25 +201,28 @@ func NewConfig() *Config {
lcurl, _ := url.Parse(DefaultListenClientURLs)
acurl, _ := url.Parse(DefaultAdvertiseClientURLs)
cfg := &Config{
CorsInfo: &cors.CORSInfo{},
MaxSnapFiles: DefaultMaxSnapshots,
MaxWalFiles: DefaultMaxWALs,
Name: DefaultName,
SnapCount: etcdserver.DefaultSnapCount,
MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
TickMs: 100,
ElectionMs: 1000,
LPUrls: []url.URL{*lpurl},
LCUrls: []url.URL{*lcurl},
APUrls: []url.URL{*apurl},
ACUrls: []url.URL{*acurl},
ClusterState: ClusterStateFlagNew,
InitialClusterToken: "etcd-cluster",
StrictReconfigCheck: true,
Metrics: "basic",
EnableV2: true,
AuthToken: "simple",
CorsInfo: &cors.CORSInfo{},
MaxSnapFiles: DefaultMaxSnapshots,
MaxWalFiles: DefaultMaxWALs,
Name: DefaultName,
SnapCount: etcdserver.DefaultSnapCount,
MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
GRPCKeepAliveMinTime: DefaultGRPCKeepAliveMinTime,
GRPCKeepAliveInterval: DefaultGRPCKeepAliveInterval,
GRPCKeepAliveTimeout: DefaultGRPCKeepAliveTimeout,
TickMs: 100,
ElectionMs: 1000,
LPUrls: []url.URL{*lpurl},
LCUrls: []url.URL{*lcurl},
APUrls: []url.URL{*apurl},
ACUrls: []url.URL{*acurl},
ClusterState: ClusterStateFlagNew,
InitialClusterToken: "etcd-cluster",
StrictReconfigCheck: true,
Metrics: "basic",
EnableV2: true,
AuthToken: "simple",
}
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
return cfg
Expand Down
17 changes: 16 additions & 1 deletion embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (

"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed")
Expand Down Expand Up @@ -424,9 +425,23 @@ func (e *Etcd) serve() (err error) {
}
h = http.Handler(&cors.CORSHandler{Handler: h, Info: e.cfg.CorsInfo})

gopts := []grpc.ServerOption{}
if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: e.cfg.GRPCKeepAliveMinTime,
PermitWithoutStream: false,
}))
}
if e.cfg.GRPCKeepAliveInterval > time.Duration(0) &&
e.cfg.GRPCKeepAliveTimeout > time.Duration(0) {
gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
Time: e.cfg.GRPCKeepAliveInterval,
Timeout: e.cfg.GRPCKeepAliveTimeout,
}))
}
for _, sctx := range e.sctxs {
go func(s *serveCtx) {
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler))
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...))
}(sctx)
}

Expand Down
11 changes: 8 additions & 3 deletions embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ func newServeCtx() *serveCtx {
// serve accepts incoming connections on the listener l,
// creating a new service goroutine for each. The service goroutines
// read requests and then call handler to reply to them.
func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo, handler http.Handler, errHandler func(error)) error {
func (sctx *serveCtx) serve(
s *etcdserver.EtcdServer,
tlsinfo *transport.TLSInfo,
handler http.Handler,
errHandler func(error),
gopts ...grpc.ServerOption) error {
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
<-s.ReadyNotify()
plog.Info("ready to serve client requests")
Expand All @@ -77,7 +82,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo
servLock := v3lock.NewLockServer(v3c)

if sctx.insecure {
gs := v3rpc.Server(s, nil)
gs := v3rpc.Server(s, nil, gopts...)
sctx.grpcServerC <- gs
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
Expand Down Expand Up @@ -111,7 +116,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo
if tlsErr != nil {
return tlsErr
}
gs := v3rpc.Server(s, tlscfg)
gs := v3rpc.Server(s, tlscfg, gopts...)
sctx.grpcServerC <- gs
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
Expand Down
3 changes: 3 additions & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ func newConfig() *config {
fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
fs.UintVar(&cfg.MaxTxnOps, "max-txn-ops", cfg.MaxTxnOps, "Maximum number of operations permitted in a transaction.")
fs.UintVar(&cfg.MaxRequestBytes, "max-request-bytes", cfg.MaxRequestBytes, "Maximum client request size in bytes the server will accept.")
fs.DurationVar(&cfg.GRPCKeepAliveMinTime, "grpc-keepalive-min-time", cfg.Config.GRPCKeepAliveMinTime, "Minimum interval duration that a client should wait before pinging server.")
fs.DurationVar(&cfg.GRPCKeepAliveInterval, "grpc-keepalive-interval", cfg.Config.GRPCKeepAliveInterval, "Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).")
fs.DurationVar(&cfg.GRPCKeepAliveTimeout, "grpc-keepalive-timeout", cfg.Config.GRPCKeepAliveTimeout, "Additional duration of wait before closing a non-responsive connection (0 to disable).")

// clustering
fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")
Expand Down
6 changes: 6 additions & 0 deletions etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ member flags:
maximum number of operations permitted in a transaction.
--max-request-bytes '1572864'
maximum client request size in bytes the server will accept.
--grpc-keepalive-min-time '5s'
minimum duration interval that a client should wait before pinging server.
--grpc-keepalive-interval '2h'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2h seems too high. probably 5 minutes or something? is there a recommended vault from gRPC-go?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gRPC Go defaults to 2h. Should etcd use 5min?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2h is fine if it is gRPC's default.

frequency duration of server-to-client ping to check if a connection is alive (0 to disable).
--grpc-keepalive-timeout '20s'
additional duration of wait before closing a non-responsive connection (0 to disable).

clustering flags:

Expand Down
4 changes: 2 additions & 2 deletions etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func init() {
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
}

func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
opts = append(opts, grpc.CustomCodec(&codec{}))
if tls != nil {
Expand All @@ -50,7 +50,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
grpcServer := grpc.NewServer(opts...)
grpcServer := grpc.NewServer(append(opts, gopts...)...)

pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
Expand Down