Skip to content

Commit

Permalink
Add Redis Gears support (#2675)
Browse files Browse the repository at this point in the history
* Add gears commands and tests

* Fix tfunctionlist and add docstrings

* fixes

* fixes

* add tfcallasync to gearsCmdable
  • Loading branch information
ofekshenawa authored Aug 16, 2023
1 parent 558581e commit aba21be
Show file tree
Hide file tree
Showing 5 changed files with 368 additions and 1 deletion.
3 changes: 2 additions & 1 deletion .github/wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ uri
URI
url
variadic
RedisStack
RedisStack
RedisGears
65 changes: 65 additions & 0 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3713,6 +3713,71 @@ func (cmd *MapStringStringSliceCmd) readReply(rd *proto.Reader) error {
return nil
}

//-----------------------------------------------------------------------

type MapStringInterfaceSliceCmd struct {
baseCmd

val []map[string]interface{}
}

var _ Cmder = (*MapStringInterfaceSliceCmd)(nil)

func NewMapStringInterfaceSliceCmd(ctx context.Context, args ...interface{}) *MapStringInterfaceSliceCmd {
return &MapStringInterfaceSliceCmd{
baseCmd: baseCmd{
ctx: ctx,
args: args,
},
}
}

func (cmd *MapStringInterfaceSliceCmd) SetVal(val []map[string]interface{}) {
cmd.val = val
}

func (cmd *MapStringInterfaceSliceCmd) Val() []map[string]interface{} {
return cmd.val
}

func (cmd *MapStringInterfaceSliceCmd) Result() ([]map[string]interface{}, error) {
return cmd.Val(), cmd.Err()
}

func (cmd *MapStringInterfaceSliceCmd) String() string {
return cmdString(cmd, cmd.val)
}

func (cmd *MapStringInterfaceSliceCmd) readReply(rd *proto.Reader) error {
n, err := rd.ReadArrayLen()
if err != nil {
return err
}

cmd.val = make([]map[string]interface{}, n)
for i := 0; i < n; i++ {
nn, err := rd.ReadMapLen()
if err != nil {
return err
}
cmd.val[i] = make(map[string]interface{}, nn)
for f := 0; f < nn; f++ {
k, err := rd.ReadString()
if err != nil {
return err
}
v, err := rd.ReadReply()
if err != nil {
if err != Nil {
return err
}
}
cmd.val[i][k] = v
}
}
return nil
}

//------------------------------------------------------------------------------

type KeyValuesCmd struct {
Expand Down
1 change: 1 addition & 0 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ type Cmdable interface {

ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd

gearsCmdable
probabilisticCmdable
}

Expand Down
161 changes: 161 additions & 0 deletions redis_gears.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package redis

import (
"context"
"fmt"
"strings"
)

type gearsCmdable interface {
TFunctionLoad(ctx context.Context, lib string) *StatusCmd
TFunctionLoadArgs(ctx context.Context, lib string, options *TFunctionLoadOptions) *StatusCmd
TFunctionDelete(ctx context.Context, libName string) *StatusCmd
TFunctionList(ctx context.Context) *MapStringInterfaceSliceCmd
TFunctionListArgs(ctx context.Context, options *TFunctionListOptions) *MapStringInterfaceSliceCmd
TFCall(ctx context.Context, libName string, funcName string, numKeys int) *Cmd
TFCallArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd
TFCallASYNC(ctx context.Context, libName string, funcName string, numKeys int) *Cmd
TFCallASYNCArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd
}
type TFunctionLoadOptions struct {
Replace bool
Config string
}

type TFunctionListOptions struct {
Withcode bool
Verbose int
Library string
}

type TFCallOptions struct {
Keys []string
Arguments []string
}

// TFunctionLoad - load a new JavaScript library into Redis.
// For more information - https://redis.io/commands/tfunction-load/
func (c cmdable) TFunctionLoad(ctx context.Context, lib string) *StatusCmd {
args := []interface{}{"TFUNCTION", "LOAD", lib}
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) TFunctionLoadArgs(ctx context.Context, lib string, options *TFunctionLoadOptions) *StatusCmd {
args := []interface{}{"TFUNCTION", "LOAD"}
if options != nil {
if options.Replace {
args = append(args, "REPLACE")
}
if options.Config != "" {
args = append(args, "CONFIG", options.Config)
}
}
args = append(args, lib)
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

// TFunctionDelete - delete a JavaScript library from Redis.
// For more information - https://redis.io/commands/tfunction-delete/
func (c cmdable) TFunctionDelete(ctx context.Context, libName string) *StatusCmd {
args := []interface{}{"TFUNCTION", "DELETE", libName}
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

// TFunctionList - list the functions with additional information about each function.
// For more information - https://redis.io/commands/tfunction-list/
func (c cmdable) TFunctionList(ctx context.Context) *MapStringInterfaceSliceCmd {
args := []interface{}{"TFUNCTION", "LIST"}
cmd := NewMapStringInterfaceSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) TFunctionListArgs(ctx context.Context, options *TFunctionListOptions) *MapStringInterfaceSliceCmd {
args := []interface{}{"TFUNCTION", "LIST"}
if options != nil {
if options.Withcode {
args = append(args, "WITHCODE")
}
if options.Verbose != 0 {
v := strings.Repeat("v", options.Verbose)
args = append(args, v)
}
if options.Library != "" {
args = append(args, "LIBRARY", options.Library)

}
}
cmd := NewMapStringInterfaceSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

// TFCall - invoke a function.
// For more information - https://redis.io/commands/tfcall/
func (c cmdable) TFCall(ctx context.Context, libName string, funcName string, numKeys int) *Cmd {
lf := libName + "." + funcName
args := []interface{}{"TFCALL", lf, numKeys}
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) TFCallArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd {
lf := libName + "." + funcName
args := []interface{}{"TFCALL", lf, numKeys}
if options != nil {
if options.Keys != nil {
for _, key := range options.Keys {

args = append(args, key)
}
}
if options.Arguments != nil {
for _, key := range options.Arguments {

args = append(args, key)
}
}
}
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

// TFCallASYNC - invoke an asynchronous JavaScript function (coroutine).
// For more information - https://redis.io/commands/TFCallASYNC/
func (c cmdable) TFCallASYNC(ctx context.Context, libName string, funcName string, numKeys int) *Cmd {
lf := fmt.Sprintf("%s.%s", libName, funcName)
args := []interface{}{"TFCALLASYNC", lf, numKeys}
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) TFCallASYNCArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd {
lf := fmt.Sprintf("%s.%s", libName, funcName)
args := []interface{}{"TFCALLASYNC", lf, numKeys}
if options != nil {
if options.Keys != nil {
for _, key := range options.Keys {

args = append(args, key)
}
}
if options.Arguments != nil {
for _, key := range options.Arguments {

args = append(args, key)
}
}
}
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
139 changes: 139 additions & 0 deletions redis_gears_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package redis_test

import (
"context"
"fmt"

. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"
"github.com/redis/go-redis/v9"
)

func libCode(libName string) string {
return fmt.Sprintf("#!js api_version=1.0 name=%s\n redis.registerFunction('foo', ()=>{{return 'bar'}})", libName)
}

func libCodeWithConfig(libName string) string {
lib := `#!js api_version=1.0 name=%s
var last_update_field_name = "__last_update__"
if (redis.config.last_update_field_name !== undefined) {
if (typeof redis.config.last_update_field_name != 'string') {
throw "last_update_field_name must be a string";
}
last_update_field_name = redis.config.last_update_field_name
}
redis.registerFunction("hset", function(client, key, field, val){
// get the current time in ms
var curr_time = client.call("time")[0];
return client.call('hset', key, field, val, last_update_field_name, curr_time);
});`
return fmt.Sprintf(lib, libName)
}

var _ = Describe("RedisGears commands", Label("gears"), func() {
ctx := context.TODO()
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(&redis.Options{Addr: ":6379"})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})

AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})

It("should TFunctionLoad, TFunctionLoadArgs and TFunctionDelete ", Label("gears", "tfunctionload"), func() {

resultAdd, err := client.TFunctionLoad(ctx, libCode("libtflo1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
opt := &redis.TFunctionLoadOptions{Replace: true, Config: `{"last_update_field_name":"last_update"}`}
resultAdd, err = client.TFunctionLoadArgs(ctx, libCodeWithConfig("libtflo1"), opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFunctionDelete(ctx, "libtflo1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))

})
It("should TFunctionList", Label("gears", "tfunctionlist"), func() {
resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfli1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFunctionLoad(ctx, libCode("libtfli2")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultList, err := client.TFunctionList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultList[0]["engine"]).To(BeEquivalentTo("js"))
opt := &redis.TFunctionListOptions{Withcode: true, Verbose: 2}
resultListArgs, err := client.TFunctionListArgs(ctx, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultListArgs[0]["code"]).To(BeEquivalentTo(libCode("libtfli1")))
resultAdd, err = client.TFunctionDelete(ctx, "libtfli1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFunctionDelete(ctx, "libtfli2").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})

It("should TFCall", Label("gears", "tfcall"), func() {
var resultAdd interface{}
resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfc1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFCall(ctx, "libtfc1", "foo", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("bar"))
resultAdd, err = client.TFunctionDelete(ctx, "libtfc1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})

It("should TFCallArgs", Label("gears", "tfcallargs"), func() {
var resultAdd interface{}
resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfca1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
opt := &redis.TFCallOptions{Arguments: []string{"foo", "bar"}}
resultAdd, err = client.TFCallArgs(ctx, "libtfca1", "foo", 0, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("bar"))
resultAdd, err = client.TFunctionDelete(ctx, "libtfca1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})

It("should TFCallASYNC", Label("gears", "TFCallASYNC"), func() {
var resultAdd interface{}
resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfc1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFCallASYNC(ctx, "libtfc1", "foo", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("bar"))
resultAdd, err = client.TFunctionDelete(ctx, "libtfc1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})

It("should TFCallASYNCArgs", Label("gears", "TFCallASYNCargs"), func() {
var resultAdd interface{}
resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfca1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
opt := &redis.TFCallOptions{Arguments: []string{"foo", "bar"}}
resultAdd, err = client.TFCallASYNCArgs(ctx, "libtfca1", "foo", 0, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("bar"))
resultAdd, err = client.TFunctionDelete(ctx, "libtfca1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})

})

0 comments on commit aba21be

Please sign in to comment.