fix: malfunctioning read repair and balancer
buraksezer committed Jun 20, 2021
1 parent 549d87c commit ad678ec
Showing 4 changed files with 19 additions and 15 deletions.
12 changes: 8 additions & 4 deletions internal/cluster/balancer/balancer.go
@@ -63,7 +63,7 @@ func (b *Balancer) isAlive() bool {
return true
}

- func (b *Balancer) scanPartition(sign uint64, part *partitions.Partition, owners... discovery.Member) bool {
+ func (b *Balancer) scanPartition(sign uint64, part *partitions.Partition, owners ...discovery.Member) bool {
var clean = true
part.Map().Range(func(name, tmp interface{}) bool {
u := tmp.(partitions.Fragment)
@@ -145,18 +145,18 @@ LOOP:
}

part := b.backup.PartitionByID(partID)
- if part.Length() == 0 || part.OwnerCount() == 0{
+ if part.Length() == 0 || part.OwnerCount() == 0 {
continue
}

var (
- counter = 1
+ counter       = 1
currentOwners []discovery.Member
)

owners := part.Owners()
for i := len(owners) - 1; i >= 0; i-- {
- if counter >= b.config.ReplicaCount-1 {
+ if counter > b.config.ReplicaCount-1 {
break
}

@@ -173,6 +173,10 @@ LOOP:
currentOwners = append(currentOwners, owner)
}

+ if len(currentOwners) == 0 {
+ continue LOOP
+ }

if b.scanPartition(sign, part, currentOwners...) {
part.Map().Range(func(name, tmp interface{}) bool {
// Delete the moved storage unit instance. GC will free the allocated memory.
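
The balancer.go hunks above carry two behavioral fixes on top of the gofmt cleanups: the break condition changes from counter >= b.config.ReplicaCount-1 to counter > b.config.ReplicaCount-1, and a new guard continues to the next partition when no backup owners were collected, so scanPartition is never run (and fragments are never deleted) against an empty owner list. The standalone sketch below is not Olric code: the function, node names, and the simplified loop body are made up, and the ownership checks that sit in the collapsed part of the hunk are omitted. It only illustrates why the old comparison selected zero backup owners with ReplicaCount set to 2, the value used in get_test.go further down.

package main

import "fmt"

// pickBackupOwners mimics the shape of the loop in the diff above: walk the
// owners list from the end and collect at most replicaCount-1 backup owners.
// counter starts at 1, exactly as in the diff.
func pickBackupOwners(owners []string, replicaCount int, useOldRule bool) []string {
	var picked []string
	counter := 1
	for i := len(owners) - 1; i >= 0; i-- {
		if useOldRule {
			// Old rule: with replicaCount == 2 this is 1 >= 1 on the very
			// first iteration, so the loop breaks before picking anything.
			if counter >= replicaCount-1 {
				break
			}
		} else {
			// Fixed rule: allows exactly replicaCount-1 picks.
			if counter > replicaCount-1 {
				break
			}
		}
		counter++
		picked = append(picked, owners[i])
	}
	return picked
}

func main() {
	owners := []string{"node-a", "node-b", "node-c"}
	fmt.Println("old rule picks:", pickBackupOwners(owners, 2, true))  // old rule picks: []
	fmt.Println("new rule picks:", pickBackupOwners(owners, 2, false)) // new rule picks: [node-c]
}

With ReplicaCount = 2 the loop is supposed to hand each backup fragment to exactly one owner; under the old rule it collected none, and the new len(currentOwners) == 0 guard makes sure that an empty result now skips the partition instead of falling through to the deletion pass.
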
1 change: 0 additions & 1 deletion internal/cluster/routingtable/distribute.go
@@ -16,7 +16,6 @@ package routingtable

import (
"errors"

"github.com/buraksezer/consistent"
"github.com/buraksezer/olric/internal/discovery"
"github.com/buraksezer/olric/internal/protocol"
19 changes: 10 additions & 9 deletions internal/dmap/get.go
@@ -195,14 +195,15 @@ func (dm *DMap) sanitizeAndSortVersions(versions []*version) []*version {
}

func (dm *DMap) lookupOnReplicas(hkey uint64, key string) []*version {
- var versions []*version
// Check backup.
backups := dm.s.backup.PartitionOwnersByHKey(hkey)
+ versions := make([]*version, 0, len(backups))
for _, replica := range backups {
req := protocol.NewDMapMessage(protocol.OpGetBackup)
req.SetDMap(dm.name)
req.SetKey(key)
- ver := &version{host: &replica}
+ host := replica
+ v := &version{host: &host}
resp, err := dm.s.requestTo(replica.String(), req)
if err != nil {
if dm.s.log.V(3).Ok() {
@@ -212,21 +213,21 @@ func (dm *DMap) lookupOnReplicas(hkey uint64, key string) []*version {
} else {
data := dm.engine.NewEntry()
data.Decode(resp.Value())
- ver.entry = data
+ v.entry = data
}
- versions = append(versions, ver)
+ versions = append(versions, v)
}
return versions
}

func (dm *DMap) readRepair(winner *version, versions []*version) {
- for _, ver := range versions {
- if ver.entry != nil && winner.entry.Timestamp() == ver.entry.Timestamp() {
+ for _, version := range versions {
+ if version.entry != nil && winner.entry.Timestamp() == version.entry.Timestamp() {
continue
}

// Sync
- tmp := *ver.host
+ tmp := *version.host
if tmp.CompareByID(dm.s.rt.This()) {
hkey := partitions.HKey(dm.name, winner.entry.Key())
f, err := dm.getOrCreateFragment(hkey, partitions.PRIMARY)
@@ -270,9 +271,9 @@ func (dm *DMap) readRepair(winner *version, versions []*version) {
TTL: winner.entry.TTL(),
})
}
- _, err := dm.s.requestTo(ver.host.String(), req)
+ _, err := dm.s.requestTo(version.host.String(), req)
if err != nil {
- dm.s.log.V(3).Printf("[ERROR] Failed to synchronize replica %s: %v", ver.host, err)
+ dm.s.log.V(3).Printf("[ERROR] Failed to synchronize replica %s: %v", version.host, err)
}
}
}
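
The substantive change in lookupOnReplicas above is the replacement of ver := &version{host: &replica} with a copy whose address is taken (host := replica; v := &version{host: &host}). replica is the range variable, and under the loop semantics in effect here (before Go 1.22) that variable is reused on every iteration, so every version produced by the loop pointed at the same, final replica; read repair could then no longer tell the replicas apart by host, which lines up with the malfunction named in the commit title. The preallocation versions := make([]*version, 0, len(backups)) is a small tidy-up on top of that. The standalone sketch below is not Olric code: the member type and node names are made up, and it only reproduces the aliasing pitfall and the copy-based fix.

package main

import "fmt"

type member struct{ name string }

func main() {
	replicas := []member{{"node-a"}, {"node-b"}, {"node-c"}}

	// Buggy pattern: &replica is the address of the single loop variable
	// (pre-Go 1.22 semantics), so every element aliases the same memory.
	var buggy []*member
	for _, replica := range replicas {
		buggy = append(buggy, &replica)
	}

	// Fixed pattern, as in the diff: copy first, then take the copy's address.
	var fixed []*member
	for _, replica := range replicas {
		host := replica
		fixed = append(fixed, &host)
	}

	fmt.Println(buggy[0].name, buggy[1].name, buggy[2].name) // node-c node-c node-c (before Go 1.22)
	fmt.Println(fixed[0].name, fixed[1].name, fixed[2].name) // node-a node-b node-c
}
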
2 changes: 1 addition & 1 deletion internal/dmap/get_test.go
@@ -201,6 +201,7 @@ func TestDMap_Get_ReadRepair(t *testing.T) {
c2.ReplicaCount = 2
e2 := testcluster.NewEnvironment(c2)
s2 := cluster.AddMember(e2).(*Service)

defer cluster.Shutdown()

// Call DMap.Put on S1
@@ -230,7 +231,6 @@ func TestDMap_Get_ReadRepair(t *testing.T) {
c3.ReplicaCount = 2
e3 := testcluster.NewEnvironment(c3)
s3 := cluster.AddMember(e3).(*Service)
- defer cluster.Shutdown()

// Call DMap.Get on S2
dm2, err := s3.NewDMap("mymap")
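
For reference, the repair loop in readRepair (shown in the get.go hunks above) skips a replica only when it already holds an entry with the winner's timestamp; a replica with a missing (nil) entry or a different timestamp receives the winner, either by writing into the local fragment when the replica is this node or by sending a put request to the remote host. The standalone sketch below is not Olric's API: the entry and replica types are made up, and the winner selection condenses what sanitizeAndSortVersions does in the real code. It only shows which replicas get repaired.

package main

import "fmt"

type entry struct {
	value     string
	timestamp int64
}

type replica struct {
	host  string
	entry *entry // nil means this replica did not return a copy
}

func main() {
	replicas := []replica{
		{host: "node-a", entry: &entry{value: "v1", timestamp: 100}},
		{host: "node-b", entry: nil},
		{host: "node-c", entry: &entry{value: "v2", timestamp: 200}},
	}

	// Winner: the copy with the highest timestamp.
	var winner *entry
	for _, r := range replicas {
		if r.entry != nil && (winner == nil || r.entry.timestamp > winner.timestamp) {
			winner = r.entry
		}
	}

	// Repair pass: the same skip rule as in readRepair above.
	for _, r := range replicas {
		if r.entry != nil && r.entry.timestamp == winner.timestamp {
			continue // already holds the winning copy
		}
		fmt.Printf("repair %s with %q (ts=%d)\n", r.host, winner.value, winner.timestamp)
	}
	// Output:
	// repair node-a with "v2" (ts=200)
	// repair node-b with "v2" (ts=200)
}

Combined with the pointer fix in lookupOnReplicas, each repair request now reaches the replica that actually returned the stale or missing copy.
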
