Skip to content

Commit

Permalink
unlock the mutex in the reverse order
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 committed Jun 22, 2021
1 parent 8316d50 commit 9e24bc2
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 40 deletions.
39 changes: 23 additions & 16 deletions cluster/calcium/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,18 @@ func (c *Calcium) doUnlock(ctx context.Context, lock lock.DistributedLock, msg s
return errors.WithStack(lock.Unlock(ctx))
}

func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.DistributedLock) {
for n, lock := range locks {
// force unlock
if err := c.doUnlock(ctx, lock, n); err != nil {
log.Errorf(ctx, "[doUnlockAll] Unlock failed %v", err)
func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.DistributedLock, order []string) {
// unlock in the reverse order
if len(order) != len(locks) {
log.Warn(ctx, "[doUnlockAll] order length not match lock map")
order = []string{}
for key := range locks {
order = append(order, key)
}
}
for _, key := range order {
if err := c.doUnlock(ctx, locks[key], key); err != nil {
log.Errorf(ctx, "[doUnlockAll] Unlock %s failed %v", key, err)
continue
}
}
Expand Down Expand Up @@ -76,18 +83,13 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(

// sort + unique
sort.Strings(ids)
j := 0
for i, id := range ids {
if i > 0 && ids[i-1] == id {
continue
}
ids[j] = id
j++
}
ids = ids[:j]
ids = ids[:utils.Unique(ids, func(i int) string { return ids[i] })]

defer log.Debugf(ctx, "[withWorkloadsLocked] Workloads %+v unlocked", ids)
defer func() { c.doUnlockAll(utils.InheritTracingInfo(ctx, context.Background()), locks) }()
defer func() {
utils.Reverse(ids)
c.doUnlockAll(utils.InheritTracingInfo(ctx, context.Background()), locks, ids)
}()
cs, err := c.GetWorkloads(ctx, ids)
if err != nil {
return err
Expand All @@ -108,10 +110,14 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(
// withNodesLocked will using NodeFilter `nf` to filter nodes
// and lock the corresponding nodes for the callback function `f` to use
func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error {
nodenames := []string{}
nodes := map[string]*types.Node{}
locks := map[string]lock.DistributedLock{}
defer log.Debugf(ctx, "[withNodesLocked] Nodes %+v unlocked", nf)
defer c.doUnlockAll(utils.InheritTracingInfo(ctx, context.Background()), locks)
defer func() {
utils.Reverse(nodenames)
c.doUnlockAll(utils.InheritTracingInfo(ctx, context.Background()), locks, nodenames)
}()

ns, err := c.filterNodes(ctx, nf)
if err != nil {
Expand All @@ -132,6 +138,7 @@ func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f fu
return err
}
nodes[n.Name] = node
nodenames = append(nodenames, n.Name)
}
return f(ctx, nodes)
}
10 changes: 1 addition & 9 deletions cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,7 @@ func (c *Calcium) filterNodes(ctx context.Context, nf types.NodeFilter) (ns []*t
}
sort.Slice(ns, func(i, j int) bool { return ns[i].Name <= ns[j].Name })
// unique
j := 0
for i, n := range ns {
if i > 0 && n.Name == ns[i-1].Name {
continue
}
ns[j] = n
j++
}
ns = ns[:j]
ns = ns[:utils.Unique(ns, func(i int) string { return ns[i].Name })]
}()

if len(nf.Includes) != 0 {
Expand Down
15 changes: 0 additions & 15 deletions cluster/calcium/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package calcium

import (
"context"
"sort"

"github.com/pkg/errors"
"github.com/projecteru2/core/log"
Expand Down Expand Up @@ -51,20 +50,6 @@ func (c *Calcium) GetWorkload(ctx context.Context, id string) (workload *types.W

// GetWorkloads get workloads
func (c *Calcium) GetWorkloads(ctx context.Context, ids []string) (workloads []*types.Workload, err error) {
defer func() {
if len(workloads) == 0 {
return
}
sort.Slice(workloads, func(i, j int) bool { return workloads[i].ID <= workloads[j].ID })
ws := []*types.Workload{}
for i, w := range workloads {
for i > 0 && w.ID == workloads[i-1].ID {
continue
}
ws = append(ws, w)
}
workloads = ws
}()
workloads, err = c.store.GetWorkloads(ctx, ids)
return workloads, log.WithField("Calcium", "GetWorkloads").WithField("ids", ids).Err(ctx, errors.WithStack(err))
}
26 changes: 26 additions & 0 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io/ioutil"
"math/big"
"os"
"reflect"
"strings"

"github.com/pkg/errors"
Expand Down Expand Up @@ -285,3 +286,28 @@ func safeSplit(s string) []string {

return result
}

// Reverse any slice
func Reverse(s interface{}) {
n := reflect.ValueOf(s).Len()
swap := reflect.Swapper(s)
for i, j := 0, n-1; i < j; i, j = i+1, j-1 {
swap(i, j)
}
}

// Unique return a index, where s[:index] is a unique slice
func Unique(s interface{}, getVal func(int) string) (j int) {
n := reflect.ValueOf(s).Len()
swap := reflect.Swapper(s)
lastVal := ""
for i := 0; i < n; i++ {
if getVal(i) == lastVal && i != 0 {
continue
}
lastVal = getVal(i)
swap(i, j)
j++
}
return j
}

0 comments on commit 9e24bc2

Please sign in to comment.