Skip to content

Commit

Permalink
[skip ci] sync with iops control
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Jul 5, 2022
1 parent 9485b58 commit 870c7d9
Show file tree
Hide file tree
Showing 35 changed files with 957 additions and 740 deletions.
13 changes: 8 additions & 5 deletions cluster/calcium/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ const (
func TestBuild(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
plugin := c.resource.GetPlugins()[0].(*resourcemocks.Plugin)
plugin.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything).Return(&resources.GetNodeResourceInfoResponse{
ResourceInfo: &resources.NodeResourceInfo{},
}, nil)
opts := &types.BuildOptions{
Name: "xx",
BuildMethod: types.BuildFromSCM,
Expand Down Expand Up @@ -60,6 +64,10 @@ func TestBuild(t *testing.T) {
assert.Error(t, err)
// failed by buildpod not set
c = NewTestCluster()
plugin = c.resource.GetPlugins()[0].(*resourcemocks.Plugin)
plugin.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything).Return(&resources.GetNodeResourceInfoResponse{
ResourceInfo: &resources.NodeResourceInfo{},
}, nil)
_, err = c.BuildImage(ctx, opts)
assert.Error(t, err)
c.config.Docker.BuildPod = "test"
Expand All @@ -84,17 +92,12 @@ func TestBuild(t *testing.T) {
}
store.On("GetNodesByPod", mock.AnythingOfType("*context.emptyCtx"), mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{node}, nil)

plugin := c.resource.GetPlugins()[0].(*resourcemocks.Plugin)

// failed by plugin error
plugin.On("GetMostIdleNode", mock.Anything, mock.Anything).Return(nil, types.ErrGetMostIdleNodeFailed).Once()
ch, err = c.BuildImage(ctx, opts)
assert.Error(t, err)

plugin.On("GetMostIdleNode", mock.Anything, mock.Anything).Return(&resources.GetMostIdleNodeResponse{NodeName: node.Name, Priority: 100}, nil)
plugin.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything).Return(&resources.GetNodeResourceInfoResponse{
ResourceInfo: &resources.NodeResourceInfo{},
}, nil)
// create image
c.config.Docker.Hub = "test.com"
c.config.Docker.Namespace = "test"
Expand Down
4 changes: 4 additions & 0 deletions cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
return nil, err
}
resource.AddPlugins(volume)
// load binary plugins
if err = resource.LoadPlugins(context.TODO()); err != nil {
return nil, err
}

cal := &Calcium{store: store, config: config, source: scm, watcher: watcher, resource: resource}

Expand Down
2 changes: 2 additions & 0 deletions cluster/calcium/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/projecteru2/core/resources"
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
"github.com/sanity-io/litter"

"github.com/pkg/errors"
)
Expand All @@ -15,6 +16,7 @@ import (
func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptions) (*types.CapacityMessage, error) {
logger := log.WithField("Calcium", "CalculateCapacity").WithField("opts", opts)
var err error
log.Infof(ctx, "[CalculateCapacity] Calculate capacity with options:\n%s", litter.Options{Compact: true}.Sdump(opts))0
msg := &types.CapacityMessage{
Total: 0,
NodeCapacities: map[string]int{},
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context,
}

func (c *Calcium) doGetAndPrepareNode(ctx context.Context, nodename, image string) (*types.Node, error) {
node, err := c.GetNode(ctx, nodename)
node, err := c.GetNode(ctx, nodename, nil)
if err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions cluster/calcium/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"

enginemocks "github.com/projecteru2/core/engine/mocks"
"github.com/projecteru2/core/resources"
resourcemocks "github.com/projecteru2/core/resources/mocks"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/types"

Expand All @@ -16,6 +18,10 @@ import (

func TestRemoveImage(t *testing.T) {
c := NewTestCluster()
plugin := c.resource.GetPlugins()[0].(*resourcemocks.Plugin)
plugin.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything).Return(&resources.GetNodeResourceInfoResponse{
ResourceInfo: &resources.NodeResourceInfo{},
}, nil)
ctx := context.Background()
store := &storemocks.Store{}
c.store = store
Expand Down Expand Up @@ -62,6 +68,10 @@ func TestRemoveImage(t *testing.T) {

func TestCacheImage(t *testing.T) {
c := NewTestCluster()
plugin := c.resource.GetPlugins()[0].(*resourcemocks.Plugin)
plugin.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything).Return(&resources.GetNodeResourceInfoResponse{
ResourceInfo: &resources.NodeResourceInfo{},
}, nil)
ctx := context.Background()
store := &storemocks.Store{}
c.store = store
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, genK
}

// refresh node
node, err := c.GetNode(ctx, n.Name)
node, err := c.GetNode(ctx, n.Name, nil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (c *Calcium) InitMetrics() {
func (c *Calcium) SendNodeMetrics(ctx context.Context, nodeName string) {
ctx, cancel := context.WithTimeout(utils.InheritTracingInfo(ctx, context.TODO()), c.config.GlobalTimeout)
defer cancel()
node, err := c.GetNode(ctx, nodeName)
node, err := c.GetNode(ctx, nodeName, nil)
if err != nil {
log.Errorf(ctx, "[SendNodeMetrics] get node %s failed, %v", nodeName, err)
return
Expand Down
14 changes: 14 additions & 0 deletions cluster/calcium/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

enginemocks "github.com/projecteru2/core/engine/mocks"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/resources"
resourcemocks "github.com/projecteru2/core/resources/mocks"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/types"

Expand All @@ -15,6 +17,10 @@ import (

func TestNetwork(t *testing.T) {
c := NewTestCluster()
plugin := c.resource.GetPlugins()[0].(*resourcemocks.Plugin)
plugin.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything).Return(&resources.GetNodeResourceInfoResponse{
ResourceInfo: &resources.NodeResourceInfo{},
}, nil)
ctx := context.Background()
store := &storemocks.Store{}
c.store = store
Expand Down Expand Up @@ -46,6 +52,10 @@ func TestNetwork(t *testing.T) {

func TestConnectNetwork(t *testing.T) {
c := NewTestCluster()
plugin := c.resource.GetPlugins()[0].(*resourcemocks.Plugin)
plugin.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything).Return(&resources.GetNodeResourceInfoResponse{
ResourceInfo: &resources.NodeResourceInfo{},
}, nil)
ctx := context.Background()
store := &storemocks.Store{}
c.store = store
Expand All @@ -63,6 +73,10 @@ func TestConnectNetwork(t *testing.T) {

func TestDisConnectNetwork(t *testing.T) {
c := NewTestCluster()
plugin := c.resource.GetPlugins()[0].(*resourcemocks.Plugin)
plugin.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything).Return(&resources.GetNodeResourceInfoResponse{
ResourceInfo: &resources.NodeResourceInfo{},
}, nil)
ctx := context.Background()
store := &storemocks.Store{}
c.store = store
Expand Down
21 changes: 11 additions & 10 deletions cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ import (
"sort"

enginefactory "github.com/projecteru2/core/engine/factory"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/resources"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"

enginetypes "github.com/projecteru2/core/engine/types"

"github.com/pkg/errors"
)

Expand Down Expand Up @@ -108,6 +107,9 @@ func (c *Calcium) ListPodNodes(ctx context.Context, opts *types.ListNodesOptions
go func() {
defer close(ch)
for _, node := range nodes {
if err := c.getNodeResourceInfo(ctx, node, nil); err != nil {
logger.Errorf(ctx, "failed to get node %v resource info: %+v", node.Name, err)
}
ch <- node
}
}()
Expand All @@ -120,11 +122,10 @@ func (c *Calcium) ListPodNodes(ctx context.Context, opts *types.ListNodesOptions
for _, node := range nodes {
pool.Go(ctx, func(node *types.Node) func() {
return func() {
err := node.Info(ctx)
if err != nil {
if err := node.Info(ctx); err != nil {
logger.Errorf(ctx, "failed to get node %v info: %+v", node.Name, err)
}
if err := c.getNodeResourceInfo(ctx, node); err != nil {
if err := c.getNodeResourceInfo(ctx, node, nil); err != nil {
logger.Errorf(ctx, "failed to get node %v resource info: %+v", node.Name, err)
}
ch <- node
Expand All @@ -137,15 +138,15 @@ func (c *Calcium) ListPodNodes(ctx context.Context, opts *types.ListNodesOptions
}

// GetNode get node
func (c *Calcium) GetNode(ctx context.Context, nodename string) (node *types.Node, err error) {
func (c *Calcium) GetNode(ctx context.Context, nodename string, plugins []string) (node *types.Node, err error) {
logger := log.WithField("Calcium", "GetNode").WithField("nodename", nodename)
if nodename == "" {
return nil, logger.Err(ctx, errors.WithStack(types.ErrEmptyNodeName))
}
if node, err = c.store.GetNode(ctx, nodename); err != nil {
return nil, logger.Err(ctx, errors.WithStack(err))
}
if err = c.getNodeResourceInfo(ctx, node); err != nil {
if err = c.getNodeResourceInfo(ctx, node, plugins); err != nil {
return nil, logger.Err(ctx, errors.WithStack(err))
}
return node, nil
Expand Down Expand Up @@ -234,8 +235,8 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
})
}

func (c *Calcium) getNodeResourceInfo(ctx context.Context, node *types.Node) (err error) {
if node.ResourceCapacity, node.ResourceUsage, _, err = c.resource.GetNodeResourceInfo(ctx, node.Name, nil, false); err != nil {
func (c *Calcium) getNodeResourceInfo(ctx context.Context, node *types.Node, plugins []string) (err error) {
if node.ResourceCapacity, node.ResourceUsage, _, err = c.resource.GetNodeResourceInfo(ctx, node.Name, nil, false, plugins); err != nil {
log.Errorf(ctx, "[getNodeResourceInfo] failed to get node resource info for node %v, err: %v", node.Name, err)
return errors.WithStack(err)
}
Expand Down Expand Up @@ -292,7 +293,7 @@ func (c *Calcium) filterNodes(ctx context.Context, nf types.NodeFilter) (ns []*t

if len(nf.Includes) != 0 {
for _, nodename := range nf.Includes {
node, err := c.GetNode(ctx, nodename)
node, err := c.GetNode(ctx, nodename, nil)
if err != nil {
return nil, err
}
Expand Down
8 changes: 6 additions & 2 deletions cluster/calcium/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ func TestRemoveNode(t *testing.T) {

func TestListPodNodes(t *testing.T) {
c := NewTestCluster()
plugin := c.resource.GetPlugins()[0].(*resourcemocks.Plugin)
plugin.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything).Return(&resources.GetNodeResourceInfoResponse{
ResourceInfo: &resources.NodeResourceInfo{},
}, nil)
ctx := context.Background()
name1 := "test1"
name2 := "test2"
Expand Down Expand Up @@ -152,7 +156,7 @@ func TestGetNode(t *testing.T) {
}, nil)

// fail by validating
_, err := c.GetNode(ctx, "")
_, err := c.GetNode(ctx, "", nil)
assert.Error(t, err)

name := "test"
Expand All @@ -164,7 +168,7 @@ func TestGetNode(t *testing.T) {
store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
c.store = store

n, err := c.GetNode(ctx, name)
n, err := c.GetNode(ctx, name, nil)
assert.NoError(t, err)
assert.Equal(t, n.Name, name)
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *Calcium) doGetNodeResource(ctx context.Context, nodename string, fix bo
}

// TODO: percentage?
resourceCapacity, resourceUsage, diffs, err := c.resource.GetNodeResourceInfo(ctx, nodename, workloads, fix)
resourceCapacity, resourceUsage, diffs, err := c.resource.GetNodeResourceInfo(ctx, nodename, workloads, fix, nil)
if err != nil {
log.Errorf(ctx, "[doGetNodeResource] failed to get node resource, node %v, err: %v", nodename, err)
return err
Expand Down
3 changes: 1 addition & 2 deletions cluster/calcium/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ import (

// WatchServiceStatus returns chan of available service address
func (c *Calcium) WatchServiceStatus(ctx context.Context) (<-chan types.ServiceStatus, error) {
ch := make(chan types.ServiceStatus)
id := c.watcher.Subscribe(ch)
id, ch := c.watcher.Subscribe(ctx)
utils.SentryGo(func() {
<-ctx.Done()
c.watcher.Unsubscribe(id)
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (h *CreateWorkloadHandler) Handle(ctx context.Context, raw interface{}) (er
}

// workload meta doesn't exist
node, err := h.calcium.GetNode(ctx, wrk.Nodename)
node, err := h.calcium.GetNode(ctx, wrk.Nodename, nil)
if err != nil {
return logger.Err(ctx, err)
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *Calcium) getWorkloadNode(ctx context.Context, id string) (*types.Node,
if err != nil {
return nil, err
}
node, err := c.GetNode(ctx, w.Nodename)
node, err := c.GetNode(ctx, w.Nodename, nil)
return node, err
}

Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Cluster interface {
AddNode(context.Context, *types.AddNodeOptions) (*types.Node, error)
RemoveNode(ctx context.Context, nodename string) error
ListPodNodes(context.Context, *types.ListNodesOptions) (<-chan *types.Node, error)
GetNode(ctx context.Context, nodename string) (*types.Node, error)
GetNode(ctx context.Context, nodename string, plugins []string) (*types.Node, error)
GetNodeEngine(ctx context.Context, nodename string) (*enginetypes.Info, error)
SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error)
SetNodeStatus(ctx context.Context, nodename string, ttl int64) error
Expand Down
4 changes: 3 additions & 1 deletion discovery/discovery.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package discovery

import (
"context"

"github.com/projecteru2/core/types"

"github.com/google/uuid"
)

// Service .
type Service interface {
Subscribe(ch chan<- types.ServiceStatus) uuid.UUID
Subscribe(ctx context.Context) (uuid.UUID, <-chan types.ServiceStatus)
Unsubscribe(id uuid.UUID)
}
Loading

0 comments on commit 870c7d9

Please sign in to comment.