Skip to content

Commit

Permalink
refactor: eliminate potential race condition in fragment creation #99
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Jun 20, 2021
1 parent ad678ec commit 99ec290
Show file tree
Hide file tree
Showing 18 changed files with 173 additions and 97 deletions.
6 changes: 3 additions & 3 deletions internal/cluster/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ func (b *Balancer) scanPartition(sign uint64, part *partitions.Partition, owners

for _, owner := range owners {
b.log.V(2).Printf("[INFO] Moving %s: %s (kind: %s) on PartID: %d to %s",
u.Name(), name, part.Kind(), part.Id(), owner)
u.Name(), name, part.Kind(), part.ID(), owner)

err := u.Move(part.Id(), part.Kind(), name.(string), owner)
err := u.Move(part.ID(), part.Kind(), name.(string), owner)

if err != nil {
b.log.V(2).Printf("[ERROR] Failed to move %s: %s on PartID: %d to %s: %v",
u.Name(), name, part.Id(), owner, err)
u.Name(), name, part.ID(), owner, err)
clean = false
}
}
Expand Down
6 changes: 3 additions & 3 deletions internal/cluster/partitions/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ type Partition struct {

id uint64
kind Kind
smap *sync.Map
m *sync.Map
owners atomic.Value
}

func (p *Partition) Kind() Kind {
return p.kind
}

func (p *Partition) Id() uint64 {
func (p *Partition) ID() uint64 {
return p.id
}

func (p *Partition) Map() *sync.Map {
return p.smap
return p.m
}

// Owner returns partition Owner. It's not thread-safe.
Expand Down
2 changes: 1 addition & 1 deletion internal/cluster/partitions/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestPartition(t *testing.T) {
p := Partition{
id: 1,
kind: PRIMARY,
smap: &sync.Map{},
m: &sync.Map{},
}

tmp := []discovery.Member{{
Expand Down
2 changes: 1 addition & 1 deletion internal/cluster/partitions/partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func New(count uint64, kind Kind) *Partitions {
ps.m[i] = &Partition{
id: i,
kind: kind,
smap: &sync.Map{},
m: &sync.Map{},
}
}
return ps
Expand Down
8 changes: 4 additions & 4 deletions internal/cluster/partitions/partitions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func TestPartitions(t *testing.T) {
t.Run("PartitionById", func(t *testing.T) {
for partID := uint64(0); partID < partitionCount; partID++ {
part := ps.PartitionByID(partID)
if part.Id() != partID {
t.Fatalf("Expected PartID: %d. Got: %d", partID, part.Id())
if part.ID() != partID {
t.Fatalf("Expected PartID: %d. Got: %d", partID, part.ID())
}
if part.Kind() != PRIMARY {
t.Fatalf("Expected Kind: %s. Got: %s", PRIMARY, part.Kind())
Expand All @@ -48,8 +48,8 @@ func TestPartitions(t *testing.T) {
t.Run("PartitionByHKey", func(t *testing.T) {
// 1 % 271 = 1
part := ps.PartitionByHKey(1)
if part.Id() != 1 {
t.Fatalf("Expected PartID: 1. Got: %d", part.Id())
if part.ID() != 1 {
t.Fatalf("Expected PartID: 1. Got: %d", part.ID())
}
})

Expand Down
5 changes: 1 addition & 4 deletions internal/dmap/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ func (dm *DMap) selectVersionForMerge(f *fragment, hkey uint64, entry storage.En
}

func (dm *DMap) mergeFragments(part *partitions.Partition, fp *fragmentPack) error {
f, err := dm.loadFragmentFromPartition(part)
if errors.Is(err, errFragmentNotFound) {
f, err = dm.createFragmentOnPartition(part)
}
f, err := dm.loadOrCreateFragment(part)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion internal/dmap/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ func TestDMap_Merge_Fragments(t *testing.T) {
}
}
hkey := partitions.HKey("mymap", key)
f, err := dm.getFragment(hkey, partitions.PRIMARY)
part := dm.getPartitionByHKey(hkey, partitions.PRIMARY)
f, err := dm.loadFragment(part)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down
12 changes: 9 additions & 3 deletions internal/dmap/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ var (

func (dm *DMap) deleteBackupFromFragment(key string, kind partitions.Kind) error {
hkey := partitions.HKey(dm.name, key)
f, err := dm.getFragment(hkey, kind)
part := dm.getPartitionByHKey(hkey, kind)
f, err := dm.loadFragment(part)
if errors.Is(err, errFragmentNotFound) {
// key doesn't exist
return nil
}
if err != nil {
return err
}

f.Lock()
defer f.Unlock()

Expand Down Expand Up @@ -140,8 +142,12 @@ func (dm *DMap) deleteKey(key string) error {
return err
}

// notice that "delete" operation is run on the cluster.
f, err := dm.getOrCreateFragment(hkey, partitions.PRIMARY)
part := dm.getPartitionByHKey(hkey, partitions.PRIMARY)
f, err := dm.loadFragment(part)
if errors.Is(err, errFragmentNotFound) {
// notice that "delete" operation is run on the cluster.
err = nil
}
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions internal/dmap/destroy_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package dmap

import (
"errors"

"github.com/buraksezer/olric/config"
"github.com/buraksezer/olric/internal/cluster/partitions"
"github.com/buraksezer/olric/internal/protocol"
Expand All @@ -37,8 +39,8 @@ func (s *Service) destroyOperation(w, r protocol.EncodeDecoder) {
}

func (dm *DMap) destroyFragmentOnPartition(part *partitions.Partition) error {
f, err := dm.loadFragmentFromPartition(part)
if err == errFragmentNotFound {
f, err := dm.loadFragment(part)
if errors.Is(err, errFragmentNotFound) {
// not exists
return nil
}
Expand Down
49 changes: 1 addition & 48 deletions internal/dmap/dmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package dmap

import (
"context"
"errors"
"fmt"
"time"
Expand All @@ -38,7 +37,7 @@ var (
ErrDMapNotFound = errors.New("dmap not found")
)

// DMap implements a single hop distributed hash table.
// DMap implements a single-hop distributed hash table.
type DMap struct {
name string
s *Service
Expand Down Expand Up @@ -110,31 +109,6 @@ func (s *Service) getOrCreateDMap(name string) (*DMap, error) {
return dm, err
}

func (dm *DMap) loadFragmentFromPartition(part *partitions.Partition) (*fragment, error) {
f, ok := part.Map().Load(dm.name)
if !ok {
return nil, errFragmentNotFound
}
return f.(*fragment), nil
}

func (dm *DMap) createFragmentOnPartition(part *partitions.Partition) (*fragment, error) {
ctx, cancel := context.WithCancel(context.Background())
f := &fragment{
service: dm.s,
accessLog: newAccessLog(),
ctx: ctx,
cancel: cancel,
}
var err error
f.storage, err = dm.engine.Fork(nil)
if err != nil {
return nil, err
}
part.Map().Store(dm.name, f)
return f, nil
}

func (dm *DMap) getPartitionByHKey(hkey uint64, kind partitions.Kind) *partitions.Partition {
var part *partitions.Partition
switch {
Expand All @@ -148,27 +122,6 @@ func (dm *DMap) getPartitionByHKey(hkey uint64, kind partitions.Kind) *partition
return part
}

func (dm *DMap) getFragment(hkey uint64, kind partitions.Kind) (*fragment, error) {
part := dm.getPartitionByHKey(hkey, kind)
part.Lock()
defer part.Unlock()
return dm.loadFragmentFromPartition(part)
}

func (dm *DMap) getOrCreateFragment(hkey uint64, kind partitions.Kind) (*fragment, error) {
part := dm.getPartitionByHKey(hkey, kind)
part.Lock()
defer part.Unlock()

// try to get
f, err := dm.loadFragmentFromPartition(part)
if errors.Is(err, errFragmentNotFound) {
// create the fragment and return
return dm.createFragmentOnPartition(part)
}
return f, err
}

func timeoutToTTL(timeout time.Duration) int64 {
if timeout.Seconds() == 0 {
return 0
Expand Down
3 changes: 2 additions & 1 deletion internal/dmap/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func (dm *DMap) isKeyIdleOnFragment(hkey uint64, f *fragment) bool {
}

func (dm *DMap) isKeyIdle(hkey uint64) bool {
f, err := dm.getFragment(hkey, partitions.PRIMARY)
part := dm.getPartitionByHKey(hkey, partitions.PRIMARY)
f, err := dm.loadFragment(part)
if errors.Is(err, errFragmentNotFound) {
// it's no possible to know whether the key is idle or not.
return false
Expand Down
13 changes: 9 additions & 4 deletions internal/dmap/expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package dmap

import (
"errors"
"fmt"
"time"

Expand All @@ -26,16 +27,19 @@ import (
)

func (dm *DMap) localExpireOnReplica(e *env) error {
f, err := dm.getFragment(e.hkey, partitions.BACKUP)
if err == errFragmentNotFound {
part := dm.getPartitionByHKey(e.hkey, partitions.BACKUP)
f, err := dm.loadFragment(part)
if errors.Is(err, errFragmentNotFound) {
return ErrKeyNotFound
}
if err != nil {
return err
}

e.fragment = f
f.Lock()
defer f.Unlock()

return dm.localExpire(e)
}

Expand Down Expand Up @@ -107,8 +111,9 @@ func (dm *DMap) syncExpireOnCluster(e *env) error {
}

func (dm *DMap) callExpireOnCluster(e *env) error {
f, err := dm.getFragment(e.hkey, partitions.PRIMARY)
if err == errFragmentNotFound {
part := dm.getPartitionByHKey(e.hkey, partitions.PRIMARY)
f, err := dm.loadFragment(part)
if errors.Is(err, errFragmentNotFound) {
return ErrKeyNotFound
}
if err != nil {
Expand Down
45 changes: 45 additions & 0 deletions internal/dmap/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,49 @@ func (f *fragment) Move(partID uint64, kind partitions.Kind, name string, owner
return err
}

func (dm *DMap) newFragment() (*fragment, error) {
str, err := dm.engine.Fork(nil)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
return &fragment{
service: dm.s,
accessLog: newAccessLog(),
storage: str,
ctx: ctx,
cancel: cancel,
}, nil
}

func (dm *DMap) loadOrCreateFragment(part *partitions.Partition) (*fragment, error) {
part.Lock()
defer part.Unlock()

// Creating a new fragment is our critical section here.
// It should be protected by a lock.

fg, ok := part.Map().Load(dm.name)
if ok {
return fg.(*fragment), nil
}

f, err := dm.newFragment()
if err != nil {
return nil, err
}

part.Map().Store(dm.name, f)
return f, nil
}

func (dm *DMap) loadFragment(part *partitions.Partition) (*fragment, error) {
f, ok := part.Map().Load(dm.name)
if !ok {
return nil, errFragmentNotFound
}
return f.(*fragment), nil
}

var _ partitions.Fragment = (*fragment)(nil)
Loading

0 comments on commit 99ec290

Please sign in to comment.