resource request & limit (#258)
* re-design data structure for resource request/limit

* modify grpc api: add *_request field in deployopt

* container records request and limit

* module change: independent resources module

* realloc adapts for resource request/limit

* pass unittest and add more test

* GetContainer response contains resource request
jschwinger233 authored Nov 6, 2020
1 parent 1631542 commit 51a0fa5
Showing 57 changed files with 7,515 additions and 4,410 deletions.
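
Before diving into the diffs: the theme of this commit is that every resource dimension (CPU, quota, memory, storage, volumes) is now carried as a request/limit pair, in the spirit of Kubernetes resource requirements — the request is what the scheduler reserves on a node, the limit is what the engine enforces at runtime. A minimal, illustrative sketch of that shape (not the repo's actual definitions):

```go
package sketch

// resource pairs a scheduling-time reservation with a runtime cap;
// the diffs below thread exactly this split through Container,
// CreateContainerMessage and the engine options.
type resource struct {
	Request int64 // reserved against node capacity by the scheduler
	Limit   int64 // enforced on the running container by the engine
}

// container mirrors the new types.Container fields in spirit: each
// dimension records both halves instead of a single value.
type container struct {
	Memory  resource
	Storage resource
}
```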
89 changes: 45 additions & 44 deletions cluster/calcium/create.go
@@ -9,6 +9,8 @@ import (
"github.com/projecteru2/core/cluster"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/metrics"
resourcetypes "github.com/projecteru2/core/resources/types"

"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
"github.com/sanity-io/litter"
@@ -27,12 +29,6 @@ func (c *Calcium) CreateContainer(ctx context.Context, opts *types.DeployOptions
return nil, errors.WithStack(types.NewDetailedErr(types.ErrBadCount, opts.Count))
}

- for _, req := range opts.ResourceRequests {
- 	if err := req.DeployValidate(); err != nil {
- 		return nil, errors.WithStack(err)
- 	}
- }

ch, err := c.doCreateWorkloads(ctx, opts)
return ch, errors.WithStack(err)
}
@@ -46,7 +42,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio

var (
err error
- planMap map[types.ResourceType]types.ResourcePlans
+ planMap map[types.ResourceType]resourcetypes.ResourcePlans
deployMap map[string]*types.DeployInfo
rollbackMap map[string][]int
)
@@ -81,13 +77,11 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio

// commit changes
nodes := []*types.Node{}
- for nodeName, deployInfo := range deployMap {
+ for nodename, deployInfo := range deployMap {
for _, plan := range planMap {
- plan.ApplyChangesOnNode(nodeMap[nodeName], utils.Range(deployInfo.Deploy)...)
+ plan.ApplyChangesOnNode(nodeMap[nodename], utils.Range(deployInfo.Deploy)...)
}
- nodes = append(nodes, nodeMap[nodeName])
- }
- for nodename, deployInfo := range deployMap {
+ nodes = append(nodes, nodeMap[nodename])
if err = c.store.SaveProcessing(ctx, opts, nodename, deployInfo.Deploy); err != nil {
return errors.WithStack(err)
}
@@ -127,7 +121,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
return ch, err
}

- func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateContainerMessage, opts *types.DeployOptions, planMap map[types.ResourceType]types.ResourcePlans, deployMap map[string]*types.DeployInfo) (_ map[string][]int, err error) {
+ func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateContainerMessage, opts *types.DeployOptions, planMap map[types.ResourceType]resourcetypes.ResourcePlans, deployMap map[string]*types.DeployInfo) (_ map[string][]int, err error) {
wg := sync.WaitGroup{}
wg.Add(len(deployMap))

@@ -151,7 +145,7 @@ func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateCo
}

// deploy scheduled containers on one node
- func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.CreateContainerMessage, nodeName string, opts *types.DeployOptions, deployInfo *types.DeployInfo, planMap map[types.ResourceType]types.ResourcePlans, seq int) (indices []int, err error) {
+ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.CreateContainerMessage, nodeName string, opts *types.DeployOptions, deployInfo *types.DeployInfo, planMap map[types.ResourceType]resourcetypes.ResourcePlans, seq int) (indices []int, err error) {
node, err := c.doGetAndPrepareNode(ctx, nodeName, opts.Image)
if err != nil {
for i := 0; i < deployInfo.Deploy; i++ {
@@ -178,18 +172,18 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
ch <- createMsg
}()

- resources := &types.Resources{}
- o := types.DispenseOptions{
+ rsc := &types.Resources{}
+ o := resourcetypes.DispenseOptions{
Node: node,
Index: idx,
}
for _, plan := range planMap {
- if e = plan.Dispense(o, resources); e != nil {
+ if e = plan.Dispense(o, rsc); e != nil {
return
}
}

- createMsg.Resources = *resources
+ createMsg.Resources = *rsc
e = c.doDeployOneWorkload(ctx, node, opts, createMsg, seq+idx, deployInfo.Deploy-1-idx)
}()
}
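
The hunk above shows the new dispense pattern: every resource plan fills in its own slice of one shared Resources value per container index. A hedged reconstruction of the interface this implies (the method names and DispenseOptions fields come from the diff itself; the stand-in types are illustrative, the real ones live in resources/types):

```go
package sketch

type Node struct{ Name string }

// Resources accumulates what each plan dispenses for one container.
type Resources struct{ /* CPU pin, memory, volume plan, ... */ }

// DispenseOptions matches the fields visible in the diff: which node,
// and which container index on that node, is being provisioned.
type DispenseOptions struct {
	Node  *Node
	Index int
}

type ResourcePlans interface {
	// ApplyChangesOnNode deducts the scheduled amounts for the given
	// deploy indices from the node's bookkeeping.
	ApplyChangesOnNode(node *Node, indices ...int)
	// Dispense writes this plan's share of one container's resources
	// into the shared Resources value; each resource module stays
	// independent, as the commit message describes.
	Dispense(opts DispenseOptions, rsc *Resources) error
}
```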
@@ -216,23 +210,30 @@ func (c *Calcium) doDeployOneWorkload(
) (err error) {
config := c.doMakeContainerOptions(no, msg, opts, node)
container := &types.Container{
- Name: config.Name,
- Labels: config.Labels,
- Podname: opts.Podname,
- Nodename: node.Name,
- CPU: msg.CPU,
- Quota: msg.Quota,
- Memory: msg.Memory,
- Storage: msg.Storage,
- Hook: opts.Entrypoint.Hook,
- Privileged: opts.Entrypoint.Privileged,
- Engine: node.Engine,
- SoftLimit: opts.SoftLimit,
- Image: opts.Image,
- Env: opts.Env,
- User: opts.User,
- Volumes: msg.Volume,
- VolumePlan: msg.VolumePlan,
+ Name: config.Name,
+ Labels: config.Labels,
+ Podname: opts.Podname,
+ Nodename: node.Name,
+ CPURequest: msg.CPURequest,
+ CPULimit: msg.CPULimit,
+ QuotaRequest: msg.CPUQuotaRequest,
+ QuotaLimit: msg.CPUQuotaLimit,
+ MemoryRequest: msg.MemoryRequest,
+ MemoryLimit: msg.MemoryLimit,
+ StorageRequest: msg.StorageRequest,
+ StorageLimit: msg.StorageLimit,
+ VolumeRequest: msg.VolumeRequest,
+ VolumeLimit: msg.VolumeLimit,
+ VolumePlanRequest: msg.VolumePlanRequest,
+ VolumePlanLimit: msg.VolumePlanLimit,
+ Hook: opts.Entrypoint.Hook,
+ Privileged: opts.Entrypoint.Privileged,
+ Engine: node.Engine,
+ SoftLimit: opts.SoftLimit,
+ Image: opts.Image,
+ Env: opts.Env,
+ User: opts.User,
+ ResourceSubdivisible: true,
}
return utils.Txn(
ctx,
@@ -317,11 +318,11 @@ func (c *Calcium) doDeployOneWorkload(
func (c *Calcium) doMakeContainerOptions(no int, msg *types.CreateContainerMessage, opts *types.DeployOptions, node *types.Node) *enginetypes.VirtualizationCreateOptions {
config := &enginetypes.VirtualizationCreateOptions{}
// general
- config.CPU = msg.CPU
- config.Quota = msg.Quota
- config.Memory = msg.Memory
- config.Storage = msg.Storage
- config.NUMANode = node.GetNUMANode(msg.CPU)
+ config.CPU = msg.CPULimit
+ config.Quota = msg.CPUQuotaLimit
+ config.Memory = msg.MemoryLimit
+ config.Storage = msg.StorageLimit
+ config.NUMANode = node.GetNUMANode(msg.CPULimit)
config.SoftLimit = opts.SoftLimit
config.RawArgs = opts.RawArgs
config.Lambda = opts.Lambda
@@ -330,8 +331,8 @@ func (c *Calcium) doMakeContainerOptions(no int, msg *types.CreateContainerMessa
config.Image = opts.Image
config.Stdin = opts.OpenStdin
config.Hosts = opts.ExtraHosts
- config.Volumes = msg.Volume.ApplyPlan(msg.VolumePlan).ToStringSlice(false, true)
- config.VolumePlan = msg.VolumePlan.ToLiteral()
+ config.Volumes = msg.VolumeLimit.ApplyPlan(msg.VolumePlanLimit).ToStringSlice(false, true)
+ config.VolumePlan = msg.VolumePlanLimit.ToLiteral()
config.Debug = opts.Debug
config.Network = opts.NetworkMode
config.Networks = opts.Networks
@@ -360,8 +361,8 @@ func (c *Calcium) doMakeContainerOptions(no int, msg *types.CreateContainerMessa
env = append(env, fmt.Sprintf("ERU_POD=%s", opts.Podname))
env = append(env, fmt.Sprintf("ERU_NODE_NAME=%s", node.Name))
env = append(env, fmt.Sprintf("ERU_CONTAINER_NO=%d", no))
env = append(env, fmt.Sprintf("ERU_MEMORY=%d", msg.Memory))
env = append(env, fmt.Sprintf("ERU_STORAGE=%d", msg.Storage))
env = append(env, fmt.Sprintf("ERU_MEMORY=%d", msg.MemoryLimit))
env = append(env, fmt.Sprintf("ERU_STORAGE=%d", msg.StorageLimit))
config.Env = env
// basic labels, bind to LabelMeta
config.Labels = map[string]string{
41 changes: 26 additions & 15 deletions cluster/calcium/create_test.go
@@ -10,7 +10,6 @@ import (
lockmocks "github.com/projecteru2/core/lock/mocks"
"github.com/projecteru2/core/scheduler"
schedulermocks "github.com/projecteru2/core/scheduler/mocks"
"github.com/projecteru2/core/scheduler/resources"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
@@ -40,27 +39,33 @@ func TestCreateContainer(t *testing.T) {
opts.Count = 1

// failed by memory check
- opts.ResourceRequests = append(opts.ResourceRequests, resources.CPUMemResourceRequest{Memory: -1})
- _, err = c.CreateContainer(ctx, opts)
- assert.Error(t, err)
- opts.ResourceRequests[0] = resources.CPUMemResourceRequest{Memory: 1}
+ opts.RawResourceOptions = types.RawResourceOptions{MemoryLimit: -1}
+ store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
+ ch, err := c.CreateContainer(ctx, opts)
+ assert.Nil(t, err)
+ for m := range ch {
+ 	assert.Error(t, m.Error)
+ }

// failed by CPUQuota
- opts.ResourceRequests[0] = resources.CPUMemResourceRequest{CPUQuota: -1}
- _, err = c.CreateContainer(ctx, opts)
- assert.Error(t, err)
+ opts.RawResourceOptions = types.RawResourceOptions{CPULimit: -1, MemoryLimit: 1}
+ ch, err = c.CreateContainer(ctx, opts)
+ assert.Nil(t, err)
+ for m := range ch {
+ 	assert.Error(t, m.Error)
+ }
}
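
The rewritten assertions encode a behavioral change worth calling out: with the DeployValidate loop gone from CreateContainer (see create.go above), invalid resource options no longer fail the call itself; each failure arrives as a message on the returned channel. A minimal consumer sketch under that assumption:

```go
package sketch

type CreateContainerMessage struct{ Error error }

// collectErrors drains a create channel the way the updated test
// does: the call itself succeeds, and per-container errors surface
// one message at a time.
func collectErrors(ch chan *CreateContainerMessage) []error {
	var errs []error
	for m := range ch {
		if m.Error != nil {
			errs = append(errs, m.Error)
		}
	}
	return errs
}
```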

func TestCreateContainerTxn(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
opts := &types.DeployOptions{
- Count: 2,
- DeployStrategy: strategy.Auto,
- Podname: "p1",
- ResourceRequests: []types.ResourceRequest{resources.CPUMemResourceRequest{CPUQuota: 1}},
- Image: "zc:test",
- Entrypoint: &types.Entrypoint{},
+ Count: 2,
+ DeployStrategy: strategy.Auto,
+ Podname: "p1",
+ RawResourceOptions: types.RawResourceOptions{CPULimit: 1},
+ Image: "zc:test",
+ Entrypoint: &types.Entrypoint{},
}
store := &storemocks.Store{}
sche := &schedulermocks.Scheduler{}
@@ -102,6 +107,12 @@ func TestCreateContainerTxn(t *testing.T) {
}
return
}, nil)
sche.On("SelectStorageNodes", mock.AnythingOfType("[]types.NodeInfo"), mock.AnythingOfType("int64")).Return(func(nodesInfo []types.NodeInfo, _ int64) []types.NodeInfo {
return nodesInfo
}, len(nodes), nil)
sche.On("SelectVolumeNodes", mock.AnythingOfType("[]types.NodeInfo"), mock.AnythingOfType("types.VolumeBindings")).Return(func(nodesInfo []types.NodeInfo, _ types.VolumeBindings) []types.NodeInfo {
return nodesInfo
}, nil, len(nodes), nil)
sche.On("SelectMemoryNodes", mock.AnythingOfType("[]types.NodeInfo"), mock.AnythingOfType("float64"), mock.AnythingOfType("int64")).Return(
func(nodesInfo []types.NodeInfo, _ float64, _ int64) []types.NodeInfo {
for i := range nodesInfo {
@@ -126,7 +137,7 @@ func TestCreateContainerTxn(t *testing.T) {
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil)
old := strategy.Plans[strategy.Auto]
- strategy.Plans[strategy.Auto] = func(sis []types.StrategyInfo, need, total, _ int, resourceType types.ResourceType) (map[string]*types.DeployInfo, error) {
+ strategy.Plans[strategy.Auto] = func(sis []strategy.Info, need, total, _ int, resourceType types.ResourceType) (map[string]*types.DeployInfo, error) {
deployInfos := make(map[string]*types.DeployInfo)
for _, si := range sis {
deployInfos[si.Nodename] = &types.DeployInfo{
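
The test above swaps the Auto deploy strategy for a deterministic stub and restores it afterwards, a handy pattern for exercising deploy paths without real bin-packing. A self-contained sketch of the same idea (names and signatures illustrative, not the repo's exact types):

```go
package sketch

// Plans maps a strategy name to its planning function, mirroring the
// pluggable strategy table the test monkey-patches.
var Plans = map[string]func(need int) map[string]int{}

// withStubbedPlan installs a deterministic plan, runs fn, and then
// restores the original entry so tests do not leak global state.
func withStubbedPlan(name string, stub func(need int) map[string]int, fn func()) {
	old := Plans[name]
	Plans[name] = stub
	defer func() { Plans[name] = old }()
	fn()
}
```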
2 changes: 1 addition & 1 deletion cluster/calcium/dissociate.go
@@ -26,7 +26,7 @@ func (c *Calcium) DissociateContainer(ctx context.Context, IDs []string) (chan *
// then
func(ctx context.Context) error {
log.Infof("[DissociateContainer] Container %s dissociated", container.ID)
- return c.store.UpdateNodeResource(ctx, node, container.CPU, container.Quota, container.Memory, container.Storage, container.VolumePlan.IntoVolumeMap(), store.ActionIncr)
+ return c.store.UpdateNodeResource(ctx, node, container.CPURequest, container.QuotaRequest, container.MemoryRequest, container.StorageRequest, container.VolumePlanRequest.IntoVolumeMap(), store.ActionIncr)
},
// rollback
nil,
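
The one-line change above completes a division of labor that runs through the whole commit: engine-facing configuration (doMakeContainerOptions in create.go) reads the *Limit fields, while node accounting — here, returning a dissociated container's resources to its node — reads the *Request fields. A toy illustration of why the request is the right half for bookkeeping (values and names made up):

```go
package main

import "fmt"

func main() {
	const nodeMemory = int64(4) << 30 // a 4 GiB node
	memRequest := int64(512) << 20    // reserved at schedule time
	memLimit := int64(1) << 30        // burst cap at runtime

	// Scheduling deducts the request from the node's capacity; the
	// limit only shapes the container's cgroup, never node math.
	free := nodeMemory - memRequest
	fmt.Printf("free after deploy: %d bytes (limit %d caps the cgroup)\n", free, memLimit)

	// Dissociating returns exactly what was reserved: the request.
	free += memRequest
	fmt.Printf("free after dissociate: %d bytes\n", free)
}
```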
14 changes: 8 additions & 6 deletions cluster/calcium/dissociate_test.go
@@ -23,12 +23,14 @@ func TestDissociateContainer(t *testing.T) {
lock.On("Unlock", mock.Anything).Return(nil)

c1 := &types.Container{
- ID: "c1",
- Podname: "p1",
- Memory: 5 * int64(units.MiB),
- Quota: 0.9,
- CPU: types.CPUMap{"2": 90},
- Nodename: "node1",
+ ID: "c1",
+ Podname: "p1",
+ MemoryLimit: 5 * int64(units.MiB),
+ MemoryRequest: 5 * int64(units.MiB),
+ QuotaLimit: 0.9,
+ QuotaRequest: 0.9,
+ CPURequest: types.CPUMap{"2": 90},
+ Nodename: "node1",
}

node1 := &types.Node{
(53 more changed files not shown)