From 9c583e2b58fbc643b2931904c5e58eaa35d834cb Mon Sep 17 00:00:00 2001 From: DuodenumL Date: Tue, 25 Jan 2022 16:58:51 +0800 Subject: [PATCH] use context with cancel --- rpc/counter.go | 29 ++++- rpc/counter_test.go | 4 +- rpc/rpc.go | 302 ++++++++++++++++++++++++-------------------- selfmon/selfmon.go | 5 +- 4 files changed, 196 insertions(+), 144 deletions(-) diff --git a/rpc/counter.go b/rpc/counter.go index 2e968d67e..2d2f5d0d5 100644 --- a/rpc/counter.go +++ b/rpc/counter.go @@ -8,6 +8,14 @@ import ( "golang.org/x/net/context" ) +type task struct { + v *Vibranium + name string + verbose bool + context context.Context + cancel context.CancelFunc +} + // gRPC上全局的计数器 // 只有在任务数为0的时候才给停止 // 为啥会加在gRPC server上呢? @@ -15,26 +23,33 @@ import ( // 增加一个任务, 在任务调用之前要调用一次. // 否则任务不被追踪, 不保证任务能够正常完成. -func (v *Vibranium) taskAdd(ctx context.Context, name string, verbose bool) context.Context { +func (v *Vibranium) newTask(ctx context.Context, name string, verbose bool) *task { if ctx != nil { ctx = context.WithValue(ctx, types.TracingID, utils.RandomString(8)) } + ctx, cancel := context.WithCancel(ctx) if verbose { log.Debugf(ctx, "[task] %s added", name) } v.counter.Add(1) v.TaskNum++ - return ctx + return &task{ + v: v, + name: name, + verbose: verbose, + context: ctx, + cancel: cancel, + } } // 完成一个任务, 在任务执行完之后调用一次. // 否则计数器用完不会为0, 你也别想退出这个进程了. -func (v *Vibranium) taskDone(ctx context.Context, name string, verbose bool) { - if verbose { - log.Debugf(ctx, "[task] %s done", name) +func (t *task) done() { + if t.verbose { + log.Debugf(t.context, "[task] %s done", t.name) } - v.counter.Done() - v.TaskNum-- + t.v.counter.Done() + t.v.TaskNum-- } // Wait for all tasks done diff --git a/rpc/counter_test.go b/rpc/counter_test.go index 55bdbcef3..f31e31514 100644 --- a/rpc/counter_test.go +++ b/rpc/counter_test.go @@ -9,10 +9,10 @@ import ( func TestCounter(t *testing.T) { v := Vibranium{} - v.taskAdd(context.TODO(), "test", true) + task := v.newTask(context.TODO(), "test", true) assert.Equal(t, v.TaskNum, 1) - v.taskDone(context.TODO(), "test", true) + task.done() assert.Equal(t, v.TaskNum, 0) v.Wait() diff --git a/rpc/rpc.go b/rpc/rpc.go index 912b4fb69..c0ee24fdf 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -45,9 +45,9 @@ func (v *Vibranium) Info(ctx context.Context, opts *pb.Empty) (*pb.CoreInfo, err // WatchServiceStatus pushes sibling services func (v *Vibranium) WatchServiceStatus(_ *pb.Empty, stream pb.CoreRPC_WatchServiceStatusServer) (err error) { - ctx := v.taskAdd(stream.Context(), "WatchServiceStatus", false) - defer v.taskDone(ctx, "WatchServiceStatus", false) - ch, err := v.cluster.WatchServiceStatus(ctx) + task := v.newTask(stream.Context(), "WatchServiceStatus", false) + defer task.done() + ch, err := v.cluster.WatchServiceStatus(task.context) if err != nil { return grpcstatus.Error(WatchServiceStatus, err.Error()) } @@ -59,7 +59,7 @@ func (v *Vibranium) WatchServiceStatus(_ *pb.Empty, stream pb.CoreRPC_WatchServi } s := toRPCServiceStatus(status) if err = stream.Send(s); err != nil { - v.logUnsentMessages(ctx, "WatchServicesStatus", err, s) + v.logUnsentMessages(task.context, "WatchServicesStatus", err, s) return grpcstatus.Error(WatchServiceStatus, err.Error()) } case <-v.stop: @@ -70,7 +70,9 @@ func (v *Vibranium) WatchServiceStatus(_ *pb.Empty, stream pb.CoreRPC_WatchServi // ListNetworks list networks for pod func (v *Vibranium) ListNetworks(ctx context.Context, opts *pb.ListNetworkOptions) (*pb.Networks, error) { - networks, err := v.cluster.ListNetworks(ctx, opts.Podname, opts.Driver) + task := v.newTask(ctx, "ListNetworks", false) + defer task.done() + networks, err := v.cluster.ListNetworks(task.context, opts.Podname, opts.Driver) if err != nil { return nil, grpcstatus.Error(ListNetworks, err.Error()) } @@ -84,7 +86,9 @@ func (v *Vibranium) ListNetworks(ctx context.Context, opts *pb.ListNetworkOption // ConnectNetwork connect network func (v *Vibranium) ConnectNetwork(ctx context.Context, opts *pb.ConnectNetworkOptions) (*pb.Network, error) { - subnets, err := v.cluster.ConnectNetwork(ctx, opts.Network, opts.Target, opts.Ipv4, opts.Ipv6) + task := v.newTask(ctx, "ConnectNetwork", false) + defer task.done() + subnets, err := v.cluster.ConnectNetwork(task.context, opts.Network, opts.Target, opts.Ipv4, opts.Ipv6) if err != nil { return nil, grpcstatus.Error(ConnectNetwork, err.Error()) } @@ -93,7 +97,9 @@ func (v *Vibranium) ConnectNetwork(ctx context.Context, opts *pb.ConnectNetworkO // DisconnectNetwork disconnect network func (v *Vibranium) DisconnectNetwork(ctx context.Context, opts *pb.DisconnectNetworkOptions) (*pb.Empty, error) { - if err := v.cluster.DisconnectNetwork(ctx, opts.Network, opts.Target, opts.Force); err != nil { + task := v.newTask(ctx, "DisconnectNetwork", false) + defer task.done() + if err := v.cluster.DisconnectNetwork(task.context, opts.Network, opts.Target, opts.Force); err != nil { return nil, grpcstatus.Error(DisconnectNetwork, err.Error()) } return &pb.Empty{}, nil @@ -101,7 +107,9 @@ func (v *Vibranium) DisconnectNetwork(ctx context.Context, opts *pb.DisconnectNe // AddPod saves a pod, and returns it to client func (v *Vibranium) AddPod(ctx context.Context, opts *pb.AddPodOptions) (*pb.Pod, error) { - p, err := v.cluster.AddPod(ctx, opts.Name, opts.Desc) + task := v.newTask(ctx, "AddPod", false) + defer task.done() + p, err := v.cluster.AddPod(task.context, opts.Name, opts.Desc) if err != nil { return nil, grpcstatus.Error(AddPod, err.Error()) } @@ -111,7 +119,9 @@ func (v *Vibranium) AddPod(ctx context.Context, opts *pb.AddPodOptions) (*pb.Pod // RemovePod removes a pod only if it's empty func (v *Vibranium) RemovePod(ctx context.Context, opts *pb.RemovePodOptions) (*pb.Empty, error) { - if err := v.cluster.RemovePod(ctx, opts.Name); err != nil { + task := v.newTask(ctx, "RemovePod", false) + defer task.done() + if err := v.cluster.RemovePod(task.context, opts.Name); err != nil { return nil, grpcstatus.Error(RemovePod, err.Error()) } return &pb.Empty{}, nil @@ -119,7 +129,9 @@ func (v *Vibranium) RemovePod(ctx context.Context, opts *pb.RemovePodOptions) (* // GetPod show a pod func (v *Vibranium) GetPod(ctx context.Context, opts *pb.GetPodOptions) (*pb.Pod, error) { - p, err := v.cluster.GetPod(ctx, opts.Name) + task := v.newTask(ctx, "GetPod", false) + defer task.done() + p, err := v.cluster.GetPod(task.context, opts.Name) if err != nil { return nil, grpcstatus.Error(GetPod, err.Error()) } @@ -129,7 +141,9 @@ func (v *Vibranium) GetPod(ctx context.Context, opts *pb.GetPodOptions) (*pb.Pod // ListPods returns a list of pods func (v *Vibranium) ListPods(ctx context.Context, _ *pb.Empty) (*pb.Pods, error) { - ps, err := v.cluster.ListPods(ctx) + task := v.newTask(ctx, "ListPods", false) + defer task.done() + ps, err := v.cluster.ListPods(task.context) if err != nil { return nil, grpcstatus.Error(ListPods, err.Error()) } @@ -144,9 +158,9 @@ func (v *Vibranium) ListPods(ctx context.Context, _ *pb.Empty) (*pb.Pods, error) // GetPodResource get pod nodes resource usage func (v *Vibranium) GetPodResource(ctx context.Context, opts *pb.GetPodOptions) (*pb.PodResource, error) { - ctx = v.taskAdd(ctx, "GetPodResource", false) - defer v.taskDone(ctx, "GetPodResource", false) - ch, err := v.cluster.PodResource(ctx, opts.Name) + task := v.newTask(ctx, "GetPodResource", false) + defer task.done() + ch, err := v.cluster.PodResource(task.context, opts.Name) if err != nil { return nil, grpcstatus.Error(PodResource, err.Error()) } @@ -159,15 +173,15 @@ func (v *Vibranium) GetPodResource(ctx context.Context, opts *pb.GetPodOptions) // PodResourceStream returns a stream of NodeResource func (v *Vibranium) PodResourceStream(opts *pb.GetPodOptions, stream pb.CoreRPC_PodResourceStreamServer) error { - ctx := v.taskAdd(stream.Context(), "PodResourceStream", false) - defer v.taskDone(ctx, "PodResourceStream", false) - ch, err := v.cluster.PodResource(ctx, opts.Name) + task := v.newTask(stream.Context(), "PodResourceStream", false) + defer task.done() + ch, err := v.cluster.PodResource(task.context, opts.Name) if err != nil { return grpcstatus.Error(PodResource, err.Error()) } for msg := range ch { if err := stream.Send(toRPCNodeResource(msg)); err != nil { - v.logUnsentMessages(ctx, "PodResourceStream", err, msg) + v.logUnsentMessages(task.context, "PodResourceStream", err, msg) } } return nil @@ -176,8 +190,10 @@ func (v *Vibranium) PodResourceStream(opts *pb.GetPodOptions, stream pb.CoreRPC_ // AddNode saves a node and returns it to client // Method must be called synchronously, or nothing will be returned func (v *Vibranium) AddNode(ctx context.Context, opts *pb.AddNodeOptions) (*pb.Node, error) { + task := v.newTask(ctx, "AddNode", false) + defer task.done() addNodeOpts := toCoreAddNodeOptions(opts) - n, err := v.cluster.AddNode(ctx, addNodeOpts) + n, err := v.cluster.AddNode(task.context, addNodeOpts) if err != nil { return nil, grpcstatus.Error(AddNode, err.Error()) } @@ -187,7 +203,9 @@ func (v *Vibranium) AddNode(ctx context.Context, opts *pb.AddNodeOptions) (*pb.N // RemoveNode removes the node from etcd func (v *Vibranium) RemoveNode(ctx context.Context, opts *pb.RemoveNodeOptions) (*pb.Empty, error) { - if err := v.cluster.RemoveNode(ctx, opts.Nodename); err != nil { + task := v.newTask(ctx, "RemoveNode", false) + defer task.done() + if err := v.cluster.RemoveNode(task.context, opts.Nodename); err != nil { return nil, grpcstatus.Error(RemoveNode, err.Error()) } return &pb.Empty{}, nil @@ -195,14 +213,14 @@ func (v *Vibranium) RemoveNode(ctx context.Context, opts *pb.RemoveNodeOptions) // ListPodNodes returns a list of node for pod func (v *Vibranium) ListPodNodes(ctx context.Context, opts *pb.ListNodesOptions) (*pb.Nodes, error) { - ctx = v.taskAdd(ctx, "ListPodNodes", false) - defer v.taskDone(ctx, "ListPodNodes", false) + task := v.newTask(ctx, "ListPodNodes", false) + defer task.done() timeout := time.Duration(opts.TimeoutInSecond) * time.Second if opts.TimeoutInSecond <= 0 { timeout = v.config.ConnectionTimeout } - ctx, cancel := context.WithTimeout(ctx, timeout) + ctx, cancel := context.WithTimeout(task.context, timeout) defer cancel() ch, err := v.cluster.ListPodNodes(ctx, toCoreListNodesOptions(opts)) @@ -220,14 +238,14 @@ func (v *Vibranium) ListPodNodes(ctx context.Context, opts *pb.ListNodesOptions) // PodNodesStream returns a stream of Node func (v *Vibranium) PodNodesStream(opts *pb.ListNodesOptions, stream pb.CoreRPC_PodNodesStreamServer) error { - ctx := v.taskAdd(stream.Context(), "PodNodesStream", false) - defer v.taskDone(ctx, "PodNodesStream", false) + task := v.newTask(stream.Context(), "PodNodesStream", false) + defer task.done() timeout := time.Duration(opts.TimeoutInSecond) * time.Second if opts.TimeoutInSecond <= 0 { timeout = v.config.ConnectionTimeout } - ctx, cancel := context.WithTimeout(ctx, timeout) + ctx, cancel := context.WithTimeout(task.context, timeout) defer cancel() ch, err := v.cluster.ListPodNodes(ctx, toCoreListNodesOptions(opts)) @@ -237,7 +255,7 @@ func (v *Vibranium) PodNodesStream(opts *pb.ListNodesOptions, stream pb.CoreRPC_ for msg := range ch { if err := stream.Send(toRPCNode(msg)); err != nil { - v.logUnsentMessages(ctx, "PodNodesStream", err, msg) + v.logUnsentMessages(task.context, "PodNodesStream", err, msg) } } return nil @@ -245,7 +263,9 @@ func (v *Vibranium) PodNodesStream(opts *pb.ListNodesOptions, stream pb.CoreRPC_ // GetNode get a node func (v *Vibranium) GetNode(ctx context.Context, opts *pb.GetNodeOptions) (*pb.Node, error) { - n, err := v.cluster.GetNode(ctx, opts.Nodename) + task := v.newTask(ctx, "GetNode", false) + defer task.done() + n, err := v.cluster.GetNode(task.context, opts.Nodename) if err != nil { return nil, grpcstatus.Error(GetNode, err.Error()) } @@ -255,11 +275,13 @@ func (v *Vibranium) GetNode(ctx context.Context, opts *pb.GetNodeOptions) (*pb.N // SetNode set node meta func (v *Vibranium) SetNode(ctx context.Context, opts *pb.SetNodeOptions) (*pb.Node, error) { + task := v.newTask(ctx, "SetNode", false) + defer task.done() setNodeOpts, err := toCoreSetNodeOptions(opts) if err != nil { return nil, grpcstatus.Error(SetNode, err.Error()) } - n, err := v.cluster.SetNode(ctx, setNodeOpts) + n, err := v.cluster.SetNode(task.context, setNodeOpts) if err != nil { return nil, grpcstatus.Error(SetNode, err.Error()) } @@ -268,7 +290,9 @@ func (v *Vibranium) SetNode(ctx context.Context, opts *pb.SetNodeOptions) (*pb.N // SetNodeStatus set status of a node for reporting func (v *Vibranium) SetNodeStatus(ctx context.Context, opts *pb.SetNodeStatusOptions) (*pb.Empty, error) { - if err := v.cluster.SetNodeStatus(ctx, opts.Nodename, opts.Ttl); err != nil { + task := v.newTask(ctx, "SetNodeStatus", false) + defer task.done() + if err := v.cluster.SetNodeStatus(task.context, opts.Nodename, opts.Ttl); err != nil { return nil, grpcstatus.Error(SetNodeStatus, err.Error()) } return &pb.Empty{}, nil @@ -276,7 +300,9 @@ func (v *Vibranium) SetNodeStatus(ctx context.Context, opts *pb.SetNodeStatusOpt // GetNodeStatus set status of a node for reporting func (v *Vibranium) GetNodeStatus(ctx context.Context, opts *pb.GetNodeStatusOptions) (*pb.NodeStatusStreamMessage, error) { - status, err := v.cluster.GetNodeStatus(ctx, opts.Nodename) + task := v.newTask(ctx, "GetNodeStatus", false) + defer task.done() + status, err := v.cluster.GetNodeStatus(task.context, opts.Nodename) if err != nil { return nil, grpcstatus.Error(GetNodeStatus, err.Error()) } @@ -289,10 +315,10 @@ func (v *Vibranium) GetNodeStatus(ctx context.Context, opts *pb.GetNodeStatusOpt // NodeStatusStream watch and show deployed status func (v *Vibranium) NodeStatusStream(_ *pb.Empty, stream pb.CoreRPC_NodeStatusStreamServer) error { - ctx := v.taskAdd(stream.Context(), "NodeStatusStream", true) - defer v.taskDone(ctx, "NodeStatusStream", true) + task := v.newTask(stream.Context(), "NodeStatusStream", true) + defer task.done() - ch := v.cluster.NodeStatusStream(ctx) + ch := v.cluster.NodeStatusStream(task.context) for { select { case m, ok := <-ch: @@ -308,7 +334,7 @@ func (v *Vibranium) NodeStatusStream(_ *pb.Empty, stream pb.CoreRPC_NodeStatusSt r.Error = m.Error.Error() } if err := stream.Send(r); err != nil { - v.logUnsentMessages(ctx, "NodeStatusStream", err, m) + v.logUnsentMessages(task.context, "NodeStatusStream", err, m) } case <-v.stop: return nil @@ -318,7 +344,9 @@ func (v *Vibranium) NodeStatusStream(_ *pb.Empty, stream pb.CoreRPC_NodeStatusSt // GetNodeResource check node resource func (v *Vibranium) GetNodeResource(ctx context.Context, opts *pb.GetNodeResourceOptions) (*pb.NodeResource, error) { - nr, err := v.cluster.NodeResource(ctx, opts.GetOpts().Nodename, opts.Fix) + task := v.newTask(ctx, "GetNodeResource", false) + defer task.done() + nr, err := v.cluster.NodeResource(task.context, opts.GetOpts().Nodename, opts.Fix) if err != nil { return nil, grpcstatus.Error(GetNodeResource, err.Error()) } @@ -328,13 +356,13 @@ func (v *Vibranium) GetNodeResource(ctx context.Context, opts *pb.GetNodeResourc // CalculateCapacity calculates capacity for each node func (v *Vibranium) CalculateCapacity(ctx context.Context, opts *pb.DeployOptions) (*pb.CapacityMessage, error) { - ctx = v.taskAdd(ctx, "CalculateCapacity", true) - defer v.taskDone(ctx, "CalculateCapacity", true) + task := v.newTask(ctx, "CalculateCapacity", true) + defer task.done() deployOpts, err := toCoreDeployOptions(opts) if err != nil { return nil, grpcstatus.Error(CalculateCapacity, err.Error()) } - m, err := v.cluster.CalculateCapacity(ctx, deployOpts) + m, err := v.cluster.CalculateCapacity(task.context, deployOpts) if err != nil { return nil, grpcstatus.Error(CalculateCapacity, err.Error()) } @@ -344,29 +372,33 @@ func (v *Vibranium) CalculateCapacity(ctx context.Context, opts *pb.DeployOption // GetWorkload get a workload // More information will be shown func (v *Vibranium) GetWorkload(ctx context.Context, id *pb.WorkloadID) (*pb.Workload, error) { - workload, err := v.cluster.GetWorkload(ctx, id.Id) + task := v.newTask(ctx, "GetWorkload", false) + defer task.done() + workload, err := v.cluster.GetWorkload(task.context, id.Id) if err != nil { return nil, grpcstatus.Error(GetWorkload, err.Error()) } - return toRPCWorkload(ctx, workload) + return toRPCWorkload(task.context, workload) } // GetWorkloads get lots workloads // like GetWorkload, information should be returned func (v *Vibranium) GetWorkloads(ctx context.Context, cids *pb.WorkloadIDs) (*pb.Workloads, error) { - workloads, err := v.cluster.GetWorkloads(ctx, cids.GetIds()) + task := v.newTask(ctx, "GetWorkloads", false) + defer task.done() + workloads, err := v.cluster.GetWorkloads(task.context, cids.GetIds()) if err != nil { return nil, grpcstatus.Error(GetWorkloads, err.Error()) } - return toRPCWorkloads(ctx, workloads, nil), nil + return toRPCWorkloads(task.context, workloads, nil), nil } // ListWorkloads by appname with optional entrypoint and nodename func (v *Vibranium) ListWorkloads(opts *pb.ListWorkloadsOptions, stream pb.CoreRPC_ListWorkloadsServer) error { - ctx := v.taskAdd(stream.Context(), "ListWorkloads", true) - defer v.taskDone(ctx, "ListWorkloads", true) + task := v.newTask(stream.Context(), "ListWorkloads", true) + defer task.done() lsopts := &types.ListWorkloadsOptions{ Appname: opts.Appname, Entrypoint: opts.Entrypoint, @@ -374,14 +406,14 @@ func (v *Vibranium) ListWorkloads(opts *pb.ListWorkloadsOptions, stream pb.CoreR Limit: opts.Limit, Labels: opts.Labels, } - workloads, err := v.cluster.ListWorkloads(ctx, lsopts) + workloads, err := v.cluster.ListWorkloads(task.context, lsopts) if err != nil { return grpcstatus.Error(ListWorkloads, err.Error()) } - for _, c := range toRPCWorkloads(ctx, workloads, opts.Labels).Workloads { + for _, c := range toRPCWorkloads(task.context, workloads, opts.Labels).Workloads { if err = stream.Send(c); err != nil { - v.logUnsentMessages(ctx, "ListWorkloads", err, c) + v.logUnsentMessages(task.context, "ListWorkloads", err, c) return grpcstatus.Error(ListWorkloads, err.Error()) } } @@ -390,19 +422,21 @@ func (v *Vibranium) ListWorkloads(opts *pb.ListWorkloadsOptions, stream pb.CoreR // ListNodeWorkloads list node workloads func (v *Vibranium) ListNodeWorkloads(ctx context.Context, opts *pb.GetNodeOptions) (*pb.Workloads, error) { - workloads, err := v.cluster.ListNodeWorkloads(ctx, opts.Nodename, opts.Labels) + task := v.newTask(ctx, "ListNodeWorkloads", false) + defer task.done() + workloads, err := v.cluster.ListNodeWorkloads(task.context, opts.Nodename, opts.Labels) if err != nil { return nil, grpcstatus.Error(ListNodeWorkloads, err.Error()) } - return toRPCWorkloads(ctx, workloads, nil), nil + return toRPCWorkloads(task.context, workloads, nil), nil } // GetWorkloadsStatus get workloads status func (v *Vibranium) GetWorkloadsStatus(ctx context.Context, opts *pb.WorkloadIDs) (*pb.WorkloadsStatus, error) { - ctx = v.taskAdd(ctx, "GetWorkloadsStatus", false) - defer v.taskDone(ctx, "GetWorkloadsStatus", false) + task := v.newTask(ctx, "GetWorkloadsStatus", false) + defer task.done() - workloadsStatus, err := v.cluster.GetWorkloadsStatus(ctx, opts.Ids) + workloadsStatus, err := v.cluster.GetWorkloadsStatus(task.context, opts.Ids) if err != nil { return nil, grpcstatus.Error(GetWorkloadsStatus, err.Error()) } @@ -411,8 +445,8 @@ func (v *Vibranium) GetWorkloadsStatus(ctx context.Context, opts *pb.WorkloadIDs // SetWorkloadsStatus set workloads status func (v *Vibranium) SetWorkloadsStatus(ctx context.Context, opts *pb.SetWorkloadsStatusOptions) (*pb.WorkloadsStatus, error) { - ctx = v.taskAdd(ctx, "SetWorkloadsStatus", false) - defer v.taskDone(ctx, "SetWorkloadsStatus", false) + task := v.newTask(ctx, "SetWorkloadsStatus", false) + defer task.done() var err error statusData := []*types.StatusMeta{} @@ -433,7 +467,7 @@ func (v *Vibranium) SetWorkloadsStatus(ctx context.Context, opts *pb.SetWorkload ttls[status.Id] = status.Ttl } - status, err := v.cluster.SetWorkloadsStatus(ctx, statusData, ttls) + status, err := v.cluster.SetWorkloadsStatus(task.context, statusData, ttls) if err != nil { return nil, grpcstatus.Error(SetWorkloadsStatus, err.Error()) } @@ -442,14 +476,14 @@ func (v *Vibranium) SetWorkloadsStatus(ctx context.Context, opts *pb.SetWorkload // WorkloadStatusStream watch and show deployed status func (v *Vibranium) WorkloadStatusStream(opts *pb.WorkloadStatusStreamOptions, stream pb.CoreRPC_WorkloadStatusStreamServer) error { - ctx := v.taskAdd(stream.Context(), "WorkloadStatusStream", true) - defer v.taskDone(ctx, "WorkloadStatusStream", true) + task := v.newTask(stream.Context(), "WorkloadStatusStream", true) + defer task.done() - log.Infof(ctx, "[rpc] WorkloadStatusStream start %s", opts.Appname) - defer log.Infof(ctx, "[rpc] WorkloadStatusStream stop %s", opts.Appname) + log.Infof(task.context, "[rpc] WorkloadStatusStream start %s", opts.Appname) + defer log.Infof(task.context, "[rpc] WorkloadStatusStream stop %s", opts.Appname) ch := v.cluster.WorkloadStatusStream( - ctx, + task.context, opts.Appname, opts.Entrypoint, opts.Nodename, opts.Labels, ) for { @@ -462,7 +496,7 @@ func (v *Vibranium) WorkloadStatusStream(opts *pb.WorkloadStatusStreamOptions, s if m.Error != nil { r.Error = m.Error.Error() } else if m.Workload != nil { - if workload, err := toRPCWorkload(ctx, m.Workload); err != nil { + if workload, err := toRPCWorkload(task.context, m.Workload); err != nil { r.Error = err.Error() } else { r.Workload = workload @@ -470,7 +504,7 @@ func (v *Vibranium) WorkloadStatusStream(opts *pb.WorkloadStatusStreamOptions, s } } if err := stream.Send(r); err != nil { - v.logUnsentMessages(ctx, "WorkloadStatusStream", err, m) + v.logUnsentMessages(task.context, "WorkloadStatusStream", err, m) } case <-v.stop: return nil @@ -480,11 +514,11 @@ func (v *Vibranium) WorkloadStatusStream(opts *pb.WorkloadStatusStreamOptions, s // Copy copy files from multiple workloads func (v *Vibranium) Copy(opts *pb.CopyOptions, stream pb.CoreRPC_CopyServer) error { - ctx := v.taskAdd(stream.Context(), "Copy", true) - defer v.taskDone(ctx, "Copy", true) + task := v.newTask(stream.Context(), "Copy", true) + defer task.done() copyOpts := toCoreCopyOptions(opts) - ch, err := v.cluster.Copy(ctx, copyOpts) + ch, err := v.cluster.Copy(task.context, copyOpts) if err != nil { return grpcstatus.Error(Copy, err.Error()) } @@ -498,7 +532,7 @@ func (v *Vibranium) Copy(opts *pb.CopyOptions, stream pb.CoreRPC_CopyServer) err if m.Error != nil { msg.Error = m.Error.Error() if err := stream.Send(msg); err != nil { - v.logUnsentMessages(ctx, "Copy", err, m) + v.logUnsentMessages(task.context, "Copy", err, m) } continue } @@ -521,11 +555,11 @@ func (v *Vibranium) Copy(opts *pb.CopyOptions, stream pb.CoreRPC_CopyServer) err Size: int64(len(m.Content)), } if err = tw.WriteHeader(header); err != nil { - log.Errorf(ctx, "[Copy] Error during writing tarball header: %v", err) + log.Errorf(task.context, "[Copy] Error during writing tarball header: %v", err) return } if _, err = tw.Write(m.Content); err != nil { - log.Errorf(ctx, "[Copy] Error during writing tarball content: %v", err) + log.Errorf(task.context, "[Copy] Error during writing tarball content: %v", err) return } } @@ -535,10 +569,10 @@ func (v *Vibranium) Copy(opts *pb.CopyOptions, stream pb.CoreRPC_CopyServer) err n, err := r.Read(p) if err != nil { if err != io.EOF { - log.Errorf(ctx, "[Copy] Error during buffer resp: %v", err) + log.Errorf(task.context, "[Copy] Error during buffer resp: %v", err) msg.Error = err.Error() if err = stream.Send(msg); err != nil { - v.logUnsentMessages(ctx, "Copy", err, m) + v.logUnsentMessages(task.context, "Copy", err, m) } } break @@ -546,7 +580,7 @@ func (v *Vibranium) Copy(opts *pb.CopyOptions, stream pb.CoreRPC_CopyServer) err if n > 0 { msg.Data = p[:n] if err = stream.Send(msg); err != nil { - v.logUnsentMessages(ctx, "Copy", err, m) + v.logUnsentMessages(task.context, "Copy", err, m) } } } @@ -556,15 +590,15 @@ func (v *Vibranium) Copy(opts *pb.CopyOptions, stream pb.CoreRPC_CopyServer) err // Send send files to some contaienrs func (v *Vibranium) Send(opts *pb.SendOptions, stream pb.CoreRPC_SendServer) error { - ctx := v.taskAdd(stream.Context(), "Send", true) - defer v.taskDone(ctx, "Send", true) + task := v.newTask(stream.Context(), "Send", true) + defer task.done() sendOpts, err := toCoreSendOptions(opts) if err != nil { return grpcstatus.Error(Send, err.Error()) } - ch, err := v.cluster.Send(ctx, sendOpts) + ch, err := v.cluster.Send(task.context, sendOpts) if err != nil { return grpcstatus.Error(Send, err.Error()) } @@ -580,7 +614,7 @@ func (v *Vibranium) Send(opts *pb.SendOptions, stream pb.CoreRPC_SendServer) err } if err := stream.Send(msg); err != nil { - v.logUnsentMessages(ctx, "Send", err, m) + v.logUnsentMessages(task.context, "Send", err, m) } } return nil @@ -588,21 +622,21 @@ func (v *Vibranium) Send(opts *pb.SendOptions, stream pb.CoreRPC_SendServer) err // BuildImage streamed returned functions func (v *Vibranium) BuildImage(opts *pb.BuildImageOptions, stream pb.CoreRPC_BuildImageServer) error { - ctx := v.taskAdd(stream.Context(), "BuildImage", true) - defer v.taskDone(ctx, "BuildImage", true) + task := v.newTask(stream.Context(), "BuildImage", true) + defer task.done() buildOpts, err := toCoreBuildOptions(opts) if err != nil { return grpcstatus.Error(BuildImage, err.Error()) } - ch, err := v.cluster.BuildImage(ctx, buildOpts) + ch, err := v.cluster.BuildImage(task.context, buildOpts) if err != nil { return grpcstatus.Error(BuildImage, err.Error()) } for m := range ch { if err = stream.Send(toRPCBuildImageMessage(m)); err != nil { - v.logUnsentMessages(ctx, "BuildImage", err, m) + v.logUnsentMessages(task.context, "BuildImage", err, m) } } return nil @@ -610,17 +644,17 @@ func (v *Vibranium) BuildImage(opts *pb.BuildImageOptions, stream pb.CoreRPC_Bui // CacheImage cache image func (v *Vibranium) CacheImage(opts *pb.CacheImageOptions, stream pb.CoreRPC_CacheImageServer) error { - ctx := v.taskAdd(stream.Context(), "CacheImage", true) - defer v.taskDone(ctx, "CacheImage", true) + task := v.newTask(stream.Context(), "CacheImage", true) + defer task.done() - ch, err := v.cluster.CacheImage(ctx, toCoreCacheImageOptions(opts)) + ch, err := v.cluster.CacheImage(task.context, toCoreCacheImageOptions(opts)) if err != nil { return grpcstatus.Error(CacheImage, err.Error()) } for m := range ch { if err = stream.Send(toRPCCacheImageMessage(m)); err != nil { - v.logUnsentMessages(ctx, "CacheImage", err, m) + v.logUnsentMessages(task.context, "CacheImage", err, m) } } return nil @@ -628,17 +662,17 @@ func (v *Vibranium) CacheImage(opts *pb.CacheImageOptions, stream pb.CoreRPC_Cac // RemoveImage remove image func (v *Vibranium) RemoveImage(opts *pb.RemoveImageOptions, stream pb.CoreRPC_RemoveImageServer) error { - ctx := v.taskAdd(stream.Context(), "RemoveImage", true) - defer v.taskDone(ctx, "RemoveImage", true) + task := v.newTask(stream.Context(), "RemoveImage", true) + defer task.done() - ch, err := v.cluster.RemoveImage(ctx, toCoreRemoveImageOptions(opts)) + ch, err := v.cluster.RemoveImage(task.context, toCoreRemoveImageOptions(opts)) if err != nil { return grpcstatus.Error(RemoveImage, err.Error()) } for m := range ch { if err = stream.Send(toRPCRemoveImageMessage(m)); err != nil { - v.logUnsentMessages(ctx, "RemoveImage", err, m) + v.logUnsentMessages(task.context, "RemoveImage", err, m) } } return nil @@ -646,17 +680,17 @@ func (v *Vibranium) RemoveImage(opts *pb.RemoveImageOptions, stream pb.CoreRPC_R // ListImage list image func (v *Vibranium) ListImage(opts *pb.ListImageOptions, stream pb.CoreRPC_ListImageServer) error { - ctx := v.taskAdd(stream.Context(), "ListImage", true) - defer v.taskDone(ctx, "ListImage", true) + task := v.newTask(stream.Context(), "ListImage", true) + defer task.done() - ch, err := v.cluster.ListImage(ctx, toCoreListImageOptions(opts)) + ch, err := v.cluster.ListImage(task.context, toCoreListImageOptions(opts)) if err != nil { return grpcstatus.Error(ListImage, err.Error()) } for msg := range ch { if err = stream.Send(toRPCListImageMessage(msg)); err != nil { - v.logUnsentMessages(ctx, "ListImage", err, msg) + v.logUnsentMessages(task.context, "ListImage", err, msg) } } @@ -665,22 +699,22 @@ func (v *Vibranium) ListImage(opts *pb.ListImageOptions, stream pb.CoreRPC_ListI // CreateWorkload create workloads func (v *Vibranium) CreateWorkload(opts *pb.DeployOptions, stream pb.CoreRPC_CreateWorkloadServer) error { - ctx := v.taskAdd(stream.Context(), "CreateWorkload", true) - defer v.taskDone(ctx, "CreateWorkload", true) + task := v.newTask(stream.Context(), "CreateWorkload", true) + defer task.done() deployOpts, err := toCoreDeployOptions(opts) if err != nil { return grpcstatus.Error(CreateWorkload, err.Error()) } - ch, err := v.cluster.CreateWorkload(ctx, deployOpts) + ch, err := v.cluster.CreateWorkload(task.context, deployOpts) if err != nil { return grpcstatus.Error(CreateWorkload, err.Error()) } for m := range ch { - log.Debugf(ctx, "[CreateWorkload] create workload message: %+v", m) + log.Debugf(task.context, "[CreateWorkload] create workload message: %+v", m) if err = stream.Send(toRPCCreateWorkloadMessage(m)); err != nil { - v.logUnsentMessages(ctx, "CreateWorkload", err, m) + v.logUnsentMessages(task.context, "CreateWorkload", err, m) } } return nil @@ -688,22 +722,22 @@ func (v *Vibranium) CreateWorkload(opts *pb.DeployOptions, stream pb.CoreRPC_Cre // ReplaceWorkload replace workloads func (v *Vibranium) ReplaceWorkload(opts *pb.ReplaceOptions, stream pb.CoreRPC_ReplaceWorkloadServer) error { - ctx := v.taskAdd(stream.Context(), "ReplaceWorkload", true) - defer v.taskDone(ctx, "ReplaceWorkload", true) + task := v.newTask(stream.Context(), "ReplaceWorkload", true) + defer task.done() replaceOpts, err := toCoreReplaceOptions(opts) if err != nil { return grpcstatus.Error(ReplaceWorkload, err.Error()) } - ch, err := v.cluster.ReplaceWorkload(ctx, replaceOpts) + ch, err := v.cluster.ReplaceWorkload(task.context, replaceOpts) if err != nil { return grpcstatus.Error(ReplaceWorkload, err.Error()) } for m := range ch { if err = stream.Send(toRPCReplaceWorkloadMessage(m)); err != nil { - v.logUnsentMessages(ctx, "ReplaceWorkload", err, m) + v.logUnsentMessages(task.context, "ReplaceWorkload", err, m) } } return nil @@ -711,8 +745,8 @@ func (v *Vibranium) ReplaceWorkload(opts *pb.ReplaceOptions, stream pb.CoreRPC_R // RemoveWorkload remove workloads func (v *Vibranium) RemoveWorkload(opts *pb.RemoveWorkloadOptions, stream pb.CoreRPC_RemoveWorkloadServer) error { - ctx := v.taskAdd(stream.Context(), "RemoveWorkload", true) - defer v.taskDone(ctx, "RemoveWorkload", true) + task := v.newTask(stream.Context(), "RemoveWorkload", true) + defer task.done() ids := opts.GetIds() force := opts.GetForce() @@ -721,14 +755,14 @@ func (v *Vibranium) RemoveWorkload(opts *pb.RemoveWorkloadOptions, stream pb.Cor if len(ids) == 0 { return types.ErrNoWorkloadIDs } - ch, err := v.cluster.RemoveWorkload(ctx, ids, force, step) + ch, err := v.cluster.RemoveWorkload(task.context, ids, force, step) if err != nil { return grpcstatus.Error(ReplaceWorkload, err.Error()) } for m := range ch { if err = stream.Send(toRPCRemoveWorkloadMessage(m)); err != nil { - v.logUnsentMessages(ctx, "RemoveWorkload", err, m) + v.logUnsentMessages(task.context, "RemoveWorkload", err, m) } } @@ -737,22 +771,22 @@ func (v *Vibranium) RemoveWorkload(opts *pb.RemoveWorkloadOptions, stream pb.Cor // DissociateWorkload dissociate workload func (v *Vibranium) DissociateWorkload(opts *pb.DissociateWorkloadOptions, stream pb.CoreRPC_DissociateWorkloadServer) error { - ctx := v.taskAdd(stream.Context(), "DissociateWorkload", true) - defer v.taskDone(ctx, "DissociateWorkload", true) + task := v.newTask(stream.Context(), "DissociateWorkload", true) + defer task.done() ids := opts.GetIds() if len(ids) == 0 { return types.ErrNoWorkloadIDs } - ch, err := v.cluster.DissociateWorkload(ctx, ids) + ch, err := v.cluster.DissociateWorkload(task.context, ids) if err != nil { return grpcstatus.Error(DissociateWorkload, err.Error()) } for m := range ch { if err = stream.Send(toRPCDissociateWorkloadMessage(m)); err != nil { - v.logUnsentMessages(ctx, "DissociateWorkload", err, m) + v.logUnsentMessages(task.context, "DissociateWorkload", err, m) } } @@ -761,8 +795,8 @@ func (v *Vibranium) DissociateWorkload(opts *pb.DissociateWorkloadOptions, strea // ControlWorkload control workloads func (v *Vibranium) ControlWorkload(opts *pb.ControlWorkloadOptions, stream pb.CoreRPC_ControlWorkloadServer) error { - ctx := v.taskAdd(stream.Context(), "ControlWorkload", true) - defer v.taskDone(ctx, "ControlWorkload", true) + task := v.newTask(stream.Context(), "ControlWorkload", true) + defer task.done() ids := opts.GetIds() t := opts.GetType() @@ -772,14 +806,14 @@ func (v *Vibranium) ControlWorkload(opts *pb.ControlWorkloadOptions, stream pb.C return types.ErrNoWorkloadIDs } - ch, err := v.cluster.ControlWorkload(ctx, ids, t, force) + ch, err := v.cluster.ControlWorkload(task.context, ids, t, force) if err != nil { return grpcstatus.Error(ControlWorkload, err.Error()) } for m := range ch { if err = stream.Send(toRPCControlWorkloadMessage(m)); err != nil { - v.logUnsentMessages(ctx, "ControlWorkload", err, m) + v.logUnsentMessages(task.context, "ControlWorkload", err, m) } } @@ -788,8 +822,8 @@ func (v *Vibranium) ControlWorkload(opts *pb.ControlWorkloadOptions, stream pb.C // ExecuteWorkload runs a command in a running workload func (v *Vibranium) ExecuteWorkload(stream pb.CoreRPC_ExecuteWorkloadServer) error { - ctx := v.taskAdd(stream.Context(), "ExecuteWorkload", true) - defer v.taskDone(ctx, "ExecuteWorkload", true) + task := v.newTask(stream.Context(), "ExecuteWorkload", true) + defer task.done() opts, err := stream.Recv() if err != nil { @@ -807,7 +841,7 @@ func (v *Vibranium) ExecuteWorkload(stream pb.CoreRPC_ExecuteWorkloadServer) err for { execWorkloadOpt, err := stream.Recv() if execWorkloadOpt == nil || err != nil { - log.Errorf(ctx, "[ExecuteWorkload] Recv command error: %v", err) + log.Errorf(task.context, "[ExecuteWorkload] Recv command error: %v", err) return } inCh <- execWorkloadOpt.ReplCmd @@ -815,9 +849,9 @@ func (v *Vibranium) ExecuteWorkload(stream pb.CoreRPC_ExecuteWorkloadServer) err } }) - for m := range v.cluster.ExecuteWorkload(ctx, executeWorkloadOpts, inCh) { + for m := range v.cluster.ExecuteWorkload(task.context, executeWorkloadOpts, inCh) { if err = stream.Send(toRPCAttachWorkloadMessage(m)); err != nil { - v.logUnsentMessages(ctx, "ExecuteWorkload", err, m) + v.logUnsentMessages(task.context, "ExecuteWorkload", err, m) } } return nil @@ -825,8 +859,8 @@ func (v *Vibranium) ExecuteWorkload(stream pb.CoreRPC_ExecuteWorkloadServer) err // ReallocResource realloc res for workloads func (v *Vibranium) ReallocResource(ctx context.Context, opts *pb.ReallocOptions) (msg *pb.ReallocResourceMessage, err error) { - ctx = v.taskAdd(ctx, "ReallocResource", true) - defer v.taskDone(ctx, "ReallocResource", true) + task := v.newTask(ctx, "ReallocResource", true) + defer task.done() defer func() { errString := "" if err != nil { @@ -850,7 +884,7 @@ func (v *Vibranium) ReallocResource(ctx context.Context, opts *pb.ReallocOptions } if err := v.cluster.ReallocResource( - ctx, + task.context, &types.ReallocOptions{ ID: opts.Id, CPUBindOpts: types.TriOptions(opts.BindCpuOpt), @@ -874,13 +908,13 @@ func (v *Vibranium) ReallocResource(ctx context.Context, opts *pb.ReallocOptions // LogStream get workload logs func (v *Vibranium) LogStream(opts *pb.LogStreamOptions, stream pb.CoreRPC_LogStreamServer) error { - ctx := v.taskAdd(stream.Context(), "LogStream", true) - defer v.taskDone(ctx, "LogStream", true) + task := v.newTask(stream.Context(), "LogStream", true) + defer task.done() ID := opts.GetId() - log.Infof(ctx, "[LogStream] Get %s log start", ID) - defer log.Infof(ctx, "[LogStream] Get %s log done", ID) - ch, err := v.cluster.LogStream(ctx, &types.LogStreamOptions{ + log.Infof(task.context, "[LogStream] Get %s log start", ID) + defer log.Infof(task.context, "[LogStream] Get %s log done", ID) + ch, err := v.cluster.LogStream(task.context, &types.LogStreamOptions{ ID: ID, Tail: opts.Tail, Since: opts.Since, @@ -898,7 +932,7 @@ func (v *Vibranium) LogStream(opts *pb.LogStreamOptions, stream pb.CoreRPC_LogSt return nil } if err = stream.Send(toRPCLogStreamMessage(m)); err != nil { - v.logUnsentMessages(ctx, "LogStream", err, m) + v.logUnsentMessages(task.context, "LogStream", err, m) } case <-v.stop: return nil @@ -908,26 +942,26 @@ func (v *Vibranium) LogStream(opts *pb.LogStreamOptions, stream pb.CoreRPC_LogSt // RunAndWait is lambda func (v *Vibranium) RunAndWait(stream pb.CoreRPC_RunAndWaitServer) error { - ctx := v.taskAdd(stream.Context(), "RunAndWait", true) + task := v.newTask(stream.Context(), "RunAndWait", true) RunAndWaitOptions, err := stream.Recv() if err != nil { - v.taskDone(ctx, "RunAndWait", true) + task.done() return grpcstatus.Error(RunAndWait, err.Error()) } if RunAndWaitOptions.DeployOptions == nil { - v.taskDone(ctx, "RunAndWait", true) + task.done() return grpcstatus.Error(RunAndWait, types.ErrNoDeployOpts.Error()) } opts := RunAndWaitOptions.DeployOptions deployOpts, err := toCoreDeployOptions(opts) if err != nil { - v.taskDone(ctx, "RunAndWait", true) + task.done() return grpcstatus.Error(RunAndWait, err.Error()) } - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(task.context) if RunAndWaitOptions.Async { timeout := v.config.GlobalTimeout if RunAndWaitOptions.AsyncTimeout != 0 { @@ -956,7 +990,7 @@ func (v *Vibranium) RunAndWait(stream pb.CoreRPC_RunAndWaitServer) error { ids, ch, err := v.cluster.RunAndWait(ctx, deployOpts, inCh) if err != nil { - v.taskDone(ctx, "RunAndWait", true) + task.done() cancel() return grpcstatus.Error(RunAndWait, err.Error()) } @@ -974,7 +1008,7 @@ func (v *Vibranium) RunAndWait(stream pb.CoreRPC_RunAndWaitServer) error { // then deal with the rest messages runAndWait := func(f func(<-chan *types.AttachWorkloadMessage)) { - defer v.taskDone(ctx, "RunAndWait", true) + defer task.done() defer cancel() f(ch) } diff --git a/selfmon/selfmon.go b/selfmon/selfmon.go index c7bd65bb4..a614d2eea 100644 --- a/selfmon/selfmon.go +++ b/selfmon/selfmon.go @@ -32,7 +32,7 @@ func RunNodeStatusWatcher(ctx context.Context, config types.Config, cluster clus id := rand.Int63n(10000) // nolint store, err := store.NewStore(config, t) if err != nil { - log.Errorf(context.TODO(), "[RunNodeStatusWatcher] %v failed to create store, err: %v", id, err) + log.Errorf(ctx, "[RunNodeStatusWatcher] %v failed to create store, err: %v", id, err) return } @@ -201,6 +201,9 @@ func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message * return } + ctx, cancel := context.WithCancel(ctx) + defer cancel() + // TODO maybe we need a distributed lock to control concurrency opts := &types.SetNodeOptions{ Nodename: message.Nodename,