From bb2960ed158e6b47e243d0563de71fd385b4296b Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Fri, 14 Apr 2023 12:46:57 -0400 Subject: [PATCH] Revert "Remove locking" This reverts commit e22b756b1fd4fe6aefc20365eceba08442465a6e. Signed-off-by: Byron Ruth --- pkg/drivers/nats/nats.go | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/pkg/drivers/nats/nats.go b/pkg/drivers/nats/nats.go index 7d449f4f..3d8606c5 100644 --- a/pkg/drivers/nats/nats.go +++ b/pkg/drivers/nats/nats.go @@ -11,6 +11,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/k3s-io/kine/pkg/drivers/nats/kv" @@ -59,6 +60,9 @@ type Driver struct { js nats.JetStreamContext kv nats.KeyValue + dirMu *sync.RWMutex + subMus map[string]*sync.RWMutex + slowThreshold time.Duration } @@ -78,6 +82,23 @@ func getTopLevelKey(key string) string { return "" } +func (d *Driver) lockFolder(key string) (unlock func()) { + lockFolder := getTopLevelKey(key) + if lockFolder == "" { + return func() {} + } + + d.dirMu.Lock() + mu, ok := d.subMus[lockFolder] + if !ok { + mu = &sync.RWMutex{} + d.subMus[lockFolder] = mu + } + d.dirMu.Unlock() + mu.Lock() + return mu.Unlock +} + type JSValue struct { KV *server.KeyValue PrevRevision int64 @@ -197,6 +218,8 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac return &Driver{ kv: kvB, + dirMu: &sync.RWMutex{}, + subMus: make(map[string]*sync.RWMutex), js: js, slowThreshold: config.slowThreshold, }, nil @@ -457,6 +480,9 @@ func (d *Driver) Create(ctx context.Context, key string, value []byte, lease int d.logMethod(dur, fStr, key, len(value), lease, revRet, errRet, dur) }() + // Lock the folder containing this key. + defer d.lockFolder(key)() + // check if key exists already rev, prevKV, err := d.get(ctx, key, 0, true) if err != nil && err != nats.ErrKeyNotFound { @@ -510,6 +536,9 @@ func (d *Driver) Delete(ctx context.Context, key string, revision int64) (revRet d.logMethod(dur, fStr, key, revision, revRet, kvRet != nil, deletedRet, errRet, dur) }() + // Lock the folder containing this key. + defer d.lockFolder(key)() + rev, value, err := d.get(ctx, key, 0, true) if err != nil { if err == nats.ErrKeyNotFound { @@ -763,6 +792,9 @@ func (d *Driver) Update(ctx context.Context, key string, value []byte, revision, d.logMethod(dur, fStr, key, len(value), revision, lease, revRet, kvRev, updateRet, errRet, dur) }() + // Lock the folder containing the key. + defer d.lockFolder(key)() + rev, prevKV, err := d.get(ctx, key, 0, false) if err != nil {