From efd9191305c586ef477801f6d658277359d04a01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Wed, 5 Apr 2023 23:00:06 +0200 Subject: [PATCH] feat(store/redis): use redis SCAN (#97) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * do not fail when redis cannot be initialized Signed-off-by: Jörn Friedrich Dreyer * make redis store use SCAN to list keys Signed-off-by: Jörn Friedrich Dreyer * read using scan with prefix and suffix Signed-off-by: Jörn Friedrich Dreyer * linter does not want to cuddle Signed-off-by: Jörn Friedrich Dreyer * only run redis unit test when REDIS_URL is configured Signed-off-by: Jörn Friedrich Dreyer --------- Signed-off-by: Jörn Friedrich Dreyer --- v4/store/redis/redis.go | 146 +++++++++++++++++++++++++---------- v4/store/redis/redis_test.go | 31 +++++--- 2 files changed, 125 insertions(+), 52 deletions(-) diff --git a/v4/store/redis/redis.go b/v4/store/redis/redis.go index de041d6f..4a755db9 100644 --- a/v4/store/redis/redis.go +++ b/v4/store/redis/redis.go @@ -1,15 +1,24 @@ +// Package redis is a redis backed store implementation package redis import ( "context" "fmt" + "time" "github.com/go-redis/redis/v8" - log "go-micro.dev/v4/logger" + "go-micro.dev/v4/logger" "go-micro.dev/v4/store" "go-micro.dev/v4/util/cmd" ) +// DefaultDatabase is the namespace that the store +// will use if no namespace is provided. +var ( + DefaultDatabase = "micro" + DefaultTable = "micro" +) + type rkv struct { ctx context.Context options store.Options @@ -33,8 +42,9 @@ func (r *rkv) Close() error { } func (r *rkv) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { - options := store.ReadOptions{} - options.Table = r.options.Table + options := store.ReadOptions{ + Table: r.options.Table, + } for _, o := range opts { o(&options) @@ -42,40 +52,57 @@ func (r *rkv) Read(key string, opts ...store.ReadOption) ([]*store.Record, error var keys []string - rkey := fmt.Sprintf("%s%s", options.Table, key) - // Handle Prefix - // TODO suffix - if options.Prefix { - prefixKey := fmt.Sprintf("%s*", rkey) - fkeys, err := r.Client.Keys(r.ctx, prefixKey).Result() - if err != nil { - return nil, err - } - // TODO Limit Offset + var rkey string - keys = append(keys, fkeys...) - } else { - keys = []string{rkey} + switch { + case options.Prefix: + rkey = fmt.Sprintf("%s%s*", options.Table, key) + case options.Suffix: + rkey = fmt.Sprintf("%s*%s", options.Table, key) + default: + keys = []string{fmt.Sprintf("%s%s", options.Table, key)} } - records := make([]*store.Record, 0, len(keys)) + if len(keys) == 0 { + cursor := uint64(options.Offset) + count := int64(options.Limit) - for _, rkey = range keys { - val, err := r.Client.Get(r.ctx, rkey).Bytes() + for { + var err error - if err != nil && err == redis.Nil { - return nil, store.ErrNotFound - } else if err != nil { - return nil, err + var ks []string + + ks, cursor, err = r.Client.Scan(r.ctx, cursor, rkey, count).Result() + if err != nil { + return nil, err + } + + keys = append(keys, ks...) + + if cursor == 0 { + break + } } + } + + records := make([]*store.Record, 0, len(keys)) + + // read all keys, continue on error + var val []byte + + var d time.Duration - if val == nil { - return nil, store.ErrNotFound + var err error + + for _, rkey = range keys { + val, err = r.Client.Get(r.ctx, rkey).Bytes() + if err != nil || val == nil { + continue } - d, err := r.Client.TTL(r.ctx, rkey).Result() + d, err = r.Client.TTL(r.ctx, rkey).Result() if err != nil { - return nil, err + continue } records = append(records, &store.Record{ @@ -85,47 +112,77 @@ func (r *rkv) Read(key string, opts ...store.ReadOption) ([]*store.Record, error }) } + if len(keys) == 1 { + return records, err + } + + // keys might have vanished since we scanned them, ignore errors return records, nil } func (r *rkv) Delete(key string, opts ...store.DeleteOption) error { - options := store.DeleteOptions{} - options.Table = r.options.Table + options := store.DeleteOptions{ + Table: r.options.Table, + } for _, o := range opts { o(&options) } rkey := fmt.Sprintf("%s%s", options.Table, key) + return r.Client.Del(r.ctx, rkey).Err() } func (r *rkv) Write(record *store.Record, opts ...store.WriteOption) error { - options := store.WriteOptions{} - options.Table = r.options.Table + options := store.WriteOptions{ + Table: r.options.Table, + } for _, o := range opts { o(&options) } rkey := fmt.Sprintf("%s%s", options.Table, record.Key) + return r.Client.Set(r.ctx, rkey, record.Value, record.Expiry).Err() } func (r *rkv) List(opts ...store.ListOption) ([]string, error) { - options := store.ListOptions{} - options.Table = r.options.Table + options := store.ListOptions{ + Table: r.options.Table, + } for _, o := range opts { o(&options) } - keys, err := r.Client.Keys(r.ctx, "*").Result() - if err != nil { - return nil, err + key := fmt.Sprintf("%s%s*%s", options.Table, options.Prefix, options.Suffix) + + cursor := uint64(options.Offset) + + count := int64(options.Limit) + + var allKeys []string + + var keys []string + + var err error + + for { + keys, cursor, err = r.Client.Scan(r.ctx, cursor, key, count).Result() + if err != nil { + return nil, err + } + + allKeys = append(allKeys, keys...) + + if cursor == 0 { + break + } } - return keys, nil + return allKeys, nil } func (r *rkv) Options() store.Options { @@ -136,8 +193,14 @@ func (r *rkv) String() string { return "redis" } +// NewStore returns a redis store. func NewStore(opts ...store.Option) store.Store { - var options store.Options + options := store.Options{ + Database: DefaultDatabase, + Table: DefaultTable, + Logger: logger.DefaultLogger, + } + for _, o := range opts { o(&options) } @@ -148,7 +211,7 @@ func NewStore(opts ...store.Option) store.Store { } if err := s.configure(); err != nil { - log.Fatal(err) + s.options.Logger.Log(logger.ErrorLevel, "Error configuring store ", err) } return s @@ -156,8 +219,11 @@ func NewStore(opts ...store.Option) store.Store { func (r *rkv) configure() error { if r.Client != nil { - r.Client.Close() + if err := r.Client.Close(); err != nil { + return err + } } + r.Client = newUniversalClient(r.options) return nil diff --git a/v4/store/redis/redis_test.go b/v4/store/redis/redis_test.go index 4b4af670..401a761a 100644 --- a/v4/store/redis/redis_test.go +++ b/v4/store/redis/redis_test.go @@ -176,19 +176,12 @@ func Test_rkv_configure_cluster(t *testing.T) { } func Test_Store(t *testing.T) { - if tr := os.Getenv("TRAVIS"); len(tr) > 0 { - t.Skip() + url := os.Getenv("REDIS_URL") + if len(url) == 0 { + t.Skip("REDIS_URL not defined") } - r := new(rkv) - // r.options = store.Options{Nodes: []string{"redis://:password@127.0.0.1:6379"}} - //r.options = store.Options{Nodes: []string{"127.0.0.1:6379"}} - r.options = store.Options{Nodes: []string{"redis://127.0.0.1:6379"}} - - if err := r.configure(); err != nil { - t.Error(err) - return - } + r := NewStore(store.Nodes(url)) key := "myTest" rec := store.Record{ @@ -201,16 +194,30 @@ func Test_Store(t *testing.T) { if err != nil { t.Errorf("Write Erroe. Error: %v", err) } + rec1, err := r.Read(key) if err != nil { t.Errorf("Read Error. Error: %v\n", err) } + + keys, err := r.List() + if err != nil { + t.Errorf("listing error %v\n", err) + } + if len(keys) < 1 { + t.Errorf("not enough keys\n") + } + err = r.Delete(rec1[0].Key) if err != nil { t.Errorf("Delete error %v\n", err) } - _, err = r.List() + + keys, err = r.List() if err != nil { t.Errorf("listing error %v\n", err) } + if len(keys) > 0 { + t.Errorf("too many keys\n") + } }