diff --git a/cluster/calcium/lambda.go b/cluster/calcium/lambda.go
index b5c7a7c99..6af2ed9b3 100644
--- a/cluster/calcium/lambda.go
+++ b/cluster/calcium/lambda.go
@@ -15,13 +15,11 @@ import (
 	"github.com/projecteru2/core/utils"
 	"github.com/projecteru2/core/wal"
 
-	"github.com/google/uuid"
 	"github.com/pkg/errors"
 )
 
 const (
 	exitDataPrefix = "[exitcode] "
-	labelLambdaID  = "LambdaID"
 )
 
 // RunAndWait implement lambda
@@ -39,10 +37,6 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
 		return workloadIDs, nil, errors.WithStack(types.ErrRunAndWaitCountOneWithStdin)
 	}
 
-	commit, err := c.walCreateLambda(opts)
-	if err != nil {
-		return workloadIDs, nil, logger.Err(ctx, err)
-	}
 	createChan, err := c.CreateWorkload(ctx, opts)
 	if err != nil {
 		logger.Errorf(ctx, "[RunAndWait] Create workload error %+v", err)
@@ -54,23 +48,40 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
 		wg       = &sync.WaitGroup{}
 	)
 
-	lambda := func(message *types.CreateWorkloadMessage) {
+	lambda := func(message *types.CreateWorkloadMessage) (attachMessage *types.AttachWorkloadMessage) {
 		// should Done this waitgroup anyway
 		defer wg.Done()
 
+		defer func() {
+			runMsgCh <- attachMessage
+		}()
+
 		// if workload is empty, which means error occurred when created workload
 		// we don't need to remove this non-existing workload
 		// so just send the error message and return
 		if message.Error != nil || message.WorkloadID == "" {
 			logger.Errorf(ctx, "[RunAndWait] Create workload failed %+v", message.Error)
-			runMsgCh <- &types.AttachWorkloadMessage{
+			return &types.AttachWorkloadMessage{
 				WorkloadID:    "",
 				Data:          []byte(fmt.Sprintf("Create workload failed %+v", errors.Unwrap(message.Error))),
 				StdStreamType: types.EruError,
 			}
-			return
 		}
 
+		commit, err := c.walCreateLambda(message)
+		if err != nil {
+			return &types.AttachWorkloadMessage{
+				WorkloadID:    message.WorkloadID,
+				Data:          []byte(fmt.Sprintf("Create wal failed: %s, %+v", message.WorkloadID, logger.Err(ctx, err))),
+				StdStreamType: types.EruError,
+			}
+		}
+		defer func() {
+			if err := commit(); err != nil {
+				logger.Errorf(ctx, "[RunAndWait] Commit WAL %s failed: %s, %v", eventCreateLambda, message.WorkloadID, err)
+			}
+		}()
+
 		// the workload should be removed if it exists
 		// no matter the workload exits successfully or not
 		defer func() {
@@ -86,12 +97,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
 		workload, err := c.GetWorkload(ctx, message.WorkloadID)
 		if err != nil {
 			logger.Errorf(ctx, "[RunAndWait] Get workload failed %+v", err)
-			runMsgCh <- &types.AttachWorkloadMessage{
+			return &types.AttachWorkloadMessage{
 				WorkloadID:    message.WorkloadID,
 				Data:          []byte(fmt.Sprintf("Get workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))),
 				StdStreamType: types.EruError,
 			}
-			return
 		}
 
 		// for other cases, we have the workload and it works fine
@@ -105,12 +115,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
 			Stderr: true,
 		}); err != nil {
 			logger.Errorf(ctx, "[RunAndWait] Can't fetch log of workload %s error %+v", message.WorkloadID, err)
-			runMsgCh <- &types.AttachWorkloadMessage{
+			return &types.AttachWorkloadMessage{
 				WorkloadID:    message.WorkloadID,
 				Data:          []byte(fmt.Sprintf("Fetch log for workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))),
 				StdStreamType: types.EruError,
 			}
-			return
 		}
 
 		splitFunc, split := bufio.ScanLines, byte('\n')
@@ -121,12 +130,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
 			stdout, stderr, inStream, err = workload.Engine.VirtualizationAttach(ctx, message.WorkloadID, true, true)
 			if err != nil {
 				logger.Errorf(ctx, "[RunAndWait] Can't attach workload %s error %+v", message.WorkloadID, err)
-				runMsgCh <- &types.AttachWorkloadMessage{
+				return &types.AttachWorkloadMessage{
 					WorkloadID:    message.WorkloadID,
 					Data:          []byte(fmt.Sprintf("Attach to workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))),
 					StdStreamType: types.EruError,
 				}
-				return
 			}
 
 			processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error {
@@ -148,12 +156,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
 		r, err := workload.Engine.VirtualizationWait(ctx, message.WorkloadID, "")
 		if err != nil {
 			logger.Errorf(ctx, "[RunAndWait] %s wait failed %+v", utils.ShortID(message.WorkloadID), err)
-			runMsgCh <- &types.AttachWorkloadMessage{
+			return &types.AttachWorkloadMessage{
 				WorkloadID:    message.WorkloadID,
 				Data:          []byte(fmt.Sprintf("Wait workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))),
 				StdStreamType: types.EruError,
 			}
-			return
 		}
 
 		if r.Code != 0 {
@@ -161,7 +168,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
 		}
 
 		exitData := []byte(exitDataPrefix + strconv.Itoa(int(r.Code)))
-		runMsgCh <- &types.AttachWorkloadMessage{
+		return &types.AttachWorkloadMessage{
 			WorkloadID:    message.WorkloadID,
 			Data:          exitData,
 			StdStreamType: types.Stdout,
@@ -182,9 +189,6 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
 	utils.SentryGo(func() {
 		defer close(runMsgCh)
 		wg.Wait()
-		if err := commit(); err != nil {
-			logger.Errorf(ctx, "[RunAndWait] Commit WAL %s failed: %v", eventCreateLambda, err)
-		}
 		log.Info("[RunAndWait] Finish run and wait for workloads")
 	})
 
@@ -192,19 +196,6 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
 	return workloadIDs, runMsgCh, nil
 }
 
-func (c *Calcium) walCreateLambda(opts *types.DeployOptions) (wal.Commit, error) {
-	uid, err := uuid.NewRandom()
-	if err != nil {
-		return nil, errors.WithStack(err)
-	}
-
-	lambdaID := uid.String()
-
-	if opts.Labels != nil {
-		opts.Labels[labelLambdaID] = lambdaID
-	} else {
-		opts.Labels = map[string]string{labelLambdaID: lambdaID}
-	}
-
+func (c *Calcium) walCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
 	return c.wal.logCreateLambda(opts)
 }
diff --git a/cluster/calcium/lambda_test.go b/cluster/calcium/lambda_test.go
index de36d4267..322240899 100644
--- a/cluster/calcium/lambda_test.go
+++ b/cluster/calcium/lambda_test.go
@@ -17,7 +17,6 @@ import (
 	storemocks "github.com/projecteru2/core/store/mocks"
 	"github.com/projecteru2/core/strategy"
 	"github.com/projecteru2/core/types"
-	"github.com/projecteru2/core/wal"
 	walmocks "github.com/projecteru2/core/wal/mocks"
 
 	"github.com/stretchr/testify/assert"
@@ -31,12 +30,6 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
 	mwal := c.wal.WAL.(*walmocks.WAL)
 	defer mwal.AssertExpectations(t)
 
-	var walCommitted bool
-	commit := wal.Commit(func() error {
-		walCommitted = true
-		return nil
-	})
-	mwal.On("Log", eventCreateLambda, mock.Anything).Return(commit, nil).Once()
 
 	opts := &types.DeployOptions{
 		Name: "zc:name",
@@ -56,7 +49,6 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
 	_, ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
 	assert.NoError(err)
 	assert.NotNil(ch)
-	assert.False(walCommitted)
 
 	ms := []*types.AttachWorkloadMessage{}
 	for m := range ch {
 		ms = append(ms, m)
 	}
@@ -65,10 +57,6 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
 	assert.Equal(m.WorkloadID, "")
 	assert.True(strings.HasPrefix(string(m.Data), "Create workload failed"))
 
-	lambdaID, exists := opts.Labels[labelLambdaID]
-	assert.True(exists)
-	assert.True(len(lambdaID) > 1)
-	assert.True(walCommitted)
 	assert.Equal(m.StdStreamType, types.EruError)
 }
diff --git a/cluster/calcium/node.go b/cluster/calcium/node.go
index e7c5b4631..736b14f25 100644
--- a/cluster/calcium/node.go
+++ b/cluster/calcium/node.go
@@ -9,7 +9,6 @@ import (
 	"github.com/projecteru2/core/utils"
 
 	"github.com/pkg/errors"
-	"github.com/sanity-io/litter"
 )
 
 // AddNode adds a node
@@ -127,13 +126,13 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
 	}
 	var n *types.Node
 	return n, c.withNodeLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error {
-		litter.Dump(opts)
+		logger.Infof(ctx, "set node")
 		opts.Normalize(node)
 		n = node
 
 		n.Bypass = (opts.BypassOpt == types.TriTrue) || (opts.BypassOpt == types.TriKeep && n.Bypass)
 		if n.IsDown() {
-			log.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
+			logger.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
 		}
 		if opts.WorkloadsDown {
 			c.setAllWorkloadsOnNodeDown(ctx, opts.Nodename)
diff --git a/cluster/calcium/wal.go b/cluster/calcium/wal.go
index aa7a9ce2d..f038eeedc 100644
--- a/cluster/calcium/wal.go
+++ b/cluster/calcium/wal.go
@@ -52,12 +52,8 @@ func (w *WAL) logCreateWorkload(workloadID, nodename string) (wal.Commit, error)
 	})
 }
 
-func (w *WAL) logCreateLambda(opts *types.DeployOptions) (wal.Commit, error) {
-	return w.Log(eventCreateLambda, &types.ListWorkloadsOptions{
-		Appname:    opts.Name,
-		Entrypoint: opts.Entrypoint.Name,
-		Labels:     map[string]string{labelLambdaID: opts.Labels[labelLambdaID]},
-	})
+func (w *WAL) logCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
+	return w.Log(eventCreateLambda, opts.WorkloadID)
 }
 
 // CreateWorkloadHandler indicates event handler for creating workload.
@@ -179,64 +175,52 @@ func (h *CreateLambdaHandler) Check(context.Context, interface{}) (bool, error)
 
 // Encode .
 func (h *CreateLambdaHandler) Encode(raw interface{}) ([]byte, error) {
-	opts, ok := raw.(*types.ListWorkloadsOptions)
+	workloadID, ok := raw.(string)
 	if !ok {
 		return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
 	}
-	return json.Marshal(opts)
+	return []byte(workloadID), nil
 }
 
 // Decode .
 func (h *CreateLambdaHandler) Decode(bs []byte) (interface{}, error) {
-	opts := &types.ListWorkloadsOptions{}
-	err := json.Unmarshal(bs, opts)
-	return opts, err
+	return string(bs), nil
 }
 
 // Handle .
 func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error {
-	opts, ok := raw.(*types.ListWorkloadsOptions)
+	workloadID, ok := raw.(string)
 	if !ok {
 		return types.NewDetailedErr(types.ErrInvalidType, raw)
 	}
 
-	workloadIDs, err := h.getWorkloadIDs(ctx, opts)
-	if err != nil {
-		log.Errorf(nil, "[CreateLambdaHandler.Handle] Get workloads %s/%s/%v failed: %v", //nolint
-			opts.Appname, opts.Entrypoint, opts.Labels, err)
-		return err
-	}
-
-	ctx, cancel := getReplayContext(ctx)
-	defer cancel()
+	logger := log.WithField("WAL.Handle", "RunAndWait").WithField("ID", workloadID)
+	go func() {
+		logger.Infof(ctx, "recovery start")
+		workload, err := h.calcium.GetWorkload(ctx, workloadID)
+		if err != nil {
+			logger.Errorf(ctx, "Get workload failed: %v", err)
+			return
+		}
 
-	if err := h.calcium.doRemoveWorkloadSync(ctx, workloadIDs); err != nil {
-		log.Errorf(ctx, "[CreateLambdaHandler.Handle] Remove lambda %v failed: %v", opts, err)
-		return err
-	}
+		r, err := workload.Engine.VirtualizationWait(ctx, workloadID, "")
+		if err != nil {
+			logger.Errorf(ctx, "Wait failed: %+v", err)
+			return
+		}
+		if r.Code != 0 {
+			logger.Errorf(ctx, "Run failed: %s", r.Message)
+		}
 
-	log.Infof(ctx, "[CreateLambdaHandler.Handle] Lambda %v removed", opts)
+		if err := h.calcium.doRemoveWorkloadSync(ctx, []string{workloadID}); err != nil {
+			logger.Errorf(ctx, "Remove failed: %+v", err)
+		}
+		logger.Infof(ctx, "waited and removed")
+	}()
 	return nil
 }
 
-func (h *CreateLambdaHandler) getWorkloadIDs(ctx context.Context, opts *types.ListWorkloadsOptions) ([]string, error) {
-	ctx, cancel := getReplayContext(ctx)
-	defer cancel()
-
-	workloads, err := h.calcium.ListWorkloads(ctx, opts)
-	if err != nil {
-		return nil, err
-	}
-
-	workloadIDs := make([]string, len(workloads))
-	for i, wrk := range workloads {
-		workloadIDs[i] = wrk.ID
-	}
-
-	return workloadIDs, nil
-}
-
 func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) {
 	return context.WithTimeout(ctx, time.Second*32)
 }
diff --git a/cluster/calcium/wal_test.go b/cluster/calcium/wal_test.go
index dca2a14f2..bda4244e0 100644
--- a/cluster/calcium/wal_test.go
+++ b/cluster/calcium/wal_test.go
@@ -4,8 +4,10 @@ import (
 	"context"
 	"fmt"
 	"testing"
+	"time"
 
 	enginemocks "github.com/projecteru2/core/engine/mocks"
+	enginetypes "github.com/projecteru2/core/engine/types"
 	lockmocks "github.com/projecteru2/core/lock/mocks"
 	storemocks "github.com/projecteru2/core/store/mocks"
 	"github.com/projecteru2/core/types"
@@ -131,12 +133,7 @@ func TestHandleCreateLambda(t *testing.T) {
 	require.NoError(t, err)
 	c.wal = wal
 
-	deployOpts := &types.DeployOptions{
-		Name:       "appname",
-		Entrypoint: &types.Entrypoint{Name: "entry"},
-		Labels:     map[string]string{labelLambdaID: "lambda"},
-	}
-	_, err = c.wal.logCreateLambda(deployOpts)
+	_, err = c.wal.logCreateLambda(&types.CreateWorkloadMessage{WorkloadID: "workloadid"})
 	require.NoError(t, err)
 
 	node := &types.Node{
@@ -150,35 +147,34 @@ func TestHandleCreateLambda(t *testing.T) {
 	}
 
 	store := c.store.(*storemocks.Store)
-	defer store.AssertExpectations(t)
-	store.On("ListWorkloads", mock.Anything, deployOpts.Name, deployOpts.Entrypoint.Name, "", int64(0), deployOpts.Labels).
-		Return(nil, fmt.Errorf("err")).
-		Once()
-	store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
+	store.On("GetWorkload", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
 	c.wal.Recover(context.TODO())
+	time.Sleep(500 * time.Millisecond)
+	store.AssertExpectations(t)
 
-	store.On("ListWorkloads", mock.Anything, deployOpts.Name, deployOpts.Entrypoint.Name, "", int64(0), deployOpts.Labels).
-		Return([]*types.Workload{wrk}, nil).
+	_, err = c.wal.logCreateLambda(&types.CreateWorkloadMessage{WorkloadID: "workloadid"})
+	require.NoError(t, err)
+	store.On("GetWorkload", mock.Anything, mock.Anything).
+		Return(wrk, nil).
 		Once()
-	store.On("GetWorkloads", mock.Anything, []string{wrk.ID}).
-		Return([]*types.Workload{wrk}, nil).
-		Twice()
 	store.On("GetNode", mock.Anything, wrk.Nodename).
 		Return(node, nil)
-
 	eng := wrk.Engine.(*enginemocks.API)
-	defer eng.AssertExpectations(t)
+	eng.On("VirtualizationWait", mock.Anything, wrk.ID, "").Return(&enginetypes.VirtualizationWaitResult{Code: 0}, nil).Once()
 	eng.On("VirtualizationRemove", mock.Anything, wrk.ID, true, true).
 		Return(nil).
 		Once()
-
+	eng.On("VirtualizationResourceRemap", mock.Anything, mock.Anything).Return(nil, nil).Once()
+	store.On("GetWorkloads", mock.Anything, []string{wrk.ID}).
+		Return([]*types.Workload{wrk}, nil).
+		Twice()
 	store.On("RemoveWorkload", mock.Anything, wrk).
 		Return(nil).
 		Once()
 	store.On("UpdateNodeResource", mock.Anything, node, mock.Anything, mock.Anything).
 		Return(nil).
 		Once()
-
+	store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Once()
 	lock := &lockmocks.DistributedLock{}
 	lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
 	lock.On("Unlock", mock.Anything).Return(nil)
@@ -187,4 +183,7 @@ func TestHandleCreateLambda(t *testing.T) {
 	c.wal.Recover(context.TODO())
 	// Recovered nothing.
 	c.wal.Recover(context.TODO())
+	time.Sleep(500 * time.Millisecond)
+	store.AssertExpectations(t)
+	eng.AssertExpectations(t)
 }
diff --git a/log/log.go b/log/log.go
index 7ef4550f5..0c2d55643 100644
--- a/log/log.go
+++ b/log/log.go
@@ -60,6 +60,12 @@ func (f Fields) Err(ctx context.Context, err error) error {
 	return err
 }
 
+// Infof .
+func (f Fields) Infof(ctx context.Context, format string, args ...interface{}) {
+	format = getTracingInfo(ctx) + format
+	f.e.Infof(format, args...)
+}
+
// WithField add kv into log entry
 func WithField(key string, value interface{}) Fields {
 	return Fields{
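
Reviewer note, not part of the patch: this change inverts the lambda WAL ordering. Previously a generated LambdaID label was logged before CreateWorkload, and replay removed matching workloads by listing on that label; now each workload ID is logged only after creation succeeds, and replay waits for the workload to exit before removing it (Handle does the wait-then-remove in a goroutine so Recover is not blocked by a still-running lambda). Below is a minimal, self-contained Go sketch of the log-then-commit pattern the patch moves into the per-workload closure; the names logEvent and runLambda are illustrative, not from the repo, and the Commit type mirrors the func() error closure the removed test code constructed.

package main

import "fmt"

// Commit mirrors wal.Commit as the removed test constructed it:
// calling it erases the previously logged event.
type Commit func() error

// logEvent stands in for WAL.Log: durably record intent, return an eraser.
func logEvent(event, workloadID string) (Commit, error) {
	fmt.Printf("WAL: logged %s %s\n", event, workloadID)
	return func() error {
		fmt.Printf("WAL: erased %s %s\n", event, workloadID)
		return nil
	}, nil
}

// runLambda follows the patched ordering: log after the workload exists,
// commit only after it has exited and been removed. If the process dies
// in between, recovery replays the event and cleans the orphan up.
func runLambda(workloadID string) error {
	commit, err := logEvent("create-lambda", workloadID)
	if err != nil {
		return err
	}
	defer func() {
		// The workload is already gone by now, so a commit failure is only logged.
		if cerr := commit(); cerr != nil {
			fmt.Printf("commit failed: %v\n", cerr)
		}
	}()

	fmt.Printf("workload %s ran, exited and was removed\n", workloadID)
	return nil
}

func main() {
	_ = runLambda("workloadid")
}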