migrate selfmon to core
DuodenumL committed Jan 4, 2022
1 parent d903237 commit dc43c96
Showing 41 changed files with 1,684 additions and 1,246 deletions.
2 changes: 1 addition & 1 deletion 3rdmocks/ServerStream.go

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions cluster/calcium/build.go
@@ -30,6 +30,7 @@ func (c *Calcium) BuildImage(ctx context.Context, opts *types.BuildOptions) (ch
if err != nil {
return nil, logger.Err(ctx, err)
}

log.Infof(ctx, "[BuildImage] Building image at pod %s node %s", node.Podname, node.Name)

var (
6 changes: 6 additions & 0 deletions cluster/calcium/calcium.go
@@ -31,6 +31,7 @@ type Calcium struct {
watcher discovery.Service
wal *WAL
identifier string
selfmon *NodeStatusWatcher
}

// New returns a new cluster config
@@ -82,6 +83,11 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
cal.wal, err = newCalciumWAL(cal)
cal.identifier = config.Identifier()
cal.selfmon = NewNodeStatusWatcher(cal)

// start node status watcher
go cal.selfmon.run()

return cal, logger.Err(nil, errors.WithStack(err)) //nolint
}

9 changes: 6 additions & 3 deletions cluster/calcium/calcium_test.go
@@ -53,8 +53,9 @@ func NewTestCluster() *Calcium {
MaxShare: -1,
ShareBase: 100,
},
WALFile: filepath.Join(walDir, "core.wal.log"),
MaxConcurrency: 10,
WALFile: filepath.Join(walDir, "core.wal.log"),
MaxConcurrency: 10,
HAKeepaliveInterval: 16 * time.Second,
}
c.store = &storemocks.Store{}
c.scheduler = &schedulermocks.Scheduler{}
@@ -69,7 +70,7 @@
}

func TestNewCluster(t *testing.T) {
config := types.Config{WALFile: "/tmp/a"}
config := types.Config{WALFile: "/tmp/a", HAKeepaliveInterval: 16 * time.Second}
_, err := New(config, nil)
assert.Error(t, err)

@@ -91,6 +92,7 @@ func TestNewCluster(t *testing.T) {
SCMType: "gitlab",
PrivateKey: privFile.Name(),
},
HAKeepaliveInterval: 16 * time.Second,
}
c1, err := New(config1, t)
assert.NoError(t, err)
@@ -102,6 +104,7 @@
SCMType: "github",
PrivateKey: privFile.Name(),
},
HAKeepaliveInterval: 16 * time.Second,
}
c2, err := New(config2, t)
assert.NoError(t, err)
1 change: 1 addition & 0 deletions cluster/calcium/control.go
@@ -28,6 +28,7 @@ func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, f
defer wg.Done()
var message []*bytes.Buffer
err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {

var err error
switch t {
case cluster.WorkloadStop:
2 changes: 1 addition & 1 deletion cluster/calcium/execute.go
@@ -28,7 +28,7 @@ func (c *Calcium) ExecuteWorkload(ctx context.Context, opts *types.ExecuteWorklo

workload, err := c.GetWorkload(ctx, opts.WorkloadID)
if err != nil {
logger.Errorf(ctx, "[ExecuteWorkload] Failed to get wordload: %+v", err)
logger.Errorf(ctx, "[ExecuteWorkload] Failed to get workload: %+v", err)
return
}

1 change: 1 addition & 0 deletions cluster/calcium/network.go
@@ -36,6 +36,7 @@ func (c *Calcium) ListNetworks(ctx context.Context, podname string, driver strin
}

node := nodes[0]

networks, err = node.Engine.NetworkList(ctx, drivers)
return networks, logger.Err(ctx, errors.WithStack(err))
}
10 changes: 1 addition & 9 deletions cluster/calcium/node.go
@@ -120,7 +120,7 @@ func (c *Calcium) setAllWorkloadsOnNodeDown(ctx context.Context, nodename string
}

// SetNode set node available or not
func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error) { // nolint
func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error) {
logger := log.WithField("Calcium", "SetNode").WithField("opts", opts)
if err := opts.Validate(); err != nil {
return nil, logger.Err(ctx, err)
@@ -131,18 +131,10 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
opts.Normalize(node)
n = node

n.Available = (opts.StatusOpt == types.TriTrue) || (opts.StatusOpt == types.TriKeep && n.Available)
n.Bypass = (opts.BypassOpt == types.TriTrue) || (opts.BypassOpt == types.TriKeep && n.Bypass)
if n.IsDown() {
log.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
}
if !n.Available {
// remove node status
if err := c.store.SetNodeStatus(ctx, node, -1); err != nil {
// don't return here
logger.Errorf(ctx, "[SetNode] failed to remove node status, err: %+v", errors.WithStack(err))
}
}
if opts.WorkloadsDown {
c.setAllWorkloadsOnNodeDown(ctx, opts.Nodename)
}
15 changes: 7 additions & 8 deletions cluster/calcium/node_test.go
@@ -160,35 +160,34 @@ func TestSetNode(t *testing.T) {
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
// failed by no node name
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test1", StatusOpt: 2})
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test1"})
assert.Error(t, err)
// failed by updatenode
store.On("UpdateNodes", mock.Anything, mock.Anything).Return(types.ErrCannotGetEngine).Once()
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", StatusOpt: 2})
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test"})
assert.Error(t, err)
store.On("UpdateNodes", mock.Anything, mock.Anything).Return(nil)
// succ when node available
n, err := c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", StatusOpt: 2, Endpoint: "tcp://127.0.0.1:2379"})
n, err := c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", Endpoint: "tcp://127.0.0.1:2379"})
assert.NoError(t, err)
assert.Equal(t, n.Name, name)
assert.Equal(t, n.Endpoint, "tcp://127.0.0.1:2379")
// not available
// can still set node even if ListNodeWorkloads fails
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", StatusOpt: 0, WorkloadsDown: true})
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", WorkloadsDown: true})
assert.NoError(t, err)
workloads := []*types.Workload{{Name: "wrong_name"}, {Name: "a_b_c"}}
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(workloads, nil)
store.On("SetWorkloadStatus",
mock.Anything, mock.Anything, mock.Anything,
).Return(types.ErrNoETCD)
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", StatusOpt: 0, WorkloadsDown: true})
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", WorkloadsDown: true})
assert.NoError(t, err)
// test modify
setOpts := &types.SetNodeOptions{
Nodename: "test",
StatusOpt: 1,
Labels: map[string]string{"some": "1"},
Nodename: "test",
Labels: map[string]string{"some": "1"},
}
// set label
n, err = c.SetNode(ctx, setOpts)
1 change: 1 addition & 0 deletions cluster/calcium/realloc.go
@@ -22,6 +22,7 @@ func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOption
return
}
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {

return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
rrs, err := resources.MakeRequests(
types.ResourceOptions{
214 changes: 214 additions & 0 deletions cluster/calcium/selfmon.go
@@ -0,0 +1,214 @@
package calcium

import (
"context"
"math/rand"
"os/signal"
"syscall"
"time"

"github.com/pkg/errors"

"github.com/projecteru2/core/log"
coretypes "github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)

// ActiveKey .
const ActiveKey = "/selfmon/active"

// NodeStatusWatcher monitors the changes of node status
type NodeStatusWatcher struct {
id int64
cal *Calcium
}

// NewNodeStatusWatcher .
func NewNodeStatusWatcher(cal *Calcium) *NodeStatusWatcher {
rand.Seed(time.Now().UnixNano())
id := rand.Int63n(10000) // nolint
return &NodeStatusWatcher{
id: id,
cal: cal,
}
}

func (n *NodeStatusWatcher) run() {
ctx := n.getSignalContext(context.TODO())
for {
select {
case <-ctx.Done():
return
default:
n.withActiveLock(ctx, func(ctx context.Context) {
if err := n.monitor(ctx); err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] %v stops watching, err: %v", n.id, err)
}
})
time.Sleep(n.cal.config.ConnectionTimeout)
}
}
}

func (n *NodeStatusWatcher) getSignalContext(ctx context.Context) context.Context {
exitCtx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
defer cancel()
<-exitCtx.Done()
log.Warnf(ctx, "[NodeStatusWatcher] watcher %v receives a signal to exit", n.id)
}()

return exitCtx
}

// withActiveLock acquires the active lock synchronously
func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx context.Context)) {
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()

var expiry <-chan struct{}
var unregister func()
defer func() {
if unregister != nil {
log.Infof(ctx, "[Register] %v unregisters", n.id)
unregister()
}
}()

retryCounter := 0

for {
select {
case <-ctx.Done():
log.Info("[Register] context canceled")
return
default:
}

// try to get the lock
if ne, un, err := n.register(ctx); err != nil {
if errors.Is(err, context.Canceled) {
log.Info("[Register] context canceled")
return
} else if !errors.Is(err, coretypes.ErrKeyExists) {
log.Errorf(ctx, "[Register] failed to re-register: %v", err)
time.Sleep(time.Second)
continue
}
if retryCounter == 0 {
log.Infof(ctx, "[Register] %v failed to register, there has been another active node status watcher", n.id)
}
retryCounter = (retryCounter + 1) % 60
time.Sleep(time.Second)
} else {
log.Infof(ctx, "[Register] node status watcher %v has been active", n.id)
expiry = ne
unregister = un
break
}
}

// cancel the ctx when: 1. selfmon closed 2. lost the active lock
go func() {
defer cancel()

select {
case <-ctx.Done():
log.Info("[Register] context canceled")
return
case <-expiry:
log.Info("[Register] lock expired")
return
}
}()

f(ctx)
}

func (n *NodeStatusWatcher) register(ctx context.Context) (<-chan struct{}, func(), error) {
return n.cal.store.StartEphemeral(ctx, ActiveKey, n.cal.config.HAKeepaliveInterval)
}

func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) {
log.Debug(ctx, "[NodeStatusWatcher] init node status started")
nodes := make(chan *coretypes.Node)

go func() {
defer close(nodes)
// Get all nodes which are active status, and regardless of pod.
var err error
var ch <-chan *coretypes.Node
utils.WithTimeout(ctx, n.cal.config.GlobalTimeout, func(ctx context.Context) {
ch, err = n.cal.ListPodNodes(ctx, &coretypes.ListNodesOptions{
Podname: "",
Labels: nil,
All: true,
Info: false,
})
if err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] get pod nodes failed %v", err)
return
}
for node := range ch {
log.Debugf(ctx, "[NodeStatusWatcher] watched %s/%s", node.Name, node.Endpoint)
nodes <- node
}
})
if err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] get pod nodes failed %v", err)
return
}
}()

for node := range nodes {
status, err := n.cal.GetNodeStatus(ctx, node.Name)
if err != nil {
status = &coretypes.NodeStatus{
Nodename: node.Name,
Podname: node.Podname,
Alive: false,
}
}
n.dealNodeStatusMessage(ctx, status)
}
}

func (n *NodeStatusWatcher) monitor(ctx context.Context) error {
// init node status first
go n.initNodeStatus(ctx)

// monitor node status
messageChan := n.cal.NodeStatusStream(ctx)
log.Infof(ctx, "[NodeStatusWatcher] %v watch node status started", n.id)
defer log.Infof(ctx, "[NodeStatusWatcher] %v stop watching node status", n.id)

for {
select {
case message, ok := <-messageChan:
if !ok {
return coretypes.ErrMessageChanClosed
}
go n.dealNodeStatusMessage(ctx, message)
case <-ctx.Done():
return ctx.Err()
}
}
}

func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message *coretypes.NodeStatus) {
if message.Error != nil {
log.Errorf(ctx, "[NodeStatusWatcher] deal with node status stream message failed %+v", message)
return
}

// TODO maybe we need a distributed lock to control concurrency
opts := &coretypes.SetNodeOptions{
Nodename: message.Nodename,
WorkloadsDown: !message.Alive,
}
if _, err := n.cal.SetNode(ctx, opts); err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] set node %s failed %v", message.Nodename, err)
return
}
log.Infof(ctx, "[NodeStatusWatcher] set node %s as alive: %v", message.Nodename, message.Alive)
}
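
The new selfmon.go above centers on a single-active-watcher pattern: every core instance competes for the ephemeral key ActiveKey via store.StartEphemeral, only the holder runs monitor, and the returned expiry channel plus unregister function bound how long it stays active before re-registering. Below is a minimal, self-contained sketch of that pattern, assuming a hypothetical in-memory fakeStore in place of the real etcd-backed store; names such as fakeStore, runWatcher, and errKeyExists are illustrative only and not part of this commit.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

var errKeyExists = errors.New("key exists")

// fakeStore is a stand-in for the real store: StartEphemeral either grants the
// key (returning an expiry channel and a release func) or fails because
// another holder is currently active.
type fakeStore struct {
	mu   sync.Mutex
	held map[string]bool
}

func (s *fakeStore) StartEphemeral(ctx context.Context, key string, ttl time.Duration) (<-chan struct{}, func(), error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.held[key] {
		return nil, nil, errKeyExists
	}
	s.held[key] = true
	expiry := make(chan struct{})
	var once sync.Once
	release := func() {
		once.Do(func() {
			s.mu.Lock()
			delete(s.held, key)
			s.mu.Unlock()
			close(expiry)
		})
	}
	// A real store keeps the lease alive and expires it on failure; here we
	// simply expire it after the TTL to exercise the takeover path.
	go func() {
		select {
		case <-ctx.Done():
		case <-time.After(ttl):
		}
		release()
	}()
	return expiry, release, nil
}

// runWatcher loops forever: acquire the active key, stay "active" until the
// lease expires or the context is canceled, then compete for the key again.
func runWatcher(ctx context.Context, id int, store *fakeStore) {
	for ctx.Err() == nil {
		expiry, unregister, err := store.StartEphemeral(ctx, "/selfmon/active", 2*time.Second)
		if err != nil {
			time.Sleep(200 * time.Millisecond) // someone else is active, retry later
			continue
		}
		fmt.Printf("watcher %d is active\n", id)
		select {
		case <-expiry: // lost the lease, fall through and re-register
		case <-ctx.Done():
		}
		unregister()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	store := &fakeStore{held: map[string]bool{}}
	for i := 0; i < 3; i++ {
		go runWatcher(ctx, i, store)
	}
	<-ctx.Done()
}

Running the sketch for a few seconds shows the active role passing from one watcher to another as the simulated lease expires, which mirrors how withActiveLock re-enters its registration loop after losing the lock.
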
2 changes: 1 addition & 1 deletion cluster/mocks/Cluster.go

Some generated files are not rendered by default.
