Skip to content

Commit

Permalink
Merge branch 'add-grpc-tests' into 'master'
Browse files Browse the repository at this point in the history
Add grpc tests

See merge request !111
  • Loading branch information
CMGS committed Jul 24, 2017
2 parents e559fef + 544d9e2 commit 5bc135e
Show file tree
Hide file tree
Showing 10 changed files with 666 additions and 63 deletions.
4 changes: 4 additions & 0 deletions cluster/calcium/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ func New(config types.Config) (*calcium, error) {

return &calcium{store: store, config: config, scheduler: scheduler, network: titanium, source: source}, nil
}

func (c *calcium) ResetSotre(s store.Store) {
c.store = s
}
4 changes: 4 additions & 0 deletions cluster/calcium/create_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,10 @@ func (c *calcium) UpgradeContainer(ids []string, image string) (chan *types.Upgr
return ch, fmt.Errorf("No container ids given")
}

if image == "" {
return ch, fmt.Errorf("Image is empty")
}

containers, err := c.GetContainers(ids)
if err != nil {
return ch, err
Expand Down
2 changes: 2 additions & 0 deletions rpc/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func (v *vibranium) taskAdd(name string, verbose bool) {
log.Infof("task [%s] added", name)
}
v.counter.Add(1)
v.TaskNum++
}

// 完成一个任务, 在任务执行完之后调用一次.
Expand All @@ -25,6 +26,7 @@ func (v *vibranium) taskDone(name string, verbose bool) {
log.Infof("task [%s] done", name)
}
v.counter.Done()
v.TaskNum--
}

// 会在外面graceful之后调用.
Expand Down
92 changes: 35 additions & 57 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type vibranium struct {
cluster cluster.Cluster
config types.Config
counter sync.WaitGroup
TaskNum int
}

// Implementations for grpc server interface
Expand Down Expand Up @@ -70,8 +71,7 @@ func (v *vibranium) AddNode(ctx context.Context, opts *pb.AddNodeOptions) (*pb.N
return toRPCNode(n, v.cluster.GetZone()), nil
}

// AddNode saves a node and returns it to client
// Method must be called synchronously, or nothing will be returned
// RemoveNode removes the node from etcd
func (v *vibranium) RemoveNode(ctx context.Context, opts *pb.RemoveNodeOptions) (*pb.Pod, error) {
p, err := v.cluster.RemoveNode(opts.Nodename, opts.Podname)
if err != nil {
Expand Down Expand Up @@ -128,6 +128,7 @@ func (v *vibranium) GetContainer(ctx context.Context, id *pb.ContainerID) (*pb.C

// GetContainers
// like GetContainer, information should be returned

func (v *vibranium) GetContainers(ctx context.Context, cids *pb.ContainerIDs) (*pb.Containers, error) {
ids := []string{}
for _, id := range cids.Ids {
Expand Down Expand Up @@ -182,26 +183,25 @@ func (v *vibranium) SetNodeAvailable(ctx context.Context, opts *pb.NodeAvailable
// streamed returned functions
// caller must ensure that timeout will not be too short because these actions take a little time
func (v *vibranium) BuildImage(opts *pb.BuildImageOptions, stream pb.CoreRPC_BuildImageServer) error {
v.taskAdd("BuildImage", true)
defer v.taskDone("BuildImage", true)

ch, err := v.cluster.BuildImage(opts.Repo, opts.Version, opts.Uid, opts.Artifact)
if err != nil {
return err
}

for m := range ch {
if err := stream.Send(toRPCBuildImageMessage(m)); err != nil {
go func() {
for r := range ch {
log.Infof("[BuildImage] Unsent streamed message: %v", r)
}
}()
return err
if err = stream.Send(toRPCBuildImageMessage(m)); err != nil {
v.logUnsentMessages("BuildImage", m)
}
}
return nil
return err
}

func (v *vibranium) CreateContainer(opts *pb.DeployOptions, stream pb.CoreRPC_CreateContainerServer) error {
v.taskAdd("CreateContainer", true)
defer v.taskDone("CreateContainer", true)

specs, err := types.LoadSpecs(opts.Specs)
if err != nil {
Expand All @@ -214,19 +214,12 @@ func (v *vibranium) CreateContainer(opts *pb.DeployOptions, stream pb.CoreRPC_Cr
}

for m := range ch {
if err := stream.Send(toRPCCreateContainerMessage(m)); err != nil {
go func() {
defer v.taskDone("CreateContainer", true)
for r := range ch {
log.Infof("[CreateContainer] Unsent streamed message: %v", r)
}
}()
return err
if err = stream.Send(toRPCCreateContainerMessage(m)); err != nil {
v.logUnsentMessages("CreateContainer", m)
}
}

v.taskDone("CreateContainer", true)
return nil
return err
}

func (v *vibranium) RunAndWait(stream pb.CoreRPC_RunAndWaitServer) error {
Expand All @@ -249,8 +242,8 @@ func (v *vibranium) RunAndWait(stream pb.CoreRPC_RunAndWaitServer) error {
defer stdinReader.Close()
ch, err := v.cluster.RunAndWait(specs, toCoreDeployOptions(opts), stdinReader)
if err != nil {
// `ch` is nil now
log.Errorf("[RunAndWait] Start run and wait failed %s", err)
stream.Send(toRPCRunAndWaitMessage(<-ch))
return err
}

Expand All @@ -277,22 +270,18 @@ func (v *vibranium) RunAndWait(stream pb.CoreRPC_RunAndWaitServer) error {
}

for m := range ch {
if err := stream.Send(toRPCRunAndWaitMessage(m)); err != nil {
if err = stream.Send(toRPCRunAndWaitMessage(m)); err != nil {
log.Errorf("[RunAndWait] Send msg error: %s", err)
go func() {
for r := range ch {
log.Infof("[RunAndWait] Unsent streamed message: %s", r.Data)
}
}()
return err
v.logUnsentMessages("RunAndWait", m)
}
}

return nil
return err
}

func (v *vibranium) UpgradeContainer(opts *pb.UpgradeOptions, stream pb.CoreRPC_UpgradeContainerServer) error {
v.taskAdd("UpgradeContainer", true)
defer v.taskDone("UpgradeContainer", true)

ids := []string{}
for _, id := range opts.Ids {
Expand All @@ -305,23 +294,17 @@ func (v *vibranium) UpgradeContainer(opts *pb.UpgradeOptions, stream pb.CoreRPC_
}

for m := range ch {
if err := stream.Send(toRPCUpgradeContainerMessage(m)); err != nil {
go func() {
defer v.taskDone("UpgradeContainer", true)
for r := range ch {
log.Infof("[UpgradeContainer] Unsent streamed message: %v", r)
}
}()
return err
if err = stream.Send(toRPCUpgradeContainerMessage(m)); err != nil {
v.logUnsentMessages("UpgradeContainer", m)
}
}

v.taskDone("UpgradeContainer", true)
return nil
return err
}

func (v *vibranium) RemoveContainer(cids *pb.ContainerIDs, stream pb.CoreRPC_RemoveContainerServer) error {
v.taskAdd("RemoveContainer", true)
defer v.taskDone("RemoveContainer", true)

ids := []string{}
for _, id := range cids.Ids {
Expand All @@ -338,38 +321,29 @@ func (v *vibranium) RemoveContainer(cids *pb.ContainerIDs, stream pb.CoreRPC_Rem
}

for m := range ch {
if err := stream.Send(toRPCRemoveContainerMessage(m)); err != nil {
go func() {
defer v.taskDone("RemoveContainer", true)
for r := range ch {
log.Infof("[RemoveContainer] Unsent streamed message: %v", r)
}
}()
return err
if err = stream.Send(toRPCRemoveContainerMessage(m)); err != nil {
v.logUnsentMessages("RemoveContainer", m)
}
}

v.taskDone("RemoveContainer", true)
return nil
return err
}

func (v *vibranium) RemoveImage(opts *pb.RemoveImageOptions, stream pb.CoreRPC_RemoveImageServer) error {
v.taskAdd("RemoveImage", true)
defer v.taskDone("RemoveImage", true)

ch, err := v.cluster.RemoveImage(opts.Podname, opts.Nodename, opts.Images)
if err != nil {
return err
}

for m := range ch {
if err := stream.Send(toRPCRemoveImageMessage(m)); err != nil {
go func() {
for r := range ch {
log.Infof("[RemoveImage] Unsent streamed message: %v", r)
}
}()
return err
if err = stream.Send(toRPCRemoveImageMessage(m)); err != nil {
v.logUnsentMessages("RemoveImage", m)
}
}
return nil
return err
}

func (v *vibranium) Backup(ctx context.Context, opts *pb.BackupOptions) (*pb.BackupMessage, error) {
Expand All @@ -383,6 +357,10 @@ func (v *vibranium) Backup(ctx context.Context, opts *pb.BackupOptions) (*pb.Bac
return toRPCBackupMessage(backupMessage), nil
}

func (v *vibranium) logUnsentMessages(msgType string, msg interface{}) {
log.Infof("Unsent %s streamed message: %v", msgType, msg)
}

func New(cluster cluster.Cluster, config types.Config) *vibranium {
return &vibranium{cluster: cluster, config: config, counter: sync.WaitGroup{}}
}
Loading

0 comments on commit 5bc135e

Please sign in to comment.