Skip to content

Commit

Permalink
disable lock on watch status api
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Sep 6, 2018
1 parent cf4801f commit f08e41e
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 21 deletions.
4 changes: 3 additions & 1 deletion core.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func serve() {
log.Fatalf("[main] %v", err)
}

vibranium := rpc.New(cluster, config)
rpcch := make(chan struct{}, 1)
vibranium := rpc.New(cluster, config, rpcch)
s, err := net.Listen("tcp", config.Bind)
if err != nil {
log.Fatalf("[main] %v", err)
Expand Down Expand Up @@ -93,6 +94,7 @@ func serve() {
signal.Notify(sigs, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM)
sig := <-sigs
log.Infof("[main] Get signal %v.", sig)
close(rpcch)
grpcServer.GracefulStop()
log.Info("[main] gRPC server gracefully stopped.")

Expand Down
45 changes: 26 additions & 19 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Vibranium struct {
cluster cluster.Cluster
config types.Config
counter sync.WaitGroup
rpcch chan struct{}
TaskNum int
}

Expand Down Expand Up @@ -306,28 +307,34 @@ func (v *Vibranium) RemoveImage(opts *pb.RemoveImageOptions, stream pb.CoreRPC_R
}

// DeployStatus watch and show deployed status
// TODO should close channels when ctrl+c send
func (v *Vibranium) DeployStatus(opts *pb.DeployStatusOptions, stream pb.CoreRPC_DeployStatusServer) error {
v.taskAdd("DeployStatus", true)
defer v.taskDone("DeployStatus", true)
log.Infof("[rpc] DeployStatus start %s", opts.Appname)
defer log.Infof("[rpc] DeployStatus stop %s", opts.Appname)

ch := v.cluster.DeployStatusStream(stream.Context(), opts.Appname, opts.Entrypoint, opts.Nodename)
for m := range ch {
if m.Err != nil {
return m.Err
}
if err := stream.Send(&pb.DeployStatusMessage{
Action: m.Action,
Appname: m.Appname,
Entrypoint: m.Entrypoint,
Nodename: m.Nodename,
Id: m.ID,
Data: []byte(m.Data),
}); err != nil {
v.logUnsentMessages("DeployStatus", m)
for {
select {
case m, ok := <-ch:
if !ok {
return nil
}
if m.Err != nil {
return m.Err
}
if err := stream.Send(&pb.DeployStatusMessage{
Action: m.Action,
Appname: m.Appname,
Entrypoint: m.Entrypoint,
Nodename: m.Nodename,
Id: m.ID,
Data: []byte(m.Data),
}); err != nil {
v.logUnsentMessages("DeployStatus", m)
}
case <-v.rpcch:
return nil
}
}
return nil
}

// RunAndWait is lambda
Expand Down Expand Up @@ -511,6 +518,6 @@ func (v *Vibranium) logUnsentMessages(msgType string, msg interface{}) {
}

// New will new a new cluster instance
func New(cluster cluster.Cluster, config types.Config) *Vibranium {
return &Vibranium{cluster: cluster, config: config, counter: sync.WaitGroup{}}
func New(cluster cluster.Cluster, config types.Config, rpcch chan struct{}) *Vibranium {
return &Vibranium{cluster: cluster, config: config, counter: sync.WaitGroup{}, rpcch: rpcch}
}
2 changes: 1 addition & 1 deletion rpc/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func initConfig(mStore *mockstore.MockStore) (types.Config, *Vibranium) {
log.Fatal(err)
}
cluster.SetStore(mStore)
vibranium := New(cluster, config)
vibranium := New(cluster, config, make(chan struct{}))

return config, vibranium

Expand Down

0 comments on commit f08e41e

Please sign in to comment.