migrate selfmon to core
DuodenumL committed Jan 3, 2022
1 parent d903237 commit 1ab646d
Showing 46 changed files with 1,541 additions and 1,229 deletions.
2 changes: 1 addition & 1 deletion 3rdmocks/ServerStream.go

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions cluster/calcium/build.go
@@ -30,6 +30,9 @@ func (c *Calcium) BuildImage(ctx context.Context, opts *types.BuildOptions) (ch
if err != nil {
return nil, logger.Err(ctx, err)
}
+if node.Engine == nil {
+return nil, logger.Err(ctx, types.ErrNilEngine)
+}
log.Infof(ctx, "[BuildImage] Building image at pod %s node %s", node.Podname, node.Name)

var (
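The nil-engine guard above recurs throughout this commit: with selfmon running inside core, a node that goes down keeps its metadata in the store but carries no live engine connection, so node.Engine can be nil and every call site must check it before dereferencing. A minimal sketch of the pattern, assuming the project's engine.API interface and types.ErrNilEngine error; the getEngine helper itself is hypothetical and not part of this commit:

package calcium

import (
	"github.com/projecteru2/core/engine"
	"github.com/projecteru2/core/types"
)

// getEngine is a hypothetical helper illustrating the guard added at
// each call site in this commit: fail fast with ErrNilEngine instead
// of dereferencing the nil engine of a node that is marked down.
func getEngine(node *types.Node) (engine.API, error) {
	if node.Engine == nil {
		return nil, types.ErrNilEngine
	}
	return node.Engine, nil
}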
6 changes: 6 additions & 0 deletions cluster/calcium/calcium.go
@@ -31,6 +31,7 @@ type Calcium struct {
watcher discovery.Service
wal *WAL
identifier string
+selfmon *NodeStatusWatcher
}

// New returns a new cluster config
@@ -82,6 +83,11 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
cal.wal, err = newCalciumWAL(cal)
cal.identifier = config.Identifier()
+cal.selfmon = NewNodeStatusWatcher(cal)
+
+// start node status watcher
+go cal.selfmon.run()
+
return cal, logger.Err(nil, errors.WithStack(err)) //nolint
}

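New now constructs a NodeStatusWatcher and runs it on a background goroutine; the watcher's own implementation lives in files not rendered on this page. A rough sketch of the shape it could take, assuming only what this diff shows (the type name, NewNodeStatusWatcher, run, and the HAKeepaliveInterval config set by the tests below); everything else here is guesswork, not the committed code:

package calcium

import "time"

// Sketch only: the real selfmon implementation is not shown in this
// excerpt. Assumes the watcher wakes on HAKeepaliveInterval and
// inspects node status through the cluster's store.
type NodeStatusWatcher struct {
	cal *Calcium
}

func NewNodeStatusWatcher(cal *Calcium) *NodeStatusWatcher {
	return &NodeStatusWatcher{cal: cal}
}

func (w *NodeStatusWatcher) run() {
	tick := time.NewTicker(w.cal.config.HAKeepaliveInterval)
	defer tick.Stop()
	for range tick.C {
		// hypothetical: watch node status and mark vanished nodes down
	}
}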
9 changes: 6 additions & 3 deletions cluster/calcium/calcium_test.go
@@ -53,8 +53,9 @@ func NewTestCluster() *Calcium {
MaxShare: -1,
ShareBase: 100,
},
-WALFile: filepath.Join(walDir, "core.wal.log"),
-MaxConcurrency: 10,
+WALFile: filepath.Join(walDir, "core.wal.log"),
+MaxConcurrency: 10,
+HAKeepaliveInterval: 16 * time.Second,
}
c.store = &storemocks.Store{}
c.scheduler = &schedulermocks.Scheduler{}
@@ -69,7 +70,7 @@
}

func TestNewCluster(t *testing.T) {
-config := types.Config{WALFile: "/tmp/a"}
+config := types.Config{WALFile: "/tmp/a", HAKeepaliveInterval: 16 * time.Second}
_, err := New(config, nil)
assert.Error(t, err)

@@ -91,6 +92,7 @@
SCMType: "gitlab",
PrivateKey: privFile.Name(),
},
+HAKeepaliveInterval: 16 * time.Second,
}
c1, err := New(config1, t)
assert.NoError(t, err)
@@ -102,6 +104,7 @@
SCMType: "github",
PrivateKey: privFile.Name(),
},
+HAKeepaliveInterval: 16 * time.Second,
}
c2, err := New(config2, t)
assert.NoError(t, err)
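Every test config above now sets HAKeepaliveInterval, since New starts the watcher unconditionally and a zero interval would make time.NewTicker panic in a loop like the sketch above. A defensive default of this shape would guard that; this fragment is hypothetical and not part of the commit, and the 16-second value is only what the tests happen to use:

// Hypothetical guard, not in this diff: ensure the keepalive interval
// is positive before the watcher's ticker is created.
if config.HAKeepaliveInterval <= 0 {
	config.HAKeepaliveInterval = 16 * time.Second
}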
3 changes: 3 additions & 0 deletions cluster/calcium/control.go
@@ -28,6 +28,9 @@ func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, f
defer wg.Done()
var message []*bytes.Buffer
err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
+if workload.Engine == nil {
+return types.ErrNilEngine
+}
var err error
switch t {
case cluster.WorkloadStop:
3 changes: 3 additions & 0 deletions cluster/calcium/copy.go
@@ -32,6 +32,9 @@ func (c *Calcium) Copy(ctx context.Context, opts *types.CopyOptions) (chan *type
defer wg.Done()

workload, err := c.GetWorkload(ctx, id)
+if err == nil && workload.Engine == nil {
+err = types.ErrNilEngine
+}
if err != nil {
for _, path := range paths {
ch <- &types.CopyMessage{
3 changes: 3 additions & 0 deletions cluster/calcium/create.go
@@ -233,6 +233,9 @@ func (c *Calcium) doGetAndPrepareNode(ctx context.Context, nodename, image strin
if err != nil {
return nil, err
}
+if node.Engine == nil {
+return nil, types.ErrNilEngine
+}

return node, pullImage(ctx, node, image)
}
1 change: 0 additions & 1 deletion cluster/calcium/dissociate_test.go
@@ -60,7 +60,6 @@ func TestDissociateWorkload(t *testing.T) {
// failed by RemoveWorkload
store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(types.ErrNoETCD).Once()
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
ch, err = c.DissociateWorkload(ctx, []string{"c1"})
assert.NoError(t, err)
for r := range ch {
7 changes: 6 additions & 1 deletion cluster/calcium/execute.go
@@ -28,7 +28,12 @@ func (c *Calcium) ExecuteWorkload(ctx context.Context, opts *types.ExecuteWorklo

workload, err := c.GetWorkload(ctx, opts.WorkloadID)
if err != nil {
logger.Errorf(ctx, "[ExecuteWorkload] Failed to get wordload: %+v", err)
logger.Errorf(ctx, "[ExecuteWorkload] Failed to get workload: %+v", err)
return
}
+if workload.Engine == nil {
+logger.Errorf(ctx, "[ExecuteWorkload] failed to get engine of workload %v", workload.ID)
+err = types.ErrNilEngine
+return
+}

7 changes: 6 additions & 1 deletion cluster/calcium/image.go
@@ -46,7 +46,9 @@ func (c *Calcium) RemoveImage(ctx context.Context, opts *types.ImageOptions) (ch
Image: image,
Messages: []string{},
}
-if removeItems, err := node.Engine.ImageRemove(ctx, image, false, true); err != nil {
+if node.Engine == nil {
+m.Messages = append(m.Messages, logger.Err(ctx, types.ErrNilEngine).Error())
+} else if removeItems, err := node.Engine.ImageRemove(ctx, image, false, true); err != nil {
m.Messages = append(m.Messages, logger.Err(ctx, err).Error())
} else {
m.Success = true
@@ -57,6 +59,9 @@
ch <- m
}
if opts.Prune {
+if node.Engine == nil {
+logger.Errorf(ctx, "[RemoveImage] Prune %s pod %s node failed: %+v", opts.Podname, node.Name, types.ErrNilEngine)
+}
if err := node.Engine.ImagesPrune(ctx); err != nil {
logger.Errorf(ctx, "[RemoveImage] Prune %s pod %s node failed: %+v", opts.Podname, node.Name, err)
} else {
9 changes: 9 additions & 0 deletions cluster/calcium/network.go
@@ -36,6 +36,9 @@ func (c *Calcium) ListNetworks(ctx context.Context, podname string, driver strin
}

node := nodes[0]
+if node.Engine == nil {
+return networks, logger.Err(ctx, types.ErrNilEngine)
+}
networks, err = node.Engine.NetworkList(ctx, drivers)
return networks, logger.Err(ctx, errors.WithStack(err))
}
@@ -48,6 +51,9 @@ func (c *Calcium) ConnectNetwork(ctx context.Context, network, target, ipv4, ipv
return nil, logger.Err(ctx, errors.WithStack(err))
}

+if workload.Engine == nil {
+return nil, logger.Err(ctx, types.ErrNilEngine)
+}
networks, err := workload.Engine.NetworkConnect(ctx, network, target, ipv4, ipv6)
return networks, logger.Err(ctx, errors.WithStack(err))
}
@@ -59,6 +65,9 @@ func (c *Calcium) DisconnectNetwork(ctx context.Context, network, target string,
if err != nil {
return logger.Err(ctx, errors.WithStack(err))
}
+if workload.Engine == nil {
+return logger.Err(ctx, types.ErrNilEngine)
+}

return logger.Err(ctx, errors.WithStack(workload.Engine.NetworkDisconnect(ctx, network, target, force)))
}
10 changes: 1 addition & 9 deletions cluster/calcium/node.go
@@ -120,7 +120,7 @@ func (c *Calcium) setAllWorkloadsOnNodeDown(ctx context.Context, nodename string
}

// SetNode set node available or not
-func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error) { // nolint
+func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error) {
logger := log.WithField("Calcium", "SetNode").WithField("opts", opts)
if err := opts.Validate(); err != nil {
return nil, logger.Err(ctx, err)
@@ -131,18 +131,10 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
opts.Normalize(node)
n = node

-n.Available = (opts.StatusOpt == types.TriTrue) || (opts.StatusOpt == types.TriKeep && n.Available)
n.Bypass = (opts.BypassOpt == types.TriTrue) || (opts.BypassOpt == types.TriKeep && n.Bypass)
if n.IsDown() {
log.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
}
-if !n.Available {
-// remove node status
-if err := c.store.SetNodeStatus(ctx, node, -1); err != nil {
-// don't return here
-logger.Errorf(ctx, "[SetNode] failed to remove node status, err: %+v", errors.WithStack(err))
-}
-}
if opts.WorkloadsDown {
c.setAllWorkloadsOnNodeDown(ctx, opts.Nodename)
}
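With the Available bookkeeping deleted above, SetNode only manages the operator-facing Bypass flag, while liveness is now owned by the status watcher. The IsDown check that remains is defined in types/node.go, which this page does not render; a plausible definition consistent with the change (an assumption, not quoted from the commit) is:

// Assumed definition of IsDown in package types, consistent with this
// diff: a node counts as down when an operator bypasses it or when it
// has been marked unavailable (now done by selfmon, not by SetNode).
func (n *Node) IsDown() bool {
	return n.Bypass || !n.Available
}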
15 changes: 7 additions & 8 deletions cluster/calcium/node_test.go
@@ -160,35 +160,34 @@ func TestSetNode(t *testing.T) {
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
// failed by no node name
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test1", StatusOpt: 2})
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test1"})
assert.Error(t, err)
// failed by updatenode
store.On("UpdateNodes", mock.Anything, mock.Anything).Return(types.ErrCannotGetEngine).Once()
-_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", StatusOpt: 2})
+_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test"})
assert.Error(t, err)
store.On("UpdateNodes", mock.Anything, mock.Anything).Return(nil)
// succ when node available
-n, err := c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", StatusOpt: 2, Endpoint: "tcp://127.0.0.1:2379"})
+n, err := c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", Endpoint: "tcp://127.0.0.1:2379"})
assert.NoError(t, err)
assert.Equal(t, n.Name, name)
assert.Equal(t, n.Endpoint, "tcp://127.0.0.1:2379")
-// not available
+// can still set node even if ListNodeWorkloads fails
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
-_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", StatusOpt: 0, WorkloadsDown: true})
+_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", WorkloadsDown: true})
assert.NoError(t, err)
workloads := []*types.Workload{{Name: "wrong_name"}, {Name: "a_b_c"}}
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(workloads, nil)
store.On("SetWorkloadStatus",
mock.Anything, mock.Anything, mock.Anything,
).Return(types.ErrNoETCD)
-_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", StatusOpt: 0, WorkloadsDown: true})
+_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", WorkloadsDown: true})
assert.NoError(t, err)
// test modify
setOpts := &types.SetNodeOptions{
-Nodename: "test",
-StatusOpt: 1,
-Labels: map[string]string{"some": "1"},
+Nodename: "test",
+Labels: map[string]string{"some": "1"},
}
// set label
n, err = c.SetNode(ctx, setOpts)
3 changes: 3 additions & 0 deletions cluster/calcium/realloc.go
@@ -22,6 +22,9 @@ func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOption
return
}
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {
+if node.Engine == nil {
+return logger.Err(ctx, types.ErrNilEngine)
+}
return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
rrs, err := resources.MakeRequests(
types.ResourceOptions{
1 change: 0 additions & 1 deletion cluster/calcium/remove_test.go
@@ -53,7 +53,6 @@ func TestRemoveWorkload(t *testing.T) {
store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(types.ErrNoETCD).Twice()
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false, 0)
assert.NoError(t, err)
for r := range ch {
4 changes: 4 additions & 0 deletions cluster/calcium/replace.go
@@ -53,6 +53,10 @@ func (c *Calcium) ReplaceWorkload(ctx context.Context, opts *types.ReplaceOption
removeMessage := &types.RemoveWorkloadMessage{WorkloadID: id}
var err error
if err = c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
+if workload.Engine == nil {
+log.Errorf(ctx, "[ReplaceWorkload] engine of workload %v is nil", workload.ID)
+return types.ErrNilEngine
+}
if opts.Podname != "" && workload.Podname != opts.Podname {
log.Warnf(ctx, "[ReplaceWorkload] Skip not in pod workload %s", workload.ID)
return errors.WithStack(types.NewDetailedErr(types.ErrIgnoreWorkload,
7 changes: 7 additions & 0 deletions cluster/calcium/resource.go
@@ -157,6 +157,10 @@ func (c *Calcium) doGetNodeResource(ctx context.Context, nodename string, fix bo
nr.Diffs = append(nr.Diffs, fmt.Sprintf("init storage < init volumes: %d < %d", node.InitStorageCap, node.InitVolume.Total()))
}

+if node.Engine == nil {
+return types.ErrNilEngine
+}
+
if err := node.Engine.ResourceValidate(ctx, cpuByWorkloads, cpumapByWorkloads, memoryByWorkloads, storageByWorkloads); err != nil {
nr.Diffs = append(nr.Diffs, err.Error())
}
@@ -211,6 +215,9 @@ func (c *Calcium) doAllocResource(ctx context.Context, nodeMap map[string]*types
// called on changes of resource binding, such as cpu binding
// as an internal api, remap doesn't lock node, the responsibility of that should be taken on by caller
func (c *Calcium) remapResource(ctx context.Context, node *types.Node) (ch <-chan enginetypes.VirtualizationRemapMessage, err error) {
+if node.Engine == nil {
+return nil, types.ErrNilEngine
+}
workloads, err := c.store.ListNodeWorkloads(ctx, node.Name, nil)
if err != nil {
return
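The comment on remapResource notes that it takes no node lock itself; that responsibility sits with the caller. A hedged sketch of such a caller, built only from names visible in this diff (withNodeLocked, remapResource, log.Infof) with the nodename variable assumed; this fragment would live inside a Calcium method and is not part of the commit:

// Sketch of a caller honoring remapResource's locking contract: hold
// the node lock via withNodeLocked and drain the remap messages.
err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
	ch, err := c.remapResource(ctx, node)
	if err != nil {
		return err
	}
	for msg := range ch {
		log.Infof(ctx, "[remapResource] remap message: %+v", msg)
	}
	return nil
})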
(diff truncated; the remaining changed files are not rendered here)