Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store/redis): use redis SCAN #97

Merged
merged 5 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 106 additions & 40 deletions v4/store/redis/redis.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
// Package redis is a redis backed store implementation
package redis

import (
"context"
"fmt"
"time"

"github.com/go-redis/redis/v8"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/store"
"go-micro.dev/v4/util/cmd"
)

// DefaultDatabase is the namespace that the store
// will use if no namespace is provided.
var (
DefaultDatabase = "micro"
DefaultTable = "micro"
)

type rkv struct {
ctx context.Context
options store.Options
Expand All @@ -33,49 +42,67 @@ func (r *rkv) Close() error {
}

func (r *rkv) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
options := store.ReadOptions{}
options.Table = r.options.Table
options := store.ReadOptions{
Table: r.options.Table,
}

for _, o := range opts {
o(&options)
}

var keys []string

rkey := fmt.Sprintf("%s%s", options.Table, key)
// Handle Prefix
// TODO suffix
if options.Prefix {
prefixKey := fmt.Sprintf("%s*", rkey)
fkeys, err := r.Client.Keys(r.ctx, prefixKey).Result()
if err != nil {
return nil, err
}
// TODO Limit Offset
var rkey string

keys = append(keys, fkeys...)
} else {
keys = []string{rkey}
switch {
case options.Prefix:
rkey = fmt.Sprintf("%s%s*", options.Table, key)
case options.Suffix:
rkey = fmt.Sprintf("%s*%s", options.Table, key)
default:
keys = []string{fmt.Sprintf("%s%s", options.Table, key)}
}

records := make([]*store.Record, 0, len(keys))
if len(keys) == 0 {
cursor := uint64(options.Offset)
count := int64(options.Limit)

for _, rkey = range keys {
val, err := r.Client.Get(r.ctx, rkey).Bytes()
for {
var err error

if err != nil && err == redis.Nil {
return nil, store.ErrNotFound
} else if err != nil {
return nil, err
var ks []string

ks, cursor, err = r.Client.Scan(r.ctx, cursor, rkey, count).Result()
if err != nil {
return nil, err
}

keys = append(keys, ks...)

if cursor == 0 {
break
}
}
}

records := make([]*store.Record, 0, len(keys))

// read all keys, continue on error
var val []byte

var d time.Duration

if val == nil {
return nil, store.ErrNotFound
var err error

for _, rkey = range keys {
val, err = r.Client.Get(r.ctx, rkey).Bytes()
if err != nil || val == nil {
continue
}

d, err := r.Client.TTL(r.ctx, rkey).Result()
d, err = r.Client.TTL(r.ctx, rkey).Result()
if err != nil {
return nil, err
continue
}

records = append(records, &store.Record{
Expand All @@ -85,47 +112,77 @@ func (r *rkv) Read(key string, opts ...store.ReadOption) ([]*store.Record, error
})
}

if len(keys) == 1 {
return records, err
}

// keys might have vanished since we scanned them, ignore errors
return records, nil
}

func (r *rkv) Delete(key string, opts ...store.DeleteOption) error {
options := store.DeleteOptions{}
options.Table = r.options.Table
options := store.DeleteOptions{
Table: r.options.Table,
}

for _, o := range opts {
o(&options)
}

rkey := fmt.Sprintf("%s%s", options.Table, key)

return r.Client.Del(r.ctx, rkey).Err()
}

func (r *rkv) Write(record *store.Record, opts ...store.WriteOption) error {
options := store.WriteOptions{}
options.Table = r.options.Table
options := store.WriteOptions{
Table: r.options.Table,
}

for _, o := range opts {
o(&options)
}

rkey := fmt.Sprintf("%s%s", options.Table, record.Key)

return r.Client.Set(r.ctx, rkey, record.Value, record.Expiry).Err()
}

func (r *rkv) List(opts ...store.ListOption) ([]string, error) {
options := store.ListOptions{}
options.Table = r.options.Table
options := store.ListOptions{
Table: r.options.Table,
}

for _, o := range opts {
o(&options)
}

keys, err := r.Client.Keys(r.ctx, "*").Result()
if err != nil {
return nil, err
key := fmt.Sprintf("%s%s*%s", options.Table, options.Prefix, options.Suffix)

cursor := uint64(options.Offset)

count := int64(options.Limit)

var allKeys []string

var keys []string

var err error

for {
keys, cursor, err = r.Client.Scan(r.ctx, cursor, key, count).Result()
if err != nil {
return nil, err
}

allKeys = append(allKeys, keys...)

if cursor == 0 {
break
}
}

return keys, nil
return allKeys, nil
}

func (r *rkv) Options() store.Options {
Expand All @@ -136,8 +193,14 @@ func (r *rkv) String() string {
return "redis"
}

// NewStore returns a redis store.
func NewStore(opts ...store.Option) store.Store {
var options store.Options
options := store.Options{
Database: DefaultDatabase,
Table: DefaultTable,
Logger: logger.DefaultLogger,
}

for _, o := range opts {
o(&options)
}
Expand All @@ -148,16 +211,19 @@ func NewStore(opts ...store.Option) store.Store {
}

if err := s.configure(); err != nil {
log.Fatal(err)
s.options.Logger.Log(logger.ErrorLevel, "Error configuring store ", err)
}

return s
}

func (r *rkv) configure() error {
if r.Client != nil {
r.Client.Close()
if err := r.Client.Close(); err != nil {
return err
}
}

r.Client = newUniversalClient(r.options)

return nil
Expand Down
31 changes: 19 additions & 12 deletions v4/store/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,19 +176,12 @@ func Test_rkv_configure_cluster(t *testing.T) {
}

func Test_Store(t *testing.T) {
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
t.Skip()
url := os.Getenv("REDIS_URL")
if len(url) == 0 {
t.Skip("REDIS_URL not defined")
}
r := new(rkv)

// r.options = store.Options{Nodes: []string{"redis://:[email protected]:6379"}}
//r.options = store.Options{Nodes: []string{"127.0.0.1:6379"}}
r.options = store.Options{Nodes: []string{"redis://127.0.0.1:6379"}}

if err := r.configure(); err != nil {
t.Error(err)
return
}
r := NewStore(store.Nodes(url))

key := "myTest"
rec := store.Record{
Expand All @@ -201,16 +194,30 @@ func Test_Store(t *testing.T) {
if err != nil {
t.Errorf("Write Erroe. Error: %v", err)
}

rec1, err := r.Read(key)
if err != nil {
t.Errorf("Read Error. Error: %v\n", err)
}

keys, err := r.List()
if err != nil {
t.Errorf("listing error %v\n", err)
}
if len(keys) < 1 {
t.Errorf("not enough keys\n")
}

err = r.Delete(rec1[0].Key)
if err != nil {
t.Errorf("Delete error %v\n", err)
}
_, err = r.List()

keys, err = r.List()
if err != nil {
t.Errorf("listing error %v\n", err)
}
if len(keys) > 0 {
t.Errorf("too many keys\n")
}
}