Skip to content

Commit

Permalink
Merge branch 'master' of github.com:projecteru2/core into zc/conn
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 committed Feb 19, 2021
2 parents da2f392 + 86cafe8 commit b2f10d3
Show file tree
Hide file tree
Showing 21 changed files with 1,078 additions and 635 deletions.
9 changes: 8 additions & 1 deletion cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Calcium struct {
scheduler scheduler.Scheduler
source source.Source
watcher discovery.Service
wal *WAL
}

// New returns a new cluster config
Expand Down Expand Up @@ -52,11 +53,17 @@ func New(config types.Config, embeddedStorage bool) (*Calcium, error) {
default:
log.Warn("[Calcium] SCM not set, build API disabled")
}
if err != nil {
log.Errorf("[Calcium] SCAM failed: %v", err)
return nil, err
}

// set watcher
watcher := helium.New(config.GRPCConfig, store)

return &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}, err
cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
cal.wal, err = newCalciumWAL(cal)
return cal, err
}

// Finalizer use for defer
Expand Down
45 changes: 39 additions & 6 deletions cluster/calcium/calcium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package calcium
import (
"context"
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -32,6 +34,11 @@ func (d *dummyLock) Unlock(ctx context.Context) error {
}

func NewTestCluster() *Calcium {
walDir, err := ioutil.TempDir(os.TempDir(), "core.wal.*")
if err != nil {
panic(err)
}

c := &Calcium{}
c.config = types.Config{
GlobalTimeout: 30 * time.Second,
Expand All @@ -43,34 +50,60 @@ func NewTestCluster() *Calcium {
ShareBase: 100,
},
MaxConcurrency: 10,
WALFile: filepath.Join(walDir, "core.wal.log"),
}
c.store = &storemocks.Store{}
c.scheduler = &schedulermocks.Scheduler{}
c.source = &sourcemocks.Source{}

wal, err := newCalciumWAL(c)
if err != nil {
panic(err)
}
c.wal = wal

return c
}

func TestNewCluster(t *testing.T) {
_, err := New(types.Config{}, false)
config := types.Config{WALFile: "/tmp/a"}
_, err := New(config, false)
assert.Error(t, err)
c, err := New(types.Config{}, true)

c, err := New(config, true)
assert.NoError(t, err)

c.Finalizer()
privFile, err := ioutil.TempFile("", "priv")
assert.NoError(t, err)
_, err = privFile.WriteString("privkey")
assert.NoError(t, err)
defer privFile.Close()

config.Git = types.GitConfig{PrivateKey: privFile.Name()}

var wg sync.WaitGroup
wg.Add(1)
go func() {
c, err := New(types.Config{Git: types.GitConfig{SCMType: "gitlab", PrivateKey: privFile.Name()}}, true)
assert.NoError(t, err)
defer wg.Done()
config.Git.SCMType = "gitlab"
config.WALFile = "/tmp/b"
c, err := New(config, true)
assert.NoError(t, err, err)
c.Finalizer()
}()

wg.Add(1)
go func() {
c, err := New(types.Config{Git: types.GitConfig{SCMType: "github", PrivateKey: privFile.Name()}}, true)
assert.NoError(t, err)
defer wg.Done()
config.WALFile = "/tmp/c"
config.Git.SCMType = "github"
c, err := New(config, true)
assert.NoError(t, err, err)
c.Finalizer()
}()

wg.Wait()
}

func TestFinalizer(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,9 @@ func (c *Calcium) doMakeWorkloadOptions(no int, msg *types.CreateWorkloadMessage
entry := opts.Entrypoint
config.WorkingDir = entry.Dir
config.Privileged = entry.Privileged
config.RestartPolicy = entry.RestartPolicy
config.Sysctl = entry.Sysctls
config.Publish = entry.Publish
config.Restart = entry.Restart
if entry.Log != nil {
config.LogType = entry.Log.Type
config.LogConfig = entry.Log.Config
Expand Down
75 changes: 72 additions & 3 deletions cluster/calcium/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestCreateWorkload(t *testing.T) {
}

func TestCreateWorkloadTxn(t *testing.T) {
c := NewTestCluster()
c, nodes := newCreateWorkloadCluster(t)
ctx := context.Background()
opts := &types.DeployOptions{
Name: "zc:name",
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
},
Engine: engine,
}
nodes := []*types.Node{node1, node2}
nodes = []*types.Node{node1, node2}

store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
Expand Down Expand Up @@ -155,6 +155,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(
errors.Wrap(context.DeadlineExceeded, "MakeDeployStatus"),
).Once()

ch, err := c.CreateWorkload(ctx, opts)
assert.Nil(t, err)
cnt := 0
Expand All @@ -166,7 +167,6 @@ func TestCreateWorkloadTxn(t *testing.T) {
assert.EqualValues(t, 1, cnt)

// commit resource changes fails: UpdateNodes
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil)
old := strategy.Plans[strategy.Auto]
strategy.Plans[strategy.Auto] = func(sis []strategy.Info, need, total, _ int) (map[string]int, error) {
Expand All @@ -189,6 +189,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
assert.Error(t, m.Error, "UpdateNodes1")
}
assert.EqualValues(t, 1, cnt)
node1, node2 = nodes[0], nodes[1]
assert.EqualValues(t, 1, node1.CPUUsed)
assert.EqualValues(t, 1, node2.CPUUsed)
node1.CPUUsed = 0
Expand All @@ -208,6 +209,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
}
return
}, nil)
engine = node1.Engine.(*enginemocks.API)
engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImageLocalDigest")).Twice()
engine.On("ImagePull", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImagePull")).Twice()
store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
Expand Down Expand Up @@ -287,3 +289,70 @@ func TestCreateWorkloadTxn(t *testing.T) {
store.AssertExpectations(t)
engine.AssertExpectations(t)
}

func newCreateWorkloadCluster(t *testing.T) (*Calcium, []*types.Node) {
c := NewTestCluster()
c.store = &storemocks.Store{}
c.scheduler = &schedulermocks.Scheduler{}
scheduler.InitSchedulerV1(c.scheduler)

engine := &enginemocks.API{}
pod1 := &types.Pod{Name: "p1"}
node1 := &types.Node{
NodeMeta: types.NodeMeta{
Name: "n1",
},
Engine: engine,
}
node2 := &types.Node{
NodeMeta: types.NodeMeta{
Name: "n2",
},
Engine: engine,
}
nodes := []*types.Node{node1, node2}

store := c.store.(*storemocks.Store)
store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)

// doAllocResource fails: MakeDeployStatus
lock := &lockmocks.DistributedLock{}
lock.On("Lock", mock.Anything).Return(context.Background(), nil)
lock.On("Unlock", mock.Anything).Return(nil)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil)
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
store.On("GetNode",
mock.AnythingOfType("*context.emptyCtx"),
mock.AnythingOfType("string"),
).Return(
func(_ context.Context, name string) (node *types.Node) {
node = node1
if name == "n2" {
node = node2
}
return
}, nil)

sche := c.scheduler.(*schedulermocks.Scheduler)
sche.On("SelectStorageNodes", mock.AnythingOfType("[]resourcetypes.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo {
return scheduleInfos
}, len(nodes), nil)
sche.On("SelectStorageNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo {
return scheduleInfos
}, len(nodes), nil)
sche.On("SelectVolumeNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("types.VolumeBindings")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ types.VolumeBindings) []resourcetypes.ScheduleInfo {
return scheduleInfos
}, nil, len(nodes), nil)
sche.On("SelectMemoryNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("float64"), mock.AnythingOfType("int64")).Return(
func(scheduleInfos []resourcetypes.ScheduleInfo, _ float64, _ int64) []resourcetypes.ScheduleInfo {
for i := range scheduleInfos {
scheduleInfos[i].Capacity = 1
}
return scheduleInfos
}, len(nodes), nil)

return c, nodes
}
32 changes: 31 additions & 1 deletion cluster/calcium/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,20 @@ import (
"strconv"
"sync"

"github.com/google/uuid"

enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
"github.com/projecteru2/core/wal"
)

const exitDataPrefix = "[exitcode] "
const (
exitDataPrefix = "[exitcode] "
labelLambdaID = "LambdaID"
)

// RunAndWait implement lambda
func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachWorkloadMessage, error) {
Expand All @@ -28,6 +34,10 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
return nil, types.ErrRunAndWaitCountOneWithStdin
}

commit, err := c.walCreateLambda(ctx, opts)
if err != nil {
return nil, err
}
createChan, err := c.CreateWorkload(ctx, opts)
if err != nil {
log.Errorf("[RunAndWait] Create workload error %s", err)
Expand Down Expand Up @@ -113,8 +123,28 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
go func() {
defer close(runMsgCh)
wg.Wait()
if err := commit(context.Background()); err != nil {
log.Errorf("[RunAndWait] Commit WAL %s failed", eventCreateLambda)
}
log.Info("[RunAndWait] Finish run and wait for workloads")
}()

return runMsgCh, nil
}

func (c *Calcium) walCreateLambda(ctx context.Context, opts *types.DeployOptions) (wal.Commit, error) {
uid, err := uuid.NewRandom()
if err != nil {
return nil, err
}

lambdaID := uid.String()

if opts.Labels != nil {
opts.Labels[labelLambdaID] = lambdaID
} else {
opts.Labels = map[string]string{labelLambdaID: lambdaID}
}

return c.wal.logCreateLambda(ctx, opts)
}
56 changes: 56 additions & 0 deletions cluster/calcium/lambda_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package calcium

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/wal"
walmocks "github.com/projecteru2/core/wal/mocks"
)

func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
c, _ := newCreateWorkloadCluster(t)
c.wal = &WAL{WAL: &walmocks.WAL{}}

mwal := c.wal.WAL.(*walmocks.WAL)
defer mwal.AssertExpectations(t)
var walCommitted bool
commit := wal.Commit(func(context.Context) error {
walCommitted = true
return nil
})
mwal.On("Log", mock.Anything, eventCreateLambda, mock.Anything).Return(commit, nil).Once()

opts := &types.DeployOptions{
Name: "zc:name",
Count: 2,
DeployStrategy: strategy.Auto,
Podname: "p1",
ResourceOpts: types.ResourceOptions{CPUQuotaLimit: 1},
Image: "zc:test",
Entrypoint: &types.Entrypoint{
Name: "good-entrypoint",
},
}

mstore := c.store.(*storemocks.Store)
mstore.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("err")).Once()

ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
require.NoError(t, err)
require.NotNil(t, ch)
require.False(t, walCommitted)
require.Nil(t, <-ch) // recv nil due to the ch will be closed.

lambdaID, exists := opts.Labels[labelLambdaID]
require.True(t, exists)
require.True(t, len(lambdaID) > 1)
require.True(t, walCommitted)
}
Loading

0 comments on commit b2f10d3

Please sign in to comment.