Skip to content

Commit

Permalink
WAL can wait and recycle lambdas (projecteru2#531)
Browse files Browse the repository at this point in the history
* wal can wait and recycle lambdas

* fix test and lint
  • Loading branch information
jschwinger233 authored and DuodenumL committed Jan 21, 2022
1 parent b38b88f commit a00c69b
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 113 deletions.
61 changes: 26 additions & 35 deletions cluster/calcium/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ import (
"github.com/projecteru2/core/utils"
"github.com/projecteru2/core/wal"

"github.com/google/uuid"
"github.com/pkg/errors"
)

const (
exitDataPrefix = "[exitcode] "
labelLambdaID = "LambdaID"
)

// RunAndWait implement lambda
Expand All @@ -39,10 +37,6 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
return workloadIDs, nil, errors.WithStack(types.ErrRunAndWaitCountOneWithStdin)
}

commit, err := c.walCreateLambda(opts)
if err != nil {
return workloadIDs, nil, logger.Err(ctx, err)
}
createChan, err := c.CreateWorkload(ctx, opts)
if err != nil {
logger.Errorf(ctx, "[RunAndWait] Create workload error %+v", err)
Expand All @@ -54,23 +48,40 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
wg = &sync.WaitGroup{}
)

lambda := func(message *types.CreateWorkloadMessage) {
lambda := func(message *types.CreateWorkloadMessage) (attachMessage *types.AttachWorkloadMessage) {
// should Done this waitgroup anyway
defer wg.Done()

defer func() {
runMsgCh <- attachMessage
}()

// if workload is empty, which means error occurred when created workload
// we don't need to remove this non-existing workload
// so just send the error message and return
if message.Error != nil || message.WorkloadID == "" {
logger.Errorf(ctx, "[RunAndWait] Create workload failed %+v", message.Error)
runMsgCh <- &types.AttachWorkloadMessage{
return &types.AttachWorkloadMessage{
WorkloadID: "",
Data: []byte(fmt.Sprintf("Create workload failed %+v", errors.Unwrap(message.Error))),
StdStreamType: types.EruError,
}
return
}

commit, err := c.walCreateLambda(message)
if err != nil {
return &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Create wal failed: %s, %+v", message.WorkloadID, logger.Err(ctx, err))),
StdStreamType: types.EruError,
}
}
defer func() {
if err := commit(); err != nil {
logger.Errorf(ctx, "[RunAndWait] Commit WAL %s failed: %s, %v", eventCreateLambda, message.WorkloadID, err)
}
}()

// the workload should be removed if it exists
// no matter the workload exits successfully or not
defer func() {
Expand All @@ -86,12 +97,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
workload, err := c.GetWorkload(ctx, message.WorkloadID)
if err != nil {
logger.Errorf(ctx, "[RunAndWait] Get workload failed %+v", err)
runMsgCh <- &types.AttachWorkloadMessage{
return &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Get workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))),
StdStreamType: types.EruError,
}
return
}

// for other cases, we have the workload and it works fine
Expand All @@ -105,12 +115,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
Stderr: true,
}); err != nil {
logger.Errorf(ctx, "[RunAndWait] Can't fetch log of workload %s error %+v", message.WorkloadID, err)
runMsgCh <- &types.AttachWorkloadMessage{
return &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Fetch log for workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))),
StdStreamType: types.EruError,
}
return
}

splitFunc, split := bufio.ScanLines, byte('\n')
Expand All @@ -121,12 +130,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
stdout, stderr, inStream, err = workload.Engine.VirtualizationAttach(ctx, message.WorkloadID, true, true)
if err != nil {
logger.Errorf(ctx, "[RunAndWait] Can't attach workload %s error %+v", message.WorkloadID, err)
runMsgCh <- &types.AttachWorkloadMessage{
return &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Attach to workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))),
StdStreamType: types.EruError,
}
return
}

processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error {
Expand All @@ -148,20 +156,19 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
r, err := workload.Engine.VirtualizationWait(ctx, message.WorkloadID, "")
if err != nil {
logger.Errorf(ctx, "[RunAndWait] %s wait failed %+v", utils.ShortID(message.WorkloadID), err)
runMsgCh <- &types.AttachWorkloadMessage{
return &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Wait workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))),
StdStreamType: types.EruError,
}
return
}

if r.Code != 0 {
logger.Errorf(ctx, "[RunAndWait] %s run failed %s", utils.ShortID(message.WorkloadID), r.Message)
}

exitData := []byte(exitDataPrefix + strconv.Itoa(int(r.Code)))
runMsgCh <- &types.AttachWorkloadMessage{
return &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: exitData,
StdStreamType: types.Stdout,
Expand All @@ -182,29 +189,13 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
utils.SentryGo(func() {
defer close(runMsgCh)
wg.Wait()
if err := commit(); err != nil {
logger.Errorf(ctx, "[RunAndWait] Commit WAL %s failed: %v", eventCreateLambda, err)
}

log.Info("[RunAndWait] Finish run and wait for workloads")
})

return workloadIDs, runMsgCh, nil
}

func (c *Calcium) walCreateLambda(opts *types.DeployOptions) (wal.Commit, error) {
uid, err := uuid.NewRandom()
if err != nil {
return nil, errors.WithStack(err)
}

lambdaID := uid.String()

if opts.Labels != nil {
opts.Labels[labelLambdaID] = lambdaID
} else {
opts.Labels = map[string]string{labelLambdaID: lambdaID}
}

func (c *Calcium) walCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
return c.wal.logCreateLambda(opts)
}
12 changes: 0 additions & 12 deletions cluster/calcium/lambda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/wal"
walmocks "github.com/projecteru2/core/wal/mocks"

"github.com/stretchr/testify/assert"
Expand All @@ -31,12 +30,6 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {

mwal := c.wal.WAL.(*walmocks.WAL)
defer mwal.AssertExpectations(t)
var walCommitted bool
commit := wal.Commit(func() error {
walCommitted = true
return nil
})
mwal.On("Log", eventCreateLambda, mock.Anything).Return(commit, nil).Once()

opts := &types.DeployOptions{
Name: "zc:name",
Expand All @@ -56,7 +49,6 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
_, ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
assert.NoError(err)
assert.NotNil(ch)
assert.False(walCommitted)
ms := []*types.AttachWorkloadMessage{}
for m := range ch {
ms = append(ms, m)
Expand All @@ -65,10 +57,6 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
assert.Equal(m.WorkloadID, "")
assert.True(strings.HasPrefix(string(m.Data), "Create workload failed"))

lambdaID, exists := opts.Labels[labelLambdaID]
assert.True(exists)
assert.True(len(lambdaID) > 1)
assert.True(walCommitted)
assert.Equal(m.StdStreamType, types.EruError)
}

Expand Down
5 changes: 2 additions & 3 deletions cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/projecteru2/core/utils"

"github.com/pkg/errors"
"github.com/sanity-io/litter"
)

// AddNode adds a node
Expand Down Expand Up @@ -127,13 +126,13 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
}
var n *types.Node
return n, c.withNodeLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error {
litter.Dump(opts)
logger.Infof(ctx, "set node")
opts.Normalize(node)
n = node

n.Bypass = (opts.BypassOpt == types.TriTrue) || (opts.BypassOpt == types.TriKeep && n.Bypass)
if n.IsDown() {
log.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
logger.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
}
if opts.WorkloadsDown {
c.setAllWorkloadsOnNodeDown(ctx, opts.Nodename)
Expand Down
70 changes: 27 additions & 43 deletions cluster/calcium/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,8 @@ func (w *WAL) logCreateWorkload(workloadID, nodename string) (wal.Commit, error)
})
}

func (w *WAL) logCreateLambda(opts *types.DeployOptions) (wal.Commit, error) {
return w.Log(eventCreateLambda, &types.ListWorkloadsOptions{
Appname: opts.Name,
Entrypoint: opts.Entrypoint.Name,
Labels: map[string]string{labelLambdaID: opts.Labels[labelLambdaID]},
})
func (w *WAL) logCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
return w.Log(eventCreateLambda, opts.WorkloadID)
}

// CreateWorkloadHandler indicates event handler for creating workload.
Expand Down Expand Up @@ -179,64 +175,52 @@ func (h *CreateLambdaHandler) Check(context.Context, interface{}) (bool, error)

// Encode .
func (h *CreateLambdaHandler) Encode(raw interface{}) ([]byte, error) {
opts, ok := raw.(*types.ListWorkloadsOptions)
workloadID, ok := raw.(string)
if !ok {
return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
}
return json.Marshal(opts)
return []byte(workloadID), nil
}

// Decode .
func (h *CreateLambdaHandler) Decode(bs []byte) (interface{}, error) {
opts := &types.ListWorkloadsOptions{}
err := json.Unmarshal(bs, opts)
return opts, err
return string(bs), nil
}

// Handle .
func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error {
opts, ok := raw.(*types.ListWorkloadsOptions)
workloadID, ok := raw.(string)
if !ok {
return types.NewDetailedErr(types.ErrInvalidType, raw)
}

workloadIDs, err := h.getWorkloadIDs(ctx, opts)
if err != nil {
log.Errorf(nil, "[CreateLambdaHandler.Handle] Get workloads %s/%s/%v failed: %v", //nolint
opts.Appname, opts.Entrypoint, opts.Labels, err)
return err
}

ctx, cancel := getReplayContext(ctx)
defer cancel()
logger := log.WithField("WAL.Handle", "RunAndWait").WithField("ID", workloadID)
go func() {
logger.Infof(ctx, "recovery start")
workload, err := h.calcium.GetWorkload(ctx, workloadID)
if err != nil {
logger.Errorf(ctx, "Get workload failed: %v", err)
return
}

if err := h.calcium.doRemoveWorkloadSync(ctx, workloadIDs); err != nil {
log.Errorf(ctx, "[CreateLambdaHandler.Handle] Remove lambda %v failed: %v", opts, err)
return err
}
r, err := workload.Engine.VirtualizationWait(ctx, workloadID, "")
if err != nil {
logger.Errorf(ctx, "Wait failed: %+v", err)
return
}
if r.Code != 0 {
logger.Errorf(ctx, "Run failed: %s", r.Message)
}

log.Infof(ctx, "[CreateLambdaHandler.Handle] Lambda %v removed", opts)
if err := h.calcium.doRemoveWorkloadSync(ctx, []string{workloadID}); err != nil {
logger.Errorf(ctx, "Remove failed: %+v", err)
}
logger.Infof(ctx, "waited and removed")
}()

return nil
}

func (h *CreateLambdaHandler) getWorkloadIDs(ctx context.Context, opts *types.ListWorkloadsOptions) ([]string, error) {
ctx, cancel := getReplayContext(ctx)
defer cancel()

workloads, err := h.calcium.ListWorkloads(ctx, opts)
if err != nil {
return nil, err
}

workloadIDs := make([]string, len(workloads))
for i, wrk := range workloads {
workloadIDs[i] = wrk.ID
}

return workloadIDs, nil
}

func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx, time.Second*32)
}
Loading

0 comments on commit a00c69b

Please sign in to comment.