Skip to content

Commit

Permalink
refactor: add some integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Mar 20, 2022
1 parent 9822a9a commit 4a5e4ca
Show file tree
Hide file tree
Showing 70 changed files with 2,771 additions and 2,243 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ The current production version is [v0.4.3](https://github.com/buraksezer/olric/t
* Designed to share some transient, approximate, fast-changing data between servers,
* Embeddable but can be used as a language-independent service with *olricd*,
* Supports different eviction algorithms,
* Fast binary protocol,
* Supports Redis protocol,
* Highly available and horizontally scalable,
* Provides best-effort consistency guarantees without being a complete CP (indeed PA/EC) solution,
* Supports replication by default (with sync and async options),
Expand Down
21 changes: 21 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ import (

const DefaultScanCount = 10

type Member struct {
Name string
ID uint64
Birthdate int64
Coordinator bool
}

type Iterator interface {
Next() bool
Key() string
Expand Down Expand Up @@ -177,11 +184,25 @@ type statsConfig struct {

type StatsOption func(*statsConfig)

type pubsubConfig struct {
Address string
}

func ToAddress(addr string) PubSubOption {
return func(cfg *pubsubConfig) {
cfg.Address = addr
}
}

type PubSubOption func(option *pubsubConfig)

type Client interface {
NewDMap(name string, options ...DMapOption) (DMap, error)
NewPubSub(options ...PubSubOption) (*PubSub, error)
Stats(ctx context.Context, options ...StatsOption) (stats.Stats, error)
Ping(ctx context.Context, addr string) error
PingWithMessage(ctx context.Context, addr, message string) (string, error)
RoutingTable(ctx context.Context) (RoutingTable, error)
Members(ctx context.Context) ([]Member, error)
Close(ctx context.Context) error
}
23 changes: 23 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,26 @@ func (db *Olric) routingTable(ctx context.Context) (RoutingTable, error) {
}
return mapToRoutingTable(slice)
}

func (db *Olric) clusterMembersCommandHandler(conn redcon.Conn, cmd redcon.Command) {
_, err := protocol.ParseClusterMembers(cmd)
if err != nil {
protocol.WriteError(conn, err)
return
}

coordinator := db.rt.Discovery().GetCoordinator()
members := db.rt.Discovery().GetMembers()
conn.WriteArray(len(members))
for _, member := range members {
conn.WriteArray(4)
conn.WriteBulkString(member.Name)
conn.WriteUint64(member.ID)
conn.WriteInt64(member.Birthdate)
if coordinator.CompareByID(member) {
conn.WriteBulkString("true")
} else {
conn.WriteBulkString("false")
}
}
}
101 changes: 89 additions & 12 deletions cluster_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
"github.com/buraksezer/olric/config"
"github.com/buraksezer/olric/internal/bufpool"
"github.com/buraksezer/olric/internal/dmap"
"github.com/buraksezer/olric/internal/encoding"
"github.com/buraksezer/olric/internal/kvstore/entry"
"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/olric/internal/resp"
"github.com/buraksezer/olric/internal/server"
"github.com/buraksezer/olric/pkg/storage"
"github.com/buraksezer/olric/stats"
Expand Down Expand Up @@ -97,7 +97,7 @@ func (dm *ClusterDMap) Put(ctx context.Context, key string, value interface{}, o
valueBuf := pool.Get()
defer pool.Put(valueBuf)

enc := encoding.New(valueBuf)
enc := resp.New(valueBuf)
err = enc.Encode(value)
if err != nil {
return err
Expand Down Expand Up @@ -203,7 +203,7 @@ func (dm *ClusterDMap) GetPut(ctx context.Context, key string, value interface{}
valueBuf := pool.Get()
defer pool.Put(valueBuf)

enc := encoding.New(valueBuf)
enc := resp.New(valueBuf)
err = enc.Encode(value)
if err != nil {
return nil, err
Expand Down Expand Up @@ -330,14 +330,13 @@ func (dm *ClusterDMap) Scan(ctx context.Context, options ...ScanOption) (Iterato
if sc.Count == 0 {
sc.Count = DefaultScanCount
}
if sc.Logger == nil {
sc.Logger = log.New(os.Stderr, "logger: ", log.Lshortfile)
}

ictx, cancel := context.WithCancel(ctx)
i := &ClusterIterator{
dm: dm,
clusterClient: dm.clusterClient,
config: &sc,
logger: dm.clusterClient.logger,
allKeys: make(map[string]struct{}),
finished: make(map[string]struct{}),
cursors: make(map[string]uint64),
Expand Down Expand Up @@ -372,6 +371,8 @@ func (dm *ClusterDMap) Destroy(ctx context.Context) error {

type ClusterClient struct {
client *server.Client
config *clusterClientConfig
logger *log.Logger
}

func (cl *ClusterClient) Ping(ctx context.Context, addr string) error {
Expand Down Expand Up @@ -465,10 +466,56 @@ func (cl *ClusterClient) Stats(ctx context.Context, options ...StatsOption) (sta
return s, nil
}

func (cl *ClusterClient) Members(ctx context.Context) ([]Member, error) {
rc, err := cl.client.Pick()
if err != nil {
return []Member{}, err
}

cmd := protocol.NewClusterMembers().Command(ctx)
err = rc.Process(ctx, cmd)
if err != nil {
return []Member{}, processProtocolError(err)
}

if err = cmd.Err(); err != nil {
return []Member{}, processProtocolError(err)
}

items, err := cmd.Slice()
if err != nil {
return []Member{}, processProtocolError(err)
}
var members []Member
for _, rawItem := range items {
m := Member{}
item := rawItem.([]interface{})
m.Name = item[0].(string)

switch id := item[1].(type) {
case uint64:
m.ID = id
case int64:
m.ID = uint64(id)
}

m.Birthdate = item[2].(int64)
if item[3] == "true" {
m.Coordinator = true
}
members = append(members, m)
}
return members, nil
}

func (cl *ClusterClient) Close(ctx context.Context) error {
return cl.client.Shutdown(ctx)
}

func (cl *ClusterClient) NewPubSub(options ...PubSubOption) (*PubSub, error) {
return newPubSub(cl.client, options...)
}

func (cl *ClusterClient) NewDMap(name string, options ...DMapOption) (DMap, error) {
var dc dmapConfig
for _, opt := range options {
Expand All @@ -489,24 +536,54 @@ func (cl *ClusterClient) NewDMap(name string, options ...DMapOption) (DMap, erro
}, nil
}

func NewClusterClient(addresses []string, c *config.Client) (*ClusterClient, error) {
type ClusterClientOption func(c *clusterClientConfig)

type clusterClientConfig struct {
logger *log.Logger
config *config.Client
}

func WithLogger(l *log.Logger) ClusterClientOption {
return func(cfg *clusterClientConfig) {
cfg.logger = l
}
}

func WithConfig(c *config.Client) ClusterClientOption {
return func(cfg *clusterClientConfig) {
cfg.config = c
}
}

func NewClusterClient(addresses []string, options ...ClusterClientOption) (*ClusterClient, error) {
if len(addresses) == 0 {
return nil, fmt.Errorf("addresses cannot be empty")
}

if c == nil {
c = config.NewClient()
var cc clusterClientConfig
for _, opt := range options {
opt(&cc)
}

if cc.logger == nil {
cc.logger = log.New(os.Stderr, "logger: ", log.Lshortfile)
}

if cc.config == nil {
cc.config = config.NewClient()
}

if err := c.Sanitize(); err != nil {
if err := cc.config.Sanitize(); err != nil {
return nil, err
}
if err := c.Validate(); err != nil {
if err := cc.config.Validate(); err != nil {
return nil, err
}

cl := &ClusterClient{
client: server.NewClient(c),
client: server.NewClient(cc.config),
config: &cc,
logger: cc.logger,
}
for _, address := range addresses {
cl.client.Get(address)
Expand Down
Loading

0 comments on commit 4a5e4ca

Please sign in to comment.