Skip to content

Commit

Permalink
always return errors to client
Browse files Browse the repository at this point in the history
  • Loading branch information
tonicmuroq committed Apr 15, 2021
1 parent 2350fdc commit 4464cb0
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 34 deletions.
45 changes: 41 additions & 4 deletions cluster/calcium/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package calcium
import (
"bufio"
"context"
"fmt"
"io"
"strconv"
"sync"
Expand Down Expand Up @@ -46,11 +47,19 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
return nil, err
}

runMsgCh := make(chan *types.AttachWorkloadMessage)
wg := &sync.WaitGroup{}
var (
runMsgCh = make(chan *types.AttachWorkloadMessage)
wg = &sync.WaitGroup{}
errorMessages = []*types.AttachWorkloadMessage{}
)
for message := range createChan {
if message.Error != nil || message.WorkloadID == "" {
logger.Errorf("[RunAndWait] Create workload failed %+v", message.Error)
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
WorkloadID: "",
Data: []byte(fmt.Sprintf("Create workload failed %+v", message.Error)),
StdStreamType: types.Stderr,
})
continue
}

Expand All @@ -67,13 +76,27 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
workload, err := c.GetWorkload(ctx, message.WorkloadID)
if err != nil {
logger.Errorf("[RunAndWait] Get workload failed %+v", err)
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Get workload %s failed %+v", message.WorkloadID, err)),
StdStreamType: types.Stderr,
})
return
}

var stdout, stderr io.ReadCloser
if stdout, stderr, err = workload.Engine.VirtualizationLogs(ctx, &enginetypes.VirtualizationLogStreamOptions{
ID: message.WorkloadID, Follow: true, Stdout: true, Stderr: true}); err != nil {
ID: message.WorkloadID,
Follow: true,
Stdout: true,
Stderr: true,
}); err != nil {
logger.Errorf("[RunAndWait] Can't fetch log of workload %s error %+v", message.WorkloadID, err)
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Fetch log for workload %s failed %+v", message.WorkloadID, err)),
StdStreamType: types.Stderr,
})
return
}

Expand All @@ -85,6 +108,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
stdout, stderr, inStream, err = workload.Engine.VirtualizationAttach(ctx, message.WorkloadID, true, true)
if err != nil {
logger.Errorf("[RunAndWait] Can't attach workload %s error %+v", message.WorkloadID, err)
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Attach to workload %s failed %+v", message.WorkloadID, err)),
StdStreamType: types.Stderr,
})
return
}

Expand All @@ -95,7 +123,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
splitFunc, split = bufio.ScanBytes, byte(0)
}

// return workload id as first message for lambda
// return workload id as first normal message for lambda
runMsgCh <- &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(""),
Expand All @@ -113,6 +141,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
r, err := workload.Engine.VirtualizationWait(ctx, message.WorkloadID, "")
if err != nil {
logger.Errorf("[RunAndWait] %s wait failed %+v", utils.ShortID(message.WorkloadID), err)
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: []byte(fmt.Sprintf("Wait workload %s failed %+v", message.WorkloadID, err)),
StdStreamType: types.Stderr,
})
return
}

Expand All @@ -134,6 +167,10 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
if err := commit(); err != nil {
logger.Errorf("[RunAndWait] Commit WAL %s failed: %v", eventCreateLambda, err)
}

for _, message := range errorMessages {
runMsgCh <- message
}
log.Info("[RunAndWait] Finish run and wait for workloads")
}()

Expand Down
154 changes: 124 additions & 30 deletions cluster/calcium/lambda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"fmt"
"io"
"io/ioutil"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

enginemocks "github.com/projecteru2/core/engine/mocks"
enginetypes "github.com/projecteru2/core/engine/types"
Expand All @@ -25,6 +25,7 @@ import (
)

func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
assert := assert.New(t)
c, _ := newCreateWorkloadCluster(t)
c.wal = &WAL{WAL: &walmocks.WAL{}}

Expand Down Expand Up @@ -53,19 +54,29 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
mstore.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("err")).Once()

ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
require.NoError(t, err)
require.NotNil(t, ch)
require.False(t, walCommitted)
require.Nil(t, <-ch) // recv nil due to the ch will be closed.
assert.NoError(err)
assert.NotNil(ch)
assert.False(walCommitted)
m := <-ch
assert.Equal(m.WorkloadID, "")
assert.True(strings.HasPrefix(string(m.Data), "Create workload failed"))

lambdaID, exists := opts.Labels[labelLambdaID]
require.True(t, exists)
require.True(t, len(lambdaID) > 1)
require.True(t, walCommitted)
assert.True(exists)
assert.True(len(lambdaID) > 1)
assert.True(walCommitted)
assert.Equal(m.StdStreamType, types.Stderr)
}

func TestLambdaWithWorkloadIDReturned(t *testing.T) {
c, _ := newLambdaCluster(t)
assert := assert.New(t)
c, nodes := newLambdaCluster(t)
engine := nodes[0].Engine.(*enginemocks.API)

workload := &types.Workload{ID: "workloadfortonictest", Engine: engine}
store := c.store.(*storemocks.Store)
store.On("GetWorkload", mock.Anything, mock.Anything).Return(workload, nil)
store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil)

opts := &types.DeployOptions{
Name: "zc:name",
Expand All @@ -79,12 +90,112 @@ func TestLambdaWithWorkloadIDReturned(t *testing.T) {
},
}

r1, w1 := io.Pipe()
go func() {
w1.Write([]byte("stdout line1\n"))
w1.Write([]byte("stdout line2\n"))
w1.Close()
}()
r2, w2 := io.Pipe()
go func() {
w2.Write([]byte("stderr line1\n"))
w2.Write([]byte("stderr line2\n"))
w2.Close()
}()
engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(ioutil.NopCloser(r1), ioutil.NopCloser(r2), nil)
engine.On("VirtualizationWait", mock.Anything, mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationWaitResult{Code: 0}, nil)

ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
assert.NoError(t, err)
assert.NotNil(t, ch)
assert.NoError(err)
assert.NotNil(ch)

m := <-ch
require.Equal(t, m.WorkloadID, "workloadfortonictest")
ms := []*types.AttachWorkloadMessage{}
for m := range ch {
ms = append(ms, m)
}
assert.Equal(len(ms), 8)
assert.Equal(ms[0].WorkloadID, "workloadfortonictest")
assert.Equal(ms[0].Data, []byte(""))
assert.Equal(ms[1].WorkloadID, "workloadfortonictest")
assert.Equal(ms[1].Data, []byte(""))
assert.True(strings.HasPrefix(string(ms[7].Data), exitDataPrefix))
assert.True(strings.HasPrefix(string(ms[6].Data), exitDataPrefix))
}

func TestLambdaWithError(t *testing.T) {
assert := assert.New(t)
c, nodes := newLambdaCluster(t)
engine := nodes[0].Engine.(*enginemocks.API)

workload := &types.Workload{ID: "workloadfortonictest", Engine: engine}
store := c.store.(*storemocks.Store)
store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil)

opts := &types.DeployOptions{
Name: "zc:name",
Count: 2,
DeployStrategy: strategy.Auto,
Podname: "p1",
ResourceOpts: types.ResourceOptions{CPUQuotaLimit: 1},
Image: "zc:test",
Entrypoint: &types.Entrypoint{
Name: "good-entrypoint",
},
}

store.On("GetWorkload", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("error")).Twice()
ch0, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
assert.NoError(err)
assert.NotNil(ch0)
m0 := <-ch0
assert.Equal(m0.WorkloadID, "workloadfortonictest")
assert.True(strings.HasPrefix(string(m0.Data), "Get workload"))

store.On("GetWorkload", mock.Anything, mock.Anything).Return(workload, nil)

engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("error")).Twice()
ch1, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
assert.NoError(err)
assert.NotNil(ch1)
m1 := <-ch1
assert.Equal(m1.WorkloadID, "workloadfortonictest")
assert.True(strings.HasPrefix(string(m1.Data), "Fetch log for workload"))

r1, w1 := io.Pipe()
go func() {
w1.Write([]byte("stdout line1\n"))
w1.Write([]byte("stdout line2\n"))
w1.Close()
}()
r2, w2 := io.Pipe()
go func() {
w2.Write([]byte("stderr line1\n"))
w2.Write([]byte("stderr line2\n"))
w2.Close()
}()
engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(ioutil.NopCloser(r1), ioutil.NopCloser(r2), nil)

engine.On("VirtualizationWait", mock.Anything, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("error"))
ch2, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
assert.NoError(err)
assert.NotNil(ch2)

ms := []*types.AttachWorkloadMessage{}
for m := range ch2 {
ms = append(ms, m)
}
assert.Equal(len(ms), 8)
assert.Equal(ms[0].WorkloadID, "workloadfortonictest")
assert.Equal(ms[0].Data, []byte(""))
assert.Equal(ms[1].WorkloadID, "workloadfortonictest")
assert.Equal(ms[1].Data, []byte(""))

m2 := ms[7]
assert.Equal(m2.WorkloadID, "workloadfortonictest")
assert.True(strings.HasPrefix(string(m2.Data), "Wait workload"))
m3 := ms[6]
assert.Equal(m3.WorkloadID, "workloadfortonictest")
assert.True(strings.HasPrefix(string(m3.Data), "Wait workload"))
}

func newLambdaCluster(t *testing.T) (*Calcium, []*types.Node) {
Expand Down Expand Up @@ -167,21 +278,6 @@ func newLambdaCluster(t *testing.T) (*Calcium, []*types.Node) {
engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return([]string{""}, nil)
engine.On("ImageRemoteDigest", mock.Anything, mock.Anything).Return("", nil)

r1, w1 := io.Pipe()
go func() {
w1.Write([]byte("stdout line1\n"))
w1.Write([]byte("stdout line2\n"))
w1.Close()
}()
r2, w2 := io.Pipe()
go func() {
w2.Write([]byte("stderr line1\n"))
w2.Write([]byte("stderr line2\n"))
w2.Close()
}()
engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(ioutil.NopCloser(r1), ioutil.NopCloser(r2), nil)
engine.On("VirtualizationWait", mock.Anything, mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationWaitResult{Code: 0})

// doCreateAndStartWorkload fails: AddWorkload
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationCreated{ID: "workloadfortonictest"}, nil)
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
Expand All @@ -190,7 +286,5 @@ func newLambdaCluster(t *testing.T) (*Calcium, []*types.Node) {
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
store.On("AddWorkload", mock.Anything, mock.Anything).Return(nil)

workload := &types.Workload{ID: "workloadfortonictest", Engine: engine}
store.On("GetWorkload", mock.Anything, mock.Anything).Return(workload, nil)
return c, nodes
}

0 comments on commit 4464cb0

Please sign in to comment.