Skip to content

Commit

Permalink
some refines to make RunAndWait "run and wait"
Browse files Browse the repository at this point in the history
  • Loading branch information
tonicmuroq committed Apr 15, 2021
1 parent 9a2dea0 commit d36ef79
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 145 deletions.
193 changes: 100 additions & 93 deletions cluster/calcium/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,142 +25,152 @@ const (
)

// RunAndWait implement lambda
func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachWorkloadMessage, error) {
func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) ([]string, <-chan *types.AttachWorkloadMessage, error) {
workloadIDs := []string{}

logger := log.WithField("Calcium", "RunAndWait").WithField("opts", opts)
if err := opts.Validate(); err != nil {
return nil, logger.Err(err)
return workloadIDs, nil, logger.Err(err)
}
opts.Lambda = true
// count = 1 && OpenStdin
if opts.OpenStdin && (opts.Count != 1 || opts.DeployStrategy != strategy.Auto) {
logger.Errorf("Count %d method %s", opts.Count, opts.DeployStrategy)
return nil, errors.WithStack(types.ErrRunAndWaitCountOneWithStdin)
return workloadIDs, nil, errors.WithStack(types.ErrRunAndWaitCountOneWithStdin)
}

commit, err := c.walCreateLambda(opts)
if err != nil {
return nil, logger.Err(err)
return workloadIDs, nil, logger.Err(err)
}
createChan, err := c.CreateWorkload(ctx, opts)
if err != nil {
logger.Errorf("[RunAndWait] Create workload error %+v", err)
return nil, err
return workloadIDs, nil, err
}

var (
runMsgCh = make(chan *types.AttachWorkloadMessage)
wg = &sync.WaitGroup{}
errorMessages = []*types.AttachWorkloadMessage{}
runMsgCh = make(chan *types.AttachWorkloadMessage)
wg = &sync.WaitGroup{}
)
for message := range createChan {

lambda := func(message *types.CreateWorkloadMessage) {
// should Done this waitgroup anyway
defer wg.Done()

// if workload is empty, which means error occurred when created workload
// we don't need to remove this non-existing workload
// so just send the error message and return
if message.Error != nil || message.WorkloadID == "" {
logger.Errorf("[RunAndWait] Create workload failed %+v", message.Error)
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
runMsgCh <- &types.AttachWorkloadMessage{
WorkloadID: "",
Data: []byte(fmt.Sprintf("Create workload failed %+v", message.Error)),
StdStreamType: types.EruError,
})
continue
}

lambda := func(message *types.CreateWorkloadMessage) {
defer func() {
if err := c.doRemoveWorkloadSync(context.TODO(), []string{message.WorkloadID}); err != nil {
logger.Errorf("[RunAndWait] Remove lambda workload failed %+v", err)
} else {
log.Infof("[RunAndWait] Workload %s finished and removed", utils.ShortID(message.WorkloadID))
}
wg.Done()
}()

workload, err := c.GetWorkload(ctx, message.WorkloadID)
if err != nil {
logger.Errorf("[RunAndWait] Get workload failed %+v", err)
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Get workload %s failed %+v", message.WorkloadID, err)),
StdStreamType: types.EruError,
})
return
}
return
}

var stdout, stderr io.ReadCloser
if stdout, stderr, err = workload.Engine.VirtualizationLogs(ctx, &enginetypes.VirtualizationLogStreamOptions{
ID: message.WorkloadID,
Follow: true,
Stdout: true,
Stderr: true,
}); err != nil {
logger.Errorf("[RunAndWait] Can't fetch log of workload %s error %+v", message.WorkloadID, err)
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Fetch log for workload %s failed %+v", message.WorkloadID, err)),
StdStreamType: types.EruError,
})
return
// the workload should be removed if it exists
// no matter the workload exits successfully or not
defer func() {
if err := c.doRemoveWorkloadSync(context.TODO(), []string{message.WorkloadID}); err != nil {
logger.Errorf("[RunAndWait] Remove lambda workload failed %+v", err)
} else {
log.Infof("[RunAndWait] Workload %s finished and removed", utils.ShortID(message.WorkloadID))
}
}()

splitFunc, split := bufio.ScanLines, byte('\n')

// use attach if use stdin
if opts.OpenStdin {
var inStream io.WriteCloser
stdout, stderr, inStream, err = workload.Engine.VirtualizationAttach(ctx, message.WorkloadID, true, true)
if err != nil {
logger.Errorf("[RunAndWait] Can't attach workload %s error %+v", message.WorkloadID, err)
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Attach to workload %s failed %+v", message.WorkloadID, err)),
StdStreamType: types.EruError,
})
return
}

processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error {
return workload.Engine.VirtualizationResize(ctx, message.WorkloadID, height, width)
})

splitFunc, split = bufio.ScanBytes, byte(0)
// if we can't get the workload but message has workload field
// this is weird, we return the error directly and try to delete data
workload, err := c.GetWorkload(ctx, message.WorkloadID)
if err != nil {
logger.Errorf("[RunAndWait] Get workload failed %+v", err)
runMsgCh <- &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Get workload %s failed %+v", message.WorkloadID, err)),
StdStreamType: types.EruError,
}
return
}

// return workload id as first normal message for lambda
// for other cases, we have the workload and it works fine
// then we need to forward log, and finally delete the workload
// of course all the error messages will be sent back
var stdout, stderr io.ReadCloser
if stdout, stderr, err = workload.Engine.VirtualizationLogs(ctx, &enginetypes.VirtualizationLogStreamOptions{
ID: message.WorkloadID,
Follow: true,
Stdout: true,
Stderr: true,
}); err != nil {
logger.Errorf("[RunAndWait] Can't fetch log of workload %s error %+v", message.WorkloadID, err)
runMsgCh <- &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(""),
StdStreamType: types.TypeWorkloadID,
}
for m := range processStdStream(ctx, stdout, stderr, splitFunc, split) {
runMsgCh <- &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: m.Data,
StdStreamType: m.StdStreamType,
}
Data: []byte(fmt.Sprintf("Fetch log for workload %s failed %+v", message.WorkloadID, err)),
StdStreamType: types.EruError,
}
return
}

// wait and forward exitcode
r, err := workload.Engine.VirtualizationWait(ctx, message.WorkloadID, "")
splitFunc, split := bufio.ScanLines, byte('\n')

// use attach if use stdin
if opts.OpenStdin {
var inStream io.WriteCloser
stdout, stderr, inStream, err = workload.Engine.VirtualizationAttach(ctx, message.WorkloadID, true, true)
if err != nil {
logger.Errorf("[RunAndWait] %s wait failed %+v", utils.ShortID(message.WorkloadID), err)
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
logger.Errorf("[RunAndWait] Can't attach workload %s error %+v", message.WorkloadID, err)
runMsgCh <- &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Wait workload %s failed %+v", message.WorkloadID, err)),
Data: []byte(fmt.Sprintf("Attach to workload %s failed %+v", message.WorkloadID, err)),
StdStreamType: types.EruError,
})
}
return
}

if r.Code != 0 {
logger.Errorf("[RunAndWait] %s run failed %s", utils.ShortID(message.WorkloadID), r.Message)
processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error {
return workload.Engine.VirtualizationResize(ctx, message.WorkloadID, height, width)
})

splitFunc, split = bufio.ScanBytes, byte(0)
}

for m := range processStdStream(ctx, stdout, stderr, splitFunc, split) {
runMsgCh <- &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: m.Data,
StdStreamType: m.StdStreamType,
}
}

exitData := []byte(exitDataPrefix + strconv.Itoa(int(r.Code)))
// wait and forward exitcode
r, err := workload.Engine.VirtualizationWait(ctx, message.WorkloadID, "")
if err != nil {
logger.Errorf("[RunAndWait] %s wait failed %+v", utils.ShortID(message.WorkloadID), err)
runMsgCh <- &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: exitData,
StdStreamType: types.Stdout,
Data: []byte(fmt.Sprintf("Wait workload %s failed %+v", message.WorkloadID, err)),
StdStreamType: types.EruError,
}
return
}

if r.Code != 0 {
logger.Errorf("[RunAndWait] %s run failed %s", utils.ShortID(message.WorkloadID), r.Message)
}

exitData := []byte(exitDataPrefix + strconv.Itoa(int(r.Code)))
runMsgCh <- &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: exitData,
StdStreamType: types.Stdout,
}
}

for message := range createChan {
// iterate over messages to store workload ids
workloadIDs = append(workloadIDs, message.WorkloadID)
wg.Add(1)
go lambda(message)
}
Expand All @@ -172,13 +182,10 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
logger.Errorf("[RunAndWait] Commit WAL %s failed: %v", eventCreateLambda, err)
}

for _, message := range errorMessages {
runMsgCh <- message
}
log.Info("[RunAndWait] Finish run and wait for workloads")
}()

return runMsgCh, nil
return workloadIDs, runMsgCh, nil
}

func (c *Calcium) walCreateLambda(opts *types.DeployOptions) (wal.Commit, error) {
Expand Down
41 changes: 21 additions & 20 deletions cluster/calcium/lambda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
mstore := c.store.(*storemocks.Store)
mstore.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("err")).Once()

ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
_, ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
assert.NoError(err)
assert.NotNil(ch)
assert.False(walCommitted)
m := <-ch
ms := []*types.AttachWorkloadMessage{}
for m := range ch {
ms = append(ms, m)
}
m := ms[0]
assert.Equal(m.WorkloadID, "")
assert.True(strings.HasPrefix(string(m.Data), "Create workload failed"))

Expand Down Expand Up @@ -105,20 +109,19 @@ func TestLambdaWithWorkloadIDReturned(t *testing.T) {
engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(ioutil.NopCloser(r1), ioutil.NopCloser(r2), nil)
engine.On("VirtualizationWait", mock.Anything, mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationWaitResult{Code: 0}, nil)

ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
ids, ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
assert.NoError(err)
assert.NotNil(ch)
assert.Equal(len(ids), 2)
assert.Equal(ids[0], "workloadfortonictest")

ms := []*types.AttachWorkloadMessage{}
for m := range ch {
ms = append(ms, m)
}
assert.Equal(len(ms), 8)
assert.Equal(ms[0].WorkloadID, "workloadfortonictest")
assert.Equal(ms[0].Data, []byte(""))
assert.Equal(ms[0].StdStreamType, types.TypeWorkloadID)
assert.True(strings.HasPrefix(string(ms[7].Data), exitDataPrefix))
assert.Equal(ms[7].StdStreamType, types.Stdout)
assert.Equal(len(ms), 6)
assert.True(strings.HasPrefix(string(ms[5].Data), exitDataPrefix))
assert.Equal(ms[5].StdStreamType, types.Stdout)
}

func TestLambdaWithError(t *testing.T) {
Expand All @@ -143,7 +146,7 @@ func TestLambdaWithError(t *testing.T) {
}

store.On("GetWorkload", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("error")).Twice()
ch0, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
_, ch0, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
assert.NoError(err)
assert.NotNil(ch0)
m0 := <-ch0
Expand All @@ -154,7 +157,7 @@ func TestLambdaWithError(t *testing.T) {
store.On("GetWorkload", mock.Anything, mock.Anything).Return(workload, nil)

engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("error")).Twice()
ch1, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
_, ch1, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
assert.NoError(err)
assert.NotNil(ch1)
m1 := <-ch1
Expand All @@ -177,22 +180,20 @@ func TestLambdaWithError(t *testing.T) {
engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(ioutil.NopCloser(r1), ioutil.NopCloser(r2), nil)

engine.On("VirtualizationWait", mock.Anything, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("error"))
ch2, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
ids, ch2, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
assert.NoError(err)
assert.NotNil(ch2)
assert.Equal(ids[0], "workloadfortonictest")
assert.Equal(ids[1], "workloadfortonictest")

ms := []*types.AttachWorkloadMessage{}
for m := range ch2 {
ms = append(ms, m)
}
assert.Equal(len(ms), 8)
assert.Equal(ms[0].WorkloadID, "workloadfortonictest")
assert.Equal(ms[0].Data, []byte(""))
assert.Equal(ms[0].StdStreamType, types.TypeWorkloadID)

assert.Equal(ms[7].WorkloadID, "workloadfortonictest")
assert.True(strings.HasPrefix(string(ms[7].Data), "Wait workload"))
assert.Equal(ms[7].StdStreamType, types.EruError)
assert.Equal(len(ms), 6)
assert.Equal(ms[5].WorkloadID, "workloadfortonictest")
assert.True(strings.HasPrefix(string(ms[5].Data), "Wait workload"))
assert.Equal(ms[5].StdStreamType, types.EruError)
}

func newLambdaCluster(t *testing.T) (*Calcium, []*types.Node) {
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type Cluster interface {
ExecuteWorkload(ctx context.Context, opts *types.ExecuteWorkloadOptions, inCh <-chan []byte) chan *types.AttachWorkloadMessage
ReallocResource(ctx context.Context, opts *types.ReallocOptions) error
LogStream(ctx context.Context, opts *types.LogStreamOptions) (chan *types.LogStreamMessage, error)
RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachWorkloadMessage, error)
RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) ([]string, <-chan *types.AttachWorkloadMessage, error)
// finalizer
Finalizer()
}
Loading

0 comments on commit d36ef79

Please sign in to comment.