Skip to content

Commit

Permalink
Merge pull request #133 from rfyiamcool/feat/with_fail_fast_mode
Browse files Browse the repository at this point in the history
feat: add with fail fast mode
  • Loading branch information
hjr265 authored Sep 10, 2023
2 parents 2b94174 + f672726 commit 8d8c82b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
25 changes: 21 additions & 4 deletions mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Mutex struct {
value string
until time.Time
shuffle bool
failFast bool

pools []redis.Pool
}
Expand Down Expand Up @@ -263,17 +264,21 @@ func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, erro
err error
}

ch := make(chan result)
ch := make(chan result, len(m.pools))
for node, pool := range m.pools {
go func(node int, pool redis.Pool) {
r := result{node: node}
r.statusOK, r.err = actFn(pool)
ch <- r
}(node, pool)
}
n := 0
var taken []int
var err error

var (
n = 0
taken []int
err error
)

for range m.pools {
r := <-ch
if r.statusOK {
Expand All @@ -284,6 +289,18 @@ func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, erro
taken = append(taken, r.node)
err = multierror.Append(err, &ErrNodeTaken{Node: r.node})
}

if m.failFast {
// fast retrun
if n >= m.quorum {
return n, err
}

// fail fast
if len(taken) >= m.quorum {
return n, &ErrTaken{Nodes: taken}
}
}
}

if len(taken) >= m.quorum {
Expand Down
9 changes: 9 additions & 0 deletions redsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ func WithValue(v string) Option {
})
}

// WithFailFast can be used to quickly acquire and release the locker.
// When some redis servers are blocking, we do not need to wait for all redis servers response. as long as the quorum is met, it will be returned immediately.
// The effect of this parameter is to achieve low latency, avoid redis blocking causing lock/unlock to not return for a long time.
func WithFailFast(b bool) Option {
return OptionFunc(func(m *Mutex) {
m.failFast = b
})
}

// WithShufflePools can be used to shuffle Redis pools to reduce centralized access in concurrent scenarios.
func WithShufflePools(b bool) Option {
return OptionFunc(func(m *Mutex) {
Expand Down

0 comments on commit 8d8c82b

Please sign in to comment.