From d36ef7989c9b65f5b29f2c49e7e6638a84adf42a Mon Sep 17 00:00:00 2001 From: tonic Date: Thu, 15 Apr 2021 14:48:10 +0800 Subject: [PATCH] some refines to make RunAndWait "run and wait" --- cluster/calcium/lambda.go | 193 +++++++++++++++++---------------- cluster/calcium/lambda_test.go | 41 +++---- cluster/cluster.go | 2 +- cluster/mocks/Cluster.go | 27 +++-- rpc/rpc.go | 17 ++- rpc/rpc_test.go | 20 +--- 6 files changed, 155 insertions(+), 145 deletions(-) diff --git a/cluster/calcium/lambda.go b/cluster/calcium/lambda.go index cdab92dcb..e70887c0f 100644 --- a/cluster/calcium/lambda.go +++ b/cluster/calcium/lambda.go @@ -25,142 +25,152 @@ const ( ) // RunAndWait implement lambda -func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachWorkloadMessage, error) { +func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) ([]string, <-chan *types.AttachWorkloadMessage, error) { + workloadIDs := []string{} + logger := log.WithField("Calcium", "RunAndWait").WithField("opts", opts) if err := opts.Validate(); err != nil { - return nil, logger.Err(err) + return workloadIDs, nil, logger.Err(err) } opts.Lambda = true // count = 1 && OpenStdin if opts.OpenStdin && (opts.Count != 1 || opts.DeployStrategy != strategy.Auto) { logger.Errorf("Count %d method %s", opts.Count, opts.DeployStrategy) - return nil, errors.WithStack(types.ErrRunAndWaitCountOneWithStdin) + return workloadIDs, nil, errors.WithStack(types.ErrRunAndWaitCountOneWithStdin) } commit, err := c.walCreateLambda(opts) if err != nil { - return nil, logger.Err(err) + return workloadIDs, nil, logger.Err(err) } createChan, err := c.CreateWorkload(ctx, opts) if err != nil { logger.Errorf("[RunAndWait] Create workload error %+v", err) - return nil, err + return workloadIDs, nil, err } var ( - runMsgCh = make(chan *types.AttachWorkloadMessage) - wg = &sync.WaitGroup{} - errorMessages = []*types.AttachWorkloadMessage{} + runMsgCh = make(chan *types.AttachWorkloadMessage) + wg = &sync.WaitGroup{} ) - for message := range createChan { + + lambda := func(message *types.CreateWorkloadMessage) { + // should Done this waitgroup anyway + defer wg.Done() + + // if workload is empty, which means error occurred when created workload + // we don't need to remove this non-existing workload + // so just send the error message and return if message.Error != nil || message.WorkloadID == "" { logger.Errorf("[RunAndWait] Create workload failed %+v", message.Error) - errorMessages = append(errorMessages, &types.AttachWorkloadMessage{ + runMsgCh <- &types.AttachWorkloadMessage{ WorkloadID: "", Data: []byte(fmt.Sprintf("Create workload failed %+v", message.Error)), StdStreamType: types.EruError, - }) - continue - } - - lambda := func(message *types.CreateWorkloadMessage) { - defer func() { - if err := c.doRemoveWorkloadSync(context.TODO(), []string{message.WorkloadID}); err != nil { - logger.Errorf("[RunAndWait] Remove lambda workload failed %+v", err) - } else { - log.Infof("[RunAndWait] Workload %s finished and removed", utils.ShortID(message.WorkloadID)) - } - wg.Done() - }() - - workload, err := c.GetWorkload(ctx, message.WorkloadID) - if err != nil { - logger.Errorf("[RunAndWait] Get workload failed %+v", err) - errorMessages = append(errorMessages, &types.AttachWorkloadMessage{ - WorkloadID: message.WorkloadID, - Data: []byte(fmt.Sprintf("Get workload %s failed %+v", message.WorkloadID, err)), - StdStreamType: types.EruError, - }) - return } + return + } - var 
stdout, stderr io.ReadCloser - if stdout, stderr, err = workload.Engine.VirtualizationLogs(ctx, &enginetypes.VirtualizationLogStreamOptions{ - ID: message.WorkloadID, - Follow: true, - Stdout: true, - Stderr: true, - }); err != nil { - logger.Errorf("[RunAndWait] Can't fetch log of workload %s error %+v", message.WorkloadID, err) - errorMessages = append(errorMessages, &types.AttachWorkloadMessage{ - WorkloadID: message.WorkloadID, - Data: []byte(fmt.Sprintf("Fetch log for workload %s failed %+v", message.WorkloadID, err)), - StdStreamType: types.EruError, - }) - return + // the workload should be removed if it exists + // no matter the workload exits successfully or not + defer func() { + if err := c.doRemoveWorkloadSync(context.TODO(), []string{message.WorkloadID}); err != nil { + logger.Errorf("[RunAndWait] Remove lambda workload failed %+v", err) + } else { + log.Infof("[RunAndWait] Workload %s finished and removed", utils.ShortID(message.WorkloadID)) } + }() - splitFunc, split := bufio.ScanLines, byte('\n') - - // use attach if use stdin - if opts.OpenStdin { - var inStream io.WriteCloser - stdout, stderr, inStream, err = workload.Engine.VirtualizationAttach(ctx, message.WorkloadID, true, true) - if err != nil { - logger.Errorf("[RunAndWait] Can't attach workload %s error %+v", message.WorkloadID, err) - errorMessages = append(errorMessages, &types.AttachWorkloadMessage{ - WorkloadID: message.WorkloadID, - Data: []byte(fmt.Sprintf("Attach to workload %s failed %+v", message.WorkloadID, err)), - StdStreamType: types.EruError, - }) - return - } - - processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error { - return workload.Engine.VirtualizationResize(ctx, message.WorkloadID, height, width) - }) - - splitFunc, split = bufio.ScanBytes, byte(0) + // if we can't get the workload but message has workload field + // this is weird, we return the error directly and try to delete data + workload, err := c.GetWorkload(ctx, message.WorkloadID) + if err != nil { + logger.Errorf("[RunAndWait] Get workload failed %+v", err) + runMsgCh <- &types.AttachWorkloadMessage{ + WorkloadID: message.WorkloadID, + Data: []byte(fmt.Sprintf("Get workload %s failed %+v", message.WorkloadID, err)), + StdStreamType: types.EruError, } + return + } - // return workload id as first normal message for lambda + // for other cases, we have the workload and it works fine + // then we need to forward log, and finally delete the workload + // of course all the error messages will be sent back + var stdout, stderr io.ReadCloser + if stdout, stderr, err = workload.Engine.VirtualizationLogs(ctx, &enginetypes.VirtualizationLogStreamOptions{ + ID: message.WorkloadID, + Follow: true, + Stdout: true, + Stderr: true, + }); err != nil { + logger.Errorf("[RunAndWait] Can't fetch log of workload %s error %+v", message.WorkloadID, err) runMsgCh <- &types.AttachWorkloadMessage{ WorkloadID: message.WorkloadID, - Data: []byte(""), - StdStreamType: types.TypeWorkloadID, - } - for m := range processStdStream(ctx, stdout, stderr, splitFunc, split) { - runMsgCh <- &types.AttachWorkloadMessage{ - WorkloadID: message.WorkloadID, - Data: m.Data, - StdStreamType: m.StdStreamType, - } + Data: []byte(fmt.Sprintf("Fetch log for workload %s failed %+v", message.WorkloadID, err)), + StdStreamType: types.EruError, } + return + } - // wait and forward exitcode - r, err := workload.Engine.VirtualizationWait(ctx, message.WorkloadID, "") + splitFunc, split := bufio.ScanLines, byte('\n') + + // use attach if use stdin + if 
opts.OpenStdin { + var inStream io.WriteCloser + stdout, stderr, inStream, err = workload.Engine.VirtualizationAttach(ctx, message.WorkloadID, true, true) if err != nil { - logger.Errorf("[RunAndWait] %s wait failed %+v", utils.ShortID(message.WorkloadID), err) - errorMessages = append(errorMessages, &types.AttachWorkloadMessage{ + logger.Errorf("[RunAndWait] Can't attach workload %s error %+v", message.WorkloadID, err) + runMsgCh <- &types.AttachWorkloadMessage{ WorkloadID: message.WorkloadID, - Data: []byte(fmt.Sprintf("Wait workload %s failed %+v", message.WorkloadID, err)), + Data: []byte(fmt.Sprintf("Attach to workload %s failed %+v", message.WorkloadID, err)), StdStreamType: types.EruError, - }) + } return } - if r.Code != 0 { - logger.Errorf("[RunAndWait] %s run failed %s", utils.ShortID(message.WorkloadID), r.Message) + processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error { + return workload.Engine.VirtualizationResize(ctx, message.WorkloadID, height, width) + }) + + splitFunc, split = bufio.ScanBytes, byte(0) + } + + for m := range processStdStream(ctx, stdout, stderr, splitFunc, split) { + runMsgCh <- &types.AttachWorkloadMessage{ + WorkloadID: message.WorkloadID, + Data: m.Data, + StdStreamType: m.StdStreamType, } + } - exitData := []byte(exitDataPrefix + strconv.Itoa(int(r.Code))) + // wait and forward exitcode + r, err := workload.Engine.VirtualizationWait(ctx, message.WorkloadID, "") + if err != nil { + logger.Errorf("[RunAndWait] %s wait failed %+v", utils.ShortID(message.WorkloadID), err) runMsgCh <- &types.AttachWorkloadMessage{ WorkloadID: message.WorkloadID, - Data: exitData, - StdStreamType: types.Stdout, + Data: []byte(fmt.Sprintf("Wait workload %s failed %+v", message.WorkloadID, err)), + StdStreamType: types.EruError, } + return + } + + if r.Code != 0 { + logger.Errorf("[RunAndWait] %s run failed %s", utils.ShortID(message.WorkloadID), r.Message) + } + + exitData := []byte(exitDataPrefix + strconv.Itoa(int(r.Code))) + runMsgCh <- &types.AttachWorkloadMessage{ + WorkloadID: message.WorkloadID, + Data: exitData, + StdStreamType: types.Stdout, } + } + for message := range createChan { + // iterate over messages to store workload ids + workloadIDs = append(workloadIDs, message.WorkloadID) wg.Add(1) go lambda(message) } @@ -172,13 +182,10 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC logger.Errorf("[RunAndWait] Commit WAL %s failed: %v", eventCreateLambda, err) } - for _, message := range errorMessages { - runMsgCh <- message - } log.Info("[RunAndWait] Finish run and wait for workloads") }() - return runMsgCh, nil + return workloadIDs, runMsgCh, nil } func (c *Calcium) walCreateLambda(opts *types.DeployOptions) (wal.Commit, error) { diff --git a/cluster/calcium/lambda_test.go b/cluster/calcium/lambda_test.go index 80640808a..dc6308ff3 100644 --- a/cluster/calcium/lambda_test.go +++ b/cluster/calcium/lambda_test.go @@ -53,11 +53,15 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) { mstore := c.store.(*storemocks.Store) mstore.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("err")).Once() - ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte)) + _, ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte)) assert.NoError(err) assert.NotNil(ch) assert.False(walCommitted) - m := <-ch + ms := []*types.AttachWorkloadMessage{} + for m := range ch { + ms = append(ms, m) + } + m := ms[0] assert.Equal(m.WorkloadID, "") 
assert.True(strings.HasPrefix(string(m.Data), "Create workload failed")) @@ -105,20 +109,19 @@ func TestLambdaWithWorkloadIDReturned(t *testing.T) { engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(ioutil.NopCloser(r1), ioutil.NopCloser(r2), nil) engine.On("VirtualizationWait", mock.Anything, mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationWaitResult{Code: 0}, nil) - ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte)) + ids, ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte)) assert.NoError(err) assert.NotNil(ch) + assert.Equal(len(ids), 2) + assert.Equal(ids[0], "workloadfortonictest") ms := []*types.AttachWorkloadMessage{} for m := range ch { ms = append(ms, m) } - assert.Equal(len(ms), 8) - assert.Equal(ms[0].WorkloadID, "workloadfortonictest") - assert.Equal(ms[0].Data, []byte("")) - assert.Equal(ms[0].StdStreamType, types.TypeWorkloadID) - assert.True(strings.HasPrefix(string(ms[7].Data), exitDataPrefix)) - assert.Equal(ms[7].StdStreamType, types.Stdout) + assert.Equal(len(ms), 6) + assert.True(strings.HasPrefix(string(ms[5].Data), exitDataPrefix)) + assert.Equal(ms[5].StdStreamType, types.Stdout) } func TestLambdaWithError(t *testing.T) { @@ -143,7 +146,7 @@ func TestLambdaWithError(t *testing.T) { } store.On("GetWorkload", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("error")).Twice() - ch0, err := c.RunAndWait(context.Background(), opts, make(chan []byte)) + _, ch0, err := c.RunAndWait(context.Background(), opts, make(chan []byte)) assert.NoError(err) assert.NotNil(ch0) m0 := <-ch0 @@ -154,7 +157,7 @@ func TestLambdaWithError(t *testing.T) { store.On("GetWorkload", mock.Anything, mock.Anything).Return(workload, nil) engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("error")).Twice() - ch1, err := c.RunAndWait(context.Background(), opts, make(chan []byte)) + _, ch1, err := c.RunAndWait(context.Background(), opts, make(chan []byte)) assert.NoError(err) assert.NotNil(ch1) m1 := <-ch1 @@ -177,22 +180,20 @@ func TestLambdaWithError(t *testing.T) { engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(ioutil.NopCloser(r1), ioutil.NopCloser(r2), nil) engine.On("VirtualizationWait", mock.Anything, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("error")) - ch2, err := c.RunAndWait(context.Background(), opts, make(chan []byte)) + ids, ch2, err := c.RunAndWait(context.Background(), opts, make(chan []byte)) assert.NoError(err) assert.NotNil(ch2) + assert.Equal(ids[0], "workloadfortonictest") + assert.Equal(ids[1], "workloadfortonictest") ms := []*types.AttachWorkloadMessage{} for m := range ch2 { ms = append(ms, m) } - assert.Equal(len(ms), 8) - assert.Equal(ms[0].WorkloadID, "workloadfortonictest") - assert.Equal(ms[0].Data, []byte("")) - assert.Equal(ms[0].StdStreamType, types.TypeWorkloadID) - - assert.Equal(ms[7].WorkloadID, "workloadfortonictest") - assert.True(strings.HasPrefix(string(ms[7].Data), "Wait workload")) - assert.Equal(ms[7].StdStreamType, types.EruError) + assert.Equal(len(ms), 6) + assert.Equal(ms[5].WorkloadID, "workloadfortonictest") + assert.True(strings.HasPrefix(string(ms[5].Data), "Wait workload")) + assert.Equal(ms[5].StdStreamType, types.EruError) } func newLambdaCluster(t *testing.T) (*Calcium, []*types.Node) { diff --git a/cluster/cluster.go b/cluster/cluster.go index 70c7b4918..a5263201d 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -82,7 +82,7 @@ type Cluster interface { ExecuteWorkload(ctx context.Context, 
opts *types.ExecuteWorkloadOptions, inCh <-chan []byte) chan *types.AttachWorkloadMessage ReallocResource(ctx context.Context, opts *types.ReallocOptions) error LogStream(ctx context.Context, opts *types.LogStreamOptions) (chan *types.LogStreamMessage, error) - RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachWorkloadMessage, error) + RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) ([]string, <-chan *types.AttachWorkloadMessage, error) // finalizer Finalizer() } diff --git a/cluster/mocks/Cluster.go b/cluster/mocks/Cluster.go index df5ea4ae2..71c8375bc 100644 --- a/cluster/mocks/Cluster.go +++ b/cluster/mocks/Cluster.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.5.1. DO NOT EDIT. +// Code generated by mockery 2.7.4. DO NOT EDIT. package mocks @@ -731,26 +731,35 @@ func (_m *Cluster) ReplaceWorkload(ctx context.Context, opts *types.ReplaceOptio } // RunAndWait provides a mock function with given fields: ctx, opts, inCh -func (_m *Cluster) RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachWorkloadMessage, error) { +func (_m *Cluster) RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) ([]string, <-chan *types.AttachWorkloadMessage, error) { ret := _m.Called(ctx, opts, inCh) - var r0 <-chan *types.AttachWorkloadMessage - if rf, ok := ret.Get(0).(func(context.Context, *types.DeployOptions, <-chan []byte) <-chan *types.AttachWorkloadMessage); ok { + var r0 []string + if rf, ok := ret.Get(0).(func(context.Context, *types.DeployOptions, <-chan []byte) []string); ok { r0 = rf(ctx, opts, inCh) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan *types.AttachWorkloadMessage) + r0 = ret.Get(0).([]string) } } - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, *types.DeployOptions, <-chan []byte) error); ok { + var r1 <-chan *types.AttachWorkloadMessage + if rf, ok := ret.Get(1).(func(context.Context, *types.DeployOptions, <-chan []byte) <-chan *types.AttachWorkloadMessage); ok { r1 = rf(ctx, opts, inCh) } else { - r1 = ret.Error(1) + if ret.Get(1) != nil { + r1 = ret.Get(1).(<-chan *types.AttachWorkloadMessage) + } } - return r0, r1 + var r2 error + if rf, ok := ret.Get(2).(func(context.Context, *types.DeployOptions, <-chan []byte) error); ok { + r2 = rf(ctx, opts, inCh) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 } // Send provides a mock function with given fields: ctx, opts diff --git a/rpc/rpc.go b/rpc/rpc.go index 4cd3daa06..51342d210 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -845,18 +845,23 @@ func (v *Vibranium) RunAndWait(stream pb.CoreRPC_RunAndWaitServer) error { } }() - ch, err := v.cluster.RunAndWait(ctx, deployOpts, inCh) + ids, ch, err := v.cluster.RunAndWait(ctx, deployOpts, inCh) if err != nil { return err } - // must send the first message to client before return, otherwise the Stream will be closed - // client will get workloadID from the first message - m := <-ch - if err = stream.Send(toRPCAttachWorkloadMessage(m)); err != nil { - v.logUnsentMessages("RunAndWait: first message send failed", err, m) + // send workload ids to client first + for _, id := range ids { + if err = stream.Send(&pb.AttachWorkloadMessage{ + WorkloadId: id, + Data: []byte(""), + StdStreamType: pb.StdStreamType_TYPEWORKLOADID, + }); err != nil { + v.logUnsentMessages("RunAndWait: first message send failed", err, id) + } } + // then deal with the rest messages runAndWait := func(f func(<-chan 
*types.AttachWorkloadMessage)) { defer v.taskDone("RunAndWait", true) f(ch) diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 0730bdadb..382ecf29f 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -102,13 +102,7 @@ func TestRunAndWaitSync(t *testing.T) { runAndWait := func(_ context.Context, _ *types.DeployOptions, _ <-chan []byte) <-chan *types.AttachWorkloadMessage { ch := make(chan *types.AttachWorkloadMessage) go func() { - // first message to report workload id - ch <- &types.AttachWorkloadMessage{ - WorkloadID: "workloadidfortonic", - Data: []byte(""), - StdStreamType: types.TypeWorkloadID, - } - // second message to report output of process + // message to report output of process ch <- &types.AttachWorkloadMessage{ WorkloadID: "workloadidfortonic", Data: []byte("network not reachable"), @@ -119,7 +113,7 @@ func TestRunAndWaitSync(t *testing.T) { return ch } cluster := v.cluster.(*clustermock.Cluster) - cluster.On("RunAndWait", mock.Anything, mock.Anything, mock.Anything).Return(runAndWait, nil) + cluster.On("RunAndWait", mock.Anything, mock.Anything, mock.Anything).Return([]string{"workloadidfortonic"}, runAndWait, nil) err := v.RunAndWait(stream) assert.NoError(t, err) @@ -167,13 +161,7 @@ func TestRunAndWaitAsync(t *testing.T) { runAndWait := func(_ context.Context, _ *types.DeployOptions, _ <-chan []byte) <-chan *types.AttachWorkloadMessage { ch := make(chan *types.AttachWorkloadMessage) go func() { - // first message to report workload id - ch <- &types.AttachWorkloadMessage{ - WorkloadID: "workloadidfortonic", - Data: []byte(""), - StdStreamType: types.TypeWorkloadID, - } - // second message to report output of process + // message to report output of process ch <- &types.AttachWorkloadMessage{ WorkloadID: "workloadidfortonic", Data: []byte("network not reachable"), @@ -184,7 +172,7 @@ func TestRunAndWaitAsync(t *testing.T) { return ch } cluster := v.cluster.(*clustermock.Cluster) - cluster.On("RunAndWait", mock.Anything, mock.Anything, mock.Anything).Return(runAndWait, nil) + cluster.On("RunAndWait", mock.Anything, mock.Anything, mock.Anything).Return([]string{"workloadidfortonic"}, runAndWait, nil) err := v.RunAndWait(stream) assert.NoError(t, err)
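
For reference, a minimal client-side sketch of the stream shape after this change: the
server first emits one TYPEWORKLOADID message per created workload, then the process
stdout/stderr, and finally the line carrying the exit code. The stream type
(pb.CoreRPC_RunAndWaitClient) and the pb import path are assumptions based on the
generated gRPC code and are not touched by this patch; the field names follow the
messages used in rpc.go above.

	import (
		"io"
		"os"

		pb "github.com/projecteru2/core/rpc/gen" // import path assumed
	)

	// consumeRunAndWait is a hypothetical helper: it collects the workload IDs
	// announced at the head of the stream and forwards everything else to stdout.
	func consumeRunAndWait(stream pb.CoreRPC_RunAndWaitClient) ([]string, error) {
		workloadIDs := []string{}
		for {
			msg, err := stream.Recv()
			if err == io.EOF {
				return workloadIDs, nil
			}
			if err != nil {
				return workloadIDs, err
			}
			switch msg.StdStreamType {
			case pb.StdStreamType_TYPEWORKLOADID:
				// workload IDs arrive before any process output
				workloadIDs = append(workloadIDs, msg.WorkloadId)
			default:
				// stdout / stderr / error payloads and the final exit-code line
				os.Stdout.Write(msg.Data)
			}
		}
	}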