migrate selfmon to core
DuodenumL committed Dec 30, 2021
1 parent d903237 commit 1b77385
Showing 13 changed files with 1,416 additions and 1,195 deletions.
6 changes: 6 additions & 0 deletions cluster/calcium/calcium.go
@@ -31,6 +31,7 @@ type Calcium struct {
watcher discovery.Service
wal *WAL
identifier string
selfmon *NodeStatusWatcher
}

// New returns a new cluster config
@@ -82,6 +83,11 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
cal.wal, err = newCalciumWAL(cal)
cal.identifier = config.Identifier()
cal.selfmon = NewNodeStatusWatcher(cal)

// start node status watcher
go cal.selfmon.run()

return cal, logger.Err(nil, errors.WithStack(err)) //nolint
}

10 changes: 1 addition & 9 deletions cluster/calcium/node.go
@@ -120,7 +120,7 @@ func (c *Calcium) setAllWorkloadsOnNodeDown(ctx context.Context, nodename string
}

// SetNode set node available or not
func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error) { // nolint
func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error) {
logger := log.WithField("Calcium", "SetNode").WithField("opts", opts)
if err := opts.Validate(); err != nil {
return nil, logger.Err(ctx, err)
@@ -131,18 +131,10 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
opts.Normalize(node)
n = node

n.Available = (opts.StatusOpt == types.TriTrue) || (opts.StatusOpt == types.TriKeep && n.Available)
n.Bypass = (opts.BypassOpt == types.TriTrue) || (opts.BypassOpt == types.TriKeep && n.Bypass)
if n.IsDown() {
log.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
}
if !n.Available {
// remove node status
if err := c.store.SetNodeStatus(ctx, node, -1); err != nil {
// don't return here
logger.Errorf(ctx, "[SetNode] failed to remove node status, err: %+v", errors.WithStack(err))
}
}
if opts.WorkloadsDown {
c.setAllWorkloadsOnNodeDown(ctx, opts.Nodename)
}
212 changes: 212 additions & 0 deletions cluster/calcium/selfmon.go
@@ -0,0 +1,212 @@
package calcium

import (
"context"
"math/rand"
"os/signal"
"syscall"
"time"

"github.com/pkg/errors"

"github.com/projecteru2/core/log"
coretypes "github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)

// ActiveKey .
const ActiveKey = "/selfmon/active"

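// NodeStatusWatcher watches node status and syncs it back into the cluster via SetNode;
// only one watcher is active at a time, guarded by the ephemeral ActiveKey.
type NodeStatusWatcher struct {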
id int64
cal *Calcium
}

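// NewNodeStatusWatcher creates a watcher; the random id only identifies this instance in logs.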
func NewNodeStatusWatcher(cal *Calcium) *NodeStatusWatcher {
rand.Seed(time.Now().UnixNano())
id := rand.Int63n(10000) // nolint
return &NodeStatusWatcher{
id: id,
cal: cal,
}
}

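// run repeatedly competes for the active lock and, once it holds the lock,
// watches node status until the process receives an exit signal.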
func (n *NodeStatusWatcher) run() {
ctx := n.getSignalContext(context.TODO())
for {
select {
case <-ctx.Done():
return
default:
n.withActiveLock(ctx, func(ctx context.Context) {
if err := n.watch(ctx); err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] %v stops watching, err: %v", n.id, err)
}
})
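// wait a while before competing for the lock again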
time.Sleep(n.cal.config.ConnectionTimeout)
}
}
}

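// getSignalContext derives a context that is canceled when SIGINT, SIGTERM or SIGQUIT arrives.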
func (n *NodeStatusWatcher) getSignalContext(ctx context.Context) context.Context {
exitCtx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
defer cancel()
<-exitCtx.Done()
log.Warnf(ctx, "[NodeStatusWatcher] watcher %v receives a signal to exit", n.id)
}()

return exitCtx
}

// withActiveLock acquires the active lock synchronously
func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx context.Context)) {
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()

var expiry <-chan struct{}
var unregister func()
defer func() {
if unregister != nil {
log.Infof(ctx, "[Register] %v unregisters", n.id)
unregister()
}
}()

retryCounter := 0

for {
select {
case <-ctx.Done():
log.Info("[Register] context canceled")
return
default:
}

// try to get the lock
if ne, un, err := n.register(ctx); err != nil {
if errors.Is(err, context.Canceled) {
log.Info("[Register] context canceled")
return
} else if !errors.Is(err, coretypes.ErrKeyExists) {
log.Errorf(ctx, "[Register] failed to re-register: %v", err)
time.Sleep(time.Second)
continue
}
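// another watcher holds the active lock; retry every second and log this roughly once a minute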
if retryCounter == 0 {
log.Infof(ctx, "[Register] %v there has been another active node status watcher", n.id)
}
retryCounter = (retryCounter + 1) % 60
time.Sleep(time.Second)
} else {
log.Infof(ctx, "[Register] node status watcher %v has been active", n.id)
expiry = ne
unregister = un
break
}
}

// cancel the ctx when: 1. selfmon closed 2. lost the active lock
go func() {
defer cancel()

select {
case <-ctx.Done():
log.Info("[Register] context canceled")
return
case <-expiry:
log.Info("[Register] lock expired")
return
}
}()

f(ctx)
}

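// register claims ActiveKey in the store; the returned channel signals expiry of the claim
// and the returned func releases it.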
func (n *NodeStatusWatcher) register(ctx context.Context) (<-chan struct{}, func(), error) {
return n.cal.store.StartEphemeral(ctx, ActiveKey, n.cal.config.HAKeepaliveInterval)
}

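// initNodeStatus walks every node once and syncs its current status;
// a node whose status cannot be fetched is treated as not alive.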
func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) {
log.Debug(ctx, "[NodeStatusWatcher] init node status started")
nodes := make(chan *coretypes.Node)

go func() {
defer close(nodes)
// Get all nodes that are active, regardless of pod.
var err error
var ch <-chan *coretypes.Node
utils.WithTimeout(ctx, n.cal.config.ConnectionTimeout, func(ctx context.Context) {
ch, err = n.cal.ListPodNodes(ctx, &coretypes.ListNodesOptions{
Podname: "",
Labels: nil,
All: true,
Info: false,
})
if err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] get pod nodes failed %v", err)
return
}
for node := range ch {
log.Debugf(ctx, "[NodeStatusWatcher] watched %s/%s", node.Name, node.Endpoint)
nodes <- node
}
})
if err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] get pod nodes failed %v", err)
return
}
}()

for node := range nodes {
status, err := n.cal.GetNodeStatus(ctx, node.Name)
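// no stored status (or a fetch error) means the node is not reporting, so treat it as not alive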
if err != nil {
status = &coretypes.NodeStatus{
Nodename: node.Name,
Podname: node.Podname,
Alive: false,
}
}
n.dealNodeStatusMessage(ctx, status)
}
}

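// watch streams node status changes and handles each message in its own goroutine;
// it returns when the stream closes or the context ends.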
func (n *NodeStatusWatcher) watch(ctx context.Context) error {
// init node status first
go n.initNodeStatus(ctx)

// watch node status
messageChan := n.cal.NodeStatusStream(ctx)
log.Infof(ctx, "[NodeStatusWatcher] %v watch node status started", n.id)
defer log.Infof(ctx, "[NodeStatusWatcher] %v stop watching node status", n.id)

for {
select {
case message, ok := <-messageChan:
if !ok {
return coretypes.ErrMessageChanClosed
}
go n.dealNodeStatusMessage(ctx, message)
case <-ctx.Done():
return ctx.Err()
}
}
}

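// dealNodeStatusMessage syncs one status message into the cluster:
// when the node is not alive, SetNode also marks its workloads down.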
func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message *coretypes.NodeStatus) {
if message.Error != nil {
log.Errorf(ctx, "[NodeStatusWatcher] deal with node status stream message failed %+v", message)
return
}

// TODO maybe we need a distributed lock to control concurrency
opts := &coretypes.SetNodeOptions{
Nodename: message.Nodename,
WorkloadsDown: !message.Alive,
}
if _, err := n.cal.SetNode(ctx, opts); err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] set node %s failed %v", message.Nodename, err)
return
}
log.Infof(ctx, "[NodeStatusWatcher] set node %s as alive: %v", message.Nodename, message.Alive)
}