Skip to content

Commit

Permalink
accelerate workload force stop (#350)
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 authored Mar 1, 2021
1 parent cbb0e3c commit 76081c8
Show file tree
Hide file tree
Showing 12 changed files with 39 additions and 24 deletions.
2 changes: 1 addition & 1 deletion cluster/calcium/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (c *Calcium) doStopWorkload(ctx context.Context, workload *types.Workload,
// 这里 block 的问题很严重,按照目前的配置是 5 分钟一级的 block
// 一个简单的处理方法是相信 ctx 不相信 engine 自身的处理
// 另外我怀疑 engine 自己的 timeout 实现是完全的等 timeout 而非结束了就退出
if err = workload.Stop(ctx); err != nil {
if err = workload.Stop(ctx, force); err != nil {
message = append(message, bytes.NewBufferString(err.Error()))
}
return message, errors.WithStack(err)
Expand Down
6 changes: 3 additions & 3 deletions cluster/calcium/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@ func TestControlStop(t *testing.T) {
// stop failed
workload.Hook.Force = false
ch, err = c.ControlWorkload(ctx, []string{"id1"}, cluster.WorkloadStop, false)
engine.On("VirtualizationStop", mock.Anything, mock.Anything).Return(types.ErrNilEngine).Once()
engine.On("VirtualizationStop", mock.Anything, mock.Anything, mock.Anything).Return(types.ErrNilEngine).Once()
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
engine.On("VirtualizationStop", mock.Anything, mock.Anything).Return(nil)
engine.On("VirtualizationStop", mock.Anything, mock.Anything, mock.Anything).Return(nil)
// stop success
ch, err = c.ControlWorkload(ctx, []string{"id1"}, cluster.WorkloadStop, false)
assert.NoError(t, err)
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestControlRestart(t *testing.T) {
}
workload.Hook = nil
// success
engine.On("VirtualizationStop", mock.Anything, mock.Anything).Return(nil)
engine.On("VirtualizationStop", mock.Anything, mock.Anything, mock.Anything).Return(nil)
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
ch, err = c.ControlWorkload(ctx, []string{"id1"}, cluster.WorkloadRestart, false)
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/replace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestReplaceWorkload(t *testing.T) {
engine.On("VirtualizationCopyFrom", mock.Anything, mock.Anything, mock.Anything).Return(ioutil.NopCloser(bytes.NewReader([]byte{})), "", nil)
opts.DeployOptions.Data = map[string]types.ReaderManager{}
// failed by Stop
engine.On("VirtualizationStop", mock.Anything, mock.Anything).Return(types.ErrCannotGetEngine).Once()
engine.On("VirtualizationStop", mock.Anything, mock.Anything, mock.Anything).Return(types.ErrCannotGetEngine).Once()
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(types.ErrCannotGetEngine).Once()
ch, err = c.ReplaceWorkload(ctx, opts)
assert.NoError(t, err)
Expand All @@ -147,7 +147,7 @@ func TestReplaceWorkload(t *testing.T) {
assert.NotNil(t, r.Remove)
assert.False(t, r.Remove.Success)
}
engine.On("VirtualizationStop", mock.Anything, mock.Anything).Return(nil)
engine.On("VirtualizationStop", mock.Anything, mock.Anything, mock.Anything).Return(nil)
// failed by VirtualizationCreate
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(nil, types.ErrCannotGetEngine).Once()
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(types.ErrCannotGetEngine).Once()
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (c *Calcium) GetWorkloadsStatus(ctx context.Context, ids []string) ([]*type

// SetWorkloadsStatus set workloads status
func (c *Calcium) SetWorkloadsStatus(ctx context.Context, status []*types.StatusMeta, ttls map[string]int64) ([]*types.StatusMeta, error) {
logger := log.WithField("Calcium", "SetWorkloadsStatus").WithField("status", status).WithField("ttls", ttls)
logger := log.WithField("Calcium", "SetWorkloadsStatus").WithField("status", status[0]).WithField("ttls", ttls)
r := []*types.StatusMeta{}
for _, workloadStatus := range status {
workload, err := c.store.GetWorkload(ctx, workloadStatus.ID)
Expand Down
9 changes: 7 additions & 2 deletions engine/docker/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

"github.com/docker/go-connections/nat"
"github.com/docker/go-units"
Expand Down Expand Up @@ -236,8 +237,12 @@ func (e *Engine) VirtualizationStart(ctx context.Context, ID string) error {
}

// VirtualizationStop stop virtualization
func (e *Engine) VirtualizationStop(ctx context.Context, ID string) error {
return e.client.ContainerStop(ctx, ID, nil)
func (e *Engine) VirtualizationStop(ctx context.Context, ID string, gracefulTimeout time.Duration) error {
timeout := &gracefulTimeout
if gracefulTimeout <= 0 {
timeout = nil
}
return e.client.ContainerStop(ctx, ID, timeout)
}

// VirtualizationRemove remove virtualization
Expand Down
3 changes: 2 additions & 1 deletion engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package engine
import (
"context"
"io"
"time"

enginetypes "github.com/projecteru2/core/engine/types"
coresource "github.com/projecteru2/core/source"
Expand Down Expand Up @@ -37,7 +38,7 @@ type API interface {
VirtualizationCreate(ctx context.Context, opts *enginetypes.VirtualizationCreateOptions) (*enginetypes.VirtualizationCreated, error)
VirtualizationCopyTo(ctx context.Context, ID, target string, content io.Reader, AllowOverwriteDirWithFile, CopyUIDGID bool) error
VirtualizationStart(ctx context.Context, ID string) error
VirtualizationStop(ctx context.Context, ID string) error
VirtualizationStop(ctx context.Context, ID string, gracefulTimeout time.Duration) error
VirtualizationRemove(ctx context.Context, ID string, volumes, force bool) error
VirtualizationInspect(ctx context.Context, ID string) (*enginetypes.VirtualizationInfo, error)
VirtualizationLogs(ctx context.Context, opts *enginetypes.VirtualizationLogStreamOptions) (stdout, stderr io.ReadCloser, err error)
Expand Down
12 changes: 7 additions & 5 deletions engine/mocks/API.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/mocks/fakeengine/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func MakeClient(ctx context.Context, config coretypes.Config, nodename, endpoint
e.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(vc, nil)
e.On("VirtualizationCopyTo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
e.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
e.On("VirtualizationStop", mock.Anything, mock.Anything).Return(nil)
e.On("VirtualizationStop", mock.Anything, mock.Anything, mock.Anything).Return(nil)
e.On("VirtualizationRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
vcJSON := &enginetypes.VirtualizationInfo{ID: ID, Image: "mock-image", Running: true, Networks: map[string]string{"mock-network": "1.1.1.1"}}
e.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(vcJSON, nil)
Expand Down
5 changes: 3 additions & 2 deletions engine/systemd/virtualization.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path"
"path/filepath"
"strings"
"time"

"github.com/pkg/errors"
enginetypes "github.com/projecteru2/core/engine/types"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (s *SSHClient) VirtualizationStart(ctx context.Context, ID string) (err err
}

// VirtualizationStop stops a systemd service
func (s *SSHClient) VirtualizationStop(ctx context.Context, ID string) (err error) {
func (s *SSHClient) VirtualizationStop(ctx context.Context, ID string, gracefulTimeout time.Duration) (err error) {
// systemctl stop $ID
_, stderr, err := s.runSingleCommand(ctx, fmt.Sprintf(cmdSystemdStop, ID), nil)
return errors.Wrap(err, stderr.String())
Expand All @@ -89,7 +90,7 @@ func (s *SSHClient) VirtualizationStop(ctx context.Context, ID string) (err erro
// VirtualizationRemove removes a systemd service
func (s *SSHClient) VirtualizationRemove(ctx context.Context, ID string, volumes, force bool) (err error) {
if force {
_ = s.VirtualizationStop(ctx, ID)
_ = s.VirtualizationStop(ctx, ID, -1)
}

// rm -f $FILE
Expand Down
3 changes: 2 additions & 1 deletion engine/virt/virt.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io/ioutil"
"path/filepath"
"strings"
"time"

"github.com/projecteru2/core/log"

Expand Down Expand Up @@ -185,7 +186,7 @@ func (v *Virt) VirtualizationStart(ctx context.Context, ID string) (err error) {
}

// VirtualizationStop stops it.
func (v *Virt) VirtualizationStop(ctx context.Context, ID string) (err error) {
func (v *Virt) VirtualizationStop(ctx context.Context, ID string, gracefulTimeout time.Duration) (err error) {
_, err = v.client.StopGuest(ctx, ID)
return
}
Expand Down
9 changes: 7 additions & 2 deletions types/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package types

import (
"context"
"time"

engine "github.com/projecteru2/core/engine"
enginetypes "github.com/projecteru2/core/engine/types"
Expand Down Expand Up @@ -60,11 +61,15 @@ func (c *Workload) Start(ctx context.Context) error {
}

// Stop a workload
func (c *Workload) Stop(ctx context.Context) error {
func (c *Workload) Stop(ctx context.Context, force bool) error {
if c.Engine == nil {
return ErrNilEngine
}
return c.Engine.VirtualizationStop(ctx, c.ID)
gracefulTimeout := time.Duration(-1) // -1 indicates use engine default timeout
if force {
gracefulTimeout = 0 // don't wait, kill -15 && kill -9
}
return c.Engine.VirtualizationStop(ctx, c.ID, gracefulTimeout)
}

// Remove a workload
Expand Down
6 changes: 3 additions & 3 deletions types/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@ func TestWorkloadInspect(t *testing.T) {
func TestWorkloadControl(t *testing.T) {
mockEngine := &mocks.API{}
mockEngine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
mockEngine.On("VirtualizationStop", mock.Anything, mock.Anything).Return(nil)
mockEngine.On("VirtualizationStop", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mockEngine.On("VirtualizationRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

ctx := context.Background()
c := Workload{}
err := c.Start(ctx)
assert.Error(t, err)
err = c.Stop(ctx)
err = c.Stop(ctx, true)
assert.Error(t, err)
err = c.Remove(ctx, true)
assert.Error(t, err)

c.Engine = mockEngine
err = c.Start(ctx)
assert.NoError(t, err)
err = c.Stop(ctx)
err = c.Stop(ctx, true)
assert.NoError(t, err)
err = c.Remove(ctx, true)
assert.NoError(t, err)
Expand Down

0 comments on commit 76081c8

Please sign in to comment.