From 99ec290868dc9516a5712aa94cb944b2aea00a3c Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Sun, 20 Jun 2021 15:18:10 +0300 Subject: [PATCH] refactor: eliminate potential race condition in fragment creation #99 --- internal/cluster/balancer/balancer.go | 6 +- internal/cluster/partitions/partition.go | 6 +- internal/cluster/partitions/partition_test.go | 2 +- internal/cluster/partitions/partitions.go | 2 +- .../cluster/partitions/partitions_test.go | 8 +- internal/dmap/balance.go | 5 +- internal/dmap/balance_test.go | 3 +- internal/dmap/delete.go | 12 ++- internal/dmap/destroy_operations.go | 6 +- internal/dmap/dmap.go | 49 +---------- internal/dmap/eviction.go | 3 +- internal/dmap/expire.go | 13 ++- internal/dmap/fragment.go | 45 ++++++++++ internal/dmap/fragment_test.go | 83 ++++++++++++++++--- internal/dmap/get.go | 9 +- internal/dmap/janitor.go | 4 +- internal/dmap/put.go | 9 +- internal/dmap/query_pipeline.go | 5 +- 18 files changed, 173 insertions(+), 97 deletions(-) diff --git a/internal/cluster/balancer/balancer.go b/internal/cluster/balancer/balancer.go index 7a7c2ef2..f181a0ae 100644 --- a/internal/cluster/balancer/balancer.go +++ b/internal/cluster/balancer/balancer.go @@ -70,13 +70,13 @@ func (b *Balancer) scanPartition(sign uint64, part *partitions.Partition, owners for _, owner := range owners { b.log.V(2).Printf("[INFO] Moving %s: %s (kind: %s) on PartID: %d to %s", - u.Name(), name, part.Kind(), part.Id(), owner) + u.Name(), name, part.Kind(), part.ID(), owner) - err := u.Move(part.Id(), part.Kind(), name.(string), owner) + err := u.Move(part.ID(), part.Kind(), name.(string), owner) if err != nil { b.log.V(2).Printf("[ERROR] Failed to move %s: %s on PartID: %d to %s: %v", - u.Name(), name, part.Id(), owner, err) + u.Name(), name, part.ID(), owner, err) clean = false } } diff --git a/internal/cluster/partitions/partition.go b/internal/cluster/partitions/partition.go index 7f81ab4b..58a4fd48 100644 --- a/internal/cluster/partitions/partition.go +++ b/internal/cluster/partitions/partition.go @@ -27,7 +27,7 @@ type Partition struct { id uint64 kind Kind - smap *sync.Map + m *sync.Map owners atomic.Value } @@ -35,12 +35,12 @@ func (p *Partition) Kind() Kind { return p.kind } -func (p *Partition) Id() uint64 { +func (p *Partition) ID() uint64 { return p.id } func (p *Partition) Map() *sync.Map { - return p.smap + return p.m } // Owner returns partition Owner. It's not thread-safe. diff --git a/internal/cluster/partitions/partition_test.go b/internal/cluster/partitions/partition_test.go index 62f1d68b..c02c79f6 100644 --- a/internal/cluster/partitions/partition_test.go +++ b/internal/cluster/partitions/partition_test.go @@ -59,7 +59,7 @@ func TestPartition(t *testing.T) { p := Partition{ id: 1, kind: PRIMARY, - smap: &sync.Map{}, + m: &sync.Map{}, } tmp := []discovery.Member{{ diff --git a/internal/cluster/partitions/partitions.go b/internal/cluster/partitions/partitions.go index ea9c2002..2069a29a 100644 --- a/internal/cluster/partitions/partitions.go +++ b/internal/cluster/partitions/partitions.go @@ -54,7 +54,7 @@ func New(count uint64, kind Kind) *Partitions { ps.m[i] = &Partition{ id: i, kind: kind, - smap: &sync.Map{}, + m: &sync.Map{}, } } return ps diff --git a/internal/cluster/partitions/partitions_test.go b/internal/cluster/partitions/partitions_test.go index b7acbf94..c618f3fe 100644 --- a/internal/cluster/partitions/partitions_test.go +++ b/internal/cluster/partitions/partitions_test.go @@ -28,8 +28,8 @@ func TestPartitions(t *testing.T) { t.Run("PartitionById", func(t *testing.T) { for partID := uint64(0); partID < partitionCount; partID++ { part := ps.PartitionByID(partID) - if part.Id() != partID { - t.Fatalf("Expected PartID: %d. Got: %d", partID, part.Id()) + if part.ID() != partID { + t.Fatalf("Expected PartID: %d. Got: %d", partID, part.ID()) } if part.Kind() != PRIMARY { t.Fatalf("Expected Kind: %s. Got: %s", PRIMARY, part.Kind()) @@ -48,8 +48,8 @@ func TestPartitions(t *testing.T) { t.Run("PartitionByHKey", func(t *testing.T) { // 1 % 271 = 1 part := ps.PartitionByHKey(1) - if part.Id() != 1 { - t.Fatalf("Expected PartID: 1. Got: %d", part.Id()) + if part.ID() != 1 { + t.Fatalf("Expected PartID: 1. Got: %d", part.ID()) } }) diff --git a/internal/dmap/balance.go b/internal/dmap/balance.go index 5ad53564..b8492d8f 100644 --- a/internal/dmap/balance.go +++ b/internal/dmap/balance.go @@ -47,10 +47,7 @@ func (dm *DMap) selectVersionForMerge(f *fragment, hkey uint64, entry storage.En } func (dm *DMap) mergeFragments(part *partitions.Partition, fp *fragmentPack) error { - f, err := dm.loadFragmentFromPartition(part) - if errors.Is(err, errFragmentNotFound) { - f, err = dm.createFragmentOnPartition(part) - } + f, err := dm.loadOrCreateFragment(part) if err != nil { return err } diff --git a/internal/dmap/balance_test.go b/internal/dmap/balance_test.go index f7918f82..35f2c1aa 100644 --- a/internal/dmap/balance_test.go +++ b/internal/dmap/balance_test.go @@ -44,7 +44,8 @@ func TestDMap_Merge_Fragments(t *testing.T) { } } hkey := partitions.HKey("mymap", key) - f, err := dm.getFragment(hkey, partitions.PRIMARY) + part := dm.getPartitionByHKey(hkey, partitions.PRIMARY) + f, err := dm.loadFragment(part) if err != nil { t.Fatalf("Expected nil. Got: %v", err) } diff --git a/internal/dmap/delete.go b/internal/dmap/delete.go index 9a69d73d..1097a959 100644 --- a/internal/dmap/delete.go +++ b/internal/dmap/delete.go @@ -35,7 +35,8 @@ var ( func (dm *DMap) deleteBackupFromFragment(key string, kind partitions.Kind) error { hkey := partitions.HKey(dm.name, key) - f, err := dm.getFragment(hkey, kind) + part := dm.getPartitionByHKey(hkey, kind) + f, err := dm.loadFragment(part) if errors.Is(err, errFragmentNotFound) { // key doesn't exist return nil @@ -43,6 +44,7 @@ func (dm *DMap) deleteBackupFromFragment(key string, kind partitions.Kind) error if err != nil { return err } + f.Lock() defer f.Unlock() @@ -140,8 +142,12 @@ func (dm *DMap) deleteKey(key string) error { return err } - // notice that "delete" operation is run on the cluster. - f, err := dm.getOrCreateFragment(hkey, partitions.PRIMARY) + part := dm.getPartitionByHKey(hkey, partitions.PRIMARY) + f, err := dm.loadFragment(part) + if errors.Is(err, errFragmentNotFound) { + // notice that "delete" operation is run on the cluster. + err = nil + } if err != nil { return err } diff --git a/internal/dmap/destroy_operations.go b/internal/dmap/destroy_operations.go index 2b31def5..89fa8580 100644 --- a/internal/dmap/destroy_operations.go +++ b/internal/dmap/destroy_operations.go @@ -15,6 +15,8 @@ package dmap import ( + "errors" + "github.com/buraksezer/olric/config" "github.com/buraksezer/olric/internal/cluster/partitions" "github.com/buraksezer/olric/internal/protocol" @@ -37,8 +39,8 @@ func (s *Service) destroyOperation(w, r protocol.EncodeDecoder) { } func (dm *DMap) destroyFragmentOnPartition(part *partitions.Partition) error { - f, err := dm.loadFragmentFromPartition(part) - if err == errFragmentNotFound { + f, err := dm.loadFragment(part) + if errors.Is(err, errFragmentNotFound) { // not exists return nil } diff --git a/internal/dmap/dmap.go b/internal/dmap/dmap.go index f6e64813..c6434672 100644 --- a/internal/dmap/dmap.go +++ b/internal/dmap/dmap.go @@ -15,7 +15,6 @@ package dmap import ( - "context" "errors" "fmt" "time" @@ -38,7 +37,7 @@ var ( ErrDMapNotFound = errors.New("dmap not found") ) -// DMap implements a single hop distributed hash table. +// DMap implements a single-hop distributed hash table. type DMap struct { name string s *Service @@ -110,31 +109,6 @@ func (s *Service) getOrCreateDMap(name string) (*DMap, error) { return dm, err } -func (dm *DMap) loadFragmentFromPartition(part *partitions.Partition) (*fragment, error) { - f, ok := part.Map().Load(dm.name) - if !ok { - return nil, errFragmentNotFound - } - return f.(*fragment), nil -} - -func (dm *DMap) createFragmentOnPartition(part *partitions.Partition) (*fragment, error) { - ctx, cancel := context.WithCancel(context.Background()) - f := &fragment{ - service: dm.s, - accessLog: newAccessLog(), - ctx: ctx, - cancel: cancel, - } - var err error - f.storage, err = dm.engine.Fork(nil) - if err != nil { - return nil, err - } - part.Map().Store(dm.name, f) - return f, nil -} - func (dm *DMap) getPartitionByHKey(hkey uint64, kind partitions.Kind) *partitions.Partition { var part *partitions.Partition switch { @@ -148,27 +122,6 @@ func (dm *DMap) getPartitionByHKey(hkey uint64, kind partitions.Kind) *partition return part } -func (dm *DMap) getFragment(hkey uint64, kind partitions.Kind) (*fragment, error) { - part := dm.getPartitionByHKey(hkey, kind) - part.Lock() - defer part.Unlock() - return dm.loadFragmentFromPartition(part) -} - -func (dm *DMap) getOrCreateFragment(hkey uint64, kind partitions.Kind) (*fragment, error) { - part := dm.getPartitionByHKey(hkey, kind) - part.Lock() - defer part.Unlock() - - // try to get - f, err := dm.loadFragmentFromPartition(part) - if errors.Is(err, errFragmentNotFound) { - // create the fragment and return - return dm.createFragmentOnPartition(part) - } - return f, err -} - func timeoutToTTL(timeout time.Duration) int64 { if timeout.Seconds() == 0 { return 0 diff --git a/internal/dmap/eviction.go b/internal/dmap/eviction.go index 2382d94e..ab6be895 100644 --- a/internal/dmap/eviction.go +++ b/internal/dmap/eviction.go @@ -48,7 +48,8 @@ func (dm *DMap) isKeyIdleOnFragment(hkey uint64, f *fragment) bool { } func (dm *DMap) isKeyIdle(hkey uint64) bool { - f, err := dm.getFragment(hkey, partitions.PRIMARY) + part := dm.getPartitionByHKey(hkey, partitions.PRIMARY) + f, err := dm.loadFragment(part) if errors.Is(err, errFragmentNotFound) { // it's no possible to know whether the key is idle or not. return false diff --git a/internal/dmap/expire.go b/internal/dmap/expire.go index d03acdfc..84557cf7 100644 --- a/internal/dmap/expire.go +++ b/internal/dmap/expire.go @@ -15,6 +15,7 @@ package dmap import ( + "errors" "fmt" "time" @@ -26,16 +27,19 @@ import ( ) func (dm *DMap) localExpireOnReplica(e *env) error { - f, err := dm.getFragment(e.hkey, partitions.BACKUP) - if err == errFragmentNotFound { + part := dm.getPartitionByHKey(e.hkey, partitions.BACKUP) + f, err := dm.loadFragment(part) + if errors.Is(err, errFragmentNotFound) { return ErrKeyNotFound } if err != nil { return err } + e.fragment = f f.Lock() defer f.Unlock() + return dm.localExpire(e) } @@ -107,8 +111,9 @@ func (dm *DMap) syncExpireOnCluster(e *env) error { } func (dm *DMap) callExpireOnCluster(e *env) error { - f, err := dm.getFragment(e.hkey, partitions.PRIMARY) - if err == errFragmentNotFound { + part := dm.getPartitionByHKey(e.hkey, partitions.PRIMARY) + f, err := dm.loadFragment(part) + if errors.Is(err, errFragmentNotFound) { return ErrKeyNotFound } if err != nil { diff --git a/internal/dmap/fragment.go b/internal/dmap/fragment.go index 6b67813f..57ec89e8 100644 --- a/internal/dmap/fragment.go +++ b/internal/dmap/fragment.go @@ -101,4 +101,49 @@ func (f *fragment) Move(partID uint64, kind partitions.Kind, name string, owner return err } +func (dm *DMap) newFragment() (*fragment, error) { + str, err := dm.engine.Fork(nil) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + return &fragment{ + service: dm.s, + accessLog: newAccessLog(), + storage: str, + ctx: ctx, + cancel: cancel, + }, nil +} + +func (dm *DMap) loadOrCreateFragment(part *partitions.Partition) (*fragment, error) { + part.Lock() + defer part.Unlock() + + // Creating a new fragment is our critical section here. + // It should be protected by a lock. + + fg, ok := part.Map().Load(dm.name) + if ok { + return fg.(*fragment), nil + } + + f, err := dm.newFragment() + if err != nil { + return nil, err + } + + part.Map().Store(dm.name, f) + return f, nil +} + +func (dm *DMap) loadFragment(part *partitions.Partition) (*fragment, error) { + f, ok := part.Map().Load(dm.name) + if !ok { + return nil, errFragmentNotFound + } + return f.(*fragment), nil +} + var _ partitions.Fragment = (*fragment)(nil) diff --git a/internal/dmap/fragment_test.go b/internal/dmap/fragment_test.go index 3b7f0eb0..20e1b103 100644 --- a/internal/dmap/fragment_test.go +++ b/internal/dmap/fragment_test.go @@ -15,13 +15,16 @@ package dmap import ( + "errors" + "sync" "testing" "github.com/buraksezer/olric/internal/cluster/partitions" "github.com/buraksezer/olric/internal/testcluster" + "github.com/buraksezer/olric/internal/testutil" ) -func TestDMapDMap_Fragment(t *testing.T) { +func TestDMap_Fragment(t *testing.T) { cluster := testcluster.New(NewService) s := cluster.AddMember(nil).(*Service) dm, err := s.NewDMap("mydmap") @@ -29,38 +32,92 @@ func TestDMapDMap_Fragment(t *testing.T) { t.Fatalf("Expected nil. Got: %v", err) } - t.Run("loadFragmentFromPartition", func(t *testing.T) { + t.Run("loadFragment", func(t *testing.T) { part := s.primary.PartitionByID(1) - _, err = dm.loadFragmentFromPartition(part) - if err != errFragmentNotFound { + _, err = dm.loadFragment(part) + if !errors.Is(err, errFragmentNotFound) { t.Fatalf("Expected %v. Got: %v", errFragmentNotFound, err) } }) - t.Run("createFragmentOnPartition", func(t *testing.T) { - part := s.primary.PartitionByID(1) - _, err = dm.createFragmentOnPartition(part) + t.Run("newFragment", func(t *testing.T) { + _, err := dm.newFragment() if err != nil { t.Fatalf("Expected nil. Got: %v", err) } }) - t.Run("getFragment -- errFragmentNotFound", func(t *testing.T) { - _, err = dm.getFragment(123, partitions.PRIMARY) - if err != errFragmentNotFound { + t.Run("loadFragment -- errFragmentNotFound", func(t *testing.T) { + part := dm.getPartitionByHKey(123, partitions.PRIMARY) + _, err := dm.loadFragment(part) + if !errors.Is(err, errFragmentNotFound) { t.Fatalf("Expected %v. Got: %v", errFragmentNotFound, err) } }) - t.Run("getOrCreateFragment", func(t *testing.T) { - _, err = dm.getOrCreateFragment(123, partitions.PRIMARY) + t.Run("loadOrCreateFragment", func(t *testing.T) { + part := dm.getPartitionByHKey(123, partitions.PRIMARY) + _, err = dm.loadOrCreateFragment(part) if err != nil { t.Fatalf("Expected nil. Got: %v", err) } - _, err = dm.getFragment(123, partitions.PRIMARY) + _, err := dm.loadFragment(part) if err != nil { t.Fatalf("Expected nil. Got: %v", err) } }) } + +func TestDMap_Fragment_Concurrent_Access(t *testing.T) { + cluster := testcluster.New(NewService) + s := cluster.AddMember(nil).(*Service) + dm, err := s.NewDMap("mydmap") + if err != nil { + t.Fatalf("Expected nil. Got: %v", err) + } + + part := dm.getPartitionByHKey(123, partitions.PRIMARY) + + var mtx sync.RWMutex + var wg sync.WaitGroup + for i := 0; i < 1000; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + + f, err := dm.loadOrCreateFragment(part) + if err != nil { + t.Errorf("Expected nil. Got: %v", err) + } + + e := f.storage.NewEntry() + e.SetKey(testutil.ToKey(idx)) + + mtx.Lock() + // storage engine is not thread-safe + err = f.storage.Put(uint64(idx), e) + mtx.Unlock() + + if err != nil { + t.Errorf("Expected nil. Got: %v", err) + } + }(i) + } + + wg.Wait() + + f, err := dm.loadFragment(part) + if err != nil { + t.Errorf("Expected nil. Got: %v", err) + } + for i := 0; i < 1000; i++ { + entry, err := f.storage.Get(uint64(i)) + if err != nil { + t.Fatalf("Expected nil. Got: %v", err) + } + if entry.Key() != testutil.ToKey(i) { + t.Fatalf("Expected key: %s. Got: %s", testutil.ToKey(i), entry.Key()) + } + } +} diff --git a/internal/dmap/get.go b/internal/dmap/get.go index 25958d25..379fbe9c 100644 --- a/internal/dmap/get.go +++ b/internal/dmap/get.go @@ -67,7 +67,8 @@ func (dm *DMap) unmarshalValue(raw []byte) (interface{}, error) { } func (dm *DMap) getOnFragment(e *env) (storage.Entry, error) { - f, err := dm.getFragment(e.hkey, e.kind) + part := dm.getPartitionByHKey(e.hkey, e.kind) + f, err := dm.loadFragment(part) if err != nil { return nil, err } @@ -112,7 +113,8 @@ func (dm *DMap) valueToVersion(value storage.Entry) *version { func (dm *DMap) lookupOnThisNode(hkey uint64, key string) *version { // Check on localhost, the partition owner. - f, err := dm.getFragment(hkey, partitions.PRIMARY) + part := dm.getPartitionByHKey(hkey, partitions.PRIMARY) + f, err := dm.loadFragment(part) if err != nil { if !errors.Is(err, errFragmentNotFound) { dm.s.log.V(3).Printf("[ERROR] Failed to get DMap fragment: %v", err) @@ -230,7 +232,8 @@ func (dm *DMap) readRepair(winner *version, versions []*version) { tmp := *version.host if tmp.CompareByID(dm.s.rt.This()) { hkey := partitions.HKey(dm.name, winner.entry.Key()) - f, err := dm.getOrCreateFragment(hkey, partitions.PRIMARY) + part := dm.getPartitionByHKey(hkey, partitions.PRIMARY) + f, err := dm.loadOrCreateFragment(part) if err != nil { dm.s.log.V(3).Printf("[ERROR] Failed to get or create the fragment for: %s on %s: %v", winner.entry.Key(), dm.name, err) diff --git a/internal/dmap/janitor.go b/internal/dmap/janitor.go index 2c218993..fb667065 100644 --- a/internal/dmap/janitor.go +++ b/internal/dmap/janitor.go @@ -49,12 +49,12 @@ func (s *Service) deleteEmptyFragments() { err := wipeOutFragment(part, name.(string), f) if err != nil { s.log.V(3).Printf("[ERROR] Failed to delete empty DMap fragment (kind: %s): %s on PartID: %d", - part.Kind(), name, part.Id()) + part.Kind(), name, part.ID()) // continue scanning return true } s.log.V(4).Printf("[INFO] Empty DMap fragment (kind: %s) has been deleted: %s on PartID: %d", - part.Kind(), name, part.Id()) + part.Kind(), name, part.ID()) return true }) } diff --git a/internal/dmap/put.go b/internal/dmap/put.go index dc3a4e91..9e70b283 100644 --- a/internal/dmap/put.go +++ b/internal/dmap/put.go @@ -80,13 +80,16 @@ func (dm *DMap) putOnFragment(e *env) error { } func (dm *DMap) putOnReplicaFragment(e *env) error { - f, err := dm.getOrCreateFragment(e.hkey, partitions.BACKUP) + part := dm.getPartitionByHKey(e.hkey, partitions.BACKUP) + f, err := dm.loadOrCreateFragment(part) if err != nil { return err } + e.fragment = f f.Lock() defer f.Unlock() + return dm.putOnFragment(e) } @@ -214,10 +217,12 @@ func (dm *DMap) checkPutConditions(e *env) error { } func (dm *DMap) putOnCluster(e *env) error { - f, err := dm.getOrCreateFragment(e.hkey, partitions.PRIMARY) + part := dm.getPartitionByHKey(e.hkey, partitions.PRIMARY) + f, err := dm.loadOrCreateFragment(part) if err != nil { return err } + e.fragment = f f.Lock() defer f.Unlock() diff --git a/internal/dmap/query_pipeline.go b/internal/dmap/query_pipeline.go index e77e5ef2..40c457f2 100644 --- a/internal/dmap/query_pipeline.go +++ b/internal/dmap/query_pipeline.go @@ -15,6 +15,7 @@ package dmap import ( + "errors" "fmt" "github.com/buraksezer/olric/pkg/storage" @@ -37,8 +38,8 @@ func newQueryPipeline(dm *DMap, partID uint64) *queryPipeline { func (p *queryPipeline) doOnKey(q query.M) error { part := p.dm.s.primary.PartitionByID(p.partID) - f, err := p.dm.loadFragmentFromPartition(part) - if err == errFragmentNotFound { + f, err := p.dm.loadFragment(part) + if errors.Is(err, errFragmentNotFound) { // there is nothing to do return nil }