diff --git a/cluster/calcium/lock.go b/cluster/calcium/lock.go index 6c649143a..522322ff6 100644 --- a/cluster/calcium/lock.go +++ b/cluster/calcium/lock.go @@ -38,11 +38,18 @@ func (c *Calcium) doUnlock(ctx context.Context, lock lock.DistributedLock, msg s return errors.WithStack(lock.Unlock(ctx)) } -func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.DistributedLock) { - for n, lock := range locks { - // force unlock - if err := c.doUnlock(ctx, lock, n); err != nil { - log.Errorf(ctx, "[doUnlockAll] Unlock failed %v", err) +func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.DistributedLock, order []string) { + // unlock in the reverse order + if len(order) != len(locks) { + log.Warn(ctx, "[doUnlockAll] order length not match lock map") + order = []string{} + for key := range locks { + order = append(order, key) + } + } + for _, key := range order { + if err := c.doUnlock(ctx, locks[key], key); err != nil { + log.Errorf(ctx, "[doUnlockAll] Unlock %s failed %v", key, err) continue } } @@ -76,18 +83,13 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func( // sort + unique sort.Strings(ids) - j := 0 - for i, id := range ids { - if i > 0 && ids[i-1] == id { - continue - } - ids[j] = id - j++ - } - ids = ids[:j] + ids = ids[:utils.Unique(ids, func(i int) string { return ids[i] })] defer log.Debugf(ctx, "[withWorkloadsLocked] Workloads %+v unlocked", ids) - defer func() { c.doUnlockAll(utils.InheritTracingInfo(ctx, context.Background()), locks) }() + defer func() { + utils.Reverse(ids) + c.doUnlockAll(utils.InheritTracingInfo(ctx, context.Background()), locks, ids) + }() cs, err := c.GetWorkloads(ctx, ids) if err != nil { return err @@ -108,10 +110,14 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func( // withNodesLocked will using NodeFilter `nf` to filter nodes // and lock the corresponding nodes for the callback function `f` to use func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error { + nodenames := []string{} nodes := map[string]*types.Node{} locks := map[string]lock.DistributedLock{} defer log.Debugf(ctx, "[withNodesLocked] Nodes %+v unlocked", nf) - defer c.doUnlockAll(utils.InheritTracingInfo(ctx, context.Background()), locks) + defer func() { + utils.Reverse(nodenames) + c.doUnlockAll(utils.InheritTracingInfo(ctx, context.Background()), locks, nodenames) + }() ns, err := c.filterNodes(ctx, nf) if err != nil { @@ -132,6 +138,7 @@ func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f fu return err } nodes[n.Name] = node + nodenames = append(nodenames, n.Name) } return f(ctx, nodes) } diff --git a/cluster/calcium/node.go b/cluster/calcium/node.go index 23a186481..ebe802a73 100644 --- a/cluster/calcium/node.go +++ b/cluster/calcium/node.go @@ -185,15 +185,7 @@ func (c *Calcium) filterNodes(ctx context.Context, nf types.NodeFilter) (ns []*t } sort.Slice(ns, func(i, j int) bool { return ns[i].Name <= ns[j].Name }) // unique - j := 0 - for i, n := range ns { - if i > 0 && n.Name == ns[i-1].Name { - continue - } - ns[j] = n - j++ - } - ns = ns[:j] + ns = ns[:utils.Unique(ns, func(i int) string { return ns[i].Name })] }() if len(nf.Includes) != 0 { diff --git a/cluster/calcium/workload.go b/cluster/calcium/workload.go index 4c557be6d..30805e821 100644 --- a/cluster/calcium/workload.go +++ b/cluster/calcium/workload.go @@ -5,7 +5,6 @@ package calcium import ( "context" - "sort" "github.com/pkg/errors" "github.com/projecteru2/core/log" @@ -51,20 +50,6 @@ func (c *Calcium) GetWorkload(ctx context.Context, id string) (workload *types.W // GetWorkloads get workloads func (c *Calcium) GetWorkloads(ctx context.Context, ids []string) (workloads []*types.Workload, err error) { - defer func() { - if len(workloads) == 0 { - return - } - sort.Slice(workloads, func(i, j int) bool { return workloads[i].ID <= workloads[j].ID }) - ws := []*types.Workload{} - for i, w := range workloads { - for i > 0 && w.ID == workloads[i-1].ID { - continue - } - ws = append(ws, w) - } - workloads = ws - }() workloads, err = c.store.GetWorkloads(ctx, ids) return workloads, log.WithField("Calcium", "GetWorkloads").WithField("ids", ids).Err(ctx, errors.WithStack(err)) } diff --git a/utils/utils.go b/utils/utils.go index 2f6e381c6..1b953ff67 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -10,6 +10,7 @@ import ( "io/ioutil" "math/big" "os" + "reflect" "strings" "github.com/pkg/errors" @@ -285,3 +286,28 @@ func safeSplit(s string) []string { return result } + +// Reverse any slice +func Reverse(s interface{}) { + n := reflect.ValueOf(s).Len() + swap := reflect.Swapper(s) + for i, j := 0, n-1; i < j; i, j = i+1, j-1 { + swap(i, j) + } +} + +// Unique return a index, where s[:index] is a unique slice +func Unique(s interface{}, getVal func(int) string) (j int) { + n := reflect.ValueOf(s).Len() + swap := reflect.Swapper(s) + lastVal := "" + for i := 0; i < n; i++ { + if getVal(i) == lastVal && i != 0 { + continue + } + lastVal = getVal(i) + swap(i, j) + j++ + } + return j +}