diff --git a/embed/etcd.go b/embed/etcd.go index 2b5cf537384f..314fd0dde5e9 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -100,9 +100,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { return } if !serving { - // errored before starting gRPC server for serveCtx.grpcServerC + // errored before starting gRPC server for serveCtx for _, sctx := range e.sctxs { - close(sctx.grpcServerC) + close(sctx.secureGrpcServerC) + close(sctx.insecureGrpcServerC) } } e.Close() @@ -222,15 +223,14 @@ func (e *Etcd) Config() Config { func (e *Etcd) Close() { e.closeOnce.Do(func() { close(e.stopc) }) - for _, sctx := range e.sctxs { - for gs := range sctx.grpcServerC { - e.stopGRPCServer(gs) - } + reqTimeout := 2 * time.Second + if e.Server != nil { + reqTimeout = e.Server.Cfg.ReqTimeout() } - for _, sctx := range e.sctxs { - sctx.cancel() + teardownServeCtx(sctx, reqTimeout) } + for i := range e.Clients { if e.Clients[i] != nil { e.Clients[i].Close() diff --git a/embed/serve.go b/embed/serve.go index 12af13cb8845..235eda6bd023 100644 --- a/embed/serve.go +++ b/embed/serve.go @@ -21,6 +21,7 @@ import ( "net" "net/http" "strings" + "time" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v3client" @@ -54,13 +55,20 @@ type serveCtx struct { userHandlers map[string]http.Handler serviceRegister func(*grpc.Server) - grpcServerC chan *grpc.Server + + secureHTTPServer *http.Server + secureGrpcServerC chan *grpc.Server + insecureGrpcServerC chan *grpc.Server } func newServeCtx() *serveCtx { ctx, cancel := context.WithCancel(context.Background()) - return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler), - grpcServerC: make(chan *grpc.Server, 2), // in case sctx.insecure,sctx.secure true + return &serveCtx{ + ctx: ctx, + cancel: cancel, + userHandlers: make(map[string]http.Handler), + secureGrpcServerC: make(chan *grpc.Server, 1), + insecureGrpcServerC: make(chan *grpc.Server, 1), } } @@ -84,7 +92,7 @@ func (sctx *serveCtx) serve( if sctx.insecure { gs := v3rpc.Server(s, nil, gopts...) - sctx.grpcServerC <- gs + sctx.insecureGrpcServerC <- gs v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil { @@ -118,7 +126,7 @@ func (sctx *serveCtx) serve( return tlsErr } gs := v3rpc.Server(s, tlscfg, gopts...) - sctx.grpcServerC <- gs + sctx.secureGrpcServerC <- gs v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil { @@ -149,11 +157,13 @@ func (sctx *serveCtx) serve( ErrorLog: logger, // do not log user error } go func() { errHandler(srv.Serve(tlsl)) }() + sctx.secureHTTPServer = srv plog.Infof("serving client requests on %s", sctx.l.Addr().String()) } - close(sctx.grpcServerC) + close(sctx.secureGrpcServerC) + close(sctx.insecureGrpcServerC) return m.Serve() } @@ -269,3 +279,54 @@ func (sctx *serveCtx) registerTrace() { evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) } sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf)) } + +// Attempt to gracefully tear down gRPC server(s) and any associated mechanisms +func teardownServeCtx(sctx *serveCtx, timeout time.Duration) { + if sctx.secure && len(sctx.secureGrpcServerC) > 0 { + gs := <-sctx.secureGrpcServerC + stopSecureServer(gs, sctx.secureHTTPServer, timeout) + } + + if sctx.insecure && len(sctx.insecureGrpcServerC) > 0 { + gs := <-sctx.insecureGrpcServerC + stopInsecureServer(gs, timeout) + } + + // Close any open gRPC connections + sctx.cancel() +} + +// When using grpc's ServerHandlerTransport we are responsible for gracefully +// stopping connections and shutting down. +// https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 +func stopSecureServer(gs *grpc.Server, httpSrv *http.Server, timeout time.Duration) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // Stop accepting new connections await pending handlers + httpSrv.Shutdown(ctx) + + // Teardown gRPC server + gs.Stop() +} + +// Gracefully shutdown gRPC server when using HTTP2 transport. +func stopInsecureServer(gs *grpc.Server, timeout time.Duration) { + ch := make(chan struct{}) + go func() { + defer close(ch) + // close listeners to stop accepting new connections, + // will block on any existing transports + gs.GracefulStop() + }() + // wait until all pending RPCs are finished + select { + case <-ch: + case <-time.After(timeout): + // took too long, manually close open transports + // e.g. watch streams + gs.Stop() + // concurrent GracefulStop should be interrupted + <-ch + } +}