From aba21be8cd68b5039bb658f611546cf49f24a008 Mon Sep 17 00:00:00 2001 From: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com> Date: Wed, 16 Aug 2023 17:06:07 +0300 Subject: [PATCH] Add Redis Gears support (#2675) * Add gears commands and tests * Fix tfunctionlist and add docstrings * fixes * fixes * add tfcallasync to gearsCmdable --- .github/wordlist.txt | 3 +- command.go | 65 +++++++++++++++++ commands.go | 1 + redis_gears.go | 161 +++++++++++++++++++++++++++++++++++++++++++ redis_gears_test.go | 139 +++++++++++++++++++++++++++++++++++++ 5 files changed, 368 insertions(+), 1 deletion(-) create mode 100644 redis_gears.go create mode 100644 redis_gears_test.go diff --git a/.github/wordlist.txt b/.github/wordlist.txt index 294988d52..32aba8317 100644 --- a/.github/wordlist.txt +++ b/.github/wordlist.txt @@ -52,4 +52,5 @@ uri URI url variadic -RedisStack \ No newline at end of file +RedisStack +RedisGears \ No newline at end of file diff --git a/command.go b/command.go index b6df28fbe..1bd4d5db1 100644 --- a/command.go +++ b/command.go @@ -3713,6 +3713,71 @@ func (cmd *MapStringStringSliceCmd) readReply(rd *proto.Reader) error { return nil } +//----------------------------------------------------------------------- + +type MapStringInterfaceSliceCmd struct { + baseCmd + + val []map[string]interface{} +} + +var _ Cmder = (*MapStringInterfaceSliceCmd)(nil) + +func NewMapStringInterfaceSliceCmd(ctx context.Context, args ...interface{}) *MapStringInterfaceSliceCmd { + return &MapStringInterfaceSliceCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + } +} + +func (cmd *MapStringInterfaceSliceCmd) SetVal(val []map[string]interface{}) { + cmd.val = val +} + +func (cmd *MapStringInterfaceSliceCmd) Val() []map[string]interface{} { + return cmd.val +} + +func (cmd *MapStringInterfaceSliceCmd) Result() ([]map[string]interface{}, error) { + return cmd.Val(), cmd.Err() +} + +func (cmd *MapStringInterfaceSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *MapStringInterfaceSliceCmd) readReply(rd *proto.Reader) error { + n, err := rd.ReadArrayLen() + if err != nil { + return err + } + + cmd.val = make([]map[string]interface{}, n) + for i := 0; i < n; i++ { + nn, err := rd.ReadMapLen() + if err != nil { + return err + } + cmd.val[i] = make(map[string]interface{}, nn) + for f := 0; f < nn; f++ { + k, err := rd.ReadString() + if err != nil { + return err + } + v, err := rd.ReadReply() + if err != nil { + if err != Nil { + return err + } + } + cmd.val[i][k] = v + } + } + return nil +} + //------------------------------------------------------------------------------ type KeyValuesCmd struct { diff --git a/commands.go b/commands.go index 41f8d3ae3..07c8e2c88 100644 --- a/commands.go +++ b/commands.go @@ -505,6 +505,7 @@ type Cmdable interface { ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd + gearsCmdable probabilisticCmdable } diff --git a/redis_gears.go b/redis_gears.go new file mode 100644 index 000000000..5fafea403 --- /dev/null +++ b/redis_gears.go @@ -0,0 +1,161 @@ +package redis + +import ( + "context" + "fmt" + "strings" +) + +type gearsCmdable interface { + TFunctionLoad(ctx context.Context, lib string) *StatusCmd + TFunctionLoadArgs(ctx context.Context, lib string, options *TFunctionLoadOptions) *StatusCmd + TFunctionDelete(ctx context.Context, libName string) *StatusCmd + TFunctionList(ctx context.Context) *MapStringInterfaceSliceCmd + TFunctionListArgs(ctx context.Context, options *TFunctionListOptions) *MapStringInterfaceSliceCmd + TFCall(ctx context.Context, libName string, funcName string, numKeys int) *Cmd + TFCallArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd + TFCallASYNC(ctx context.Context, libName string, funcName string, numKeys int) *Cmd + TFCallASYNCArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd +} +type TFunctionLoadOptions struct { + Replace bool + Config string +} + +type TFunctionListOptions struct { + Withcode bool + Verbose int + Library string +} + +type TFCallOptions struct { + Keys []string + Arguments []string +} + +// TFunctionLoad - load a new JavaScript library into Redis. +// For more information - https://redis.io/commands/tfunction-load/ +func (c cmdable) TFunctionLoad(ctx context.Context, lib string) *StatusCmd { + args := []interface{}{"TFUNCTION", "LOAD", lib} + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) TFunctionLoadArgs(ctx context.Context, lib string, options *TFunctionLoadOptions) *StatusCmd { + args := []interface{}{"TFUNCTION", "LOAD"} + if options != nil { + if options.Replace { + args = append(args, "REPLACE") + } + if options.Config != "" { + args = append(args, "CONFIG", options.Config) + } + } + args = append(args, lib) + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TFunctionDelete - delete a JavaScript library from Redis. +// For more information - https://redis.io/commands/tfunction-delete/ +func (c cmdable) TFunctionDelete(ctx context.Context, libName string) *StatusCmd { + args := []interface{}{"TFUNCTION", "DELETE", libName} + cmd := NewStatusCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TFunctionList - list the functions with additional information about each function. +// For more information - https://redis.io/commands/tfunction-list/ +func (c cmdable) TFunctionList(ctx context.Context) *MapStringInterfaceSliceCmd { + args := []interface{}{"TFUNCTION", "LIST"} + cmd := NewMapStringInterfaceSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) TFunctionListArgs(ctx context.Context, options *TFunctionListOptions) *MapStringInterfaceSliceCmd { + args := []interface{}{"TFUNCTION", "LIST"} + if options != nil { + if options.Withcode { + args = append(args, "WITHCODE") + } + if options.Verbose != 0 { + v := strings.Repeat("v", options.Verbose) + args = append(args, v) + } + if options.Library != "" { + args = append(args, "LIBRARY", options.Library) + + } + } + cmd := NewMapStringInterfaceSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TFCall - invoke a function. +// For more information - https://redis.io/commands/tfcall/ +func (c cmdable) TFCall(ctx context.Context, libName string, funcName string, numKeys int) *Cmd { + lf := libName + "." + funcName + args := []interface{}{"TFCALL", lf, numKeys} + cmd := NewCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) TFCallArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd { + lf := libName + "." + funcName + args := []interface{}{"TFCALL", lf, numKeys} + if options != nil { + if options.Keys != nil { + for _, key := range options.Keys { + + args = append(args, key) + } + } + if options.Arguments != nil { + for _, key := range options.Arguments { + + args = append(args, key) + } + } + } + cmd := NewCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// TFCallASYNC - invoke an asynchronous JavaScript function (coroutine). +// For more information - https://redis.io/commands/TFCallASYNC/ +func (c cmdable) TFCallASYNC(ctx context.Context, libName string, funcName string, numKeys int) *Cmd { + lf := fmt.Sprintf("%s.%s", libName, funcName) + args := []interface{}{"TFCALLASYNC", lf, numKeys} + cmd := NewCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) TFCallASYNCArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd { + lf := fmt.Sprintf("%s.%s", libName, funcName) + args := []interface{}{"TFCALLASYNC", lf, numKeys} + if options != nil { + if options.Keys != nil { + for _, key := range options.Keys { + + args = append(args, key) + } + } + if options.Arguments != nil { + for _, key := range options.Arguments { + + args = append(args, key) + } + } + } + cmd := NewCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} diff --git a/redis_gears_test.go b/redis_gears_test.go new file mode 100644 index 000000000..d02487cb7 --- /dev/null +++ b/redis_gears_test.go @@ -0,0 +1,139 @@ +package redis_test + +import ( + "context" + "fmt" + + . "github.com/bsm/ginkgo/v2" + . "github.com/bsm/gomega" + "github.com/redis/go-redis/v9" +) + +func libCode(libName string) string { + return fmt.Sprintf("#!js api_version=1.0 name=%s\n redis.registerFunction('foo', ()=>{{return 'bar'}})", libName) +} + +func libCodeWithConfig(libName string) string { + lib := `#!js api_version=1.0 name=%s + + var last_update_field_name = "__last_update__" + + if (redis.config.last_update_field_name !== undefined) { + if (typeof redis.config.last_update_field_name != 'string') { + throw "last_update_field_name must be a string"; + } + last_update_field_name = redis.config.last_update_field_name + } + + redis.registerFunction("hset", function(client, key, field, val){ + // get the current time in ms + var curr_time = client.call("time")[0]; + return client.call('hset', key, field, val, last_update_field_name, curr_time); + });` + return fmt.Sprintf(lib, libName) +} + +var _ = Describe("RedisGears commands", Label("gears"), func() { + ctx := context.TODO() + var client *redis.Client + + BeforeEach(func() { + client = redis.NewClient(&redis.Options{Addr: ":6379"}) + Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + Expect(client.Close()).NotTo(HaveOccurred()) + }) + + It("should TFunctionLoad, TFunctionLoadArgs and TFunctionDelete ", Label("gears", "tfunctionload"), func() { + + resultAdd, err := client.TFunctionLoad(ctx, libCode("libtflo1")).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("OK")) + opt := &redis.TFunctionLoadOptions{Replace: true, Config: `{"last_update_field_name":"last_update"}`} + resultAdd, err = client.TFunctionLoadArgs(ctx, libCodeWithConfig("libtflo1"), opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("OK")) + resultAdd, err = client.TFunctionDelete(ctx, "libtflo1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("OK")) + + }) + It("should TFunctionList", Label("gears", "tfunctionlist"), func() { + resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfli1")).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("OK")) + resultAdd, err = client.TFunctionLoad(ctx, libCode("libtfli2")).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("OK")) + resultList, err := client.TFunctionList(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultList[0]["engine"]).To(BeEquivalentTo("js")) + opt := &redis.TFunctionListOptions{Withcode: true, Verbose: 2} + resultListArgs, err := client.TFunctionListArgs(ctx, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultListArgs[0]["code"]).To(BeEquivalentTo(libCode("libtfli1"))) + resultAdd, err = client.TFunctionDelete(ctx, "libtfli1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("OK")) + resultAdd, err = client.TFunctionDelete(ctx, "libtfli2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("OK")) + }) + + It("should TFCall", Label("gears", "tfcall"), func() { + var resultAdd interface{} + resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfc1")).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("OK")) + resultAdd, err = client.TFCall(ctx, "libtfc1", "foo", 0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("bar")) + resultAdd, err = client.TFunctionDelete(ctx, "libtfc1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("OK")) + }) + + It("should TFCallArgs", Label("gears", "tfcallargs"), func() { + var resultAdd interface{} + resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfca1")).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("OK")) + opt := &redis.TFCallOptions{Arguments: []string{"foo", "bar"}} + resultAdd, err = client.TFCallArgs(ctx, "libtfca1", "foo", 0, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("bar")) + resultAdd, err = client.TFunctionDelete(ctx, "libtfca1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("OK")) + }) + + It("should TFCallASYNC", Label("gears", "TFCallASYNC"), func() { + var resultAdd interface{} + resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfc1")).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("OK")) + resultAdd, err = client.TFCallASYNC(ctx, "libtfc1", "foo", 0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("bar")) + resultAdd, err = client.TFunctionDelete(ctx, "libtfc1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("OK")) + }) + + It("should TFCallASYNCArgs", Label("gears", "TFCallASYNCargs"), func() { + var resultAdd interface{} + resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfca1")).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("OK")) + opt := &redis.TFCallOptions{Arguments: []string{"foo", "bar"}} + resultAdd, err = client.TFCallASYNCArgs(ctx, "libtfca1", "foo", 0, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("bar")) + resultAdd, err = client.TFunctionDelete(ctx, "libtfca1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo("OK")) + }) + +})