Commit 7497af0
fix: wrong offset alignment in backup balancer
buraksezer committed Jun 13, 2021
1 parent ef62a72 commit 7497af0
Showing 12 changed files with 85 additions and 60 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -3,8 +3,8 @@ module github.com/buraksezer/olric
go 1.13

require (
+ github.com/buraksezer/connpool v0.4.0
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
- github.com/buraksezer/pool v3.0.0+incompatible
github.com/cespare/xxhash v1.1.0
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
github.com/hashicorp/go-multierror v1.0.0
2 changes: 2 additions & 0 deletions go.sum
@@ -3,6 +3,8 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZ
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
+ github.com/buraksezer/connpool v0.4.0 h1:fNLvWu0FOtJxL7Sqetm6+040mhZWVs8c6vrke14SEKw=
+ github.com/buraksezer/connpool v0.4.0/go.mod h1:qPiG7gKXo+EjrwG/yqn2StZM4ek6gcYnnGgFIVKN6b0=
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 h1:fUmDBbSvv1uOzo/t8WaxZMVb7BxJ8JECo5lGoR9c5bA=
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72/go.mod h1:OEE5igu/CDjGegM1Jn6ZMo7R6LlV/JChAkjfQQIRLpg=
github.com/buraksezer/pool v3.0.0+incompatible h1:MXcI3YkBnElnbJ8ZPIYqa0dia3qqHDNto1E004YxseA=
37 changes: 21 additions & 16 deletions internal/cluster/balancer/balancer.go
@@ -77,7 +77,7 @@ func (b *Balancer) scanPartition(sign uint64, part *partitions.Partition, owner
b.log.V(2).Printf("[ERROR] Failed to move %s: %s on PartID: %d to %s: %v", u.Name(), name, part.Id(), owner, err)
}
if err == nil {
- // Delete the moved storage unit instance. The GC will free the allocated memory.
+ // Delete the moved storage unit instance. GC will free the allocated memory.
part.Map().Delete(name)
}
// if this returns true, the iteration continues
@@ -92,8 +92,8 @@ func (b *Balancer) primaryCopies() {
break
}
if sign != b.rt.Signature() {
- // Routing table is updated. Just quit. Another balancer goroutine will work on the
- // new table immediately.
+ // Routing table is updated. Just quit. Another balancer goroutine
+ // will work on the new table immediately.
break
}

@@ -104,8 +104,9 @@
}

owner := part.Owner()
- // Here we don't use CompareByID function because the routing table has an eventually consistent
- // data structure and a node can try to move data to previous instance(the same name but a different birthdate)
+ // Here we don't use CompareByID function because the routing table is an
+ // eventually consistent data structure and a node can try to move data
+ // to previous instance(the same name but a different birthdate)
// of itself. So just check the name.
if owner.CompareByName(b.rt.This()) {
// Already belongs to me.
@@ -124,8 +125,8 @@ func (b *Balancer) backupCopies() {
}

if sign != b.rt.Signature() {
- // Routing table is updated. Just quit. Another balancer goroutine will work on the
- // new table immediately.
+ // Routing table is updated. Just quit. Another balancer goroutine
+ // will work on the new table immediately.
break
}

@@ -146,33 +147,37 @@
continue
}

- var ownerIds []uint64
+ var ownerIDs []uint64
offset := len(owners) - 1 - (b.config.ReplicaCount - 1)
+ if offset <= 0 {
+ offset = -1
+ }
for i := len(owners) - 1; i > offset; i-- {
owner := owners[i]
- // Here we don't use cmpMembersById function because the routing table has an eventually consistent
- // data structure and a node can try to move data to previous instance(the same name but a different birthdate)
+ // Here we don't use CompareById function because the routing table
+ // is an eventually consistent data structure and a node can try to
+ // move data to previous instance(the same name but a different birthdate)
// of itself. So just check the name.
if b.rt.This().CompareByName(owner) {
// Already belongs to me.
continue
}
- ownerIds = append(ownerIds, owner.ID)
+ ownerIDs = append(ownerIDs, owner.ID)
}

- for _, ownerId := range ownerIds {
+ for _, ownerID := range ownerIDs {
if !b.isAlive() {
break
}
if sign != b.rt.Signature() {
- // Routing table is updated. Just quit. Another balancer goroutine will work on the
- // new table immediately.
+ // Routing table is updated. Just quit. Another balancer goroutine
+ // will work on the new table immediately.
break
}

- owner, err := b.rt.Discovery().FindMemberByID(ownerId)
+ owner, err := b.rt.Discovery().FindMemberByID(ownerID)
if err != nil {
b.log.V(2).Printf("[ERROR] Failed to get host by ownerId: %d: %v", ownerId, err)
b.log.V(2).Printf("[ERROR] Failed to get host by ownerId: %d: %v", ownerID, err)
continue
}
b.scanPartition(sign, part, owner)
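The clamp added above is the fix named in the commit title: offset marks the last index the backup scan must not cross, and when a partition has fewer owners than config.ReplicaCount the raw value len(owners) - 1 - (ReplicaCount - 1) drops to zero or below, which would either skip owner index 0 or let the loop walk past the front of the slice. A minimal standalone sketch of the same arithmetic (not code from this repository; backupRange is a hypothetical helper):

package main

import "fmt"

// backupRange mirrors the loop bounds in backupCopies: it returns the
// owners-slice indexes the balancer scans, i.e. the trailing
// ReplicaCount-1 backup owners, clamped so the scan stops at index 0.
func backupRange(numOwners, replicaCount int) []int {
	offset := numOwners - 1 - (replicaCount - 1)
	if offset <= 0 {
		offset = -1 // fewer owners than replicas: scan them all, stop at 0
	}
	var idx []int
	for i := numOwners - 1; i > offset; i-- {
		idx = append(idx, i)
	}
	return idx
}

func main() {
	fmt.Println(backupRange(3, 2)) // [2]: one backup owner
	fmt.Println(backupRange(5, 3)) // [4 3]: two backup owners
	fmt.Println(backupRange(2, 3)) // [1 0]: clamp keeps index 0 in range
}

Without the clamp, backupRange(1, 3) would compute offset = -3 and the loop condition i > -3 would admit i = -1, an out-of-range index; with it, the scan covers exactly the trailing owners and never underflows.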
2 changes: 1 addition & 1 deletion internal/cluster/partitions/partition.go
@@ -65,7 +65,7 @@ func (p *Partition) OwnerCount() int {
return len(owners.([]discovery.Member))
}

- // Owners loads the partition owners from atomic.value and returns.
+ // Owners loads the partition owners from atomic.Value and returns.
func (p *Partition) Owners() []discovery.Member {
owners := p.owners.Load()
if owners == nil {
8 changes: 5 additions & 3 deletions internal/dmap/balance.go
@@ -15,6 +15,7 @@
package dmap

import (
"errors"
"fmt"
"github.com/buraksezer/olric/pkg/neterrors"

@@ -34,7 +35,7 @@ type fragmentPack struct {

func (dm *DMap) selectVersionForMerge(f *fragment, hkey uint64, entry storage.Entry) (storage.Entry, error) {
current, err := f.storage.Get(hkey)
- if err == storage.ErrKeyNotFound {
+ if errors.Is(err, storage.ErrKeyNotFound) {
return entry, nil
}
if err != nil {
@@ -47,7 +48,7 @@

func (dm *DMap) mergeFragments(part *partitions.Partition, fp *fragmentPack) error {
f, err := dm.loadFragmentFromPartition(part)
- if err == errFragmentNotFound {
+ if errors.Is(err, errFragmentNotFound) {
f, err = dm.createFragmentOnPartition(part)
}
if err != nil {
@@ -57,6 +58,7 @@ func (dm *DMap) mergeFragments(part *partitions.Partition, fp *fragmentPack) err
// Acquire fragment's lock. No one should work on it.
f.Lock()
defer f.Unlock()

// TODO: This may be useless. Check it.
defer part.Map().Store(fp.Name, f)

@@ -89,7 +91,7 @@ func (dm *DMap) mergeFragments(part *partitions.Partition, fp *fragmentPack) err
}
// TODO: Don't put the winner again if it comes from dm.storage
mergeErr = f.storage.Put(hkey, winner)
- if mergeErr == storage.ErrFragmented {
+ if errors.Is(mergeErr, storage.ErrFragmented) {
dm.s.wg.Add(1)
go dm.s.callCompactionOnStorage(f)
mergeErr = nil
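The == to errors.Is conversions in this file (and the matching one in internal/kvstore/compaction.go below) are behavior-preserving today but become load-bearing as soon as a storage implementation wraps its sentinel errors with %w, since == only matches the outermost error in the chain. A minimal sketch with a hypothetical sentinel, not olric code:

package main

import (
	"errors"
	"fmt"
)

var errKeyNotFound = errors.New("key not found")

// get stands in for a storage layer that annotates errors with context.
func get() error {
	return fmt.Errorf("hkey 42: %w", errKeyNotFound)
}

func main() {
	err := get()
	fmt.Println(err == errKeyNotFound)          // false: wrapping breaks ==
	fmt.Println(errors.Is(err, errKeyNotFound)) // true: Is unwraps the chain
}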
2 changes: 1 addition & 1 deletion internal/dmap/dmap.go
@@ -18,13 +18,13 @@ import (
"context"
"errors"
"fmt"
"github.com/buraksezer/olric/pkg/storage"
"time"

"github.com/buraksezer/olric/internal/bufpool"
"github.com/buraksezer/olric/internal/cluster/partitions"
"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/olric/pkg/neterrors"
"github.com/buraksezer/olric/pkg/storage"
)

// pool is good for recycling memory while reading messages from the socket.
3 changes: 3 additions & 0 deletions internal/dmap/get.go
@@ -235,6 +235,8 @@ func (dm *DMap) readRepair(winner *version, versions []*version) {
winner.entry.Key(), dm.name, err)
return
}

+ f.Lock()
e := &env{
dmap: dm.name,
key: winner.entry.Key(),
@@ -248,6 +250,7 @@ func (dm *DMap) readRepair(winner *version, versions []*version) {
if err != nil {
dm.s.log.V(3).Printf("[ERROR] Failed to synchronize with replica: %v", err)
}
+ f.Unlock()
} else {
// If readRepair is enabled, this function is called by every GET request.
var req *protocol.DMapMessage
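The f.Lock() / f.Unlock() pair added above makes the read-repair write mutually exclusive with the merge path in internal/dmap/balance.go, which already takes the fragment lock before replaying entries. A minimal sketch of the discipline, with fragment reduced to a mutex plus a map (an assumed shape, not the real type):

package main

import (
	"fmt"
	"sync"
)

// fragment stands in for dmap's fragment: every writer, merge or repair,
// must hold the lock so concurrent writes cannot interleave.
type fragment struct {
	sync.Mutex
	data map[uint64]string
}

func (f *fragment) repair(hkey uint64, winner string) {
	f.Lock()
	defer f.Unlock()
	f.data[hkey] = winner
}

func main() {
	f := &fragment{data: make(map[uint64]string)}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(hkey uint64) {
			defer wg.Done()
			f.repair(hkey, "winner")
		}(uint64(i))
	}
	wg.Wait()
	fmt.Println(len(f.data)) // 4: all repairs applied without a data race
}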
7 changes: 5 additions & 2 deletions internal/kvstore/compaction.go
@@ -14,7 +14,10 @@

package kvstore

import "fmt"
import (
"errors"
"fmt"
)

func (kv *KVStore) Compaction() (bool, error) {
if len(kv.tables) == 1 {
@@ -28,7 +31,7 @@ func (kv *KVStore) Compaction() (bool, error) {
for hkey := range old.hkeys {
entry, _ := old.getRaw(hkey)
err := fresh.putRaw(hkey, entry)
- if err == errNotEnoughSpace {
+ if errors.Is(err, errNotEnoughSpace) {
// Create a new table and put the new k/v pair in it.
nt := newTable(kv.Stats().Inuse * 2)
kv.tables = append(kv.tables, nt)
46 changes: 28 additions & 18 deletions internal/transport/client.go
@@ -15,14 +15,16 @@
package transport

import (
"context"
"fmt"
"net"
"os"
"sync"
"time"

"github.com/buraksezer/connpool"
"github.com/buraksezer/olric/config"
"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/pool"
)

// Client is the client implementation for the internal TCP server.
@@ -32,7 +34,7 @@ type Client struct {

dialer *net.Dialer
config *config.Client
- pools map[string]pool.Pool
+ pools map[string]connpool.Pool
}

// NewClient returns a new Client.
Expand All @@ -49,7 +51,7 @@ func NewClient(cc *config.Client) *Client {
c := &Client{
dialer: dialer,
config: cc,
- pools: make(map[string]pool.Pool),
+ pools: make(map[string]connpool.Pool),
}
return c
}
@@ -62,7 +64,7 @@ func (c *Client) Close() {
p.Close()
}
// Reset pool
- c.pools = make(map[string]pool.Pool)
+ c.pools = make(map[string]connpool.Pool)
}

// ClosePool closes the underlying connections in a pool,
@@ -81,34 +83,38 @@ func (c *Client) ClosePool(addr string) {
}

// pool creates a new pool for a given addr or returns an exiting one.
- func (c *Client) pool(addr string) (pool.Pool, error) {
- factory := func() (net.Conn, error) {
- return c.dialer.Dial("tcp", addr)
- }

+ func (c *Client) pool(addr string) (connpool.Pool, error) {
c.mu.Lock()
defer c.mu.Unlock()

- cpool, ok := c.pools[addr]
+ p, ok := c.pools[addr]
if ok {
- return cpool, nil
+ return p, nil
}

+ factory := func() (net.Conn, error) {
+ return c.dialer.Dial("tcp", addr)
+ }

- cpool, err := pool.NewChannelPool(c.config.MinConn, c.config.MaxConn, factory)
+ p, err := connpool.NewChannelPool(c.config.MinConn, c.config.MaxConn, factory)
if err != nil {
return nil, err
}
- c.pools[addr] = cpool
- return cpool, nil
+ c.pools[addr] = p
+ return p, nil
}

func (c *Client) conn(addr string) (net.Conn, error) {
- cpool, err := c.pool(addr)
+ p, err := c.pool(addr)
if err != nil {
return nil, err
}

- conn, err := cpool.Get()
+ // TODO: Make this configurable
+ ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second)
+ defer cancel()

+ conn, err := p.Get(ctx)
if err != nil {
return nil, err
}
@@ -128,6 +134,7 @@ func (c *Client) teardownConnWithTimeout(conn *ConnWithTimeout, dead bool) {
_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Failed to unset timeouts on TCP connection: %v", err)
}
}

if err := conn.Close(); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Failed to close connection: %v", err)
}
@@ -138,8 +145,11 @@ func (c *Client) teardownConn(rawConn net.Conn, dead bool) {
c.teardownConnWithTimeout(rawConn.(*ConnWithTimeout), dead)
return
}
- pc, _ := rawConn.(*pool.PoolConn)
- pc.MarkUnusable()

+ pc, _ := rawConn.(*connpool.PoolConn)
+ if dead {
+ pc.MarkUnusable()
+ }
err := pc.Close()
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Failed to close connection: %v", err)
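The pool → connpool migration above (also the go.mod/go.sum change at the top) alters the checkout contract: Get now takes a context, so a caller waiting on a dialing or exhausted pool can time out instead of blocking indefinitely (the hard-coded 5-second timeout in conn is marked TODO for that reason), and MarkUnusable is now called only for dead connections, letting healthy ones return to the pool on Close. A minimal usage sketch built only from calls visible in this diff; the address and pool sizes are made-up values:

package main

import (
	"context"
	"fmt"
	"net"
	"time"

	"github.com/buraksezer/connpool"
)

func main() {
	factory := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:3320") // assumed address
	}
	// 0 and 10 stand in for c.config.MinConn/MaxConn; values are illustrative.
	p, err := connpool.NewChannelPool(0, 10, factory)
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	conn, err := p.Get(ctx) // waits at most 5s, unlike the old pool.Get()
	if err != nil {
		fmt.Println("checkout failed:", err)
		return
	}
	_ = conn.Close() // returns the conn to the pool; MarkUnusable would discard it
	p.Close()
}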