feat: WAL for creating lambda
anrs committed Feb 18, 2021
1 parent c6bd2aa commit 7dfa673
Showing 13 changed files with 498 additions and 104 deletions.
9 changes: 8 additions & 1 deletion cluster/calcium/calcium.go
@@ -24,6 +24,7 @@ type Calcium struct {
scheduler scheduler.Scheduler
source source.Source
watcher discovery.Service
wal *WAL
}

// New returns a new cluster config
@@ -52,11 +53,17 @@ func New(config types.Config, embeddedStorage bool) (*Calcium, error) {
default:
log.Warn("[Calcium] SCM not set, build API disabled")
}
if err != nil {
log.Errorf("[Calcium] SCAM failed: %v", err)
return nil, err
}

// set watcher
watcher := helium.New(config.GRPCConfig, store)

return &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}, err
cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
cal.wal, err = newCalciumWAL(cal)
return cal, err
}

// Finalizer use for defer
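The WAL wrapper wired in above (the WAL type, newCalciumWAL, eventCreateLambda and logCreateLambda) lives in files of this commit that are not rendered in this excerpt. Purely as a hypothetical sketch of the log-then-commit shape being plugged into New(): an event is appended before the risky action starts, and the returned commit function records completion once the action succeeds, so begin records without a matching completion identify work to clean up after a crash. All names below (simpleWAL, openWAL, logEvent) are illustrative stand-ins, not the project's real API.

// Illustrative only: a toy append-only WAL showing the log-then-commit shape
// that newCalciumWAL / logCreateLambda appear to follow. Not the real API.
package walsketch

import (
	"encoding/json"
	"fmt"
	"os"
	"sync"
)

// Commit marks a previously logged event as finished, mirroring wal.Commit.
type Commit func() error

type record struct {
	ID    int             `json:"id"`
	Event string          `json:"event"`
	Done  bool            `json:"done"`
	Data  json.RawMessage `json:"data,omitempty"`
}

type simpleWAL struct {
	mu   sync.Mutex
	file *os.File
	next int
}

func openWAL(path string) (*simpleWAL, error) {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
	if err != nil {
		return nil, err
	}
	return &simpleWAL{file: f}, nil
}

// logEvent appends a "begin" record and returns a Commit that appends the
// matching "done" record; on recovery, begins with no done are orphans.
func (w *simpleWAL) logEvent(event string, data interface{}) (Commit, error) {
	raw, err := json.Marshal(data)
	if err != nil {
		return nil, err
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	w.next++
	id := w.next
	if err := w.append(record{ID: id, Event: event, Data: raw}); err != nil {
		return nil, err
	}
	return func() error {
		w.mu.Lock()
		defer w.mu.Unlock()
		return w.append(record{ID: id, Event: event, Done: true})
	}, nil
}

func (w *simpleWAL) append(r record) error {
	b, err := json.Marshal(r)
	if err != nil {
		return err
	}
	_, err = fmt.Fprintln(w.file, string(b))
	return err
}

A caller logs before acting and commits after the action finishes, which is the same shape RunAndWait follows in the lambda.go diff further down.
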
45 changes: 39 additions & 6 deletions cluster/calcium/calcium_test.go
@@ -3,6 +3,8 @@ package calcium
import (
"context"
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"
"time"
@@ -32,6 +34,11 @@ func (d *dummyLock) Unlock(ctx context.Context) error {
}

func NewTestCluster() *Calcium {
walDir, err := ioutil.TempDir(os.TempDir(), "core.wal.*")
if err != nil {
panic(err)
}

c := &Calcium{}
c.config = types.Config{
GlobalTimeout: 30 * time.Second,
@@ -42,34 +49,60 @@ func NewTestCluster() *Calcium {
MaxShare: -1,
ShareBase: 100,
},
WALFile: filepath.Join(walDir, "core.wal.log"),
}
c.store = &storemocks.Store{}
c.scheduler = &schedulermocks.Scheduler{}
c.source = &sourcemocks.Source{}

wal, err := newCalciumWAL(c)
if err != nil {
panic(err)
}
c.wal = wal

return c
}

func TestNewCluster(t *testing.T) {
_, err := New(types.Config{}, false)
config := types.Config{WALFile: "/tmp/a"}
_, err := New(config, false)
assert.Error(t, err)
c, err := New(types.Config{}, true)

c, err := New(config, true)
assert.NoError(t, err)

c.Finalizer()
privFile, err := ioutil.TempFile("", "priv")
assert.NoError(t, err)
_, err = privFile.WriteString("privkey")
assert.NoError(t, err)
defer privFile.Close()

config.Git = types.GitConfig{PrivateKey: privFile.Name()}

var wg sync.WaitGroup
wg.Add(1)
go func() {
c, err := New(types.Config{Git: types.GitConfig{SCMType: "gitlab", PrivateKey: privFile.Name()}}, true)
assert.NoError(t, err)
defer wg.Done()
config.Git.SCMType = "gitlab"
config.WALFile = "/tmp/b"
c, err := New(config, true)
assert.NoError(t, err, err)
c.Finalizer()
}()

wg.Add(1)
go func() {
c, err := New(types.Config{Git: types.GitConfig{SCMType: "github", PrivateKey: privFile.Name()}}, true)
assert.NoError(t, err)
defer wg.Done()
config.WALFile = "/tmp/c"
config.Git.SCMType = "github"
c, err := New(config, true)
assert.NoError(t, err, err)
c.Finalizer()
}()

wg.Wait()
}

func TestFinalizer(t *testing.T) {
133 changes: 72 additions & 61 deletions cluster/calcium/create_test.go
@@ -81,7 +81,7 @@ func TestCreateWorkload(t *testing.T) {
}

func TestCreateWorkloadTxn(t *testing.T) {
c := NewTestCluster()
c, nodes := newCreateWorkloadCluster(t)
ctx := context.Background()
opts := &types.DeployOptions{
Name: "zc:name",
@@ -94,69 +94,12 @@ func TestCreateWorkloadTxn(t *testing.T) {
Name: "good-entrypoint",
},
}
store := &storemocks.Store{}
sche := &schedulermocks.Scheduler{}
scheduler.InitSchedulerV1(sche)
c.store = store
c.scheduler = sche
engine := &enginemocks.API{}

pod1 := &types.Pod{Name: "p1"}
node1 := &types.Node{
NodeMeta: types.NodeMeta{
Name: "n1",
},
Engine: engine,
}
node2 := &types.Node{
NodeMeta: types.NodeMeta{
Name: "n2",
},
Engine: engine,
}
nodes := []*types.Node{node1, node2}

store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)

// doAllocResource fails: MakeDeployStatus
lock := &lockmocks.DistributedLock{}
lock.On("Lock", mock.Anything).Return(context.Background(), nil)
lock.On("Unlock", mock.Anything).Return(nil)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil)
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
store.On("GetNode",
mock.AnythingOfType("*context.emptyCtx"),
mock.AnythingOfType("string"),
).Return(
func(_ context.Context, name string) (node *types.Node) {
node = node1
if name == "n2" {
node = node2
}
return
}, nil)
sche.On("SelectStorageNodes", mock.AnythingOfType("[]resourcetypes.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo {
return scheduleInfos
}, len(nodes), nil)
sche.On("SelectStorageNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo {
return scheduleInfos
}, len(nodes), nil)
sche.On("SelectVolumeNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("types.VolumeBindings")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ types.VolumeBindings) []resourcetypes.ScheduleInfo {
return scheduleInfos
}, nil, len(nodes), nil)
sche.On("SelectMemoryNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("float64"), mock.AnythingOfType("int64")).Return(
func(scheduleInfos []resourcetypes.ScheduleInfo, _ float64, _ int64) []resourcetypes.ScheduleInfo {
for i := range scheduleInfos {
scheduleInfos[i].Capacity = 1
}
return scheduleInfos
}, len(nodes), nil)
store := c.store.(*storemocks.Store)
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(
errors.Wrap(context.DeadlineExceeded, "MakeDeployStatus"),
).Once()

ch, err := c.CreateWorkload(ctx, opts)
assert.Nil(t, err)
cnt := 0
@@ -168,7 +111,6 @@ func TestCreateWorkloadTxn(t *testing.T) {
assert.EqualValues(t, 1, cnt)

// commit resource changes fails: UpdateNodes
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil)
old := strategy.Plans[strategy.Auto]
strategy.Plans[strategy.Auto] = func(sis []strategy.Info, need, total, _ int) (map[string]int, error) {
@@ -191,6 +133,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
assert.Error(t, m.Error, "UpdateNodes1")
}
assert.EqualValues(t, 1, cnt)
node1, node2 := nodes[0], nodes[1]
assert.EqualValues(t, 1, node1.CPUUsed)
assert.EqualValues(t, 1, node2.CPUUsed)
node1.CPUUsed = 0
@@ -221,6 +164,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
}
return
}, nil)
engine := node1.Engine.(*enginemocks.API)
engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImageLocalDigest")).Twice()
engine.On("ImagePull", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImagePull")).Twice()
store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
@@ -299,3 +243,70 @@ func TestCreateWorkloadTxn(t *testing.T) {
assert.EqualValues(t, 1, node1.CPUUsed+node2.CPUUsed)
return
}

func newCreateWorkloadCluster(t *testing.T) (*Calcium, []*types.Node) {
c := NewTestCluster()
c.store = &storemocks.Store{}
c.scheduler = &schedulermocks.Scheduler{}
scheduler.InitSchedulerV1(c.scheduler)

engine := &enginemocks.API{}
pod1 := &types.Pod{Name: "p1"}
node1 := &types.Node{
NodeMeta: types.NodeMeta{
Name: "n1",
},
Engine: engine,
}
node2 := &types.Node{
NodeMeta: types.NodeMeta{
Name: "n2",
},
Engine: engine,
}
nodes := []*types.Node{node1, node2}

store := c.store.(*storemocks.Store)
store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)

// doAllocResource fails: MakeDeployStatus
lock := &lockmocks.DistributedLock{}
lock.On("Lock", mock.Anything).Return(context.Background(), nil)
lock.On("Unlock", mock.Anything).Return(nil)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil)
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
store.On("GetNode",
mock.AnythingOfType("*context.emptyCtx"),
mock.AnythingOfType("string"),
).Return(
func(_ context.Context, name string) (node *types.Node) {
node = node1
if name == "n2" {
node = node2
}
return
}, nil)

sche := c.scheduler.(*schedulermocks.Scheduler)
sche.On("SelectStorageNodes", mock.AnythingOfType("[]resourcetypes.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo {
return scheduleInfos
}, len(nodes), nil)
sche.On("SelectStorageNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo {
return scheduleInfos
}, len(nodes), nil)
sche.On("SelectVolumeNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("types.VolumeBindings")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ types.VolumeBindings) []resourcetypes.ScheduleInfo {
return scheduleInfos
}, nil, len(nodes), nil)
sche.On("SelectMemoryNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("float64"), mock.AnythingOfType("int64")).Return(
func(scheduleInfos []resourcetypes.ScheduleInfo, _ float64, _ int64) []resourcetypes.ScheduleInfo {
for i := range scheduleInfos {
scheduleInfos[i].Capacity = 1
}
return scheduleInfos
}, len(nodes), nil)

return c, nodes
}
32 changes: 31 additions & 1 deletion cluster/calcium/lambda.go
@@ -7,14 +7,20 @@ import (
"strconv"
"sync"

"github.com/google/uuid"

enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
"github.com/projecteru2/core/wal"
)

const exitDataPrefix = "[exitcode] "
const (
exitDataPrefix = "[exitcode] "
labelLambdaID = "LambdaID"
)

// RunAndWait implement lambda
func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachWorkloadMessage, error) {
@@ -28,6 +34,10 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
return nil, types.ErrRunAndWaitCountOneWithStdin
}

commit, err := c.walCreateLambda(ctx, opts)
if err != nil {
return nil, err
}
createChan, err := c.CreateWorkload(ctx, opts)
if err != nil {
log.Errorf("[RunAndWait] Create workload error %s", err)
@@ -113,8 +123,28 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
go func() {
defer close(runMsgCh)
wg.Wait()
if err := commit(context.Background()); err != nil {
log.Errorf("[RunAndWait] Commit WAL %s failed", eventCreateLambda)
}
log.Info("[RunAndWait] Finish run and wait for workloads")
}()

return runMsgCh, nil
}

func (c *Calcium) walCreateLambda(ctx context.Context, opts *types.DeployOptions) (wal.Commit, error) {
uid, err := uuid.NewRandom()
if err != nil {
return nil, err
}

lambdaID := uid.String()

if opts.Labels != nil {
opts.Labels[labelLambdaID] = lambdaID
} else {
opts.Labels = map[string]string{labelLambdaID: lambdaID}
}

return c.wal.logCreateLambda(ctx, opts)
}
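
The handler that replays these WAL entries on startup is also outside the files rendered above. Purely as a hypothetical illustration of why walCreateLambda stamps opts.Labels with a LambdaID: after a restart, create-lambda events that were logged but never committed can be matched against surviving workloads by that label, and the leftovers cleaned up. The workload type and helper below are stand-ins, not the project's real types or recovery code.

// Hypothetical recovery-side sketch: match uncommitted create-lambda events
// to workloads via the LambdaID label injected by walCreateLambda above.
package main

import "fmt"

// workload is a stand-in for the real types.Workload; only labels matter here.
type workload struct {
	Name   string
	Labels map[string]string
}

const labelLambdaID = "LambdaID"

// orphanLambdas returns workloads whose LambdaID appears in the set of WAL
// events that were logged but never committed before the process died.
func orphanLambdas(workloads []workload, uncommitted map[string]bool) []workload {
	var orphans []workload
	for _, w := range workloads {
		if id, ok := w.Labels[labelLambdaID]; ok && uncommitted[id] {
			orphans = append(orphans, w)
		}
	}
	return orphans
}

func main() {
	ws := []workload{
		{Name: "lambda-1", Labels: map[string]string{labelLambdaID: "aaa"}},
		{Name: "service-1", Labels: map[string]string{"app": "web"}},
	}
	// Pretend recovery found one create-lambda event with no commit record.
	for _, w := range orphanLambdas(ws, map[string]bool{"aaa": true}) {
		fmt.Println("would remove leftover lambda workload:", w.Name)
	}
}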