Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gracefully shutdown master, pserver, fix gometalinter errors #3062

Merged
merged 4 commits into from
Jul 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions go/cmd/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"net"
"net/http"
"net/rpc"
"os"
"os/signal"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -68,6 +70,20 @@ func main() {
store = &master.InMemStore{}
}

shutdown := func() {
log.Infoln("shutting down gracefully")
err := store.Shutdown()
if err != nil {
log.Errorln(err)
}
}

// Guaranteed to run even panic happens.
defer shutdown()

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)

s, err := master.NewService(store, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
if err != nil {
log.Fatal(err)
Expand All @@ -84,8 +100,12 @@ func main() {
log.Fatal(err)
}

err = http.Serve(l, nil)
if err != nil {
log.Fatal(err)
}
go func() {
err = http.Serve(l, nil)
if err != nil {
log.Fatal(err)
}
}()

<-c
}
31 changes: 26 additions & 5 deletions go/cmd/pserver/pserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"net"
"net/http"
"net/rpc"
"os"
"os/signal"
"strconv"
"time"

Expand All @@ -33,7 +35,8 @@ func main() {
index := flag.Int("index", -1, "index of this pserver, should be larger or equal than 0")
etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379",
"comma separated endpoint string for pserver to connect to etcd")
etcdTimeout := flag.Duration("etcd-timeout", 5*time.Second, "timeout for etcd calls")
dialTimeout := flag.Duration("dial-timeout", 5*time.Second, "dial timeout")
etcdTTL := flag.Int("etcd-ttl", 5, "etcd time to live in seconds")
numPservers := flag.Int("num-pservers", 1, "total pserver count in a training job")
checkpointPath := flag.String("checkpoint-path", "/checkpoints/", "save checkpoint path")
checkpointInterval := flag.Duration("checkpoint-interval", 600*time.Second, "save checkpoint per interval seconds")
Expand All @@ -53,7 +56,7 @@ func main() {
if *index >= 0 {
idx = *index
} else {
e = pserver.NewEtcdClient(*etcdEndpoint, *numPservers, *etcdTimeout)
e = pserver.NewEtcdClient(*etcdEndpoint, *numPservers, *dialTimeout, *etcdTTL)
idx, err = e.Register(*port)
candy.Must(err)

Expand All @@ -67,6 +70,20 @@ func main() {
}
}

shutdown := func() {
log.Infoln("shutting down gracefully")
sErr := e.Shutdown()
if sErr != nil {
log.Errorln(sErr)
}
}

// Guaranteed to run even panic happens.
defer shutdown()

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)

s, err := pserver.NewService(idx, *checkpointInterval, *checkpointPath, e, cp)
candy.Must(err)

Expand All @@ -77,7 +94,11 @@ func main() {
l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
candy.Must(err)

log.Infof("start pserver at port %d", *port)
err = http.Serve(l, nil)
candy.Must(err)
go func() {
log.Infof("start pserver at port %d", *port)
err = http.Serve(l, nil)
candy.Must(err)
}()

<-c
}
25 changes: 19 additions & 6 deletions go/master/etcd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,12 @@ type EtcdClient struct {
statePath string
client *clientv3.Client
lock *concurrency.Mutex
sess *concurrency.Session
}

// NewEtcdClient creates a new EtcdClient.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Below "TODO" comments can be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Done.

func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePath string, ttlSec int) (*EtcdClient, error) {
log.Debugf("Connecting to etcd at %v", endpoints)
// TODO(helin): gracefully shutdown etcd store. Because etcd
// store holds a etcd lock, even though the lock will expire
// when the lease timeout, we need to implement graceful
// shutdown to release the lock.
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
Expand All @@ -67,12 +64,12 @@ func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePat
// one master running, but split-brain problem may cause
// multiple master servers running), and the cluster management
// software will kill one of them.
log.Debugf("Trying to acquire lock at %s.", lockPath)
log.Infof("Trying to acquire lock at %s.", lockPath)
err = lock.Lock(context.TODO())
if err != nil {
return nil, err
}
log.Debugf("Successfully acquired lock at %s.", lockPath)
log.Infof("Successfully acquired lock at %s.", lockPath)

put := clientv3.OpPut(addrPath, addr)
resp, err := cli.Txn(context.Background()).If(lock.IsOwner()).Then(put).Commit()
Expand All @@ -89,6 +86,7 @@ func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePat
statePath: statePath,
client: cli,
lock: lock,
sess: sess,
}

return e, nil
Expand Down Expand Up @@ -157,6 +155,21 @@ func (e *EtcdClient) Load() ([]byte, error) {
return state, nil
}

// Shutdown shuts down the etcd client gracefully.
func (e *EtcdClient) Shutdown() error {
err := e.sess.Close()
newErr := e.client.Close()
if newErr != nil {
if err == nil {
err = newErr
} else {
log.Errorln(newErr)
}
}

return err
}

// GetKey gets the value by the specify key.
func GetKey(c *clientv3.Client, key string, timeout time.Duration) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
Expand Down
5 changes: 5 additions & 0 deletions go/master/inmem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,8 @@ func (m *InMemStore) Load() ([]byte, error) {

return m.buf, nil
}

// Shutdown shuts down the in mem store.
func (m *InMemStore) Shutdown() error {
return nil
}
1 change: 1 addition & 0 deletions go/master/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var ErrPassAfter = errors.New("pass number larger than master")
type Store interface {
Save([]byte) error
Load() ([]byte, error)
Shutdown() error
}

// Chunk is a chunk of data consisted of several data instances.
Expand Down
6 changes: 3 additions & 3 deletions go/pserver/client/c/cclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ var curHandle C.paddle_pserver_client
func add(c *client.Client) C.paddle_pserver_client {
mu.Lock()
defer mu.Unlock()
client := curHandle
cli := curHandle
curHandle++
handleMap[client] = c
return client
handleMap[cli] = c
return cli
}

func get(client C.paddle_pserver_client) *client.Client {
Expand Down
Loading