Skip to content

Commit

Permalink
move types.resources to resource
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Feb 3, 2023
1 parent 39a7d58 commit 0ab6590
Show file tree
Hide file tree
Showing 39 changed files with 244 additions and 205 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ unit-test:
./lock/etcdlock/... \
./auth/simple/... \
./discovery/helium... \
./resource/types/. \
./resource/plugins/cpumem/. \
./resource/plugins/cpumem/schedule/. \
./wal/. \
Expand Down
3 changes: 2 additions & 1 deletion cluster/calcium/capacity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
lockmocks "github.com/projecteru2/core/lock/mocks"
resourcemocks "github.com/projecteru2/core/resource/mocks"
plugintypes "github.com/projecteru2/core/resource/plugins/types"
resourcetypes "github.com/projecteru2/core/resource/types"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
Expand Down Expand Up @@ -40,7 +41,7 @@ func TestCalculateCapacity(t *testing.T) {
Entrypoint: &types.Entrypoint{
Name: "entry",
},
Resources: types.Resources{},
Resources: resourcetypes.Resources{},
DeployStrategy: strategy.Auto,
NodeFilter: &types.NodeFilter{
Includes: []string{name},
Expand Down
75 changes: 38 additions & 37 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/metrics"
resourcetypes "github.com/projecteru2/core/resource/types"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
"github.com/projecteru2/core/wal"
Expand Down Expand Up @@ -52,9 +53,9 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
deployMap map[string]int
rollbackMap map[string][]int
// map[nodename][]Resources
engineParamsMap = map[string][]types.Resources{}
engineParamsMap = map[string][]resourcetypes.Resources{}
// map[nodename][]Resources
workloadResourcesMap = map[string][]types.Resources{}
workloadResourcesMap = map[string][]resourcetypes.Resources{}
)

_ = c.pool.Invoke(func() {
Expand Down Expand Up @@ -151,7 +152,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
}
for nodename, rollbackIndices := range rollbackMap {
if e := c.withNodePodLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
rollbackResources := utils.Map(rollbackIndices, func(idx int) types.Resources {
rollbackResources := utils.Map(rollbackIndices, func(idx int) resourcetypes.Resources {
return workloadResourcesMap[nodename][idx]
})
return c.rmgr.RollbackAlloc(ctx, nodename, rollbackResources)
Expand All @@ -173,8 +174,8 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
func (c *Calcium) doDeployWorkloads(ctx context.Context,
ch chan *types.CreateWorkloadMessage,
opts *types.DeployOptions,
engineParamsMap map[string][]types.Resources,
workloadResourcesMap map[string][]types.Resources,
engineParamsMap map[string][]resourcetypes.Resources,
workloadResourcesMap map[string][]resourcetypes.Resources,
deployMap map[string]int) (_ map[string][]int, err error) {

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -220,8 +221,8 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context,
nodename string,
opts *types.DeployOptions,
deploy int,
engineParams []types.Resources,
workloadResources []types.Resources,
engineParams []resourcetypes.Resources,
workloadResources []resourcetypes.Resources,
seq int) (indices []int, err error) {

logger := log.WithFunc("calcium.doDeployWorkloadsOnNode").WithField("node", nodename).WithField("ident", opts.ProcessIdent).WithField("deploy", deploy).WithField("seq", seq)
Expand Down Expand Up @@ -292,15 +293,15 @@ func (c *Calcium) doDeployOneWorkload(
node *types.Node,
opts *types.DeployOptions,
msg *types.CreateWorkloadMessage,
config *enginetypes.VirtualizationCreateOptions,
createOpts *enginetypes.VirtualizationCreateOptions,
decrProcessing bool,
) (err error) {
logger := log.WithFunc("calcium.doDeployWorkload").WithField("node", node.Name).WithField("ident", opts.ProcessIdent).WithField("msg", msg)
workload := &types.Workload{
Resources: msg.Resources,
EngineParams: msg.EngineParams,
Name: config.Name,
Labels: config.Labels,
Name: createOpts.Name,
Labels: createOpts.Labels,
Podname: opts.Podname,
Nodename: node.Name,
Hook: opts.Entrypoint.Hook,
Expand All @@ -324,7 +325,7 @@ func (c *Calcium) doDeployOneWorkload(
ctx,
// create workload
func(ctx context.Context) error {
created, err := node.Engine.VirtualizationCreate(ctx, config)
created, err := node.Engine.VirtualizationCreate(ctx, createOpts)
if err != nil {
return err
}
Expand Down Expand Up @@ -437,48 +438,48 @@ func (c *Calcium) doDeployOneWorkload(
}

func (c *Calcium) doMakeWorkloadOptions(ctx context.Context, no int, msg *types.CreateWorkloadMessage, opts *types.DeployOptions, node *types.Node) *enginetypes.VirtualizationCreateOptions {
config := &enginetypes.VirtualizationCreateOptions{}
createOpts := &enginetypes.VirtualizationCreateOptions{}
// general
config.EngineParams = msg.EngineParams
config.RawArgs = opts.RawArgs
config.Lambda = opts.Lambda
config.User = opts.User
config.DNS = opts.DNS
config.Image = opts.Image
config.Stdin = opts.OpenStdin
config.Hosts = opts.ExtraHosts
config.Debug = opts.Debug
config.Networks = opts.Networks
createOpts.EngineParams = msg.EngineParams
createOpts.RawArgs = opts.RawArgs
createOpts.Lambda = opts.Lambda
createOpts.User = opts.User
createOpts.DNS = opts.DNS
createOpts.Image = opts.Image
createOpts.Stdin = opts.OpenStdin
createOpts.Hosts = opts.ExtraHosts
createOpts.Debug = opts.Debug
createOpts.Networks = opts.Networks

// entry
entry := opts.Entrypoint
config.WorkingDir = entry.Dir
config.Privileged = entry.Privileged
config.Sysctl = entry.Sysctls
config.Publish = entry.Publish
config.Restart = entry.Restart
createOpts.WorkingDir = entry.Dir
createOpts.Privileged = entry.Privileged
createOpts.Sysctl = entry.Sysctls
createOpts.Publish = entry.Publish
createOpts.Restart = entry.Restart
if entry.Log != nil {
config.LogType = entry.Log.Type
config.LogConfig = map[string]string{}
createOpts.LogType = entry.Log.Type
createOpts.LogConfig = map[string]string{}
for k, v := range entry.Log.Config {
config.LogConfig[k] = v
createOpts.LogConfig[k] = v
}
}
// name
suffix := utils.RandomString(6)
config.Name = utils.MakeWorkloadName(opts.Name, opts.Entrypoint.Name, suffix)
msg.WorkloadName = config.Name
createOpts.Name = utils.MakeWorkloadName(opts.Name, opts.Entrypoint.Name, suffix)
msg.WorkloadName = createOpts.Name
// command and user
// extra args is dynamically
config.Cmd = opts.Entrypoint.Commands
createOpts.Cmd = opts.Entrypoint.Commands
// env
env := append(opts.Env, fmt.Sprintf("APP_NAME=%s", opts.Name)) //nolint
env = append(env, fmt.Sprintf("ERU_POD=%s", opts.Podname))
env = append(env, fmt.Sprintf("ERU_NODE_NAME=%s", node.Name))
env = append(env, fmt.Sprintf("ERU_WORKLOAD_SEQ=%d", no))
config.Env = env
createOpts.Env = env
// basic labels, bind to LabelMeta
config.Labels = map[string]string{
createOpts.Labels = map[string]string{
cluster.ERUMark: "1",
cluster.LabelMeta: utils.EncodeMetaInLabel(ctx, &types.LabelMeta{
Publish: opts.Entrypoint.Publish,
Expand All @@ -488,8 +489,8 @@ func (c *Calcium) doMakeWorkloadOptions(ctx context.Context, no int, msg *types.
cluster.LabelCoreID: c.identifier,
}
for key, value := range opts.Labels {
config.Labels[key] = value
createOpts.Labels[key] = value
}

return config
return createOpts
}
7 changes: 4 additions & 3 deletions cluster/calcium/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
lockmocks "github.com/projecteru2/core/lock/mocks"
resourcemocks "github.com/projecteru2/core/resource/mocks"
plugintypes "github.com/projecteru2/core/resource/plugins/types"
resourcetypes "github.com/projecteru2/core/resource/types"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
Expand Down Expand Up @@ -68,7 +69,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
Count: 2,
DeployStrategy: strategy.Auto,
Podname: "p1",
Resources: types.Resources{},
Resources: resourcetypes.Resources{},
Image: "zc:test",
Entrypoint: &types.Entrypoint{
Name: "good-entrypoint",
Expand Down Expand Up @@ -149,8 +150,8 @@ func TestCreateWorkloadTxn(t *testing.T) {
assert.True(t, walCommitted)
walCommitted = false
rmgr.On("Alloc", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
[]types.Resources{{}, {}},
[]types.Resources{
[]resourcetypes.Resources{{}, {}},
[]resourcetypes.Resources{
{node1.Name: {}},
{node2.Name: {}},
},
Expand Down
5 changes: 3 additions & 2 deletions cluster/calcium/dissociate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/projecteru2/core/log"
"github.com/projecteru2/core/resource/plugins"
resourcetypes "github.com/projecteru2/core/resource/types"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
Expand Down Expand Up @@ -32,7 +33,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, IDs []string) (chan *t
ctx,
// if
func(ctx context.Context) (err error) {
_, _, err = c.rmgr.SetNodeResourceUsage(ctx, node.Name, nil, nil, []types.Resources{workload.Resources}, true, plugins.Decr)
_, _, err = c.rmgr.SetNodeResourceUsage(ctx, node.Name, nil, nil, []resourcetypes.Resources{workload.Resources}, true, plugins.Decr)
return err
},
// then
Expand All @@ -44,7 +45,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, IDs []string) (chan *t
if failedByCond {
return nil
}
_, _, err = c.rmgr.SetNodeResourceUsage(ctx, node.Name, nil, nil, []types.Resources{workload.Resources}, true, plugins.Incr)
_, _, err = c.rmgr.SetNodeResourceUsage(ctx, node.Name, nil, nil, []resourcetypes.Resources{workload.Resources}, true, plugins.Incr)
return err
},
c.config.GlobalTimeout,
Expand Down
7 changes: 4 additions & 3 deletions cluster/calcium/dissociate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

lockmocks "github.com/projecteru2/core/lock/mocks"
resourcemocks "github.com/projecteru2/core/resource/mocks"
resourcetypes "github.com/projecteru2/core/resource/types"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/types"

Expand All @@ -25,7 +26,7 @@ func TestDissociateWorkload(t *testing.T) {
lock.On("Unlock", mock.Anything).Return(nil)

c1 := &types.Workload{
Resources: types.Resources{},
Resources: resourcetypes.Resources{},
ID: "c1",
Podname: "p1",
Nodename: "node1",
Expand Down Expand Up @@ -55,8 +56,8 @@ func TestDissociateWorkload(t *testing.T) {
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
// failed by RemoveWorkload
rmgr.On("SetNodeResourceUsage", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
types.Resources{},
types.Resources{},
resourcetypes.Resources{},
resourcetypes.Resources{},
nil,
)
store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(types.ErrMockError).Once()
Expand Down
15 changes: 8 additions & 7 deletions cluster/calcium/lambda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
lockmocks "github.com/projecteru2/core/lock/mocks"
resourcemocks "github.com/projecteru2/core/resource/mocks"
plugintypes "github.com/projecteru2/core/resource/plugins/types"
resourcetypes "github.com/projecteru2/core/resource/types"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
Expand Down Expand Up @@ -41,7 +42,7 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
Count: 2,
DeployStrategy: strategy.Auto,
Podname: "p1",
Resources: types.Resources{},
Resources: resourcetypes.Resources{},
Image: "zc:test",
Entrypoint: &types.Entrypoint{
Name: "good-entrypoint",
Expand Down Expand Up @@ -77,7 +78,7 @@ func TestLambdaWithWorkloadIDReturned(t *testing.T) {
Count: 2,
DeployStrategy: strategy.Auto,
Podname: "p1",
Resources: types.Resources{},
Resources: resourcetypes.Resources{},
Image: "zc:test",
Entrypoint: &types.Entrypoint{
Name: "good-entrypoint",
Expand Down Expand Up @@ -129,7 +130,7 @@ func TestLambdaWithError(t *testing.T) {
Count: 2,
DeployStrategy: strategy.Auto,
Podname: "p1",
Resources: types.Resources{},
Resources: resourcetypes.Resources{},
Image: "zc:test",
Entrypoint: &types.Entrypoint{
Name: "good-entrypoint",
Expand Down Expand Up @@ -195,8 +196,8 @@ func newLambdaCluster(t *testing.T) (*Calcium, []*types.Node) {
store := c.store.(*storemocks.Store)
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
types.Resources{},
types.Resources{},
resourcetypes.Resources{},
resourcetypes.Resources{},
[]string{},
nil,
)
Expand All @@ -218,8 +219,8 @@ func newLambdaCluster(t *testing.T) (*Calcium, []*types.Node) {
20, nil,
)
rmgr.On("Alloc", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
[]types.Resources{{}, {}},
[]types.Resources{
[]resourcetypes.Resources{{}, {}},
[]resourcetypes.Resources{
{node1.Name: {}},
{node2.Name: {}},
},
Expand Down
5 changes: 3 additions & 2 deletions cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/resource/plugins"
resourcetypes "github.com/projecteru2/core/resource/types"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
Expand All @@ -20,7 +21,7 @@ func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*typ
logger.Error(ctx, err)
return nil, err
}
var res types.Resources
var res resourcetypes.Resources
var node *types.Node
var err error

Expand Down Expand Up @@ -212,7 +213,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
n.Labels = opts.Labels
}

var origin types.Resources
var origin resourcetypes.Resources
return utils.Txn(ctx,
// if: update node resource capacity success
func(ctx context.Context) error {
Expand Down
5 changes: 3 additions & 2 deletions cluster/calcium/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
lockmocks "github.com/projecteru2/core/lock/mocks"
resourcemocks "github.com/projecteru2/core/resource/mocks"
plugintypes "github.com/projecteru2/core/resource/plugins/types"
resourcetypes "github.com/projecteru2/core/resource/types"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/types"

Expand Down Expand Up @@ -42,7 +43,7 @@ func TestAddNode(t *testing.T) {
assert.Error(t, err)
rmgr.AssertExpectations(t)
rmgr.On("AddNode", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
types.Resources{}, nil)
resourcetypes.Resources{}, nil)
rmgr.On("RemoveNode", mock.Anything, mock.Anything).Return(nil)

// failed by store.AddNode
Expand Down Expand Up @@ -248,7 +249,7 @@ func TestSetNode(t *testing.T) {
opts.Labels = labels

// failed by SetNodeResourceCapacity
opts.Resources = types.Resources{"a": {"a": 1}}
opts.Resources = resourcetypes.Resources{"a": {"a": 1}}
rmgr.On("SetNodeResourceCapacity", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
nil, nil, types.ErrMockError,
).Once()
Expand Down
7 changes: 4 additions & 3 deletions cluster/calcium/realloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
resourcetypes "github.com/projecteru2/core/resource/types"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
Expand All @@ -31,9 +32,9 @@ func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOption
}

func (c *Calcium) doReallocOnNode(ctx context.Context, node *types.Node, workload *types.Workload, originWorkload types.Workload, opts *types.ReallocOptions) error {
var resources types.Resources
var deltaResources types.Resources
var engineParams types.Resources
var resources resourcetypes.Resources
var deltaResources resourcetypes.Resources
var engineParams resourcetypes.Resources
var err error

logger := log.WithFunc("calcium.doReallocOnNode").WithField("opts", opts)
Expand Down
Loading

0 comments on commit 0ab6590

Please sign in to comment.