From 7497af05958f5d996c1a30af5ce4ed6b2ffc4037 Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Sun, 13 Jun 2021 20:34:00 +0300 Subject: [PATCH] fix: wrong offset alignment in backup balancer --- go.mod | 2 +- go.sum | 2 ++ internal/cluster/balancer/balancer.go | 37 ++++++++++--------- internal/cluster/partitions/partition.go | 2 +- internal/dmap/balance.go | 8 +++-- internal/dmap/dmap.go | 2 +- internal/dmap/get.go | 3 ++ internal/kvstore/compaction.go | 7 ++-- internal/transport/client.go | 46 ++++++++++++++---------- internal/transport/server.go | 22 ++++++------ internal/transport/stream.go | 6 ++-- internal/transport/timeout.go | 8 ++--- 12 files changed, 85 insertions(+), 60 deletions(-) diff --git a/go.mod b/go.mod index 570720e5..53ca01c6 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/buraksezer/olric go 1.13 require ( + github.com/buraksezer/connpool v0.4.0 github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 - github.com/buraksezer/pool v3.0.0+incompatible github.com/cespare/xxhash v1.1.0 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e github.com/hashicorp/go-multierror v1.0.0 diff --git a/go.sum b/go.sum index 7169b7ca..c15bc377 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/buraksezer/connpool v0.4.0 h1:fNLvWu0FOtJxL7Sqetm6+040mhZWVs8c6vrke14SEKw= +github.com/buraksezer/connpool v0.4.0/go.mod h1:qPiG7gKXo+EjrwG/yqn2StZM4ek6gcYnnGgFIVKN6b0= github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 h1:fUmDBbSvv1uOzo/t8WaxZMVb7BxJ8JECo5lGoR9c5bA= github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72/go.mod h1:OEE5igu/CDjGegM1Jn6ZMo7R6LlV/JChAkjfQQIRLpg= github.com/buraksezer/pool v3.0.0+incompatible h1:MXcI3YkBnElnbJ8ZPIYqa0dia3qqHDNto1E004YxseA= diff --git a/internal/cluster/balancer/balancer.go b/internal/cluster/balancer/balancer.go index 6eaaa998..083f0af1 100644 --- a/internal/cluster/balancer/balancer.go +++ b/internal/cluster/balancer/balancer.go @@ -77,7 +77,7 @@ func (b *Balancer) scanPartition(sign uint64, part *partitions.Partition, owner b.log.V(2).Printf("[ERROR] Failed to move %s: %s on PartID: %d to %s: %v", u.Name(), name, part.Id(), owner, err) } if err == nil { - // Delete the moved storage unit instance. The GC will free the allocated memory. + // Delete the moved storage unit instance. GC will free the allocated memory. part.Map().Delete(name) } // if this returns true, the iteration continues @@ -92,8 +92,8 @@ func (b *Balancer) primaryCopies() { break } if sign != b.rt.Signature() { - // Routing table is updated. Just quit. Another balancer goroutine will work on the - // new table immediately. + // Routing table is updated. Just quit. Another balancer goroutine + // will work on the new table immediately. 
 			break
 		}
 
@@ -104,8 +104,9 @@ func (b *Balancer) primaryCopies() {
 		}
 
 		owner := part.Owner()
-		// Here we don't use CompareByID function because the routing table has an eventually consistent
-		// data structure and a node can try to move data to previous instance(the same name but a different birthdate)
+		// Here we don't use the CompareByID function because the routing table is an
+		// eventually consistent data structure and a node can try to move data
+		// to a previous instance (the same name but a different birthdate)
 		// of itself. So just check the name.
 		if owner.CompareByName(b.rt.This()) {
 			// Already belongs to me.
@@ -124,8 +125,8 @@ func (b *Balancer) backupCopies() {
 		}
 
 		if sign != b.rt.Signature() {
-			// Routing table is updated. Just quit. Another balancer goroutine will work on the
-			// new table immediately.
+			// Routing table is updated. Just quit. Another balancer goroutine
+			// will work on the new table immediately.
 			break
 		}
 
@@ -146,33 +147,37 @@ func (b *Balancer) backupCopies() {
 			continue
 		}
 
-		var ownerIds []uint64
+		var ownerIDs []uint64
 		offset := len(owners) - 1 - (b.config.ReplicaCount - 1)
+		if offset <= 0 {
+			offset = -1
+		}
 		for i := len(owners) - 1; i > offset; i-- {
 			owner := owners[i]
-			// Here we don't use cmpMembersById function because the routing table has an eventually consistent
-			// data structure and a node can try to move data to previous instance(the same name but a different birthdate)
+			// Here we don't use the CompareByID function because the routing table
+			// is an eventually consistent data structure and a node can try to
+			// move data to a previous instance (the same name but a different birthdate)
 			// of itself. So just check the name.
 			if b.rt.This().CompareByName(owner) {
 				// Already belongs to me.
 				continue
 			}
-			ownerIds = append(ownerIds, owner.ID)
+			ownerIDs = append(ownerIDs, owner.ID)
 		}
-		for _, ownerId := range ownerIds {
+		for _, ownerID := range ownerIDs {
 			if !b.isAlive() {
 				break
 			}
 
 			if sign != b.rt.Signature() {
-				// Routing table is updated. Just quit. Another balancer goroutine will work on the
-				// new table immediately.
+				// Routing table is updated. Just quit. Another balancer goroutine
+				// will work on the new table immediately.
 				break
 			}
-			owner, err := b.rt.Discovery().FindMemberByID(ownerId)
+			owner, err := b.rt.Discovery().FindMemberByID(ownerID)
 			if err != nil {
-				b.log.V(2).Printf("[ERROR] Failed to get host by ownerId: %d: %v", ownerId, err)
+				b.log.V(2).Printf("[ERROR] Failed to get host by ownerID: %d: %v", ownerID, err)
 				continue
 			}
 			b.scanPartition(sign, part, owner)
diff --git a/internal/cluster/partitions/partition.go b/internal/cluster/partitions/partition.go
index 0c873821..7f81ab4b 100644
--- a/internal/cluster/partitions/partition.go
+++ b/internal/cluster/partitions/partition.go
@@ -65,7 +65,7 @@ func (p *Partition) OwnerCount() int {
 	return len(owners.([]discovery.Member))
 }
 
-// Owners loads the partition owners from atomic.value and returns.
+// Owners loads the partition owners from atomic.Value and returns them.
func (p *Partition) Owners() []discovery.Member { owners := p.owners.Load() if owners == nil { diff --git a/internal/dmap/balance.go b/internal/dmap/balance.go index a93c1b81..137f1cea 100644 --- a/internal/dmap/balance.go +++ b/internal/dmap/balance.go @@ -15,6 +15,7 @@ package dmap import ( + "errors" "fmt" "github.com/buraksezer/olric/pkg/neterrors" @@ -34,7 +35,7 @@ type fragmentPack struct { func (dm *DMap) selectVersionForMerge(f *fragment, hkey uint64, entry storage.Entry) (storage.Entry, error) { current, err := f.storage.Get(hkey) - if err == storage.ErrKeyNotFound { + if errors.Is(err, storage.ErrKeyNotFound) { return entry, nil } if err != nil { @@ -47,7 +48,7 @@ func (dm *DMap) selectVersionForMerge(f *fragment, hkey uint64, entry storage.En func (dm *DMap) mergeFragments(part *partitions.Partition, fp *fragmentPack) error { f, err := dm.loadFragmentFromPartition(part) - if err == errFragmentNotFound { + if errors.Is(err, errFragmentNotFound) { f, err = dm.createFragmentOnPartition(part) } if err != nil { @@ -57,6 +58,7 @@ func (dm *DMap) mergeFragments(part *partitions.Partition, fp *fragmentPack) err // Acquire fragment's lock. No one should work on it. f.Lock() defer f.Unlock() + // TODO: This may be useless. Check it. defer part.Map().Store(fp.Name, f) @@ -89,7 +91,7 @@ func (dm *DMap) mergeFragments(part *partitions.Partition, fp *fragmentPack) err } // TODO: Don't put the winner again if it comes from dm.storage mergeErr = f.storage.Put(hkey, winner) - if mergeErr == storage.ErrFragmented { + if errors.Is(mergeErr, storage.ErrFragmented) { dm.s.wg.Add(1) go dm.s.callCompactionOnStorage(f) mergeErr = nil diff --git a/internal/dmap/dmap.go b/internal/dmap/dmap.go index af7c7272..f6e64813 100644 --- a/internal/dmap/dmap.go +++ b/internal/dmap/dmap.go @@ -18,13 +18,13 @@ import ( "context" "errors" "fmt" - "github.com/buraksezer/olric/pkg/storage" "time" "github.com/buraksezer/olric/internal/bufpool" "github.com/buraksezer/olric/internal/cluster/partitions" "github.com/buraksezer/olric/internal/protocol" "github.com/buraksezer/olric/pkg/neterrors" + "github.com/buraksezer/olric/pkg/storage" ) // pool is good for recycling memory while reading messages from the socket. diff --git a/internal/dmap/get.go b/internal/dmap/get.go index 5fde6ff4..bfad0b08 100644 --- a/internal/dmap/get.go +++ b/internal/dmap/get.go @@ -235,6 +235,8 @@ func (dm *DMap) readRepair(winner *version, versions []*version) { winner.entry.Key(), dm.name, err) return } + + f.Lock() e := &env{ dmap: dm.name, key: winner.entry.Key(), @@ -248,6 +250,7 @@ func (dm *DMap) readRepair(winner *version, versions []*version) { if err != nil { dm.s.log.V(3).Printf("[ERROR] Failed to synchronize with replica: %v", err) } + f.Unlock() } else { // If readRepair is enabled, this function is called by every GET request. var req *protocol.DMapMessage diff --git a/internal/kvstore/compaction.go b/internal/kvstore/compaction.go index e4ed1a78..746bd8c3 100644 --- a/internal/kvstore/compaction.go +++ b/internal/kvstore/compaction.go @@ -14,7 +14,10 @@ package kvstore -import "fmt" +import ( + "errors" + "fmt" +) func (kv *KVStore) Compaction() (bool, error) { if len(kv.tables) == 1 { @@ -28,7 +31,7 @@ func (kv *KVStore) Compaction() (bool, error) { for hkey := range old.hkeys { entry, _ := old.getRaw(hkey) err := fresh.putRaw(hkey, entry) - if err == errNotEnoughSpace { + if errors.Is(err, errNotEnoughSpace) { // Create a new table and put the new k/v pair in it. 
 			nt := newTable(kv.Stats().Inuse * 2)
 			kv.tables = append(kv.tables, nt)
diff --git a/internal/transport/client.go b/internal/transport/client.go
index d2711d32..c050f174 100644
--- a/internal/transport/client.go
+++ b/internal/transport/client.go
@@ -15,14 +15,16 @@
 package transport
 
 import (
+	"context"
 	"fmt"
 	"net"
 	"os"
 	"sync"
+	"time"
 
+	"github.com/buraksezer/connpool"
 	"github.com/buraksezer/olric/config"
 	"github.com/buraksezer/olric/internal/protocol"
-	"github.com/buraksezer/pool"
 )
 
 // Client is the client implementation for the internal TCP server.
@@ -32,7 +34,7 @@ type Client struct {
 	dialer *net.Dialer
 	config *config.Client
 
-	pools map[string]pool.Pool
+	pools map[string]connpool.Pool
 }
 
 // NewClient returns a new Client.
@@ -49,7 +51,7 @@ func NewClient(cc *config.Client) *Client {
 	c := &Client{
 		dialer: dialer,
 		config: cc,
-		pools:  make(map[string]pool.Pool),
+		pools:  make(map[string]connpool.Pool),
 	}
 	return c
 }
@@ -62,7 +64,7 @@ func (c *Client) Close() {
 		p.Close()
 	}
 	// Reset pool
-	c.pools = make(map[string]pool.Pool)
+	c.pools = make(map[string]connpool.Pool)
 }
 
 // ClosePool closes the underlying connections in a pool,
@@ -81,34 +83,38 @@ func (c *Client) ClosePool(addr string) {
 }
 
 // pool creates a new pool for a given addr or returns an existing one.
-func (c *Client) pool(addr string) (pool.Pool, error) {
-	factory := func() (net.Conn, error) {
-		return c.dialer.Dial("tcp", addr)
-	}
-
+func (c *Client) pool(addr string) (connpool.Pool, error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	cpool, ok := c.pools[addr]
+	p, ok := c.pools[addr]
 	if ok {
-		return cpool, nil
+		return p, nil
+	}
+
+	factory := func() (net.Conn, error) {
+		return c.dialer.Dial("tcp", addr)
 	}
 
-	cpool, err := pool.NewChannelPool(c.config.MinConn, c.config.MaxConn, factory)
+	p, err := connpool.NewChannelPool(c.config.MinConn, c.config.MaxConn, factory)
 	if err != nil {
 		return nil, err
 	}
-	c.pools[addr] = cpool
-	return cpool, nil
+	c.pools[addr] = p
+	return p, nil
 }
 
 func (c *Client) conn(addr string) (net.Conn, error) {
-	cpool, err := c.pool(addr)
+	p, err := c.pool(addr)
 	if err != nil {
 		return nil, err
 	}
 
-	conn, err := cpool.Get()
+	// TODO: Make this configurable
+	ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second)
+	defer cancel()
+
+	conn, err := p.Get(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -128,6 +134,7 @@ func (c *Client) teardownConnWithTimeout(conn *ConnWithTimeout, dead bool) {
 			_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Failed to unset timeouts on TCP connection: %v", err)
 		}
 	}
+
 	if err := conn.Close(); err != nil {
 		_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Failed to close connection: %v", err)
 	}
@@ -138,8 +145,11 @@ func (c *Client) teardownConn(rawConn net.Conn, dead bool) {
 		c.teardownConnWithTimeout(rawConn.(*ConnWithTimeout), dead)
 		return
 	}
-	pc, _ := rawConn.(*pool.PoolConn)
-	pc.MarkUnusable()
+
+	pc, _ := rawConn.(*connpool.PoolConn)
+	if dead {
+		pc.MarkUnusable()
+	}
 	err := pc.Close()
 	if err != nil {
 		_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Failed to close connection: %v", err)
diff --git a/internal/transport/server.go b/internal/transport/server.go
index 63010be5..ccb5b90f 100644
--- a/internal/transport/server.go
+++ b/internal/transport/server.go
@@ -126,7 +126,7 @@ func (s *Server) SetDispatcher(f func(w, r protocol.EncodeDecoder)) {
 	s.dispatcher = f
 }
 
-func (s *Server) controlConnLifeCycle(conn io.ReadWriteCloser, connStatus *uint32, done chan struct{}) {
+func (s *Server) controlLifeCycle(conn io.Closer, connStatus *uint32, done chan struct{}) {
 	CurrentConnections.Increase(1)
 	defer CurrentConnections.Decrease(1)
 
@@ -184,8 +184,8 @@ func (s *Server) closeStream(req *protocol.StreamMessage, done chan struct{}) {
 	}
 }
 
-// processMessage waits for a new request, handles it and returns the appropriate response.
-func (s *Server) processMessage(conn io.ReadWriteCloser, connStatus *uint32, done chan struct{}) error {
+// handleMessage waits for a new request, handles it and returns the appropriate response.
+func (s *Server) handleMessage(conn io.ReadWriteCloser, status *uint32, done chan struct{}) error {
 	CommandsTotal.Increase(1)
 
 	buf := bufferPool.Get()
@@ -215,10 +215,10 @@ func (s *Server) processMessage(conn io.ReadWriteCloser, connStatus *uint32, don
 	}
 
 	// Mark connection as busy.
-	atomic.StoreUint32(connStatus, busyConn)
+	atomic.StoreUint32(status, busyConn)
 
 	// Mark connection as idle before starting to wait for a new request
-	defer atomic.StoreUint32(connStatus, idleConn)
+	defer atomic.StoreUint32(status, idleConn)
 
 	resp := req.Response(nil)
 
@@ -235,8 +235,8 @@ func (s *Server) processMessage(conn io.ReadWriteCloser, connStatus *uint32, don
 	return err
 }
 
-// processConn waits for requests and calls request handlers to generate a response. The connections are reusable.
-func (s *Server) processConn(conn io.ReadWriteCloser) {
+// handleConn waits for requests and calls request handlers to generate a response. The connections are reusable.
+func (s *Server) handleConn(conn io.ReadWriteCloser) {
 	ConnectionsTotal.Increase(1)
 	defer s.wg.Done()
 
@@ -246,12 +246,12 @@ func (s *Server) processConn(conn io.ReadWriteCloser) {
 	defer close(done)
 
 	s.wg.Add(1)
-	go s.controlConnLifeCycle(conn, &connStatus, done)
+	go s.controlLifeCycle(conn, &connStatus, done)
 
 	for {
-		// processMessage waits to read a message from the TCP socket.
+		// handleMessage waits to read a message from the TCP socket.
 		// Then calls its handler to generate a response.
-		err := s.processMessage(conn, &connStatus, done)
+		err := s.handleMessage(conn, &connStatus, done)
 		if err != nil {
 			// The socket probably would have been closed by the client.
 			if errors.Is(errors.Cause(err), io.EOF) || errors.Is(errors.Cause(err), protocol.ErrConnClosed) {
@@ -291,7 +291,7 @@ func (s *Server) listenAndServe() error {
 			}
 		}
 		s.wg.Add(1)
-		go s.processConn(conn)
+		go s.handleConn(conn)
 	}
 }
diff --git a/internal/transport/stream.go b/internal/transport/stream.go
index ac9d795d..5da79778 100644
--- a/internal/transport/stream.go
+++ b/internal/transport/stream.go
@@ -20,8 +20,8 @@ import (
 	"io"
 	"os"
 
+	"github.com/buraksezer/connpool"
 	"github.com/buraksezer/olric/internal/protocol"
-	"github.com/buraksezer/pool"
 )
 
 func readFromStream(conn io.ReadWriteCloser, bufCh chan<- protocol.EncodeDecoder, errCh chan<- error) {
@@ -62,14 +62,14 @@ func (c *Client) CreateStream(ctx context.Context, addr string, read chan<- prot
 		return err
 	}
 
-	conn, err := p.Get()
+	conn, err := p.Get(ctx)
 	if err != nil {
 		return err
 	}
 	defer func() {
 		// marks the connection not usable any more, to let the pool close it instead of returning it to the pool.
-		pc, _ := conn.(*pool.PoolConn)
+		pc, _ := conn.(*connpool.PoolConn)
 		pc.MarkUnusable()
 		if err = pc.Close(); err != nil {
 			_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Failed to close connection: %v", err)
diff --git a/internal/transport/timeout.go b/internal/transport/timeout.go
index 4868a0a6..f96d5fa6 100644
--- a/internal/transport/timeout.go
+++ b/internal/transport/timeout.go
@@ -18,7 +18,7 @@ import (
 	"net"
 	"time"
 
-	"github.com/buraksezer/pool"
+	"github.com/buraksezer/connpool"
 )
 
 // ConnWithTimeout denotes a composite type which can be used to implement an i/o timeout feature for TCP sockets.
@@ -61,16 +61,16 @@ func (c *ConnWithTimeout) UnsetDeadline() error {
 }
 
 // MarkUnusable() marks the connection not usable any more, to let the pool close it instead of returning it to the pool.
-// Wrapper around pool.PoolConn.MarkUnusable
+// Wrapper around connpool.PoolConn.MarkUnusable
 func (c *ConnWithTimeout) MarkUnusable() {
-	if conn, ok := c.Conn.(*pool.PoolConn); ok {
+	if conn, ok := c.Conn.(*connpool.PoolConn); ok {
 		conn.MarkUnusable()
 	}
 }
 
 // Close closes the connection.
 func (c *ConnWithTimeout) Close() error {
-	conn, ok := c.Conn.(*pool.PoolConn)
+	conn, ok := c.Conn.(*connpool.PoolConn)
 	if ok {
 		return conn.Close()
 	}
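
Reviewer notes (not part of the patch):

The core of this patch is the offset clamp in backupCopies. The loop
"for i := len(owners) - 1; i > offset; i--" is meant to visit at most the last
ReplicaCount-1 entries of the owners slice. Before the fix, a partition with
fewer owners than that drove offset below -1, so i could reach -1 and owners[i]
panicked with an index out of range; the clamp also widens the scan to include
owners[0] when offset lands exactly on 0. Below is a standalone, runnable
sketch of the bounds arithmetic; backupRange is a hypothetical helper written
for illustration, not a function in the codebase.

package main

import "fmt"

// backupRange mirrors the patched arithmetic: it returns the highest and
// lowest slice indexes the backup scan will visit.
func backupRange(ownerCount, replicaCount int) (from, downTo int) {
	offset := ownerCount - 1 - (replicaCount - 1)
	if offset <= 0 {
		// The clamp from the patch. Without it, ownerCount=1 and
		// replicaCount=3 give offset=-2, and the loop condition
		// "i > offset" lets i reach -1.
		offset = -1
	}
	return ownerCount - 1, offset + 1
}

func main() {
	for _, c := range []struct{ owners, replicas int }{
		{4, 2}, // enough owners: only the last replicas-1 = 1 entry is scanned
		{3, 3}, // exactly replicas owners: all of them are scanned
		{1, 3}, // too few owners: the clamp prevents owners[-1]
	} {
		from, downTo := backupRange(c.owners, c.replicas)
		fmt.Printf("owners=%d replicas=%d -> scan indexes %d down to %d\n",
			c.owners, c.replicas, from, downTo)
	}
}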
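The pool swap (buraksezer/pool to buraksezer/connpool) changes the checkout
API: Get now takes a context.Context, which is how conn() bounds the wait with
the hard-coded 5-second timeout, and teardownConn now calls MarkUnusable only
when the connection is known to be dead, so healthy connections are returned to
the pool on Close instead of being discarded. A minimal, self-contained sketch
of that pattern, assuming only the connpool calls visible in this diff; the
address and pool sizes are illustrative.

package main

import (
	"context"
	"log"
	"net"
	"time"

	"github.com/buraksezer/connpool"
)

func main() {
	// Dial function used to grow the pool on demand.
	factory := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:3320")
	}
	p, err := connpool.NewChannelPool(1, 10, factory)
	if err != nil {
		log.Fatal(err)
	}

	// Bound how long a caller may wait for a pooled or freshly dialed connection.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	conn, err := p.Get(ctx)
	if err != nil {
		log.Fatal(err)
	}

	dead := false // would be set to true after an I/O error in real code
	if pc, ok := conn.(*connpool.PoolConn); ok && dead {
		// Poisoned connections are discarded by the pool on Close;
		// healthy ones are returned for reuse.
		pc.MarkUnusable()
	}
	if err := conn.Close(); err != nil {
		log.Printf("failed to close connection: %v", err)
	}
}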
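Several comparisons against sentinel errors (storage.ErrKeyNotFound,
errFragmentNotFound, errNotEnoughSpace, storage.ErrFragmented) move from == to
errors.Is. The difference only shows up once a sentinel gets wrapped with %w
somewhere on the call path; a small self-contained sketch, with putRaw standing
in for any storage call that adds context to the sentinel:

package main

import (
	"errors"
	"fmt"
)

var errNotEnoughSpace = errors.New("not enough space")

// putRaw wraps the sentinel with context, as callers often do.
func putRaw() error {
	return fmt.Errorf("put failed: %w", errNotEnoughSpace)
}

func main() {
	err := putRaw()
	fmt.Println(err == errNotEnoughSpace)          // false: wrapping breaks ==
	fmt.Println(errors.Is(err, errNotEnoughSpace)) // true: Is unwraps the chain
}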