Skip to content

Commit

Permalink
Add stream APIs for PodNodes and PodResource (#524)
Browse files Browse the repository at this point in the history
* add stream APIs for PodNodes related operations

* implement PodresourceStream

* streamize ListPodNodes, get rpc/transform out of extra IO

* implement PodNodesStream

* update unitests

* return node even if failed to fetch info
  • Loading branch information
jschwinger233 authored Dec 29, 2021
1 parent 76ac53f commit 117368c
Show file tree
Hide file tree
Showing 28 changed files with 1,168 additions and 906 deletions.
2 changes: 1 addition & 1 deletion 3rdmocks/ServerStream.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion cluster/calcium/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,15 @@ func (c *Calcium) selectBuildNode(ctx context.Context) (*types.Node, error) {
}

// get node by scheduler
nodes, err := c.ListPodNodes(ctx, c.config.Docker.BuildPod, nil, false)
ch, err := c.ListPodNodes(ctx, &types.ListNodesOptions{Podname: c.config.Docker.BuildPod})
if err != nil {
return nil, err
}

nodes := []*types.Node{}
for n := range ch {
nodes = append(nodes, n)
}
if len(nodes) == 0 {
return nil, errors.WithStack(types.ErrInsufficientNodes)
}
Expand Down
6 changes: 5 additions & 1 deletion cluster/calcium/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ import (
func (c *Calcium) ListNetworks(ctx context.Context, podname string, driver string) ([]*enginetypes.Network, error) {
logger := log.WithField("Calcium", "ListNetworks").WithField("podname", podname).WithField("driver", driver)
networks := []*enginetypes.Network{}
nodes, err := c.ListPodNodes(ctx, podname, nil, false)
ch, err := c.ListPodNodes(ctx, &types.ListNodesOptions{Podname: podname})
if err != nil {
return networks, logger.Err(ctx, errors.WithStack(err))
}

nodes := []*types.Node{}
for n := range ch {
nodes = append(nodes, n)
}
if len(nodes) == 0 {
return networks, logger.Err(ctx, errors.WithStack(types.NewDetailedErr(types.ErrPodNoNodes, podname)))
}
Expand Down
44 changes: 40 additions & 4 deletions cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,37 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error {
}

// ListPodNodes list nodes belong to pod
func (c *Calcium) ListPodNodes(ctx context.Context, podname string, labels map[string]string, all bool) ([]*types.Node, error) {
nodes, err := c.store.GetNodesByPod(ctx, podname, labels, all)
return nodes, log.WithField("Calcium", "ListPodNodes").WithField("podname", podname).WithField("labels", labels).WithField("all", all).Err(ctx, errors.WithStack(err))
func (c *Calcium) ListPodNodes(ctx context.Context, opts *types.ListNodesOptions) (<-chan *types.Node, error) {
logger := log.WithField("Calcium", "ListPodNodes").WithField("podname", opts.Podname).WithField("labels", opts.Labels).WithField("all", opts.All).WithField("info", opts.Info)
ch := make(chan *types.Node)
nodes, err := c.store.GetNodesByPod(ctx, opts.Podname, opts.Labels, opts.All)
if err != nil || !opts.Info {
go func() {
defer close(ch)
for _, node := range nodes {
ch <- node
}
}()
return ch, logger.Err(ctx, errors.WithStack(err))
}

pool := utils.NewGoroutinePool(int(c.config.MaxConcurrency))
go func() {
defer close(ch)
for _, node := range nodes {
pool.Go(ctx, func(node *types.Node) func() {
return func() {
err := node.Info(ctx)
if err != nil {
logger.Errorf(ctx, "failed to get node info: %+v", err)
}
ch <- node
}
}(node))
}
pool.Wait(ctx)
}()
return ch, nil
}

// GetNode get node
Expand Down Expand Up @@ -223,10 +251,18 @@ func (c *Calcium) filterNodes(ctx context.Context, nf types.NodeFilter) (ns []*t
return ns, nil
}

listedNodes, err := c.ListPodNodes(ctx, nf.Podname, nf.Labels, nf.All)
ch, err := c.ListPodNodes(ctx, &types.ListNodesOptions{
Podname: nf.Podname,
Labels: nf.Labels,
All: nf.All,
})
if err != nil {
return nil, err
}
listedNodes := []*types.Node{}
for n := range ch {
listedNodes = append(listedNodes, n)
}
if len(nf.Excludes) == 0 {
return listedNodes, nil
}
Expand Down
20 changes: 14 additions & 6 deletions cluster/calcium/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,24 @@ func TestListPodNodes(t *testing.T) {
store := &storemocks.Store{}
c.store = store
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
_, err := c.ListPodNodes(ctx, "", nil, false)
_, err := c.ListPodNodes(ctx, &types.ListNodesOptions{})
assert.Error(t, err)
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
ns, err := c.ListPodNodes(ctx, "", nil, false)
ns, err := c.ListPodNodes(ctx, &types.ListNodesOptions{})
nss := []*types.Node{}
for n := range ns {
nss = append(nss, n)
}
assert.NoError(t, err)
assert.Equal(t, len(ns), 2)
assert.Equal(t, ns[0].Name, name1)
ns, err = c.ListPodNodes(ctx, "", nil, true)
assert.Equal(t, len(nss), 2)
assert.Equal(t, nss[0].Name, name1)
ns, err = c.ListPodNodes(ctx, &types.ListNodesOptions{})
assert.NoError(t, err)
assert.Equal(t, len(ns), 2)
cnt := 0
for range ns {
cnt++
}
assert.Equal(t, cnt, 2)
}

func TestGetNode(t *testing.T) {
Expand Down
32 changes: 19 additions & 13 deletions cluster/calcium/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,30 @@ import (
)

// PodResource show pod resource usage
func (c *Calcium) PodResource(ctx context.Context, podname string) (*types.PodResource, error) {
func (c *Calcium) PodResource(ctx context.Context, podname string) (chan *types.NodeResource, error) {
logger := log.WithField("Calcium", "PodResource").WithField("podname", podname)
nodes, err := c.ListPodNodes(ctx, podname, nil, true)
nodeCh, err := c.ListPodNodes(ctx, &types.ListNodesOptions{Podname: podname, All: true})
if err != nil {
return nil, logger.Err(ctx, err)
}
r := &types.PodResource{
Name: podname,
NodesResource: []*types.NodeResource{},
}
for _, node := range nodes {
nodeResource, err := c.doGetNodeResource(ctx, node.Name, false)
if err != nil {
return nil, logger.Err(ctx, err)
ch := make(chan *types.NodeResource)
pool := utils.NewGoroutinePool(int(c.config.MaxConcurrency))
go func() {
defer close(ch)
for node := range nodeCh {
pool.Go(ctx, func() {
nodeResource, err := c.doGetNodeResource(ctx, node.Name, false)
if err != nil {
nodeResource = &types.NodeResource{
Name: node.Name, Diffs: []string{logger.Err(ctx, err).Error()},
}
}
ch <- nodeResource
})
}
r.NodesResource = append(r.NodesResource, nodeResource)
}
return r, nil
pool.Wait(ctx)
}()
return ch, nil
}

// NodeResource check node's workload and resource
Expand Down
25 changes: 16 additions & 9 deletions cluster/calcium/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ func TestPodResource(t *testing.T) {
store := &storemocks.Store{}
c.store = store
lock := &lockmocks.DistributedLock{}
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
lock.On("Unlock", mock.Anything).Return(nil)
// failed by GetNodesByPod
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
_, err := c.PodResource(ctx, podname)
ch, err := c.PodResource(ctx, podname)
assert.Error(t, err)
store.AssertExpectations(t)

// failed by ListNodeWorkloads
node := &types.Node{
NodeMeta: types.NodeMeta{
Name: nodename,
Expand All @@ -49,10 +51,14 @@ func TestPodResource(t *testing.T) {
}
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{node}, nil)
store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
// failed by ListNodeWorkloads
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
_, err = c.PodResource(ctx, podname)
assert.Error(t, err)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
ch, err = c.PodResource(ctx, podname)
assert.NoError(t, err)
msg := <-ch
assert.True(t, strings.Contains(msg.Diffs[0], types.ErrNoETCD.Error()))
store.AssertExpectations(t)

workloads := []*types.Workload{
{
ResourceMeta: types.ResourceMeta{
Expand Down Expand Up @@ -84,10 +90,11 @@ func TestPodResource(t *testing.T) {
// success
r, err := c.PodResource(ctx, podname)
assert.NoError(t, err)
assert.Equal(t, r.NodesResource[0].CPUPercent, 0.9)
assert.Equal(t, r.NodesResource[0].MemoryPercent, 0.5)
assert.Equal(t, r.NodesResource[0].StoragePercent, 0.1)
assert.NotEmpty(t, r.NodesResource[0].Diffs)
first := <-r
assert.Equal(t, first.CPUPercent, 0.9)
assert.Equal(t, first.MemoryPercent, 0.5)
assert.Equal(t, first.StoragePercent, 0.1)
assert.NotEmpty(t, first.Diffs)
}

func TestNodeResource(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ type Cluster interface {
GetPod(ctx context.Context, podname string) (*types.Pod, error)
ListPods(ctx context.Context) ([]*types.Pod, error)
// pod resource
PodResource(ctx context.Context, podname string) (*types.PodResource, error)
PodResource(ctx context.Context, podname string) (chan *types.NodeResource, error)
// meta node
AddNode(context.Context, *types.AddNodeOptions) (*types.Node, error)
RemoveNode(ctx context.Context, nodename string) error
ListPodNodes(ctx context.Context, podname string, labels map[string]string, all bool) ([]*types.Node, error)
ListPodNodes(context.Context, *types.ListNodesOptions) (<-chan *types.Node, error)
GetNode(ctx context.Context, nodename string) (*types.Node, error)
SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error)
SetNodeStatus(ctx context.Context, nodename string, ttl int64) error
Expand Down
28 changes: 14 additions & 14 deletions cluster/mocks/Cluster.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/docker/mocks/APIClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/mocks/API.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lock/mocks/DistributedLock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions metrics/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/types"
)

// ResourceMiddleware to make sure update resource correct
Expand All @@ -14,11 +15,11 @@ func (m *Metrics) ResourceMiddleware(cluster cluster.Cluster) func(http.Handler)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(context.TODO(), m.Config.GlobalTimeout)
defer cancel()
nodes, err := cluster.ListPodNodes(ctx, "", nil, true)
nodes, err := cluster.ListPodNodes(ctx, &types.ListNodesOptions{All: true})
if err != nil {
log.Errorf(ctx, "[ResourceMiddleware] Get all nodes err %v", err)
}
for _, node := range nodes {
for node := range nodes {
m.SendNodeInfo(node.Metrics())
}
h.ServeHTTP(w, r)
Expand Down
Loading

0 comments on commit 117368c

Please sign in to comment.