Skip to content

Commit

Permalink
refactor container status APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Nov 22, 2019
1 parent 71bfd82 commit 1b26d64
Show file tree
Hide file tree
Showing 21 changed files with 1,551 additions and 1,049 deletions.
8 changes: 2 additions & 6 deletions cluster/calcium/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package calcium
import (
"bytes"
"context"
"fmt"
"sync"

"github.com/projecteru2/core/cluster"
Expand Down Expand Up @@ -38,12 +37,9 @@ func (c *Calcium) ControlContainer(ctx context.Context, IDs []string, t string,
return err
}
container.RuntimeMeta.Running = false
m2, e2 := c.doStartContainer(ctx, container, force)
m2, err := c.doStartContainer(ctx, container, force)
message = append(message, m2...)
if e2 != nil {
return fmt.Errorf("%w", e2)
}
return nil
return err
}
return types.ErrUnknownControlType
})
Expand Down
6 changes: 2 additions & 4 deletions cluster/calcium/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,11 @@ func TestControlStart(t *testing.T) {
ID: "cid",
Running: false,
}
b, err := json.Marshal(runtimeMeta)
_, err = json.Marshal(runtimeMeta)
assert.NoError(t, err)
container := &types.Container{
ID: "cid",
Privileged: true,
StatusData: b,
}
engine := &enginemocks.API{}
container.Engine = engine
Expand Down Expand Up @@ -131,12 +130,11 @@ func TestControlStop(t *testing.T) {
ID: "cid",
Running: true,
}
b, err := json.Marshal(runtimeMeta)
_, err := json.Marshal(runtimeMeta)
assert.NoError(t, err)
container := &types.Container{
ID: "cid",
Privileged: true,
StatusData: b,
RuntimeMeta: runtimeMeta,
}
engine := &enginemocks.API{}
Expand Down
13 changes: 6 additions & 7 deletions cluster/calcium/dissociate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ func TestDissociateContainer(t *testing.T) {
lock.On("Unlock", mock.Anything).Return(nil)

c1 := &types.Container{
ID: "c1",
Podname: "p1",
Memory: 5 * int64(units.MiB),
Quota: 0.9,
CPU: types.CPUMap{"2": 90},
Nodename: "node1",
StatusData: []byte("{}"),
ID: "c1",
Podname: "p1",
Memory: 5 * int64(units.MiB),
Quota: 0.9,
CPU: types.CPUMap{"2": 90},
Nodename: "node1",
}

node1 := &types.Node{
Expand Down
5 changes: 2 additions & 3 deletions cluster/calcium/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,8 @@ func TestWithContainerLocked(t *testing.T) {
assert.Error(t, err)
engine := &enginemocks.API{}
container := &types.Container{
ID: "c1",
Engine: engine,
StatusData: []byte("{}"),
ID: "c1",
Engine: engine,
}
store.On("GetContainer", mock.Anything, mock.Anything).Return(container, nil)
// success
Expand Down
15 changes: 1 addition & 14 deletions cluster/calcium/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (

"github.com/sanity-io/litter"

"github.com/projecteru2/core/utils"

"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/types"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -112,12 +110,6 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
return err
}
for _, container := range containers {
appname, entrypoint, _, err := utils.ParseContainerName(container.Name)
if err != nil {
log.Errorf("[SetNodeAvailable] Get container %s on node %s failed %v", container.ID, opts.Nodename, err)
continue
}

// subscriber should query eru to get other metas
meta := &types.RuntimeMeta{
ID: container.ID,
Expand All @@ -131,7 +123,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
}

// mark container which belongs to this node as unhealthy
if err := c.ContainerDeployed(ctx, container.ID, appname, entrypoint, container.Nodename, b, 0); err != nil {
if err = c.store.SetContainerStatus(ctx, container, b, 0); err != nil {
log.Errorf("[SetNodeAvailable] Set container %s on node %s inactive failed %v", container.ID, opts.Nodename, err)
}
}
Expand Down Expand Up @@ -194,8 +186,3 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
func (c *Calcium) GetNodeByName(ctx context.Context, nodename string) (*types.Node, error) {
return c.store.GetNodeByName(ctx, nodename)
}

// ContainerDeployed set container deploy status
func (c *Calcium) ContainerDeployed(ctx context.Context, ID, appname, entrypoint, nodename string, data []byte, ttl int64) error {
return c.store.ContainerDeployed(ctx, ID, appname, entrypoint, nodename, data, ttl)
}
21 changes: 1 addition & 20 deletions cluster/calcium/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,7 @@ func TestSetNode(t *testing.T) {
assert.Error(t, err)
containers := []*types.Container{&types.Container{Name: "wrong_name"}, &types.Container{Name: "a_b_c"}}
store.On("ListNodeContainers", mock.Anything, mock.Anything).Return(containers, nil)
store.On("ContainerDeployed",
mock.Anything, mock.Anything, mock.Anything,
store.On("SetContainerStatus",
mock.Anything, mock.Anything, mock.Anything, mock.Anything,
).Return(types.ErrNoETCD)
_, err = c.SetNode(ctx, &types.SetNodeOptions{Status: 0})
Expand Down Expand Up @@ -340,21 +339,3 @@ func TestSetNode(t *testing.T) {
assert.Equal(t, len(n.CPU), 2)
assert.Equal(t, len(n.InitCPU), 2)
}

func TestContainerDeployed(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()

store := &storemocks.Store{}
store.On("ContainerDeployed",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything).Return(nil)
c.store = store

assert.NoError(t, c.ContainerDeployed(ctx, "", "", "", "", []byte{}, 0))
}
33 changes: 33 additions & 0 deletions cluster/calcium/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,39 @@ import (
"github.com/projecteru2/core/types"
)

// GetContainersStatus get containers status
func (c *Calcium) GetContainersStatus(ctx context.Context, IDs []string) (map[string][]byte, error) {
result := map[string][]byte{}
for _, ID := range IDs {
status, err := c.store.GetContainerStatus(ctx, ID)
if err != nil {
return nil, err
}
result[ID] = status
}
return result, nil
}

// SetContainersStatus set containers status
func (c *Calcium) SetContainersStatus(ctx context.Context, status map[string][]byte, ttls map[string]int64) (map[string][]byte, error) {
result := map[string][]byte{}
for ID, containerStatus := range status {
container, err := c.store.GetContainer(ctx, ID)
if err != nil {
return nil, err
}
ttl, ok := ttls[ID]
if !ok {
ttl = 0
}
if err = c.store.SetContainerStatus(ctx, container, containerStatus, ttl); err != nil {
return nil, err
}
result[ID] = containerStatus
}
return result, nil
}

// DeployStatusStream watch deploy status
func (c *Calcium) DeployStatusStream(ctx context.Context, appname, entrypoint, nodename string) chan *types.DeployStatus {
return c.store.WatchDeployStatus(ctx, appname, entrypoint, nodename)
Expand Down
57 changes: 53 additions & 4 deletions cluster/calcium/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,65 @@ import (
"github.com/stretchr/testify/mock"
)

func TestDeployStatusStream(t *testing.T) {
func TestGetContainersStatus(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
store := c.store.(*storemocks.Store)

// failed
store.On("GetContainerStatus", mock.AnythingOfType("*context.emptyCtx"), mock.Anything).Return(nil, types.ErrBadCount).Once()
_, err := c.GetContainersStatus(ctx, []string{"123"})
assert.Error(t, err)
status := []byte("abc")
store.On("GetContainerStatus", mock.AnythingOfType("*context.emptyCtx"), mock.Anything).Return(status, nil)
// success
r, err := c.GetContainersStatus(ctx, []string{"123"})
assert.NoError(t, err)
assert.Len(t, r, 1)
}

func TestSetContainersStatus(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
store := c.store.(*storemocks.Store)

// failed
store.On("GetContainer", mock.AnythingOfType("*context.emptyCtx"), mock.Anything).Return(nil, types.ErrBadCount).Once()
_, err := c.SetContainersStatus(ctx, map[string][]byte{"123": []byte{}}, nil)
assert.Error(t, err)
container := &types.Container{
ID: "123",
Name: "a_b_c",
}
store.On("GetContainer", mock.AnythingOfType("*context.emptyCtx"), mock.Anything).Return(container, nil)
// failed by SetContainerStatus
store.On("SetContainerStatus",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(types.ErrBadCount).Once()
_, err = c.SetContainersStatus(ctx, map[string][]byte{"123": []byte{}}, nil)
assert.Error(t, err)
// success
store.On("SetContainerStatus",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(nil)
r, err := c.SetContainersStatus(ctx, map[string][]byte{"123": []byte{}}, nil)
assert.NoError(t, err)
assert.Len(t, r, 1)
}

func TestDeployStatusStream(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
dataCh := make(chan *types.DeployStatus)
store := c.store.(*storemocks.Store)

store := &storemocks.Store{}
store.On("WatchDeployStatus", mock.AnythingOfType("*context.emptyCtx"), mock.Anything, mock.Anything, mock.Anything).Return(dataCh)
c.store = store

ID := "wtf"
go func() {
msg := &types.DeployStatus{
Expand Down
46 changes: 25 additions & 21 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,46 +50,50 @@ const (

// Cluster define all interface
type Cluster interface {
// meta data methods
// meta networks
ListNetworks(ctx context.Context, podname string, driver string) ([]*enginetypes.Network, error)
// meta pod
ListPods(ctx context.Context) ([]*types.Pod, error)
AddPod(ctx context.Context, podname, desc string) (*types.Pod, error)
RemovePod(ctx context.Context, podname string) error
GetPod(ctx context.Context, podname string) (*types.Pod, error)
PodResource(ctx context.Context, podname string) (*types.PodResource, error)
ListPodNodes(ctx context.Context, podname string, all bool) ([]*types.Node, error)
// meta node
AddNode(ctx context.Context, nodename, endpoint, podname, ca, cert, key string,
cpu, share int, memory, storage int64, labels map[string]string,
numa types.NUMA, numaMemory types.NUMAMemory) (*types.Node, error)
RemovePod(ctx context.Context, podname string) error
RemoveNode(ctx context.Context, podname, nodename string) error
ListPods(ctx context.Context) ([]*types.Pod, error)
ListPodNodes(ctx context.Context, podname string, all bool) ([]*types.Node, error)
ListContainers(ctx context.Context, opts *types.ListContainersOptions) ([]*types.Container, error)
ListNodeContainers(ctx context.Context, nodename string) ([]*types.Container, error)
ListNetworks(ctx context.Context, podname string, driver string) ([]*enginetypes.Network, error)
GetPod(ctx context.Context, podname string) (*types.Pod, error)
SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error)
GetNode(ctx context.Context, podname, nodename string) (*types.Node, error)
NodeResource(ctx context.Context, podname, nodename string) (*types.NodeResource, error)
GetNodeByName(ctx context.Context, nodename string) (*types.Node, error)
// meta containers
GetContainersStatus(ctx context.Context, IDs []string) (map[string][]byte, error)
SetContainersStatus(ctx context.Context, status map[string][]byte, ttls map[string]int64) (map[string][]byte, error)
GetContainer(ctx context.Context, ID string) (*types.Container, error)
GetContainers(ctx context.Context, IDs []string) ([]*types.Container, error)
// used by agent
GetNodeByName(ctx context.Context, nodename string) (*types.Node, error)
ContainerDeployed(ctx context.Context, ID, appname, entrypoint, nodename string, data []byte, ttl int64) error
ListContainers(ctx context.Context, opts *types.ListContainersOptions) ([]*types.Container, error)
ListNodeContainers(ctx context.Context, nodename string) ([]*types.Container, error)
// cluster methods
PodResource(ctx context.Context, podname string) (*types.PodResource, error)
NodeResource(ctx context.Context, podname, nodename string) (*types.NodeResource, error)
ControlContainer(ctx context.Context, IDs []string, t string, force bool) (chan *types.ControlContainerMessage, error)
Copy(ctx context.Context, opts *types.CopyOptions) (chan *types.CopyMessage, error)
Send(ctx context.Context, opts *types.SendOptions) (chan *types.SendMessage, error)
// image methods
BuildImage(ctx context.Context, opts *enginetypes.BuildOptions) (chan *types.BuildImageMessage, error)
CacheImage(ctx context.Context, podname, nodenmae string, images []string, step int) (chan *types.CacheImageMessage, error)
RemoveImage(ctx context.Context, podname, nodename string, images []string, step int, prune bool) (chan *types.RemoveImageMessage, error)
RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachContainerMessage, error)
// for watch
DeployStatusStream(ctx context.Context, appname, entrypoint, nodename string) chan *types.DeployStatus
ExecuteContainer(ctx context.Context, opts *types.ExecuteContainerOptions, inCh <-chan []byte) chan *types.AttachContainerMessage
// build methods
BuildImage(ctx context.Context, opts *enginetypes.BuildOptions) (chan *types.BuildImageMessage, error)
// this methods will not interrupt by client
// container methods
CreateContainer(ctx context.Context, opts *types.DeployOptions) (chan *types.CreateContainerMessage, error)
ReallocResource(ctx context.Context, IDs []string, cpu float64, memory int64) (chan *types.ReallocResourceMessage, error)
ReplaceContainer(ctx context.Context, opts *types.ReplaceOptions) (chan *types.ReplaceContainerMessage, error)
RemoveContainer(ctx context.Context, IDs []string, force bool, step int) (chan *types.RemoveContainerMessage, error)
DissociateContainer(ctx context.Context, IDs []string) (chan *types.DissociateContainerMessage, error)
ReplaceContainer(ctx context.Context, opts *types.ReplaceOptions) (chan *types.ReplaceContainerMessage, error)
ControlContainer(ctx context.Context, IDs []string, t string, force bool) (chan *types.ControlContainerMessage, error)
ReallocResource(ctx context.Context, IDs []string, cpu float64, memory int64) (chan *types.ReallocResourceMessage, error)
LogStream(ctx context.Context, ID string) (chan *types.LogStreamMessage, error)
RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachContainerMessage, error)
ExecuteContainer(ctx context.Context, opts *types.ExecuteContainerOptions, inCh <-chan []byte) chan *types.AttachContainerMessage
// finalizer
Finalizer()
}
Loading

0 comments on commit 1b26d64

Please sign in to comment.