Skip to content

Commit

Permalink
Merge pull request #7603 from tamird/quit-proper
Browse files Browse the repository at this point in the history
cli: quit command waits for server shutdown
  • Loading branch information
tamird authored Jul 5, 2016
2 parents 02d6a29 + aa9a13c commit 96de84d
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 155 deletions.
51 changes: 43 additions & 8 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/server/serverpb"
"github.com/cockroachdb/cockroach/storage/engine"
"github.com/cockroachdb/cockroach/util/envutil"
"github.com/cockroachdb/cockroach/util/grpcutil"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/sdnotify"
Expand Down Expand Up @@ -494,19 +495,53 @@ func runQuit(_ *cobra.Command, _ []string) error {

ctx := stopperContext(stopper)

if _, err := c.Drain(ctx, &serverpb.DrainRequest{
On: onModes,
Shutdown: true,
}); err != nil {
waitForShutdown := func(stream serverpb.Admin_DrainClient) error {
for {
if _, err := stream.Recv(); !grpcutil.IsClosedConnection(err) {
return err
}
return nil
}
}

{
stream, err := c.Drain(ctx, &serverpb.DrainRequest{
On: onModes,
Shutdown: true,
})
if err != nil {
return err
}
_, err = stream.Recv()
if err == nil {
err = waitForShutdown(stream)
if err == nil {
fmt.Println("ok")
}
return err
}
fmt.Printf("graceful shutdown failed, proceeding with hard shutdown: %v\n", err)
if _, err := c.Drain(ctx, &serverpb.DrainRequest{
}

{
// Not passing drain modes so the server doesn't bother and goes
// straight to shutdown.
stream, err := c.Drain(ctx, &serverpb.DrainRequest{
Shutdown: true,
}); err != nil {
})
if err != nil {
return err
}
_, err = stream.Recv()
if err == nil {
err = waitForShutdown(stream)
if err == nil {
fmt.Println("ok")
}
return err
}
return fmt.Errorf("hard shutdown failed: %v", err)
}
fmt.Println("ok")
return nil
}

// freezeClusterCmd command issues a cluster-wide freeze.
Expand Down
30 changes: 20 additions & 10 deletions server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func (s *adminServer) Health(ctx context.Context, req *serverpb.HealthRequest) (
return &serverpb.HealthResponse{}, nil
}

func (s *adminServer) Drain(ctx context.Context, req *serverpb.DrainRequest) (*serverpb.DrainResponse, error) {
func (s *adminServer) Drain(req *serverpb.DrainRequest, stream serverpb.Admin_DrainServer) error {
on := make([]serverpb.DrainMode, len(req.On))
for i := range req.On {
on[i] = serverpb.DrainMode(req.On[i])
Expand All @@ -668,21 +668,31 @@ func (s *adminServer) Drain(ctx context.Context, req *serverpb.DrainRequest) (*s

nowOn, err := s.server.Drain(on)
if err != nil {
return nil, err
return err
}

nowOnInts := make([]int32, len(nowOn))
res := serverpb.DrainResponse{
On: make([]int32, len(nowOn)),
}
for i := range nowOn {
nowOnInts[i] = int32(nowOn[i])
res.On[i] = int32(nowOn[i])
}
if err := stream.Send(&res); err != nil {
return err
}

if req.Shutdown {
s.server.stopper.Quiesce()
go func() {
time.Sleep(50 * time.Millisecond)
s.server.stopper.Stop()
}()
go s.server.stopper.Stop()
}

ctx := stream.Context()

select {
case <-s.server.stopper.IsStopped():
return nil
case <-ctx.Done():
return ctx.Err()
}
return &serverpb.DrainResponse{On: nowOnInts}, nil
}

// waitForStoreFrozen polls the given stores until they all report having no
Expand Down
Loading

0 comments on commit 96de84d

Please sign in to comment.