move selfmon #540

Merged: 1 commit, Jan 20, 2022
23 changes: 3 additions & 20 deletions cluster/calcium/calcium.go
@@ -15,8 +15,6 @@ import (
"github.com/projecteru2/core/source/github"
"github.com/projecteru2/core/source/gitlab"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/store/etcdv3"
"github.com/projecteru2/core/store/redis"
"github.com/projecteru2/core/types"

"github.com/pkg/errors"
@@ -31,27 +29,16 @@ type Calcium struct {
watcher discovery.Service
wal *WAL
identifier string
selfmon *NodeStatusWatcher
}

// New returns a new cluster config
func New(config types.Config, t *testing.T) (*Calcium, error) {
logger := log.WithField("Calcium", "New").WithField("config", config)

// set store
var store store.Store
var err error
switch config.Store {
case types.Redis:
store, err = redis.New(config, t)
if err != nil {
return nil, logger.Err(context.TODO(), errors.WithStack(err))
}
default:
store, err = etcdv3.New(config, t)
if err != nil {
return nil, logger.Err(context.TODO(), errors.WithStack(err))
}
store, err := store.NewStore(config, t)
if err != nil {
return nil, logger.Err(context.TODO(), errors.WithStack(err))
}

// set scheduler
@@ -83,10 +70,6 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
cal.wal, err = newCalciumWAL(cal)
cal.identifier = config.Identifier()
cal.selfmon = NewNodeStatusWatcher(cal)

// start node status watcher
go cal.selfmon.run()

return cal, logger.Err(nil, errors.WithStack(err)) //nolint
}
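Note: the switch over config.Store that used to live inline in calcium.New is gone because store.NewStore now does the backend selection. A minimal sketch of what that factory presumably looks like (the real store/store.go is not part of this diff, so everything beyond the NewStore signature is an assumption):

```go
// store/store.go (sketch): centralizes the etcd/redis selection that
// calcium.New used to perform inline. Store is the interface already
// defined in this package.
package store

import (
	"testing"

	"github.com/projecteru2/core/store/etcdv3"
	"github.com/projecteru2/core/store/redis"
	"github.com/projecteru2/core/types"
)

// NewStore picks a backend from config; calcium.New and
// selfmon.RunNodeStatusWatcher can now share this single entry point.
func NewStore(config types.Config, t *testing.T) (Store, error) {
	switch config.Store {
	case types.Redis:
		return redis.New(config, t)
	default:
		return etcdv3.New(config, t)
	}
}
```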
5 changes: 2 additions & 3 deletions cluster/calcium/dissociate.go
@@ -4,7 +4,6 @@ import (
"context"

"github.com/projecteru2/core/log"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"

@@ -34,7 +33,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
ctx,
// if
func(ctx context.Context) (err error) {
if err = c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr); err == nil {
if err = c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, types.ActionIncr); err == nil {
log.Infof(ctx, "[DissociateWorkload] Workload %s dissociated", workload.ID)
}
return errors.WithStack(err)
@@ -48,7 +47,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
if failedByCond {
return nil
}
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, types.ActionDecr))
},
c.config.GlobalTimeout,
)
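The store.ActionIncr to types.ActionIncr switch here (and in remove.go, the store backends, and their tests below) is the other half of the refactor: once the store package pulls in the etcdv3 and redis backends via store.NewStore, those backends can no longer import store without creating a cycle, so the action constants presumably move to types. A sketch of the relocated constants (the names come from the diff; the values and file location are assumptions):

```go
// types/node.go (assumed location): resource-update actions shared by
// all store backends. The concrete string values here are illustrative.
package types

const (
	// ActionIncr gives resources back to a node, e.g. when a workload is removed.
	ActionIncr = "+"
	// ActionDecr takes resources away from a node, e.g. when a workload is deployed.
	ActionDecr = "-"
)
```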
5 changes: 2 additions & 3 deletions cluster/calcium/remove.go
@@ -6,7 +6,6 @@ import (
"sync"

"github.com/projecteru2/core/log"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"

@@ -42,7 +41,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
ctx,
// if
func(ctx context.Context) error {
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, types.ActionIncr))
},
// then
func(ctx context.Context) (err error) {
@@ -56,7 +55,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
if failedByCond {
return nil
}
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, types.ActionDecr))
},
c.config.GlobalTimeout,
)
4 changes: 4 additions & 0 deletions core.go
@@ -18,6 +18,7 @@ import (
"github.com/projecteru2/core/metrics"
"github.com/projecteru2/core/rpc"
pb "github.com/projecteru2/core/rpc/gen"
"github.com/projecteru2/core/selfmon"
"github.com/projecteru2/core/utils"
"github.com/projecteru2/core/version"

@@ -133,6 +134,9 @@ func serve(c *cli.Context) error {
ctx, cancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer cancel()

// start node status checker
go selfmon.RunNodeStatusWatcher(ctx, config, cluster, t)

// start engine cache checker
go factory.EngineCacheChecker(ctx, config.ConnectionTimeout)

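Starting the watcher here with the signal-aware ctx from signal.NotifyContext is what makes the watcher's own getSignalContext (removed in selfmon.go below) unnecessary: a SIGINT/SIGTERM now cancels the watcher together with the rest of the daemon. A minimal, self-contained illustration of that pattern (not the project's code):

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
)

// run stands in for NodeStatusWatcher.run: it simply exits when the
// shared context is cancelled.
func run(ctx context.Context) {
	<-ctx.Done()
	fmt.Println("watcher stopped")
}

func main() {
	// One signal-aware context for the whole process, as serve() does.
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	defer cancel()

	go run(ctx)
	<-ctx.Done() // a SIGINT/SIGTERM/SIGQUIT cancels ctx and releases both goroutines
}
```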
2 changes: 1 addition & 1 deletion engine/factory/factory.go
@@ -33,7 +33,7 @@ var (
)

func getEngineCacheKey(endpoint, ca, cert, key string) string {
return endpoint + "-" + utils.SHA256(fmt.Sprintf(":%v:%v:%v", ca, cert, key))[:8]
return fmt.Sprintf("%v-%v", endpoint, utils.SHA256(fmt.Sprintf(":%v:%v:%v", ca, cert, key))[:8])
}

// EngineCacheChecker checks if the engine in cache is available
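The getEngineCacheKey change is cosmetic; either form yields `<endpoint>-<first 8 hex chars of SHA256(":ca:cert:key")>`. A quick stand-alone illustration (assuming utils.SHA256 returns a hex digest; the values are made up):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sha256Hex mimics what utils.SHA256 is assumed to do.
func sha256Hex(s string) string {
	sum := sha256.Sum256([]byte(s))
	return hex.EncodeToString(sum[:])
}

func main() {
	endpoint, ca, cert, key := "tcp://10.0.0.1:2376", "ca-pem", "cert-pem", "key-pem"
	cacheKey := fmt.Sprintf("%v-%v", endpoint, sha256Hex(fmt.Sprintf(":%v:%v:%v", ca, cert, key))[:8])
	fmt.Println(cacheKey) // tcp://10.0.0.1:2376-xxxxxxxx, where xxxxxxxx depends on the TLS material
}
```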
76 changes: 38 additions & 38 deletions cluster/calcium/selfmon.go → selfmon/selfmon.go
@@ -1,16 +1,17 @@
package calcium
package selfmon

import (
"context"
"math/rand"
"os/signal"
"syscall"
"testing"
"time"

"github.com/pkg/errors"

"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/log"
coretypes "github.com/projecteru2/core/types"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)

@@ -19,22 +20,32 @@ const ActiveKey = "/selfmon/active"

// NodeStatusWatcher monitors the changes of node status
type NodeStatusWatcher struct {
id int64
cal *Calcium
id int64
config types.Config
cluster cluster.Cluster
store store.Store
}

// NewNodeStatusWatcher .
func NewNodeStatusWatcher(cal *Calcium) *NodeStatusWatcher {
// RunNodeStatusWatcher .
func RunNodeStatusWatcher(ctx context.Context, config types.Config, cluster cluster.Cluster, t *testing.T) {
rand.Seed(time.Now().UnixNano())
id := rand.Int63n(10000) // nolint
return &NodeStatusWatcher{
id: id,
cal: cal,
store, err := store.NewStore(config, t)
if err != nil {
log.Errorf(context.TODO(), "[RunNodeStatusWatcher] %v failed to create store, err: %v", id, err)
return
}

watcher := &NodeStatusWatcher{
id: id,
config: config,
store: store,
cluster: cluster,
}
watcher.run(ctx)
}

func (n *NodeStatusWatcher) run() {
ctx := n.getSignalContext(context.TODO())
Contributor: So this function can be dropped now, right?

Contributor Author: Yep.

func (n *NodeStatusWatcher) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
@@ -45,22 +56,11 @@ func (n *NodeStatusWatcher) run() {
log.Errorf(ctx, "[NodeStatusWatcher] %v stops watching, err: %v", n.id, err)
}
})
time.Sleep(n.cal.config.ConnectionTimeout)
time.Sleep(n.config.ConnectionTimeout)
}
}
}

func (n *NodeStatusWatcher) getSignalContext(ctx context.Context) context.Context {
exitCtx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
defer cancel()
<-exitCtx.Done()
log.Warnf(ctx, "[NodeStatusWatcher] watcher %v receives a signal to exit", n.id)
}()

return exitCtx
}

// withActiveLock acquires the active lock synchronously
func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx context.Context)) {
ctx, cancel := context.WithCancel(parentCtx)
@@ -90,7 +90,7 @@ func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx
if errors.Is(err, context.Canceled) {
log.Info("[Register] context canceled")
return
} else if !errors.Is(err, coretypes.ErrKeyExists) {
} else if !errors.Is(err, types.ErrKeyExists) {
log.Errorf(ctx, "[Register] failed to re-register: %v", err)
time.Sleep(time.Second)
continue
Expand Down Expand Up @@ -126,20 +126,20 @@ func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx
}

func (n *NodeStatusWatcher) register(ctx context.Context) (<-chan struct{}, func(), error) {
return n.cal.store.StartEphemeral(ctx, ActiveKey, n.cal.config.HAKeepaliveInterval)
return n.store.StartEphemeral(ctx, ActiveKey, n.config.HAKeepaliveInterval)
}

func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) {
log.Debug(ctx, "[NodeStatusWatcher] init node status started")
nodes := make(chan *coretypes.Node)
nodes := make(chan *types.Node)

go func() {
defer close(nodes)
// Get all nodes which are active status, and regardless of pod.
var err error
var ch <-chan *coretypes.Node
utils.WithTimeout(ctx, n.cal.config.GlobalTimeout, func(ctx context.Context) {
ch, err = n.cal.ListPodNodes(ctx, &coretypes.ListNodesOptions{
var ch <-chan *types.Node
utils.WithTimeout(ctx, n.config.GlobalTimeout, func(ctx context.Context) {
ch, err = n.cluster.ListPodNodes(ctx, &types.ListNodesOptions{
Podname: "",
Labels: nil,
All: true,
@@ -161,9 +161,9 @@ func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) {
}()

for node := range nodes {
status, err := n.cal.GetNodeStatus(ctx, node.Name)
status, err := n.cluster.GetNodeStatus(ctx, node.Name)
if err != nil {
status = &coretypes.NodeStatus{
status = &types.NodeStatus{
Nodename: node.Name,
Podname: node.Podname,
Alive: false,
@@ -178,15 +178,15 @@ func (n *NodeStatusWatcher) monitor(ctx context.Context) error {
go n.initNodeStatus(ctx)

// monitor node status
messageChan := n.cal.NodeStatusStream(ctx)
messageChan := n.cluster.NodeStatusStream(ctx)
log.Infof(ctx, "[NodeStatusWatcher] %v watch node status started", n.id)
defer log.Infof(ctx, "[NodeStatusWatcher] %v stop watching node status", n.id)

for {
select {
case message, ok := <-messageChan:
if !ok {
return coretypes.ErrMessageChanClosed
return types.ErrMessageChanClosed
}
go n.dealNodeStatusMessage(ctx, message)
case <-ctx.Done():
@@ -195,18 +195,18 @@ }
}
}

func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message *coretypes.NodeStatus) {
func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message *types.NodeStatus) {
if message.Error != nil {
log.Errorf(ctx, "[NodeStatusWatcher] deal with node status stream message failed %+v", message)
return
}

// TODO maybe we need a distributed lock to control concurrency
opts := &coretypes.SetNodeOptions{
opts := &types.SetNodeOptions{
Nodename: message.Nodename,
WorkloadsDown: !message.Alive,
}
if _, err := n.cal.SetNode(ctx, opts); err != nil {
if _, err := n.cluster.SetNode(ctx, opts); err != nil {
log.Errorf(ctx, "[NodeStatusWatcher] set node %s failed %v", message.Nodename, err)
return
}
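For readers new to this file: the watcher presumably runs on every core instance, but only one instance should actually flip node statuses. register() keeps an ephemeral key at /selfmon/active, and withActiveLock blocks and retries until this instance holds it. A toy sketch of how that ephemeral-key pattern is consumed (the real StartEphemeral lives in the store backends and is not shown in this PR; this stand-in never expires on its own):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// startEphemeral mirrors the shape of store.StartEphemeral as used by
// register() above: an expiry channel plus a stop function.
func startEphemeral(ctx context.Context, key string, ttl time.Duration) (<-chan struct{}, func(), error) {
	expiry := make(chan struct{})
	var once sync.Once
	stop := func() { once.Do(func() { close(expiry) }) }
	return expiry, stop, nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	expiry, stop, err := startEphemeral(ctx, "/selfmon/active", 16*time.Second)
	if err != nil {
		// In the real code, types.ErrKeyExists means another watcher is already active.
		fmt.Println("not the active watcher:", err)
		return
	}
	defer stop()

	select {
	case <-expiry:
		fmt.Println("lost the active key, another instance may take over")
	case <-ctx.Done():
		fmt.Println("shutting down")
	}
}
```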
5 changes: 2 additions & 3 deletions store/etcdv3/node.go
@@ -17,7 +17,6 @@ import (
"github.com/projecteru2/core/engine/fake"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/metrics"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
@@ -178,9 +177,9 @@ func (m *Mercury) UpdateNodes(ctx context.Context, nodes ...*types.Node) error {
// UpdateNodeResource update cpu and memory on a node, either add or subtract
func (m *Mercury) UpdateNodeResource(ctx context.Context, node *types.Node, resource *types.ResourceMeta, action string) error {
switch action {
case store.ActionIncr:
case types.ActionIncr:
node.RecycleResources(resource)
case store.ActionDecr:
case types.ActionDecr:
node.PreserveResources(resource)
default:
return types.ErrUnknownControlType
5 changes: 2 additions & 3 deletions store/etcdv3/node_test.go
@@ -7,7 +7,6 @@ import (
"testing"
"time"

"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"

"github.com/stretchr/testify/assert"
@@ -223,8 +222,8 @@ func TestUpdateNodeResource(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, node.Name, "test")
assert.Error(t, m.UpdateNodeResource(ctx, node, nil, "wtf"))
assert.NoError(t, m.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, store.ActionIncr))
assert.NoError(t, m.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, store.ActionDecr))
assert.NoError(t, m.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, types.ActionIncr))
assert.NoError(t, m.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, types.ActionDecr))
}

func TestExtractNodename(t *testing.T) {
5 changes: 2 additions & 3 deletions store/redis/node.go
@@ -14,7 +14,6 @@ import (
"github.com/projecteru2/core/engine/fake"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/metrics"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"

@@ -169,9 +168,9 @@ func (r *Rediaron) UpdateNodes(ctx context.Context, nodes ...*types.Node) error
// UpdateNodeResource update cpu and memory on a node, either add or subtract
func (r *Rediaron) UpdateNodeResource(ctx context.Context, node *types.Node, resource *types.ResourceMeta, action string) error {
switch action {
case store.ActionIncr:
case types.ActionIncr:
node.RecycleResources(resource)
case store.ActionDecr:
case types.ActionDecr:
node.PreserveResources(resource)
default:
return types.ErrUnknownControlType
5 changes: 2 additions & 3 deletions store/redis/node_test.go
@@ -6,7 +6,6 @@ import (
"path/filepath"
"time"

"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
)

@@ -213,8 +212,8 @@ func (s *RediaronTestSuite) TestUpdateNodeResource() {
s.NoError(err)
s.Equal(node.Name, "test")
s.Error(s.rediaron.UpdateNodeResource(ctx, node, nil, "wtf"))
s.NoError(s.rediaron.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, store.ActionIncr))
s.NoError(s.rediaron.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, store.ActionDecr))
s.NoError(s.rediaron.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, types.ActionIncr))
s.NoError(s.rediaron.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, types.ActionDecr))
}

func (s *RediaronTestSuite) TestExtractNodename() {