diff --git a/cluster/calcium/calcium.go b/cluster/calcium/calcium.go
index 9bad18ee9..69f1f4e3b 100644
--- a/cluster/calcium/calcium.go
+++ b/cluster/calcium/calcium.go
@@ -31,6 +31,7 @@ type Calcium struct {
     watcher    discovery.Service
     wal        *WAL
     identifier string
+    selfmon    *NodeStatusWatcher
 }
 
 // New returns a new cluster config
@@ -82,6 +83,11 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
     cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
     cal.wal, err = newCalciumWAL(cal)
     cal.identifier = config.Identifier()
+    cal.selfmon = NewNodeStatusWatcher(cal)
+
+    // start node status watcher
+    go cal.selfmon.run()
+
     return cal, logger.Err(nil, errors.WithStack(err)) //nolint
 }
 
diff --git a/cluster/calcium/node.go b/cluster/calcium/node.go
index 30c15a048..4007f2316 100644
--- a/cluster/calcium/node.go
+++ b/cluster/calcium/node.go
@@ -131,18 +131,11 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
         opts.Normalize(node)
         n = node
 
-        n.Available = (opts.StatusOpt == types.TriTrue) || (opts.StatusOpt == types.TriKeep && n.Available)
+        //n.Available = (opts.StatusOpt == types.TriTrue) || (opts.StatusOpt == types.TriKeep && n.Available)
         n.Bypass = (opts.BypassOpt == types.TriTrue) || (opts.BypassOpt == types.TriKeep && n.Bypass)
         if n.IsDown() {
             log.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
         }
-        if !n.Available {
-            // remove node status
-            if err := c.store.SetNodeStatus(ctx, node, -1); err != nil {
-                // don't return here
-                logger.Errorf(ctx, "[SetNode] failed to remove node status, err: %+v", errors.WithStack(err))
-            }
-        }
         if opts.WorkloadsDown {
             c.setAllWorkloadsOnNodeDown(ctx, opts.Nodename)
         }
diff --git a/cluster/calcium/selfmon.go b/cluster/calcium/selfmon.go
new file mode 100644
index 000000000..168bf5d3b
--- /dev/null
+++ b/cluster/calcium/selfmon.go
@@ -0,0 +1,222 @@
+package calcium
+
+import (
+    "context"
+    "math/rand"
+    "os/signal"
+    "syscall"
+    "time"
+
+    "github.com/pkg/errors"
+
+    "github.com/projecteru2/core/log"
+    coretypes "github.com/projecteru2/core/types"
+    "github.com/projecteru2/core/utils"
+)
+
+// ActiveKey .
+const ActiveKey = "/selfmon/active"
+
+// NodeStatusWatcher watches node status changes and keeps node availability in sync.
+type NodeStatusWatcher struct {
+    id  int64
+    cal *Calcium
+}
+
+// NewNodeStatusWatcher creates a node status watcher with a random id.
+func NewNodeStatusWatcher(cal *Calcium) *NodeStatusWatcher {
+    rand.Seed(time.Now().UnixNano())
+    id := rand.Int63n(10000)
+    return &NodeStatusWatcher{
+        id:  id,
+        cal: cal,
+    }
+}
+
+func (n *NodeStatusWatcher) run() {
+    ctx := n.getSignalContext(context.TODO())
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        default:
+            n.withActiveLock(ctx, func(ctx context.Context) {
+                if err := n.watch(ctx); err != nil {
+                    log.Errorf(ctx, "[NodeStatusWatcher] %v stops watching, err: %v", n.id, err)
+                }
+            })
+            time.Sleep(n.cal.config.ConnectionTimeout)
+        }
+    }
+}
+
+func (n *NodeStatusWatcher) getSignalContext(ctx context.Context) context.Context {
+    exitCtx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+    go func() {
+        defer cancel()
+        for {
+            select {
+            case <-exitCtx.Done():
+                log.Warnf(ctx, "[NodeStatusWatcher] watcher %v receives a signal to exit", n.id)
+                return
+            }
+        }
+    }()
+
+    return exitCtx
+}
+
+// withActiveLock acquires the active lock synchronously
+func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx context.Context)) {
+    ctx, cancel := context.WithCancel(parentCtx)
+    defer cancel()
+
+    var expiry <-chan struct{}
+    var unregister func()
+    defer func() {
+        if unregister != nil {
+            log.Infof(ctx, "[Register] %v unregisters", n.id)
+            unregister()
+        }
+    }()
+
+    retryCounter := 0
+
+    for {
+        select {
+        case <-ctx.Done():
+            log.Info("[Register] context canceled")
+            return
+        default:
+        }
+
+        // try to get the lock
+        if ne, un, err := n.register(ctx); err != nil {
+            if errors.Is(err, context.Canceled) {
+                log.Info("[Register] context canceled")
+                return
+            } else if !errors.Is(err, coretypes.ErrKeyExists) {
+                log.Errorf(ctx, "[Register] failed to re-register: %v", err)
+                time.Sleep(time.Second)
+                continue
+            }
+            if retryCounter == 0 {
+                log.Infof(ctx, "[Register] %v: another node status watcher is already active", n.id)
+            }
+            retryCounter = (retryCounter + 1) % 60
+            time.Sleep(time.Second)
+        } else {
+            log.Infof(ctx, "[Register] node status watcher %v is now active", n.id)
+            retryCounter = 0
+            expiry = ne
+            unregister = un
+            break
+        }
+    }
+
+    // cancel the ctx when: 1. selfmon is closed 2. the active lock is lost
+    go func() {
+        defer cancel()
+
+        select {
+        case <-ctx.Done():
+            log.Info("[Register] context canceled")
+            return
+        case <-expiry:
+            log.Info("[Register] lock expired")
+            return
+        }
+    }()
+
+    f(ctx)
+}
+
+func (n *NodeStatusWatcher) register(ctx context.Context) (<-chan struct{}, func(), error) {
+    return n.cal.store.StartEphemeral(ctx, ActiveKey, n.cal.config.HAKeepaliveInterval)
+}
+
+func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) {
+    log.Debug(ctx, "[NodeStatusWatcher] init node status started")
+    nodes := make(chan *coretypes.Node)
+
+    go func() {
+        defer close(nodes)
+        // Get all nodes, regardless of their pod and health status.
+        var err error
+        var ch <-chan *coretypes.Node
+        utils.WithTimeout(ctx, n.cal.config.ConnectionTimeout, func(ctx context.Context) {
+            ch, err = n.cal.ListPodNodes(ctx, &coretypes.ListNodesOptions{
+                Podname: "",
+                Labels:  nil,
+                All:     true,
+                Info:    false,
+            })
+            if err != nil {
+                log.Errorf(ctx, "[NodeStatusWatcher] get pod nodes failed %v", err)
+                return
+            }
+            for node := range ch {
+                log.Debugf(ctx, "[NodeStatusWatcher] watched %s/%s", node.Name, node.Endpoint)
+                nodes <- node
+            }
+        })
+        if err != nil {
+            log.Errorf(ctx, "[NodeStatusWatcher] get pod nodes failed %v", err)
+            return
+        }
+    }()
+
+    for node := range nodes {
+        status, err := n.cal.GetNodeStatus(ctx, node.Name)
+        if err != nil {
+            status = &coretypes.NodeStatus{
+                Nodename: node.Name,
+                Podname:  node.Podname,
+                Alive:    false,
+            }
+        }
+        n.dealNodeStatusMessage(ctx, status)
+    }
+}
+
+func (n *NodeStatusWatcher) watch(ctx context.Context) error {
+    // init node status first
+    go n.initNodeStatus(ctx)
+
+    // watch node status
+    messageChan := n.cal.NodeStatusStream(ctx)
+    log.Infof(ctx, "[NodeStatusWatcher] %v watch node status started", n.id)
+    defer log.Infof(ctx, "[NodeStatusWatcher] %v stop watching node status", n.id)
+
+    for {
+        select {
+        case message, ok := <-messageChan:
+            if !ok {
+                return coretypes.ErrMessageChanClosed
+            }
+            go n.dealNodeStatusMessage(ctx, message)
+        case <-ctx.Done():
+            return ctx.Err()
+        }
+    }
+}
+
+func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message *coretypes.NodeStatus) {
+    if message.Error != nil {
+        log.Errorf(ctx, "[NodeStatusWatcher] failed to deal with node status stream message %+v", message)
+        return
+    }
+
+    // TODO maybe we need a distributed lock to control concurrency
+    opts := &coretypes.SetNodeOptions{
+        Nodename:      message.Nodename,
+        StatusOpt:     coretypes.TriTrue,
+        WorkloadsDown: !message.Alive,
+    }
+    if !message.Alive {
+        opts.StatusOpt = coretypes.TriFalse
+    }
+    if _, err := n.cal.SetNode(ctx, opts); err != nil {
+        log.Errorf(ctx, "[NodeStatusWatcher] set node %s failed %v", message.Nodename, err)
+        return
+    }
+    log.Infof(ctx, "[NodeStatusWatcher] set node %s as alive: %v", message.Nodename, message.Alive)
+}
diff --git a/rpc/rpc.go b/rpc/rpc.go
index 5ce9c376c..912b4fb69 100644
--- a/rpc/rpc.go
+++ b/rpc/rpc.go
@@ -198,11 +198,11 @@ func (v *Vibranium) ListPodNodes(ctx context.Context, opts *pb.ListNodesOptions)
     ctx = v.taskAdd(ctx, "ListPodNodes", false)
     defer v.taskDone(ctx, "ListPodNodes", false)
 
-    // default timeout is 10s
+    timeout := time.Duration(opts.TimeoutInSecond) * time.Second
     if opts.TimeoutInSecond <= 0 {
-        opts.TimeoutInSecond = 10
+        timeout = v.config.ConnectionTimeout
     }
-    ctx, cancel := context.WithTimeout(ctx, time.Duration(opts.TimeoutInSecond)*time.Second)
+    ctx, cancel := context.WithTimeout(ctx, timeout)
     defer cancel()
 
     ch, err := v.cluster.ListPodNodes(ctx, toCoreListNodesOptions(opts))
@@ -223,10 +223,11 @@ func (v *Vibranium) PodNodesStream(opts *pb.ListNodesOptions, stream pb.CoreRPC_
     ctx := v.taskAdd(stream.Context(), "PodNodesStream", false)
     defer v.taskDone(ctx, "PodNodesStream", false)
 
+    timeout := time.Duration(opts.TimeoutInSecond) * time.Second
     if opts.TimeoutInSecond <= 0 {
-        opts.TimeoutInSecond = 10
+        timeout = v.config.ConnectionTimeout
     }
-    ctx, cancel := context.WithTimeout(ctx, time.Duration(opts.TimeoutInSecond)*time.Second)
+    ctx, cancel := context.WithTimeout(ctx, timeout)
     defer cancel()
 
     ch, err := v.cluster.ListPodNodes(ctx, toCoreListNodesOptions(opts))
diff --git a/store/etcdv3/node.go b/store/etcdv3/node.go
index f7f6ca29a..57cd37be1 100644
--- a/store/etcdv3/node.go
+++ b/store/etcdv3/node.go
@@ -7,7 +7,6 @@ import (
     "path/filepath"
     "strconv"
     "strings"
-    "sync"
 
     "github.com/pkg/errors"
     "go.etcd.io/etcd/api/v3/mvccpb"
@@ -300,19 +299,27 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels
         node.Init()
         nodes = append(nodes, node)
     }
-    wg := &sync.WaitGroup{}
-    wg.Add(len(nodes))
-    for _, node := range nodes {
-        go func(node *types.Node) {
-            defer wg.Done()
+
+    pool := utils.NewGoroutinePool(int(m.config.MaxConcurrency))
+
+    for _, n := range nodes {
+        node := n
+        pool.Go(ctx, func() {
+            _, err := m.GetNodeStatus(ctx, node.Name)
+            if err != nil && !errors.Is(err, types.ErrBadCount) {
+                log.Errorf(ctx, "[doGetNodes] failed to get node status of %v, err: %v", node.Name, err)
+            }
+            node.Available = err == nil
+
             if (!node.IsDown() || all) && utils.FilterWorkload(node.Labels, labels) {
                 if node.Engine, err = m.makeClient(ctx, node); err != nil {
                     return
                 }
             }
-        }(node)
+        })
     }
-    wg.Wait()
+    pool.Wait(ctx)
+
     return nodes, nil
 }
 
diff --git a/store/redis/node.go b/store/redis/node.go
index c273203ba..b33f9186b 100644
--- a/store/redis/node.go
+++ b/store/redis/node.go
@@ -287,15 +287,29 @@ func (r *Rediaron) doGetNodes(ctx context.Context, kvs map[string]string, labels
             return nil, err
         }
         node.Init()
-        if (!node.IsDown() || all) && utils.FilterWorkload(node.Labels, labels) {
-            engine, err := r.makeClient(ctx, node)
-            if err != nil {
-                return nil, err
+        nodes = append(nodes, node)
+    }
+
+    pool := utils.NewGoroutinePool(int(r.config.MaxConcurrency))
+
+    for _, n := range nodes {
+        node := n
+        pool.Go(ctx, func() {
+            _, err := r.GetNodeStatus(ctx, node.Name)
+            if err != nil && !isRedisNoKeyError(err) {
+                log.Errorf(ctx, "[doGetNodes] failed to get node status of %v, err: %v", node.Name, err)
             }
-            node.Engine = engine
-            nodes = append(nodes, node)
-        }
+            node.Available = err == nil
+
+            if (!node.IsDown() || all) && utils.FilterWorkload(node.Labels, labels) {
+                if node.Engine, err = r.makeClient(ctx, node); err != nil {
+                    return
+                }
+            }
+        })
     }
+    pool.Wait(ctx)
+
     return nodes, nil
 }
 
diff --git a/types/config.go b/types/config.go
index 39c0794d4..7bce93d1a 100644
--- a/types/config.go
+++ b/types/config.go
@@ -17,16 +17,17 @@ const (
 
 // Config holds eru-core config
 type Config struct {
-    LogLevel          string        `yaml:"log_level" required:"true" default:"INFO"`
-    Bind              string        `yaml:"bind" required:"true" default:"5001"`           // HTTP API address
-    LockTimeout       time.Duration `yaml:"lock_timeout" required:"true" default:"30s"`    // timeout for lock (ttl)
-    GlobalTimeout     time.Duration `yaml:"global_timeout" required:"true" default:"300s"` // timeout for remove, run_and_wait and build, in second
-    ConnectionTimeout time.Duration `yaml:"connection_timeout" default:"10s"`              // timeout for connections
-    Statsd            string        `yaml:"statsd"`                                        // statsd host and port
-    Profile           string        `yaml:"profile"`                                       // profile ip:port
-    CertPath          string        `yaml:"cert_path"`                                     // docker cert files path
-    MaxConcurrency    int64         `yaml:"max_concurrency" default:"20"`                  // concurrently call single runtime in the same time
-    Store             string        `yaml:"store" default:"etcd"`                          // store type
+    LogLevel            string        `yaml:"log_level" required:"true" default:"INFO"`
+    Bind                string        `yaml:"bind" required:"true" default:"5001"`           // HTTP API address
+    LockTimeout         time.Duration `yaml:"lock_timeout" required:"true" default:"30s"`    // timeout for lock (ttl)
+    GlobalTimeout       time.Duration `yaml:"global_timeout" required:"true" default:"300s"` // timeout for remove, run_and_wait and build, in second
+    ConnectionTimeout   time.Duration `yaml:"connection_timeout" default:"10s"`              // timeout for connections
+    HAKeepaliveInterval time.Duration `yaml:"ha_keepalive_interval" default:"16s"`           // keepalive interval for the active node status watcher lock
+    Statsd              string        `yaml:"statsd"`                                        // statsd host and port
+    Profile             string        `yaml:"profile"`                                       // profile ip:port
+    CertPath            string        `yaml:"cert_path"`                                     // docker cert files path
+    MaxConcurrency      int64         `yaml:"max_concurrency" default:"20"`                  // concurrently call single runtime in the same time
+    Store               string        `yaml:"store" default:"etcd"`                          // store type
 
     Auth       AuthConfig `yaml:"auth"` // grpc auth
     GRPCConfig GRPCConfig `yaml:"grpc"` // grpc config
diff --git a/types/errors.go b/types/errors.go
index 26417b230..1da3922cb 100644
--- a/types/errors.go
+++ b/types/errors.go
@@ -92,6 +92,8 @@ var (
     ErrLockSessionDone = errors.New("lock session done")
 
     ErrRollbackMapIsNotEmpty = errors.New("rollback map is not empty")
+
+    ErrMessageChanClosed = errors.New("message chan closed")
 )
 
 type detailedErr struct {
diff --git a/types/node.go b/types/node.go
index e02e599d2..0a4fec1c6 100644
--- a/types/node.go
+++ b/types/node.go
@@ -66,7 +66,7 @@ type Node struct {
     // Bypass if bypass is true, it will not participate in future scheduling
     Bypass bool `json:"bypass,omitempty"`
 
-    Available bool       `json:"available"`
+    Available bool       `json:"-"`
     Engine    engine.API `json:"-"`
 }
 
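
Note on the election mechanism: the active/standby behaviour above rests entirely on the StartEphemeral contract used by register(): on success it returns an expiry channel plus an unregister func, and while another instance holds the key it fails with coretypes.ErrKeyExists. The standalone sketch below shows that election loop in isolation, under those assumptions; the package, interface and function names here are made up for illustration and are not part of eru-core (the real implementation is withActiveLock above).

package sketch

import (
    "context"
    "errors"
    "time"
)

// errKeyExists stands in for coretypes.ErrKeyExists in this sketch.
var errKeyExists = errors.New("key exists")

// ephemeralStore mirrors the single store method the watcher depends on.
type ephemeralStore interface {
    StartEphemeral(ctx context.Context, key string, ttl time.Duration) (<-chan struct{}, func(), error)
}

// runWhenActive keeps competing for the ephemeral key; whenever it wins, it runs f
// with a context that is canceled once the key expires, then competes again.
func runWhenActive(ctx context.Context, store ephemeralStore, key string, ttl time.Duration, f func(context.Context)) {
    for {
        expiry, unregister, err := store.StartEphemeral(ctx, key, ttl)
        switch {
        case err == nil:
            subCtx, cancel := context.WithCancel(ctx)
            go func() {
                // lock lost (expiry) or shutting down: stop f
                select {
                case <-expiry:
                case <-subCtx.Done():
                }
                cancel()
            }()
            f(subCtx)
            cancel()
            unregister()
        case errors.Is(err, errKeyExists):
            // another instance is active; stay in standby and retry
        default:
            // transient store error; retry after a short pause
        }

        select {
        case <-ctx.Done():
            return
        case <-time.After(time.Second):
        }
    }
}

With the default ha_keepalive_interval of 16s, a crashed active watcher should be replaced within roughly one TTL plus the one-second retry pause, matching the retry cadence in withActiveLock.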