Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WAL can wait and recycle lambdas #531

Merged
merged 2 commits into from
Jan 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 26 additions & 35 deletions cluster/calcium/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ import (
"github.com/projecteru2/core/utils"
"github.com/projecteru2/core/wal"

"github.com/google/uuid"
"github.com/pkg/errors"
)

const (
exitDataPrefix = "[exitcode] "
labelLambdaID = "LambdaID"
)

// RunAndWait implement lambda
Expand All @@ -39,10 +37,6 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
return workloadIDs, nil, errors.WithStack(types.ErrRunAndWaitCountOneWithStdin)
}

commit, err := c.walCreateLambda(opts)
if err != nil {
return workloadIDs, nil, logger.Err(ctx, err)
}
createChan, err := c.CreateWorkload(ctx, opts)
if err != nil {
logger.Errorf(ctx, "[RunAndWait] Create workload error %+v", err)
Expand All @@ -54,23 +48,40 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
wg = &sync.WaitGroup{}
)

lambda := func(message *types.CreateWorkloadMessage) {
lambda := func(message *types.CreateWorkloadMessage) (attachMessage *types.AttachWorkloadMessage) {
// should Done this waitgroup anyway
defer wg.Done()

defer func() {
runMsgCh <- attachMessage
}()

// if workload is empty, which means error occurred when created workload
// we don't need to remove this non-existing workload
// so just send the error message and return
if message.Error != nil || message.WorkloadID == "" {
logger.Errorf(ctx, "[RunAndWait] Create workload failed %+v", message.Error)
runMsgCh <- &types.AttachWorkloadMessage{
return &types.AttachWorkloadMessage{
WorkloadID: "",
Data: []byte(fmt.Sprintf("Create workload failed %+v", errors.Unwrap(message.Error))),
StdStreamType: types.EruError,
}
return
}

commit, err := c.walCreateLambda(message)
if err != nil {
return &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Create wal failed: %s, %+v", message.WorkloadID, logger.Err(ctx, err))),
StdStreamType: types.EruError,
}
}
defer func() {
if err := commit(); err != nil {
logger.Errorf(ctx, "[RunAndWait] Commit WAL %s failed: %s, %v", eventCreateLambda, message.WorkloadID, err)
}
}()

// the workload should be removed if it exists
// no matter the workload exits successfully or not
defer func() {
Expand All @@ -86,12 +97,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
workload, err := c.GetWorkload(ctx, message.WorkloadID)
if err != nil {
logger.Errorf(ctx, "[RunAndWait] Get workload failed %+v", err)
runMsgCh <- &types.AttachWorkloadMessage{
return &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Get workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))),
StdStreamType: types.EruError,
}
return
}

// for other cases, we have the workload and it works fine
Expand All @@ -105,12 +115,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
Stderr: true,
}); err != nil {
logger.Errorf(ctx, "[RunAndWait] Can't fetch log of workload %s error %+v", message.WorkloadID, err)
runMsgCh <- &types.AttachWorkloadMessage{
return &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Fetch log for workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))),
StdStreamType: types.EruError,
}
return
}

splitFunc, split := bufio.ScanLines, byte('\n')
Expand All @@ -121,12 +130,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
stdout, stderr, inStream, err = workload.Engine.VirtualizationAttach(ctx, message.WorkloadID, true, true)
if err != nil {
logger.Errorf(ctx, "[RunAndWait] Can't attach workload %s error %+v", message.WorkloadID, err)
runMsgCh <- &types.AttachWorkloadMessage{
return &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Attach to workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))),
StdStreamType: types.EruError,
}
return
}

processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error {
Expand All @@ -148,20 +156,19 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
r, err := workload.Engine.VirtualizationWait(ctx, message.WorkloadID, "")
if err != nil {
logger.Errorf(ctx, "[RunAndWait] %s wait failed %+v", utils.ShortID(message.WorkloadID), err)
runMsgCh <- &types.AttachWorkloadMessage{
return &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Wait workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))),
StdStreamType: types.EruError,
}
return
}

if r.Code != 0 {
logger.Errorf(ctx, "[RunAndWait] %s run failed %s", utils.ShortID(message.WorkloadID), r.Message)
}

exitData := []byte(exitDataPrefix + strconv.Itoa(int(r.Code)))
runMsgCh <- &types.AttachWorkloadMessage{
return &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: exitData,
StdStreamType: types.Stdout,
Expand All @@ -182,29 +189,13 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
utils.SentryGo(func() {
defer close(runMsgCh)
wg.Wait()
if err := commit(); err != nil {
logger.Errorf(ctx, "[RunAndWait] Commit WAL %s failed: %v", eventCreateLambda, err)
}

log.Info("[RunAndWait] Finish run and wait for workloads")
})

return workloadIDs, runMsgCh, nil
}

func (c *Calcium) walCreateLambda(opts *types.DeployOptions) (wal.Commit, error) {
uid, err := uuid.NewRandom()
if err != nil {
return nil, errors.WithStack(err)
}

lambdaID := uid.String()

if opts.Labels != nil {
opts.Labels[labelLambdaID] = lambdaID
} else {
opts.Labels = map[string]string{labelLambdaID: lambdaID}
}

// walCreateLambda writes a WAL entry for a freshly created lambda workload,
// so that on crash recovery the workload can be waited on and recycled.
// The returned wal.Commit must be invoked once the lambda has been handled.
func (c *Calcium) walCreateLambda(msg *types.CreateWorkloadMessage) (wal.Commit, error) {
	return c.wal.logCreateLambda(msg)
}
12 changes: 0 additions & 12 deletions cluster/calcium/lambda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/wal"
walmocks "github.com/projecteru2/core/wal/mocks"

"github.com/stretchr/testify/assert"
Expand All @@ -31,12 +30,6 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {

mwal := c.wal.WAL.(*walmocks.WAL)
defer mwal.AssertExpectations(t)
var walCommitted bool
commit := wal.Commit(func() error {
walCommitted = true
return nil
})
mwal.On("Log", eventCreateLambda, mock.Anything).Return(commit, nil).Once()

opts := &types.DeployOptions{
Name: "zc:name",
Expand All @@ -56,7 +49,6 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
_, ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
assert.NoError(err)
assert.NotNil(ch)
assert.False(walCommitted)
ms := []*types.AttachWorkloadMessage{}
for m := range ch {
ms = append(ms, m)
Expand All @@ -65,10 +57,6 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
assert.Equal(m.WorkloadID, "")
assert.True(strings.HasPrefix(string(m.Data), "Create workload failed"))

lambdaID, exists := opts.Labels[labelLambdaID]
assert.True(exists)
assert.True(len(lambdaID) > 1)
assert.True(walCommitted)
assert.Equal(m.StdStreamType, types.EruError)
}

Expand Down
5 changes: 2 additions & 3 deletions cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/projecteru2/core/utils"

"github.com/pkg/errors"
"github.com/sanity-io/litter"
)

// AddNode adds a node
Expand Down Expand Up @@ -127,13 +126,13 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
}
var n *types.Node
return n, c.withNodeLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error {
litter.Dump(opts)
logger.Infof(ctx, "set node")
opts.Normalize(node)
n = node

n.Bypass = (opts.BypassOpt == types.TriTrue) || (opts.BypassOpt == types.TriKeep && n.Bypass)
if n.IsDown() {
log.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
logger.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
}
if opts.WorkloadsDown {
c.setAllWorkloadsOnNodeDown(ctx, opts.Nodename)
Expand Down
70 changes: 27 additions & 43 deletions cluster/calcium/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,8 @@ func (w *WAL) logCreateWorkload(workloadID, nodename string) (wal.Commit, error)
})
}

func (w *WAL) logCreateLambda(opts *types.DeployOptions) (wal.Commit, error) {
return w.Log(eventCreateLambda, &types.ListWorkloadsOptions{
Appname: opts.Name,
Entrypoint: opts.Entrypoint.Name,
Labels: map[string]string{labelLambdaID: opts.Labels[labelLambdaID]},
})
// logCreateLambda appends an eventCreateLambda entry keyed by the workload ID;
// the recovery handler only needs the ID to wait on and remove the workload.
func (w *WAL) logCreateLambda(msg *types.CreateWorkloadMessage) (wal.Commit, error) {
	return w.Log(eventCreateLambda, msg.WorkloadID)
}

// CreateWorkloadHandler indicates event handler for creating workload.
Expand Down Expand Up @@ -179,64 +175,52 @@ func (h *CreateLambdaHandler) Check(context.Context, interface{}) (bool, error)

// Encode serializes a WAL log item for eventCreateLambda.
// The raw item must be the lambda workload ID (string); it is stored as-is.
func (h *CreateLambdaHandler) Encode(raw interface{}) ([]byte, error) {
	workloadID, ok := raw.(string)
	if !ok {
		return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
	}
	return []byte(workloadID), nil
}

// Decode deserializes a WAL log item back into the lambda workload ID string.
func (h *CreateLambdaHandler) Decode(bs []byte) (interface{}, error) {
	return string(bs), nil
}

// Handle replays an eventCreateLambda WAL entry: it asynchronously waits for
// the lambda workload to exit and then removes it, mirroring what RunAndWait
// would have done had the core not crashed.
//
// raw must be the workload ID (string) produced by Decode. The wait/remove
// work is bounded by the replay timeout from getReplayContext.
func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error {
	workloadID, ok := raw.(string)
	if !ok {
		return types.NewDetailedErr(types.ErrInvalidType, raw)
	}

	ctx, cancel := getReplayContext(ctx)
	logger := log.WithField("WAL.Handle", "RunAndWait").WithField("ID", workloadID)
	go func() {
		// cancel must fire when the goroutine finishes, NOT when Handle
		// returns: a `defer cancel()` in Handle itself would cancel ctx
		// immediately and abort the async wait/remove below.
		defer cancel()

		logger.Infof(ctx, "recovery start")
		workload, err := h.calcium.GetWorkload(ctx, workloadID)
		if err != nil {
			logger.Errorf(ctx, "Get workload failed: %v", err)
			return
		}

		// Block until the workload exits (or the replay context times out).
		r, err := workload.Engine.VirtualizationWait(ctx, workloadID, "")
		if err != nil {
			logger.Errorf(ctx, "Wait failed: %+v", err)
			return
		}
		if r.Code != 0 {
			logger.Errorf(ctx, "Run failed: %s", r.Message)
		}

		// Lambdas are one-shot: always recycle the workload after it exits.
		if err := h.calcium.doRemoveWorkloadSync(ctx, []string{workloadID}); err != nil {
			logger.Errorf(ctx, "Remove failed: %+v", err)
		}
		logger.Infof(ctx, "waited and removed")
	}()

	return nil
}

func (h *CreateLambdaHandler) getWorkloadIDs(ctx context.Context, opts *types.ListWorkloadsOptions) ([]string, error) {
ctx, cancel := getReplayContext(ctx)
defer cancel()

workloads, err := h.calcium.ListWorkloads(ctx, opts)
if err != nil {
return nil, err
}

workloadIDs := make([]string, len(workloads))
for i, wrk := range workloads {
workloadIDs[i] = wrk.ID
}

return workloadIDs, nil
}

func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx, time.Second*32)
}
Loading