Showing 17 changed files with 1,433 additions and 1,206 deletions.
@@ -0,0 +1,214 @@
package calcium

import (
    "context"
    "math/rand"
    "os/signal"
    "syscall"
    "time"

    "github.com/pkg/errors"

    "github.com/projecteru2/core/log"
    coretypes "github.com/projecteru2/core/types"
    "github.com/projecteru2/core/utils"
)

// ActiveKey .
const ActiveKey = "/selfmon/active"
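// Note: ActiveKey is the store key used as a cluster-wide active lock; the
// watcher instance that manages to register it (see register and
// withActiveLock below) is the only one that processes node status events.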

// NodeStatusWatcher monitors the changes of node status
type NodeStatusWatcher struct {
    id  int64
    cal *Calcium
}

// NewNodeStatusWatcher .
func NewNodeStatusWatcher(cal *Calcium) *NodeStatusWatcher {
    rand.Seed(time.Now().UnixNano())
    id := rand.Int63n(10000) // nolint
    return &NodeStatusWatcher{
        id:  id,
        cal: cal,
    }
}
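// Note: the random id above only labels log lines so that concurrent watcher
// instances can be told apart; it plays no role in the election itself.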

func (n *NodeStatusWatcher) run() {
    ctx := n.getSignalContext(context.TODO())
    for {
        select {
        case <-ctx.Done():
            return
        default:
            n.withActiveLock(ctx, func(ctx context.Context) {
                if err := n.watch(ctx); err != nil {
                    log.Errorf(ctx, "[NodeStatusWatcher] %v stops watching, err: %v", n.id, err)
                }
            })
            time.Sleep(n.cal.config.ConnectionTimeout)
        }
    }
}
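// Note: run keeps competing for the active lock; whenever watch returns
// (stream closed, error, or lock lost) it waits ConnectionTimeout and then
// enters the election again, until the signal context is cancelled.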

func (n *NodeStatusWatcher) getSignalContext(ctx context.Context) context.Context {
    exitCtx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
    go func() {
        defer cancel()
        <-exitCtx.Done()
        log.Warnf(ctx, "[NodeStatusWatcher] watcher %v receives a signal to exit", n.id)
    }()

    return exitCtx
}
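// Note: the derived context is cancelled when the process receives SIGINT,
// SIGTERM or SIGQUIT (or when the parent context ends), which unwinds the run
// loop and everything started under it.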

// withActiveLock acquires the active lock synchronously
func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx context.Context)) {
    ctx, cancel := context.WithCancel(parentCtx)
    defer cancel()

    var expiry <-chan struct{}
    var unregister func()
    defer func() {
        if unregister != nil {
            log.Infof(ctx, "[Register] %v unregisters", n.id)
            unregister()
        }
    }()

    retryCounter := 0

    for {
        select {
        case <-ctx.Done():
            log.Info("[Register] context canceled")
            return
        default:
        }

        // try to get the lock
        if ne, un, err := n.register(ctx); err != nil {
            if errors.Is(err, context.Canceled) {
                log.Info("[Register] context canceled")
                return
            } else if !errors.Is(err, coretypes.ErrKeyExists) {
                log.Errorf(ctx, "[Register] failed to re-register: %v", err)
                time.Sleep(time.Second)
                continue
            }
            if retryCounter == 0 {
                log.Infof(ctx, "[Register] %v there has been another active node status watcher", n.id)
            }
            retryCounter = (retryCounter + 1) % 60
            time.Sleep(time.Second)
        } else {
            log.Infof(ctx, "[Register] node status watcher %v has been active", n.id)
            expiry = ne
            unregister = un
            break
        }
    }

    // cancel the ctx when: 1. selfmon closed 2. lost the active lock
    go func() {
        defer cancel()

        select {
        case <-ctx.Done():
            log.Info("[Register] context canceled")
            return
        case <-expiry:
            log.Info("[Register] lock expired")
            return
        }
    }()

    f(ctx)
}
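// Note: the acquisition loop retries once per second and logs the "another
// active node status watcher" line only when retryCounter wraps, i.e. roughly
// once a minute; once the lock is held, the goroutine above cancels ctx as
// soon as the ephemeral registration expires, and the deferred unregister
// releases the key when f returns.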

func (n *NodeStatusWatcher) register(ctx context.Context) (<-chan struct{}, func(), error) {
    return n.cal.store.StartEphemeral(ctx, ActiveKey, n.cal.config.HAKeepaliveInterval)
}
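// Assumption (the store interface is not part of this diff): StartEphemeral
// registers ActiveKey as an ephemeral entry refreshed every
// HAKeepaliveInterval, returning a channel that is closed when the entry is
// lost and a function that revokes it; coretypes.ErrKeyExists in
// withActiveLock then means another instance already holds the key.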

func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) {
    log.Debug(ctx, "[NodeStatusWatcher] init node status started")
    nodes := make(chan *coretypes.Node)

    go func() {
        defer close(nodes)
        // Get all nodes that are in active status, regardless of pod.
        var err error
        var ch <-chan *coretypes.Node
        utils.WithTimeout(ctx, n.cal.config.ConnectionTimeout, func(ctx context.Context) {
            ch, err = n.cal.ListPodNodes(ctx, &coretypes.ListNodesOptions{
                Podname: "",
                Labels:  nil,
                All:     true,
                Info:    false,
            })
            if err != nil {
                log.Errorf(ctx, "[NodeStatusWatcher] get pod nodes failed %v", err)
                return
            }
            for node := range ch {
                log.Debugf(ctx, "[NodeStatusWatcher] watched %s/%s", node.Name, node.Endpoint)
                nodes <- node
            }
        })
        if err != nil {
            log.Errorf(ctx, "[NodeStatusWatcher] get pod nodes failed %v", err)
            return
        }
    }()

    for node := range nodes {
        status, err := n.cal.GetNodeStatus(ctx, node.Name)
        if err != nil {
            status = &coretypes.NodeStatus{
                Nodename: node.Name,
                Podname:  node.Podname,
                Alive:    false,
            }
        }
        n.dealNodeStatusMessage(ctx, status)
    }
}
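// Note: initNodeStatus walks every node once at startup; if a node's status
// cannot be read, it is treated as not alive, so its workloads get marked
// down by dealNodeStatusMessage even if no further status event arrives.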

func (n *NodeStatusWatcher) watch(ctx context.Context) error {
    // init node status first
    go n.initNodeStatus(ctx)

    // watch node status
    messageChan := n.cal.NodeStatusStream(ctx)
    log.Infof(ctx, "[NodeStatusWatcher] %v watch node status started", n.id)
    defer log.Infof(ctx, "[NodeStatusWatcher] %v stop watching node status", n.id)

    for {
        select {
        case message, ok := <-messageChan:
            if !ok {
                return coretypes.ErrMessageChanClosed
            }
            go n.dealNodeStatusMessage(ctx, message)
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}
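// Note: each message is handled in its own goroutine, and a closed stream is
// surfaced as ErrMessageChanClosed so that run re-enters the election after
// ConnectionTimeout.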

func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message *coretypes.NodeStatus) {
    if message.Error != nil {
        log.Errorf(ctx, "[NodeStatusWatcher] deal with node status stream message failed %+v", message)
        return
    }

    // TODO maybe we need a distributed lock to control concurrency
    opts := &coretypes.SetNodeOptions{
        Nodename:      message.Nodename,
        WorkloadsDown: !message.Alive,
    }
    if _, err := n.cal.SetNode(ctx, opts); err != nil {
        log.Errorf(ctx, "[NodeStatusWatcher] set node %s failed %v", message.Nodename, err)
        return
    }
    log.Infof(ctx, "[NodeStatusWatcher] set node %s as alive: %v", message.Nodename, message.Alive)
}
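For orientation, below is a minimal sketch of how this watcher is presumably wired up from the rest of the calcium package. The call site is not part of this diff, so the method name runNodeStatusWatcher and the bootstrap wiring are assumptions rather than code from the repository.

// Hypothetical wiring (not in this diff): start the watcher during cluster
// bootstrap and let it block until the signal context is cancelled.
func (c *Calcium) runNodeStatusWatcher() {
    watcher := NewNodeStatusWatcher(c)
    // run competes for ActiveKey and, while holding it, relays node status
    // events into SetNode via dealNodeStatusMessage.
    watcher.run()
}

A real caller would most likely launch this in its own goroutine (go c.runNodeStatusWatcher()) so that it does not block the rest of initialization.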