migrate selfmon to eru-core #528

Merged
merged 2 commits on Jan 5, 2022
1 change: 1 addition & 0 deletions cluster/calcium/build.go
@@ -30,6 +30,7 @@ func (c *Calcium) BuildImage(ctx context.Context, opts *types.BuildOptions) (ch
if err != nil {
return nil, logger.Err(ctx, err)
}

log.Infof(ctx, "[BuildImage] Building image at pod %s node %s", node.Podname, node.Name)

var (
6 changes: 6 additions & 0 deletions cluster/calcium/calcium.go
@@ -31,6 +31,7 @@ type Calcium struct {
watcher discovery.Service
wal *WAL
identifier string
selfmon *NodeStatusWatcher
}

// New returns a new cluster config
@@ -82,6 +83,11 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
cal.wal, err = newCalciumWAL(cal)
cal.identifier = config.Identifier()
cal.selfmon = NewNodeStatusWatcher(cal)

// start node status watcher
go cal.selfmon.run()

return cal, logger.Err(nil, errors.WithStack(err)) //nolint
}

9 changes: 6 additions & 3 deletions cluster/calcium/calcium_test.go
@@ -53,8 +53,9 @@ func NewTestCluster() *Calcium {
MaxShare: -1,
ShareBase: 100,
},
WALFile: filepath.Join(walDir, "core.wal.log"),
MaxConcurrency: 10,
WALFile: filepath.Join(walDir, "core.wal.log"),
MaxConcurrency: 10,
HAKeepaliveInterval: 16 * time.Second,
}
c.store = &storemocks.Store{}
c.scheduler = &schedulermocks.Scheduler{}
@@ -69,7 +70,7 @@
}

func TestNewCluster(t *testing.T) {
config := types.Config{WALFile: "/tmp/a"}
config := types.Config{WALFile: "/tmp/a", HAKeepaliveInterval: 16 * time.Second}
_, err := New(config, nil)
assert.Error(t, err)

@@ -91,6 +92,7 @@ func TestNewCluster(t *testing.T) {
SCMType: "gitlab",
PrivateKey: privFile.Name(),
},
HAKeepaliveInterval: 16 * time.Second,
}
c1, err := New(config1, t)
assert.NoError(t, err)
@@ -102,6 +104,7 @@
SCMType: "github",
PrivateKey: privFile.Name(),
},
HAKeepaliveInterval: 16 * time.Second,
}
c2, err := New(config2, t)
assert.NoError(t, err)
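
Both updated test configs set HAKeepaliveInterval because New now starts the node status watcher, whose registration passes this value to StartEphemeral as the keepalive period for the /selfmon/active key (see selfmon.go below). A minimal illustrative snippet (not code from this PR) showing the new field next to options the test cluster already uses:

package main

import (
	"fmt"
	"time"

	"github.com/projecteru2/core/types"
)

func main() {
	// Illustrative only: these are the fields the updated tests set.
	// HAKeepaliveInterval is the keepalive period the selfmon watcher
	// uses when holding the /selfmon/active ephemeral key.
	config := types.Config{
		WALFile:             "/tmp/core.wal.log",
		MaxConcurrency:      10,
		HAKeepaliveInterval: 16 * time.Second,
	}
	fmt.Printf("selfmon keepalive interval: %s\n", config.HAKeepaliveInterval)
}
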
1 change: 1 addition & 0 deletions cluster/calcium/control.go
@@ -28,6 +28,7 @@ func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, f
defer wg.Done()
var message []*bytes.Buffer
err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {

var err error
switch t {
case cluster.WorkloadStop:
2 changes: 1 addition & 1 deletion cluster/calcium/execute.go
@@ -28,7 +28,7 @@ func (c *Calcium) ExecuteWorkload(ctx context.Context, opts *types.ExecuteWorklo

workload, err := c.GetWorkload(ctx, opts.WorkloadID)
if err != nil {
logger.Errorf(ctx, "[ExecuteWorkload] Failed to get wordload: %+v", err)
logger.Errorf(ctx, "[ExecuteWorkload] Failed to get workload: %+v", err)
return
}

1 change: 1 addition & 0 deletions cluster/calcium/network.go
@@ -36,6 +36,7 @@ func (c *Calcium) ListNetworks(ctx context.Context, podname string, driver strin
}

node := nodes[0]

networks, err = node.Engine.NetworkList(ctx, drivers)
return networks, logger.Err(ctx, errors.WithStack(err))
}
12 changes: 2 additions & 10 deletions cluster/calcium/node.go
@@ -64,7 +64,7 @@ func (c *Calcium) ListPodNodes(ctx context.Context, opts *types.ListNodesOptions
return func() {
err := node.Info(ctx)
if err != nil {
logger.Errorf(ctx, "failed to get node info: %+v", err)
logger.Errorf(ctx, "failed to get node %v info: %+v", node.Name, err)
}
ch <- node
}
@@ -120,7 +120,7 @@ func (c *Calcium) setAllWorkloadsOnNodeDown(ctx context.Context, nodename string
}

// SetNode set node available or not
func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error) { // nolint
func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error) {
logger := log.WithField("Calcium", "SetNode").WithField("opts", opts)
if err := opts.Validate(); err != nil {
return nil, logger.Err(ctx, err)
@@ -131,18 +131,10 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
opts.Normalize(node)
n = node

n.Available = (opts.StatusOpt == types.TriTrue) || (opts.StatusOpt == types.TriKeep && n.Available)
n.Bypass = (opts.BypassOpt == types.TriTrue) || (opts.BypassOpt == types.TriKeep && n.Bypass)
if n.IsDown() {
log.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
}
if !n.Available {
// remove node status
if err := c.store.SetNodeStatus(ctx, node, -1); err != nil {
// don't return here
logger.Errorf(ctx, "[SetNode] failed to remove node status, err: %+v", errors.WithStack(err))
}
}
if opts.WorkloadsDown {
c.setAllWorkloadsOnNodeDown(ctx, opts.Nodename)
}
15 changes: 7 additions & 8 deletions cluster/calcium/node_test.go
@@ -160,35 +160,34 @@ func TestSetNode(t *testing.T) {
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
// failed by no node name
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test1", StatusOpt: 2})
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test1"})
assert.Error(t, err)
// failed by updatenode
store.On("UpdateNodes", mock.Anything, mock.Anything).Return(types.ErrCannotGetEngine).Once()
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", StatusOpt: 2})
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test"})
assert.Error(t, err)
store.On("UpdateNodes", mock.Anything, mock.Anything).Return(nil)
// succ when node available
n, err := c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", StatusOpt: 2, Endpoint: "tcp://127.0.0.1:2379"})
n, err := c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", Endpoint: "tcp://127.0.0.1:2379"})
assert.NoError(t, err)
assert.Equal(t, n.Name, name)
assert.Equal(t, n.Endpoint, "tcp://127.0.0.1:2379")
// not available
// can still set node even if ListNodeWorkloads fails
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", StatusOpt: 0, WorkloadsDown: true})
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", WorkloadsDown: true})
assert.NoError(t, err)
workloads := []*types.Workload{{Name: "wrong_name"}, {Name: "a_b_c"}}
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(workloads, nil)
store.On("SetWorkloadStatus",
mock.Anything, mock.Anything, mock.Anything,
).Return(types.ErrNoETCD)
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", StatusOpt: 0, WorkloadsDown: true})
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", WorkloadsDown: true})
assert.NoError(t, err)
// test modify
setOpts := &types.SetNodeOptions{
Nodename: "test",
StatusOpt: 1,
Labels: map[string]string{"some": "1"},
Nodename: "test",
Labels: map[string]string{"some": "1"},
}
// set label
n, err = c.SetNode(ctx, setOpts)
1 change: 1 addition & 0 deletions cluster/calcium/realloc.go
@@ -22,6 +22,7 @@ func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOption
return
}
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {

return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
rrs, err := resources.MakeRequests(
types.ResourceOptions{
214 changes: 214 additions & 0 deletions cluster/calcium/selfmon.go
@@ -0,0 +1,214 @@
package calcium

import (
"context"
"math/rand"
"os/signal"
"syscall"
"time"

"github.com/pkg/errors"

"github.com/projecteru2/core/log"
coretypes "github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)

// ActiveKey .
const ActiveKey = "/selfmon/active"

// NodeStatusWatcher monitors the changes of node status
type NodeStatusWatcher struct {
id int64
cal *Calcium
}

// NewNodeStatusWatcher .
func NewNodeStatusWatcher(cal *Calcium) *NodeStatusWatcher {
rand.Seed(time.Now().UnixNano())
id := rand.Int63n(10000) // nolint
return &NodeStatusWatcher{
id: id,
cal: cal,
}
}

func (n *NodeStatusWatcher) run() {
ctx := n.getSignalContext(context.TODO())
for {
select {
case <-ctx.Done():
return
default:
n.withActiveLock(ctx, func(ctx context.Context) {
if err := n.monitor(ctx); err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] %v stops watching, err: %v", n.id, err)
}
})
time.Sleep(n.cal.config.ConnectionTimeout)
}
}
}

func (n *NodeStatusWatcher) getSignalContext(ctx context.Context) context.Context {
exitCtx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
defer cancel()
<-exitCtx.Done()
log.Warnf(ctx, "[NodeStatusWatcher] watcher %v receives a signal to exit", n.id)
}()

return exitCtx
}

// withActiveLock acquires the active lock synchronously
func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx context.Context)) {
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()

var expiry <-chan struct{}
var unregister func()
defer func() {
if unregister != nil {
log.Infof(ctx, "[Register] %v unregisters", n.id)
unregister()
}
}()

retryCounter := 0

for {
select {
case <-ctx.Done():
log.Info("[Register] context canceled")
return
default:
}

// try to get the lock
if ne, un, err := n.register(ctx); err != nil {
if errors.Is(err, context.Canceled) {
log.Info("[Register] context canceled")
return
} else if !errors.Is(err, coretypes.ErrKeyExists) {
log.Errorf(ctx, "[Register] failed to re-register: %v", err)
time.Sleep(time.Second)
continue
}
if retryCounter == 0 {
log.Infof(ctx, "[Register] %v failed to register, there has been another active node status watcher", n.id)
}
retryCounter = (retryCounter + 1) % 60
time.Sleep(time.Second)
} else {
log.Infof(ctx, "[Register] node status watcher %v has been active", n.id)
expiry = ne
unregister = un
break
}
}

// cancel the ctx when: 1. selfmon closed 2. lost the active lock
go func() {
defer cancel()

select {
case <-ctx.Done():
log.Info("[Register] context canceled")
return
case <-expiry:
log.Info("[Register] lock expired")
return
}
}()

f(ctx)
}

func (n *NodeStatusWatcher) register(ctx context.Context) (<-chan struct{}, func(), error) {
return n.cal.store.StartEphemeral(ctx, ActiveKey, n.cal.config.HAKeepaliveInterval)
}

func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) {
log.Debug(ctx, "[NodeStatusWatcher] init node status started")
nodes := make(chan *coretypes.Node)

go func() {
defer close(nodes)
// Get all nodes that are in active status, regardless of pod.
var err error
var ch <-chan *coretypes.Node
utils.WithTimeout(ctx, n.cal.config.GlobalTimeout, func(ctx context.Context) {
ch, err = n.cal.ListPodNodes(ctx, &coretypes.ListNodesOptions{
Podname: "",
Labels: nil,
All: true,
Info: false,
})
if err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] get pod nodes failed %v", err)
return
}
for node := range ch {
log.Debugf(ctx, "[NodeStatusWatcher] watched %s/%s", node.Name, node.Endpoint)
nodes <- node
}
})
if err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] get pod nodes failed %v", err)
return
}
}()

for node := range nodes {
status, err := n.cal.GetNodeStatus(ctx, node.Name)
if err != nil {
status = &coretypes.NodeStatus{
Nodename: node.Name,
Podname: node.Podname,
Alive: false,
}
}
n.dealNodeStatusMessage(ctx, status)
}
}

func (n *NodeStatusWatcher) monitor(ctx context.Context) error {
// init node status first
go n.initNodeStatus(ctx)

// monitor node status
messageChan := n.cal.NodeStatusStream(ctx)
log.Infof(ctx, "[NodeStatusWatcher] %v watch node status started", n.id)
defer log.Infof(ctx, "[NodeStatusWatcher] %v stop watching node status", n.id)

for {
select {
case message, ok := <-messageChan:
if !ok {
return coretypes.ErrMessageChanClosed
}
go n.dealNodeStatusMessage(ctx, message)
case <-ctx.Done():
return ctx.Err()
}
}
}

func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message *coretypes.NodeStatus) {
if message.Error != nil {
log.Errorf(ctx, "[NodeStatusWatcher] deal with node status stream message failed %+v", message)
return
}

// TODO maybe we need a distributed lock to control concurrency
opts := &coretypes.SetNodeOptions{
Nodename: message.Nodename,
WorkloadsDown: !message.Alive,
}
if _, err := n.cal.SetNode(ctx, opts); err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] set node %s failed %v", message.Nodename, err)
return
}
log.Infof(ctx, "[NodeStatusWatcher] set node %s as alive: %v", message.Nodename, message.Alive)
}
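
The watcher keeps at most one active instance across eru-core replicas: register calls StartEphemeral with ActiveKey and the configured HAKeepaliveInterval, and withActiveLock retries until the key is held, then stops the monitoring work once the key expires or the context ends. Below is a minimal, self-contained sketch of that pattern; the in-memory fakeStore is purely illustrative and only mirrors the StartEphemeral signature used by register above.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// errKeyExists stands in for coretypes.ErrKeyExists.
var errKeyExists = errors.New("key exists")

// fakeStore is an in-memory stand-in for the real store; it only models the
// ephemeral-key primitive the watcher needs.
type fakeStore struct {
	mu   sync.Mutex
	held map[string]chan struct{}
}

// StartEphemeral mirrors the signature used by NodeStatusWatcher.register:
// on success it returns an expiry channel (closed once the key is gone) and
// an unregister func; if another holder exists it returns errKeyExists.
func (s *fakeStore) StartEphemeral(ctx context.Context, key string, interval time.Duration) (<-chan struct{}, func(), error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.held[key]; ok {
		return nil, nil, errKeyExists
	}
	expiry := make(chan struct{})
	s.held[key] = expiry
	unregister := func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		if ch, ok := s.held[key]; ok {
			close(ch)
			delete(s.held, key)
		}
	}
	return expiry, unregister, nil
}

// withActive keeps retrying until this instance holds key, runs f, and stops
// f as soon as the key expires or ctx ends -- a simplified withActiveLock.
func withActive(ctx context.Context, store *fakeStore, key string, interval time.Duration, f func(context.Context)) {
	for {
		if ctx.Err() != nil {
			return
		}
		expiry, unregister, err := store.StartEphemeral(ctx, key, interval)
		if errors.Is(err, errKeyExists) {
			time.Sleep(100 * time.Millisecond) // another instance is active, retry
			continue
		} else if err != nil {
			return
		}
		runCtx, cancel := context.WithCancel(ctx)
		go func() { <-expiry; cancel() }() // losing the key cancels the work
		f(runCtx)
		cancel()
		unregister()
		return
	}
}

func main() {
	store := &fakeStore{held: map[string]chan struct{}{}}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	withActive(ctx, store, "/selfmon/active", 16*time.Second, func(context.Context) {
		fmt.Println("this instance is the active node status watcher")
	})
}

In the real store the expiry channel closes when the keepalive on /selfmon/active is lost, which is what makes the run loop in selfmon.go fall back to competing for the lock again.
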