Skip to content

Commit

Permalink
adjusted and extended redis cluster support
Browse files Browse the repository at this point in the history
Signed-off-by: Abel Andrés <[email protected]>
  • Loading branch information
abel296 committed Nov 21, 2023
1 parent 5025f50 commit 66e2719
Show file tree
Hide file tree
Showing 11 changed files with 1,973 additions and 44 deletions.
71 changes: 29 additions & 42 deletions cache/engine/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

goredislib "github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -106,12 +107,30 @@ func (rdb *RedisClient) unlock(ctx context.Context, key string) error {

// PurgeAll - Purges all the existing keys on a DB.
func (rdb *RedisClient) PurgeAll() (bool, error) {
if rdb.Client.ClusterSlots(ctx).Err() == nil {
clusterClient := rdb.Client.(*goredislib.ClusterClient)

err := clusterClient.ForEachShard(ctx, func(ctx context.Context, client *goredislib.Client) error {
return rdb.purgeAllClusterKeys(client)
})

return err == nil, err
} else {
_, err := circuitbreaker.CB(rdb.Name, rdb.logger).Execute(func() (interface{}, error) {
err := rdb.Client.FlushDB(ctx).Err()
return nil, err
})

return err == nil, err
}
}

func (rdb *RedisClient) purgeAllClusterKeys(client *goredislib.Client) error {
_, err := circuitbreaker.CB(rdb.Name, rdb.logger).Execute(func() (interface{}, error) {
err := rdb.Client.FlushDB(ctx).Err()
err := client.FlushDB(ctx).Err()
return nil, err
})

return err == nil, err
return err
}

// Ping - Tests the connection.
Expand Down Expand Up @@ -168,47 +187,15 @@ func (rdb *RedisClient) Del(ctx context.Context, key string) error {
return err
}

// DelWildcard - Removes the matching keys based on a pattern.
func (rdb *RedisClient) DelWildcard(ctx context.Context, key string) (int, error) {
k, err := circuitbreaker.CB(rdb.Name, rdb.logger).Execute(func() (interface{}, error) {
keys, err := rdb.Client.Keys(ctx, key).Result()
return keys, err
})

if err != nil {
return 0, nil
}

return rdb.deleteKeys(ctx, key, k.([]string))
}

// DelWildcard - Removes the matching keys based on a pattern.
func (rdb *RedisClient) deleteKeys(ctx context.Context, keyID string, keys []string) (int, error) {
l := len(keys)

if l == 0 {
return 0, nil
}

_, errDel := circuitbreaker.CB(rdb.Name, rdb.logger).Execute(rdb.doDeleteKeys(ctx, keyID, keys))

return l, errDel
type Counter struct {
mu sync.Mutex
counter int
}

func (rdb *RedisClient) doDeleteKeys(ctx context.Context, keyID string, keys []string) func() (interface{}, error) {
return func() (interface{}, error) {
if errLock := rdb.lock(ctx, keyID); errLock != nil {
return nil, errLock
}

err := rdb.Client.Del(ctx, keys...).Err()

if errUnlock := rdb.unlock(ctx, keyID); errUnlock != nil {
return nil, errUnlock
}

return nil, err
}
func (c *Counter) incrementar(num int) {
c.mu.Lock()
defer c.mu.Unlock()
c.counter = c.counter + num
}

// List - Returns the values in a list.
Expand Down
Loading

0 comments on commit 66e2719

Please sign in to comment.