refactor: rewrite realloc update process (#248)
CMGS committed Sep 14, 2020
1 parent 28aab81 commit 62079c5
Showing 5 changed files with 49 additions and 51 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/goreleaser.yml
@@ -18,9 +18,6 @@ jobs:
         run: |
           echo "::set-env name=VERSION::$(git describe --tags $(git rev-list --tags --max-count=1))"
-      - name: Unshallow
-        run: git fetch --prune --unshallow
-
       - name: Set up Go
         uses: actions/setup-go@v2
         with:
63 changes: 34 additions & 29 deletions cluster/calcium/realloc.go
@@ -233,37 +233,42 @@ func (c *Calcium) updateContainersResources(ctx context.Context, ch chan *types.
 }
 
 func (c *Calcium) updateResource(ctx context.Context, node *types.Node, container *types.Container, newResource *enginetypes.VirtualizationResource) error {
-	updateResourceErr := node.Engine.VirtualizationUpdateResource(ctx, container.ID, newResource)
-	if updateResourceErr == nil {
-		oldVolumeSize := container.Volumes.TotalSize()
-		container.CPU = newResource.CPU
-		container.Quota = newResource.Quota
-		container.Memory = newResource.Memory
-		container.Volumes, _ = types.MakeVolumeBindings(newResource.Volumes)
-		container.VolumePlan = types.MustToVolumePlan(newResource.VolumePlan)
-		container.Storage += container.Volumes.TotalSize() - oldVolumeSize
-	} else {
-		log.Errorf("[updateResource] When Realloc container, VirtualizationUpdateResource %s failed %v", container.ID, updateResourceErr)
-	}
-	// node usage must be updated whether the call succeeds or fails:
-	// on success the node holds the new resources,
-	// on failure it still holds the old ones
-	node.CPU.Sub(container.CPU)
-	node.SetCPUUsed(container.Quota, types.IncrUsage)
-	node.Volume.Sub(container.VolumePlan.IntoVolumeMap())
-	node.SetVolumeUsed(container.VolumePlan.IntoVolumeMap().Total(), types.IncrUsage)
-	node.StorageCap -= container.Storage
-	node.MemCap -= container.Memory
-	if nodeID := node.GetNUMANode(container.CPU); nodeID != "" {
-		node.DecrNUMANodeMemory(nodeID, container.Memory)
-	}
-	// update container metadata
-	// since we don't roll back VirtualizationUpdateResource, the client can't interrupt
-	if err := c.store.UpdateContainer(context.Background(), container); err != nil {
-		log.Errorf("[updateResource] Realloc finish but update container %s failed %v", container.ID, err)
+	var updateErr error
+	if err := utils.Txn(
+		ctx,
+		func(ctx context.Context) error {
+			if updateErr = node.Engine.VirtualizationUpdateResource(ctx, container.ID, newResource); updateErr == nil {
+				oldVolumeSize := container.Volumes.TotalSize()
+				container.CPU = newResource.CPU
+				container.Quota = newResource.Quota
+				container.Memory = newResource.Memory
+				container.Volumes, _ = types.MakeVolumeBindings(newResource.Volumes)
+				container.VolumePlan = types.MustToVolumePlan(newResource.VolumePlan)
+				container.Storage += container.Volumes.TotalSize() - oldVolumeSize
+			}
+			return nil
+		},
+		func(ctx context.Context) error {
+			// node usage must be updated whether the call succeeds or fails:
+			// on success the node holds the new resources,
+			// on failure it still holds the old ones
+			node.CPU.Sub(container.CPU)
+			node.SetCPUUsed(container.Quota, types.IncrUsage)
+			node.Volume.Sub(container.VolumePlan.IntoVolumeMap())
+			node.SetVolumeUsed(container.VolumePlan.IntoVolumeMap().Total(), types.IncrUsage)
+			node.StorageCap -= container.Storage
+			node.MemCap -= container.Memory
+			if nodeID := node.GetNUMANode(container.CPU); nodeID != "" {
+				node.DecrNUMANodeMemory(nodeID, container.Memory)
+			}
+			return c.store.UpdateContainer(ctx, container)
+		},
+		nil,
+		c.config.GlobalTimeout,
+	); err != nil {
+		return err
 	}
-	return updateResourceErr
+	return updateErr
 }
 
 func (c *Calcium) reallocVolume(node *types.Node, containers []*types.Container, vbs types.VolumeBindings) (plans map[*types.Container]types.VolumePlan, err error) {
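The heart of the rewrite in updateResource is the error-capture pattern in the first utils.Txn step: the VirtualizationUpdateResource failure is stashed in updateErr while the step itself returns nil, so the second step (node accounting plus UpdateContainer) always runs, as the comment demands. Below is a self-contained sketch of just that pattern; runSteps and engineUpdate are hypothetical stand-ins, not code from this repository.

package main

import (
	"context"
	"errors"
	"fmt"
)

// runSteps mimics Txn's cond/then chaining: each step must return nil for the
// next one to run.
func runSteps(ctx context.Context, steps ...func(context.Context) error) error {
	for _, step := range steps {
		if err := step(ctx); err != nil {
			return err
		}
	}
	return nil
}

// engineUpdate is a hypothetical stand-in for node.Engine.VirtualizationUpdateResource.
func engineUpdate(ctx context.Context) error {
	return errors.New("virtualization update failed")
}

func main() {
	var updateErr error
	err := runSteps(context.Background(),
		func(ctx context.Context) error {
			updateErr = engineUpdate(ctx) // capture the real failure...
			return nil                    // ...but report success so the next step runs
		},
		func(ctx context.Context) error {
			fmt.Println("node usage and container metadata updated either way")
			return nil
		},
	)
	fmt.Println("chain err:", err)          // <nil>: the chain itself succeeded
	fmt.Println("captured err:", updateErr) // the stashed engine failure
}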
10 changes: 5 additions & 5 deletions cluster/calcium/service.go
@@ -17,14 +17,14 @@ type serviceWatcher struct {
 	subs sync.Map
 }
 
-func (w *serviceWatcher) Start(s store.Store, pushInterval time.Duration) {
+func (w *serviceWatcher) Start(ctx context.Context, s store.Store, pushInterval time.Duration) {
 	w.once.Do(func() {
-		w.start(s, pushInterval)
+		w.start(ctx, s, pushInterval)
 	})
 }
 
-func (w *serviceWatcher) start(s store.Store, pushInterval time.Duration) {
-	ch, err := s.ServiceStatusStream(context.Background())
+func (w *serviceWatcher) start(ctx context.Context, s store.Store, pushInterval time.Duration) {
+	ch, err := s.ServiceStatusStream(ctx)
 	if err != nil {
 		log.Errorf("[WatchServiceStatus] failed to start watch: %v", err)
 		return
@@ -84,7 +84,7 @@ func (w *serviceWatcher) Unsubscribe(id uuid.UUID) {
 // WatchServiceStatus returns chan of available service address
 func (c *Calcium) WatchServiceStatus(ctx context.Context) (<-chan types.ServiceStatus, error) {
 	ch := make(chan types.ServiceStatus)
-	c.watcher.Start(c.store, c.config.GRPCConfig.ServiceDiscoveryPushInterval)
+	c.watcher.Start(ctx, c.store, c.config.GRPCConfig.ServiceDiscoveryPushInterval)
 	id := c.watcher.Subscribe(ch)
 	go func() {
 		<-ctx.Done()
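This change threads the caller's context down to ServiceStatusStream instead of minting a context.Background(). One consequence worth spelling out (an observation, not a claim from the diff): because start is wrapped in sync.Once, the long-lived stream inherits the context of whichever caller wins the first Start. A minimal runnable sketch of that once-guarded pattern; the watcher type below is illustrative, not the real serviceWatcher.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// watcher shows the once-guarded start used above: only the first Start call
// launches the loop, and its ctx governs the loop's lifetime.
type watcher struct {
	once sync.Once
}

func (w *watcher) Start(ctx context.Context) {
	w.once.Do(func() { go w.loop(ctx) })
}

func (w *watcher) loop(ctx context.Context) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("stream stopped:", ctx.Err())
			return
		case <-ticker.C:
			fmt.Println("push service status")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	w := &watcher{}
	w.Start(ctx)
	w.Start(ctx) // no-op: sync.Once already fired
	<-ctx.Done()
	time.Sleep(50 * time.Millisecond) // let the loop print its exit message
}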
20 changes: 9 additions & 11 deletions core.go
@@ -1,7 +1,6 @@
 package main
 
 import (
-	"context"
 	"fmt"
 	"net"
 	"net/http"
@@ -46,7 +45,7 @@ func setupLog(l string) error {
 	return nil
 }
 
-func serve() {
+func serve(c *cli.Context) error {
 	config, err := utils.LoadConfig(configPath)
 	if err != nil {
 		log.Fatalf("[main] %v", err)
@@ -70,7 +69,8 @@ func serve() {
 	vibranium := rpc.New(cluster, config, rpcch)
 	s, err := net.Listen("tcp", config.Bind)
 	if err != nil {
-		log.Fatalf("[main] %v", err)
+		log.Errorf("[main] %v", err)
+		return err
 	}
 
 	opts := []grpc.ServerOption{
@@ -90,9 +90,10 @@ func serve() {
 	pb.RegisterCoreRPCServer(grpcServer, vibranium)
 	go func() {
 		if err := grpcServer.Serve(s); err != nil {
-			log.Fatalf("[main] start grpc failed %v", err)
+			log.Errorf("[main] start grpc failed %v", err)
 		}
 	}()
+
 	if config.Profile != "" {
 		http.Handle("/metrics", metrics.Client.ResourceMiddleware(cluster)(promhttp.Handler()))
 		go func() {
@@ -102,10 +103,10 @@ func serve() {
 		}()
 	}
 
-	unregisterService, err := cluster.RegisterService(context.Background())
+	unregisterService, err := cluster.RegisterService(c.Context)
 	if err != nil {
 		log.Errorf("[main] failed to register service: %v", err)
-		return
+		return err
 	}
 	log.Info("[main] Cluster started successfully.")
 
@@ -122,6 +123,7 @@ func serve() {
 	log.Info("[main] Check if cluster still have running tasks.")
 	vibranium.Wait()
 	log.Info("[main] cluster gracefully stopped.")
+	return nil
 }
 
 func main() {
@@ -147,10 +149,6 @@ func main() {
 			Destination: &embeddedStorage,
 		},
 	}
-	app.Action = func(c *cli.Context) error {
-		serve()
-		return nil
-	}
-
+	app.Action = serve
 	_ = app.Run(os.Args)
 }
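Since serve now has the func(*cli.Context) error shape of urfave/cli's ActionFunc, the wrapper closure goes away and failures bubble up as errors instead of log.Fatalf exits (which would skip deferred cleanup). A minimal sketch of the same wiring against urfave/cli v2; the config flag is illustrative only.

package main

import (
	"errors"
	"fmt"
	"os"

	"github.com/urfave/cli/v2"
)

// serve matches cli.ActionFunc, so it can be assigned to app.Action directly
// and report failure by returning an error.
func serve(c *cli.Context) error {
	if c.String("config") == "" {
		return errors.New("config is required")
	}
	fmt.Println("serving with config", c.String("config"))
	return nil
}

func main() {
	app := &cli.App{
		Name:   "demo",
		Flags:  []cli.Flag{&cli.StringFlag{Name: "config"}},
		Action: serve, // no wrapper closure needed
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}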
4 changes: 1 addition & 3 deletions utils/transaction.go
@@ -40,9 +40,7 @@ func Txn(ctx context.Context, cond contextFunc, then contextFunc, rollback conte
 			thenCtx, thenCancel = context.WithTimeout(context.Background(), ttl)
 			defer thenCancel()
 		}
-		if then != nil {
-			tnxErr = then(thenCtx)
-		}
+		tnxErr = then(thenCtx)
 	}
 
 	return tnxErr
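Dropping the nil check makes then a mandatory argument: a caller passing nil would now panic once cond succeeds, which is safe here because every call site, including the new one in realloc.go, supplies it. For orientation, here is a reconstruction sketch of how Txn plausibly fits together, inferred from this excerpt and the realloc.go call site; only the lines shown in the diff are verbatim, the rest is assumption.

package main

import (
	"context"
	"fmt"
	"time"
)

type contextFunc func(context.Context) error

// Txn runs cond with the caller's context; on success it runs then, handing it
// a fresh timeout context so a client cancellation cannot interrupt the commit;
// on failure it runs rollback if one was supplied. The surrounding control flow
// and the rollback handling are assumptions.
func Txn(ctx context.Context, cond, then, rollback contextFunc, ttl time.Duration) error {
	var tnxErr error
	if tnxErr = cond(ctx); tnxErr == nil {
		thenCtx := ctx
		if rollback != nil { // assumed condition guarding the fresh context
			var thenCancel context.CancelFunc
			thenCtx, thenCancel = context.WithTimeout(context.Background(), ttl)
			defer thenCancel()
		}
		tnxErr = then(thenCtx) // then is mandatory after this diff
	} else if rollback != nil {
		rollbackCtx, cancel := context.WithTimeout(context.Background(), ttl)
		defer cancel()
		if err := rollback(rollbackCtx); err != nil {
			fmt.Println("rollback failed:", err)
		}
	}
	return tnxErr
}

func main() {
	err := Txn(context.Background(),
		func(ctx context.Context) error { fmt.Println("cond: engine call"); return nil },
		func(ctx context.Context) error { fmt.Println("then: persist metadata"); return nil },
		nil, // rollback may still be nil; then may not
		time.Second,
	)
	fmt.Println("err:", err)
}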
