Skip to content

Commit

Permalink
feat: Bring back pipelining feature to Golang client (#174)
Browse files Browse the repository at this point in the history
* feat: add initial implementation of new pipeline API

* feat: initial working version of pipelining

* feat: add atomic commands

* feat: add Pipeline method to DMap interface

* feat: add Discard and Close to DMapPipeline

* feat: add TestDMapPipeline_ErrNotReady

* fix: use the same key to test GetPut

* chore: add documentation to Pipeline implementation

* chore: add missing docs to Client interface

* feat: add ErrConnRefused to deal with dead cluster members

* chore: add inline documentation
  • Loading branch information
buraksezer authored Jul 25, 2022
1 parent b661514 commit 37aa6d3
Show file tree
Hide file tree
Showing 10 changed files with 1,120 additions and 34 deletions.
22 changes: 20 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const DefaultScanCount = 10

// Member denotes a member of the Olric cluster.
type Member struct {
// Member name in the cluster
// Member name in the cluster. It's also host:port of the node.
Name string

// ID of the Member in the cluster. Hash of Name and Birthdate of the member
Expand Down Expand Up @@ -178,7 +178,8 @@ type DMap interface {
// after being decremented or an error.
Decr(ctx context.Context, key string, delta int) (int, error)

// GetPut atomically sets the key to value and returns the old value stored at key.
// GetPut atomically sets the key to value and returns the old value stored at key. It returns nil if there is no
// previous value.
GetPut(ctx context.Context, key string, value interface{}) (*GetResponse, error)

// IncrByFloat atomically increments the key by delta. The return value is the new value
Expand Down Expand Up @@ -223,6 +224,19 @@ type DMap interface {
// is no global lock on DMaps. So if you call Put/PutEx and Destroy methods
// concurrently on the cluster, Put call may set new values to the DMap.
Destroy(ctx context.Context) error

// Pipeline is a mechanism to realise Redis Pipeline technique.
//
// Pipelining is a technique to extremely speed up processing by packing
// operations to batches, send them at once to Redis and read a replies in a
// singe step.
// See https://redis.io/topics/pipelining
//
// Pay attention, that Pipeline is not a transaction, so you can get unexpected
// results in case of big pipelines and small read/write timeouts.
// Redis client has retransmission logic in case of timeouts, pipeline
// can be retransmitted and commands can be executed more than once.
Pipeline() (*DMapPipeline, error)
}

type statsConfig struct {
Expand Down Expand Up @@ -275,6 +289,10 @@ type Client interface {
// Members returns a thread-safe list of cluster members.
Members(ctx context.Context) ([]Member, error)

// RefreshMetadata fetches a list of available members and the latest routing
// table version. It also closes stale clients, if there are any.
RefreshMetadata(ctx context.Context) error

// Close stops background routines and frees allocated resources.
Close(ctx context.Context) error
}
103 changes: 82 additions & 21 deletions cluster_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@ package olric
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/buraksezer/olric/hasher"
"log"
"net"
"os"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/buraksezer/olric/config"
"github.com/buraksezer/olric/hasher"
"github.com/buraksezer/olric/internal/bufpool"
"github.com/buraksezer/olric/internal/cluster/partitions"
"github.com/buraksezer/olric/internal/discovery"
Expand Down Expand Up @@ -69,6 +72,10 @@ func processProtocolError(err error) error {
if err == redis.Nil {
return ErrKeyNotFound
}
if errors.Is(err, syscall.ECONNREFUSED) {
opErr := err.(*net.OpError)
return fmt.Errorf("%s %s %s: %w", opErr.Op, opErr.Net, opErr.Addr, ErrConnRefused)
}
return convertDMapError(protocol.ConvertError(err))
}

Expand All @@ -95,10 +102,7 @@ func (dm *ClusterDMap) writePutCommand(c *dmap.PutConfig, key string, value []by
return cmd
}

func (cl *ClusterClient) smartPick(dmap, key string) (*redis.Client, error) {
hkey := partitions.HKey(dmap, key)
partID := hkey % cl.partitionCount

func (cl *ClusterClient) clientByPartID(partID uint64) (*redis.Client, error) {
raw := cl.routingTable.Load()
if raw == nil {
return nil, fmt.Errorf("routing table is empty")
Expand All @@ -118,6 +122,12 @@ func (cl *ClusterClient) smartPick(dmap, key string) (*redis.Client, error) {
return cl.client.Get(primaryOwner), nil
}

func (cl *ClusterClient) smartPick(dmap, key string) (*redis.Client, error) {
hkey := partitions.HKey(dmap, key)
partID := hkey % cl.partitionCount
return cl.clientByPartID(partID)
}

// Put sets the value for the given key. It overwrites any previous value for
// that key, and it's thread-safe. The key has to be a string. value type is arbitrary.
// It is safe to modify the contents of the arguments after Put returns but not before.
Expand Down Expand Up @@ -150,31 +160,33 @@ func (dm *ClusterDMap) Put(ctx context.Context, key string, value interface{}, o
return processProtocolError(cmd.Err())
}

func (dm *ClusterDMap) makeGetResponse(cmd *redis.StringCmd) (*GetResponse, error) {
raw, err := cmd.Bytes()
if err != nil {
return nil, processProtocolError(err)
}

e := dm.newEntry()
e.Decode(raw)
return &GetResponse{
entry: e,
}, nil
}

// Get gets the value for the given key. It returns ErrKeyNotFound if the DB
// does not contain the key. It's thread-safe. It is safe to modify the contents
// of the returned value. See GetResponse for the details.
func (dm *ClusterDMap) Get(ctx context.Context, key string) (*GetResponse, error) {
cmd := protocol.NewGet(dm.name, key).SetRaw().Command(ctx)
rc, err := dm.clusterClient.smartPick(dm.name, key)
if err != nil {
return nil, err
}

cmd := protocol.NewGet(dm.name, key).SetRaw().Command(ctx)
err = rc.Process(ctx, cmd)
if err != nil {
return nil, processProtocolError(err)
}

raw, err := cmd.Bytes()
if err != nil {
return nil, processProtocolError(err)
}

e := dm.newEntry()
e.Decode(raw)
return &GetResponse{
entry: e,
}, nil
return dm.makeGetResponse(cmd)
}

// Delete deletes values for the given keys. Delete will not return error
Expand Down Expand Up @@ -241,7 +253,8 @@ func (dm *ClusterDMap) Decr(ctx context.Context, key string, delta int) (int, er
return int(res), nil
}

// GetPut atomically sets the key to value and returns the old value stored at key.
// GetPut atomically sets the key to value and returns the old value stored at key. It returns nil if there is no
// previous value.
func (dm *ClusterDMap) GetPut(ctx context.Context, key string, value interface{}) (*GetResponse, error) {
rc, err := dm.clusterClient.smartPick(dm.name, key)
if err != nil {
Expand Down Expand Up @@ -604,6 +617,43 @@ func (cl *ClusterClient) Members(ctx context.Context) ([]Member, error) {
return members, nil
}

// RefreshMetadata fetches a list of available members and the latest routing
// table version. It also closes stale clients, if there are any.
func (cl *ClusterClient) RefreshMetadata(ctx context.Context) error {
// Fetch a list of currently available cluster members.
var members []Member
var err error
for {
members, err = cl.Members(ctx)
if errors.Is(err, ErrConnRefused) {
err = nil
continue
}
if err != nil {
return err
}
break
}
// Use a map for fast access.
addresses := make(map[string]struct{})
for _, member := range members {
addresses[member.Name] = struct{}{}
}

// Clean stale client connections
for addr := range cl.client.Addresses() {
if _, ok := addresses[addr]; !ok {
// Gone
if err := cl.client.Close(addr); err != nil {
return err
}
}
}

// Re-fetch the routing table, we should use the latest routing table version.
return cl.fetchRoutingTable()
}

// Close stops background routines and frees allocated resources.
func (cl *ClusterClient) Close(ctx context.Context) error {
select {
Expand Down Expand Up @@ -705,7 +755,7 @@ func (cl *ClusterClient) fetchRoutingTablePeriodically() {
case <-ticker.C:
err := cl.fetchRoutingTable()
if err != nil {
cl.logger.Printf("[ERROR] Failed to fetch the latest routing table: %s", err)
cl.logger.Printf("[ERROR] Failed to fetch the latest version of the routing table: %s", err)
}
}
}
Expand Down Expand Up @@ -754,13 +804,24 @@ func NewClusterClient(addresses []string, options ...ClusterClientOption) (*Clus
cl.client.Get(address)
}

// Discover all cluster members
members, err := cl.Members(ctx)
if err != nil {
return nil, fmt.Errorf("error while discovering the cluster members: %w", err)
}
for _, member := range members {
cl.client.Get(member.Name)
}

// Hash function is required to target primary owners instead of random cluster members.
partitions.SetHashFunc(cc.hasher)

// Initial fetch.
// Initial fetch. ClusterClient targets the primary owners for a smooth and quick operation.
if err := cl.fetchRoutingTable(); err != nil {
return nil, err
}

// Refresh the routing table in every 15 seconds.
cl.wg.Add(1)
go cl.fetchRoutingTablePeriodically()

Expand Down
2 changes: 1 addition & 1 deletion cluster_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ package olric

import (
"context"
"github.com/buraksezer/olric/hasher"
"log"
"os"
"testing"
"time"

"github.com/buraksezer/olric/config"
"github.com/buraksezer/olric/hasher"
"github.com/buraksezer/olric/internal/testutil"
"github.com/buraksezer/olric/stats"
"github.com/stretchr/testify/require"
Expand Down
2 changes: 1 addition & 1 deletion cluster_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (i *ClusterIterator) fetchRoutingTablePeriodically() {
return
case <-time.After(time.Second):
if err := i.fetchRoutingTable(); err != nil {
i.logger.Printf("[ERROR] Failed to fetch the latest routing table: %s", err)
i.logger.Printf("[ERROR] Failed to fetch the latest version of the routing table: %s", err)
}
}
}
Expand Down
34 changes: 33 additions & 1 deletion embedded_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,37 @@ type EmbeddedDMap struct {
storageEngine string
}

// Pipeline is a mechanism to realise Redis Pipeline technique.
//
// Pipelining is a technique to extremely speed up processing by packing
// operations to batches, send them at once to Redis and read a replies in a
// singe step.
// See https://redis.io/topics/pipelining
//
// Pay attention, that Pipeline is not a transaction, so you can get unexpected
// results in case of big pipelines and small read/write timeouts.
// Redis client has retransmission logic in case of timeouts, pipeline
// can be retransmitted and commands can be executed more than once.
func (dm *EmbeddedDMap) Pipeline() (*DMapPipeline, error) {
cc, err := NewClusterClient([]string{dm.client.db.rt.This().String()})
if err != nil {
return nil, err
}
cdm, err := cc.NewDMap(dm.name)
if err != nil {
return nil, err
}
return cdm.Pipeline()
}

// RefreshMetadata fetches a list of available members and the latest routing
// table version. It also closes stale clients, if there are any. EmbeddedClient has
// this method to implement the Client interface. It doesn't need to refresh metadata manually.
func (e *EmbeddedClient) RefreshMetadata(_ context.Context) error {
// EmbeddedClient already has the latest metadata.
return nil
}

// Scan returns an iterator to loop over the keys.
//
// Available scan options:
Expand Down Expand Up @@ -152,7 +183,8 @@ func (dm *EmbeddedDMap) Name() string {
return dm.name
}

// GetPut atomically sets the key to value and returns the old value stored at key.
// GetPut atomically sets the key to value and returns the old value stored at key. It returns nil if there is no
// previous value.
func (dm *EmbeddedDMap) GetPut(ctx context.Context, key string, value interface{}) (*GetResponse, error) {
e, err := dm.dm.GetPut(ctx, key, value)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package olric

import (
"context"
"errors"
"fmt"
"io"
"testing"
Expand Down Expand Up @@ -71,6 +72,12 @@ func TestIntegration_NodesJoinOrLeftDuringQuery(t *testing.T) {

for i := 0; i < 100000; i++ {
_, err = dm.Get(context.Background(), fmt.Sprintf("mykey-%d", i))
if errors.Is(err, ErrConnRefused) {
// Rewind
i--
require.NoError(t, c.RefreshMetadata(context.Background()))
continue
}
require.NoError(t, err)
if i == 5999 {
err = c.client.Close(db2.name)
Expand Down Expand Up @@ -411,6 +418,11 @@ func TestIntegration_Kill_Nodes_During_Operation(t *testing.T) {

for i := 0; i < 100000; i++ {
_, err = dm.Get(context.Background(), fmt.Sprintf("mykey-%d", i))
if errors.Is(err, ErrConnRefused) {
i--
fmt.Println(c.RefreshMetadata(context.Background()))
continue
}
require.NoError(t, err)
}
}
Expand Down
29 changes: 22 additions & 7 deletions internal/server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ func NewClient(c *config.Client) *Client {
}
}

func (c *Client) Addresses() map[string]struct{} {
c.mu.RLock()
defer c.mu.RUnlock()

addresses := make(map[string]struct{})
for address, _ := range c.clients {
addresses[address] = struct{}{}
}
return addresses
}

func (c *Client) Get(addr string) *redis.Client {
c.mu.RLock()
rc, ok := c.clients[addr]
Expand All @@ -69,22 +80,26 @@ func (c *Client) Get(addr string) *redis.Client {
return rc
}

func (c *Client) Pick() (*redis.Client, error) {
func (c *Client) pickNodeRoundRobin() (string, error) {
c.mu.RLock()
defer c.mu.RUnlock()

addr, err := c.roundRobin.Get()
if err == roundrobin.ErrEmptyInstance {
return nil, fmt.Errorf("no available client found")
return "", fmt.Errorf("no available client found")
}
if err != nil {
return nil, err
return "", err
}
rc, ok := c.clients[addr]
if !ok {
return nil, fmt.Errorf("client could not be found: %s", addr)
return addr, nil
}

func (c *Client) Pick() (*redis.Client, error) {
addr, err := c.pickNodeRoundRobin()
if err != nil {
return nil, err
}
return rc, nil
return c.Get(addr), nil
}

func (c *Client) Close(addr string) error {
Expand Down
Loading

0 comments on commit 37aa6d3

Please sign in to comment.