Commit 4c2b8fb

make resource great again

CMGS committed Feb 3, 2023
1 parent d8ece2c commit 4c2b8fb
Showing 29 changed files with 1,725 additions and 1,934 deletions.
2 changes: 1 addition & 1 deletion cluster/calcium/build.go
@@ -83,7 +83,7 @@ func (c *Calcium) getMostIdleNode(ctx context.Context, nodes []*types.Node) (*ty
nodeMap[node.Name] = node
}

- mostIdleNode, err := c.rmgr.GetMostIdleNode(ctx, nodenames)
+ mostIdleNode, err := c.rmgr2.GetMostIdleNode(ctx, nodenames)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion cluster/calcium/build_test.go
@@ -81,7 +81,7 @@ func TestBuild(t *testing.T) {
}
store.On("GetNodesByPod", mock.Anything, mock.Anything).Return([]*types.Node{node}, nil)
// failed by plugin error
- rmgr := c.rmgr.(*resourcemocks.Manager)
+ rmgr := c.rmgr2.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
rmgr.On("GetMostIdleNode", mock.Anything, mock.Anything).Return("", types.ErrInvaildCount).Once()
ch, err = c.BuildImage(ctx, opts)
2 changes: 0 additions & 2 deletions cluster/calcium/calcium.go
@@ -12,7 +12,6 @@ import (
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/resource3"
"github.com/projecteru2/core/resource3/cobalt"
""
"github.com/projecteru2/core/source"
"github.com/projecteru2/core/source/github"
"github.com/projecteru2/core/source/gitlab"
@@ -26,7 +25,6 @@ import (
type Calcium struct {
config types.Config
store store.Store
- rmgr resources.Manager
rmgr2 resource3.Manager
source source.Source
watcher discovery.Service
6 changes: 3 additions & 3 deletions cluster/calcium/capacity.go
@@ -4,13 +4,13 @@ import (
"context"

"github.com/projecteru2/core/log"
""
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
"github.com/sanity-io/litter"
"golang.org/x/exp/maps"

"github.com/cockroachdb/errors"
+ plugintypes "github.com/projecteru2/core/resource3/plugins/types"
)

// CalculateCapacity calculates capacity
@@ -38,8 +38,8 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio
return nil
}

- var infos map[string]*resources.NodeCapacityInfo
- infos, msg.Total, err = c.rmgr.GetNodesDeployCapacity(ctx, nodenames, opts.ResourceOpts)
+ var infos map[string]*plugintypes.NodeDeployCapacity
+ infos, msg.Total, err = c.rmgr2.GetNodesDeployCapacity(ctx, nodenames, opts.Resources)
if err != nil {
logger.Error(ctx, err, "failed to get nodes capacity")
return err
45 changes: 19 additions & 26 deletions cluster/calcium/create.go
@@ -51,10 +51,10 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
var (
deployMap map[string]int
rollbackMap map[string][]int
- // map[node][]engineArgs
- engineArgsMap = map[string][]types.EngineArgs{}
- // map[node][]map[plugin]resourceArgs
- resourceArgsMap = map[string][]map[string]types.WorkloadResourceArgs{}
+ // map[nodename][]Resources
+ engineParamsMap = map[string][]*types.Resources{}
+ // map[nodename][]Resources
+ workloadResourcesMap = map[string][]*types.Resources{}
)

_ = c.pool.Invoke(func() {
@@ -122,7 +122,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
processingCommits = make(map[string]wal.Commit)
for nodename, deploy := range deployMap {
nodes = append(nodes, nodeMap[nodename])
- if engineArgsMap[nodename], resourceArgsMap[nodename], err = c.rmgr.Alloc(ctx, nodename, deploy, opts.ResourceOpts); err != nil {
+ if engineParamsMap[nodename], workloadResourcesMap[nodename], err = c.rmgr2.Alloc(ctx, nodename, deploy, opts.Resources); err != nil {
return err
}

@@ -140,7 +140,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio

// then: deploy workloads
func(ctx context.Context) (err error) {
- rollbackMap, err = c.doDeployWorkloads(ctx, ch, opts, engineArgsMap, resourceArgsMap, deployMap)
+ rollbackMap, err = c.doDeployWorkloads(ctx, ch, opts, engineParamsMap, workloadResourcesMap, deployMap)
return err
},

@@ -151,10 +151,10 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
}
for nodename, rollbackIndices := range rollbackMap {
if e := c.withNodePodLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
- resourceArgsToRollback := utils.Map(rollbackIndices, func(idx int) map[string]types.WorkloadResourceArgs {
- return resourceArgsMap[nodename][idx]
+ rollbackResources := utils.Map(rollbackIndices, func(idx int) *types.Resources {
+ return workloadResourcesMap[nodename][idx]
})
- return c.rmgr.RollbackAlloc(ctx, nodename, resourceArgsToRollback)
+ return c.rmgr2.RollbackAlloc(ctx, nodename, rollbackResources)
}); e != nil {
logger.Error(ctx, e)
err = e
@@ -173,8 +173,8 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
func (c *Calcium) doDeployWorkloads(ctx context.Context,
ch chan *types.CreateWorkloadMessage,
opts *types.DeployOptions,
- engineArgsMap map[string][]types.EngineArgs,
- resourceArgsMap map[string][]map[string]types.WorkloadResourceArgs,
+ engineParamsMap map[string][]*types.Resources,
+ workloadResourcesMap map[string][]*types.Resources,
deployMap map[string]int) (_ map[string][]int, err error) {

wg := sync.WaitGroup{}
@@ -193,7 +193,7 @@ func (c *Calcium) doDeployWorkloads(ctx context.Context,
_ = c.pool.Invoke(func(nodename string, deploy, seq int) func() {
return func() {
defer wg.Done()
- if indices, err := c.doDeployWorkloadsOnNode(ctx, ch, nodename, opts, deploy, engineArgsMap[nodename], resourceArgsMap[nodename], seq); err != nil {
+ if indices, err := c.doDeployWorkloadsOnNode(ctx, ch, nodename, opts, deploy, engineParamsMap[nodename], workloadResourcesMap[nodename], seq); err != nil {
syncRollbackMap.Set(nodename, indices)
}
}
@@ -220,8 +220,8 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context,
nodename string,
opts *types.DeployOptions,
deploy int,
- engineArgs []types.EngineArgs,
- resourceArgs []map[string]types.WorkloadResourceArgs,
+ engineParams []*types.Resources,
+ workloadResources []*types.Resources,
seq int) (indices []int, err error) {

logger := log.WithFunc("calcium.doDeployWorkloadsOnNode").WithField("node", nodename).WithField("ident", opts.ProcessIdent).WithField("deploy", deploy).WithField("seq", seq)
@@ -260,11 +260,8 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context,
ch <- createMsg
}()

- createMsg.EngineArgs = engineArgs[idx]
- createMsg.ResourceArgs = map[string]types.WorkloadResourceArgs{}
- for k, v := range resourceArgs[idx] {
- createMsg.ResourceArgs[k] = v
- }
+ createMsg.EngineParams = engineParams[idx]
+ createMsg.Resources = workloadResources[idx]

createOpts := c.doMakeWorkloadOptions(ctx, seq+idx, createMsg, opts, node)
e = c.doDeployOneWorkload(ctx, node, opts, createMsg, createOpts, true)
@@ -300,8 +297,8 @@ func (c *Calcium) doDeployOneWorkload(
) (err error) {
logger := log.WithFunc("calcium.doDeployWorkload").WithField("node", node.Name).WithField("ident", opts.ProcessIdent).WithField("msg", msg)
workload := &types.Workload{
- ResourceArgs: types.ResourceMeta{},
- EngineArgs: msg.EngineArgs,
+ Resources: msg.Resources,
+ EngineParams: msg.EngineParams,
Name: config.Name,
Labels: config.Labels,
Podname: opts.Podname,
@@ -314,10 +311,6 @@ func (c *Calcium) doDeployOneWorkload(
User: opts.User,
CreateTime: time.Now().Unix(),
}
- // copy resource args
- for k, v := range msg.ResourceArgs {
- workload.ResourceArgs[k] = v
- }

var commit wal.Commit
defer func() {
@@ -446,7 +439,7 @@ func (c *Calcium) doDeployOneWorkload(
func (c *Calcium) doMakeWorkloadOptions(ctx context.Context, no int, msg *types.CreateWorkloadMessage, opts *types.DeployOptions, node *types.Node) *enginetypes.VirtualizationCreateOptions {
config := &enginetypes.VirtualizationCreateOptions{}
// general
- config.EngineArgs = msg.EngineArgs
+ config.EngineParams = msg.EngineParams
config.RawArgs = opts.RawArgs
config.Lambda = opts.Lambda
config.User = opts.User
14 changes: 3 additions & 11 deletions cluster/calcium/dissociate.go
@@ -4,7 +4,7 @@ import (
"context"

"github.com/projecteru2/core/log"
""
"github.com/projecteru2/core/resource3/plugins"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
@@ -32,11 +32,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, IDs []string) (chan *t
ctx,
// if
func(ctx context.Context) (err error) {
- resourceArgs := map[string]types.WorkloadResourceArgs{}
- for plugin, args := range workload.ResourceArgs {
- resourceArgs[plugin] = args
- }
- _, _, err = c.rmgr.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Decr)
+ _, _, err = c.rmgr2.SetNodeResourceUsage(ctx, node.Name, nil, nil, []*types.Resources{workload.Resources}, true, plugins.Decr)
return err
},
// then
@@ -48,11 +44,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, IDs []string) (chan *t
if failedByCond {
return nil
}
- resourceArgs := map[string]types.WorkloadResourceArgs{}
- for plugin, args := range workload.ResourceArgs {
- resourceArgs[plugin] = args
- }
- _, _, err = c.rmgr.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Incr)
+ _, _, err = c.rmgr2.SetNodeResourceUsage(ctx, node.Name, nil, nil, []*types.Resources{workload.Resources}, true, plugins.Incr)
return err
},
c.config.GlobalTimeout,
4 changes: 2 additions & 2 deletions cluster/calcium/metrics.go
@@ -13,7 +13,7 @@ import (
// InitMetrics .
func (c *Calcium) InitMetrics(ctx context.Context) {
logger := log.WithFunc("calcium.InitMetrics")
- metricsDescriptions, err := c.rmgr.GetMetricsDescription(ctx)
+ metricsDescriptions, err := c.rmgr2.GetMetricsDescription(ctx)
if err != nil {
logger.Error(ctx, err, "failed to get metrics description")
return
@@ -26,7 +26,7 @@ func (c *Calcium) InitMetrics(ctx context.Context) {
}

func (c *Calcium) doSendNodeMetrics(ctx context.Context, node *types.Node) {
- nodeMetrics, err := c.rmgr.GetNodeMetrics(ctx, node)
+ nodeMetrics, err := c.rmgr2.GetNodeMetrics(ctx, node)
if err != nil {
log.WithFunc("calcium.doSendNodeMetrics").Errorf(ctx, err, "convert node %s resource info to metrics failed", node.Name)
return
39 changes: 17 additions & 22 deletions cluster/calcium/node.go
@@ -7,7 +7,7 @@ import (
enginefactory "github.com/projecteru2/core/engine/factory"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
""
"github.com/projecteru2/core/resource3/plugins"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
@@ -20,7 +20,7 @@ func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*typ
logger.Error(ctx, err)
return nil, err
}
- var resourceCapacity map[string]*types.RawNodeResource
+ var res *types.Resources
var node *types.Node
var err error

@@ -39,7 +39,7 @@ func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*typ
ctx,
// if: add node resource with resource plugins
func(ctx context.Context) error {
- resourceCapacity, err = c.rmgr2.AddNode(ctx, opts.Nodename, opts.ResourceOptions, nodeInfo)
+ res, err = c.rmgr2.AddNode(ctx, opts.Nodename, opts.Resources, nodeInfo)
return err
},
// then: add node meta in store
@@ -48,7 +48,7 @@ func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*typ
if err != nil {
return err
}
- node.Resource.Capacity = resourceCapacity
+ node.Resource.Capacity = res
_ = c.pool.Invoke(func() { c.doSendNodeMetrics(ctx, node) })
return nil
},
@@ -57,7 +57,7 @@ func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*typ
if failureByCond {
return nil
}
- return c.rmgr.RemoveNode(ctx, opts.Nodename)
+ return c.rmgr2.RemoveNode(ctx, opts.Nodename)
},
c.config.GlobalTimeout)
}
@@ -88,7 +88,7 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error {
},
// then: remove node resource metadata
func(ctx context.Context) error {
- return c.rmgr.RemoveNode(ctx, nodename)
+ return c.rmgr2.RemoveNode(ctx, nodename)
},
// rollback: do nothing
func(ctx context.Context, failureByCond bool) error {
@@ -119,7 +119,7 @@ func (c *Calcium) ListPodNodes(ctx context.Context, opts *types.ListNodesOptions
_ = c.pool.Invoke(func() {
defer wg.Done()
var err error
- if node.Resource.Capacity, node.Resource.Usage, node.Resource.Diffs, err = c.rmgr.GetNodeResourceInfo(ctx, node.Name, nil, false); err != nil {
+ if node.Resource.Capacity, node.Resource.Usage, node.Resource.Diffs, err = c.rmgr2.GetNodeResourceInfo(ctx, node.Name, nil, false); err != nil {
logger.Errorf(ctx, err, "failed to get node %+v resource info", node.Name)
}
if opts.CallInfo {
@@ -147,7 +147,7 @@ func (c *Calcium) GetNode(ctx context.Context, nodename string) (node *types.Nod
logger.Error(ctx, err)
return nil, err
}
- if node.Resource.Capacity, node.Resource.Usage, node.Resource.Diffs, err = c.rmgr.GetNodeResourceInfo(ctx, node.Name, nil, false); err != nil {
+ if node.Resource.Capacity, node.Resource.Usage, node.Resource.Diffs, err = c.rmgr2.GetNodeResourceInfo(ctx, node.Name, nil, false); err != nil {
logger.Error(ctx, err)
return nil, err
}
@@ -184,7 +184,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
logger.Info(ctx, "set node")
// update resource map
var err error
- node.Resource.Capacity, node.Resource.Usage, node.Resource.Diffs, err = c.rmgr.GetNodeResourceInfo(ctx, node.Name, nil, false)
+ node.Resource.Capacity, node.Resource.Usage, node.Resource.Diffs, err = c.rmgr2.GetNodeResourceInfo(ctx, node.Name, nil, false)
if err != nil {
return err
}
@@ -212,15 +212,15 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
n.Labels = opts.Labels
}

- var originNodeResourceCapacity map[string]*types.NodeResourceSettings
+ var origin *types.Resources
+ if len((*opts.Resources)) == 0 {
+ return nil
+ }
+
return utils.Txn(ctx,
// if: update node resource capacity success
func(ctx context.Context) error {
- if len(opts.ResourceOpts) == 0 {
- return nil
- }
-
- originNodeResourceCapacity, _, err = c.rmgr.SetNodeResourceCapacity(ctx, n.Name, opts.ResourceOpts, nil, opts.Delta, resources.Incr)
+ origin, _, err = c.rmgr2.SetNodeResourceCapacity(ctx, n.Name, opts.Resources, nil, opts.Delta, plugins.Incr)
return err
},
// then: update node metadata
@@ -230,9 +230,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
}
// update resource
// actually we can ignore err here, if update success
- if len(opts.ResourceOpts) != 0 {
- n.Resource.Capacity, n.Resource.Usage, n.Resource.Diffs, _ = c.rmgr.GetNodeResourceInfo(ctx, node.Name, nil, false)
- }
+ n.Resource.Capacity, n.Resource.Usage, n.Resource.Diffs, _ = c.rmgr2.GetNodeResourceInfo(ctx, node.Name, nil, false)
// use send to update the usage
_ = c.pool.Invoke(func() { c.doSendNodeMetrics(ctx, n) })
return nil
@@ -242,10 +240,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
if failureByCond {
return nil
}
- if len(opts.ResourceOpts) == 0 {
- return nil
- }
- _, _, err = c.rmgr.SetNodeResourceCapacity(ctx, n.Name, nil, originNodeResourceCapacity, false, resources.Decr)
+ _, _, err = c.rmgr2.SetNodeResourceCapacity(ctx, n.Name, nil, origin, false, plugins.Decr)
return err
},
c.config.GlobalTimeout)
13 changes: 4 additions & 9 deletions cluster/calcium/remap.go
@@ -43,23 +43,18 @@ func (c *Calcium) doRemapResource(ctx context.Context, node *types.Node) (ch cha
return
}

- workloadMap := map[string]*types.Workload{}
- for _, workload := range workloads {
- workloadMap[workload.ID] = workload
- }
-
- engineArgsMap, err := c.rmgr.GetRemapArgs(ctx, node.Name, workloadMap)
+ engineParamsMap, err := c.rmgr2.Remap(ctx, node.Name, workloads)
if err != nil {
return nil, err
}

- ch = make(chan *remapMsg, len(engineArgsMap))
+ ch = make(chan *remapMsg, len(engineParamsMap))
_ = c.pool.Invoke(func() {
defer close(ch)
- for workloadID, engineArgs := range engineArgsMap {
+ for workloadID, engineParams := range engineParamsMap {
ch <- &remapMsg{
ID: workloadID,
- err: node.Engine.VirtualizationUpdateResource(ctx, workloadID, &enginetypes.VirtualizationResource{EngineArgs: engineArgs}),
+ err: node.Engine.VirtualizationUpdateResource(ctx, workloadID, &enginetypes.VirtualizationResource{EngineParams: engineParams}),
}
}
})
1 change: 0 additions & 1 deletion cluster/calcium/remove.go
@@ -6,7 +6,6 @@ import (
"sync"

"github.com/projecteru2/core/log"
""
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
