Skip to content

Commit

Permalink
Revert "Remove locking"
Browse files Browse the repository at this point in the history
This reverts commit e22b756.
  • Loading branch information
bruth committed Apr 14, 2023
1 parent e22b756 commit f1e5e06
Showing 1 changed file with 32 additions and 0 deletions.
32 changes: 32 additions & 0 deletions pkg/drivers/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/k3s-io/kine/pkg/drivers/nats/kv"
Expand Down Expand Up @@ -59,6 +60,9 @@ type Driver struct {
js nats.JetStreamContext
kv nats.KeyValue

dirMu *sync.RWMutex
subMus map[string]*sync.RWMutex

slowThreshold time.Duration
}

Expand All @@ -78,6 +82,23 @@ func getTopLevelKey(key string) string {
return ""
}

func (d *Driver) lockFolder(key string) (unlock func()) {
lockFolder := getTopLevelKey(key)
if lockFolder == "" {
return func() {}
}

d.dirMu.Lock()
mu, ok := d.subMus[lockFolder]
if !ok {
mu = &sync.RWMutex{}
d.subMus[lockFolder] = mu
}
d.dirMu.Unlock()
mu.Lock()
return mu.Unlock
}

type JSValue struct {
KV *server.KeyValue
PrevRevision int64
Expand Down Expand Up @@ -197,6 +218,8 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac

return &Driver{
kv: kvB,
dirMu: &sync.RWMutex{},
subMus: make(map[string]*sync.RWMutex),
js: js,
slowThreshold: config.slowThreshold,
}, nil
Expand Down Expand Up @@ -457,6 +480,9 @@ func (d *Driver) Create(ctx context.Context, key string, value []byte, lease int
d.logMethod(dur, fStr, key, len(value), lease, revRet, errRet, dur)
}()

// Lock the folder containing this key.
defer d.lockFolder(key)()

// check if key exists already
rev, prevKV, err := d.get(ctx, key, 0, true)
if err != nil && err != nats.ErrKeyNotFound {
Expand Down Expand Up @@ -510,6 +536,9 @@ func (d *Driver) Delete(ctx context.Context, key string, revision int64) (revRet
d.logMethod(dur, fStr, key, revision, revRet, kvRet != nil, deletedRet, errRet, dur)
}()

// Lock the folder containing this key.
defer d.lockFolder(key)()

rev, value, err := d.get(ctx, key, 0, true)
if err != nil {
if err == nats.ErrKeyNotFound {
Expand Down Expand Up @@ -763,6 +792,9 @@ func (d *Driver) Update(ctx context.Context, key string, value []byte, revision,
d.logMethod(dur, fStr, key, len(value), revision, lease, revRet, kvRev, updateRet, errRet, dur)
}()

// Lock the folder containing the key.
defer d.lockFolder(key)()

rev, prevKV, err := d.get(ctx, key, 0, false)

if err != nil {
Expand Down

0 comments on commit f1e5e06

Please sign in to comment.