Skip to content

Commit

Permalink
fix: Race condition in DMap creation
Browse files Browse the repository at this point in the history
refactor: More consistent logging
  • Loading branch information
buraksezer committed Oct 8, 2020
1 parent 003e447 commit d6af61f
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 54 deletions.
23 changes: 4 additions & 19 deletions dmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,42 +51,31 @@ func (db *Olric) NewDMap(name string) (*DMap, error) {
}, nil
}

// createDMap creates and returns a new dmap, internal representation of a dmap.
// createDMap creates and returns a new dmap, internal representation of a dmap. This function is not thread-safe.
//
// NOTE(review): this span comes from a rendered diff hunk without +/- markers. The two
// header lines above look like the old and new versions of the same comment, and the
// lock/reload statements at the top of the body conflict with the "not thread-safe"
// note, so they are presumably the removed side of the change. Confirm against the
// repository before reading this as one function body.
//
// As written, the body registers a dmap under name in part.m and returns it. str is an
// optional storage instance to adopt; a nil str results in a fresh storage being
// created with db.config.TableSize. A non-nil error is returned only when
// db.setCacheConfiguration fails, in which case nothing is stored in part.m.
func (db *Olric) createDMap(part *partition, name string, str *storage.Storage) (*dmap, error) {
// We need to protect storage.New
part.Lock()
defer part.Unlock()

// Try to load one more time. Another goroutine may have created the dmap.
dm, ok := part.m.Load(name)
if ok {
return dm.(*dmap), nil
}

// create a new map here.
nm := &dmap{
storage: str,
}

// Cache settings are applied only when a cache configuration is present.
if db.config.Cache != nil {
err := db.setCacheConfiguration(nm, name)
if err != nil {
return nil, err
}
}

// rebalancer code may send a storage instance for the new dmap. Just use it.
// nm.storage was initialised from str above, so the first branch keeps the
// caller-supplied storage unchanged; only a nil str reaches storage.New.
if nm.storage != nil {
nm.storage = str
} else {
nm.storage = storage.New(db.config.TableSize)
}

// Register the new dmap on the partition under its name.
part.m.Store(name, nm)
return nm, nil
}

func (db *Olric) getOrCreateDMap(part *partition, name string) (*dmap, error) {
part.Lock()
defer part.Unlock()
dm, ok := part.m.Load(name)
if ok {
return dm.(*dmap), nil
Expand All @@ -102,9 +91,5 @@ func (db *Olric) getDMap(name string, hkey uint64) (*dmap, error) {

// getBackupDMap looks up the backup partition for hkey with db.getBackupPartition and
// returns the dmap registered under name on that partition, creating it when it is
// not present yet.
//
// NOTE(review): this span comes from a rendered diff hunk without +/- markers. The inline
// Load/createDMap sequence and the getOrCreateDMap call look like the old and new
// sides of the same change; as written, the final return is unreachable. Confirm
// against the repository which of the two paths is current.
func (db *Olric) getBackupDMap(name string, hkey uint64) (*dmap, error) {
part := db.getBackupPartition(hkey)
dm, ok := part.m.Load(name)
if ok {
return dm.(*dmap), nil
}
// No storage instance is supplied here, so createDMap receives a nil str.
return db.createDMap(part, name, nil)
return db.getOrCreateDMap(part, name)
}
4 changes: 2 additions & 2 deletions dmap_atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (db *Olric) atomicIncrDecr(opr string, w *writeop, delta int) (int, error)
defer func() {
err := db.locker.Unlock(atomicKey)
if err != nil {
db.log.V(3).Printf("[ERROR] Failed to release the fine grained lock for key: %s on dmap: %s: %v", w.key, w.dmap, err)
db.log.V(3).Printf("[ERROR] Failed to release the fine grained lock for key: %s on DMap: %s: %v", w.key, w.dmap, err)
}
}()

Expand Down Expand Up @@ -105,7 +105,7 @@ func (db *Olric) getPut(w *writeop) ([]byte, error) {
defer func() {
err := db.locker.Unlock(atomicKey)
if err != nil {
db.log.V(3).Printf("[ERROR] Failed to release the lock for key: %s on dmap: %s: %v", w.key, w.dmap, err)
db.log.V(3).Printf("[ERROR] Failed to release the lock for key: %s on DMap: %s: %v", w.key, w.dmap, err)
}
}()

Expand Down
2 changes: 1 addition & 1 deletion dmap_destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (db *Olric) destroyDMap(name string) error {
db.log.V(6).Printf("[DEBUG] Calling Destroy command on %s for %s", addr, name)
_, err := db.requestTo(addr, req)
if err != nil {
db.log.V(3).Printf("[ERROR] Failed to destroy dmap: %s on %s", name, addr)
db.log.V(3).Printf("[ERROR] Failed to destroy DMap: %s on %s", name, addr)
}
return err
})
Expand Down
4 changes: 2 additions & 2 deletions dmap_eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (db *Olric) scanDMapForEviction(partID uint64, name string, dm *dmap) {
err := db.delKeyVal(dm, hkey, name, vdata.Key)
if err != nil {
// It will be tried again.
db.log.V(3).Printf("[ERROR] Failed to delete expired hkey: %d on dmap: %s: %v",
db.log.V(3).Printf("[ERROR] Failed to delete expired hkey: %d on DMap: %s: %v",
hkey, name, err)
return true // this means 'continue'
}
Expand Down Expand Up @@ -212,7 +212,7 @@ func (db *Olric) evictKeyWithLRU(dm *dmap, name string) error {
return err
}
if db.log.V(6).Ok() {
db.log.V(6).Printf("[DEBUG] Evicted item on dmap: %s, key: %s with LRU", name, key)
db.log.V(6).Printf("[DEBUG] Evicted item on DMap: %s, key: %s with LRU", name, key)
}
return db.delKeyVal(dm, item.HKey, name, key)
}
4 changes: 2 additions & 2 deletions dmap_expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (db *Olric) syncExpireOnCluster(hkey uint64, dm *dmap, w *writeop) error {
_, err := db.requestTo(owner.String(), req)
if err != nil {
if db.log.V(3).Ok() {
db.log.V(3).Printf("[ERROR] Failed to call expire command on %s for dmap: %s: %v",
db.log.V(3).Printf("[ERROR] Failed to call expire command on %s for DMap: %s: %v",
owner, w.dmap, err)
}
continue
Expand All @@ -80,7 +80,7 @@ func (db *Olric) syncExpireOnCluster(hkey uint64, dm *dmap, w *writeop) error {
err := db.localExpire(hkey, dm, w)
if err != nil {
if db.log.V(3).Ok() {
db.log.V(3).Printf("[ERROR] Failed to call expire command on %s for dmap: %s: %v",
db.log.V(3).Printf("[ERROR] Failed to call expire command on %s for DMap: %s: %v",
db.this, w.dmap, err)
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion dmap_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (db *Olric) lookupOnOwners(dm *dmap, hkey uint64, name, key string) []*vers
// the requested key can be found on a replica or a previous partition owner.
if db.log.V(5).Ok() {
db.log.V(5).Printf(
"[DEBUG] key: %s, HKey: %d on dmap: %s could not be found on the local storage: %v",
"[DEBUG] key: %s, HKey: %d on DMap: %s could not be found on the local storage: %v",
key, hkey, name, err)
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion dmap_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (db *Olric) unlockKey(name, key string, token []byte) error {
defer func() {
err := db.locker.Unlock(lkey)
if err != nil {
db.log.V(3).Printf("[ERROR] Failed to release the fine grained lock for key: %s on dmap: %s: %v", key, name, err)
db.log.V(3).Printf("[ERROR] Failed to release the fine grained lock for key: %s on DMap: %s: %v", key, name, err)
}
}()

Expand Down
4 changes: 2 additions & 2 deletions dmap_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (db *Olric) syncPutOnCluster(hkey uint64, dm *dmap, w *writeop) error {
_, err := db.requestTo(owner.String(), req)
if err != nil {
if db.log.V(3).Ok() {
db.log.V(3).Printf("[ERROR] Failed to call put command on %s for dmap: %s: %v", owner, w.dmap, err)
db.log.V(3).Printf("[ERROR] Failed to call put command on %s for DMap: %s: %v", owner, w.dmap, err)
}
continue
}
Expand All @@ -188,7 +188,7 @@ func (db *Olric) syncPutOnCluster(hkey uint64, dm *dmap, w *writeop) error {
err := db.localPut(hkey, dm, w)
if err != nil {
if db.log.V(3).Ok() {
db.log.V(3).Printf("[ERROR] Failed to call put command on %s for dmap: %s: %v", db.this, w.dmap, err)
db.log.V(3).Printf("[ERROR] Failed to call put command on %s for DMap: %s: %v", db.this, w.dmap, err)
}
} else {
successful++
Expand Down
40 changes: 19 additions & 21 deletions rebalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,20 @@ func (db *Olric) selectVersionForMerge(dm *dmap, hkey uint64, vdata *storage.VDa
}

func (db *Olric) mergeDMaps(part *partition, data *dmapbox) error {
str, err := storage.Import(data.Payload)
dm, err := db.getOrCreateDMap(part, data.Name)
if err != nil {
return err
}

tmp, exist := part.m.Load(data.Name)
if !exist {
// create a new dmap if it doesn't exist.
tmp, err = db.createDMap(part, data.Name, str)
if err != nil {
return err
}
}

// Acquire dmap's lock. No one should work on it.
dm := tmp.(*dmap)
dm.Lock()
defer dm.Unlock()
defer part.m.Store(data.Name, dm)

str, err := storage.Import(data.Payload)
if err != nil {
return err
}

// Merge accessLog.
if dm.cache != nil && dm.cache.accessLog != nil {
Expand All @@ -118,11 +114,14 @@ func (db *Olric) mergeDMaps(part *partition, data *dmapbox) error {
dm.cache.Unlock()
}

// We do not need the following loop if the dmap is created here.
if !exist {
if dm.storage.Len() == 0 {
// DMap has no keys. Set the imported storage instance.
// The old one will be garbage collected.
dm.storage = str
return nil
}

// DMap has some keys. Merge with the new one.
var mergeErr error
str.Range(func(hkey uint64, vdata *storage.VData) bool {
winner, err := db.selectVersionForMerge(dm, hkey, vdata)
Expand Down Expand Up @@ -171,11 +170,11 @@ func (db *Olric) rebalancePrimaryPartitions() {
}
// This is a previous owner. Move the keys.
part.m.Range(func(name, dm interface{}) bool {
db.log.V(2).Printf("[INFO] Moving dmap: %s (backup: %v) on PartID: %d to %s",
db.log.V(2).Printf("[INFO] Moving DMap: %s (backup: %v) on PartID: %d to %s",
name, part.backup, partID, owner)
err := db.moveDMap(part, name.(string), dm.(*dmap), owner)
if err != nil {
db.log.V(2).Printf("[ERROR] Failed to move dmap: %s on PartID: %d to %s: %v",
db.log.V(2).Printf("[ERROR] Failed to move DMap: %s on PartID: %d to %s: %v",
name, partID, owner, err)
}
// if this returns true, the iteration continues
Expand Down Expand Up @@ -232,11 +231,11 @@ func (db *Olric) rebalanceBackupPartitions() {
}

part.m.Range(func(name, dm interface{}) bool {
db.log.V(2).Printf("[INFO] Moving dmap: %s (backup: %v) on PartID: %d to %s",
db.log.V(2).Printf("[INFO] Moving DMap: %s (backup: %v) on PartID: %d to %s",
name, part.backup, partID, owner)
err := db.moveDMap(part, name.(string), dm.(*dmap), owner)
if err != nil {
db.log.V(2).Printf("[ERROR] Failed to move backup dmap: %s on PartID: %d to %s: %v",
db.log.V(2).Printf("[ERROR] Failed to move backup DMap: %s on PartID: %d to %s: %v",
name, partID, owner, err)
}
// if this returns true, the iteration continues
Expand Down Expand Up @@ -294,20 +293,19 @@ func (db *Olric) moveDMapOperation(w, r protocol.EncodeDecoder) {
}
// Check ownership before merging. This is useful to prevent data corruption in network partitioning case.
if !db.checkOwnership(part) {
db.log.V(2).Printf("[ERROR] Received dmap: %s on PartID: %d (backup: %v) doesn't belong to me",
db.log.V(2).Printf("[ERROR] Received DMap: %s on PartID: %d (backup: %v) doesn't belong to me",
box.Name, box.PartID, box.Backup)

err := fmt.Errorf("partID: %d (backup: %v) doesn't belong to %s: %w", box.PartID, box.Backup, db.this, ErrInvalidArgument)
db.errorResponse(w, err)
return
}

db.log.V(2).Printf("[INFO] Received dmap (backup:%v): %s on PartID: %d",
db.log.V(2).Printf("[INFO] Received DMap (backup:%v): %s on PartID: %d",
box.Backup, box.Name, box.PartID)

err = db.mergeDMaps(part, box)
if err != nil {
db.log.V(2).Printf("[ERROR] Failed to merge dmap: %v", err)
db.log.V(2).Printf("[ERROR] Failed to merge DMap: %v", err)
db.errorResponse(w, err)
return
}
Expand Down
7 changes: 4 additions & 3 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ func (db *Olric) distributeBackups(partID uint64) []discovery.Member {
}
if count == 0 {
// Delete it.
db.log.V(5).Printf("[DEBUG] Empty backup partition found. PartID: %d on %s", partID, backup)
owners = append(owners[:i], owners[i+1:]...)
i--
}
Expand Down Expand Up @@ -206,7 +205,6 @@ func (db *Olric) distributePrimaryCopies(partID uint64) []discovery.Member {
continue
}
if count == 0 {
db.log.V(6).Printf("[DEBUG] PartID: %d on %s is empty", partID, owner)
// Empty partition. Delete it from ownership list.
owners = append(owners[:i], owners[i+1:]...)
i--
Expand Down Expand Up @@ -283,7 +281,10 @@ func (db *Olric) updateRoutingTableOnCluster(table routingTable) (map[discovery.
return nil
})
}
return ownershipReports, g.Wait()
if err := g.Wait(); err != nil {
return nil, err
}
return ownershipReports, nil
}

func (db *Olric) updateRouting() {
Expand Down

0 comments on commit d6af61f

Please sign in to comment.