Skip to content

Commit

Permalink
feat(store/redis): use redis SCAN (#97)
Browse files Browse the repository at this point in the history
* do not fail when redis cannot be initialized

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* make redis store use SCAN to list keys

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* read using scan with prefix and suffix

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* linter does not want to cuddle

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* only run redis unit test when REDIS_URL is configured

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

---------

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic authored Apr 5, 2023
1 parent b8cbef2 commit efd9191
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 52 deletions.
146 changes: 106 additions & 40 deletions v4/store/redis/redis.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
// Package redis is a redis backed store implementation
package redis

import (
"context"
"fmt"
"time"

"github.com/go-redis/redis/v8"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/store"
"go-micro.dev/v4/util/cmd"
)

// DefaultDatabase is the namespace that the store
// will use if no namespace is provided.
var (
DefaultDatabase = "micro"
DefaultTable = "micro"
)

type rkv struct {
ctx context.Context
options store.Options
Expand All @@ -33,49 +42,67 @@ func (r *rkv) Close() error {
}

func (r *rkv) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
options := store.ReadOptions{}
options.Table = r.options.Table
options := store.ReadOptions{
Table: r.options.Table,
}

for _, o := range opts {
o(&options)
}

var keys []string

rkey := fmt.Sprintf("%s%s", options.Table, key)
// Handle Prefix
// TODO suffix
if options.Prefix {
prefixKey := fmt.Sprintf("%s*", rkey)
fkeys, err := r.Client.Keys(r.ctx, prefixKey).Result()
if err != nil {
return nil, err
}
// TODO Limit Offset
var rkey string

keys = append(keys, fkeys...)
} else {
keys = []string{rkey}
switch {
case options.Prefix:
rkey = fmt.Sprintf("%s%s*", options.Table, key)
case options.Suffix:
rkey = fmt.Sprintf("%s*%s", options.Table, key)
default:
keys = []string{fmt.Sprintf("%s%s", options.Table, key)}
}

records := make([]*store.Record, 0, len(keys))
if len(keys) == 0 {
cursor := uint64(options.Offset)
count := int64(options.Limit)

for _, rkey = range keys {
val, err := r.Client.Get(r.ctx, rkey).Bytes()
for {
var err error

if err != nil && err == redis.Nil {
return nil, store.ErrNotFound
} else if err != nil {
return nil, err
var ks []string

ks, cursor, err = r.Client.Scan(r.ctx, cursor, rkey, count).Result()
if err != nil {
return nil, err
}

keys = append(keys, ks...)

if cursor == 0 {
break
}
}
}

records := make([]*store.Record, 0, len(keys))

// read all keys, continue on error
var val []byte

var d time.Duration

if val == nil {
return nil, store.ErrNotFound
var err error

for _, rkey = range keys {
val, err = r.Client.Get(r.ctx, rkey).Bytes()
if err != nil || val == nil {
continue
}

d, err := r.Client.TTL(r.ctx, rkey).Result()
d, err = r.Client.TTL(r.ctx, rkey).Result()
if err != nil {
return nil, err
continue
}

records = append(records, &store.Record{
Expand All @@ -85,47 +112,77 @@ func (r *rkv) Read(key string, opts ...store.ReadOption) ([]*store.Record, error
})
}

if len(keys) == 1 {
return records, err
}

// keys might have vanished since we scanned them, ignore errors
return records, nil
}

func (r *rkv) Delete(key string, opts ...store.DeleteOption) error {
options := store.DeleteOptions{}
options.Table = r.options.Table
options := store.DeleteOptions{
Table: r.options.Table,
}

for _, o := range opts {
o(&options)
}

rkey := fmt.Sprintf("%s%s", options.Table, key)

return r.Client.Del(r.ctx, rkey).Err()
}

func (r *rkv) Write(record *store.Record, opts ...store.WriteOption) error {
options := store.WriteOptions{}
options.Table = r.options.Table
options := store.WriteOptions{
Table: r.options.Table,
}

for _, o := range opts {
o(&options)
}

rkey := fmt.Sprintf("%s%s", options.Table, record.Key)

return r.Client.Set(r.ctx, rkey, record.Value, record.Expiry).Err()
}

func (r *rkv) List(opts ...store.ListOption) ([]string, error) {
options := store.ListOptions{}
options.Table = r.options.Table
options := store.ListOptions{
Table: r.options.Table,
}

for _, o := range opts {
o(&options)
}

keys, err := r.Client.Keys(r.ctx, "*").Result()
if err != nil {
return nil, err
key := fmt.Sprintf("%s%s*%s", options.Table, options.Prefix, options.Suffix)

cursor := uint64(options.Offset)

count := int64(options.Limit)

var allKeys []string

var keys []string

var err error

for {
keys, cursor, err = r.Client.Scan(r.ctx, cursor, key, count).Result()
if err != nil {
return nil, err
}

allKeys = append(allKeys, keys...)

if cursor == 0 {
break
}
}

return keys, nil
return allKeys, nil
}

func (r *rkv) Options() store.Options {
Expand All @@ -136,8 +193,14 @@ func (r *rkv) String() string {
return "redis"
}

// NewStore returns a redis store.
func NewStore(opts ...store.Option) store.Store {
var options store.Options
options := store.Options{
Database: DefaultDatabase,
Table: DefaultTable,
Logger: logger.DefaultLogger,
}

for _, o := range opts {
o(&options)
}
Expand All @@ -148,16 +211,19 @@ func NewStore(opts ...store.Option) store.Store {
}

if err := s.configure(); err != nil {
log.Fatal(err)
s.options.Logger.Log(logger.ErrorLevel, "Error configuring store ", err)
}

return s
}

func (r *rkv) configure() error {
if r.Client != nil {
r.Client.Close()
if err := r.Client.Close(); err != nil {
return err
}
}

r.Client = newUniversalClient(r.options)

return nil
Expand Down
31 changes: 19 additions & 12 deletions v4/store/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,19 +176,12 @@ func Test_rkv_configure_cluster(t *testing.T) {
}

func Test_Store(t *testing.T) {
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
t.Skip()
url := os.Getenv("REDIS_URL")
if len(url) == 0 {
t.Skip("REDIS_URL not defined")
}
r := new(rkv)

// r.options = store.Options{Nodes: []string{"redis://:[email protected]:6379"}}
//r.options = store.Options{Nodes: []string{"127.0.0.1:6379"}}
r.options = store.Options{Nodes: []string{"redis://127.0.0.1:6379"}}

if err := r.configure(); err != nil {
t.Error(err)
return
}
r := NewStore(store.Nodes(url))

key := "myTest"
rec := store.Record{
Expand All @@ -201,16 +194,30 @@ func Test_Store(t *testing.T) {
if err != nil {
t.Errorf("Write Erroe. Error: %v", err)
}

rec1, err := r.Read(key)
if err != nil {
t.Errorf("Read Error. Error: %v\n", err)
}

keys, err := r.List()
if err != nil {
t.Errorf("listing error %v\n", err)
}
if len(keys) < 1 {
t.Errorf("not enough keys\n")
}

err = r.Delete(rec1[0].Key)
if err != nil {
t.Errorf("Delete error %v\n", err)
}
_, err = r.List()

keys, err = r.List()
if err != nil {
t.Errorf("listing error %v\n", err)
}
if len(keys) > 0 {
t.Errorf("too many keys\n")
}
}

0 comments on commit efd9191

Please sign in to comment.