rename all container to workload #271

Merged
merged 2 commits on Nov 23, 2020
4 changes: 2 additions & 2 deletions client/interceptor/retry.go
@@ -21,8 +21,8 @@ func NewUnaryRetry(retryOpts RetryOptions) grpc.UnaryClientInterceptor {
 
 // RPCNeedRetry records rpc stream methods to retry
 var RPCNeedRetry = map[string]struct{}{
-	"/pb.CoreRPC/ContainerStatusStream": {},
-	"/pb.CoreRPC/WatchServiceStatus":    {},
+	"/pb.CoreRPC/WorkloadStatusStream": {},
+	"/pb.CoreRPC/WatchServiceStatus":   {},
 }
 
 // NewStreamRetry make specific stream retry on error
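Note: the keys of RPCNeedRetry are full gRPC method names, so a stream client interceptor can look up the invoked method in this set to decide whether to re-establish the stream on error. A minimal sketch of that pattern, assuming a generic retrySet and attempt cap (illustrative only, not the project's actual NewStreamRetry implementation):

package interceptor

import (
	"context"

	"google.golang.org/grpc"
)

// streamRetry re-dials streams whose full method name appears in retrySet,
// up to maxAttempts tries. Errors from later Recv calls are not handled here.
func streamRetry(retrySet map[string]struct{}, maxAttempts int) grpc.StreamClientInterceptor {
	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn,
		method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		stream, err := streamer(ctx, desc, cc, method, opts...)
		if _, ok := retrySet[method]; !ok {
			return stream, err // method not registered for retry, pass through
		}
		for attempt := 1; err != nil && attempt < maxAttempts; attempt++ {
			stream, err = streamer(ctx, desc, cc, method, opts...)
		}
		return stream, err
	}
}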
2 changes: 1 addition & 1 deletion cluster/calcium/build.go
@@ -84,7 +84,7 @@ func (c *Calcium) buildFromContent(ctx context.Context, node *types.Node, refs [
 
 func (c *Calcium) buildFromExist(ctx context.Context, ref, existID string) (chan *types.BuildImageMessage, error) {
 	return withImageBuiltChannel(func(ch chan *types.BuildImageMessage) {
-		node, err := c.getContainerNode(ctx, existID)
+		node, err := c.getWorkloadNode(ctx, existID)
 		if err != nil {
 			ch <- buildErrMsg(err)
 			return
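withImageBuiltChannel itself is not part of this diff; the call pattern in buildFromExist suggests a small helper that owns the message channel and runs the build callback asynchronously. A hypothetical reconstruction of that shape, assuming it only allocates the channel, runs the callback in a goroutine, and closes the channel when the callback returns:

// Hypothetical sketch (not taken from the repo): the helper owns the channel
// lifecycle so buildFromExist only has to write messages to ch.
func withImageBuiltChannel(fn func(chan *types.BuildImageMessage)) (chan *types.BuildImageMessage, error) {
	ch := make(chan *types.BuildImageMessage)
	go func() {
		defer close(ch)
		fn(ch)
	}()
	return ch, nil
}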
2 changes: 1 addition & 1 deletion cluster/calcium/build_test.go
@@ -119,7 +119,7 @@ func TestBuild(t *testing.T) {
 	// build from exist not implemented
 	opts.BuildMethod = types.BuildFromExist
 	engine.On("ImageBuildFromExist", mock.Anything, mock.Anything, mock.Anything).Return("", types.ErrEngineNotImplemented).Once()
-	store.On("GetContainer", mock.Anything, mock.Anything).Return(&types.Container{}, nil)
+	store.On("GetWorkload", mock.Anything, mock.Anything).Return(&types.Workload{}, nil)
 	store.On("GetNode", mock.Anything, mock.Anything).Return(&types.Node{Engine: engine}, nil)
 	ch, err = c.BuildImage(ctx, opts)
 	assert.NoError(t, err)
4 changes: 4 additions & 0 deletions cluster/calcium/calcium_test.go
@@ -38,6 +38,10 @@ func NewTestCluster() *Calcium {
 		Git: types.GitConfig{
 			CloneTimeout: 300 * time.Second,
 		},
+		Scheduler: types.SchedConfig{
+			MaxShare:  -1,
+			ShareBase: 100,
+		},
 	}
 	c.store = &storemocks.Store{}
 	c.scheduler = &schedulermocks.Scheduler{}
38 changes: 0 additions & 38 deletions cluster/calcium/container.go

This file was deleted.

56 changes: 0 additions & 56 deletions cluster/calcium/container_test.go

This file was deleted.

62 changes: 31 additions & 31 deletions cluster/calcium/control.go
@@ -11,9 +11,9 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
-// ControlContainer control containers status
-func (c *Calcium) ControlContainer(ctx context.Context, IDs []string, t string, force bool) (chan *types.ControlContainerMessage, error) {
-	ch := make(chan *types.ControlContainerMessage)
+// ControlWorkload control workloads status
+func (c *Calcium) ControlWorkload(ctx context.Context, IDs []string, t string, force bool) (chan *types.ControlWorkloadMessage, error) {
+	ch := make(chan *types.ControlWorkloadMessage)
 
 	go func() {
 		defer close(ch)
@@ -23,35 +23,35 @@ func (c *Calcium) ControlContainer(ctx context.Context, IDs []string, t string,
 			go func(ID string) {
 				defer wg.Done()
 				var message []*bytes.Buffer
-				err := c.withContainerLocked(ctx, ID, func(container *types.Container) error {
+				err := c.withWorkloadLocked(ctx, ID, func(workload *types.Workload) error {
 					var err error
 					switch t {
-					case cluster.ContainerStop:
-						message, err = c.doStopContainer(ctx, container, force)
+					case cluster.WorkloadStop:
+						message, err = c.doStopWorkload(ctx, workload, force)
 						return err
-					case cluster.ContainerStart:
-						message, err = c.doStartContainer(ctx, container, force)
+					case cluster.WorkloadStart:
+						message, err = c.doStartWorkload(ctx, workload, force)
 						return err
-					case cluster.ContainerRestart:
-						message, err = c.doStopContainer(ctx, container, force)
+					case cluster.WorkloadRestart:
+						message, err = c.doStopWorkload(ctx, workload, force)
 						if err != nil {
 							return err
 						}
-						startHook, err := c.doStartContainer(ctx, container, force)
+						startHook, err := c.doStartWorkload(ctx, workload, force)
 						message = append(message, startHook...)
 						return err
 					}
 					return types.ErrUnknownControlType
 				})
 				if err == nil {
-					log.Infof("[ControlContainer] Container %s %s", ID, t)
-					log.Info("[ControlContainer] Hook Output:")
+					log.Infof("[ControlWorkload] Workload %s %s", ID, t)
+					log.Info("[ControlWorkload] Hook Output:")
 					log.Info(string(utils.MergeHookOutputs(message)))
 				}
-				ch <- &types.ControlContainerMessage{
-					ContainerID: ID,
-					Error:       err,
-					Hook:        message,
+				ch <- &types.ControlWorkloadMessage{
+					WorkloadID: ID,
+					Error:      err,
+					Hook:       message,
 				}
 			}(ID)
 		}
@@ -61,31 +61,31 @@ func (c *Calcium) ControlContainer(ctx context.Context, IDs []string, t string,
 	return ch, nil
 }
 
-func (c *Calcium) doStartContainer(ctx context.Context, container *types.Container, force bool) (message []*bytes.Buffer, err error) {
-	if err = container.Start(ctx); err != nil {
+func (c *Calcium) doStartWorkload(ctx context.Context, workload *types.Workload, force bool) (message []*bytes.Buffer, err error) {
+	if err = workload.Start(ctx); err != nil {
 		return message, err
 	}
 	// TODO healthcheck first
-	if container.Hook != nil && len(container.Hook.AfterStart) > 0 {
+	if workload.Hook != nil && len(workload.Hook.AfterStart) > 0 {
 		message, err = c.doHook(
 			ctx,
-			container.ID, container.User,
-			container.Hook.AfterStart, container.Env,
-			container.Hook.Force, container.Privileged,
-			force, container.Engine,
+			workload.ID, workload.User,
+			workload.Hook.AfterStart, workload.Env,
+			workload.Hook.Force, workload.Privileged,
+			force, workload.Engine,
 		)
 	}
 	return message, err
 }
 
-func (c *Calcium) doStopContainer(ctx context.Context, container *types.Container, force bool) (message []*bytes.Buffer, err error) {
-	if container.Hook != nil && len(container.Hook.BeforeStop) > 0 {
+func (c *Calcium) doStopWorkload(ctx context.Context, workload *types.Workload, force bool) (message []*bytes.Buffer, err error) {
+	if workload.Hook != nil && len(workload.Hook.BeforeStop) > 0 {
 		message, err = c.doHook(
 			ctx,
-			container.ID, container.User,
-			container.Hook.BeforeStop, container.Env,
-			container.Hook.Force, container.Privileged,
-			force, container.Engine,
+			workload.ID, workload.User,
+			workload.Hook.BeforeStop, workload.Env,
+			workload.Hook.Force, workload.Privileged,
+			force, workload.Engine,
 		)
 		if err != nil {
 			return message, err
@@ -95,7 +95,7 @@ func (c *Calcium) doStopContainer(ctx context.Context, container *types.Containe
 	// The blocking problem here is serious: with the current config this can block on the order of 5 minutes
 	// A simple workaround is to trust the ctx rather than the engine's own handling
 	// Also, I suspect the engine's own timeout implementation waits out the full timeout instead of returning as soon as the work finishes
-	if err = container.Stop(ctx); err != nil {
+	if err = workload.Stop(ctx); err != nil {
 		message = append(message, bytes.NewBufferString(err.Error()))
 	}
 	return message, err
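ControlWorkload returns its channel immediately and streams one ControlWorkloadMessage per workload ID, so callers are expected to drain the channel and inspect each message. A minimal usage sketch, assuming a connected *Calcium instance c and the cluster and types packages shown in this diff (the workload IDs are placeholders):

// Illustrative caller: restart two workloads and report each result.
ch, err := c.ControlWorkload(ctx, []string{"workload-id-1", "workload-id-2"}, cluster.WorkloadRestart, false)
if err != nil {
	return err
}
for msg := range ch {
	if msg.Error != nil {
		fmt.Printf("workload %s failed: %v\n", msg.WorkloadID, msg.Error)
		continue
	}
	fmt.Printf("workload %s restarted with %d hook outputs\n", msg.WorkloadID, len(msg.Hook))
}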