From 3864b85d3c8f4586d1e12c3f67bb5ef877f9beeb Mon Sep 17 00:00:00 2001 From: tonic Date: Wed, 9 Jun 2021 19:02:10 +0800 Subject: [PATCH] fix waiting forever problem --- core.go | 6 +++--- rpc/rpc.go | 28 ++++++++++++++++------------ 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/core.go b/core.go index 42779a057..3433e66d6 100644 --- a/core.go +++ b/core.go @@ -78,8 +78,8 @@ func serve(c *cli.Context) error { defer cluster.Finalizer() cluster.DisasterRecover() - rpcch := make(chan struct{}, 1) - vibranium := rpc.New(cluster, config, rpcch) + stop := make(chan struct{}, 1) + vibranium := rpc.New(cluster, config, stop) s, err := net.Listen("tcp", config.Bind) if err != nil { log.Errorf(context.TODO(), "[main] %v", err) @@ -129,7 +129,7 @@ func serve(c *cli.Context) error { <-ctx.Done() log.Info("[main] Interrupt by signal") - close(rpcch) + close(stop) unregisterService() grpcServer.GracefulStop() log.Info("[main] gRPC server gracefully stopped.") diff --git a/rpc/rpc.go b/rpc/rpc.go index 3aad87dc0..2d02fe0fc 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -26,7 +26,7 @@ type Vibranium struct { cluster cluster.Cluster config types.Config counter sync.WaitGroup - rpcch chan struct{} + stop chan struct{} TaskNum int } @@ -50,14 +50,18 @@ func (v *Vibranium) WatchServiceStatus(_ *pb.Empty, stream pb.CoreRPC_WatchServi if err != nil { return grpcstatus.Error(WatchServiceStatus, err.Error()) } - for status := range ch { - s := toRPCServiceStatus(status) - if err = stream.Send(s); err != nil { - v.logUnsentMessages(ctx, "WatchServicesStatus", err, s) - return grpcstatus.Error(WatchServiceStatus, err.Error()) + for { + select { + case status := <-ch: + s := toRPCServiceStatus(status) + if err = stream.Send(s); err != nil { + v.logUnsentMessages(ctx, "WatchServicesStatus", err, s) + return grpcstatus.Error(WatchServiceStatus, err.Error()) + } + case <-v.stop: + return nil } } - return nil } // ListNetworks list networks for pod @@ -246,7 +250,7 @@ func (v *Vibranium) NodeStatusStream(_ *pb.Empty, stream pb.CoreRPC_NodeStatusSt if err := stream.Send(r); err != nil { v.logUnsentMessages(ctx, "NodeStatusStream", err, m) } - case <-v.rpcch: + case <-v.stop: return nil } } @@ -407,7 +411,7 @@ func (v *Vibranium) WorkloadStatusStream(opts *pb.WorkloadStatusStreamOptions, s if err := stream.Send(r); err != nil { v.logUnsentMessages(ctx, "WorkloadStatusStream", err, m) } - case <-v.rpcch: + case <-v.stop: return nil } } @@ -813,7 +817,7 @@ func (v *Vibranium) LogStream(opts *pb.LogStreamOptions, stream pb.CoreRPC_LogSt if err = stream.Send(toRPCLogStreamMessage(m)); err != nil { v.logUnsentMessages(ctx, "LogStream", err, m) } - case <-v.rpcch: + case <-v.stop: return nil } } @@ -941,6 +945,6 @@ func (v *Vibranium) logUnsentMessages(ctx context.Context, msgType string, err e } // New will new a new cluster instance -func New(cluster cluster.Cluster, config types.Config, rpcch chan struct{}) *Vibranium { - return &Vibranium{cluster: cluster, config: config, counter: sync.WaitGroup{}, rpcch: rpcch} +func New(cluster cluster.Cluster, config types.Config, stop chan struct{}) *Vibranium { + return &Vibranium{cluster: cluster, config: config, counter: sync.WaitGroup{}, stop: stop} }