From 77da6ff5a3dcff4723ae866cceb4a35d0ef817ba Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Fri, 14 Apr 2017 09:24:52 -0700 Subject: [PATCH] embed: gracefully shut down gRPC server Fix https://github.com/coreos/etcd/issues/7322. Signed-off-by: Gyu-Ho Lee --- embed/etcd.go | 16 ++++++++++++++++ embed/serve.go | 9 ++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/embed/etcd.go b/embed/etcd.go index da1d8f610c3a..ee84347abc05 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -61,6 +61,9 @@ type Etcd struct { sctxs map[string]*serveCtx closeOnce sync.Once + + // OnShutdown is called immediately before releasing etcd server resources. + OnShutdown func() } // StartEtcd launches the etcd server and HTTP handlers for client/server communication. @@ -137,6 +140,18 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { if err = e.serve(); err != nil { return } + e.OnShutdown = func() { + // (gRPC server) stops accepting new connections, + // RPCs, and blocks until all pending RPCs are finished + // TODO: drain other requests + for _, sctx := range e.sctxs { + for gs := range sctx.grpcServerC { + plog.Warning("gracefully stopping gRPC server") + gs.GracefulStop() + plog.Warning("gracefully stopped gRPC server") + } + } + } return } @@ -147,6 +162,7 @@ func (e *Etcd) Config() Config { func (e *Etcd) Close() { e.closeOnce.Do(func() { close(e.stopc) }) + e.OnShutdown() for _, sctx := range e.sctxs { sctx.cancel() diff --git a/embed/serve.go b/embed/serve.go index 46634b7c5f1a..02c093ff81c7 100644 --- a/embed/serve.go +++ b/embed/serve.go @@ -52,11 +52,14 @@ type serveCtx struct { userHandlers map[string]http.Handler serviceRegister func(*grpc.Server) + grpcServerC chan *grpc.Server } func newServeCtx() *serveCtx { ctx, cancel := context.WithCancel(context.Background()) - return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler)} + return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler), + grpcServerC: make(chan *grpc.Server, 2), // in case sctx.insecure,sctx.secure true + } } // serve accepts incoming connections on the listener l, @@ -72,8 +75,11 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle servElection := v3election.NewElectionServer(v3c) servLock := v3lock.NewLockServer(v3c) + defer close(sctx.grpcServerC) + if sctx.insecure { gs := v3rpc.Server(s, nil) + sctx.grpcServerC <- gs v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil { @@ -103,6 +109,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle if sctx.secure { gs := v3rpc.Server(s, tlscfg) + sctx.grpcServerC <- gs v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil {