merge commits of v22.01.05 #541

Closed · wants to merge 6 commits

4 changes: 2 additions & 2 deletions .github/workflows/dockerimage.yml
@@ -2,8 +2,6 @@ name: docker-image
 
 on:
   push:
-    branches:
-      - master
     tags:
       - v*
 
@@ -13,6 +11,8 @@ jobs:
     steps:
      - name: checkout
        uses: actions/checkout@v2
+       with:
+         fetch-depth: 0
 
      - name: build and push to github packages
        uses: docker/build-push-action@v1
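
Note: actions/checkout@v2 defaults to a shallow, single-commit clone; fetch-depth: 0 fetches the full history and all tags. A plausible motivation (my assumption, the PR does not state one) is tag-based version stamping during the build, which fails on a shallow clone because no tags are present. A minimal Go sketch of that kind of stamping, not code from this repo:

package main

import (
    "fmt"
    "os/exec"
    "strings"
)

// gitVersion derives a version string from repository tags. On a shallow
// clone there are no tags for "git describe" to find, so CI needs
// fetch-depth: 0 for this to succeed. Hypothetical sketch only.
func gitVersion() (string, error) {
    out, err := exec.Command("git", "describe", "--tags").Output()
    if err != nil {
        return "", err
    }
    return strings.TrimSpace(string(out)), nil
}

func main() {
    v, err := gitVersion()
    if err != nil {
        fmt.Println("version unknown:", err)
        return
    }
    fmt.Println("version:", v)
}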
2 changes: 1 addition & 1 deletion Dockerfile
@@ -2,7 +2,7 @@ FROM golang:alpine AS BUILD
 
 # make binary
 RUN apk add --no-cache build-base musl-dev git curl make cmake
-RUN git clone https://github.com/projecteru2/core.git /go/src/github.com/projecteru2/core
+COPY . /go/src/github.com/projecteru2/core
 WORKDIR /go/src/github.com/projecteru2/core
 ARG KEEP_SYMBOL
 RUN make build && ./eru-core --version
23 changes: 3 additions & 20 deletions cluster/calcium/calcium.go
@@ -15,8 +15,6 @@ import (
     "github.com/projecteru2/core/source/github"
     "github.com/projecteru2/core/source/gitlab"
     "github.com/projecteru2/core/store"
-    "github.com/projecteru2/core/store/etcdv3"
-    "github.com/projecteru2/core/store/redis"
     "github.com/projecteru2/core/types"
 
     "github.com/pkg/errors"
@@ -31,27 +29,16 @@ type Calcium struct {
     watcher    discovery.Service
     wal        *WAL
     identifier string
-    selfmon    *NodeStatusWatcher
 }
 
 // New returns a new cluster config
 func New(config types.Config, t *testing.T) (*Calcium, error) {
     logger := log.WithField("Calcium", "New").WithField("config", config)
 
     // set store
-    var store store.Store
-    var err error
-    switch config.Store {
-    case types.Redis:
-        store, err = redis.New(config, t)
-        if err != nil {
-            return nil, logger.Err(context.TODO(), errors.WithStack(err))
-        }
-    default:
-        store, err = etcdv3.New(config, t)
-        if err != nil {
-            return nil, logger.Err(context.TODO(), errors.WithStack(err))
-        }
+    store, err := store.NewStore(config, t)
+    if err != nil {
+        return nil, logger.Err(context.TODO(), errors.WithStack(err))
    }
 
     // set scheduler
@@ -83,10 +70,6 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
     cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
     cal.wal, err = newCalciumWAL(cal)
     cal.identifier = config.Identifier()
-    cal.selfmon = NewNodeStatusWatcher(cal)
-
-    // start node status watcher
-    go cal.selfmon.run()
 
     return cal, logger.Err(nil, errors.WithStack(err)) //nolint
 }
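
Note: the backend switch moves out of New and behind a single constructor, and the node-status watcher (selfmon) is dropped from this struct. A minimal sketch of what store.NewStore presumably looks like, assuming it keeps the same redis/etcdv3 dispatch; the real factory lives in the store package and is not part of this diff:

package store

import (
    "testing"

    "github.com/projecteru2/core/store/etcdv3"
    "github.com/projecteru2/core/store/redis"
    "github.com/projecteru2/core/types"
)

// NewStore picks a storage backend from the config. Hypothetical
// reconstruction for illustration; not code shown in this PR.
func NewStore(config types.Config, t *testing.T) (Store, error) {
    switch config.Store {
    case types.Redis:
        return redis.New(config, t)
    default:
        return etcdv3.New(config, t) // etcd v3 remains the default backend
    }
}

Centralizing the dispatch removes the duplicated error handling around each backend and keeps calcium.go free of backend-specific imports.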
5 changes: 2 additions & 3 deletions cluster/calcium/dissociate.go
@@ -4,7 +4,6 @@ import (
     "context"
 
     "github.com/projecteru2/core/log"
-    "github.com/projecteru2/core/store"
     "github.com/projecteru2/core/types"
     "github.com/projecteru2/core/utils"
 
@@ -34,7 +33,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
         ctx,
         // if
         func(ctx context.Context) (err error) {
-            if err = c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr); err == nil {
+            if err = c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, types.ActionIncr); err == nil {
                 log.Infof(ctx, "[DissociateWorkload] Workload %s dissociated", workload.ID)
             }
             return errors.WithStack(err)
@@ -48,7 +47,7 @@
             if failedByCond {
                 return nil
             }
-            return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
+            return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, types.ActionDecr))
         },
         c.config.GlobalTimeout,
     )
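
Note: store.ActionIncr/store.ActionDecr become types.ActionIncr/types.ActionDecr here and in remove.go below, so cluster code no longer imports the store package just for two constants. Presumably the constants simply moved, roughly like this (values illustrative, not taken from this diff):

package types

// Node resource bookkeeping actions, as assumed after the move from the
// store package. The concrete values are illustrative only.
const (
    ActionIncr = "+" // add a workload's usage back to the node's free resources
    ActionDecr = "-" // subtract it again, e.g. when a rollback re-reserves it
)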
61 changes: 26 additions & 35 deletions cluster/calcium/lambda.go
@@ -15,13 +15,11 @@ import (
     "github.com/projecteru2/core/utils"
     "github.com/projecteru2/core/wal"
 
-    "github.com/google/uuid"
     "github.com/pkg/errors"
 )
 
 const (
     exitDataPrefix = "[exitcode] "
-    labelLambdaID  = "LambdaID"
 )
@@ -39,10 +37,6 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
         return workloadIDs, nil, errors.WithStack(types.ErrRunAndWaitCountOneWithStdin)
     }
 
-    commit, err := c.walCreateLambda(opts)
-    if err != nil {
-        return workloadIDs, nil, logger.Err(ctx, err)
-    }
     createChan, err := c.CreateWorkload(ctx, opts)
     if err != nil {
         logger.Errorf(ctx, "[RunAndWait] Create workload error %+v", err)
@@ -54,23 +48,40 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
         wg = &sync.WaitGroup{}
     )
 
-    lambda := func(message *types.CreateWorkloadMessage) {
+    lambda := func(message *types.CreateWorkloadMessage) (attachMessage *types.AttachWorkloadMessage) {
         // should Done this waitgroup anyway
         defer wg.Done()
 
+        defer func() {
+            runMsgCh <- attachMessage
+        }()
+
         // if workload is empty, which means error occurred when created workload
         // we don't need to remove this non-existing workload
         // so just send the error message and return
         if message.Error != nil || message.WorkloadID == "" {
             logger.Errorf(ctx, "[RunAndWait] Create workload failed %+v", message.Error)
-            runMsgCh <- &types.AttachWorkloadMessage{
+            return &types.AttachWorkloadMessage{
                 WorkloadID:    "",
                 Data:          []byte(fmt.Sprintf("Create workload failed %+v", errors.Unwrap(message.Error))),
                 StdStreamType: types.EruError,
             }
-            return
         }
 
+        commit, err := c.walCreateLambda(message)
+        if err != nil {
+            return &types.AttachWorkloadMessage{
+                WorkloadID:    message.WorkloadID,
+                Data:          []byte(fmt.Sprintf("Create wal failed: %s, %+v", message.WorkloadID, logger.Err(ctx, err))),
+                StdStreamType: types.EruError,
+            }
+        }
+        defer func() {
+            if err := commit(); err != nil {
+                logger.Errorf(ctx, "[RunAndWait] Commit WAL %s failed: %s, %v", eventCreateLambda, message.WorkloadID, err)
+            }
+        }()
+
         // the workload should be removed if it exists
         // no matter the workload exits successfully or not
         defer func() {
@@ -86,12 +97,11 @@
         workload, err := c.GetWorkload(ctx, message.WorkloadID)
         if err != nil {
             logger.Errorf(ctx, "[RunAndWait] Get workload failed %+v", err)
-            runMsgCh <- &types.AttachWorkloadMessage{
+            return &types.AttachWorkloadMessage{
                 WorkloadID:    message.WorkloadID,
                 Data:          []byte(fmt.Sprintf("Get workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))),
                 StdStreamType: types.EruError,
             }
-            return
         }
 
         // for other cases, we have the workload and it works fine
@@ -105,12 +115,11 @@
             Stderr: true,
         }); err != nil {
             logger.Errorf(ctx, "[RunAndWait] Can't fetch log of workload %s error %+v", message.WorkloadID, err)
-            runMsgCh <- &types.AttachWorkloadMessage{
+            return &types.AttachWorkloadMessage{
                 WorkloadID:    message.WorkloadID,
                 Data:          []byte(fmt.Sprintf("Fetch log for workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))),
                 StdStreamType: types.EruError,
             }
-            return
         }
 
         splitFunc, split := bufio.ScanLines, byte('\n')
@@ -121,12 +130,11 @@
             stdout, stderr, inStream, err = workload.Engine.VirtualizationAttach(ctx, message.WorkloadID, true, true)
             if err != nil {
                 logger.Errorf(ctx, "[RunAndWait] Can't attach workload %s error %+v", message.WorkloadID, err)
-                runMsgCh <- &types.AttachWorkloadMessage{
+                return &types.AttachWorkloadMessage{
                     WorkloadID:    message.WorkloadID,
                     Data:          []byte(fmt.Sprintf("Attach to workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))),
                     StdStreamType: types.EruError,
                 }
-                return
             }
 
             processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error {
@@ -148,20 +156,19 @@
             r, err := workload.Engine.VirtualizationWait(ctx, message.WorkloadID, "")
             if err != nil {
                 logger.Errorf(ctx, "[RunAndWait] %s wait failed %+v", utils.ShortID(message.WorkloadID), err)
-                runMsgCh <- &types.AttachWorkloadMessage{
+                return &types.AttachWorkloadMessage{
                     WorkloadID:    message.WorkloadID,
                     Data:          []byte(fmt.Sprintf("Wait workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))),
                     StdStreamType: types.EruError,
                 }
-                return
             }
 
             if r.Code != 0 {
                 logger.Errorf(ctx, "[RunAndWait] %s run failed %s", utils.ShortID(message.WorkloadID), r.Message)
             }
 
             exitData := []byte(exitDataPrefix + strconv.Itoa(int(r.Code)))
-            runMsgCh <- &types.AttachWorkloadMessage{
+            return &types.AttachWorkloadMessage{
                 WorkloadID:    message.WorkloadID,
                 Data:          exitData,
                 StdStreamType: types.Stdout,
@@ -182,29 +189,13 @@
     utils.SentryGo(func() {
         defer close(runMsgCh)
         wg.Wait()
-        if err := commit(); err != nil {
-            logger.Errorf(ctx, "[RunAndWait] Commit WAL %s failed: %v", eventCreateLambda, err)
-        }
 
         log.Info("[RunAndWait] Finish run and wait for workloads")
     })
 
     return workloadIDs, runMsgCh, nil
 }
 
-func (c *Calcium) walCreateLambda(opts *types.DeployOptions) (wal.Commit, error) {
-    uid, err := uuid.NewRandom()
-    if err != nil {
-        return nil, errors.WithStack(err)
-    }
-
-    lambdaID := uid.String()
-
-    if opts.Labels != nil {
-        opts.Labels[labelLambdaID] = lambdaID
-    } else {
-        opts.Labels = map[string]string{labelLambdaID: lambdaID}
-    }
-
+func (c *Calcium) walCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
     return c.wal.logCreateLambda(opts)
 }
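
Note: the big win in lambda.go is that every "runMsgCh <- msg; return" pair collapses into a named return value plus a single deferred channel send, and the WAL entry is now created per workload (once its ID is known) and committed by another defer. A self-contained sketch of the deferred-send pattern, with simplified names rather than the PR's exact types:

package main

import (
    "errors"
    "fmt"
)

type attachMessage struct{ data string }

// run writes to out exactly once, no matter which return path fires:
// the named return value holds whatever the function returned, and the
// single defer forwards it to the channel.
func run(out chan<- *attachMessage) (msg *attachMessage) {
    defer func() { out <- msg }()

    if err := doWork(); err != nil {
        // early error path: still goes through the one deferred send
        return &attachMessage{data: "failed: " + err.Error()}
    }
    return &attachMessage{data: "ok"}
}

func doWork() error { return errors.New("boom") }

func main() {
    out := make(chan *attachMessage, 1)
    run(out)
    fmt.Println((<-out).data) // prints "failed: boom"
}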
12 changes: 0 additions & 12 deletions cluster/calcium/lambda_test.go
@@ -17,7 +17,6 @@ import (
     storemocks "github.com/projecteru2/core/store/mocks"
     "github.com/projecteru2/core/strategy"
     "github.com/projecteru2/core/types"
-    "github.com/projecteru2/core/wal"
     walmocks "github.com/projecteru2/core/wal/mocks"
 
     "github.com/stretchr/testify/assert"
@@ -31,12 +30,6 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
 
     mwal := c.wal.WAL.(*walmocks.WAL)
     defer mwal.AssertExpectations(t)
-    var walCommitted bool
-    commit := wal.Commit(func() error {
-        walCommitted = true
-        return nil
-    })
-    mwal.On("Log", eventCreateLambda, mock.Anything).Return(commit, nil).Once()
 
     opts := &types.DeployOptions{
         Name: "zc:name",
@@ -56,7 +49,6 @@
     _, ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
     assert.NoError(err)
     assert.NotNil(ch)
-    assert.False(walCommitted)
     ms := []*types.AttachWorkloadMessage{}
     for m := range ch {
         ms = append(ms, m)
@@ -65,10 +57,6 @@
     assert.Equal(m.WorkloadID, "")
     assert.True(strings.HasPrefix(string(m.Data), "Create workload failed"))
 
-    lambdaID, exists := opts.Labels[labelLambdaID]
-    assert.True(exists)
-    assert.True(len(lambdaID) > 1)
-    assert.True(walCommitted)
     assert.Equal(m.StdStreamType, types.EruError)
 }
 
5 changes: 2 additions & 3 deletions cluster/calcium/node.go
@@ -9,7 +9,6 @@ import (
     "github.com/projecteru2/core/utils"
 
     "github.com/pkg/errors"
-    "github.com/sanity-io/litter"
 )
 
 // AddNode adds a node
@@ -127,13 +126,13 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
     }
     var n *types.Node
     return n, c.withNodeLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error {
-        litter.Dump(opts)
+        logger.Infof(ctx, "set node")
         opts.Normalize(node)
         n = node
 
         n.Bypass = (opts.BypassOpt == types.TriTrue) || (opts.BypassOpt == types.TriKeep && n.Bypass)
         if n.IsDown() {
-            log.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
+            logger.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
         }
         if opts.WorkloadsDown {
             c.setAllWorkloadsOnNodeDown(ctx, opts.Nodename)
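
Note: litter.Dump(opts) wrote the options straight to stdout; the replacement goes through the request-scoped logger, which in this codebase's style already carries context as fields. A toy sketch of that field-carrying idea, purely illustrative and not the repo's log package:

package main

import "fmt"

// fieldLogger mimics a request-scoped logger: fields such as the opts
// are attached once, so call sites log short messages instead of
// dumping whole structs.
type fieldLogger struct{ fields map[string]any }

func (l fieldLogger) withField(k string, v any) fieldLogger {
    next := map[string]any{k: v}
    for key, val := range l.fields {
        next[key] = val
    }
    return fieldLogger{fields: next}
}

func (l fieldLogger) infof(format string, args ...any) {
    fmt.Printf("%s | fields=%v\n", fmt.Sprintf(format, args...), l.fields)
}

func main() {
    opts := map[string]string{"nodename": "node-1"}
    logger := fieldLogger{}.withField("opts", opts)
    logger.infof("set node") // the opts still show up, as a field
}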
5 changes: 2 additions & 3 deletions cluster/calcium/remove.go
@@ -6,7 +6,6 @@ import (
     "sync"
 
     "github.com/projecteru2/core/log"
-    "github.com/projecteru2/core/store"
     "github.com/projecteru2/core/types"
     "github.com/projecteru2/core/utils"
 
@@ -42,7 +41,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
         ctx,
         // if
         func(ctx context.Context) error {
-            return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
+            return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, types.ActionIncr))
         },
         // then
         func(ctx context.Context) (err error) {
@@ -56,7 +55,7 @@
             if failedByCond {
                 return nil
             }
-            return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
+            return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, types.ActionDecr))
         },
         c.config.GlobalTimeout,
     )
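
Note: RemoveWorkload and DissociateWorkload share the same three-phase shape: an "if" step that returns the workload's resources to the node, a "then" step that does the real work, and a rollback that re-subtracts the resources unless the "if" step itself was what failed. A self-contained sketch of that helper's shape; the codebase presumably has its own (the name and signature here are assumptions):

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// txn runs cond, then then, and rolls back on failure. failedByCond
// tells the rollback whether cond itself failed, so it can skip
// undoing work that never happened (the failedByCond check above).
func txn(ctx context.Context, cond, then func(context.Context) error,
    rollback func(context.Context, bool) error, timeout time.Duration) error {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    if err := cond(ctx); err != nil {
        return errors.Join(err, rollback(ctx, true))
    }
    if err := then(ctx); err != nil {
        return errors.Join(err, rollback(ctx, false))
    }
    return nil
}

func main() {
    err := txn(context.Background(),
        func(ctx context.Context) error { fmt.Println("incr node resources"); return nil },
        func(ctx context.Context) error { return errors.New("remove failed") },
        func(ctx context.Context, failedByCond bool) error {
            if failedByCond {
                return nil // nothing to undo
            }
            fmt.Println("decr node resources (rollback)")
            return nil
        },
        time.Second,
    )
    fmt.Println("err:", err)
}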