From d5d4cf060d7dfde1276638db419ee519330d191d Mon Sep 17 00:00:00 2001 From: CMGS Date: Wed, 6 Jul 2022 19:49:07 +0800 Subject: [PATCH] use interface in cluster --- Makefile | 2 +- cluster/calcium/build.go | 2 +- cluster/calcium/calcium.go | 18 +- cluster/calcium/capacity.go | 2 +- cluster/calcium/create.go | 4 +- cluster/calcium/dissociate.go | 4 +- cluster/calcium/metrics.go | 4 +- cluster/calcium/node.go | 12 +- cluster/calcium/realloc.go | 4 +- cluster/calcium/remove.go | 9 +- cluster/calcium/resource.go | 6 +- cluster/calcium/wal.go | 59 ++-- cluster/cluster.go | 1 + cluster/mocks/Cluster.go | 14 + resources/binary.go | 234 ++++++------- resources/cpumem/cpumem.go | 8 +- resources/cpumem/models/metrics.go | 2 +- resources/manager.go | 546 ++++++++++++----------------- resources/mocks/Manager.go | 406 +++++++++++++++++++++ resources/mocks/Plugin.go | 46 +-- resources/plugin.go | 33 +- resources/plugins.go | 82 +++++ resources/types.go | 8 +- resources/volume/models/metrics.go | 2 +- resources/volume/volume.go | 8 +- 25 files changed, 980 insertions(+), 536 deletions(-) create mode 100644 resources/mocks/Manager.go create mode 100644 resources/plugins.go diff --git a/Makefile b/Makefile index 4f797ef11..18d33c40a 100644 --- a/Makefile +++ b/Makefile @@ -39,7 +39,7 @@ mock: deps mockery --dir store/etcdv3/meta --output store/etcdv3/meta/mocks --all mockery --dir vendor/go.etcd.io/etcd/client/v3 --output store/etcdv3/meta/mocks --name Txn mockery --dir rpc/gen/ --output rpc/mocks --name CoreRPC_RunAndWaitServer - mockery --dir resources --output resources/mocks --name Plugin + mockery --dir resources --output resources/mocks --all .ONESHELL: diff --git a/cluster/calcium/build.go b/cluster/calcium/build.go index 193a9e06e..1a400e421 100644 --- a/cluster/calcium/build.go +++ b/cluster/calcium/build.go @@ -86,7 +86,7 @@ func (c *Calcium) getMostIdleNode(ctx context.Context, nodes []*types.Node) (*ty nodeMap[node.Name] = node } - mostIdleNode, err := c.resource.GetMostIdleNode(ctx, nodeNames) + mostIdleNode, err := c.rmgr.GetMostIdleNode(ctx, nodeNames) if err != nil { return nil, err } diff --git a/cluster/calcium/calcium.go b/cluster/calcium/calcium.go index 59147cd63..713e3f100 100644 --- a/cluster/calcium/calcium.go +++ b/cluster/calcium/calcium.go @@ -17,6 +17,7 @@ import ( "github.com/projecteru2/core/source/gitlab" "github.com/projecteru2/core/store" "github.com/projecteru2/core/types" + "github.com/projecteru2/core/wal" "github.com/pkg/errors" ) @@ -25,10 +26,10 @@ import ( type Calcium struct { config types.Config store store.Store - resource *resources.PluginManager + rmgr resources.Manager source source.Source watcher discovery.Service - wal *WAL + wal wal.WAL identifier string } @@ -62,7 +63,7 @@ func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, erro watcher := helium.New(config.GRPCConfig, store) // set resource plugin manager - resource, err := resources.NewPluginManager(config) + rmgr, err := resources.NewPluginsManager(config) if err != nil { return nil, logger.ErrWithTracing(ctx, errors.WithStack(err)) } @@ -73,23 +74,20 @@ func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, erro log.Errorf(ctx, "[NewPluginManager] new cpumem plugin error: %v", err) return nil, err } - resource.AddPlugins(cpumem) - - // load volume plugin volume, err := volume.NewPlugin(config) if err != nil { log.Errorf(ctx, "[NewPluginManager] new volume plugin error: %v", err) return nil, err } - resource.AddPlugins(volume) + rmgr.AddPlugins(cpumem, volume) // load binary plugins - if err = resource.LoadPlugins(ctx); err != nil { + if err = rmgr.LoadPlugins(ctx); err != nil { return nil, err } - cal := &Calcium{store: store, config: config, source: scm, watcher: watcher, resource: resource} + cal := &Calcium{store: store, config: config, source: scm, watcher: watcher, rmgr: rmgr} - cal.wal, err = newWAL(config, cal) + cal.wal, err = enableWAL(config, cal, store) if err != nil { return nil, logger.ErrWithTracing(nil, errors.WithStack(err)) //nolint } diff --git a/cluster/calcium/capacity.go b/cluster/calcium/capacity.go index da52b4f48..56168a2f6 100644 --- a/cluster/calcium/capacity.go +++ b/cluster/calcium/capacity.go @@ -39,7 +39,7 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio } } else { var infos map[string]*resources.NodeCapacityInfo - infos, msg.Total, err = c.resource.GetNodesDeployCapacity(ctx, nodes, opts.ResourceOpts) + infos, msg.Total, err = c.rmgr.GetNodesDeployCapacity(ctx, nodes, opts.ResourceOpts) if err != nil { logger.Errorf(ctx, "[Calcium.CalculateCapacity] failed to get nodes capacity: %+v", err) return err diff --git a/cluster/calcium/create.go b/cluster/calcium/create.go index 45e802228..e98687f77 100644 --- a/cluster/calcium/create.go +++ b/cluster/calcium/create.go @@ -119,7 +119,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio processingCommits = make(map[string]wal.Commit) for nodeName, deploy := range deployMap { nodes = append(nodes, nodeMap[nodeName]) - if engineArgsMap[nodeName], resourceArgsMap[nodeName], err = c.resource.Alloc(ctx, nodeName, deploy, opts.ResourceOpts); err != nil { + if engineArgsMap[nodeName], resourceArgsMap[nodeName], err = c.rmgr.Alloc(ctx, nodeName, deploy, opts.ResourceOpts); err != nil { return errors.WithStack(err) } @@ -151,7 +151,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio resourceArgsToRollback := utils.Map(rollbackIndices, func(idx int) map[string]types.WorkloadResourceArgs { return resourceArgsMap[nodename][idx] }) - return c.resource.RollbackAlloc(ctx, nodename, resourceArgsToRollback) + return c.rmgr.RollbackAlloc(ctx, nodename, resourceArgsToRollback) }); e != nil { err = logger.ErrWithTracing(ctx, e) } diff --git a/cluster/calcium/dissociate.go b/cluster/calcium/dissociate.go index 4c0f288ee..60d9323f7 100644 --- a/cluster/calcium/dissociate.go +++ b/cluster/calcium/dissociate.go @@ -38,7 +38,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t for plugin, args := range workload.ResourceArgs { resourceArgs[plugin] = args } - _, _, err = c.resource.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Decr) + _, _, err = c.rmgr.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Decr) return errors.WithStack(err) }, // then @@ -54,7 +54,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t for plugin, args := range workload.ResourceArgs { resourceArgs[plugin] = args } - _, _, err = c.resource.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Incr) + _, _, err = c.rmgr.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Incr) return errors.WithStack(err) }, c.config.GlobalTimeout, diff --git a/cluster/calcium/metrics.go b/cluster/calcium/metrics.go index 15e4f515e..6453cdf5f 100644 --- a/cluster/calcium/metrics.go +++ b/cluster/calcium/metrics.go @@ -15,7 +15,7 @@ func (c *Calcium) InitMetrics() { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - metricsDescriptions, err := c.resource.GetMetricsDescription(ctx) + metricsDescriptions, err := c.rmgr.GetMetricsDescription(ctx) if err != nil { log.Errorf(ctx, "[InitMetrics] failed to get metrics description, err: %v", err) return @@ -37,7 +37,7 @@ func (c *Calcium) SendNodeMetrics(ctx context.Context, nodeName string) { return } - nodeMetrics, err := c.resource.ResolveNodeResourceInfoToMetrics(ctx, node.Podname, node.Name, node.ResourceCapacity, node.ResourceUsage) + nodeMetrics, err := c.rmgr.ConvertNodeResourceInfoToMetrics(ctx, node.Podname, node.Name, node.ResourceCapacity, node.ResourceUsage) if err != nil { log.Errorf(ctx, "[SendNodeMetrics] resolve node %s resource info to metrics failed, %v", nodeName, err) return diff --git a/cluster/calcium/node.go b/cluster/calcium/node.go index dc29aeff8..7f1aa46d8 100644 --- a/cluster/calcium/node.go +++ b/cluster/calcium/node.go @@ -40,7 +40,7 @@ func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*typ ctx, // if: add node resource with resource plugins func(ctx context.Context) error { - resourceCapacity, resourceUsage, err = c.resource.AddNode(ctx, opts.Nodename, opts.ResourceOpts, nodeInfo) + resourceCapacity, resourceUsage, err = c.rmgr.AddNode(ctx, opts.Nodename, opts.ResourceOpts, nodeInfo) return errors.WithStack(err) }, // then: add node meta in store @@ -59,7 +59,7 @@ func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*typ if failureByCond { return nil } - return errors.WithStack(c.resource.RemoveNode(ctx, opts.Nodename)) + return errors.WithStack(c.rmgr.RemoveNode(ctx, opts.Nodename)) }, c.config.GlobalTimeout), ) @@ -87,7 +87,7 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error { }, // then: remove node resource metadata func(ctx context.Context) error { - return errors.WithStack(c.resource.RemoveNode(ctx, nodename)) + return errors.WithStack(c.rmgr.RemoveNode(ctx, nodename)) }, // rollback: do nothing func(ctx context.Context, failureByCond bool) error { @@ -208,7 +208,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ return nil } - originNodeResourceCapacity, _, err = c.resource.SetNodeResourceCapacity(ctx, n.Name, opts.ResourceOpts, nil, opts.Delta, resources.Incr) + originNodeResourceCapacity, _, err = c.rmgr.SetNodeResourceCapacity(ctx, n.Name, opts.ResourceOpts, nil, opts.Delta, resources.Incr) return errors.WithStack(err) }, // then: update node metadata @@ -227,7 +227,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ if len(opts.ResourceOpts) == 0 { return nil } - _, _, err = c.resource.SetNodeResourceCapacity(ctx, n.Name, nil, originNodeResourceCapacity, false, resources.Decr) + _, _, err = c.rmgr.SetNodeResourceCapacity(ctx, n.Name, nil, originNodeResourceCapacity, false, resources.Decr) return errors.WithStack(err) }, c.config.GlobalTimeout, @@ -236,7 +236,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ } func (c *Calcium) getNodeResourceInfo(ctx context.Context, node *types.Node, plugins []string) (err error) { - if node.ResourceCapacity, node.ResourceUsage, _, err = c.resource.GetNodeResourceInfo(ctx, node.Name, nil, false, plugins); err != nil { + if node.ResourceCapacity, node.ResourceUsage, _, err = c.rmgr.GetNodeResourceInfo(ctx, node.Name, nil, false, plugins); err != nil { log.Errorf(ctx, "[getNodeResourceInfo] failed to get node resource info for node %v, err: %v", node.Name, err) return errors.WithStack(err) } diff --git a/cluster/calcium/realloc.go b/cluster/calcium/realloc.go index e3d42a331..f384ae5be 100644 --- a/cluster/calcium/realloc.go +++ b/cluster/calcium/realloc.go @@ -42,7 +42,7 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, node *types.Node, workloa func(ctx context.Context) error { // note here will change the node resource meta (stored in resource plugin) // todo: add wal here - engineArgs, deltaResourceArgs, resourceArgs, err = c.resource.Realloc(ctx, workload.Nodename, workload.ResourceArgs, opts.ResourceOpts) + engineArgs, deltaResourceArgs, resourceArgs, err = c.rmgr.Realloc(ctx, workload.Nodename, workload.ResourceArgs, opts.ResourceOpts) if err != nil { return err } @@ -60,7 +60,7 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, node *types.Node, workloa if failureByCond { return nil } - if err := c.resource.RollbackRealloc(ctx, workload.Nodename, deltaResourceArgs); err != nil { + if err := c.rmgr.RollbackRealloc(ctx, workload.Nodename, deltaResourceArgs); err != nil { log.Errorf(ctx, "[doReallocOnNode] failed to rollback workload %v, resource args %v, engine args %v, err %v", workload.ID, litter.Sdump(resourceArgs), litter.Sdump(engineArgs), err) // don't return here, so the node resource can still be fixed } diff --git a/cluster/calcium/remove.go b/cluster/calcium/remove.go index 51c1a8209..343bedabc 100644 --- a/cluster/calcium/remove.go +++ b/cluster/calcium/remove.go @@ -46,7 +46,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool) for plugin, args := range workload.ResourceArgs { resourceArgs[plugin] = args } - _, _, err = c.resource.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Decr) + _, _, err = c.rmgr.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Decr) return errors.WithStack(err) }, // then @@ -65,7 +65,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool) for plugin, args := range workload.ResourceArgs { resourceArgs[plugin] = args } - _, _, err = c.resource.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Incr) + _, _, err = c.rmgr.SetNodeResourceUsage(ctx, node.Name, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, resources.Incr) return errors.WithStack(err) }, c.config.GlobalTimeout, @@ -91,6 +91,11 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool) return ch, nil } +// RemoveWorkloadSync . +func (c *Calcium) RemoveWorkloadSync(ctx context.Context, ids []string) error { + return c.doRemoveWorkloadSync(ctx, ids) +} + // semantic: instance removed on err == nil, instance remained on err != nil func (c *Calcium) doRemoveWorkload(ctx context.Context, workload *types.Workload, force bool) error { return utils.Txn( diff --git a/cluster/calcium/resource.go b/cluster/calcium/resource.go index c5cb0ebac..debd89f8d 100644 --- a/cluster/calcium/resource.go +++ b/cluster/calcium/resource.go @@ -71,7 +71,7 @@ func (c *Calcium) doGetNodeResource(ctx context.Context, nodename string, fix bo } // TODO: percentage? - resourceCapacity, resourceUsage, diffs, err := c.resource.GetNodeResourceInfo(ctx, nodename, workloads, fix, nil) + resourceCapacity, resourceUsage, diffs, err := c.rmgr.GetNodeResourceInfo(ctx, nodename, workloads, fix, nil) if err != nil { log.Errorf(ctx, "[doGetNodeResource] failed to get node resource, node %v, err: %v", nodename, err) return err @@ -94,7 +94,7 @@ func (c *Calcium) doGetNodeResource(ctx context.Context, nodename string, fix bo func (c *Calcium) doGetDeployMap(ctx context.Context, nodes []string, opts *types.DeployOptions) (map[string]int, error) { // get nodes with capacity > 0 - nodeResourceInfoMap, total, err := c.resource.GetNodesDeployCapacity(ctx, nodes, opts.ResourceOpts) + nodeResourceInfoMap, total, err := c.rmgr.GetNodesDeployCapacity(ctx, nodes, opts.ResourceOpts) if err != nil { log.Errorf(ctx, "[doGetDeployMap] failed to select available nodes, nodes %v, err %v", nodes, err) return nil, errors.WithStack(err) @@ -146,7 +146,7 @@ func (c *Calcium) remapResource(ctx context.Context, node *types.Node) (ch chan workloadMap[workload.ID] = workload } - engineArgsMap, err := c.resource.GetRemapArgs(ctx, node.Name, workloadMap) + engineArgsMap, err := c.rmgr.GetRemapArgs(ctx, node.Name, workloadMap) if err != nil { return nil, err } diff --git a/cluster/calcium/wal.go b/cluster/calcium/wal.go index c22e74d3d..c23056b96 100644 --- a/cluster/calcium/wal.go +++ b/cluster/calcium/wal.go @@ -6,7 +6,9 @@ import ( "time" "github.com/pkg/errors" + "github.com/projecteru2/core/cluster" "github.com/projecteru2/core/log" + "github.com/projecteru2/core/store" "github.com/projecteru2/core/types" "github.com/projecteru2/core/utils" "github.com/projecteru2/core/wal" @@ -19,44 +21,31 @@ const ( eventProcessingCreated = "create-processing" // processing created but yet to delete ) -// WAL for calcium. -type WAL struct { - wal.WAL - calcium *Calcium -} - -func newWAL(config types.Config, calcium *Calcium) (*WAL, error) { +func enableWAL(config types.Config, calcium cluster.Cluster, stor store.Store) (wal.WAL, error) { hydro, err := wal.NewHydro(config.WALFile, config.WALOpenTimeout) if err != nil { return nil, err } - w := &WAL{ - WAL: hydro, - calcium: calcium, - } - - w.registerHandlers() - return w, nil -} - -func (w *WAL) registerHandlers() { - w.Register(newCreateLambdaHandler(w.calcium)) - w.Register(newCreateWorkloadHandler(w.calcium)) - w.Register(newWorkloadResourceAllocatedHandler(w.calcium)) - w.Register(newProcessingCreatedHandler(w.calcium)) + hydro.Register(newCreateLambdaHandler(calcium, stor)) + hydro.Register(newCreateWorkloadHandler(calcium, stor)) + hydro.Register(newWorkloadResourceAllocatedHandler(calcium, stor)) + hydro.Register(newProcessingCreatedHandler(calcium, stor)) + return hydro, nil } // CreateLambdaHandler indicates event handler for creating lambda. type CreateLambdaHandler struct { typ string - calcium *Calcium + calcium cluster.Cluster + stor store.Store } -func newCreateLambdaHandler(calcium *Calcium) *CreateLambdaHandler { +func newCreateLambdaHandler(calcium cluster.Cluster, stor store.Store) *CreateLambdaHandler { return &CreateLambdaHandler{ typ: eventCreateLambda, calcium: calcium, + stor: stor, } } @@ -108,7 +97,7 @@ func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error logger.Errorf(ctx, "Run failed: %s", r.Message) } - if err := h.calcium.doRemoveWorkloadSync(ctx, []string{workloadID}); err != nil { + if err := h.calcium.RemoveWorkloadSync(ctx, []string{workloadID}); err != nil { logger.Errorf(ctx, "Remove failed: %+v", err) } logger.Infof(ctx, "waited and removed") @@ -120,13 +109,15 @@ func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error // CreateWorkloadHandler indicates event handler for creating workload. type CreateWorkloadHandler struct { typ string - calcium *Calcium + calcium cluster.Cluster + stor store.Store } -func newCreateWorkloadHandler(calcium *Calcium) *CreateWorkloadHandler { +func newCreateWorkloadHandler(calcium cluster.Cluster, stor store.Store) *CreateWorkloadHandler { return &CreateWorkloadHandler{ typ: eventWorkloadCreated, calcium: calcium, + stor: stor, } } @@ -169,7 +160,7 @@ func (h *CreateWorkloadHandler) Handle(ctx context.Context, raw interface{}) (er defer cancel() if _, err = h.calcium.GetWorkload(ctx, wrk.ID); err == nil { - return h.calcium.doRemoveWorkloadSync(ctx, []string{wrk.ID}) + return h.calcium.RemoveWorkloadSync(ctx, []string{wrk.ID}) } // workload meta doesn't exist @@ -188,13 +179,15 @@ func (h *CreateWorkloadHandler) Handle(ctx context.Context, raw interface{}) (er // WorkloadResourceAllocatedHandler . type WorkloadResourceAllocatedHandler struct { typ string - calcium *Calcium + calcium cluster.Cluster + stor store.Store } -func newWorkloadResourceAllocatedHandler(calcium *Calcium) *WorkloadResourceAllocatedHandler { +func newWorkloadResourceAllocatedHandler(calcium cluster.Cluster, stor store.Store) *WorkloadResourceAllocatedHandler { return &WorkloadResourceAllocatedHandler{ typ: eventWorkloadResourceAllocated, calcium: calcium, + stor: stor, } } @@ -256,13 +249,15 @@ func (h *WorkloadResourceAllocatedHandler) Handle(ctx context.Context, raw inter // ProcessingCreatedHandler . type ProcessingCreatedHandler struct { typ string - calcium *Calcium + calcium cluster.Cluster + stor store.Store } -func newProcessingCreatedHandler(calcium *Calcium) *ProcessingCreatedHandler { +func newProcessingCreatedHandler(calcium cluster.Cluster, stor store.Store) *ProcessingCreatedHandler { return &ProcessingCreatedHandler{ typ: eventProcessingCreated, calcium: calcium, + stor: stor, } } @@ -302,7 +297,7 @@ func (h *ProcessingCreatedHandler) Handle(ctx context.Context, raw interface{}) ctx, cancel := getReplayContext(ctx) defer cancel() - if err = h.calcium.store.DeleteProcessing(ctx, processing); err != nil { + if err = h.stor.DeleteProcessing(ctx, processing); err != nil { return logger.ErrWithTracing(ctx, err) } logger.Infof(ctx, "obsolete processing deleted") diff --git a/cluster/cluster.go b/cluster/cluster.go index b12154838..f22fffac7 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -86,6 +86,7 @@ type Cluster interface { CreateWorkload(ctx context.Context, opts *types.DeployOptions) (chan *types.CreateWorkloadMessage, error) ReplaceWorkload(ctx context.Context, opts *types.ReplaceOptions) (chan *types.ReplaceWorkloadMessage, error) RemoveWorkload(ctx context.Context, ids []string, force bool) (chan *types.RemoveWorkloadMessage, error) + RemoveWorkloadSync(ctx context.Context, ids []string) error DissociateWorkload(ctx context.Context, ids []string) (chan *types.DissociateWorkloadMessage, error) ControlWorkload(ctx context.Context, ids []string, t string, force bool) (chan *types.ControlWorkloadMessage, error) ExecuteWorkload(ctx context.Context, opts *types.ExecuteWorkloadOptions, inCh <-chan []byte) chan *types.AttachWorkloadMessage diff --git a/cluster/mocks/Cluster.go b/cluster/mocks/Cluster.go index 368606e3f..90e1b048a 100644 --- a/cluster/mocks/Cluster.go +++ b/cluster/mocks/Cluster.go @@ -767,6 +767,20 @@ func (_m *Cluster) RemoveWorkload(ctx context.Context, ids []string, force bool) return r0, r1 } +// RemoveWorkloadSync provides a mock function with given fields: ctx, ids +func (_m *Cluster) RemoveWorkloadSync(ctx context.Context, ids []string) error { + ret := _m.Called(ctx, ids) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []string) error); ok { + r0 = rf(ctx, ids) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // ReplaceWorkload provides a mock function with given fields: ctx, opts func (_m *Cluster) ReplaceWorkload(ctx context.Context, opts *types.ReplaceOptions) (chan *types.ReplaceWorkloadMessage, error) { ret := _m.Called(ctx, opts) diff --git a/resources/binary.go b/resources/binary.go index 0193b5811..2120301a5 100644 --- a/resources/binary.go +++ b/resources/binary.go @@ -21,96 +21,6 @@ type BinaryPlugin struct { config coretypes.Config } -func (bp *BinaryPlugin) getArgs(req interface{}) []string { - t := reflect.TypeOf(req) - if t.Kind() != reflect.Struct { - return nil - } - v := reflect.ValueOf(req) - args := []string{} - - for i := 0; i < t.NumField(); i++ { - fieldType := t.Field(i).Type - fieldValue := v.Field(i).Interface() - jsonTag := t.Field(i).Tag.Get("json") - - switch { - case fieldType.Kind() == reflect.Map: - if v.Field(i).IsZero() { - break - } - body, err := json.Marshal(fieldValue) - if err != nil { - break - } - args = append(args, "--"+jsonTag, string(body)) - case fieldType.Kind() == reflect.Slice: - for j := 0; j < v.Field(i).Len(); j++ { - if v.Field(i).Index(j).Kind() == reflect.Map { - body, err := json.Marshal(v.Field(i).Index(j).Interface()) - if err != nil { - break - } - args = append(args, "--"+jsonTag, string(body)) - } else { - args = append(args, "--"+jsonTag, fmt.Sprintf("%v", v.Field(i).Index(j).Interface())) - } - } - case fieldType.Kind() == reflect.Bool: - if fieldValue.(bool) { - args = append(args, "--"+jsonTag) - } - default: - args = append(args, "--"+jsonTag, fmt.Sprintf("%v", fieldValue)) - } - } - return args -} - -func (bp *BinaryPlugin) execCommand(cmd *exec.Cmd) (output, log string, err error) { - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - - err = cmd.Run() - output = stdout.String() - log = stderr.String() - if err != nil { - err = fmt.Errorf("err: %v, output: %v, log: %v", err, output, log) - } - return output, log, err -} - -// calls the plugin and gets json response -func (bp *BinaryPlugin) call(ctx context.Context, cmd string, req interface{}, resp interface{}) error { - ctx, cancel := context.WithTimeout(ctx, bp.config.ResourcePluginsTimeout) - defer cancel() - - args := bp.getArgs(req) - args = append([]string{cmd}, args...) - command := exec.CommandContext(ctx, bp.path, args...) // nolint: gosec - command.Dir = bp.config.ResourcePluginsDir - log.Infof(ctx, "[callBinaryPlugin] command: %s %s", bp.path, strings.Join(args, " ")) - pluginOutput, pluginLog, err := bp.execCommand(command) - - defer log.Infof(ctx, "[callBinaryPlugin] log from plugin %s: %s", bp.path, pluginLog) - defer log.Infof(ctx, "[callBinaryPlugin] output from plugin %s: %s", bp.path, pluginOutput) - - if err != nil { - log.Errorf(ctx, "[callBinaryPlugin] failed to run plugin %s, command %v, err %s", bp.path, args, err) - return err - } - - if len(pluginOutput) == 0 { - pluginOutput = "{}" - } - if err := json.Unmarshal([]byte(pluginOutput), resp); err != nil { - log.Errorf(ctx, "[callBinaryPlugin] failed to unmarshal output of plugin %s, command %v, output %s, err %s", bp.path, args, pluginOutput, err) - return err - } - return nil -} - // GetNodesDeployCapacity . func (bp *BinaryPlugin) GetNodesDeployCapacity(ctx context.Context, nodes []string, resourceOpts coretypes.WorkloadResourceOpts) (resp *GetNodesDeployCapacityResponse, err error) { req := GetNodesDeployCapacityRequest{ @@ -122,24 +32,6 @@ func (bp *BinaryPlugin) GetNodesDeployCapacity(ctx context.Context, nodes []stri return resp, err } -func (bp *BinaryPlugin) getNodeResourceInfo(ctx context.Context, nodeName string, workloads []*coretypes.Workload, fix bool) (resp *GetNodeResourceInfoResponse, err error) { - workloadMap := map[string]coretypes.WorkloadResourceArgs{} - for _, workload := range workloads { - workloadMap[workload.ID] = workload.ResourceArgs[bp.Name()] - } - - req := GetNodeResourceInfoRequest{ - NodeName: nodeName, - WorkloadMap: workloadMap, - Fix: fix, - } - resp = &GetNodeResourceInfoResponse{} - if err = bp.call(ctx, getNodeResourceInfoCommand, req, resp); err != nil { - return nil, err - } - return resp, nil -} - // GetNodeResourceInfo . func (bp *BinaryPlugin) GetNodeResourceInfo(ctx context.Context, nodeName string, workloads []*coretypes.Workload) (resp *GetNodeResourceInfoResponse, err error) { return bp.getNodeResourceInfo(ctx, nodeName, workloads, false) @@ -271,11 +163,6 @@ func (bp *BinaryPlugin) GetMostIdleNode(ctx context.Context, nodeNames []string) return resp, bp.call(ctx, getMostIdleNodeCommand, req, resp) } -// Name . -func (bp *BinaryPlugin) Name() string { - return path.Base(bp.path) -} - // GetMetricsDescription . func (bp *BinaryPlugin) GetMetricsDescription(ctx context.Context) (*GetMetricsDescriptionResponse, error) { req := GetMetricsDescriptionRequest{} @@ -283,14 +170,127 @@ func (bp *BinaryPlugin) GetMetricsDescription(ctx context.Context) (*GetMetricsD return resp, bp.call(ctx, getMetricsDescriptionCommand, req, resp) } -// ResolveNodeResourceInfoToMetrics . -func (bp *BinaryPlugin) ResolveNodeResourceInfoToMetrics(ctx context.Context, podName string, nodeName string, nodeResourceInfo *NodeResourceInfo) (*ResolveNodeResourceInfoToMetricsResponse, error) { - req := ResolveNodeResourceInfoToMetricsRequest{ +// ConvertNodeResourceInfoToMetrics . +func (bp *BinaryPlugin) ConvertNodeResourceInfoToMetrics(ctx context.Context, podName string, nodeName string, nodeResourceInfo *NodeResourceInfo) (*ConvertNodeResourceInfoToMetricsResponse, error) { + req := ConvertNodeResourceInfoToMetricsRequest{ PodName: podName, NodeName: nodeName, Capacity: nodeResourceInfo.Capacity, Usage: nodeResourceInfo.Usage, } - resp := &ResolveNodeResourceInfoToMetricsResponse{} + resp := &ConvertNodeResourceInfoToMetricsResponse{} return resp, bp.call(ctx, resolveNodeResourceInfoToMetricsCommand, req, resp) } + +// Name . +func (bp *BinaryPlugin) Name() string { + return path.Base(bp.path) +} + +func (bp *BinaryPlugin) getArgs(req interface{}) []string { + t := reflect.TypeOf(req) + if t.Kind() != reflect.Struct { + return nil + } + v := reflect.ValueOf(req) + args := []string{} + + for i := 0; i < t.NumField(); i++ { + fieldType := t.Field(i).Type + fieldValue := v.Field(i).Interface() + jsonTag := t.Field(i).Tag.Get("json") + + switch { + case fieldType.Kind() == reflect.Map: + if v.Field(i).IsZero() { + break + } + body, err := json.Marshal(fieldValue) + if err != nil { + break + } + args = append(args, "--"+jsonTag, string(body)) + case fieldType.Kind() == reflect.Slice: + for j := 0; j < v.Field(i).Len(); j++ { + if v.Field(i).Index(j).Kind() == reflect.Map { + body, err := json.Marshal(v.Field(i).Index(j).Interface()) + if err != nil { + break + } + args = append(args, "--"+jsonTag, string(body)) + } else { + args = append(args, "--"+jsonTag, fmt.Sprintf("%v", v.Field(i).Index(j).Interface())) + } + } + case fieldType.Kind() == reflect.Bool: + if fieldValue.(bool) { + args = append(args, "--"+jsonTag) + } + default: + args = append(args, "--"+jsonTag, fmt.Sprintf("%v", fieldValue)) + } + } + return args +} + +func (bp *BinaryPlugin) execCommand(cmd *exec.Cmd) (output, log string, err error) { + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + err = cmd.Run() + output = stdout.String() + log = stderr.String() + if err != nil { + err = fmt.Errorf("err: %v, output: %v, log: %v", err, output, log) + } + return output, log, err +} + +// calls the plugin and gets json response +func (bp *BinaryPlugin) call(ctx context.Context, cmd string, req interface{}, resp interface{}) error { + ctx, cancel := context.WithTimeout(ctx, bp.config.ResourcePluginsTimeout) + defer cancel() + + args := bp.getArgs(req) + args = append([]string{cmd}, args...) + command := exec.CommandContext(ctx, bp.path, args...) // nolint: gosec + command.Dir = bp.config.ResourcePluginsDir + log.Infof(ctx, "[callBinaryPlugin] command: %s %s", bp.path, strings.Join(args, " ")) + pluginOutput, pluginLog, err := bp.execCommand(command) + + defer log.Infof(ctx, "[callBinaryPlugin] log from plugin %s: %s", bp.path, pluginLog) + defer log.Infof(ctx, "[callBinaryPlugin] output from plugin %s: %s", bp.path, pluginOutput) + + if err != nil { + log.Errorf(ctx, "[callBinaryPlugin] failed to run plugin %s, command %v, err %s", bp.path, args, err) + return err + } + + if len(pluginOutput) == 0 { + pluginOutput = "{}" + } + if err := json.Unmarshal([]byte(pluginOutput), resp); err != nil { + log.Errorf(ctx, "[callBinaryPlugin] failed to unmarshal output of plugin %s, command %v, output %s, err %s", bp.path, args, pluginOutput, err) + return err + } + return nil +} + +func (bp *BinaryPlugin) getNodeResourceInfo(ctx context.Context, nodeName string, workloads []*coretypes.Workload, fix bool) (resp *GetNodeResourceInfoResponse, err error) { + workloadMap := map[string]coretypes.WorkloadResourceArgs{} + for _, workload := range workloads { + workloadMap[workload.ID] = workload.ResourceArgs[bp.Name()] + } + + req := GetNodeResourceInfoRequest{ + NodeName: nodeName, + WorkloadMap: workloadMap, + Fix: fix, + } + resp = &GetNodeResourceInfoResponse{} + if err = bp.call(ctx, getNodeResourceInfoCommand, req, resp); err != nil { + return nil, err + } + return resp, nil +} diff --git a/resources/cpumem/cpumem.go b/resources/cpumem/cpumem.go index 1ccaeb2e1..b60cd3d6d 100644 --- a/resources/cpumem/cpumem.go +++ b/resources/cpumem/cpumem.go @@ -311,8 +311,8 @@ func (c *Plugin) GetMetricsDescription(ctx context.Context) (*resources.GetMetri return resp, mapstructure.Decode(c.c.GetMetricsDescription(), resp) } -// ResolveNodeResourceInfoToMetrics . -func (c *Plugin) ResolveNodeResourceInfoToMetrics(ctx context.Context, podName string, nodeName string, info *resources.NodeResourceInfo) (*resources.ResolveNodeResourceInfoToMetricsResponse, error) { +// ConvertNodeResourceInfoToMetrics . +func (c *Plugin) ConvertNodeResourceInfoToMetrics(ctx context.Context, podName string, nodeName string, info *resources.NodeResourceInfo) (*resources.ConvertNodeResourceInfoToMetricsResponse, error) { capacity, usage := &types.NodeResourceArgs{}, &types.NodeResourceArgs{} if err := capacity.ParseFromRawParams(coretypes.RawParams(info.Capacity)); err != nil { return nil, err @@ -321,7 +321,7 @@ func (c *Plugin) ResolveNodeResourceInfoToMetrics(ctx context.Context, podName s return nil, err } - metrics := c.c.ResolveNodeResourceInfoToMetrics(podName, nodeName, capacity, usage) - resp := &resources.ResolveNodeResourceInfoToMetricsResponse{} + metrics := c.c.ConvertNodeResourceInfoToMetrics(podName, nodeName, capacity, usage) + resp := &resources.ConvertNodeResourceInfoToMetricsResponse{} return resp, mapstructure.Decode(metrics, resp) } diff --git a/resources/cpumem/models/metrics.go b/resources/cpumem/models/metrics.go index 26075da20..85e6d82fc 100644 --- a/resources/cpumem/models/metrics.go +++ b/resources/cpumem/models/metrics.go @@ -37,7 +37,7 @@ func (c *CPUMem) GetMetricsDescription() []map[string]interface{} { } } -func (c *CPUMem) ResolveNodeResourceInfoToMetrics(podName string, nodeName string, nodeResourceCapacity *types.NodeResourceArgs, nodeResourceUsage *types.NodeResourceArgs) []map[string]interface{} { +func (c *CPUMem) ConvertNodeResourceInfoToMetrics(podName string, nodeName string, nodeResourceCapacity *types.NodeResourceArgs, nodeResourceUsage *types.NodeResourceArgs) []map[string]interface{} { cleanedNodeName := strings.ReplaceAll(nodeName, ".", "_") metrics := []map[string]interface{}{ { diff --git a/resources/manager.go b/resources/manager.go index 89536c71a..7f1c098e4 100644 --- a/resources/manager.go +++ b/resources/manager.go @@ -3,126 +3,52 @@ package resources import ( "context" "math" - "sync" + "github.com/pkg/errors" enginetypes "github.com/projecteru2/core/engine/types" "github.com/projecteru2/core/log" "github.com/projecteru2/core/types" "github.com/projecteru2/core/utils" - - "github.com/hashicorp/go-multierror" - "github.com/pkg/errors" - "golang.org/x/exp/maps" "golang.org/x/exp/slices" ) -// PluginManager manages plugins -type PluginManager struct { - config types.Config - plugins []Plugin -} +// GetMostIdleNode . +func (pm *PluginsManager) GetMostIdleNode(ctx context.Context, nodeNames []string) (string, error) { + var mostIdleNode *GetMostIdleNodeResponse -// NewPluginManager creates a plugin manager -func NewPluginManager(config types.Config) (*PluginManager, error) { - pm := &PluginManager{ - config: config, - plugins: []Plugin{}, + if len(nodeNames) == 0 { + return "", errors.Wrap(types.ErrGetMostIdleNodeFailed, "empty node names") } - return pm, nil -} - -// LoadPlugins . -func (pm *PluginManager) LoadPlugins(ctx context.Context) error { - if len(pm.config.ResourcePluginsDir) > 0 { // it's not a slice ! - pluginFiles, err := utils.ListAllExecutableFiles(pm.config.ResourcePluginsDir) + respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetMostIdleNodeResponse, error) { + resp, err := plugin.GetMostIdleNode(ctx, nodeNames) if err != nil { - log.Errorf(ctx, "[LoadPlugins] failed to list all executable files dir: %v, err: %v", pm.config.ResourcePluginsDir, err) - return err - } - - for _, file := range pluginFiles { - log.Infof(ctx, "[LoadPlugins] load binary plugin: %v", file) - pm.plugins = append(pm.plugins, &BinaryPlugin{path: file, config: pm.config}) - } - - pluginMap := map[string]Plugin{} - for _, plugin := range pm.plugins { - pluginMap[plugin.Name()] = plugin + log.Errorf(ctx, "[GetMostIdleNode] plugin %v failed to get the most idle node of %v, err: %v", plugin.Name(), nodeNames, err) } - pm.plugins = maps.Values(pluginMap) - } - return nil -} - -// AddPlugins adds a plugin (for test and debug) -func (pm *PluginManager) AddPlugins(plugins ...Plugin) { - pm.plugins = append(pm.plugins, plugins...) -} - -// GetPlugins is used for mock -func (pm *PluginManager) GetPlugins() []Plugin { - return pm.plugins -} - -func callPlugins[T any](ctx context.Context, plugins []Plugin, f func(Plugin) (T, error)) (map[Plugin]T, error) { - resMap := sync.Map{} - var combinedErr error - wg := &sync.WaitGroup{} - wg.Add(len(plugins)) - - for _, plugin := range plugins { - go func(p Plugin) { - defer wg.Done() - if res, err := f(p); err != nil { - log.Errorf(ctx, "[callPlugins] failed to call plugin %v, err: %v", p.Name(), err) - combinedErr = multierror.Append(combinedErr, types.NewDetailedErr(err, p.Name())) - } else { - resMap.Store(p, res) - } - }(plugin) - } - wg.Wait() - - results := map[Plugin]T{} - resMap.Range(func(key, value interface{}) bool { - plugin := key.(Plugin) - res := value.(T) - results[plugin] = res - return true + return resp, err }) - if len(results) == len(plugins) { - return results, nil - } - - return results, combinedErr -} -func (pm *PluginManager) mergeNodeCapacityInfo(m1 map[string]*NodeCapacityInfo, m2 map[string]*NodeCapacityInfo) map[string]*NodeCapacityInfo { - if m1 == nil { - return m2 + if err != nil { + log.Errorf(ctx, "[GetMostIdleNode] failed to get the most idle node of %v", nodeNames) + return "", err } - res := map[string]*NodeCapacityInfo{} - for node, info1 := range m1 { - // all the capacities should > 0 - if info2, ok := m2[node]; ok { - res[node] = &NodeCapacityInfo{ - NodeName: node, - Capacity: utils.Min(info1.Capacity, info2.Capacity), - Rate: info1.Rate + info2.Rate*info2.Weight, - Usage: info1.Usage + info2.Usage*info2.Weight, - Weight: info1.Weight + info2.Weight, - } + for _, resp := range respMap { + if (mostIdleNode == nil || resp.Priority > mostIdleNode.Priority) && len(resp.NodeName) > 0 { + mostIdleNode = resp } } - return res + + if mostIdleNode == nil { + return "", types.ErrGetMostIdleNodeFailed + } + return mostIdleNode.NodeName, nil } // GetNodesDeployCapacity returns available nodes which meet all the requirements // the caller should require locks // pure calculation -func (pm *PluginManager) GetNodesDeployCapacity(ctx context.Context, nodeNames []string, resourceOpts types.WorkloadResourceOpts) (map[string]*NodeCapacityInfo, int, error) { +func (pm *PluginsManager) GetNodesDeployCapacity(ctx context.Context, nodeNames []string, resourceOpts types.WorkloadResourceOpts) (map[string]*NodeCapacityInfo, int, error) { var res map[string]*NodeCapacityInfo respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetNodesDeployCapacityResponse, error) { @@ -158,148 +84,63 @@ func (pm *PluginManager) GetNodesDeployCapacity(ctx context.Context, nodeNames [ return res, total, nil } -// mergeEngineArgs e.g. {"file": ["/bin/sh:/bin/sh"], "cpu": 1.2, "cpu-bind": true} + {"file": ["/bin/ls:/bin/ls"], "mem": "1PB"} -// => {"file": ["/bin/sh:/bin/sh", "/bin/ls:/bin/ls"], "cpu": 1.2, "cpu-bind": true, "mem": "1PB"} -func (pm *PluginManager) mergeEngineArgs(ctx context.Context, m1 types.EngineArgs, m2 types.EngineArgs) (types.EngineArgs, error) { - res := types.EngineArgs{} - for key, value := range m1 { - res[key] = value - } - for key, value := range m2 { - if _, ok := res[key]; ok { - // only two string slices can be merged - _, ok1 := res[key].([]string) - _, ok2 := value.([]string) - if !ok1 || !ok2 { - log.Errorf(ctx, "[mergeEngineArgs] only two string slices can be merged! error key %v, m1[key] = %v, m2[key] = %v", key, m1[key], m2[key]) - return nil, types.ErrInvalidEngineArgs - } - res[key] = append(res[key].([]string), value.([]string)...) - } else { - res[key] = value - } - } - return res, nil -} - -// Alloc . -func (pm *PluginManager) Alloc(ctx context.Context, nodeName string, deployCount int, resourceOpts types.WorkloadResourceOpts) ([]types.EngineArgs, []map[string]types.WorkloadResourceArgs, error) { - resEngineArgs := make([]types.EngineArgs, deployCount) - resResourceArgs := make([]map[string]types.WorkloadResourceArgs, deployCount) - - // init engine args - for i := 0; i < deployCount; i++ { - resEngineArgs[i] = types.EngineArgs{} - resResourceArgs[i] = map[string]types.WorkloadResourceArgs{} - } +// SetNodeResourceCapacity updates node resource capacity +// receives resource options instead of resource args +func (pm *PluginsManager) SetNodeResourceCapacity(ctx context.Context, nodeName string, nodeResourceOpts types.NodeResourceOpts, nodeResourceArgs map[string]types.NodeResourceArgs, delta bool, incr bool) (map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs, error) { + rollbackPlugins := []Plugin{} + beforeMap := map[string]types.NodeResourceArgs{} + afterMap := map[string]types.NodeResourceArgs{} - return resEngineArgs, resResourceArgs, utils.PCR(ctx, - // prepare: calculate engine args and resource args + return beforeMap, afterMap, utils.PCR(ctx, func(ctx context.Context) error { - respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetDeployArgsResponse, error) { - resp, err := plugin.GetDeployArgs(ctx, nodeName, deployCount, resourceOpts) + if nodeResourceArgs == nil { + nodeResourceArgs = map[string]types.NodeResourceArgs{} + } + return nil + }, + // commit: call plugins to set node resource + func(ctx context.Context) error { + respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*SetNodeResourceCapacityResponse, error) { + resp, err := plugin.SetNodeResourceCapacity(ctx, nodeName, nodeResourceOpts, nodeResourceArgs[plugin.Name()], delta, incr) if err != nil { - log.Errorf(ctx, "[Alloc] plugin %v failed to compute alloc args, request %v, node %v, deploy count %v, err %v", plugin.Name(), resourceOpts, nodeName, deployCount, err) + log.Errorf(ctx, "[SetNodeResourceCapacity] node %v plugin %v failed to set node resource capacity, err: %v", nodeName, plugin.Name(), err) } return resp, err }) - if err != nil { - return err - } - // calculate engine args - for plugin, resp := range respMap { - for index, args := range resp.ResourceArgs { - resResourceArgs[index][plugin.Name()] = args - } - for index, args := range resp.EngineArgs { - resEngineArgs[index], err = pm.mergeEngineArgs(ctx, resEngineArgs[index], args) - if err != nil { - log.Errorf(ctx, "[Alloc] invalid engine args") - return err - } + if err != nil { + for plugin, resp := range respMap { + rollbackPlugins = append(rollbackPlugins, plugin) + beforeMap[plugin.Name()] = resp.Before + afterMap[plugin.Name()] = resp.After } - } - return nil - }, - // commit: update node resources - func(ctx context.Context) error { - if _, _, err := pm.SetNodeResourceUsage(ctx, nodeName, nil, nil, resResourceArgs, true, Incr); err != nil { - log.Errorf(ctx, "[Alloc] failed to update node resource, err: %v", err) + + log.Errorf(ctx, "[SetNodeResourceCapacity] failed to set node resource for node %v", nodeName) return err } return nil }, - // rollback: do nothing - func(ctx context.Context) error { - return nil - }, - pm.config.GlobalTimeout, - ) -} - -// RollbackAlloc rollbacks the allocated resource -func (pm *PluginManager) RollbackAlloc(ctx context.Context, nodeName string, resourceArgs []map[string]types.WorkloadResourceArgs) error { - _, _, err := pm.SetNodeResourceUsage(ctx, nodeName, nil, nil, resourceArgs, true, Decr) - return err -} - -// Realloc reallocates resource for workloads, returns engine args and final resource args. -func (pm *PluginManager) Realloc(ctx context.Context, nodeName string, originResourceArgs map[string]types.WorkloadResourceArgs, resourceOpts types.WorkloadResourceOpts) (types.EngineArgs, map[string]types.WorkloadResourceArgs, map[string]types.WorkloadResourceArgs, error) { - resEngineArgs := types.EngineArgs{} - resDeltaResourceArgs := map[string]types.WorkloadResourceArgs{} - resFinalResourceArgs := map[string]types.WorkloadResourceArgs{} - - return resEngineArgs, resDeltaResourceArgs, resFinalResourceArgs, utils.PCR(ctx, - // prepare: calculate engine args, delta node resource args and final workload resource args + // rollback: set the rollback resource args in reverse func(ctx context.Context) error { - respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetReallocArgsResponse, error) { - resp, err := plugin.GetReallocArgs(ctx, nodeName, originResourceArgs[plugin.Name()], resourceOpts) + _, err := callPlugins(ctx, rollbackPlugins, func(plugin Plugin) (*SetNodeResourceCapacityResponse, error) { + resp, err := plugin.SetNodeResourceCapacity(ctx, nodeName, nil, beforeMap[plugin.Name()], false, false) if err != nil { - log.Errorf(ctx, "[Realloc] plugin %v failed to calculate realloc args, err: %v", plugin.Name(), err) + log.Errorf(ctx, "[SetNodeResourceCapacity] node %v plugin %v failed to rollback node resource capacity, err: %v", err) } return resp, err }) if err != nil { - log.Errorf(ctx, "[Realloc] realloc failed, origin: %+v, opts: %+v", originResourceArgs, resourceOpts) return err } - - for plugin, resp := range respMap { - if resEngineArgs, err = pm.mergeEngineArgs(ctx, resEngineArgs, resp.EngineArgs); err != nil { - log.Errorf(ctx, "[Realloc] invalid engine args, err: %v", err) - return err - } - resDeltaResourceArgs[plugin.Name()] = resp.Delta - resFinalResourceArgs[plugin.Name()] = resp.ResourceArgs - } - return nil - }, - // commit: update node resource - func(ctx context.Context) error { - if _, _, err := pm.SetNodeResourceUsage(ctx, nodeName, nil, nil, []map[string]types.WorkloadResourceArgs{resDeltaResourceArgs}, true, Incr); err != nil { - log.Errorf(ctx, "[Realloc] failed to update nodeName resource, err: %v", err) - return err - } - return nil - }, - // rollback: do nothing - func(ctx context.Context) error { return nil }, pm.config.GlobalTimeout, ) } -// RollbackRealloc rollbacks the resource changes caused by realloc -func (pm *PluginManager) RollbackRealloc(ctx context.Context, nodeName string, resourceArgs map[string]types.WorkloadResourceArgs) error { - _, _, err := pm.SetNodeResourceUsage(ctx, nodeName, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, Decr) - return err -} - // GetNodeResourceInfo . -func (pm *PluginManager) GetNodeResourceInfo(ctx context.Context, nodeName string, workloads []*types.Workload, fix bool, pluginWhiteList []string) (map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs, []string, error) { +func (pm *PluginsManager) GetNodeResourceInfo(ctx context.Context, nodeName string, workloads []*types.Workload, fix bool, pluginWhiteList []string) (map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs, []string, error) { resResourceCapacity := map[string]types.NodeResourceArgs{} resResourceUsage := map[string]types.NodeResourceArgs{} resDiffs := []string{} @@ -339,7 +180,7 @@ func (pm *PluginManager) GetNodeResourceInfo(ctx context.Context, nodeName strin } // SetNodeResourceUsage with rollback -func (pm *PluginManager) SetNodeResourceUsage(ctx context.Context, nodeName string, nodeResourceOpts types.NodeResourceOpts, nodeResourceArgs map[string]types.NodeResourceArgs, workloadResourceArgs []map[string]types.WorkloadResourceArgs, delta bool, incr bool) (map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs, error) { +func (pm *PluginsManager) SetNodeResourceUsage(ctx context.Context, nodeName string, nodeResourceOpts types.NodeResourceOpts, nodeResourceArgs map[string]types.NodeResourceArgs, workloadResourceArgs []map[string]types.WorkloadResourceArgs, delta bool, incr bool) (map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs, error) { workloadResourceArgsMap := map[string][]types.WorkloadResourceArgs{} rollbackPlugins := []Plugin{} beforeMap := map[string]types.NodeResourceArgs{} @@ -403,98 +244,171 @@ func (pm *PluginManager) SetNodeResourceUsage(ctx context.Context, nodeName stri ) } -// SetNodeResourceCapacity updates node resource capacity -// receives resource options instead of resource args -func (pm *PluginManager) SetNodeResourceCapacity(ctx context.Context, nodeName string, nodeResourceOpts types.NodeResourceOpts, nodeResourceArgs map[string]types.NodeResourceArgs, delta bool, incr bool) (map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs, error) { - rollbackPlugins := []Plugin{} - beforeMap := map[string]types.NodeResourceArgs{} - afterMap := map[string]types.NodeResourceArgs{} +// Alloc . +func (pm *PluginsManager) Alloc(ctx context.Context, nodeName string, deployCount int, resourceOpts types.WorkloadResourceOpts) ([]types.EngineArgs, []map[string]types.WorkloadResourceArgs, error) { + resEngineArgs := make([]types.EngineArgs, deployCount) + resResourceArgs := make([]map[string]types.WorkloadResourceArgs, deployCount) - return beforeMap, afterMap, utils.PCR(ctx, - func(ctx context.Context) error { - if nodeResourceArgs == nil { - nodeResourceArgs = map[string]types.NodeResourceArgs{} - } - return nil - }, - // commit: call plugins to set node resource + // init engine args + for i := 0; i < deployCount; i++ { + resEngineArgs[i] = types.EngineArgs{} + resResourceArgs[i] = map[string]types.WorkloadResourceArgs{} + } + + return resEngineArgs, resResourceArgs, utils.PCR(ctx, + // prepare: calculate engine args and resource args func(ctx context.Context) error { - respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*SetNodeResourceCapacityResponse, error) { - resp, err := plugin.SetNodeResourceCapacity(ctx, nodeName, nodeResourceOpts, nodeResourceArgs[plugin.Name()], delta, incr) + respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetDeployArgsResponse, error) { + resp, err := plugin.GetDeployArgs(ctx, nodeName, deployCount, resourceOpts) if err != nil { - log.Errorf(ctx, "[SetNodeResourceCapacity] node %v plugin %v failed to set node resource capacity, err: %v", nodeName, plugin.Name(), err) + log.Errorf(ctx, "[Alloc] plugin %v failed to compute alloc args, request %v, node %v, deploy count %v, err %v", plugin.Name(), resourceOpts, nodeName, deployCount, err) } return resp, err }) - if err != nil { - for plugin, resp := range respMap { - rollbackPlugins = append(rollbackPlugins, plugin) - beforeMap[plugin.Name()] = resp.Before - afterMap[plugin.Name()] = resp.After - } + return err + } - log.Errorf(ctx, "[SetNodeResourceCapacity] failed to set node resource for node %v", nodeName) + // calculate engine args + for plugin, resp := range respMap { + for index, args := range resp.ResourceArgs { + resResourceArgs[index][plugin.Name()] = args + } + for index, args := range resp.EngineArgs { + resEngineArgs[index], err = pm.mergeEngineArgs(ctx, resEngineArgs[index], args) + if err != nil { + log.Errorf(ctx, "[Alloc] invalid engine args") + return err + } + } + } + return nil + }, + // commit: update node resources + func(ctx context.Context) error { + if _, _, err := pm.SetNodeResourceUsage(ctx, nodeName, nil, nil, resResourceArgs, true, Incr); err != nil { + log.Errorf(ctx, "[Alloc] failed to update node resource, err: %v", err) return err } return nil }, - // rollback: set the rollback resource args in reverse + // rollback: do nothing func(ctx context.Context) error { - _, err := callPlugins(ctx, rollbackPlugins, func(plugin Plugin) (*SetNodeResourceCapacityResponse, error) { - resp, err := plugin.SetNodeResourceCapacity(ctx, nodeName, nil, beforeMap[plugin.Name()], false, false) + return nil + }, + pm.config.GlobalTimeout, + ) +} + +// RollbackAlloc rollbacks the allocated resource +func (pm *PluginsManager) RollbackAlloc(ctx context.Context, nodeName string, resourceArgs []map[string]types.WorkloadResourceArgs) error { + _, _, err := pm.SetNodeResourceUsage(ctx, nodeName, nil, nil, resourceArgs, true, Decr) + return err +} + +// Realloc reallocates resource for workloads, returns engine args and final resource args. +func (pm *PluginsManager) Realloc(ctx context.Context, nodeName string, originResourceArgs map[string]types.WorkloadResourceArgs, resourceOpts types.WorkloadResourceOpts) (types.EngineArgs, map[string]types.WorkloadResourceArgs, map[string]types.WorkloadResourceArgs, error) { + resEngineArgs := types.EngineArgs{} + resDeltaResourceArgs := map[string]types.WorkloadResourceArgs{} + resFinalResourceArgs := map[string]types.WorkloadResourceArgs{} + + return resEngineArgs, resDeltaResourceArgs, resFinalResourceArgs, utils.PCR(ctx, + // prepare: calculate engine args, delta node resource args and final workload resource args + func(ctx context.Context) error { + respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetReallocArgsResponse, error) { + resp, err := plugin.GetReallocArgs(ctx, nodeName, originResourceArgs[plugin.Name()], resourceOpts) if err != nil { - log.Errorf(ctx, "[SetNodeResourceCapacity] node %v plugin %v failed to rollback node resource capacity, err: %v", err) + log.Errorf(ctx, "[Realloc] plugin %v failed to calculate realloc args, err: %v", plugin.Name(), err) } return resp, err }) if err != nil { + log.Errorf(ctx, "[Realloc] realloc failed, origin: %+v, opts: %+v", originResourceArgs, resourceOpts) + return err + } + + for plugin, resp := range respMap { + if resEngineArgs, err = pm.mergeEngineArgs(ctx, resEngineArgs, resp.EngineArgs); err != nil { + log.Errorf(ctx, "[Realloc] invalid engine args, err: %v", err) + return err + } + resDeltaResourceArgs[plugin.Name()] = resp.Delta + resFinalResourceArgs[plugin.Name()] = resp.ResourceArgs + } + return nil + }, + // commit: update node resource + func(ctx context.Context) error { + if _, _, err := pm.SetNodeResourceUsage(ctx, nodeName, nil, nil, []map[string]types.WorkloadResourceArgs{resDeltaResourceArgs}, true, Incr); err != nil { + log.Errorf(ctx, "[Realloc] failed to update nodeName resource, err: %v", err) return err } return nil }, + // rollback: do nothing + func(ctx context.Context) error { + return nil + }, pm.config.GlobalTimeout, ) } -// GetRemapArgs remaps resource and returns engine args for workloads. format: {"workload-1": {"cpus": ["1-3"]}} -// remap doesn't change resource args -func (pm *PluginManager) GetRemapArgs(ctx context.Context, nodeName string, workloadMap map[string]*types.Workload) (map[string]types.EngineArgs, error) { - resEngineArgsMap := map[string]types.EngineArgs{} +// RollbackRealloc rollbacks the resource changes caused by realloc +func (pm *PluginsManager) RollbackRealloc(ctx context.Context, nodeName string, resourceArgs map[string]types.WorkloadResourceArgs) error { + _, _, err := pm.SetNodeResourceUsage(ctx, nodeName, nil, nil, []map[string]types.WorkloadResourceArgs{resourceArgs}, true, Decr) + return err +} - // call plugins to remap - respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetRemapArgsResponse, error) { - resp, err := plugin.GetRemapArgs(ctx, nodeName, workloadMap) +// GetMetricsDescription . +func (pm *PluginsManager) GetMetricsDescription(ctx context.Context) ([]*MetricsDescription, error) { + var metricsDescriptions []*MetricsDescription + respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetMetricsDescriptionResponse, error) { + resp, err := plugin.GetMetricsDescription(ctx) if err != nil { - log.Errorf(ctx, "[GetRemapArgs] plugin %v node %v failed to remap, err: %v", plugin.Name(), nodeName, err) + log.Errorf(ctx, "[GetMetricsDescription] plugin %v failed to get metrics description, err: %v", plugin.Name(), err) } return resp, err }) if err != nil { + log.Errorf(ctx, "[GetMetricsDescription] failed to get metrics description") return nil, err } - // merge engine args for _, resp := range respMap { - for workloadID, engineArgs := range resp.EngineArgsMap { - if _, ok := resEngineArgsMap[workloadID]; !ok { - resEngineArgsMap[workloadID] = types.EngineArgs{} - } - resEngineArgsMap[workloadID], err = pm.mergeEngineArgs(ctx, resEngineArgsMap[workloadID], engineArgs) - if err != nil { - log.Errorf(ctx, "[GetRemapArgs] invalid engine args") - return nil, err - } + metricsDescriptions = append(metricsDescriptions, *resp...) + } + + return metricsDescriptions, nil +} + +// ConvertNodeResourceInfoToMetrics . +func (pm *PluginsManager) ConvertNodeResourceInfoToMetrics(ctx context.Context, podName string, nodeName string, nodeResourceCapacity map[string]types.NodeResourceArgs, nodeResourceUsage map[string]types.NodeResourceArgs) ([]*Metrics, error) { + var metrics []*Metrics + respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*ConvertNodeResourceInfoToMetricsResponse, error) { + capacity, usage := nodeResourceCapacity[plugin.Name()], nodeResourceUsage[plugin.Name()] + resp, err := plugin.ConvertNodeResourceInfoToMetrics(ctx, podName, nodeName, &NodeResourceInfo{Capacity: capacity, Usage: usage}) + if err != nil { + log.Errorf(ctx, "[ConvertNodeResourceInfoToMetrics] plugin %v failed to resolve node resource info to metrics, err: %v", plugin.Name(), err) } + return resp, err + }) + + if err != nil { + log.Errorf(ctx, "[ConvertNodeResourceInfoToMetrics] failed to resolve node resource info to metrics") + return nil, err } - return resEngineArgsMap, nil + for _, resp := range respMap { + metrics = append(metrics, *resp...) + } + + return metrics, nil } // AddNode . -func (pm *PluginManager) AddNode(ctx context.Context, nodeName string, resourceOpts types.NodeResourceOpts, nodeInfo *enginetypes.Info) (map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs, error) { +func (pm *PluginsManager) AddNode(ctx context.Context, nodeName string, resourceOpts types.NodeResourceOpts, nodeInfo *enginetypes.Info) (map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs, error) { resResourceCapacity := map[string]types.NodeResourceArgs{} resResourceUsage := map[string]types.NodeResourceArgs{} rollbackPlugins := []Plugin{} @@ -552,7 +466,7 @@ func (pm *PluginManager) AddNode(ctx context.Context, nodeName string, resourceO } // RemoveNode . -func (pm *PluginManager) RemoveNode(ctx context.Context, nodeName string) error { +func (pm *PluginsManager) RemoveNode(ctx context.Context, nodeName string) error { var resourceCapacityMap map[string]types.NodeResourceArgs var resourceUsageMap map[string]types.NodeResourceArgs rollbackPlugins := []Plugin{} @@ -608,82 +522,82 @@ func (pm *PluginManager) RemoveNode(ctx context.Context, nodeName string) error ) } -// GetMostIdleNode , -func (pm *PluginManager) GetMostIdleNode(ctx context.Context, nodeNames []string) (string, error) { - var mostIdleNode *GetMostIdleNodeResponse - - if len(nodeNames) == 0 { - return "", errors.Wrap(types.ErrGetMostIdleNodeFailed, "empty node names") - } +// GetRemapArgs remaps resource and returns engine args for workloads. format: {"workload-1": {"cpus": ["1-3"]}} +// remap doesn't change resource args +func (pm *PluginsManager) GetRemapArgs(ctx context.Context, nodeName string, workloadMap map[string]*types.Workload) (map[string]types.EngineArgs, error) { + resEngineArgsMap := map[string]types.EngineArgs{} - respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetMostIdleNodeResponse, error) { - resp, err := plugin.GetMostIdleNode(ctx, nodeNames) + // call plugins to remap + respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetRemapArgsResponse, error) { + resp, err := plugin.GetRemapArgs(ctx, nodeName, workloadMap) if err != nil { - log.Errorf(ctx, "[GetMostIdleNode] plugin %v failed to get the most idle node of %v, err: %v", plugin.Name(), nodeNames, err) + log.Errorf(ctx, "[GetRemapArgs] plugin %v node %v failed to remap, err: %v", plugin.Name(), nodeName, err) } return resp, err }) if err != nil { - log.Errorf(ctx, "[GetMostIdleNode] failed to get the most idle node of %v", nodeNames) - return "", err + return nil, err } + // merge engine args for _, resp := range respMap { - if (mostIdleNode == nil || resp.Priority > mostIdleNode.Priority) && len(resp.NodeName) > 0 { - mostIdleNode = resp + for workloadID, engineArgs := range resp.EngineArgsMap { + if _, ok := resEngineArgsMap[workloadID]; !ok { + resEngineArgsMap[workloadID] = types.EngineArgs{} + } + resEngineArgsMap[workloadID], err = pm.mergeEngineArgs(ctx, resEngineArgsMap[workloadID], engineArgs) + if err != nil { + log.Errorf(ctx, "[GetRemapArgs] invalid engine args") + return nil, err + } } } - if mostIdleNode == nil { - return "", types.ErrGetMostIdleNodeFailed - } - return mostIdleNode.NodeName, nil + return resEngineArgsMap, nil } -// GetMetricsDescription . -func (pm *PluginManager) GetMetricsDescription(ctx context.Context) ([]*MetricsDescription, error) { - var metricsDescriptions []*MetricsDescription - respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetMetricsDescriptionResponse, error) { - resp, err := plugin.GetMetricsDescription(ctx) - if err != nil { - log.Errorf(ctx, "[GetMetricsDescription] plugin %v failed to get metrics description, err: %v", plugin.Name(), err) - } - return resp, err - }) - - if err != nil { - log.Errorf(ctx, "[GetMetricsDescription] failed to get metrics description") - return nil, err +func (pm *PluginsManager) mergeNodeCapacityInfo(m1 map[string]*NodeCapacityInfo, m2 map[string]*NodeCapacityInfo) map[string]*NodeCapacityInfo { + if m1 == nil { + return m2 } - for _, resp := range respMap { - metricsDescriptions = append(metricsDescriptions, *resp...) + res := map[string]*NodeCapacityInfo{} + for node, info1 := range m1 { + // all the capacities should > 0 + if info2, ok := m2[node]; ok { + res[node] = &NodeCapacityInfo{ + NodeName: node, + Capacity: utils.Min(info1.Capacity, info2.Capacity), + Rate: info1.Rate + info2.Rate*info2.Weight, + Usage: info1.Usage + info2.Usage*info2.Weight, + Weight: info1.Weight + info2.Weight, + } + } } - - return metricsDescriptions, nil + return res } -// ResolveNodeResourceInfoToMetrics . -func (pm *PluginManager) ResolveNodeResourceInfoToMetrics(ctx context.Context, podName string, nodeName string, nodeResourceCapacity map[string]types.NodeResourceArgs, nodeResourceUsage map[string]types.NodeResourceArgs) ([]*Metrics, error) { - var metrics []*Metrics - respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*ResolveNodeResourceInfoToMetricsResponse, error) { - capacity, usage := nodeResourceCapacity[plugin.Name()], nodeResourceUsage[plugin.Name()] - resp, err := plugin.ResolveNodeResourceInfoToMetrics(ctx, podName, nodeName, &NodeResourceInfo{Capacity: capacity, Usage: usage}) - if err != nil { - log.Errorf(ctx, "[ResolveNodeResourceInfoToMetrics] plugin %v failed to resolve node resource info to metrics, err: %v", plugin.Name(), err) - } - return resp, err - }) - - if err != nil { - log.Errorf(ctx, "[ResolveNodeResourceInfoToMetrics] failed to resolve node resource info to metrics") - return nil, err +// mergeEngineArgs e.g. {"file": ["/bin/sh:/bin/sh"], "cpu": 1.2, "cpu-bind": true} + {"file": ["/bin/ls:/bin/ls"], "mem": "1PB"} +// => {"file": ["/bin/sh:/bin/sh", "/bin/ls:/bin/ls"], "cpu": 1.2, "cpu-bind": true, "mem": "1PB"} +func (pm *PluginsManager) mergeEngineArgs(ctx context.Context, m1 types.EngineArgs, m2 types.EngineArgs) (types.EngineArgs, error) { + res := types.EngineArgs{} + for key, value := range m1 { + res[key] = value } - - for _, resp := range respMap { - metrics = append(metrics, *resp...) + for key, value := range m2 { + if _, ok := res[key]; ok { + // only two string slices can be merged + _, ok1 := res[key].([]string) + _, ok2 := value.([]string) + if !ok1 || !ok2 { + log.Errorf(ctx, "[mergeEngineArgs] only two string slices can be merged! error key %v, m1[key] = %v, m2[key] = %v", key, m1[key], m2[key]) + return nil, types.ErrInvalidEngineArgs + } + res[key] = append(res[key].([]string), value.([]string)...) + } else { + res[key] = value + } } - - return metrics, nil + return res, nil } diff --git a/resources/mocks/Manager.go b/resources/mocks/Manager.go new file mode 100644 index 000000000..54c4d4351 --- /dev/null +++ b/resources/mocks/Manager.go @@ -0,0 +1,406 @@ +// Code generated by mockery v2.14.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + enginetypes "github.com/projecteru2/core/engine/types" + mock "github.com/stretchr/testify/mock" + + resources "github.com/projecteru2/core/resources" + + types "github.com/projecteru2/core/types" +) + +// Manager is an autogenerated mock type for the Manager type +type Manager struct { + mock.Mock +} + +// AddNode provides a mock function with given fields: _a0, _a1, _a2, _a3 +func (_m *Manager) AddNode(_a0 context.Context, _a1 string, _a2 types.NodeResourceOpts, _a3 *enginetypes.Info) (map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs, error) { + ret := _m.Called(_a0, _a1, _a2, _a3) + + var r0 map[string]types.NodeResourceArgs + if rf, ok := ret.Get(0).(func(context.Context, string, types.NodeResourceOpts, *enginetypes.Info) map[string]types.NodeResourceArgs); ok { + r0 = rf(_a0, _a1, _a2, _a3) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]types.NodeResourceArgs) + } + } + + var r1 map[string]types.NodeResourceArgs + if rf, ok := ret.Get(1).(func(context.Context, string, types.NodeResourceOpts, *enginetypes.Info) map[string]types.NodeResourceArgs); ok { + r1 = rf(_a0, _a1, _a2, _a3) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(map[string]types.NodeResourceArgs) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(context.Context, string, types.NodeResourceOpts, *enginetypes.Info) error); ok { + r2 = rf(_a0, _a1, _a2, _a3) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// Alloc provides a mock function with given fields: _a0, _a1, _a2, _a3 +func (_m *Manager) Alloc(_a0 context.Context, _a1 string, _a2 int, _a3 types.WorkloadResourceOpts) ([]types.EngineArgs, []map[string]types.WorkloadResourceArgs, error) { + ret := _m.Called(_a0, _a1, _a2, _a3) + + var r0 []types.EngineArgs + if rf, ok := ret.Get(0).(func(context.Context, string, int, types.WorkloadResourceOpts) []types.EngineArgs); ok { + r0 = rf(_a0, _a1, _a2, _a3) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]types.EngineArgs) + } + } + + var r1 []map[string]types.WorkloadResourceArgs + if rf, ok := ret.Get(1).(func(context.Context, string, int, types.WorkloadResourceOpts) []map[string]types.WorkloadResourceArgs); ok { + r1 = rf(_a0, _a1, _a2, _a3) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).([]map[string]types.WorkloadResourceArgs) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(context.Context, string, int, types.WorkloadResourceOpts) error); ok { + r2 = rf(_a0, _a1, _a2, _a3) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// ConvertNodeResourceInfoToMetrics provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 +func (_m *Manager) ConvertNodeResourceInfoToMetrics(_a0 context.Context, _a1 string, _a2 string, _a3 map[string]types.NodeResourceArgs, _a4 map[string]types.NodeResourceArgs) ([]*resources.Metrics, error) { + ret := _m.Called(_a0, _a1, _a2, _a3, _a4) + + var r0 []*resources.Metrics + if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs) []*resources.Metrics); ok { + r0 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*resources.Metrics) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, string, map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs) error); ok { + r1 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetMetricsDescription provides a mock function with given fields: _a0 +func (_m *Manager) GetMetricsDescription(_a0 context.Context) ([]*resources.MetricsDescription, error) { + ret := _m.Called(_a0) + + var r0 []*resources.MetricsDescription + if rf, ok := ret.Get(0).(func(context.Context) []*resources.MetricsDescription); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*resources.MetricsDescription) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetMostIdleNode provides a mock function with given fields: _a0, _a1 +func (_m *Manager) GetMostIdleNode(_a0 context.Context, _a1 []string) (string, error) { + ret := _m.Called(_a0, _a1) + + var r0 string + if rf, ok := ret.Get(0).(func(context.Context, []string) string); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Get(0).(string) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, []string) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetNodeResourceInfo provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 +func (_m *Manager) GetNodeResourceInfo(_a0 context.Context, _a1 string, _a2 []*types.Workload, _a3 bool, _a4 []string) (map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs, []string, error) { + ret := _m.Called(_a0, _a1, _a2, _a3, _a4) + + var r0 map[string]types.NodeResourceArgs + if rf, ok := ret.Get(0).(func(context.Context, string, []*types.Workload, bool, []string) map[string]types.NodeResourceArgs); ok { + r0 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]types.NodeResourceArgs) + } + } + + var r1 map[string]types.NodeResourceArgs + if rf, ok := ret.Get(1).(func(context.Context, string, []*types.Workload, bool, []string) map[string]types.NodeResourceArgs); ok { + r1 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(map[string]types.NodeResourceArgs) + } + } + + var r2 []string + if rf, ok := ret.Get(2).(func(context.Context, string, []*types.Workload, bool, []string) []string); ok { + r2 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + if ret.Get(2) != nil { + r2 = ret.Get(2).([]string) + } + } + + var r3 error + if rf, ok := ret.Get(3).(func(context.Context, string, []*types.Workload, bool, []string) error); ok { + r3 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + r3 = ret.Error(3) + } + + return r0, r1, r2, r3 +} + +// GetNodesDeployCapacity provides a mock function with given fields: _a0, _a1, _a2 +func (_m *Manager) GetNodesDeployCapacity(_a0 context.Context, _a1 []string, _a2 types.WorkloadResourceOpts) (map[string]*resources.NodeCapacityInfo, int, error) { + ret := _m.Called(_a0, _a1, _a2) + + var r0 map[string]*resources.NodeCapacityInfo + if rf, ok := ret.Get(0).(func(context.Context, []string, types.WorkloadResourceOpts) map[string]*resources.NodeCapacityInfo); ok { + r0 = rf(_a0, _a1, _a2) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]*resources.NodeCapacityInfo) + } + } + + var r1 int + if rf, ok := ret.Get(1).(func(context.Context, []string, types.WorkloadResourceOpts) int); ok { + r1 = rf(_a0, _a1, _a2) + } else { + r1 = ret.Get(1).(int) + } + + var r2 error + if rf, ok := ret.Get(2).(func(context.Context, []string, types.WorkloadResourceOpts) error); ok { + r2 = rf(_a0, _a1, _a2) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// GetRemapArgs provides a mock function with given fields: _a0, _a1, _a2 +func (_m *Manager) GetRemapArgs(_a0 context.Context, _a1 string, _a2 map[string]*types.Workload) (map[string]types.EngineArgs, error) { + ret := _m.Called(_a0, _a1, _a2) + + var r0 map[string]types.EngineArgs + if rf, ok := ret.Get(0).(func(context.Context, string, map[string]*types.Workload) map[string]types.EngineArgs); ok { + r0 = rf(_a0, _a1, _a2) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]types.EngineArgs) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, map[string]*types.Workload) error); ok { + r1 = rf(_a0, _a1, _a2) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Realloc provides a mock function with given fields: _a0, _a1, _a2, _a3 +func (_m *Manager) Realloc(_a0 context.Context, _a1 string, _a2 map[string]types.WorkloadResourceArgs, _a3 types.WorkloadResourceOpts) (types.EngineArgs, map[string]types.WorkloadResourceArgs, map[string]types.WorkloadResourceArgs, error) { + ret := _m.Called(_a0, _a1, _a2, _a3) + + var r0 types.EngineArgs + if rf, ok := ret.Get(0).(func(context.Context, string, map[string]types.WorkloadResourceArgs, types.WorkloadResourceOpts) types.EngineArgs); ok { + r0 = rf(_a0, _a1, _a2, _a3) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(types.EngineArgs) + } + } + + var r1 map[string]types.WorkloadResourceArgs + if rf, ok := ret.Get(1).(func(context.Context, string, map[string]types.WorkloadResourceArgs, types.WorkloadResourceOpts) map[string]types.WorkloadResourceArgs); ok { + r1 = rf(_a0, _a1, _a2, _a3) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(map[string]types.WorkloadResourceArgs) + } + } + + var r2 map[string]types.WorkloadResourceArgs + if rf, ok := ret.Get(2).(func(context.Context, string, map[string]types.WorkloadResourceArgs, types.WorkloadResourceOpts) map[string]types.WorkloadResourceArgs); ok { + r2 = rf(_a0, _a1, _a2, _a3) + } else { + if ret.Get(2) != nil { + r2 = ret.Get(2).(map[string]types.WorkloadResourceArgs) + } + } + + var r3 error + if rf, ok := ret.Get(3).(func(context.Context, string, map[string]types.WorkloadResourceArgs, types.WorkloadResourceOpts) error); ok { + r3 = rf(_a0, _a1, _a2, _a3) + } else { + r3 = ret.Error(3) + } + + return r0, r1, r2, r3 +} + +// RemoveNode provides a mock function with given fields: _a0, _a1 +func (_m *Manager) RemoveNode(_a0 context.Context, _a1 string) error { + ret := _m.Called(_a0, _a1) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// RollbackAlloc provides a mock function with given fields: _a0, _a1, _a2 +func (_m *Manager) RollbackAlloc(_a0 context.Context, _a1 string, _a2 []map[string]types.WorkloadResourceArgs) error { + ret := _m.Called(_a0, _a1, _a2) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, []map[string]types.WorkloadResourceArgs) error); ok { + r0 = rf(_a0, _a1, _a2) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// RollbackRealloc provides a mock function with given fields: _a0, _a1, _a2 +func (_m *Manager) RollbackRealloc(_a0 context.Context, _a1 string, _a2 map[string]types.WorkloadResourceArgs) error { + ret := _m.Called(_a0, _a1, _a2) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, map[string]types.WorkloadResourceArgs) error); ok { + r0 = rf(_a0, _a1, _a2) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SetNodeResourceCapacity provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4, _a5 +func (_m *Manager) SetNodeResourceCapacity(_a0 context.Context, _a1 string, _a2 types.NodeResourceOpts, _a3 map[string]types.NodeResourceArgs, _a4 bool, _a5 bool) (map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs, error) { + ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5) + + var r0 map[string]types.NodeResourceArgs + if rf, ok := ret.Get(0).(func(context.Context, string, types.NodeResourceOpts, map[string]types.NodeResourceArgs, bool, bool) map[string]types.NodeResourceArgs); ok { + r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]types.NodeResourceArgs) + } + } + + var r1 map[string]types.NodeResourceArgs + if rf, ok := ret.Get(1).(func(context.Context, string, types.NodeResourceOpts, map[string]types.NodeResourceArgs, bool, bool) map[string]types.NodeResourceArgs); ok { + r1 = rf(_a0, _a1, _a2, _a3, _a4, _a5) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(map[string]types.NodeResourceArgs) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(context.Context, string, types.NodeResourceOpts, map[string]types.NodeResourceArgs, bool, bool) error); ok { + r2 = rf(_a0, _a1, _a2, _a3, _a4, _a5) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// SetNodeResourceUsage provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4, _a5, _a6 +func (_m *Manager) SetNodeResourceUsage(_a0 context.Context, _a1 string, _a2 types.NodeResourceOpts, _a3 map[string]types.NodeResourceArgs, _a4 []map[string]types.WorkloadResourceArgs, _a5 bool, _a6 bool) (map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs, error) { + ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5, _a6) + + var r0 map[string]types.NodeResourceArgs + if rf, ok := ret.Get(0).(func(context.Context, string, types.NodeResourceOpts, map[string]types.NodeResourceArgs, []map[string]types.WorkloadResourceArgs, bool, bool) map[string]types.NodeResourceArgs); ok { + r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]types.NodeResourceArgs) + } + } + + var r1 map[string]types.NodeResourceArgs + if rf, ok := ret.Get(1).(func(context.Context, string, types.NodeResourceOpts, map[string]types.NodeResourceArgs, []map[string]types.WorkloadResourceArgs, bool, bool) map[string]types.NodeResourceArgs); ok { + r1 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(map[string]types.NodeResourceArgs) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(context.Context, string, types.NodeResourceOpts, map[string]types.NodeResourceArgs, []map[string]types.WorkloadResourceArgs, bool, bool) error); ok { + r2 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +type mockConstructorTestingTNewManager interface { + mock.TestingT + Cleanup(func()) +} + +// NewManager creates a new instance of Manager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewManager(t mockConstructorTestingTNewManager) *Manager { + mock := &Manager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/resources/mocks/Plugin.go b/resources/mocks/Plugin.go index 3ad1057fb..c6381eb5f 100644 --- a/resources/mocks/Plugin.go +++ b/resources/mocks/Plugin.go @@ -41,6 +41,29 @@ func (_m *Plugin) AddNode(ctx context.Context, nodeName string, resourceOpts typ return r0, r1 } +// ConvertNodeResourceInfoToMetrics provides a mock function with given fields: ctx, podName, nodeName, nodeResourceInfo +func (_m *Plugin) ConvertNodeResourceInfoToMetrics(ctx context.Context, podName string, nodeName string, nodeResourceInfo *resources.NodeResourceInfo) (*resources.ConvertNodeResourceInfoToMetricsResponse, error) { + ret := _m.Called(ctx, podName, nodeName, nodeResourceInfo) + + var r0 *resources.ConvertNodeResourceInfoToMetricsResponse + if rf, ok := ret.Get(0).(func(context.Context, string, string, *resources.NodeResourceInfo) *resources.ConvertNodeResourceInfoToMetricsResponse); ok { + r0 = rf(ctx, podName, nodeName, nodeResourceInfo) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*resources.ConvertNodeResourceInfoToMetricsResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, string, *resources.NodeResourceInfo) error); ok { + r1 = rf(ctx, podName, nodeName, nodeResourceInfo) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FixNodeResource provides a mock function with given fields: ctx, nodeName, workloads func (_m *Plugin) FixNodeResource(ctx context.Context, nodeName string, workloads []*types.Workload) (*resources.GetNodeResourceInfoResponse, error) { ret := _m.Called(ctx, nodeName, workloads) @@ -262,29 +285,6 @@ func (_m *Plugin) RemoveNode(ctx context.Context, nodeName string) (*resources.R return r0, r1 } -// ResolveNodeResourceInfoToMetrics provides a mock function with given fields: ctx, podName, nodeName, nodeResourceInfo -func (_m *Plugin) ResolveNodeResourceInfoToMetrics(ctx context.Context, podName string, nodeName string, nodeResourceInfo *resources.NodeResourceInfo) (*resources.ResolveNodeResourceInfoToMetricsResponse, error) { - ret := _m.Called(ctx, podName, nodeName, nodeResourceInfo) - - var r0 *resources.ResolveNodeResourceInfoToMetricsResponse - if rf, ok := ret.Get(0).(func(context.Context, string, string, *resources.NodeResourceInfo) *resources.ResolveNodeResourceInfoToMetricsResponse); ok { - r0 = rf(ctx, podName, nodeName, nodeResourceInfo) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*resources.ResolveNodeResourceInfoToMetricsResponse) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, string, string, *resources.NodeResourceInfo) error); ok { - r1 = rf(ctx, podName, nodeName, nodeResourceInfo) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // SetNodeResourceCapacity provides a mock function with given fields: ctx, nodeName, nodeResourceOpts, nodeResourceArgs, delta, incr func (_m *Plugin) SetNodeResourceCapacity(ctx context.Context, nodeName string, nodeResourceOpts types.NodeResourceOpts, nodeResourceArgs types.NodeResourceArgs, delta bool, incr bool) (*resources.SetNodeResourceCapacityResponse, error) { ret := _m.Called(ctx, nodeName, nodeResourceOpts, nodeResourceArgs, delta, incr) diff --git a/resources/plugin.go b/resources/plugin.go index ad2d2d440..8b49db0e7 100644 --- a/resources/plugin.go +++ b/resources/plugin.go @@ -4,6 +4,7 @@ import ( "context" enginetypes "github.com/projecteru2/core/engine/types" + "github.com/projecteru2/core/types" coretypes "github.com/projecteru2/core/types" ) @@ -29,6 +30,7 @@ const ( resolveNodeResourceInfoToMetricsCommand = "resolve-metrics" ) +// Plugin indicate plugin methods type Plugin interface { // GetDeployArgs tries to allocate resource, returns engine args for each workload, format: [{"cpus": 1.2}, {"cpus": 1.2}] // also returns resource args for each workload, format: [{"cpus": 1.2}, {"cpus": 1.2}] @@ -78,9 +80,36 @@ type Plugin interface { // GetMetricsDescription returns metrics description GetMetricsDescription(ctx context.Context) (*GetMetricsDescriptionResponse, error) - // ResolveNodeResourceInfoToMetrics resolves node resource info to metrics - ResolveNodeResourceInfoToMetrics(ctx context.Context, podName string, nodeName string, nodeResourceInfo *NodeResourceInfo) (*ResolveNodeResourceInfoToMetricsResponse, error) + // ConvertNodeResourceInfoToMetrics resolves node resource info to metrics + ConvertNodeResourceInfoToMetrics(ctx context.Context, podName string, nodeName string, nodeResourceInfo *NodeResourceInfo) (*ConvertNodeResourceInfoToMetricsResponse, error) // Name returns the name of plugin Name() string } + +// Manager indicate manages +type Manager interface { + // GetMostIdleNode . + GetMostIdleNode(context.Context, []string) (string, error) + + GetNodesDeployCapacity(context.Context, []string, types.WorkloadResourceOpts) (map[string]*NodeCapacityInfo, int, error) + + SetNodeResourceCapacity(context.Context, string, types.NodeResourceOpts, map[string]types.NodeResourceArgs, bool, bool) (map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs, error) + + GetNodeResourceInfo(context.Context, string, []*types.Workload, bool, []string) (map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs, []string, error) + + SetNodeResourceUsage(context.Context, string, types.NodeResourceOpts, map[string]types.NodeResourceArgs, []map[string]types.WorkloadResourceArgs, bool, bool) (map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs, error) + + Alloc(context.Context, string, int, types.WorkloadResourceOpts) ([]types.EngineArgs, []map[string]types.WorkloadResourceArgs, error) + RollbackAlloc(context.Context, string, []map[string]types.WorkloadResourceArgs) error + Realloc(context.Context, string, map[string]types.WorkloadResourceArgs, types.WorkloadResourceOpts) (types.EngineArgs, map[string]types.WorkloadResourceArgs, map[string]types.WorkloadResourceArgs, error) + RollbackRealloc(context.Context, string, map[string]types.WorkloadResourceArgs) error + + GetMetricsDescription(context.Context) ([]*MetricsDescription, error) + ConvertNodeResourceInfoToMetrics(context.Context, string, string, map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs) ([]*Metrics, error) + + AddNode(context.Context, string, types.NodeResourceOpts, *enginetypes.Info) (map[string]types.NodeResourceArgs, map[string]types.NodeResourceArgs, error) + RemoveNode(context.Context, string) error + + GetRemapArgs(context.Context, string, map[string]*types.Workload) (map[string]types.EngineArgs, error) +} diff --git a/resources/plugins.go b/resources/plugins.go new file mode 100644 index 000000000..2dcf73c74 --- /dev/null +++ b/resources/plugins.go @@ -0,0 +1,82 @@ +package resources + +import ( + "context" + + "github.com/projecteru2/core/log" + "github.com/projecteru2/core/types" + "github.com/projecteru2/core/utils" + + "github.com/hashicorp/go-multierror" +) + +// PluginsManager manages plugins +type PluginsManager struct { + config types.Config + plugins []Plugin +} + +// NewPluginsManager creates a plugin manager +func NewPluginsManager(config types.Config) (*PluginsManager, error) { + pm := &PluginsManager{ + config: config, + plugins: []Plugin{}, + } + + return pm, nil +} + +// LoadPlugins . +func (pm *PluginsManager) LoadPlugins(ctx context.Context) error { + if pm.config.ResourcePluginsDir == "" { + return nil + } + + pluginFiles, err := utils.ListAllExecutableFiles(pm.config.ResourcePluginsDir) + if err != nil { + log.Errorf(ctx, "[LoadPlugins] failed to list all executable files dir: %v, err: %v", pm.config.ResourcePluginsDir, err) + return err + } + + cache := map[string]struct{}{} + for _, plugin := range pm.plugins { + cache[plugin.Name()] = struct{}{} + } + + for _, file := range pluginFiles { + log.Infof(ctx, "[LoadPlugins] load binary plugin: %v", file) + plugin := &BinaryPlugin{path: file, config: pm.config} + if _, ok := cache[plugin.Name()]; ok { + continue + } + pm.plugins = append(pm.plugins, plugin) + } + return nil +} + +// AddPlugins adds a plugin (for test and debug) +func (pm *PluginsManager) AddPlugins(plugins ...Plugin) { + pm.plugins = append(pm.plugins, plugins...) +} + +// GetPlugins is used for mock +func (pm *PluginsManager) GetPlugins() []Plugin { + return pm.plugins +} + +func callPlugins[T any](ctx context.Context, plugins []Plugin, f func(Plugin) (T, error)) (map[Plugin]T, error) { + var combinedErr error + results := map[Plugin]T{} + + for _, plugin := range plugins { + result, err := f(plugin) + if err != nil { + log.Errorf(ctx, "[callPlugins] failed to call plugin %v, err: %v", plugin.Name(), err) + combinedErr = multierror.Append(combinedErr, types.NewDetailedErr(err, plugin.Name())) + continue + } + results[plugin] = result + } + + return results, combinedErr +} diff --git a/resources/types.go b/resources/types.go index 4c4605e65..06afaf998 100644 --- a/resources/types.go +++ b/resources/types.go @@ -170,8 +170,8 @@ type MetricsDescription struct { // GetMetricsDescriptionResponse . type GetMetricsDescriptionResponse []*MetricsDescription -// ResolveNodeResourceInfoToMetricsRequest . -type ResolveNodeResourceInfoToMetricsRequest struct { +// ConvertNodeResourceInfoToMetricsRequest . +type ConvertNodeResourceInfoToMetricsRequest struct { PodName string `json:"pod"` NodeName string `json:"node"` Capacity types.NodeResourceArgs `json:"capacity"` @@ -186,5 +186,5 @@ type Metrics struct { Value string `json:"value"` } -// ResolveNodeResourceInfoToMetricsResponse . -type ResolveNodeResourceInfoToMetricsResponse []*Metrics +// ConvertNodeResourceInfoToMetricsResponse . +type ConvertNodeResourceInfoToMetricsResponse []*Metrics diff --git a/resources/volume/models/metrics.go b/resources/volume/models/metrics.go index 2db4b6e19..14e3fc0c4 100644 --- a/resources/volume/models/metrics.go +++ b/resources/volume/models/metrics.go @@ -25,7 +25,7 @@ func (v *Volume) GetMetricsDescription() []map[string]interface{} { } } -func (v *Volume) ResolveNodeResourceInfoToMetrics(podName string, nodeName string, nodeResourceCapacity *types.NodeResourceArgs, nodeResourceUsage *types.NodeResourceArgs) []map[string]interface{} { +func (v *Volume) ConvertNodeResourceInfoToMetrics(podName string, nodeName string, nodeResourceCapacity *types.NodeResourceArgs, nodeResourceUsage *types.NodeResourceArgs) []map[string]interface{} { cleanedNodeName := strings.ReplaceAll(nodeName, ".", "_") metrics := []map[string]interface{}{ { diff --git a/resources/volume/volume.go b/resources/volume/volume.go index 9dcbd1ccb..6ad23240e 100644 --- a/resources/volume/volume.go +++ b/resources/volume/volume.go @@ -303,8 +303,8 @@ func (v *Plugin) GetMetricsDescription(ctx context.Context) (*resources.GetMetri return resp, mapstructure.Decode(v.v.GetMetricsDescription(), resp) } -// ResolveNodeResourceInfoToMetrics . -func (v *Plugin) ResolveNodeResourceInfoToMetrics(ctx context.Context, podName string, nodeName string, info *resources.NodeResourceInfo) (*resources.ResolveNodeResourceInfoToMetricsResponse, error) { +// ConvertNodeResourceInfoToMetrics . +func (v *Plugin) ConvertNodeResourceInfoToMetrics(ctx context.Context, podName string, nodeName string, info *resources.NodeResourceInfo) (*resources.ConvertNodeResourceInfoToMetricsResponse, error) { capacity, usage := &types.NodeResourceArgs{}, &types.NodeResourceArgs{} if err := capacity.ParseFromRawParams(coretypes.RawParams(info.Capacity)); err != nil { return nil, err @@ -313,7 +313,7 @@ func (v *Plugin) ResolveNodeResourceInfoToMetrics(ctx context.Context, podName s return nil, err } - metrics := v.v.ResolveNodeResourceInfoToMetrics(podName, nodeName, capacity, usage) - resp := &resources.ResolveNodeResourceInfoToMetricsResponse{} + metrics := v.v.ConvertNodeResourceInfoToMetrics(podName, nodeName, capacity, usage) + resp := &resources.ConvertNodeResourceInfoToMetricsResponse{} return resp, mapstructure.Decode(metrics, resp) }