Skip to content

Commit

Permalink
add SetNodeStatus and NodeStatusStream API (#323)
Browse files Browse the repository at this point in the history
* add SetNodeStatus and NodeStatusStream API

* IDs -> ids, use /status prefix instead

* regenerate mock
  • Loading branch information
tonicmuroq authored Jan 20, 2021
1 parent fa0fab2 commit 6aab298
Show file tree
Hide file tree
Showing 22 changed files with 2,008 additions and 1,387 deletions.
8 changes: 5 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ GO_LDFLAGS ?= -s -X $(REPO_PATH)/version.REVISION=$(REVISION) \
-X $(REPO_PATH)/version.VERSION=$(VERSION)

grpc:
cd ./rpc/gen/; protoc --go_out=plugins=grpc:. core.proto
protoc --proto_path=./rpc/gen --go_out=plugins=grpc:./rpc/gen --go_opt=module=github.com/projecteru2/core/rpc/gen core.proto

deps:
env GO111MODULE=on go mod download
Expand All @@ -23,8 +23,10 @@ build: deps binary
test: deps unit-test

mock: deps
mockery -dir ./vendor/google.golang.org/grpc -name ServerStream -output 3rdmocks
mockery -dir vendor/github.com/docker/docker/client -name APIClient -output engine/docker/mocks
mockery --dir ./vendor/google.golang.org/grpc --name ServerStream --output 3rdmocks
mockery --dir vendor/github.com/docker/docker/client --name APIClient --output engine/docker/mocks
mockery --dir store --output store/mocks --name Store
mockery --dir cluster --output cluster/mocks --name Cluster

.ONESHELL:

Expand Down
6 changes: 3 additions & 3 deletions cluster/calcium/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,11 @@ func withImageBuiltChannel(f func(chan *types.BuildImageMessage)) chan *types.Bu
return ch
}

func cleanupNodeImages(node *types.Node, IDs []string, ttl time.Duration) {
func cleanupNodeImages(node *types.Node, ids []string, ttl time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), ttl)
defer cancel()
for _, ID := range IDs {
if _, err := node.Engine.ImageRemove(ctx, ID, false, true); err != nil {
for _, id := range ids {
if _, err := node.Engine.ImageRemove(ctx, id, false, true); err != nil {
log.Errorf("[BuildImage] Remove image error: %s", err)
}
}
Expand Down
14 changes: 7 additions & 7 deletions cluster/calcium/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ import (
)

// ControlWorkload control workloads status
func (c *Calcium) ControlWorkload(ctx context.Context, IDs []string, t string, force bool) (chan *types.ControlWorkloadMessage, error) {
func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, force bool) (chan *types.ControlWorkloadMessage, error) {
ch := make(chan *types.ControlWorkloadMessage)

go func() {
defer close(ch)
wg := sync.WaitGroup{}
for _, ID := range IDs {
for _, id := range ids {
wg.Add(1)
go func(ID string) {
go func(id string) {
defer wg.Done()
var message []*bytes.Buffer
err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
var err error
switch t {
case cluster.WorkloadStop:
Expand All @@ -44,16 +44,16 @@ func (c *Calcium) ControlWorkload(ctx context.Context, IDs []string, t string, f
return types.ErrUnknownControlType
})
if err == nil {
log.Infof("[ControlWorkload] Workload %s %s", ID, t)
log.Infof("[ControlWorkload] Workload %s %s", id, t)
log.Info("[ControlWorkload] Hook Output:")
log.Info(string(utils.MergeHookOutputs(message)))
}
ch <- &types.ControlWorkloadMessage{
WorkloadID: ID,
WorkloadID: id,
Error: err,
Hook: message,
}
}(ID)
}(id)
}
wg.Wait()
}()
Expand Down
10 changes: 5 additions & 5 deletions cluster/calcium/dissociate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
)

// DissociateWorkload dissociate workload from eru, return it resource but not modity it
func (c *Calcium) DissociateWorkload(ctx context.Context, IDs []string) (chan *types.DissociateWorkloadMessage, error) {
func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *types.DissociateWorkloadMessage, error) {
ch := make(chan *types.DissociateWorkloadMessage)
go func() {
defer close(ch)
for _, ID := range IDs {
err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
for _, id := range ids {
err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) {
return utils.Txn(
ctx,
Expand All @@ -35,9 +35,9 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, IDs []string) (chan *t
})
})
if err != nil {
log.Errorf("[DissociateWorkload] Dissociate workload %s failed, err: %v", ID, err)
log.Errorf("[DissociateWorkload] Dissociate workload %s failed, err: %v", id, err)
}
ch <- &types.DissociateWorkloadMessage{WorkloadID: ID, Error: err}
ch <- &types.DissociateWorkloadMessage{WorkloadID: id, Error: err}
}
}()
return ch, nil
Expand Down
10 changes: 5 additions & 5 deletions cluster/calcium/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.Distrib
}
}

func (c *Calcium) withWorkloadLocked(ctx context.Context, ID string, f func(context.Context, *types.Workload) error) error {
return c.withWorkloadsLocked(ctx, []string{ID}, func(ctx context.Context, workloads map[string]*types.Workload) error {
if c, ok := workloads[ID]; ok {
func (c *Calcium) withWorkloadLocked(ctx context.Context, id string, f func(context.Context, *types.Workload) error) error {
return c.withWorkloadsLocked(ctx, []string{id}, func(ctx context.Context, workloads map[string]*types.Workload) error {
if c, ok := workloads[id]; ok {
return f(ctx, c)
}
return types.ErrWorkloadNotExists
Expand All @@ -53,11 +53,11 @@ func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, f func(co
})
}

func (c *Calcium) withWorkloadsLocked(ctx context.Context, IDs []string, f func(context.Context, map[string]*types.Workload) error) error {
func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(context.Context, map[string]*types.Workload) error) error {
workloads := map[string]*types.Workload{}
locks := map[string]lock.DistributedLock{}
defer func() { c.doUnlockAll(context.Background(), locks) }()
cs, err := c.GetWorkloads(ctx, IDs)
cs, err := c.GetWorkloads(ctx, ids)
if err != nil {
return err
}
Expand Down
18 changes: 9 additions & 9 deletions cluster/calcium/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

// RemoveWorkload remove workloads
// returns a channel that contains removing responses
func (c *Calcium) RemoveWorkload(ctx context.Context, IDs []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error) {
func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error) {
ch := make(chan *types.RemoveWorkloadMessage)
if step < 1 {
step = 1
Expand All @@ -24,12 +24,12 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, IDs []string, force bool,
defer close(ch)
wg := sync.WaitGroup{}
defer wg.Wait()
for i, ID := range IDs {
for i, id := range ids {
wg.Add(1)
go func(ID string) {
go func(id string) {
defer wg.Done()
ret := &types.RemoveWorkloadMessage{WorkloadID: ID, Success: false, Hook: []*bytes.Buffer{}}
if err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
ret := &types.RemoveWorkloadMessage{WorkloadID: id, Success: false, Hook: []*bytes.Buffer{}}
if err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) {
return utils.Txn(
ctx,
Expand All @@ -48,13 +48,13 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, IDs []string, force bool,
)
})
}); err != nil {
log.Errorf("[RemoveWorkload] Remove workload %s failed, err: %v", ID, err)
log.Errorf("[RemoveWorkload] Remove workload %s failed, err: %v", id, err)
ret.Hook = append(ret.Hook, bytes.NewBufferString(err.Error()))
} else {
ret.Success = true
}
ch <- ret
}(ID)
}(id)
if (i+1)%step == 0 {
log.Info("[RemoveWorkload] Wait for previous tasks done")
wg.Wait()
Expand Down Expand Up @@ -83,8 +83,8 @@ func (c *Calcium) doRemoveWorkload(ctx context.Context, workload *types.Workload
}

// 同步地删除容器, 在某些需要等待的场合异常有用!
func (c *Calcium) doRemoveWorkloadSync(ctx context.Context, IDs []string) error {
ch, err := c.RemoveWorkload(ctx, IDs, true, 1)
func (c *Calcium) doRemoveWorkloadSync(ctx context.Context, ids []string) error {
ch, err := c.RemoveWorkload(ctx, ids, true, 1)
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions cluster/calcium/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ func (c *Calcium) Send(ctx context.Context, opts *types.SendOptions) (chan *type
defer close(ch)
wg := &sync.WaitGroup{}

for _, ID := range opts.IDs {
log.Infof("[Send] Send files to %s", ID)
for _, id := range opts.IDs {
log.Infof("[Send] Send files to %s", id)
wg.Add(1)
go func(ID string) {
go func(id string) {
defer wg.Done()
if err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
if err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
for dst, content := range opts.Data {
err := c.doSendFileToWorkload(ctx, workload.Engine, workload.ID, dst, bytes.NewBuffer(content), true, true)
ch <- &types.SendMessage{ID: ID, Path: dst, Error: err}
ch <- &types.SendMessage{ID: id, Path: dst, Error: err}
}
return nil
}); err != nil {
ch <- &types.SendMessage{ID: ID, Error: err}
ch <- &types.SendMessage{ID: id, Error: err}
}
}(ID)
}(id)
}
wg.Wait()
}()
Expand Down
15 changes: 15 additions & 0 deletions cluster/calcium/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,18 @@ func (c *Calcium) SetWorkloadsStatus(ctx context.Context, status []*types.Status
func (c *Calcium) WorkloadStatusStream(ctx context.Context, appname, entrypoint, nodename string, labels map[string]string) chan *types.WorkloadStatus {
return c.store.WorkloadStatusStream(ctx, appname, entrypoint, nodename, labels)
}

// SetNodeStatus set status of a node
// it's used to report whether a node is still alive
func (c *Calcium) SetNodeStatus(ctx context.Context, nodename string, ttl int64) error {
node, err := c.store.GetNode(ctx, nodename)
if err != nil {
return err
}
return c.store.SetNodeStatus(ctx, node, ttl)
}

// NodeStatusStream returns a stream of node status for subscribing
func (c *Calcium) NodeStatusStream(ctx context.Context) chan *types.NodeStatus {
return c.store.NodeStatusStream(ctx)
}
54 changes: 54 additions & 0 deletions cluster/calcium/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,57 @@ func TestWorkloadStatusStream(t *testing.T) {
assert.Equal(t, c.Delete, true)
}
}

func TestSetNodeStatus(t *testing.T) {
assert := assert.New(t)
c := NewTestCluster()
ctx := context.Background()
store := c.store.(*storemocks.Store)

node := &types.Node{
NodeMeta: types.NodeMeta{
Name: "testname",
Endpoint: "ep",
},
}
// failed
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrBadCount).Once()
assert.Error(c.SetNodeStatus(ctx, node.Name, 10))
store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
// failed by SetWorkloadStatus
store.On("SetNodeStatus",
mock.Anything,
mock.Anything,
mock.Anything,
).Return(types.ErrBadCount).Once()
assert.Error(c.SetNodeStatus(ctx, node.Name, 10))
// success
store.On("SetNodeStatus",
mock.Anything,
mock.Anything,
mock.Anything,
).Return(nil)
assert.NoError(c.SetNodeStatus(ctx, node.Name, 10))
}

func TestNodeStatusStream(t *testing.T) {
assert := assert.New(t)
c := NewTestCluster()
ctx := context.Background()
dataCh := make(chan *types.NodeStatus)
store := c.store.(*storemocks.Store)

store.On("NodeStatusStream", mock.Anything).Return(dataCh)
go func() {
msg := &types.NodeStatus{
Alive: true,
}
dataCh <- msg
close(dataCh)
}()

ch := c.NodeStatusStream(ctx)
for c := range ch {
assert.True(c.Alive)
}
}
14 changes: 8 additions & 6 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,18 @@ type Cluster interface {
ListPodNodes(ctx context.Context, podname string, labels map[string]string, all bool) ([]*types.Node, error)
GetNode(ctx context.Context, nodename string) (*types.Node, error)
SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error)
SetNodeStatus(ctx context.Context, nodename string, ttl int64) error
NodeStatusStream(ctx context.Context) chan *types.NodeStatus
// node resource
NodeResource(ctx context.Context, nodename string, fix bool) (*types.NodeResource, error)
// calculate capacity
CalculateCapacity(context.Context, *types.DeployOptions) (*types.CapacityMessage, error)
// meta workloads
GetWorkload(ctx context.Context, ID string) (*types.Workload, error)
GetWorkloads(ctx context.Context, IDs []string) ([]*types.Workload, error)
GetWorkload(ctx context.Context, id string) (*types.Workload, error)
GetWorkloads(ctx context.Context, ids []string) ([]*types.Workload, error)
ListWorkloads(ctx context.Context, opts *types.ListWorkloadsOptions) ([]*types.Workload, error)
ListNodeWorkloads(ctx context.Context, nodename string, labels map[string]string) ([]*types.Workload, error)
GetWorkloadsStatus(ctx context.Context, IDs []string) ([]*types.StatusMeta, error)
GetWorkloadsStatus(ctx context.Context, ids []string) ([]*types.StatusMeta, error)
SetWorkloadsStatus(ctx context.Context, status []*types.StatusMeta, ttls map[string]int64) ([]*types.StatusMeta, error)
WorkloadStatusStream(ctx context.Context, appname, entrypoint, nodename string, labels map[string]string) chan *types.WorkloadStatus
// file methods
Expand All @@ -73,9 +75,9 @@ type Cluster interface {
// workload methods
CreateWorkload(ctx context.Context, opts *types.DeployOptions) (chan *types.CreateWorkloadMessage, error)
ReplaceWorkload(ctx context.Context, opts *types.ReplaceOptions) (chan *types.ReplaceWorkloadMessage, error)
RemoveWorkload(ctx context.Context, IDs []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error)
DissociateWorkload(ctx context.Context, IDs []string) (chan *types.DissociateWorkloadMessage, error)
ControlWorkload(ctx context.Context, IDs []string, t string, force bool) (chan *types.ControlWorkloadMessage, error)
RemoveWorkload(ctx context.Context, ids []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error)
DissociateWorkload(ctx context.Context, ids []string) (chan *types.DissociateWorkloadMessage, error)
ControlWorkload(ctx context.Context, ids []string, t string, force bool) (chan *types.ControlWorkloadMessage, error)
ExecuteWorkload(ctx context.Context, opts *types.ExecuteWorkloadOptions, inCh <-chan []byte) chan *types.AttachWorkloadMessage
ReallocResource(ctx context.Context, opts *types.ReallocOptions) error
LogStream(ctx context.Context, opts *types.LogStreamOptions) (chan *types.LogStreamMessage, error)
Expand Down
Loading

0 comments on commit 6aab298

Please sign in to comment.