Skip to content

Commit

Permalink
use interface in cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Jul 6, 2022
1 parent 3d8654b commit d5d4cf0
Show file tree
Hide file tree
Showing 25 changed files with 980 additions and 536 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ mock: deps
mockery --dir store/etcdv3/meta --output store/etcdv3/meta/mocks --all
mockery --dir vendor/go.etcd.io/etcd/client/v3 --output store/etcdv3/meta/mocks --name Txn
mockery --dir rpc/gen/ --output rpc/mocks --name CoreRPC_RunAndWaitServer
mockery --dir resources --output resources/mocks --name Plugin
mockery --dir resources --output resources/mocks --all

.ONESHELL:

Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (c *Calcium) getMostIdleNode(ctx context.Context, nodes []*types.Node) (*ty
nodeMap[node.Name] = node
}

mostIdleNode, err := c.resource.GetMostIdleNode(ctx, nodeNames)
mostIdleNode, err := c.rmgr.GetMostIdleNode(ctx, nodeNames)
if err != nil {
return nil, err
}
Expand Down
18 changes: 8 additions & 10 deletions cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/projecteru2/core/source/gitlab"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/wal"

"github.com/pkg/errors"
)
Expand All @@ -25,10 +26,10 @@ import (
type Calcium struct {
config types.Config
store store.Store
resource *resources.PluginManager
rmgr resources.Manager
source source.Source
watcher discovery.Service
wal *WAL
wal wal.WAL
identifier string
}

Expand Down Expand Up @@ -62,7 +63,7 @@ func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, erro
watcher := helium.New(config.GRPCConfig, store)

// set resource plugin manager
resource, err := resources.NewPluginManager(config)
rmgr, err := resources.NewPluginsManager(config)
if err != nil {
return nil, logger.ErrWithTracing(ctx, errors.WithStack(err))
}
Expand All @@ -73,23 +74,20 @@ func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, erro
log.Errorf(ctx, "[NewPluginManager] new cpumem plugin error: %v", err)
return nil, err
}
resource.AddPlugins(cpumem)

// load volume plugin
volume, err := volume.NewPlugin(config)
if err != nil {
log.Errorf(ctx, "[NewPluginManager] new volume plugin error: %v", err)
return nil, err
}
resource.AddPlugins(volume)
rmgr.AddPlugins(cpumem, volume)
// load binary plugins
if err = resource.LoadPlugins(ctx); err != nil {
if err = rmgr.LoadPlugins(ctx); err != nil {
return nil, err
}

cal := &Calcium{store: store, config: config, source: scm, watcher: watcher, resource: resource}
cal := &Calcium{store: store, config: config, source: scm, watcher: watcher, rmgr: rmgr}

cal.wal, err = newWAL(config, cal)
cal.wal, err = enableWAL(config, cal, store)
if err != nil {
return nil, logger.ErrWithTracing(nil, errors.WithStack(err)) //nolint
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio
}
} else {
var infos map[string]*resources.NodeCapacityInfo
infos, msg.Total, err = c.resource.GetNodesDeployCapacity(ctx, nodes, opts.ResourceOpts)
infos, msg.Total, err = c.rmgr.GetNodesDeployCapacity(ctx, nodes, opts.ResourceOpts)
if err != nil {
logger.Errorf(ctx, "[Calcium.CalculateCapacity] failed to get nodes capacity: %+v", err)
return err
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
processingCommits = make(map[string]wal.Commit)
for nodeName, deploy := range deployMap {
nodes = append(nodes, nodeMap[nodeName])
if engineArgsMap[nodeName], resourceArgsMap[nodeName], err = c.resource.Alloc(ctx, nodeName, deploy, opts.ResourceOpts); err != nil {
if engineArgsMap[nodeName], resourceArgsMap[nodeName], err = c.rmgr.Alloc(ctx, nodeName, deploy, opts.ResourceOpts); err != nil {
return errors.WithStack(err)
}

Expand Down Expand Up @@ -151,7 +151,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
resourceArgsToRollback := utils.Map(rollbackIndices, func(idx int) map[string]types.WorkloadResourceArgs {
return resourceArgsMap[nodename][idx]
})
return c.resource.RollbackAlloc(ctx, nodename, resourceArgsToRollback)
return c.rmgr.RollbackAlloc(ctx, nodename, resourceArgsToRollback)
}); e != nil {
err = logger.ErrWithTracing(ctx, e)
}
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/dissociate.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
for plugin, args := range workload.ResourceArgs {
resourceArgs[plugin] = args
}
_, _, err = c.resource.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Decr)
_, _, err = c.rmgr.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Decr)
return errors.WithStack(err)
},
// then
Expand All @@ -54,7 +54,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
for plugin, args := range workload.ResourceArgs {
resourceArgs[plugin] = args
}
_, _, err = c.resource.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Incr)
_, _, err = c.rmgr.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Incr)
return errors.WithStack(err)
},
c.config.GlobalTimeout,
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func (c *Calcium) InitMetrics() {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

metricsDescriptions, err := c.resource.GetMetricsDescription(ctx)
metricsDescriptions, err := c.rmgr.GetMetricsDescription(ctx)
if err != nil {
log.Errorf(ctx, "[InitMetrics] failed to get metrics description, err: %v", err)
return
Expand All @@ -37,7 +37,7 @@ func (c *Calcium) SendNodeMetrics(ctx context.Context, nodeName string) {
return
}

nodeMetrics, err := c.resource.ResolveNodeResourceInfoToMetrics(ctx, node.Podname, node.Name, node.ResourceCapacity, node.ResourceUsage)
nodeMetrics, err := c.rmgr.ConvertNodeResourceInfoToMetrics(ctx, node.Podname, node.Name, node.ResourceCapacity, node.ResourceUsage)
if err != nil {
log.Errorf(ctx, "[SendNodeMetrics] resolve node %s resource info to metrics failed, %v", nodeName, err)
return
Expand Down
12 changes: 6 additions & 6 deletions cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*typ
ctx,
// if: add node resource with resource plugins
func(ctx context.Context) error {
resourceCapacity, resourceUsage, err = c.resource.AddNode(ctx, opts.Nodename, opts.ResourceOpts, nodeInfo)
resourceCapacity, resourceUsage, err = c.rmgr.AddNode(ctx, opts.Nodename, opts.ResourceOpts, nodeInfo)
return errors.WithStack(err)
},
// then: add node meta in store
Expand All @@ -59,7 +59,7 @@ func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*typ
if failureByCond {
return nil
}
return errors.WithStack(c.resource.RemoveNode(ctx, opts.Nodename))
return errors.WithStack(c.rmgr.RemoveNode(ctx, opts.Nodename))
},
c.config.GlobalTimeout),
)
Expand Down Expand Up @@ -87,7 +87,7 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error {
},
// then: remove node resource metadata
func(ctx context.Context) error {
return errors.WithStack(c.resource.RemoveNode(ctx, nodename))
return errors.WithStack(c.rmgr.RemoveNode(ctx, nodename))
},
// rollback: do nothing
func(ctx context.Context, failureByCond bool) error {
Expand Down Expand Up @@ -208,7 +208,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
return nil
}

originNodeResourceCapacity, _, err = c.resource.SetNodeResourceCapacity(ctx, n.Name, opts.ResourceOpts, nil, opts.Delta, resources.Incr)
originNodeResourceCapacity, _, err = c.rmgr.SetNodeResourceCapacity(ctx, n.Name, opts.ResourceOpts, nil, opts.Delta, resources.Incr)
return errors.WithStack(err)
},
// then: update node metadata
Expand All @@ -227,7 +227,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
if len(opts.ResourceOpts) == 0 {
return nil
}
_, _, err = c.resource.SetNodeResourceCapacity(ctx, n.Name, nil, originNodeResourceCapacity, false, resources.Decr)
_, _, err = c.rmgr.SetNodeResourceCapacity(ctx, n.Name, nil, originNodeResourceCapacity, false, resources.Decr)
return errors.WithStack(err)
},
c.config.GlobalTimeout,
Expand All @@ -236,7 +236,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
}

func (c *Calcium) getNodeResourceInfo(ctx context.Context, node *types.Node, plugins []string) (err error) {
if node.ResourceCapacity, node.ResourceUsage, _, err = c.resource.GetNodeResourceInfo(ctx, node.Name, nil, false, plugins); err != nil {
if node.ResourceCapacity, node.ResourceUsage, _, err = c.rmgr.GetNodeResourceInfo(ctx, node.Name, nil, false, plugins); err != nil {
log.Errorf(ctx, "[getNodeResourceInfo] failed to get node resource info for node %v, err: %v", node.Name, err)
return errors.WithStack(err)
}
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/realloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, node *types.Node, workloa
func(ctx context.Context) error {
// note here will change the node resource meta (stored in resource plugin)
// todo: add wal here
engineArgs, deltaResourceArgs, resourceArgs, err = c.resource.Realloc(ctx, workload.Nodename, workload.ResourceArgs, opts.ResourceOpts)
engineArgs, deltaResourceArgs, resourceArgs, err = c.rmgr.Realloc(ctx, workload.Nodename, workload.ResourceArgs, opts.ResourceOpts)
if err != nil {
return err
}
Expand All @@ -60,7 +60,7 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, node *types.Node, workloa
if failureByCond {
return nil
}
if err := c.resource.RollbackRealloc(ctx, workload.Nodename, deltaResourceArgs); err != nil {
if err := c.rmgr.RollbackRealloc(ctx, workload.Nodename, deltaResourceArgs); err != nil {
log.Errorf(ctx, "[doReallocOnNode] failed to rollback workload %v, resource args %v, engine args %v, err %v", workload.ID, litter.Sdump(resourceArgs), litter.Sdump(engineArgs), err)
// don't return here, so the node resource can still be fixed
}
Expand Down
9 changes: 7 additions & 2 deletions cluster/calcium/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool)
for plugin, args := range workload.ResourceArgs {
resourceArgs[plugin] = args
}
_, _, err = c.resource.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Decr)
_, _, err = c.rmgr.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Decr)
return errors.WithStack(err)
},
// then
Expand All @@ -65,7 +65,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool)
for plugin, args := range workload.ResourceArgs {
resourceArgs[plugin] = args
}
_, _, err = c.resource.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Incr)
_, _, err = c.rmgr.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Incr)
return errors.WithStack(err)
},
c.config.GlobalTimeout,
Expand All @@ -91,6 +91,11 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool)
return ch, nil
}

// RemoveWorkloadSync .
func (c *Calcium) RemoveWorkloadSync(ctx context.Context, ids []string) error {
return c.doRemoveWorkloadSync(ctx, ids)
}

// semantic: instance removed on err == nil, instance remained on err != nil
func (c *Calcium) doRemoveWorkload(ctx context.Context, workload *types.Workload, force bool) error {
return utils.Txn(
Expand Down
6 changes: 3 additions & 3 deletions cluster/calcium/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *Calcium) doGetNodeResource(ctx context.Context, nodename string, fix bo
}

// TODO: percentage?
resourceCapacity, resourceUsage, diffs, err := c.resource.GetNodeResourceInfo(ctx, nodename, workloads, fix, nil)
resourceCapacity, resourceUsage, diffs, err := c.rmgr.GetNodeResourceInfo(ctx, nodename, workloads, fix, nil)
if err != nil {
log.Errorf(ctx, "[doGetNodeResource] failed to get node resource, node %v, err: %v", nodename, err)
return err
Expand All @@ -94,7 +94,7 @@ func (c *Calcium) doGetNodeResource(ctx context.Context, nodename string, fix bo

func (c *Calcium) doGetDeployMap(ctx context.Context, nodes []string, opts *types.DeployOptions) (map[string]int, error) {
// get nodes with capacity > 0
nodeResourceInfoMap, total, err := c.resource.GetNodesDeployCapacity(ctx, nodes, opts.ResourceOpts)
nodeResourceInfoMap, total, err := c.rmgr.GetNodesDeployCapacity(ctx, nodes, opts.ResourceOpts)
if err != nil {
log.Errorf(ctx, "[doGetDeployMap] failed to select available nodes, nodes %v, err %v", nodes, err)
return nil, errors.WithStack(err)
Expand Down Expand Up @@ -146,7 +146,7 @@ func (c *Calcium) remapResource(ctx context.Context, node *types.Node) (ch chan
workloadMap[workload.ID] = workload
}

engineArgsMap, err := c.resource.GetRemapArgs(ctx, node.Name, workloadMap)
engineArgsMap, err := c.rmgr.GetRemapArgs(ctx, node.Name, workloadMap)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit d5d4cf0

Please sign in to comment.