Skip to content

Commit

Permalink
implement PodresourceStream
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 committed Dec 23, 2021
1 parent a9e814c commit 533dbbc
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 15 deletions.
31 changes: 19 additions & 12 deletions cluster/calcium/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,31 @@ import (
)

// PodResource show pod resource usage
func (c *Calcium) PodResource(ctx context.Context, podname string) (*types.PodResource, error) {
func (c *Calcium) PodResource(ctx context.Context, podname string) (chan *types.NodeResource, error) {
logger := log.WithField("Calcium", "PodResource").WithField("podname", podname)
nodes, err := c.ListPodNodes(ctx, podname, nil, true)
if err != nil {
return nil, logger.Err(ctx, err)
}
r := &types.PodResource{
Name: podname,
NodesResource: []*types.NodeResource{},
}
for _, node := range nodes {
nodeResource, err := c.doGetNodeResource(ctx, node.Name, false)
if err != nil {
return nil, logger.Err(ctx, err)
ch := make(chan *types.NodeResource)
pool := utils.NewGoroutinePool(int(c.config.MaxConcurrency))
go func() {
defer close(ch)
for _, node := range nodes {
pool.Go(ctx, func() {
nodeResource, err := c.doGetNodeResource(ctx, node.Name, false)
if err != nil {
logger.Err(ctx, err)
nodeResource = &types.NodeResource{
Name: node.Name, Diffs: []string{err.Error()},
}
}
ch <- nodeResource
})
}
r.NodesResource = append(r.NodesResource, nodeResource)
}
return r, nil
pool.Wait(ctx)
}()
return ch, nil
}

// NodeResource check node's workload and resource
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Cluster interface {
GetPod(ctx context.Context, podname string) (*types.Pod, error)
ListPods(ctx context.Context) ([]*types.Pod, error)
// pod resource
PodResource(ctx context.Context, podname string) (*types.PodResource, error)
PodResource(ctx context.Context, podname string) (chan *types.NodeResource, error)
// meta node
AddNode(context.Context, *types.AddNodeOptions) (*types.Node, error)
RemoveNode(ctx context.Context, nodename string) error
Expand Down
27 changes: 25 additions & 2 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,30 @@ func (v *Vibranium) ListPods(ctx context.Context, _ *pb.Empty) (*pb.Pods, error)

// GetPodResource get pod nodes resource usage
func (v *Vibranium) GetPodResource(ctx context.Context, opts *pb.GetPodOptions) (*pb.PodResource, error) {
r, err := v.cluster.PodResource(ctx, opts.Name)
ch, err := v.cluster.PodResource(ctx, opts.Name)
if err != nil {
return nil, grpcstatus.Error(PodResource, err.Error())
}
return toRPCPodResource(r), nil
podResource := &pb.PodResource{Name: opts.Name}
for nodeResource := range ch {
podResource.NodesResource = append(podResource.NodesResource, toRPCNodeResource(nodeResource))
}
return podResource, nil
}

func (v *Vibranium) PodResourceStream(opts *pb.GetPodOptions, stream pb.CoreRPC_PodResourceStreamServer) error {
ctx := v.taskAdd(stream.Context(), "PodResourceStream", false)
defer v.taskDone(ctx, "PodResourceStream", false)
ch, err := v.cluster.PodResource(ctx, opts.Name)
if err != nil {
return grpcstatus.Error(PodResource, err.Error())
}
for msg := range ch {
if err := stream.Send(toRPCNodeResource(msg)); err != nil {
v.logUnsentMessages(ctx, "PodResourceStream", err, msg)
}
}
return nil
}

// AddNode saves a node and returns it to client
Expand Down Expand Up @@ -204,6 +223,10 @@ func (v *Vibranium) ListPodNodes(ctx context.Context, opts *pb.ListNodesOptions)
return &pb.Nodes{Nodes: nodes}, nil
}

func (v *Vibranium) PodNodesStream(opts *pb.ListNodesOptions, stream pb.CoreRPC_PodNodesStreamServer) error {
return nil
}

// GetNode get a node
func (v *Vibranium) GetNode(ctx context.Context, opts *pb.GetNodeOptions) (*pb.Node, error) {
n, err := v.cluster.GetNode(ctx, opts.Nodename)
Expand Down

0 comments on commit 533dbbc

Please sign in to comment.