Skip to content

Commit

Permalink
node management in cluster and store
Browse files Browse the repository at this point in the history
  • Loading branch information
zc committed Dec 19, 2019
1 parent d2be5aa commit 229bf5f
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 10 deletions.
21 changes: 20 additions & 1 deletion cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
)

// AddNode add a node in pod
/*
func (c *Calcium) AddNode(ctx context.Context, nodename, endpoint, podname, ca, cert, key string,
cpu, share int, memory, storage int64, labels map[string]string,
numa types.NUMA, numaMemory types.NUMAMemory) (*types.Node, error) {
return c.store.AddNode(ctx, nodename, endpoint, podname, ca, cert, key, cpu, share, memory, storage, labels, numa, numaMemory)
*/
func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*types.Node, error) {
return c.store.AddNode(ctx, opts.Nodename, opts.Endpoint, opts.Podname, opts.Ca, opts.Cert, opts.Key, opts.Cpu, opts.Share, opts.Memory, opts.Storage, opts.Labels, opts.Numa, opts.NumaMemory, opts.Volume)
}

// RemoveNode remove a node
Expand Down Expand Up @@ -126,6 +129,22 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
}
}
}
// update volume
for volumeDir, changeCap := range opts.DeltaVolume {
if _, ok := n.Volume[volumeDir]; !ok && changeCap > 0 {
n.Volume[volumeDir] = changeCap
n.InitVolume[volumeDir] = changeCap
} else if ok && changeCap == 0 {
delete(n.Volume, volumeDir)
delete(n.InitVolume, volumeDir)
} else if ok {
n.Volume[volumeDir] += changeCap
n.InitVolume[volumeDir] += changeCap
if n.Volume[volumeDir] < 0 {
return types.ErrBadVolume
}
}
}
return c.store.UpdateNode(ctx, n)
})
}
1 change: 1 addition & 0 deletions cluster/calcium/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (c *Calcium) doGetNodeResource(ctx context.Context, node *types.Node) (*typ
nr.CPUPercent = cpus / float64(len(node.InitCPU))
nr.MemoryPercent = float64(memory) / float64(node.InitMemCap)
nr.NUMAMemoryPercent = map[string]float64{}
nr.VolumePercent = float64(node.VolumeUsed) / float64(len(node.InitVolume))
for nodeID, nmemory := range node.NUMAMemory {
if initMemory, ok := node.InitNUMAMemory[nodeID]; ok {
nr.NUMAMemoryPercent[nodeID] = float64(nmemory) / float64(initMemory)
Expand Down
4 changes: 1 addition & 3 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ type Cluster interface {
PodResource(ctx context.Context, podname string) (*types.PodResource, error)
ListPodNodes(ctx context.Context, podname string, labels map[string]string, all bool) ([]*types.Node, error)
// meta node
AddNode(ctx context.Context, nodename, endpoint, podname, ca, cert, key string,
cpu, share int, memory, storage int64, labels map[string]string,
numa types.NUMA, numaMemory types.NUMAMemory) (*types.Node, error)
AddNode(context.Context, *types.AddNodeOptions) (*types.Node, error)
RemoveNode(ctx context.Context, nodename string) error
SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error)
GetNode(ctx context.Context, nodename string) (*types.Node, error)
Expand Down
2 changes: 1 addition & 1 deletion rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (v *Vibranium) AddPod(ctx context.Context, opts *pb.AddPodOptions) (*pb.Pod
// Method must be called synchronously, or nothing will be returned
func (v *Vibranium) AddNode(ctx context.Context, opts *pb.AddNodeOptions) (*pb.Node, error) {
addNodeOpts := toCoreAddNodeOptions(opts)
n, err := v.cluster.AddNode(addNodeOpts)
n, err := v.cluster.AddNode(ctx, addNodeOpts)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func toRPCCPUMap(m types.CPUMap) map[string]int32 {

func toRPCVolumeMap(m types.VolumeMap) map[string]int64 {
volume := make(map[string]int64)
for dir, capacity := range volume {
for dir, capacity := range m {
volume[dir] = int64(capacity)
}
return volume
Expand Down
8 changes: 5 additions & 3 deletions store/etcdv3/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// node->pod path in etcd is `/node/pod/:nodename`
func (m *Mercury) AddNode(ctx context.Context, name, endpoint, podname, ca, cert, key string,
cpu, share int, memory, storage int64, labels map[string]string,
numa types.NUMA, numaMemory types.NUMAMemory) (*types.Node, error) {
numa types.NUMA, numaMemory types.NUMAMemory, volume types.VolumeMap) (*types.Node, error) {
_, err := m.GetPod(ctx, podname)
if err != nil {
return nil, err
Expand Down Expand Up @@ -71,7 +71,7 @@ func (m *Mercury) AddNode(ctx context.Context, name, endpoint, podname, ca, cert
}
}

return m.doAddNode(ctx, name, endpoint, podname, ca, cert, key, cpu, share, memory, storage, labels, numa, numaMemory)
return m.doAddNode(ctx, name, endpoint, podname, ca, cert, key, cpu, share, memory, storage, labels, numa, numaMemory, volume)
}

// RemoveNode delete a node
Expand Down Expand Up @@ -192,7 +192,7 @@ func (m *Mercury) makeClient(ctx context.Context, node *types.Node, force bool)
return client, nil
}

func (m *Mercury) doAddNode(ctx context.Context, name, endpoint, podname, ca, cert, key string, cpu, share int, memory, storage int64, labels map[string]string, numa types.NUMA, numaMemory types.NUMAMemory) (*types.Node, error) {
func (m *Mercury) doAddNode(ctx context.Context, name, endpoint, podname, ca, cert, key string, cpu, share int, memory, storage int64, labels map[string]string, numa types.NUMA, numaMemory types.NUMAMemory, volumemap types.VolumeMap) (*types.Node, error) {
data := map[string]string{}
// 如果有tls的证书需要保存就保存一下
if ca != "" && cert != "" && key != "" {
Expand All @@ -213,10 +213,12 @@ func (m *Mercury) doAddNode(ctx context.Context, name, endpoint, podname, ca, ce
CPU: cpumap,
MemCap: memory,
StorageCap: storage,
Volume: volumemap,
InitCPU: cpumap,
InitMemCap: memory,
InitStorageCap: storage,
InitNUMAMemory: numaMemory,
InitVolume: volumemap,
Available: true,
Labels: labels,
NUMA: numa,
Expand Down
2 changes: 1 addition & 1 deletion store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Store interface {
// node
AddNode(ctx context.Context, name, endpoint, podname, ca, cert, key string,
cpu, share int, memory, storage int64, labels map[string]string,
numa types.NUMA, numaMemory types.NUMAMemory) (*types.Node, error)
numa types.NUMA, numaMemory types.NUMAMemory, volume types.VolumeMap) (*types.Node, error)
RemoveNode(ctx context.Context, node *types.Node) error
GetNode(ctx context.Context, nodename string) (*types.Node, error)
GetNodes(ctx context.Context, nodenames []string) ([]*types.Node, error)
Expand Down
1 change: 1 addition & 0 deletions types/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var (
ErrBadMemory = errors.New("bad `Memory` value")
ErrBadCPU = errors.New("bad `CPU` value")
ErrBadStorage = errors.New("bad `Storage` value")
ErrBadVolume = errors.New("bad `Volume` value")
ErrBadCount = errors.New("bad `Count` value")

ErrPodHasNodes = errors.New("pod has nodes")
Expand Down

0 comments on commit 229bf5f

Please sign in to comment.