Skip to content

Commit

Permalink
fix(tests): added tests for Renew
Browse files Browse the repository at this point in the history
  • Loading branch information
cnlangzi committed Mar 19, 2024
1 parent 75bf8a9 commit a433a03
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 78 deletions.
7 changes: 7 additions & 0 deletions lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ type Lease struct {
ExpiresOn time.Time `json:"-"`
}

// IsLive check if lease is live on node side
func (l *Lease) IsLive() bool {
return time.Now().Before(time.Unix(l.Since, 0).Add(l.TTL.Duration()))
}

func (l *Lease) IsExpired(start time.Time) bool {
now := time.Now()
l.ExpiresOn = now.Add(l.TTL.Duration() - time.Until(start))
return !now.Before(l.ExpiresOn)
}
194 changes: 126 additions & 68 deletions mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dlm
import (
"context"
"errors"
"log/slog"
"math"
"net/rpc"
"strings"
Expand All @@ -27,7 +28,7 @@ func New(id, topic, key string, options ...MutexOption) *Mutex {
o(m)
}

m.consensus = int(math.Ceil(float64(len(m.peers)) / 2))
m.consensus = int(math.Floor(float64(len(m.peers))/2)) + 1

return m
}
Expand All @@ -43,33 +44,29 @@ type Mutex struct {
ttl time.Duration

consensus int
cluster []*rpc.Client
done chan struct{}

lease Lease
}

func (m *Mutex) connect(ctx context.Context) error {
if m.cluster == nil {
a := async.New[*rpc.Client]()
for _, d := range m.peers {
a.Add(func(addr string) func(context.Context) (*rpc.Client, error) {
return func(ctx context.Context) (*rpc.Client, error) {
return connect(ctx, addr, m.timeout)
}
}(d))
}
func (m *Mutex) connect(ctx context.Context) ([]*rpc.Client, error) {

cluster, _, err := a.Wait(ctx)
if len(cluster) >= m.consensus {
m.cluster = cluster
return nil
}
a := async.New[*rpc.Client]()
for _, d := range m.peers {
a.Add(func(addr string) func(context.Context) (*rpc.Client, error) {
return func(ctx context.Context) (*rpc.Client, error) {
return connect(ctx, addr, m.timeout)
}
}(d))
}

return err
cluster, _, err := a.Wait(ctx)
if len(cluster) >= m.consensus {
return cluster, nil
}

return nil
return nil, err

}

func (m *Mutex) Lock(ctx context.Context) (context.Context, context.CancelFunc, error) {
Expand All @@ -82,12 +79,12 @@ func (m *Mutex) Lock(ctx context.Context) (context.Context, context.CancelFunc,
ctx, cancel := context.WithTimeout(ctx, m.timeout)
defer cancel()

err := m.connect(ctx)
cluster, err := m.connect(ctx)
if err != nil {
return nil, nil, err
}

for _, c := range m.cluster {
for _, c := range cluster {
a.Add(func(c *rpc.Client) func(ctx context.Context) (Lease, error) {
return func(ctx context.Context) (Lease, error) {
var t Lease
Expand All @@ -98,17 +95,16 @@ func (m *Mutex) Lock(ctx context.Context) (context.Context, context.CancelFunc,
}

start := time.Now()
result, _, err := a.WaitN(ctx, m.consensus)
result, errs, err := a.WaitN(ctx, m.consensus)

if err != nil {
return nil, nil, err
Logger.Warn("dlm: renew lock", slog.Any("err", errs))
return nil, nil, m.Error(errs, err)
}

t := result[0]
now := time.Now()
t.ExpiresOn = now.Add(t.TTL.Duration() - time.Until(start))

if !now.Before(t.ExpiresOn) {
if t.IsExpired(start) {
return nil, nil, ErrExpiredLease
}

Expand All @@ -123,49 +119,17 @@ func (m *Mutex) Lock(ctx context.Context) (context.Context, context.CancelFunc,

}

func (m *Mutex) Unlock(ctx context.Context) error {
func (m *Mutex) Renew(ctx context.Context) error {
m.mu.Lock()
defer m.mu.Unlock()

a := async.New[bool]()
a := async.New[Lease]()
req := m.createRequest()
for _, c := range m.cluster {
a.Add(func(c *rpc.Client) func(ctx context.Context) (bool, error) {
return func(ctx context.Context) (bool, error) {
var t bool
err := c.Call("dlm.ReleaseLock", req, &t)
if err != nil {
return t, err
}

return t, nil
}
}(c))
}
m.done <- struct{}{}

_, _, err := a.WaitN(ctx, m.consensus)
cluster, err := m.connect(ctx)
if err != nil {
return err
}

return nil
}
func (m *Mutex) createRequest() LockRequest {
return LockRequest{
ID: m.id,
Topic: m.topic,
Key: m.key,
TTL: m.ttl,
}
}

func (m *Mutex) Renew(ctx context.Context) error {
m.mu.Lock()
defer m.mu.Unlock()
a := async.New[Lease]()
req := m.createRequest()
for _, c := range m.cluster {
for _, c := range cluster {
a.Add(func(c *rpc.Client) func(ctx context.Context) (Lease, error) {
return func(ctx context.Context) (Lease, error) {
var t Lease
Expand All @@ -182,23 +146,65 @@ func (m *Mutex) Renew(ctx context.Context) error {
defer cancel()

start := time.Now()
result, _, err := a.WaitN(ctx, m.consensus)
result, errs, err := a.WaitN(ctx, m.consensus)
if err != nil {
return err
Logger.Warn("dlm: renew lock", slog.Any("err", errs))
return m.Error(errs, err)
}

now := time.Now()
t := result[0]
t.ExpiresOn = now.Add(t.TTL.Duration() - time.Until(start))

if !now.After(t.ExpiresOn) {
if t.IsExpired(start) {
return ErrExpiredLease
}

m.lease = t
return nil
}

func (m *Mutex) Unlock(ctx context.Context) error {
m.mu.Lock()
defer m.mu.Unlock()

Check warning on line 166 in mutex.go

View check run for this annotation

Codecov / codecov/patch

mutex.go#L164-L166

Added lines #L164 - L166 were not covered by tests

a := async.New[bool]()
req := m.createRequest()

Check warning on line 169 in mutex.go

View check run for this annotation

Codecov / codecov/patch

mutex.go#L168-L169

Added lines #L168 - L169 were not covered by tests

cluster, err := m.connect(ctx)
if err != nil {
return err

Check warning on line 173 in mutex.go

View check run for this annotation

Codecov / codecov/patch

mutex.go#L171-L173

Added lines #L171 - L173 were not covered by tests
}

for _, c := range cluster {
a.Add(func(c *rpc.Client) func(ctx context.Context) (bool, error) {
return func(ctx context.Context) (bool, error) {
var t bool
err := c.Call("dlm.ReleaseLock", req, &t)
if err != nil {
return t, err

Check warning on line 182 in mutex.go

View check run for this annotation

Codecov / codecov/patch

mutex.go#L176-L182

Added lines #L176 - L182 were not covered by tests
}

return t, nil

Check warning on line 185 in mutex.go

View check run for this annotation

Codecov / codecov/patch

mutex.go#L185

Added line #L185 was not covered by tests
}
}(c))
}
m.done <- struct{}{}

Check warning on line 189 in mutex.go

View check run for this annotation

Codecov / codecov/patch

mutex.go#L189

Added line #L189 was not covered by tests

_, errs, err := a.WaitN(ctx, m.consensus)
if err != nil {
Logger.Warn("dlm: renew lock", slog.Any("err", errs))
return m.Error(errs, err)

Check warning on line 194 in mutex.go

View check run for this annotation

Codecov / codecov/patch

mutex.go#L191-L194

Added lines #L191 - L194 were not covered by tests
}

return nil

Check warning on line 197 in mutex.go

View check run for this annotation

Codecov / codecov/patch

mutex.go#L197

Added line #L197 was not covered by tests
}
func (m *Mutex) createRequest() LockRequest {
return LockRequest{
ID: m.id,
Topic: m.topic,
Key: m.key,
TTL: m.ttl,
}
}

func (m *Mutex) waitExpires(ctx context.Context, cancel context.CancelFunc) {
defer cancel()
var expiresOn time.Time
Expand Down Expand Up @@ -248,3 +254,55 @@ func (m *Mutex) keepalive(ctx context.Context, cancel context.CancelFunc) {
}
}
}

// Error try unwrap consensus known error
func (m *Mutex) Error(errs []error, err error) error {
consensus := make(map[rpc.ServerError]int)

for _, err := range errs {
s, ok := err.(rpc.ServerError)
if ok {
c, ok := consensus[s]
if !ok {
consensus[s] = 1
} else {
consensus[s] = c + 1
}
}
}

max := 0
var msg string

for k, v := range consensus {
if v > max {
max = v
msg = string(k)
}
}

if max < m.consensus {
return err

Check warning on line 285 in mutex.go

View check run for this annotation

Codecov / codecov/patch

mutex.go#L285

Added line #L285 was not covered by tests
}

if !strings.HasPrefix(msg, "dlm:") {
return err

Check warning on line 289 in mutex.go

View check run for this annotation

Codecov / codecov/patch

mutex.go#L289

Added line #L289 was not covered by tests
}

switch msg {
case ErrExpiredLease.Error():
return ErrExpiredLease
case ErrNoLease.Error():
return ErrNoLease
case ErrNotYourLease.Error():
return ErrNotYourLease

Check warning on line 298 in mutex.go

View check run for this annotation

Codecov / codecov/patch

mutex.go#L295-L298

Added lines #L295 - L298 were not covered by tests
case ErrLeaseExists.Error():
return ErrLeaseExists
case ErrFrozenTopic.Error():
return ErrFrozenTopic
case ErrBadDatabase.Error():
return ErrBadDatabase

Check warning on line 304 in mutex.go

View check run for this annotation

Codecov / codecov/patch

mutex.go#L301-L304

Added lines #L301 - L304 were not covered by tests
}

return err

Check warning on line 307 in mutex.go

View check run for this annotation

Codecov / codecov/patch

mutex.go#L307

Added line #L307 was not covered by tests
}
Loading

0 comments on commit a433a03

Please sign in to comment.