Skip to content

Commit

Permalink
[skip ci] refactor stage 1
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Jul 13, 2022
1 parent 555c243 commit 4fe8207
Show file tree
Hide file tree
Showing 64 changed files with 3,135 additions and 4,129 deletions.
2 changes: 1 addition & 1 deletion cluster/calcium/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestBuild(t *testing.T) {
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{node}, nil)
// failed by plugin error
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetMostIdleNode", mock.Anything, mock.Anything).Return("", types.ErrBadCount).Once()
ch, err = c.BuildImage(ctx, opts)
assert.Error(t, err)
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/capacity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestCalculateCapacity(t *testing.T) {
ctx := context.Background()
store := c.store.(*storemocks.Store)
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
engine := &enginemocks.API{}

// pod1 := &types.Pod{Name: "p1"}
Expand Down
16 changes: 8 additions & 8 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
return c.withNodesPodLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) (err error) {
nodeNames := []string{}
nodes := []*types.Node{}
for nodeName, node := range nodeMap {
nodeNames = append(nodeNames, nodeName)
for nodename, node := range nodeMap {
nodeNames = append(nodeNames, nodename)
nodes = append(nodes, node)
}

Expand All @@ -117,14 +117,14 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio

// commit changes
processingCommits = make(map[string]wal.Commit)
for nodeName, deploy := range deployMap {
nodes = append(nodes, nodeMap[nodeName])
if engineArgsMap[nodeName], resourceArgsMap[nodeName], err = c.rmgr.Alloc(ctx, nodeName, deploy, opts.ResourceOpts); err != nil {
for nodename, deploy := range deployMap {
nodes = append(nodes, nodeMap[nodename])
if engineArgsMap[nodename], resourceArgsMap[nodename], err = c.rmgr.Alloc(ctx, nodename, deploy, opts.ResourceOpts); err != nil {
return errors.WithStack(err)
}

processing := opts.GetProcessing(nodeName)
if processingCommits[nodeName], err = c.wal.Log(eventProcessingCreated, processing); err != nil {
processing := opts.GetProcessing(nodename)
if processingCommits[nodename], err = c.wal.Log(eventProcessingCreated, processing); err != nil {
return errors.WithStack(err)
}
if err = c.store.CreateProcessing(ctx, processing, deploy); err != nil {
Expand Down Expand Up @@ -275,7 +275,7 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context,
}

func (c *Calcium) doGetAndPrepareNode(ctx context.Context, nodename, image string) (*types.Node, error) {
node, err := c.GetNode(ctx, nodename, nil)
node, err := c.GetNode(ctx, nodename)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestCreateWorkloadTxn(t *testing.T) {

store := c.store.(*storemocks.Store)
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
mwal := &walmocks.WAL{}
c.wal = mwal
var walCommitted bool
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/dissociate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestDissociateWorkload(t *testing.T) {
}

store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{c1}, nil)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
// failed by lock
store.On("GetNode", mock.Anything, "node1").Return(node1, nil)
store.On("CreateLock", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestRemoveImage(t *testing.T) {
ctx := context.Background()
store := c.store.(*storemocks.Store)
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
// fail by validating
_, err := c.RemoveImage(ctx, &types.ImageOptions{Podname: ""})
assert.Error(t, err)
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestCacheImage(t *testing.T) {
ctx := context.Background()
rmgr := c.rmgr.(*resourcemocks.Manager)
store := c.store.(*storemocks.Store)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
// fail by validating
_, err := c.CacheImage(ctx, &types.ImageOptions{Podname: ""})
assert.Error(t, err)
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/lambda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
c, _ := newCreateWorkloadCluster(t)

rmgr := &resourcemocks.Manager{}
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetNodesDeployCapacity", mock.Anything, mock.Anything, mock.Anything).Return(
nil, 0, types.ErrNoETCD,
)
Expand Down Expand Up @@ -192,7 +192,7 @@ func newLambdaCluster(t *testing.T) (*Calcium, []*types.Node) {

store := c.store.(*storemocks.Store)
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
map[string]types.NodeResourceArgs{},
map[string]types.NodeResourceArgs{},
[]string{},
Expand Down
10 changes: 2 additions & 8 deletions cluster/calcium/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(
utils.Reverse(ids)
c.doUnlockAll(utils.InheritTracingInfo(ctx, context.TODO()), locks, ids...)
}()
cs, err := c.GetWorkloads(ctx, ids)
cs, err := c.store.GetWorkloads(ctx, ids)
if err != nil {
return err
}
Expand Down Expand Up @@ -161,13 +161,7 @@ func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, genK
locks[key] = lock
lockKeys = append(lockKeys, key)
}

// refresh node
node, err := c.GetNode(ctx, n.Name, nil)
if err != nil {
return err
}
nodes[n.Name] = node
nodes[n.Name] = n
}
return f(ctx, nodes)
}
8 changes: 4 additions & 4 deletions cluster/calcium/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestWithNodesPodLocked(t *testing.T) {
ctx := context.Background()
store := c.store.(*storemocks.Store)
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)

node1 := &types.Node{
NodeMeta: types.NodeMeta{
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestWithNodePodLocked(t *testing.T) {
ctx := context.Background()
store := c.store.(*storemocks.Store)
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)

node1 := &types.Node{
NodeMeta: types.NodeMeta{
Expand Down Expand Up @@ -212,7 +212,7 @@ func TestWithNodesOperationLocked(t *testing.T) {
ctx := context.Background()
store := c.store.(*storemocks.Store)
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)

node1 := &types.Node{
NodeMeta: types.NodeMeta{
Expand Down Expand Up @@ -269,7 +269,7 @@ func TestWithNodeOperationLocked(t *testing.T) {
ctx := context.Background()
store := c.store.(*storemocks.Store)
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)

node1 := &types.Node{
NodeMeta: types.NodeMeta{
Expand Down
15 changes: 9 additions & 6 deletions cluster/calcium/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package calcium

import (
"context"
"fmt"

"github.com/sanity-io/litter"

"github.com/projecteru2/core/log"
"github.com/projecteru2/core/metrics"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)

Expand All @@ -29,19 +29,22 @@ func (c *Calcium) InitMetrics() {
}

// SendNodeMetrics .
func (c *Calcium) SendNodeMetrics(ctx context.Context, nodeName string) {
func (c *Calcium) SendNodeMetrics(ctx context.Context, nodename string) {
ctx, cancel := context.WithTimeout(utils.InheritTracingInfo(ctx, context.TODO()), c.config.GlobalTimeout)
defer cancel()
node, err := c.GetNode(ctx, nodeName, nil)
fmt.Println(err)
node, err := c.GetNode(ctx, nodename)
if err != nil {
log.Errorf(ctx, "[SendNodeMetrics] get node %s failed, %v", nodeName, err)
log.Errorf(ctx, "[SendNodeMetrics] get node %s failed, %v", nodename, err)
return
}

c.doSendNodeMetrics(ctx, node)
}

func (c *Calcium) doSendNodeMetrics(ctx context.Context, node *types.Node) {
nodeMetrics, err := c.rmgr.ConvertNodeResourceInfoToMetrics(ctx, node.Podname, node.Name, node.ResourceCapacity, node.ResourceUsage)
if err != nil {
log.Errorf(ctx, "[SendNodeMetrics] resolve node %s resource info to metrics failed, %v", nodeName, err)
log.Errorf(ctx, "[SendNodeMetrics] convert node %s resource info to metrics failed, %v", node.Name, err)
return
}
metrics.Client.SendMetrics(nodeMetrics...)
Expand Down
6 changes: 3 additions & 3 deletions cluster/calcium/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestNetwork(t *testing.T) {
ctx := context.Background()
store := c.store.(*storemocks.Store)
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)

store.On("GetNodesByPod", mock.AnythingOfType("*context.emptyCtx"), mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
_, err := c.ListNetworks(ctx, "", "")
Expand Down Expand Up @@ -51,7 +51,7 @@ func TestConnectNetwork(t *testing.T) {
ctx := context.Background()
store := c.store.(*storemocks.Store)
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
engine := &enginemocks.API{}
workload := &types.Workload{Engine: engine}

Expand All @@ -69,7 +69,7 @@ func TestDisConnectNetwork(t *testing.T) {
ctx := context.Background()
store := c.store.(*storemocks.Store)
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
engine := &enginemocks.API{}
workload := &types.Workload{Engine: engine}

Expand Down
Loading

0 comments on commit 4fe8207

Please sign in to comment.