Commit ce50438: refactor stage 2

CMGS committed Jul 14, 2022
1 parent 4fe8207 commit ce50438
Showing 71 changed files with 2,159 additions and 2,205 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -48,7 +48,7 @@ cloc:

unit-test:
go vet `go list ./... | grep -v '/vendor/' | grep -v '/tools'` && \
go test -race -timeout 240s -count=1 -vet=off -cover ./utils/... \
go test -race -timeout 600s -count=1 -vet=off -cover ./utils/... \
./types/... \
./store/etcdv3/. \
./store/etcdv3/embedded/. \
@@ -67,7 +67,7 @@ unit-test:
./wal/kv/. \
./store/redis/... \
./lock/redis/... && \
go test -timeout 240s -count=1 -cover ./cluster/calcium/...
go test -timeout 600s -count=1 -cover ./cluster/calcium/...

lint:
golangci-lint run
2 changes: 1 addition & 1 deletion client/clientpool.go
@@ -106,12 +106,12 @@ func checkAlive(ctx context.Context, rpc *clientWithStatus, timeout time.Duratio

func (c *Pool) updateClientsStatus(ctx context.Context, timeout time.Duration) {
wg := &sync.WaitGroup{}
defer wg.Wait()
for _, rpc := range c.rpcClients {
wg.Add(1)
go func(r *clientWithStatus) {
defer wg.Done()
r.alive = checkAlive(ctx, r, timeout)
}(rpc)
}
wg.Wait()
}
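Side note on the WaitGroup change above: moving wg.Wait() into a defer joins the goroutines on every return path, so the trailing call at the end of the function is no longer needed. A minimal standalone sketch of the pattern follows; the probe type and names are illustrative, not from this repository. The same shape is applied later in this commit to ControlWorkload and Copy.

```go
package main

import (
	"fmt"
	"sync"
)

// probe is a stand-in for clientWithStatus; it exists only for this sketch.
type probe struct {
	name  string
	alive bool
}

func updateAll(probes []*probe) {
	wg := &sync.WaitGroup{}
	// The deferred Wait runs on every return path, which is why the
	// explicit wg.Wait() after the loop can be dropped.
	defer wg.Wait()
	for _, p := range probes {
		wg.Add(1)
		go func(p *probe) {
			defer wg.Done()
			p.alive = true // placeholder for a real liveness check
		}(p)
	}
}

func main() {
	ps := []*probe{{name: "a"}, {name: "b"}}
	updateAll(ps)
	fmt.Println(ps[0].alive, ps[1].alive) // true true
}
```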
23 changes: 14 additions & 9 deletions cluster/calcium/build.go
@@ -61,16 +61,12 @@ func (c *Calcium) selectBuildNode(ctx context.Context) (*types.Node, error) {
return nil, errors.WithStack(types.ErrNoBuildPod)
}

// get node by scheduler
ch, err := c.ListPodNodes(ctx, &types.ListNodesOptions{Podname: c.config.Docker.BuildPod})
// get nodes
nodes, err := c.store.GetNodesByPod(ctx, c.config.Docker.BuildPod, nil, false)
if err != nil {
return nil, err
}

nodes := []*types.Node{}
for n := range ch {
nodes = append(nodes, n)
}
if len(nodes) == 0 {
return nil, errors.WithStack(types.ErrInsufficientNodes)
}
@@ -79,14 +75,14 @@ func (c *Calcium) selectBuildNode(ctx context.Context) (*types.Node, error) {
}

func (c *Calcium) getMostIdleNode(ctx context.Context, nodes []*types.Node) (*types.Node, error) {
nodeNames := []string{}
nodenames := []string{}
nodeMap := map[string]*types.Node{}
for _, node := range nodes {
nodeNames = append(nodeNames, node.Name)
nodenames = append(nodenames, node.Name)
nodeMap[node.Name] = node
}

mostIdleNode, err := c.rmgr.GetMostIdleNode(ctx, nodeNames)
mostIdleNode, err := c.rmgr.GetMostIdleNode(ctx, nodenames)
if err != nil {
return nil, err
}
@@ -194,6 +190,15 @@ func (c *Calcium) pushImageAndClean(ctx context.Context, resp io.ReadCloser, nod

}

func (c *Calcium) getWorkloadNode(ctx context.Context, id string) (*types.Node, error) {
w, err := c.store.GetWorkload(ctx, id)
if err != nil {
return nil, err
}
node, err := c.store.GetNode(ctx, w.Nodename)
return node, err
}

func withImageBuiltChannel(f func(chan *types.BuildImageMessage)) chan *types.BuildImageMessage {
ch := make(chan *types.BuildImageMessage)
utils.SentryGo(func() {
2 changes: 1 addition & 1 deletion cluster/calcium/calcium.go
@@ -106,7 +106,7 @@ func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, erro
return nil, logger.ErrWithTracing(nil, errors.WithStack(err)) //nolint
}

go cal.InitMetrics()
go cal.InitMetrics(ctx)

return cal, logger.ErrWithTracing(nil, errors.WithStack(err)) //nolint
}
2 changes: 1 addition & 1 deletion cluster/calcium/calcium_test.go
@@ -41,7 +41,7 @@ func NewTestCluster() *Calcium {
ServiceDiscoveryPushInterval: 15 * time.Second,
},
WALFile: filepath.Join(walDir, "core.wal.log"),
MaxConcurrency: 10,
MaxConcurrency: 100000,
HAKeepaliveInterval: 16 * time.Second,
}
c.store = &storemocks.Store{}
37 changes: 18 additions & 19 deletions cluster/calcium/capacity.go
@@ -8,48 +8,47 @@ import (
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
"github.com/sanity-io/litter"
"golang.org/x/exp/maps"

"github.com/pkg/errors"
)

// CalculateCapacity calculates capacity
func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptions) (*types.CapacityMessage, error) {
logger := log.WithField("Calcium", "CalculateCapacity").WithField("opts", opts)
logger.Infof(ctx, "[CalculateCapacity] Calculate capacity with options:\n%s", litter.Options{Compact: true}.Sdump(opts))
var err error
log.Infof(ctx, "[CalculateCapacity] Calculate capacity with options:\n%s", litter.Options{Compact: true}.Sdump(opts))
msg := &types.CapacityMessage{
Total: 0,
NodeCapacities: map[string]int{},
}

return msg, c.withNodesPodLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) error {
nodes := []string{}
for node := range nodeMap {
nodes = append(nodes, node)
}
nodenames := maps.Keys(nodeMap)

if opts.DeployStrategy != strategy.Dummy {
if msg.NodeCapacities, err = c.doGetDeployMap(ctx, nodes, opts); err != nil {
if msg.NodeCapacities, err = c.doGetDeployStrategy(ctx, nodenames, opts); err != nil {
logger.Errorf(ctx, "[Calcium.CalculateCapacity] doGetDeployMap failed: %+v", err)
return err
}

for _, capacity := range msg.NodeCapacities {
msg.Total += capacity
}
} else {
var infos map[string]*resources.NodeCapacityInfo
infos, msg.Total, err = c.rmgr.GetNodesDeployCapacity(ctx, nodes, opts.ResourceOpts)
if err != nil {
logger.Errorf(ctx, "[Calcium.CalculateCapacity] failed to get nodes capacity: %+v", err)
return err
}
if msg.Total <= 0 {
return errors.Wrap(types.ErrInsufficientRes, "no node meets all the resource requirements at the same time")
}
for node, info := range infos {
msg.NodeCapacities[node] = info.Capacity
}
return nil
}

var infos map[string]*resources.NodeCapacityInfo
infos, msg.Total, err = c.rmgr.GetNodesDeployCapacity(ctx, nodenames, opts.ResourceOpts)
if err != nil {
logger.Errorf(ctx, "[Calcium.CalculateCapacity] failed to get nodes capacity: %+v", err)
return err
}
if msg.Total <= 0 {
return errors.Wrap(types.ErrInsufficientRes, "no node meets all the resource requirements at the same time")
}
for node, info := range infos {
msg.NodeCapacities[node] = info.Capacity
}
return nil
})
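The golang.org/x/exp/maps import added above replaces the manual key-collection loop: maps.Keys copies a map's keys into a new slice, in unspecified order. A small self-contained sketch, with map contents made up for illustration:

```go
package main

import (
	"fmt"
	"sort"

	"golang.org/x/exp/maps"
)

func main() {
	// Illustrative stand-in for the nodeMap built inside withNodesPodLocked.
	nodeMap := map[string]int{"n1": 1, "n2": 2, "n3": 3}

	// maps.Keys collects the keys into a new slice; ordering is unspecified,
	// just like ranging over the map directly.
	nodenames := maps.Keys(nodeMap)
	sort.Strings(nodenames)
	fmt.Println(nodenames) // [n1 n2 n3]
}
```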
75 changes: 47 additions & 28 deletions cluster/calcium/capacity_test.go
@@ -2,7 +2,6 @@ package calcium

import (
"context"
"errors"
"testing"

enginemocks "github.com/projecteru2/core/engine/mocks"
@@ -21,66 +20,86 @@ func TestCalculateCapacity(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
store := c.store.(*storemocks.Store)
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
engine := &enginemocks.API{}

// pod1 := &types.Pod{Name: "p1"}
lock := &lockmocks.DistributedLock{}
lock.On("Lock", mock.Anything).Return(ctx, nil)
lock.On("Unlock", mock.Anything).Return(nil)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)

engine := &enginemocks.API{}
name := "n1"
node1 := &types.Node{
NodeMeta: types.NodeMeta{
Name: "n1",
Name: name,
},
Engine: engine,
}
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil)
lock := &lockmocks.DistributedLock{}
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
lock.On("Unlock", mock.Anything).Return(nil)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
// failed by call plugin

opts := &types.DeployOptions{
Entrypoint: &types.Entrypoint{
Name: "entry",
},
ResourceOpts: types.WorkloadResourceOpts{},
DeployStrategy: strategy.Auto,
NodeFilter: types.NodeFilter{
Includes: []string{"n1"},
Includes: []string{name},
},
Count: 3,
}
rmgr.On("GetNodesDeployCapacity", mock.Anything, mock.Anything, mock.Anything).Return(nil, 0, errors.New("not implemented")).Times(3)

// failed by call plugin
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodesDeployCapacity", mock.Anything, mock.Anything, mock.Anything).Return(nil, 0, types.ErrNoETCD).Once()
_, err := c.CalculateCapacity(ctx, opts)
assert.Error(t, err)

// failed by get deploy status
nrim := map[string]*resources.NodeCapacityInfo{
name: {
NodeName: name,
Capacity: 10,
Usage: 0.5,
Rate: 0.5,
Weight: 100,
},
}
rmgr.On("GetNodesDeployCapacity", mock.Anything, mock.Anything, mock.Anything).Return(
nrim, 100, nil).Times(3)
store.On("GetDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
_, err = c.CalculateCapacity(ctx, opts)
assert.Error(t, err)
store.On("GetDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(map[string]int{"n1": 0}, nil)

// failed by get deploy plan
opts.DeployStrategy = "FAKE"
// failed by get deploy strategy
store.On("GetDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(map[string]int{name: 0}, nil)
opts.Count = -1
_, err = c.CalculateCapacity(ctx, opts)
assert.Error(t, err)

// success
opts.Count = 1
_, err = c.CalculateCapacity(ctx, opts)
assert.NoError(t, err)

// strategy: dummy
opts.DeployStrategy = strategy.Dummy

// failed by GetNodesDeployCapacity
rmgr.On("GetNodesDeployCapacity", mock.Anything, mock.Anything, mock.Anything).Return(nil, 0, types.ErrNoETCD).Once()
_, err = c.CalculateCapacity(ctx, opts)
assert.Error(t, err)

// failed by total <= 0
rmgr.On("GetNodesDeployCapacity", mock.Anything, mock.Anything, mock.Anything).Return(nil, -1, nil).Once()
_, err = c.CalculateCapacity(ctx, opts)
assert.Error(t, err)

// success
rmgr.On("GetNodesDeployCapacity", mock.Anything, mock.Anything, mock.Anything).Return(
map[string]*resources.NodeCapacityInfo{
"n1": {
NodeName: "n1",
Capacity: 10,
Usage: 0.5,
Rate: 0.5,
Weight: 100,
},
},
10, nil,
)
nrim, 10, nil)
msg, err := c.CalculateCapacity(ctx, opts)
assert.NoError(t, err)
assert.Equal(t, msg.NodeCapacities["n1"], 10)
assert.Equal(t, msg.NodeCapacities[name], 10)
assert.Equal(t, msg.Total, 10)

rmgr.AssertExpectations(t)
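A note on the stacked testify expectations in the test above: expectations registered on the same method are consumed in registration order, so a .Once() or .Times(n) return models a bounded number of failures before a later expectation takes over. A self-contained sketch with an invented fakeStore type (not the project's store mock):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/stretchr/testify/mock"
)

// fakeStore is a hand-rolled testify mock used only for this sketch.
type fakeStore struct{ mock.Mock }

func (s *fakeStore) GetDeployStatus(name string) (int, error) {
	args := s.Called(name)
	return args.Int(0), args.Error(1)
}

func main() {
	s := &fakeStore{}
	// The first call fails once; later calls fall through to the next
	// matching expectation once the .Once() count is used up.
	s.On("GetDeployStatus", mock.Anything).Return(0, errors.New("boom")).Once()
	s.On("GetDeployStatus", mock.Anything).Return(3, nil)

	_, err := s.GetDeployStatus("app")
	fmt.Println(err != nil) // true

	n, err := s.GetDeployStatus("app")
	fmt.Println(n, err) // 3 <nil>
}
```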
2 changes: 1 addition & 1 deletion cluster/calcium/control.go
@@ -22,6 +22,7 @@ func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, f
defer close(ch)
wg := &sync.WaitGroup{}
wg.Add(len(ids))
defer wg.Wait()
for _, id := range ids {
id := id
_ = c.pool.Invoke(func() {
@@ -59,7 +60,6 @@ func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, f
}
})
}
wg.Wait()
})

return ch, nil
6 changes: 3 additions & 3 deletions cluster/calcium/control_test.go
@@ -21,7 +21,7 @@ func TestControlStart(t *testing.T) {
ctx := context.Background()
store := c.store.(*storemocks.Store)
lock := &lockmocks.DistributedLock{}
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
lock.On("Lock", mock.Anything).Return(ctx, nil)
lock.On("Unlock", mock.Anything).Return(nil)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
// failed by GetWorkloads
@@ -102,7 +102,7 @@ func TestControlStop(t *testing.T) {
ctx := context.Background()
store := c.store.(*storemocks.Store)
lock := &lockmocks.DistributedLock{}
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
lock.On("Lock", mock.Anything).Return(ctx, nil)
lock.On("Unlock", mock.Anything).Return(nil)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
workload := &types.Workload{
@@ -146,7 +146,7 @@ func TestControlRestart(t *testing.T) {
ctx := context.Background()
store := c.store.(*storemocks.Store)
lock := &lockmocks.DistributedLock{}
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
lock.On("Lock", mock.Anything).Return(ctx, nil)
lock.On("Unlock", mock.Anything).Return(nil)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
engine := &enginemocks.API{}
5 changes: 2 additions & 3 deletions cluster/calcium/copy.go
@@ -21,12 +21,12 @@ func (c *Calcium) Copy(ctx context.Context, opts *types.CopyOptions) (chan *type
defer close(ch)

wg := sync.WaitGroup{}
wg.Add(len(opts.Targets))
defer wg.Wait()
log.Infof(ctx, "[Copy] Copy %d workloads files", len(opts.Targets))

// workload one by one
for id, paths := range opts.Targets {
wg.Add(1)

utils.SentryGo(func(id string, paths []string) func() {
return func() {
defer wg.Done()
@@ -61,7 +61,6 @@ func (c *Calcium) Copy(ctx context.Context, opts *types.CopyOptions) (chan *type
}
}(id, paths))
}
wg.Wait()
})
return ch, nil
}
5 changes: 3 additions & 2 deletions cluster/calcium/copy_test.go
@@ -17,6 +17,7 @@ func TestCopy(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()

// failed by target
_, err := c.Copy(ctx, &types.CopyOptions{
Targets: map[string][]string{},
})
@@ -32,7 +33,7 @@
}
store := c.store.(*storemocks.Store)
lock := &lockmocks.DistributedLock{}
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
lock.On("Lock", mock.Anything).Return(ctx, nil)
lock.On("Unlock", mock.Anything).Return(nil)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
// failed by GetWorkload
Expand All @@ -47,7 +48,7 @@ func TestCopy(t *testing.T) {
workload.Engine = engine
store.On("GetWorkload", mock.Anything, mock.Anything).Return(workload, nil)
// failed by VirtualizationCopyFrom
engine.On("VirtualizationCopyFrom", mock.Anything, mock.Anything, mock.Anything).Return(nil, 0, 0, int64(0), types.ErrNilEngine).Twice()
engine.On("VirtualizationCopyFrom", mock.Anything, mock.Anything, mock.Anything).Return(nil, 0, 0, int64(0), types.ErrNoETCD).Twice()
ch, err = c.Copy(ctx, opts)
assert.NoError(t, err)
for r := range ch {