Commit

migrate selfmon to core
DuodenumL committed Dec 30, 2021
1 parent d903237 commit 1100709
Showing 9 changed files with 285 additions and 39 deletions.
6 changes: 6 additions & 0 deletions cluster/calcium/calcium.go
@@ -31,6 +31,7 @@ type Calcium struct {
watcher discovery.Service
wal *WAL
identifier string
selfmon *NodeStatusWatcher
}

// New returns a new cluster config
@@ -82,6 +83,11 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
cal.wal, err = newCalciumWAL(cal)
cal.identifier = config.Identifier()
cal.selfmon = NewNodeStatusWatcher(cal)

// start node status watcher
go cal.selfmon.run()

return cal, logger.Err(nil, errors.WithStack(err)) //nolint
}

9 changes: 1 addition & 8 deletions cluster/calcium/node.go
@@ -131,18 +131,11 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
opts.Normalize(node)
n = node

n.Available = (opts.StatusOpt == types.TriTrue) || (opts.StatusOpt == types.TriKeep && n.Available)
//n.Available = (opts.StatusOpt == types.TriTrue) || (opts.StatusOpt == types.TriKeep && n.Available)
n.Bypass = (opts.BypassOpt == types.TriTrue) || (opts.BypassOpt == types.TriKeep && n.Bypass)
if n.IsDown() {
log.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
}
if !n.Available {
// remove node status
if err := c.store.SetNodeStatus(ctx, node, -1); err != nil {
// don't return here
logger.Errorf(ctx, "[SetNode] failed to remove node status, err: %+v", errors.WithStack(err))
}
}
if opts.WorkloadsDown {
c.setAllWorkloadsOnNodeDown(ctx, opts.Nodename)
}
222 changes: 222 additions & 0 deletions cluster/calcium/selfmon.go
@@ -0,0 +1,222 @@
package calcium

import (
"context"
"math/rand"
"os/signal"
"syscall"
"time"

"github.com/pkg/errors"

"github.com/projecteru2/core/log"
coretypes "github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)

// ActiveKey .
const ActiveKey = "/selfmon/active"

type NodeStatusWatcher struct {
id int64
cal *Calcium
}

func NewNodeStatusWatcher(cal *Calcium) *NodeStatusWatcher {
rand.Seed(time.Now().UnixNano())
id := rand.Int63n(10000)
return &NodeStatusWatcher{
id: id,
cal: cal,
}
}

func (n *NodeStatusWatcher) run() {
ctx := n.getSignalContext(context.TODO())
for {
select {
case <-ctx.Done():
return
default:
n.withActiveLock(ctx, func(ctx context.Context) {
if err := n.watch(ctx); err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] %v stops watching, err: %v", n.id, err)
}
})
time.Sleep(n.cal.config.ConnectionTimeout)
}
}
}

func (n *NodeStatusWatcher) getSignalContext(ctx context.Context) context.Context {
exitCtx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
defer cancel()
for {
select {
case <-exitCtx.Done():
log.Warnf(ctx, "[NodeStatusWatcher] watcher %v receives a signal to exit", n.id)
return
}
}
}()

return exitCtx
}

// withActiveLock acquires the active lock synchronously
func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx context.Context)) {
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()

var expiry <-chan struct{}
var unregister func()
defer func() {
if unregister != nil {
log.Infof(ctx, "[Register] %v unregisters", n.id)
unregister()
}
}()

retryCounter := 0

for {
select {
case <-ctx.Done():
log.Info("[Register] context canceled")
return
default:
}

// try to get the lock
if ne, un, err := n.register(ctx); err != nil {
if errors.Is(err, context.Canceled) {
log.Info("[Register] context canceled")
return
} else if !errors.Is(err, coretypes.ErrKeyExists) {
log.Errorf(ctx, "[Register] failed to re-register: %v", err)
time.Sleep(time.Second)
continue
}
if retryCounter == 0 {
log.Infof(ctx, "[Register] %v there has been another active node status watcher", n.id)
}
retryCounter = (retryCounter + 1) % 60
time.Sleep(time.Second)
} else {
log.Infof(ctx, "[Register] node status watcher %v has been active", n.id)
retryCounter = 0
expiry = ne
unregister = un
break
}
}

// cancel the ctx when: 1. selfmon closed 2. lost the active lock
go func() {
defer cancel()

select {
case <-ctx.Done():
log.Info("[Register] context canceled")
return
case <-expiry:
log.Info("[Register] lock expired")
return
}
}()

f(ctx)
}

func (n *NodeStatusWatcher) register(ctx context.Context) (<-chan struct{}, func(), error) {
return n.cal.store.StartEphemeral(ctx, ActiveKey, n.cal.config.HAKeepaliveInterval)
}

func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) {
log.Debug(ctx, "[NodeStatusWatcher] init node status started")
nodes := make(chan *coretypes.Node)

go func() {
defer close(nodes)
// Get all active nodes, regardless of pod.
var err error
var ch <-chan *coretypes.Node
utils.WithTimeout(ctx, n.cal.config.ConnectionTimeout, func(ctx context.Context) {
ch, err = n.cal.ListPodNodes(ctx, &coretypes.ListNodesOptions{
Podname: "",
Labels: nil,
All: true,
Info: false,
})
if err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] get pod nodes failed %v", err)
return
}
for node := range ch {
log.Debugf(ctx, "[NodeStatusWatcher] watched %s/%s", node.Name, node.Endpoint)
nodes <- node
}
})
if err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] get pod nodes failed %v", err)
return
}
}()

for node := range nodes {
status, err := n.cal.GetNodeStatus(ctx, node.Name)
if err != nil {
status = &coretypes.NodeStatus{
Nodename: node.Name,
Podname: node.Podname,
Alive: false,
}
}
n.dealNodeStatusMessage(ctx, status)
}
}

func (n *NodeStatusWatcher) watch(ctx context.Context) error {
// init node status first
go n.initNodeStatus(ctx)

// watch node status
messageChan := n.cal.NodeStatusStream(ctx)
log.Infof(ctx, "[NodeStatusWatcher] %v watch node status started", n.id)
defer log.Infof(ctx, "[NodeStatusWatcher] %v stop watching node status", n.id)

for {
select {
case message, ok := <-messageChan:
if !ok {
return coretypes.ErrMessageChanClosed
}
go n.dealNodeStatusMessage(ctx, message)
case <-ctx.Done():
return ctx.Err()
}
}
}

func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message *coretypes.NodeStatus) {
if message.Error != nil {
log.Errorf(ctx, "[NodeStatusWatcher] deal with node status stream message failed %+v", message)
return
}

// TODO maybe we need a distributed lock to control concurrency
opts := &coretypes.SetNodeOptions{
Nodename: message.Nodename,
StatusOpt: coretypes.TriTrue,
WorkloadsDown: !message.Alive,
}
if !message.Alive {
opts.StatusOpt = coretypes.TriFalse
}
if _, err := n.cal.SetNode(ctx, opts); err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] set node %s failed %v", message.Nodename, err)
return
}
log.Infof(ctx, "[NodeStatusWatcher] set node %s as alive: %v", message.Nodename, message.Alive)
}
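
For context, the registration above is a small leader election: every core instance runs a NodeStatusWatcher, but only the one holding the /selfmon/active ephemeral key actually watches. The sketch below shows that pattern in isolation and assumes nothing about the real Store: fakeStore, errKeyExists and the sleep/TTL values are made up for illustration, and only the expiry-channel plus unregister-func contract mirrors StartEphemeral as used by register().

package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"
)

var errKeyExists = errors.New("key exists")

// fakeStore holds one ephemeral key; at most one holder at a time.
type fakeStore struct {
    mu   sync.Mutex
    held bool
}

// StartEphemeral mimics the contract register() relies on: it returns an
// expiry channel plus an unregister func, or errKeyExists when another
// watcher is already active.
func (s *fakeStore) StartEphemeral(_ context.Context, _ string, _ time.Duration) (<-chan struct{}, func(), error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.held {
        return nil, nil, errKeyExists
    }
    s.held = true
    expiry := make(chan struct{})
    unregister := func() {
        s.mu.Lock()
        defer s.mu.Unlock()
        s.held = false
        close(expiry)
    }
    return expiry, unregister, nil
}

func main() {
    store := &fakeStore{}
    var wg sync.WaitGroup
    for id := 0; id < 3; id++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for {
                expiry, unregister, err := store.StartEphemeral(context.Background(), "/selfmon/active", 16*time.Second)
                if errors.Is(err, errKeyExists) {
                    time.Sleep(50 * time.Millisecond) // someone else is active, retry later
                    continue
                }
                fmt.Printf("watcher %d is active\n", id)
                time.Sleep(100 * time.Millisecond) // stand-in for watch()
                unregister()                       // give the lock back
                <-expiry                           // expiry closes once the key is gone
                return
            }
        }(id)
    }
    wg.Wait()
}

In the commit itself the same flow is driven by withActiveLock: it retries until StartEphemeral succeeds, runs watch() while the key is held, and cancels the context when the key expires or the process receives a termination signal.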
11 changes: 6 additions & 5 deletions rpc/rpc.go
@@ -198,11 +198,11 @@ func (v *Vibranium) ListPodNodes(ctx context.Context, opts *pb.ListNodesOptions)
ctx = v.taskAdd(ctx, "ListPodNodes", false)
defer v.taskDone(ctx, "ListPodNodes", false)

// default timeout is 10s
timeout := time.Duration(opts.TimeoutInSecond) * time.Second
if opts.TimeoutInSecond <= 0 {
opts.TimeoutInSecond = 10
timeout = v.config.ConnectionTimeout
}
ctx, cancel := context.WithTimeout(ctx, time.Duration(opts.TimeoutInSecond)*time.Second)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

ch, err := v.cluster.ListPodNodes(ctx, toCoreListNodesOptions(opts))
@@ -223,10 +223,11 @@ func (v *Vibranium) PodNodesStream(opts *pb.ListNodesOptions, stream pb.CoreRPC_
ctx := v.taskAdd(stream.Context(), "PodNodesStream", false)
defer v.taskDone(ctx, "PodNodesStream", false)

timeout := time.Duration(opts.TimeoutInSecond) * time.Second
if opts.TimeoutInSecond <= 0 {
opts.TimeoutInSecond = 10
timeout = v.config.ConnectionTimeout
}
ctx, cancel := context.WithTimeout(ctx, time.Duration(opts.TimeoutInSecond)*time.Second)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

ch, err := v.cluster.ListPodNodes(ctx, toCoreListNodesOptions(opts))
23 changes: 15 additions & 8 deletions store/etcdv3/node.go
@@ -7,7 +7,6 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"

"github.com/pkg/errors"
"go.etcd.io/etcd/api/v3/mvccpb"
@@ -300,19 +299,27 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels
node.Init()
nodes = append(nodes, node)
}
wg := &sync.WaitGroup{}
wg.Add(len(nodes))
for _, node := range nodes {
go func(node *types.Node) {
defer wg.Done()

pool := utils.NewGoroutinePool(int(m.config.MaxConcurrency))

for _, n := range nodes {
node := n
pool.Go(ctx, func() {
_, err := m.GetNodeStatus(ctx, node.Name)
if err != nil && !errors.Is(err, types.ErrBadCount) {
log.Errorf(ctx, "[doGetNodes] failed to get node status of %v, err: %v", node.Name, err)
}
node.Available = err == nil

if (!node.IsDown() || all) && utils.FilterWorkload(node.Labels, labels) {
if node.Engine, err = m.makeClient(ctx, node); err != nil {
return
}
}
}(node)
})
}
wg.Wait()
pool.Wait(ctx)

return nodes, nil
}
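
The switch from a plain sync.WaitGroup to utils.NewGoroutinePool bounds how many status lookups and engine dials run at once by MaxConcurrency. The real pool lives in the project's utils package and may differ; as a rough sketch of the same technique, a buffered-channel semaphore is enough (boundedPool and its methods are illustrative names, not the actual API):

package main

import (
    "context"
    "fmt"
    "sync"
)

// boundedPool runs submitted funcs concurrently, but never more than cap(sem) at once.
type boundedPool struct {
    sem chan struct{}
    wg  sync.WaitGroup
}

func newBoundedPool(max int) *boundedPool {
    return &boundedPool{sem: make(chan struct{}, max)}
}

// Go schedules f; it waits for a free slot unless ctx is cancelled first.
func (p *boundedPool) Go(ctx context.Context, f func()) {
    p.wg.Add(1)
    go func() {
        defer p.wg.Done()
        select {
        case p.sem <- struct{}{}: // acquire a slot
            defer func() { <-p.sem }() // release it when done
            f()
        case <-ctx.Done(): // caller gave up, skip the task
        }
    }()
}

// Wait blocks until every scheduled task has finished or been skipped.
func (p *boundedPool) Wait(_ context.Context) { p.wg.Wait() }

func main() {
    pool := newBoundedPool(2) // e.g. MaxConcurrency = 2
    for i := 0; i < 5; i++ {
        i := i
        pool.Go(context.Background(), func() {
            fmt.Println("checking node", i) // stand-in for GetNodeStatus + makeClient
        })
    }
    pool.Wait(context.Background())
}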

28 changes: 21 additions & 7 deletions store/redis/node.go
@@ -287,15 +287,29 @@ func (r *Rediaron) doGetNodes(ctx context.Context, kvs map[string]string, labels
return nil, err
}
node.Init()
if (!node.IsDown() || all) && utils.FilterWorkload(node.Labels, labels) {
engine, err := r.makeClient(ctx, node)
if err != nil {
return nil, err
nodes = append(nodes, node)
}

pool := utils.NewGoroutinePool(int(r.config.MaxConcurrency))

for _, n := range nodes {
node := n
pool.Go(ctx, func() {
_, err := r.GetNodeStatus(ctx, node.Name)
if err != nil && !isRedisNoKeyError(err) {
log.Errorf(ctx, "[doGetNodes] failed to get node status of %v, err: %v", node.Name, err)
}
node.Engine = engine
nodes = append(nodes, node)
}
node.Available = err == nil

if (!node.IsDown() || all) && utils.FilterWorkload(node.Labels, labels) {
if node.Engine, err = r.makeClient(ctx, node); err != nil {
return
}
}
})
}
pool.Wait(ctx)

return nodes, nil
}

21 changes: 11 additions & 10 deletions types/config.go
@@ -17,16 +17,17 @@ const (

// Config holds eru-core config
type Config struct {
LogLevel string `yaml:"log_level" required:"true" default:"INFO"`
Bind string `yaml:"bind" required:"true" default:"5001"` // HTTP API address
LockTimeout time.Duration `yaml:"lock_timeout" required:"true" default:"30s"` // timeout for lock (ttl)
GlobalTimeout time.Duration `yaml:"global_timeout" required:"true" default:"300s"` // timeout for remove, run_and_wait and build, in second
ConnectionTimeout time.Duration `yaml:"connection_timeout" default:"10s"` // timeout for connections
Statsd string `yaml:"statsd"` // statsd host and port
Profile string `yaml:"profile"` // profile ip:port
CertPath string `yaml:"cert_path"` // docker cert files path
MaxConcurrency int64 `yaml:"max_concurrency" default:"20"` // concurrently call single runtime in the same time
Store string `yaml:"store" default:"etcd"` // store type
LogLevel string `yaml:"log_level" required:"true" default:"INFO"`
Bind string `yaml:"bind" required:"true" default:"5001"` // HTTP API address
LockTimeout time.Duration `yaml:"lock_timeout" required:"true" default:"30s"` // timeout for lock (ttl)
GlobalTimeout time.Duration `yaml:"global_timeout" required:"true" default:"300s"` // timeout for remove, run_and_wait and build, in second
ConnectionTimeout time.Duration `yaml:"connection_timeout" default:"10s"` // timeout for connections
HAKeepaliveInterval time.Duration `yaml:"ha_keepalive_interval" default:"16s"` // interval for node status watcher
Statsd string `yaml:"statsd"` // statsd host and port
Profile string `yaml:"profile"` // profile ip:port
CertPath string `yaml:"cert_path"` // docker cert files path
MaxConcurrency int64 `yaml:"max_concurrency" default:"20"` // concurrently call single runtime in the same time
Store string `yaml:"store" default:"etcd"` // store type

Auth AuthConfig `yaml:"auth"` // grpc auth
GRPCConfig GRPCConfig `yaml:"grpc"` // grpc config
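
For operators, the new field maps to ha_keepalive_interval in the YAML config, next to connection_timeout. A hypothetical excerpt (values shown are just the defaults from the struct tags above, nothing here is new beyond the one field):

# illustrative excerpt only — field names come from the yaml tags above
connection_timeout: 10s
ha_keepalive_interval: 16s   # TTL/heartbeat for the /selfmon/active ephemeral key
max_concurrency: 20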