From 7fad65271d65a1f5c7b529a52e2d3d8d0eec346b Mon Sep 17 00:00:00 2001 From: zc Date: Thu, 23 Dec 2021 16:17:51 +0800 Subject: [PATCH] implement PodresourceStream --- cluster/calcium/resource.go | 31 +++++++++++++++++++------------ cluster/cluster.go | 2 +- rpc/rpc.go | 29 +++++++++++++++++++++++++++-- rpc/transform.go | 11 ----------- 4 files changed, 47 insertions(+), 26 deletions(-) diff --git a/cluster/calcium/resource.go b/cluster/calcium/resource.go index 10442c9ff..c22f9eebe 100644 --- a/cluster/calcium/resource.go +++ b/cluster/calcium/resource.go @@ -15,24 +15,31 @@ import ( ) // PodResource show pod resource usage -func (c *Calcium) PodResource(ctx context.Context, podname string) (*types.PodResource, error) { +func (c *Calcium) PodResource(ctx context.Context, podname string) (chan *types.NodeResource, error) { logger := log.WithField("Calcium", "PodResource").WithField("podname", podname) nodes, err := c.ListPodNodes(ctx, podname, nil, true) if err != nil { return nil, logger.Err(ctx, err) } - r := &types.PodResource{ - Name: podname, - NodesResource: []*types.NodeResource{}, - } - for _, node := range nodes { - nodeResource, err := c.doGetNodeResource(ctx, node.Name, false) - if err != nil { - return nil, logger.Err(ctx, err) + ch := make(chan *types.NodeResource) + pool := utils.NewGoroutinePool(int(c.config.MaxConcurrency)) + go func() { + defer close(ch) + for _, node := range nodes { + pool.Go(ctx, func() { + nodeResource, err := c.doGetNodeResource(ctx, node.Name, false) + if err != nil { + logger.Err(ctx, err) + nodeResource = &types.NodeResource{ + Name: node.Name, Diffs: []string{err.Error()}, + } + } + ch <- nodeResource + }) } - r.NodesResource = append(r.NodesResource, nodeResource) - } - return r, nil + pool.Wait(ctx) + }() + return ch, nil } // NodeResource check node's workload and resource diff --git a/cluster/cluster.go b/cluster/cluster.go index 7b788f5d3..b99fe2334 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -48,7 +48,7 @@ type Cluster interface { GetPod(ctx context.Context, podname string) (*types.Pod, error) ListPods(ctx context.Context) ([]*types.Pod, error) // pod resource - PodResource(ctx context.Context, podname string) (*types.PodResource, error) + PodResource(ctx context.Context, podname string) (chan *types.NodeResource, error) // meta node AddNode(context.Context, *types.AddNodeOptions) (*types.Node, error) RemoveNode(ctx context.Context, nodename string) error diff --git a/rpc/rpc.go b/rpc/rpc.go index d481652cb..49ad40233 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -144,11 +144,32 @@ func (v *Vibranium) ListPods(ctx context.Context, _ *pb.Empty) (*pb.Pods, error) // GetPodResource get pod nodes resource usage func (v *Vibranium) GetPodResource(ctx context.Context, opts *pb.GetPodOptions) (*pb.PodResource, error) { - r, err := v.cluster.PodResource(ctx, opts.Name) + ctx = v.taskAdd(ctx, "GetPodResource", false) + defer v.taskDone(ctx, "GetPodResource", false) + ch, err := v.cluster.PodResource(ctx, opts.Name) if err != nil { return nil, grpcstatus.Error(PodResource, err.Error()) } - return toRPCPodResource(r), nil + podResource := &pb.PodResource{Name: opts.Name} + for nodeResource := range ch { + podResource.NodesResource = append(podResource.NodesResource, toRPCNodeResource(nodeResource)) + } + return podResource, nil +} + +func (v *Vibranium) PodResourceStream(opts *pb.GetPodOptions, stream pb.CoreRPC_PodResourceStreamServer) error { + ctx := v.taskAdd(stream.Context(), "PodResourceStream", false) + defer v.taskDone(ctx, "PodResourceStream", false) + ch, err := v.cluster.PodResource(ctx, opts.Name) + if err != nil { + return grpcstatus.Error(PodResource, err.Error()) + } + for msg := range ch { + if err := stream.Send(toRPCNodeResource(msg)); err != nil { + v.logUnsentMessages(ctx, "PodResourceStream", err, msg) + } + } + return nil } // AddNode saves a node and returns it to client @@ -204,6 +225,10 @@ func (v *Vibranium) ListPodNodes(ctx context.Context, opts *pb.ListNodesOptions) return &pb.Nodes{Nodes: nodes}, nil } +func (v *Vibranium) PodNodesStream(opts *pb.ListNodesOptions, stream pb.CoreRPC_PodNodesStreamServer) error { + return nil +} + // GetNode get a node func (v *Vibranium) GetNode(ctx context.Context, opts *pb.GetNodeOptions) (*pb.Node, error) { n, err := v.cluster.GetNode(ctx, opts.Nodename) diff --git a/rpc/transform.go b/rpc/transform.go index d9bbdbb74..cd0449256 100644 --- a/rpc/transform.go +++ b/rpc/transform.go @@ -34,17 +34,6 @@ func toRPCPod(p *types.Pod) *pb.Pod { return &pb.Pod{Name: p.Name, Desc: p.Desc} } -func toRPCPodResource(p *types.PodResource) *pb.PodResource { - r := &pb.PodResource{ - Name: p.Name, - NodesResource: []*pb.NodeResource{}, - } - for _, nodeResource := range p.NodesResource { - r.NodesResource = append(r.NodesResource, toRPCNodeResource(nodeResource)) - } - return r -} - func toRPCNetwork(n *enginetypes.Network) *pb.Network { return &pb.Network{Name: n.Name, Subnets: n.Subnets} }