Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Bring back pipelining feature to Golang client #174

Merged
merged 11 commits into from
Jul 25, 2022
22 changes: 20 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const DefaultScanCount = 10

// Member denotes a member of the Olric cluster.
type Member struct {
// Member name in the cluster
// Member name in the cluster. It's also host:port of the node.
Name string

// ID of the Member in the cluster. Hash of Name and Birthdate of the member
Expand Down Expand Up @@ -178,7 +178,8 @@ type DMap interface {
// after being decremented or an error.
Decr(ctx context.Context, key string, delta int) (int, error)

// GetPut atomically sets the key to value and returns the old value stored at key.
// GetPut atomically sets the key to value and returns the old value stored at key. It returns nil if there is no
// previous value.
GetPut(ctx context.Context, key string, value interface{}) (*GetResponse, error)

// IncrByFloat atomically increments the key by delta. The return value is the new value
Expand Down Expand Up @@ -223,6 +224,19 @@ type DMap interface {
// is no global lock on DMaps. So if you call Put/PutEx and Destroy methods
// concurrently on the cluster, Put call may set new values to the DMap.
Destroy(ctx context.Context) error

// Pipeline is a mechanism to realise Redis Pipeline technique.
//
// Pipelining is a technique to extremely speed up processing by packing
// operations to batches, send them at once to Redis and read a replies in a
// singe step.
// See https://redis.io/topics/pipelining
//
// Pay attention, that Pipeline is not a transaction, so you can get unexpected
// results in case of big pipelines and small read/write timeouts.
// Redis client has retransmission logic in case of timeouts, pipeline
// can be retransmitted and commands can be executed more than once.
Pipeline() (*DMapPipeline, error)
}

type statsConfig struct {
Expand Down Expand Up @@ -275,6 +289,10 @@ type Client interface {
// Members returns a thread-safe list of cluster members.
Members(ctx context.Context) ([]Member, error)

// RefreshMetadata fetches a list of available members and the latest routing
// table version. It also closes stale clients, if there are any.
RefreshMetadata(ctx context.Context) error

// Close stops background routines and frees allocated resources.
Close(ctx context.Context) error
}
103 changes: 82 additions & 21 deletions cluster_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@ package olric
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/buraksezer/olric/hasher"
"log"
"net"
"os"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/buraksezer/olric/config"
"github.com/buraksezer/olric/hasher"
"github.com/buraksezer/olric/internal/bufpool"
"github.com/buraksezer/olric/internal/cluster/partitions"
"github.com/buraksezer/olric/internal/discovery"
Expand Down Expand Up @@ -69,6 +72,10 @@ func processProtocolError(err error) error {
if err == redis.Nil {
return ErrKeyNotFound
}
if errors.Is(err, syscall.ECONNREFUSED) {
opErr := err.(*net.OpError)
return fmt.Errorf("%s %s %s: %w", opErr.Op, opErr.Net, opErr.Addr, ErrConnRefused)
}
return convertDMapError(protocol.ConvertError(err))
}

Expand All @@ -95,10 +102,7 @@ func (dm *ClusterDMap) writePutCommand(c *dmap.PutConfig, key string, value []by
return cmd
}

func (cl *ClusterClient) smartPick(dmap, key string) (*redis.Client, error) {
hkey := partitions.HKey(dmap, key)
partID := hkey % cl.partitionCount

func (cl *ClusterClient) clientByPartID(partID uint64) (*redis.Client, error) {
raw := cl.routingTable.Load()
if raw == nil {
return nil, fmt.Errorf("routing table is empty")
Expand All @@ -118,6 +122,12 @@ func (cl *ClusterClient) smartPick(dmap, key string) (*redis.Client, error) {
return cl.client.Get(primaryOwner), nil
}

func (cl *ClusterClient) smartPick(dmap, key string) (*redis.Client, error) {
hkey := partitions.HKey(dmap, key)
partID := hkey % cl.partitionCount
return cl.clientByPartID(partID)
}

// Put sets the value for the given key. It overwrites any previous value for
// that key, and it's thread-safe. The key has to be a string. value type is arbitrary.
// It is safe to modify the contents of the arguments after Put returns but not before.
Expand Down Expand Up @@ -150,31 +160,33 @@ func (dm *ClusterDMap) Put(ctx context.Context, key string, value interface{}, o
return processProtocolError(cmd.Err())
}

func (dm *ClusterDMap) makeGetResponse(cmd *redis.StringCmd) (*GetResponse, error) {
raw, err := cmd.Bytes()
if err != nil {
return nil, processProtocolError(err)
}

e := dm.newEntry()
e.Decode(raw)
return &GetResponse{
entry: e,
}, nil
}

// Get gets the value for the given key. It returns ErrKeyNotFound if the DB
// does not contain the key. It's thread-safe. It is safe to modify the contents
// of the returned value. See GetResponse for the details.
func (dm *ClusterDMap) Get(ctx context.Context, key string) (*GetResponse, error) {
cmd := protocol.NewGet(dm.name, key).SetRaw().Command(ctx)
rc, err := dm.clusterClient.smartPick(dm.name, key)
if err != nil {
return nil, err
}

cmd := protocol.NewGet(dm.name, key).SetRaw().Command(ctx)
err = rc.Process(ctx, cmd)
if err != nil {
return nil, processProtocolError(err)
}

raw, err := cmd.Bytes()
if err != nil {
return nil, processProtocolError(err)
}

e := dm.newEntry()
e.Decode(raw)
return &GetResponse{
entry: e,
}, nil
return dm.makeGetResponse(cmd)
}

// Delete deletes values for the given keys. Delete will not return error
Expand Down Expand Up @@ -241,7 +253,8 @@ func (dm *ClusterDMap) Decr(ctx context.Context, key string, delta int) (int, er
return int(res), nil
}

// GetPut atomically sets the key to value and returns the old value stored at key.
// GetPut atomically sets the key to value and returns the old value stored at key. It returns nil if there is no
// previous value.
func (dm *ClusterDMap) GetPut(ctx context.Context, key string, value interface{}) (*GetResponse, error) {
rc, err := dm.clusterClient.smartPick(dm.name, key)
if err != nil {
Expand Down Expand Up @@ -604,6 +617,43 @@ func (cl *ClusterClient) Members(ctx context.Context) ([]Member, error) {
return members, nil
}

// RefreshMetadata fetches a list of available members and the latest routing
// table version. It also closes stale clients, if there are any.
func (cl *ClusterClient) RefreshMetadata(ctx context.Context) error {
// Fetch a list of currently available cluster members.
var members []Member
var err error
for {
members, err = cl.Members(ctx)
if errors.Is(err, ErrConnRefused) {
err = nil
continue
}
if err != nil {
return err
}
break
}
// Use a map for fast access.
addresses := make(map[string]struct{})
for _, member := range members {
addresses[member.Name] = struct{}{}
}

// Clean stale client connections
for addr := range cl.client.Addresses() {
if _, ok := addresses[addr]; !ok {
// Gone
if err := cl.client.Close(addr); err != nil {
return err
}
}
}

// Re-fetch the routing table, we should use the latest routing table version.
return cl.fetchRoutingTable()
}

// Close stops background routines and frees allocated resources.
func (cl *ClusterClient) Close(ctx context.Context) error {
select {
Expand Down Expand Up @@ -705,7 +755,7 @@ func (cl *ClusterClient) fetchRoutingTablePeriodically() {
case <-ticker.C:
err := cl.fetchRoutingTable()
if err != nil {
cl.logger.Printf("[ERROR] Failed to fetch the latest routing table: %s", err)
cl.logger.Printf("[ERROR] Failed to fetch the latest version of the routing table: %s", err)
}
}
}
Expand Down Expand Up @@ -754,13 +804,24 @@ func NewClusterClient(addresses []string, options ...ClusterClientOption) (*Clus
cl.client.Get(address)
}

// Discover all cluster members
members, err := cl.Members(ctx)
if err != nil {
return nil, fmt.Errorf("error while discovering the cluster members: %w", err)
}
for _, member := range members {
cl.client.Get(member.Name)
}

// Hash function is required to target primary owners instead of random cluster members.
partitions.SetHashFunc(cc.hasher)

// Initial fetch.
// Initial fetch. ClusterClient targets the primary owners for a smooth and quick operation.
if err := cl.fetchRoutingTable(); err != nil {
return nil, err
}

// Refresh the routing table in every 15 seconds.
cl.wg.Add(1)
go cl.fetchRoutingTablePeriodically()

Expand Down
2 changes: 1 addition & 1 deletion cluster_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ package olric

import (
"context"
"github.com/buraksezer/olric/hasher"
"log"
"os"
"testing"
"time"

"github.com/buraksezer/olric/config"
"github.com/buraksezer/olric/hasher"
"github.com/buraksezer/olric/internal/testutil"
"github.com/buraksezer/olric/stats"
"github.com/stretchr/testify/require"
Expand Down
2 changes: 1 addition & 1 deletion cluster_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (i *ClusterIterator) fetchRoutingTablePeriodically() {
return
case <-time.After(time.Second):
if err := i.fetchRoutingTable(); err != nil {
i.logger.Printf("[ERROR] Failed to fetch the latest routing table: %s", err)
i.logger.Printf("[ERROR] Failed to fetch the latest version of the routing table: %s", err)
}
}
}
Expand Down
34 changes: 33 additions & 1 deletion embedded_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,37 @@ type EmbeddedDMap struct {
storageEngine string
}

// Pipeline is a mechanism to realise Redis Pipeline technique.
//
// Pipelining is a technique to extremely speed up processing by packing
// operations to batches, send them at once to Redis and read a replies in a
// singe step.
// See https://redis.io/topics/pipelining
//
// Pay attention, that Pipeline is not a transaction, so you can get unexpected
// results in case of big pipelines and small read/write timeouts.
// Redis client has retransmission logic in case of timeouts, pipeline
// can be retransmitted and commands can be executed more than once.
func (dm *EmbeddedDMap) Pipeline() (*DMapPipeline, error) {
cc, err := NewClusterClient([]string{dm.client.db.rt.This().String()})
if err != nil {
return nil, err
}
cdm, err := cc.NewDMap(dm.name)
if err != nil {
return nil, err
}
return cdm.Pipeline()
}

// RefreshMetadata fetches a list of available members and the latest routing
// table version. It also closes stale clients, if there are any. EmbeddedClient has
// this method to implement the Client interface. It doesn't need to refresh metadata manually.
func (e *EmbeddedClient) RefreshMetadata(_ context.Context) error {
// EmbeddedClient already has the latest metadata.
return nil
}

// Scan returns an iterator to loop over the keys.
//
// Available scan options:
Expand Down Expand Up @@ -152,7 +183,8 @@ func (dm *EmbeddedDMap) Name() string {
return dm.name
}

// GetPut atomically sets the key to value and returns the old value stored at key.
// GetPut atomically sets the key to value and returns the old value stored at key. It returns nil if there is no
// previous value.
func (dm *EmbeddedDMap) GetPut(ctx context.Context, key string, value interface{}) (*GetResponse, error) {
e, err := dm.dm.GetPut(ctx, key, value)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package olric

import (
"context"
"errors"
"fmt"
"io"
"testing"
Expand Down Expand Up @@ -71,6 +72,12 @@ func TestIntegration_NodesJoinOrLeftDuringQuery(t *testing.T) {

for i := 0; i < 100000; i++ {
_, err = dm.Get(context.Background(), fmt.Sprintf("mykey-%d", i))
if errors.Is(err, ErrConnRefused) {
// Rewind
i--
require.NoError(t, c.RefreshMetadata(context.Background()))
continue
}
require.NoError(t, err)
if i == 5999 {
err = c.client.Close(db2.name)
Expand Down Expand Up @@ -411,6 +418,11 @@ func TestIntegration_Kill_Nodes_During_Operation(t *testing.T) {

for i := 0; i < 100000; i++ {
_, err = dm.Get(context.Background(), fmt.Sprintf("mykey-%d", i))
if errors.Is(err, ErrConnRefused) {
i--
fmt.Println(c.RefreshMetadata(context.Background()))
continue
}
require.NoError(t, err)
}
}
Expand Down
29 changes: 22 additions & 7 deletions internal/server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ func NewClient(c *config.Client) *Client {
}
}

func (c *Client) Addresses() map[string]struct{} {
c.mu.RLock()
defer c.mu.RUnlock()

addresses := make(map[string]struct{})
for address, _ := range c.clients {
addresses[address] = struct{}{}
}
return addresses
}

func (c *Client) Get(addr string) *redis.Client {
c.mu.RLock()
rc, ok := c.clients[addr]
Expand All @@ -69,22 +80,26 @@ func (c *Client) Get(addr string) *redis.Client {
return rc
}

func (c *Client) Pick() (*redis.Client, error) {
func (c *Client) pickNodeRoundRobin() (string, error) {
c.mu.RLock()
defer c.mu.RUnlock()

addr, err := c.roundRobin.Get()
if err == roundrobin.ErrEmptyInstance {
return nil, fmt.Errorf("no available client found")
return "", fmt.Errorf("no available client found")
}
if err != nil {
return nil, err
return "", err
}
rc, ok := c.clients[addr]
if !ok {
return nil, fmt.Errorf("client could not be found: %s", addr)
return addr, nil
}

func (c *Client) Pick() (*redis.Client, error) {
addr, err := c.pickNodeRoundRobin()
if err != nil {
return nil, err
}
return rc, nil
return c.Get(addr), nil
}

func (c *Client) Close(addr string) error {
Expand Down
Loading