Add stream APIs for PodNodes and PodResource #524

Merged 6 commits on Dec 29, 2021
2 changes: 1 addition & 1 deletion 3rdmocks/ServerStream.go

Some generated files are not rendered by default.

7 changes: 6 additions & 1 deletion cluster/calcium/build.go
@@ -61,10 +61,15 @@ func (c *Calcium) selectBuildNode(ctx context.Context) (*types.Node, error) {
}

// get node by scheduler
nodes, err := c.ListPodNodes(ctx, c.config.Docker.BuildPod, nil, false)
ch, err := c.ListPodNodes(ctx, &types.ListNodesOptions{Podname: c.config.Docker.BuildPod})
if err != nil {
return nil, err
}

nodes := []*types.Node{}
for n := range ch {
nodes = append(nodes, n)
}
if len(nodes) == 0 {
return nil, errors.WithStack(types.ErrInsufficientNodes)
}
6 changes: 5 additions & 1 deletion cluster/calcium/network.go
@@ -17,11 +17,15 @@ import (
func (c *Calcium) ListNetworks(ctx context.Context, podname string, driver string) ([]*enginetypes.Network, error) {
logger := log.WithField("Calcium", "ListNetworks").WithField("podname", podname).WithField("driver", driver)
networks := []*enginetypes.Network{}
nodes, err := c.ListPodNodes(ctx, podname, nil, false)
ch, err := c.ListPodNodes(ctx, &types.ListNodesOptions{Podname: podname})
Contributor

Shouldn't we provide two separate methods here, one returning chan *types.Node and one returning []*types.Node?

Member Author

No need, I think: the cluster interface should stay orthogonal, and the internal implementation is already streaming, with multiple goroutines fetching results concurrently.

Member Author

Once generics are available, we can just add a generic ChanToSlice function.
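
A minimal sketch of such a ChanToSlice helper, assuming Go 1.18+ type parameters (hypothetical, not part of this PR, which uses explicit for-range loops instead):

// ChanToSlice drains a channel into a slice.
func ChanToSlice[T any](ch <-chan T) []T {
	s := []T{}
	for v := range ch {
		s = append(s, v)
	}
	return s
}

With it, the repeated collection loops in build.go, network.go and node.go would reduce to nodes := ChanToSlice(ch).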

if err != nil {
return networks, logger.Err(ctx, errors.WithStack(err))
}

nodes := []*types.Node{}
for n := range ch {
nodes = append(nodes, n)
}
if len(nodes) == 0 {
return networks, logger.Err(ctx, errors.WithStack(types.NewDetailedErr(types.ErrPodNoNodes, podname)))
}
44 changes: 40 additions & 4 deletions cluster/calcium/node.go
@@ -42,9 +42,37 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error {
}

// ListPodNodes list nodes belong to pod
func (c *Calcium) ListPodNodes(ctx context.Context, podname string, labels map[string]string, all bool) ([]*types.Node, error) {
nodes, err := c.store.GetNodesByPod(ctx, podname, labels, all)
return nodes, log.WithField("Calcium", "ListPodNodes").WithField("podname", podname).WithField("labels", labels).WithField("all", all).Err(ctx, errors.WithStack(err))
func (c *Calcium) ListPodNodes(ctx context.Context, opts *types.ListNodesOptions) (<-chan *types.Node, error) {
logger := log.WithField("Calcium", "ListPodNodes").WithField("podname", opts.Podname).WithField("labels", opts.Labels).WithField("all", opts.All).WithField("info", opts.Info)
ch := make(chan *types.Node)
nodes, err := c.store.GetNodesByPod(ctx, opts.Podname, opts.Labels, opts.All)
if err != nil || !opts.Info {
go func() {
defer close(ch)
for _, node := range nodes {
ch <- node
}
}()
return ch, logger.Err(ctx, errors.WithStack(err))
}

pool := utils.NewGoroutinePool(int(c.config.MaxConcurrency))
go func() {
defer close(ch)
for _, node := range nodes {
pool.Go(ctx, func(node *types.Node) func() {
return func() {
err := node.Info(ctx)
if err != nil {
logger.Errorf(ctx, "failed to get node info: %+v", err)
}
ch <- node
}
}(node))
}
pool.Wait(ctx)
}()
return ch, nil
}

// GetNode get node
@@ -223,10 +251,18 @@ func (c *Calcium) filterNodes(ctx context.Context, nf types.NodeFilter) (ns []*t
return ns, nil
}

listedNodes, err := c.ListPodNodes(ctx, nf.Podname, nf.Labels, nf.All)
ch, err := c.ListPodNodes(ctx, &types.ListNodesOptions{
Podname: nf.Podname,
Labels: nf.Labels,
All: nf.All,
})
if err != nil {
return nil, err
}
listedNodes := []*types.Node{}
for n := range ch {
listedNodes = append(listedNodes, n)
}
if len(nf.Excludes) == 0 {
return listedNodes, nil
}
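For reference, a minimal sketch of how a caller could consume the new streaming ListPodNodes, assuming the projecteru2/core cluster and types packages; the caller function itself is hypothetical and not part of this PR:

import (
	"context"

	"github.com/projecteru2/core/cluster"
	"github.com/projecteru2/core/types"
)

// listNodesWithInfo drains the ListPodNodes stream into a slice,
// requesting live node info for every node in the pod.
func listNodesWithInfo(ctx context.Context, c cluster.Cluster, podname string) ([]*types.Node, error) {
	ch, err := c.ListPodNodes(ctx, &types.ListNodesOptions{
		Podname: podname,
		All:     true,
		Info:    true, // node info is fetched concurrently; failures are logged and the node is still sent
	})
	if err != nil {
		return nil, err
	}
	nodes := []*types.Node{}
	for node := range ch {
		nodes = append(nodes, node)
	}
	return nodes, nil
}
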
20 changes: 14 additions & 6 deletions cluster/calcium/node_test.go
@@ -93,16 +93,24 @@ func TestListPodNodes(t *testing.T) {
store := &storemocks.Store{}
c.store = store
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
_, err := c.ListPodNodes(ctx, "", nil, false)
_, err := c.ListPodNodes(ctx, &types.ListNodesOptions{})
assert.Error(t, err)
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
ns, err := c.ListPodNodes(ctx, "", nil, false)
ns, err := c.ListPodNodes(ctx, &types.ListNodesOptions{})
nss := []*types.Node{}
for n := range ns {
nss = append(nss, n)
}
assert.NoError(t, err)
assert.Equal(t, len(ns), 2)
assert.Equal(t, ns[0].Name, name1)
ns, err = c.ListPodNodes(ctx, "", nil, true)
assert.Equal(t, len(nss), 2)
assert.Equal(t, nss[0].Name, name1)
ns, err = c.ListPodNodes(ctx, &types.ListNodesOptions{})
assert.NoError(t, err)
assert.Equal(t, len(ns), 2)
cnt := 0
for range ns {
cnt++
}
assert.Equal(t, cnt, 2)
}

func TestGetNode(t *testing.T) {
32 changes: 19 additions & 13 deletions cluster/calcium/resource.go
@@ -15,24 +15,30 @@ import (
)

// PodResource show pod resource usage
func (c *Calcium) PodResource(ctx context.Context, podname string) (*types.PodResource, error) {
func (c *Calcium) PodResource(ctx context.Context, podname string) (chan *types.NodeResource, error) {
logger := log.WithField("Calcium", "PodResource").WithField("podname", podname)
nodes, err := c.ListPodNodes(ctx, podname, nil, true)
nodeCh, err := c.ListPodNodes(ctx, &types.ListNodesOptions{Podname: podname, All: true})
if err != nil {
return nil, logger.Err(ctx, err)
}
r := &types.PodResource{
Name: podname,
NodesResource: []*types.NodeResource{},
}
for _, node := range nodes {
nodeResource, err := c.doGetNodeResource(ctx, node.Name, false)
if err != nil {
return nil, logger.Err(ctx, err)
ch := make(chan *types.NodeResource)
pool := utils.NewGoroutinePool(int(c.config.MaxConcurrency))
go func() {
defer close(ch)
for node := range nodeCh {
pool.Go(ctx, func() {
nodeResource, err := c.doGetNodeResource(ctx, node.Name, false)
if err != nil {
nodeResource = &types.NodeResource{
Name: node.Name, Diffs: []string{logger.Err(ctx, err).Error()},
}
}
ch <- nodeResource
})
}
r.NodesResource = append(r.NodesResource, nodeResource)
}
return r, nil
pool.Wait(ctx)
}()
return ch, nil
}

// NodeResource check node's workload and resource
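Similarly, a minimal sketch of draining the new streaming PodResource (the caller below is hypothetical, not part of this PR). Note that a per-node failure no longer aborts the whole call; it arrives as a NodeResource whose Diffs field carries the error message:

import (
	"context"
	"fmt"

	"github.com/projecteru2/core/cluster"
)

// printPodResource prints each node's resource usage as it is streamed back.
func printPodResource(ctx context.Context, c cluster.Cluster, podname string) error {
	ch, err := c.PodResource(ctx, podname)
	if err != nil {
		return err
	}
	for nr := range ch {
		// Nodes are queried concurrently, so results arrive in no particular order.
		fmt.Printf("%s: cpu %.2f%% mem %.2f%% storage %.2f%% diffs %v\n",
			nr.Name, nr.CPUPercent*100, nr.MemoryPercent*100, nr.StoragePercent*100, nr.Diffs)
	}
	return nil
}
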
25 changes: 16 additions & 9 deletions cluster/calcium/resource_test.go
@@ -30,13 +30,15 @@ func TestPodResource(t *testing.T) {
store := &storemocks.Store{}
c.store = store
lock := &lockmocks.DistributedLock{}
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
lock.On("Unlock", mock.Anything).Return(nil)
// failed by GetNodesByPod
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
_, err := c.PodResource(ctx, podname)
ch, err := c.PodResource(ctx, podname)
assert.Error(t, err)
store.AssertExpectations(t)

// failed by ListNodeWorkloads
node := &types.Node{
NodeMeta: types.NodeMeta{
Name: nodename,
@@ -49,10 +51,14 @@
}
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{node}, nil)
store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
// failed by ListNodeWorkloads
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
_, err = c.PodResource(ctx, podname)
assert.Error(t, err)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
ch, err = c.PodResource(ctx, podname)
assert.NoError(t, err)
msg := <-ch
assert.True(t, strings.Contains(msg.Diffs[0], types.ErrNoETCD.Error()))
store.AssertExpectations(t)

workloads := []*types.Workload{
{
ResourceMeta: types.ResourceMeta{
@@ -84,10 +90,11 @@
// success
r, err := c.PodResource(ctx, podname)
assert.NoError(t, err)
assert.Equal(t, r.NodesResource[0].CPUPercent, 0.9)
assert.Equal(t, r.NodesResource[0].MemoryPercent, 0.5)
assert.Equal(t, r.NodesResource[0].StoragePercent, 0.1)
assert.NotEmpty(t, r.NodesResource[0].Diffs)
first := <-r
assert.Equal(t, first.CPUPercent, 0.9)
assert.Equal(t, first.MemoryPercent, 0.5)
assert.Equal(t, first.StoragePercent, 0.1)
assert.NotEmpty(t, first.Diffs)
}

func TestNodeResource(t *testing.T) {
4 changes: 2 additions & 2 deletions cluster/cluster.go
@@ -48,11 +48,11 @@ type Cluster interface {
GetPod(ctx context.Context, podname string) (*types.Pod, error)
ListPods(ctx context.Context) ([]*types.Pod, error)
// pod resource
PodResource(ctx context.Context, podname string) (*types.PodResource, error)
PodResource(ctx context.Context, podname string) (chan *types.NodeResource, error)
// meta node
AddNode(context.Context, *types.AddNodeOptions) (*types.Node, error)
RemoveNode(ctx context.Context, nodename string) error
ListPodNodes(ctx context.Context, podname string, labels map[string]string, all bool) ([]*types.Node, error)
ListPodNodes(context.Context, *types.ListNodesOptions) (<-chan *types.Node, error)
GetNode(ctx context.Context, nodename string) (*types.Node, error)
SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error)
SetNodeStatus(ctx context.Context, nodename string, ttl int64) error
28 changes: 14 additions & 14 deletions cluster/mocks/Cluster.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion engine/docker/mocks/APIClient.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion engine/mocks/API.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion lock/mocks/DistributedLock.go

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions metrics/handler.go
@@ -6,6 +6,7 @@ import (

"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/types"
)

// ResourceMiddleware to make sure update resource correct
@@ -14,11 +15,11 @@ func (m *Metrics) ResourceMiddleware(cluster cluster.Cluster) func(http.Handler)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(context.TODO(), m.Config.GlobalTimeout)
defer cancel()
nodes, err := cluster.ListPodNodes(ctx, "", nil, true)
nodes, err := cluster.ListPodNodes(ctx, &types.ListNodesOptions{All: true})
if err != nil {
log.Errorf(ctx, "[ResourceMiddleware] Get all nodes err %v", err)
}
for _, node := range nodes {
for node := range nodes {
m.SendNodeInfo(node.Metrics())
}
h.ServeHTTP(w, r)