Skip to content

Commit

Permalink
Fix tmscan tests, add new params for PYEXECUTE, rework XINFO (#146)
Browse files Browse the repository at this point in the history
* Fix test periodically failed because of inconsistency of COUNT

* Rework pyexecute: add new params

* Re-implement xifno using marshaling

* Format

* Add XINFO and update dashboard

* Add Test data for Time-series and JSON

* Fix tests

Co-authored-by: Mikhail <[email protected]>
  • Loading branch information
santriseus and Mikhail authored Jan 22, 2021
1 parent d5c95c0 commit ab8a470
Show file tree
Hide file tree
Showing 12 changed files with 472 additions and 111 deletions.
Binary file modified data/dump.rdb
Binary file not shown.
30 changes: 26 additions & 4 deletions pkg/redis-gears.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,24 +132,45 @@ func queryRgDumpregistrations(qm queryModel, client redisClient) backend.DataRes
}

/**
* RG.PYEXECUTE
* RG.PYEXECUTE "<function>" [UNBLOCKING] [REQUIREMENTS "<dep> ..."]
*
* Returns the list of function registrations
* @see https://oss.redislabs.com/redisgears/commands.html#rgdumpregistrations
* Executes a Python function
* @see https://oss.redislabs.com/redisgears/commands.html#rgpyexecute
*/
func queryRgPyexecute(qm queryModel, client redisClient) backend.DataResponse {
response := backend.DataResponse{}

var result interface{}

// Check and create list of optional parameters
var args []interface{}
if qm.Unblocking {
args = append(args, "UNBLOCKING")
}

if qm.Requirements != "" {
args = append(args, "REQUIREMENTS", qm.Requirements)
}

// Run command
err := client.RunFlatCmd(&result, "RG.PYEXECUTE", qm.Key)
err := client.RunFlatCmd(&result, "RG.PYEXECUTE", qm.Key, args...)

// Check error
if err != nil {
return errorHandler(response, err)
}

// UNBLOCKING
if qm.Unblocking {
// when running with UNBLOCKING only operationId is returned
frame := data.NewFrame("operationId")
frame.Fields = append(frame.Fields, data.NewField("operationId", nil, []string{string(result.([]byte))}))

// Adding frame to response
response.Frames = append(response.Frames, frame)
return response
}

// New Frame for results
frameWithResults := data.NewFrame("results")
frameWithResults.Fields = append(frameWithResults.Fields, data.NewField("results", nil, []string{}))
Expand All @@ -162,6 +183,7 @@ func queryRgPyexecute(qm queryModel, client redisClient) backend.DataResponse {
response.Frames = append(response.Frames, frameWithResults)
response.Frames = append(response.Frames, frameWithErrors)

// Parse result
switch value := result.(type) {
case string:
return response
Expand Down
17 changes: 14 additions & 3 deletions pkg/redis-gears_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func TestRgPyexecuteIntegration(t *testing.T) {
radixClient, _ := radix.NewPool("tcp", fmt.Sprintf("127.0.0.1:%d", integrationTestPort), 10)
client := radixV3Impl{radixClient: radixClient}

// Results
t.Run("Test command with full response", func(t *testing.T) {
// Response
resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute", Key: "GB().run()"}, &client)
require.Len(t, resp.Frames, 2)
require.Len(t, resp.Frames[0].Fields, 1)
Expand All @@ -80,8 +80,19 @@ func TestRgPyexecuteIntegration(t *testing.T) {
require.NoError(t, resp.Error)
})

// UNBLOCKING and REQUIREMENTS
t.Run("Test command with UNBLOCKING and REQUIREMENTS", func(t *testing.T) {
resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute", Key: "GearsBuilder(reader=\"KeysReader\").run()", Unblocking: true, Requirements: "numpy"}, &client)
require.Len(t, resp.Frames, 1)
require.Len(t, resp.Frames[0].Fields, 1)
require.Equal(t, "operationId", resp.Frames[0].Name)
require.Equal(t, "operationId", resp.Frames[0].Fields[0].Name)
require.Greater(t, resp.Frames[0].Fields[0].Len(), 0)
require.IsType(t, "", resp.Frames[0].Fields[0].At(0))
})

// OK
t.Run("Test command with full OK string", func(t *testing.T) {
// Response
resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute", Key: "GB('CommandReader')"}, &client)
require.Len(t, resp.Frames, 2)
require.Len(t, resp.Frames[0].Fields, 1)
Expand All @@ -95,8 +106,8 @@ func TestRgPyexecuteIntegration(t *testing.T) {
require.NoError(t, resp.Error)
})

// Error
t.Run("Test command with error", func(t *testing.T) {
// Response
resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute", Key: "some key"}, &client)
require.Len(t, resp.Frames, 0)
require.Error(t, resp.Error)
Expand Down
29 changes: 26 additions & 3 deletions pkg/redis-gears_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestRgPyexecute(t *testing.T) {
}

// Response
resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute"}, &client)
resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute", Key: "GB().run()"}, &client)
require.Len(t, resp.Frames, 2)
require.Len(t, resp.Frames[0].Fields, 1)
require.Equal(t, "results", resp.Frames[0].Name)
Expand All @@ -170,6 +170,29 @@ func TestRgPyexecute(t *testing.T) {
require.NoError(t, resp.Error)
})

/**
* Success with Unblocking
*/
t.Run("should process command with Unblocking and requirements", func(t *testing.T) {
t.Parallel()

// Client
client := testClient{
rcv: []byte("0000000000000000000000000000000000000000-11"),
err: nil,
}

// Response
resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute", Key: "GB().run()", Unblocking: true, Requirements: "numpy"}, &client)
require.Len(t, resp.Frames, 1)
require.Len(t, resp.Frames[0].Fields, 1)
require.Equal(t, "operationId", resp.Frames[0].Name)
require.Equal(t, "operationId", resp.Frames[0].Fields[0].Name)
require.Greater(t, resp.Frames[0].Fields[0].Len(), 0)
require.Equal(t, "0000000000000000000000000000000000000000-11", resp.Frames[0].Fields[0].At(0))
require.NoError(t, resp.Error)
})

/**
* Success with 2 arrays in result
*/
Expand All @@ -190,7 +213,7 @@ func TestRgPyexecute(t *testing.T) {
}

// Response
resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute"}, &client)
resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute", Key: "GB().run()"}, &client)
require.Len(t, resp.Frames, 2)
require.Len(t, resp.Frames[0].Fields, 1)
require.Equal(t, "results", resp.Frames[0].Name)
Expand Down Expand Up @@ -219,7 +242,7 @@ func TestRgPyexecute(t *testing.T) {
err: errors.New("error occurred")}

// Response
resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute"}, &client)
resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute", Key: "GB().run()"}, &client)
require.EqualError(t, resp.Error, "error occurred")
})
}
72 changes: 58 additions & 14 deletions pkg/redis-stream.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,26 @@
package main

import (
"bytes"
"fmt"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
)

/**
* XINFO Radix marshaling
*/
type xinfo struct {
Length int64 `redis:"length"`
RadixTreeKeys int64 `redis:"radix-tree-keys"`
RadixTreeNodes int64 `redis:"radix-tree-nodes"`
Groups int64 `redis:"groups"`
LastGeneratedID string `redis:"last-generated-id"`
FirstEntry []interface{} `redis:"first-entry"`
LastEntry []interface{} `redis:"last-entry"`
}

/**
* XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
*
Expand All @@ -14,31 +30,59 @@ func queryXInfoStream(qm queryModel, client redisClient) backend.DataResponse {
response := backend.DataResponse{}

// Execute command
var result map[string]string
err := client.RunFlatCmd(&result, "XINFO", "STREAM", qm.Key)
var model xinfo
err := client.RunFlatCmd(&model, "XINFO", "STREAM", qm.Key)

// Check error
if err != nil {
return errorHandler(response, err)
}

fields := []string{}
values := []string{}

// Add fields and values
for k := range result {
fields = append(fields, k)
values = append(values, result[k])
}

// New Frame
frame := data.NewFrame(qm.Key,
data.NewField("Field", nil, fields),
data.NewField("Value", nil, values))
frame := data.NewFrame(qm.Key)

// Add the frames to the response
response.Frames = append(response.Frames, frame)

// Add plain fields to frame
frame.Fields = append(frame.Fields, data.NewField("length", nil, []int64{model.Length}))
frame.Fields = append(frame.Fields, data.NewField("radix-tree-keys", nil, []int64{model.RadixTreeKeys}))
frame.Fields = append(frame.Fields, data.NewField("radix-tree-nodes", nil, []int64{model.RadixTreeNodes}))
frame.Fields = append(frame.Fields, data.NewField("groups", nil, []int64{model.Groups}))
frame.Fields = append(frame.Fields, data.NewField("last-generated-id", nil, []string{model.LastGeneratedID}))

// First entry
if model.FirstEntry != nil {
frame.Fields = append(frame.Fields, data.NewField("first-entry-id", nil, []string{string(model.FirstEntry[0].([]byte))}))
entryFields := model.FirstEntry[1].([]interface{})
fields := new(bytes.Buffer)

// Merging args to string like "key"="value"\n
for i := 0; i < len(entryFields); i += 2 {
field := string(entryFields[i].([]byte))
value := string(entryFields[i+1].([]byte))
fmt.Fprintf(fields, "\"%s\"=\"%s\"\n", field, value)
}

frame.Fields = append(frame.Fields, data.NewField("first-entry-fields", nil, []string{fields.String()}))
}

// Last entry
if model.LastEntry != nil {
frame.Fields = append(frame.Fields, data.NewField("last-entry-id", nil, []string{string(model.LastEntry[0].([]byte))}))
entryFields := model.LastEntry[1].([]interface{})
fields := new(bytes.Buffer)

// Merging args to string like "key"="value"\n
for i := 0; i < len(entryFields); i += 2 {
field := string(entryFields[i].([]byte))
value := string(entryFields[i+1].([]byte))
fmt.Fprintf(fields, "\"%s\"=\"%s\"\n", field, value)
}

frame.Fields = append(frame.Fields, data.NewField("last-entry-fields", nil, []string{fields.String()}))
}

// Return
return response
}
36 changes: 36 additions & 0 deletions pkg/redis-stream_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// +build integration

package main

import (
"fmt"
"testing"

"github.com/mediocregopher/radix/v3"
"github.com/stretchr/testify/require"
)

/**
* XINFO
*/
func TestXInfoStreamIntegration(t *testing.T) {
// Client
radixClient, _ := radix.NewPool("tcp", fmt.Sprintf("127.0.0.1:%d", integrationTestPort), 10)
client := radixV3Impl{radixClient: radixClient}

// Customers
t.Run("query stream queue:customers", func(t *testing.T) {
resp := queryXInfoStream(queryModel{Key: "queue:customers"}, &client)
require.Len(t, resp.Frames, 1)
require.Len(t, resp.Frames[0].Fields, 9)
require.Equal(t, 1, resp.Frames[0].Fields[0].Len())
})

// Orders
t.Run("query stream queue:orders", func(t *testing.T) {
resp := queryXInfoStream(queryModel{Key: "queue:orders"}, &client)
require.Len(t, resp.Frames, 1)
require.Len(t, resp.Frames[0].Fields, 9)
require.Equal(t, 1, resp.Frames[0].Fields[0].Len())
})
}
Loading

0 comments on commit ab8a470

Please sign in to comment.