Skip to content

Commit

Permalink
move selfmon (#540)
Browse files Browse the repository at this point in the history
  • Loading branch information
DuodenumL authored Jan 20, 2022
1 parent 6e331fc commit 0deb6e7
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 84 deletions.
23 changes: 3 additions & 20 deletions cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"github.com/projecteru2/core/source/github"
"github.com/projecteru2/core/source/gitlab"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/store/etcdv3"
"github.com/projecteru2/core/store/redis"
"github.com/projecteru2/core/types"

"github.com/pkg/errors"
Expand All @@ -31,27 +29,16 @@ type Calcium struct {
watcher discovery.Service
wal *WAL
identifier string
selfmon *NodeStatusWatcher
}

// New returns a new cluster config
func New(config types.Config, t *testing.T) (*Calcium, error) {
logger := log.WithField("Calcium", "New").WithField("config", config)

// set store
var store store.Store
var err error
switch config.Store {
case types.Redis:
store, err = redis.New(config, t)
if err != nil {
return nil, logger.Err(context.TODO(), errors.WithStack(err))
}
default:
store, err = etcdv3.New(config, t)
if err != nil {
return nil, logger.Err(context.TODO(), errors.WithStack(err))
}
store, err := store.NewStore(config, t)
if err != nil {
return nil, logger.Err(context.TODO(), errors.WithStack(err))
}

// set scheduler
Expand Down Expand Up @@ -83,10 +70,6 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
cal.wal, err = newCalciumWAL(cal)
cal.identifier = config.Identifier()
cal.selfmon = NewNodeStatusWatcher(cal)

// start node status watcher
go cal.selfmon.run()

return cal, logger.Err(nil, errors.WithStack(err)) //nolint
}
Expand Down
5 changes: 2 additions & 3 deletions cluster/calcium/dissociate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/projecteru2/core/log"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"

Expand Down Expand Up @@ -34,7 +33,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
ctx,
// if
func(ctx context.Context) (err error) {
if err = c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr); err == nil {
if err = c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, types.ActionIncr); err == nil {
log.Infof(ctx, "[DissociateWorkload] Workload %s dissociated", workload.ID)
}
return errors.WithStack(err)
Expand All @@ -48,7 +47,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
if failedByCond {
return nil
}
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, types.ActionDecr))
},
c.config.GlobalTimeout,
)
Expand Down
5 changes: 2 additions & 3 deletions cluster/calcium/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"

"github.com/projecteru2/core/log"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"

Expand Down Expand Up @@ -42,7 +41,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
ctx,
// if
func(ctx context.Context) error {
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, types.ActionIncr))
},
// then
func(ctx context.Context) (err error) {
Expand All @@ -56,7 +55,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
if failedByCond {
return nil
}
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, types.ActionDecr))
},
c.config.GlobalTimeout,
)
Expand Down
4 changes: 4 additions & 0 deletions core.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/projecteru2/core/metrics"
"github.com/projecteru2/core/rpc"
pb "github.com/projecteru2/core/rpc/gen"
"github.com/projecteru2/core/selfmon"
"github.com/projecteru2/core/utils"
"github.com/projecteru2/core/version"

Expand Down Expand Up @@ -133,6 +134,9 @@ func serve(c *cli.Context) error {
ctx, cancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer cancel()

// start node status checker
go selfmon.RunNodeStatusWatcher(ctx, config, cluster, t)

// start engine cache checker
go factory.EngineCacheChecker(ctx, config.ConnectionTimeout)

Expand Down
2 changes: 1 addition & 1 deletion engine/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var (
)

func getEngineCacheKey(endpoint, ca, cert, key string) string {
return endpoint + "-" + utils.SHA256(fmt.Sprintf(":%v:%v:%v", ca, cert, key))[:8]
return fmt.Sprintf("%v-%v", endpoint, utils.SHA256(fmt.Sprintf(":%v:%v:%v", ca, cert, key))[:8])
}

// EngineCacheChecker checks if the engine in cache is available
Expand Down
76 changes: 38 additions & 38 deletions cluster/calcium/selfmon.go → selfmon/selfmon.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package calcium
package selfmon

import (
"context"
"math/rand"
"os/signal"
"syscall"
"testing"
"time"

"github.com/pkg/errors"

"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/log"
coretypes "github.com/projecteru2/core/types"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)

Expand All @@ -19,22 +20,32 @@ const ActiveKey = "/selfmon/active"

// NodeStatusWatcher monitors the changes of node status
type NodeStatusWatcher struct {
id int64
cal *Calcium
id int64
config types.Config
cluster cluster.Cluster
store store.Store
}

// NewNodeStatusWatcher .
func NewNodeStatusWatcher(cal *Calcium) *NodeStatusWatcher {
// RunNodeStatusWatcher .
func RunNodeStatusWatcher(ctx context.Context, config types.Config, cluster cluster.Cluster, t *testing.T) {
rand.Seed(time.Now().UnixNano())
id := rand.Int63n(10000) // nolint
return &NodeStatusWatcher{
id: id,
cal: cal,
store, err := store.NewStore(config, t)
if err != nil {
log.Errorf(context.TODO(), "[RunNodeStatusWatcher] %v failed to create store, err: %v", id, err)
return
}

watcher := &NodeStatusWatcher{
id: id,
config: config,
store: store,
cluster: cluster,
}
watcher.run(ctx)
}

func (n *NodeStatusWatcher) run() {
ctx := n.getSignalContext(context.TODO())
func (n *NodeStatusWatcher) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
Expand All @@ -45,22 +56,11 @@ func (n *NodeStatusWatcher) run() {
log.Errorf(ctx, "[NodeStatusWatcher] %v stops watching, err: %v", n.id, err)
}
})
time.Sleep(n.cal.config.ConnectionTimeout)
time.Sleep(n.config.ConnectionTimeout)
}
}
}

func (n *NodeStatusWatcher) getSignalContext(ctx context.Context) context.Context {
exitCtx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
defer cancel()
<-exitCtx.Done()
log.Warnf(ctx, "[NodeStatusWatcher] watcher %v receives a signal to exit", n.id)
}()

return exitCtx
}

// withActiveLock acquires the active lock synchronously
func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx context.Context)) {
ctx, cancel := context.WithCancel(parentCtx)
Expand Down Expand Up @@ -90,7 +90,7 @@ func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx
if errors.Is(err, context.Canceled) {
log.Info("[Register] context canceled")
return
} else if !errors.Is(err, coretypes.ErrKeyExists) {
} else if !errors.Is(err, types.ErrKeyExists) {
log.Errorf(ctx, "[Register] failed to re-register: %v", err)
time.Sleep(time.Second)
continue
Expand Down Expand Up @@ -126,20 +126,20 @@ func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx
}

func (n *NodeStatusWatcher) register(ctx context.Context) (<-chan struct{}, func(), error) {
return n.cal.store.StartEphemeral(ctx, ActiveKey, n.cal.config.HAKeepaliveInterval)
return n.store.StartEphemeral(ctx, ActiveKey, n.config.HAKeepaliveInterval)
}

func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) {
log.Debug(ctx, "[NodeStatusWatcher] init node status started")
nodes := make(chan *coretypes.Node)
nodes := make(chan *types.Node)

go func() {
defer close(nodes)
// Get all nodes which are active status, and regardless of pod.
var err error
var ch <-chan *coretypes.Node
utils.WithTimeout(ctx, n.cal.config.GlobalTimeout, func(ctx context.Context) {
ch, err = n.cal.ListPodNodes(ctx, &coretypes.ListNodesOptions{
var ch <-chan *types.Node
utils.WithTimeout(ctx, n.config.GlobalTimeout, func(ctx context.Context) {
ch, err = n.cluster.ListPodNodes(ctx, &types.ListNodesOptions{
Podname: "",
Labels: nil,
All: true,
Expand All @@ -161,9 +161,9 @@ func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) {
}()

for node := range nodes {
status, err := n.cal.GetNodeStatus(ctx, node.Name)
status, err := n.cluster.GetNodeStatus(ctx, node.Name)
if err != nil {
status = &coretypes.NodeStatus{
status = &types.NodeStatus{
Nodename: node.Name,
Podname: node.Podname,
Alive: false,
Expand All @@ -178,15 +178,15 @@ func (n *NodeStatusWatcher) monitor(ctx context.Context) error {
go n.initNodeStatus(ctx)

// monitor node status
messageChan := n.cal.NodeStatusStream(ctx)
messageChan := n.cluster.NodeStatusStream(ctx)
log.Infof(ctx, "[NodeStatusWatcher] %v watch node status started", n.id)
defer log.Infof(ctx, "[NodeStatusWatcher] %v stop watching node status", n.id)

for {
select {
case message, ok := <-messageChan:
if !ok {
return coretypes.ErrMessageChanClosed
return types.ErrMessageChanClosed
}
go n.dealNodeStatusMessage(ctx, message)
case <-ctx.Done():
Expand All @@ -195,18 +195,18 @@ func (n *NodeStatusWatcher) monitor(ctx context.Context) error {
}
}

func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message *coretypes.NodeStatus) {
func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message *types.NodeStatus) {
if message.Error != nil {
log.Errorf(ctx, "[NodeStatusWatcher] deal with node status stream message failed %+v", message)
return
}

// TODO maybe we need a distributed lock to control concurrency
opts := &coretypes.SetNodeOptions{
opts := &types.SetNodeOptions{
Nodename: message.Nodename,
WorkloadsDown: !message.Alive,
}
if _, err := n.cal.SetNode(ctx, opts); err != nil {
if _, err := n.cluster.SetNode(ctx, opts); err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] set node %s failed %v", message.Nodename, err)
return
}
Expand Down
5 changes: 2 additions & 3 deletions store/etcdv3/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/projecteru2/core/engine/fake"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/metrics"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
Expand Down Expand Up @@ -178,9 +177,9 @@ func (m *Mercury) UpdateNodes(ctx context.Context, nodes ...*types.Node) error {
// UpdateNodeResource update cpu and memory on a node, either add or subtract
func (m *Mercury) UpdateNodeResource(ctx context.Context, node *types.Node, resource *types.ResourceMeta, action string) error {
switch action {
case store.ActionIncr:
case types.ActionIncr:
node.RecycleResources(resource)
case store.ActionDecr:
case types.ActionDecr:
node.PreserveResources(resource)
default:
return types.ErrUnknownControlType
Expand Down
5 changes: 2 additions & 3 deletions store/etcdv3/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"
"time"

"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -223,8 +222,8 @@ func TestUpdateNodeResource(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, node.Name, "test")
assert.Error(t, m.UpdateNodeResource(ctx, node, nil, "wtf"))
assert.NoError(t, m.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, store.ActionIncr))
assert.NoError(t, m.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, store.ActionDecr))
assert.NoError(t, m.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, types.ActionIncr))
assert.NoError(t, m.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, types.ActionDecr))
}

func TestExtractNodename(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions store/redis/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/projecteru2/core/engine/fake"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/metrics"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"

Expand Down Expand Up @@ -169,9 +168,9 @@ func (r *Rediaron) UpdateNodes(ctx context.Context, nodes ...*types.Node) error
// UpdateNodeResource update cpu and memory on a node, either add or subtract
func (r *Rediaron) UpdateNodeResource(ctx context.Context, node *types.Node, resource *types.ResourceMeta, action string) error {
switch action {
case store.ActionIncr:
case types.ActionIncr:
node.RecycleResources(resource)
case store.ActionDecr:
case types.ActionDecr:
node.PreserveResources(resource)
default:
return types.ErrUnknownControlType
Expand Down
5 changes: 2 additions & 3 deletions store/redis/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"path/filepath"
"time"

"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
)

Expand Down Expand Up @@ -213,8 +212,8 @@ func (s *RediaronTestSuite) TestUpdateNodeResource() {
s.NoError(err)
s.Equal(node.Name, "test")
s.Error(s.rediaron.UpdateNodeResource(ctx, node, nil, "wtf"))
s.NoError(s.rediaron.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, store.ActionIncr))
s.NoError(s.rediaron.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, store.ActionDecr))
s.NoError(s.rediaron.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, types.ActionIncr))
s.NoError(s.rediaron.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, types.ActionDecr))
}

func (s *RediaronTestSuite) TestExtractNodename() {
Expand Down
Loading

0 comments on commit 0deb6e7

Please sign in to comment.