From ab8a470e02b76a05f3b8356ba8d52330be98bd33 Mon Sep 17 00:00:00 2001 From: santriseus Date: Fri, 22 Jan 2021 09:02:27 +0300 Subject: [PATCH] Fix tmscan tests, add new params for PYEXECUTE, rework XINFO (#146) * Fix test periodically failed because of inconsistency of COUNT * Rework pyexecute: add new params * Re-implement xifno using marshaling * Format * Add XINFO and update dashboard * Add Test data for Time-series and JSON * Fix tests Co-authored-by: Mikhail --- data/dump.rdb | Bin 10240 -> 14681 bytes pkg/redis-gears.go | 30 +++- pkg/redis-gears_integration_test.go | 17 +- pkg/redis-gears_test.go | 29 +++- pkg/redis-stream.go | 72 +++++++-- pkg/redis-stream_integration_test.go | 36 +++++ pkg/redis-stream_test.go | 134 ++++++++++------ pkg/redis-tmscan_integration_test.go | 8 +- pkg/testing-utilities_test.go | 2 + pkg/types.go | 40 ++--- provisioning/dashboards/data-types.json | 203 +++++++++++++++++++++++- src/redis/command.ts | 12 +- 12 files changed, 472 insertions(+), 111 deletions(-) create mode 100644 pkg/redis-stream_integration_test.go diff --git a/data/dump.rdb b/data/dump.rdb index 36e231d99188ba9c5e7b7f71059fa41370633b96..149e9f771063bf3922f1742aa5ff5d9e5798084e 100644 GIT binary patch delta 5707 zcmcgwdr(x@89(>jyANPjSOJlTdbuvJ%S!`_)>OMZRaU`ARHCW6>|VIUec(QdFLIYe z#k6Zja_OWq9W6GF@AyD#ACo34n9AsAZIhZ<69czs#$=*1=^u`^)t<8gyTILrW~OBZ zcDdhozVrM2&i8%i+~Zw#%eikRbJrZb@rlyoVl8<#*4Cvv`!aZPb zZOtdzYNu*mzi@2V&38E^Bp?7!!9U}Ln)q(bm9tH{CISGUNOZgASy{WVKQ&qSYsw$^ zSU2l(&vQ4}SQqPTU|lUs>5GqxV&kA}qy+-F44^+P01!>O9lJlHP(1Xb^+O6hhBDY- zk@>gF!%qf7`x2<+FUuO5J%Yk*`Bwj&683w|7c zO+rH%f<0>(?B+)~c>@ekFt|1idE^dws9>S2d%-dgb-MUl_`c$jN-1dIY+-qdnFoqS zr^DhgyDw&DDxw1TUv)X`E#=~hxnH#90%$Nm4WyBZ2CpnCupVxctr04r_mziJkl8Cs zaI%&Lm(^73ni2TMnw(}1I{*S6m2#+eni`)w0KUbBJv5;qYOM~34UJ#JU{hSE2}27+ z21j=yPzvlY0@TxoV}VNgAP9;N!VW>T>sVM&HG$~Kp`Z}?V4fm$W#$ODPd~xN*uZ#G z!wi1l3mR5>K$_7H<)4nlh;alNNfK(k9flCEgf)RRip=Oy1V+DIJd9BYp|8RaN;%*i z#LI_CoMP9YN2o){674O-4w8AZ_8n}BNT?zOyP(wKgP9x5t!sJMICL6Q-70-GW((b6Adm*Ir?!G=auWMiuYBlp}ejBe~5Bk5rL?{Ulsgy!%|u!kfg z>wIjA$f)Web{WoSh|u!G*eFHlT96QrH~7H~Wxf0tQfm~lM6e#>pb;3gt-}tIgc?s{ z({Mtl{|xKhE4r{x7)Fkp*eJosa|l9{9qOjA$ch~#(*SGo0=7ohq!YW0(1ag2Pmwu(srkpLNf{n1 zrOEdtHF>>n)xQTc;YVeDNTU27G~v%lh%%(frT@7m{6P1ks8{xR-`yKNQHJs)RSHw^ zU<8X=vBP-tgr5;O-3;r2mh+J``J2@I-#Jfsm#G2O+;-_i>0bQ!gMH)Bq*@A7s3w8r zmePbrZzLB>V@%sPG#TS81B(Kh@KU0T zSChbdR5(o*Nv+8!O@^QU>Ia+bf2qlyN1J{<9!)lk!i47!O_Yn+BdjI`*c!RXMmRr^ zxt3A$SD^Vvi4uG#!RQ7xslbD!@?<_x2>Xa6!H^J*K7S)@O#VX1BZ|>p7I>X^v2M~| z;I~k-^XC>$c>aoSMW+NpqX7V@(Jw?dG8Xb_!LxHzB1g)u;!7D}Z|ePXjj%V4!v6}5 zlWz;!gx)j|9f(96=iAGwL$+P=G8&nb7cbJ1LkF`859H#QNK&uGvKwkf&9=5{Wr ziknQ!qf)ATavQp!u(I?nXm_~Nb#V>$b{}Oo+1Pe2#z$HlW_m4*G1=)fU@BhYJwtBx zn5-Ss<2>#aCQircxoj?m%cD4gbSE#I2fs*ufOp$OWoi3e2SAzVmAAu|}ak{+N zZ1&hZR@z9FY&Ei5MUeihLf~mJ$L_A($}B6fiSm~P_4FkEa991^Isi-^P0UTO8u1`L z6PMon5TrlokG7VaX8wFvEeGo}-smbj4yN|2k5|03^AzFZWp%vP z+&}X|iCp1B_-wV+jdXc`YA1;5BZz(yefEE^lh^?-b!N4fog%mi+A}bY(`e6tiDJx2 zf=kot$@D6YL3FiZLMzaSDXsmPXUa|~3ONlotBRXV^I(psyL?F7pLwJ=VfKCE=&?Kg zJih{b-l1gDZh*gF(%1W!AA-;**K3Fpw_c+?3LxVbUexN<*MLsHo2pq-Ue*5QN@24$ zfj>-UxBFsfwVbtY_OKq7*1VGi+;oPF-NKqZ?#klI4Cte_(1oRjyD&p0bGwQymaI-i zsT>d!h)KlV$vTRQ^+h|4b)eo@mp!e6RhBwzHj~}bE_^xds+iCX62;_hf=gU{1l z4fPOK+sn?n9G%qXgfyADrvRsDzV- TE`cFxInVUF|M=kTDyHW@#pe4_ delta 1326 zcmYLJZERCz6h7~JZ`-@Bg^qP)YzVh^rE9ybjD0Z)j;>otT(N+_m_l^@xZ|DlW3;z{ z>DY=e9LY@-ZvFTpc8GCf2rMHQOiT*Jh@v8p2uqx-*8)MJCTgP2L@fHlUnl20CnwMI zoaC8~CN{iQyL?BJvU`q+$M`@^h!0(4`G7DO z9iB5D-ZDo24S(sRE`w7V)lV0y`l69Q+%H@(o265RrBfB~-}G(p$7&`U8C@wUp)5|8tXOYD{z+>pCwf>{bn6wrZ9Jq*Wy5Nvej;G^kQ1 zrn(z8W2!lmmOg4)qe6;|>&p73{Q7eAs?=!rG2f;bDnY+~O?t=v!_Wee+R67P4<@aI z8BP*4DRisZVX$`ZK^+fSn1z=ZQnEKm7J5i2t1KwdzpG35oQ+sSrCi+dP9AYsGw#I&Md?n!*cmvu*fi&xL1JPo7$KF8bAwu zgUF58101L|MEgpT%qKc`L{5s(2 zg0G+3=nTusXXm?tC$(e+sUtd4?;#EB5H!gv-oHrh^kDF&c5^qOGcOm-$WPt;1RHYUj5olYzrFRw{!IG>EXipn)^;8^jYLJyK2so9`=L{*67zgRzV&^q zD_1@R1zCi0RKDgOpUS&?X{z##_S1;WGfiS0DatXEPCgn-b7GWI*>d zC-2dhsrjNh>h6Ud?p_<0;+eKcDC7+Xl0(wHrC-YW43x-=GKiEsIX-$z)^^FI9K=&C z90xC?)bcjYeZ_nkga`Nv5~-@p+RTIE>u~gL*=H1CMCPQ&_;&Y}>=Q>on_YASiIsQh z+f_ZdHzri|!|MBm1<1}A&p9Bi%0D0&`KFE^E3i0OF7_2`WL)D?DCsy zPN}wp`@D}r?kmdiC>PS>`*&g+=uR$0BgF}Gnxs_;nEJJ812#;48v6XrvtJ{OF~CDZ L)p>m8V5RCGg{HZP diff --git a/pkg/redis-gears.go b/pkg/redis-gears.go index 1932577..8935a7e 100644 --- a/pkg/redis-gears.go +++ b/pkg/redis-gears.go @@ -132,24 +132,45 @@ func queryRgDumpregistrations(qm queryModel, client redisClient) backend.DataRes } /** - * RG.PYEXECUTE + * RG.PYEXECUTE "" [UNBLOCKING] [REQUIREMENTS " ..."] * - * Returns the list of function registrations - * @see https://oss.redislabs.com/redisgears/commands.html#rgdumpregistrations + * Executes a Python function + * @see https://oss.redislabs.com/redisgears/commands.html#rgpyexecute */ func queryRgPyexecute(qm queryModel, client redisClient) backend.DataResponse { response := backend.DataResponse{} var result interface{} + // Check and create list of optional parameters + var args []interface{} + if qm.Unblocking { + args = append(args, "UNBLOCKING") + } + + if qm.Requirements != "" { + args = append(args, "REQUIREMENTS", qm.Requirements) + } + // Run command - err := client.RunFlatCmd(&result, "RG.PYEXECUTE", qm.Key) + err := client.RunFlatCmd(&result, "RG.PYEXECUTE", qm.Key, args...) // Check error if err != nil { return errorHandler(response, err) } + // UNBLOCKING + if qm.Unblocking { + // when running with UNBLOCKING only operationId is returned + frame := data.NewFrame("operationId") + frame.Fields = append(frame.Fields, data.NewField("operationId", nil, []string{string(result.([]byte))})) + + // Adding frame to response + response.Frames = append(response.Frames, frame) + return response + } + // New Frame for results frameWithResults := data.NewFrame("results") frameWithResults.Fields = append(frameWithResults.Fields, data.NewField("results", nil, []string{})) @@ -162,6 +183,7 @@ func queryRgPyexecute(qm queryModel, client redisClient) backend.DataResponse { response.Frames = append(response.Frames, frameWithResults) response.Frames = append(response.Frames, frameWithErrors) + // Parse result switch value := result.(type) { case string: return response diff --git a/pkg/redis-gears_integration_test.go b/pkg/redis-gears_integration_test.go index df12850..cf60c82 100644 --- a/pkg/redis-gears_integration_test.go +++ b/pkg/redis-gears_integration_test.go @@ -65,8 +65,8 @@ func TestRgPyexecuteIntegration(t *testing.T) { radixClient, _ := radix.NewPool("tcp", fmt.Sprintf("127.0.0.1:%d", integrationTestPort), 10) client := radixV3Impl{radixClient: radixClient} + // Results t.Run("Test command with full response", func(t *testing.T) { - // Response resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute", Key: "GB().run()"}, &client) require.Len(t, resp.Frames, 2) require.Len(t, resp.Frames[0].Fields, 1) @@ -80,8 +80,19 @@ func TestRgPyexecuteIntegration(t *testing.T) { require.NoError(t, resp.Error) }) + // UNBLOCKING and REQUIREMENTS + t.Run("Test command with UNBLOCKING and REQUIREMENTS", func(t *testing.T) { + resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute", Key: "GearsBuilder(reader=\"KeysReader\").run()", Unblocking: true, Requirements: "numpy"}, &client) + require.Len(t, resp.Frames, 1) + require.Len(t, resp.Frames[0].Fields, 1) + require.Equal(t, "operationId", resp.Frames[0].Name) + require.Equal(t, "operationId", resp.Frames[0].Fields[0].Name) + require.Greater(t, resp.Frames[0].Fields[0].Len(), 0) + require.IsType(t, "", resp.Frames[0].Fields[0].At(0)) + }) + + // OK t.Run("Test command with full OK string", func(t *testing.T) { - // Response resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute", Key: "GB('CommandReader')"}, &client) require.Len(t, resp.Frames, 2) require.Len(t, resp.Frames[0].Fields, 1) @@ -95,8 +106,8 @@ func TestRgPyexecuteIntegration(t *testing.T) { require.NoError(t, resp.Error) }) + // Error t.Run("Test command with error", func(t *testing.T) { - // Response resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute", Key: "some key"}, &client) require.Len(t, resp.Frames, 0) require.Error(t, resp.Error) diff --git a/pkg/redis-gears_test.go b/pkg/redis-gears_test.go index 89cadf8..8c26e25 100644 --- a/pkg/redis-gears_test.go +++ b/pkg/redis-gears_test.go @@ -157,7 +157,7 @@ func TestRgPyexecute(t *testing.T) { } // Response - resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute"}, &client) + resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute", Key: "GB().run()"}, &client) require.Len(t, resp.Frames, 2) require.Len(t, resp.Frames[0].Fields, 1) require.Equal(t, "results", resp.Frames[0].Name) @@ -170,6 +170,29 @@ func TestRgPyexecute(t *testing.T) { require.NoError(t, resp.Error) }) + /** + * Success with Unblocking + */ + t.Run("should process command with Unblocking and requirements", func(t *testing.T) { + t.Parallel() + + // Client + client := testClient{ + rcv: []byte("0000000000000000000000000000000000000000-11"), + err: nil, + } + + // Response + resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute", Key: "GB().run()", Unblocking: true, Requirements: "numpy"}, &client) + require.Len(t, resp.Frames, 1) + require.Len(t, resp.Frames[0].Fields, 1) + require.Equal(t, "operationId", resp.Frames[0].Name) + require.Equal(t, "operationId", resp.Frames[0].Fields[0].Name) + require.Greater(t, resp.Frames[0].Fields[0].Len(), 0) + require.Equal(t, "0000000000000000000000000000000000000000-11", resp.Frames[0].Fields[0].At(0)) + require.NoError(t, resp.Error) + }) + /** * Success with 2 arrays in result */ @@ -190,7 +213,7 @@ func TestRgPyexecute(t *testing.T) { } // Response - resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute"}, &client) + resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute", Key: "GB().run()"}, &client) require.Len(t, resp.Frames, 2) require.Len(t, resp.Frames[0].Fields, 1) require.Equal(t, "results", resp.Frames[0].Name) @@ -219,7 +242,7 @@ func TestRgPyexecute(t *testing.T) { err: errors.New("error occurred")} // Response - resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute"}, &client) + resp := queryRgPyexecute(queryModel{Command: "rg.pyexecute", Key: "GB().run()"}, &client) require.EqualError(t, resp.Error, "error occurred") }) } diff --git a/pkg/redis-stream.go b/pkg/redis-stream.go index 7aafc9b..8241704 100644 --- a/pkg/redis-stream.go +++ b/pkg/redis-stream.go @@ -1,10 +1,26 @@ package main import ( + "bytes" + "fmt" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" ) +/** + * XINFO Radix marshaling + */ +type xinfo struct { + Length int64 `redis:"length"` + RadixTreeKeys int64 `redis:"radix-tree-keys"` + RadixTreeNodes int64 `redis:"radix-tree-nodes"` + Groups int64 `redis:"groups"` + LastGeneratedID string `redis:"last-generated-id"` + FirstEntry []interface{} `redis:"first-entry"` + LastEntry []interface{} `redis:"last-entry"` +} + /** * XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP] * @@ -14,31 +30,59 @@ func queryXInfoStream(qm queryModel, client redisClient) backend.DataResponse { response := backend.DataResponse{} // Execute command - var result map[string]string - err := client.RunFlatCmd(&result, "XINFO", "STREAM", qm.Key) + var model xinfo + err := client.RunFlatCmd(&model, "XINFO", "STREAM", qm.Key) // Check error if err != nil { return errorHandler(response, err) } - fields := []string{} - values := []string{} - - // Add fields and values - for k := range result { - fields = append(fields, k) - values = append(values, result[k]) - } - // New Frame - frame := data.NewFrame(qm.Key, - data.NewField("Field", nil, fields), - data.NewField("Value", nil, values)) + frame := data.NewFrame(qm.Key) // Add the frames to the response response.Frames = append(response.Frames, frame) + // Add plain fields to frame + frame.Fields = append(frame.Fields, data.NewField("length", nil, []int64{model.Length})) + frame.Fields = append(frame.Fields, data.NewField("radix-tree-keys", nil, []int64{model.RadixTreeKeys})) + frame.Fields = append(frame.Fields, data.NewField("radix-tree-nodes", nil, []int64{model.RadixTreeNodes})) + frame.Fields = append(frame.Fields, data.NewField("groups", nil, []int64{model.Groups})) + frame.Fields = append(frame.Fields, data.NewField("last-generated-id", nil, []string{model.LastGeneratedID})) + + // First entry + if model.FirstEntry != nil { + frame.Fields = append(frame.Fields, data.NewField("first-entry-id", nil, []string{string(model.FirstEntry[0].([]byte))})) + entryFields := model.FirstEntry[1].([]interface{}) + fields := new(bytes.Buffer) + + // Merging args to string like "key"="value"\n + for i := 0; i < len(entryFields); i += 2 { + field := string(entryFields[i].([]byte)) + value := string(entryFields[i+1].([]byte)) + fmt.Fprintf(fields, "\"%s\"=\"%s\"\n", field, value) + } + + frame.Fields = append(frame.Fields, data.NewField("first-entry-fields", nil, []string{fields.String()})) + } + + // Last entry + if model.LastEntry != nil { + frame.Fields = append(frame.Fields, data.NewField("last-entry-id", nil, []string{string(model.LastEntry[0].([]byte))})) + entryFields := model.LastEntry[1].([]interface{}) + fields := new(bytes.Buffer) + + // Merging args to string like "key"="value"\n + for i := 0; i < len(entryFields); i += 2 { + field := string(entryFields[i].([]byte)) + value := string(entryFields[i+1].([]byte)) + fmt.Fprintf(fields, "\"%s\"=\"%s\"\n", field, value) + } + + frame.Fields = append(frame.Fields, data.NewField("last-entry-fields", nil, []string{fields.String()})) + } + // Return return response } diff --git a/pkg/redis-stream_integration_test.go b/pkg/redis-stream_integration_test.go new file mode 100644 index 0000000..d5a7281 --- /dev/null +++ b/pkg/redis-stream_integration_test.go @@ -0,0 +1,36 @@ +// +build integration + +package main + +import ( + "fmt" + "testing" + + "github.com/mediocregopher/radix/v3" + "github.com/stretchr/testify/require" +) + +/** + * XINFO + */ +func TestXInfoStreamIntegration(t *testing.T) { + // Client + radixClient, _ := radix.NewPool("tcp", fmt.Sprintf("127.0.0.1:%d", integrationTestPort), 10) + client := radixV3Impl{radixClient: radixClient} + + // Customers + t.Run("query stream queue:customers", func(t *testing.T) { + resp := queryXInfoStream(queryModel{Key: "queue:customers"}, &client) + require.Len(t, resp.Frames, 1) + require.Len(t, resp.Frames[0].Fields, 9) + require.Equal(t, 1, resp.Frames[0].Fields[0].Len()) + }) + + // Orders + t.Run("query stream queue:orders", func(t *testing.T) { + resp := queryXInfoStream(queryModel{Key: "queue:orders"}, &client) + require.Len(t, resp.Frames, 1) + require.Len(t, resp.Frames[0].Fields, 9) + require.Equal(t, 1, resp.Frames[0].Fields[0].Len()) + }) +} diff --git a/pkg/redis-stream_test.go b/pkg/redis-stream_test.go index 587a9b4..10533ac 100644 --- a/pkg/redis-stream_test.go +++ b/pkg/redis-stream_test.go @@ -13,58 +13,94 @@ import ( func TestQueryXInfoStream(t *testing.T) { t.Parallel() - tests := []struct { - name string - qm queryModel - rcv interface{} - fieldsCount int - rowsPerField int - err error - }{ - { - "should handle default payload, but collect only top-level key-value pairs", - queryModel{Command: "xinfoStream", Key: "test1"}, - map[string]string{ - "length": "2", - "radix-tree-keys": "1", - "radix-tree-nodes": "2", - "groups": "2", - "last-generated-id": "1538385846314-0", + t.Run("should handle response with FirstEntry and LasEntry", func(t *testing.T) { + t.Parallel() + + // Client + client := testClient{ + rcv: xinfo{ + Length: 5, + RadixTreeKeys: 4, + RadixTreeNodes: 3, + Groups: 2, + LastGeneratedID: "id", + FirstEntry: []interface{}{ + []byte("id2"), + []interface{}{ + []byte("key1"), + []byte("value1"), + []byte("key2"), + []byte("value2"), + }, + }, + LastEntry: []interface{}{ + []byte("id3"), + []interface{}{ + []byte("key3"), + []byte("value3"), + []byte("key4"), + []byte("value4"), + }, + }, }, - 2, - 5, - nil, - }, - { - "should handle error", - queryModel{Command: "xinfoStream"}, - nil, - 0, - 0, - errors.New("error occurred"), - }, - } + } + + // Response + resp := queryXInfoStream(queryModel{Command: "xinfoStream", Key: "test1"}, &client) + require.Len(t, resp.Frames, 1) + require.Len(t, resp.Frames[0].Fields, 9) + require.Equal(t, 1, resp.Frames[0].Fields[0].Len()) + require.Equal(t, "length", resp.Frames[0].Fields[0].Name) + require.Equal(t, int64(5), resp.Frames[0].Fields[0].At(0)) + require.Equal(t, "radix-tree-keys", resp.Frames[0].Fields[1].Name) + require.Equal(t, int64(4), resp.Frames[0].Fields[1].At(0)) + require.Equal(t, "radix-tree-nodes", resp.Frames[0].Fields[2].Name) + require.Equal(t, int64(3), resp.Frames[0].Fields[2].At(0)) + require.Equal(t, "groups", resp.Frames[0].Fields[3].Name) + require.Equal(t, int64(2), resp.Frames[0].Fields[3].At(0)) + require.Equal(t, "last-generated-id", resp.Frames[0].Fields[4].Name) + require.Equal(t, "id", resp.Frames[0].Fields[4].At(0)) + require.Equal(t, "first-entry-id", resp.Frames[0].Fields[5].Name) + require.Equal(t, "id2", resp.Frames[0].Fields[5].At(0)) + require.Equal(t, "first-entry-fields", resp.Frames[0].Fields[6].Name) + require.Equal(t, "\"key1\"=\"value1\"\n\"key2\"=\"value2\"\n", resp.Frames[0].Fields[6].At(0)) + require.Equal(t, "last-entry-id", resp.Frames[0].Fields[7].Name) + require.Equal(t, "id3", resp.Frames[0].Fields[7].At(0)) + require.Equal(t, "last-entry-fields", resp.Frames[0].Fields[8].Name) + require.Equal(t, "\"key3\"=\"value3\"\n\"key4\"=\"value4\"\n", resp.Frames[0].Fields[8].At(0)) + }) + + t.Run("should handle response without FirstEntry and LastEntry", func(t *testing.T) { + t.Parallel() + + // Client + client := testClient{rcv: xinfo{ + Length: 5, + RadixTreeKeys: 4, + RadixTreeNodes: 3, + Groups: 2, + LastGeneratedID: "id", + FirstEntry: nil, + LastEntry: nil, + }} - // Run Tests - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() + // Response + resp := queryXInfoStream(queryModel{Command: "xinfoStream", Key: "test1"}, &client) + require.Len(t, resp.Frames, 1) + require.Len(t, resp.Frames[0].Fields, 5) + require.Equal(t, 1, resp.Frames[0].Fields[0].Len()) + }) - // Client - client := testClient{rcv: tt.rcv, err: tt.err} + // Error + t.Run("should handle rerror", func(t *testing.T) { + t.Parallel() - // Response - response := queryXInfoStream(tt.qm, &client) - if tt.err != nil { - require.EqualError(t, response.Error, tt.err.Error(), "Should set error to response if failed") - require.Nil(t, response.Frames, "No frames should be created if failed") - } else { - require.Equal(t, tt.qm.Key, response.Frames[0].Name, "Invalid frame name") - require.Len(t, response.Frames[0].Fields, tt.fieldsCount, "Invalid number of fields created ") - require.Equal(t, tt.rowsPerField, response.Frames[0].Fields[0].Len(), "Invalid number of values in field vectors") + // Client + client := testClient{err: errors.New("some error")} - } - }) - } + // Response + resp := queryXInfoStream(queryModel{Command: "xinfoStream", Key: "test1"}, &client) + require.Len(t, resp.Frames, 0) + require.EqualError(t, resp.Error, "some error") + }) } diff --git a/pkg/redis-tmscan_integration_test.go b/pkg/redis-tmscan_integration_test.go index 844dc24..2e29d31 100644 --- a/pkg/redis-tmscan_integration_test.go +++ b/pkg/redis-tmscan_integration_test.go @@ -92,9 +92,9 @@ func TestTMScanIntegrationWithMatched(t *testing.T) { require.Len(t, resp.Frames[0].Fields, 3) require.Len(t, resp.Frames[1].Fields, 2) require.Equal(t, 1, resp.Frames[1].Fields[0].Len()) - require.Equal(t, 6, resp.Frames[0].Fields[0].Len()) - require.Equal(t, 6, resp.Frames[0].Fields[1].Len()) - require.Equal(t, 6, resp.Frames[0].Fields[2].Len()) + require.Equal(t, 9, resp.Frames[0].Fields[0].Len()) + require.Equal(t, 9, resp.Frames[0].Fields[1].Len()) + require.Equal(t, 9, resp.Frames[0].Fields[2].Len()) require.Equal(t, "0", resp.Frames[1].Fields[0].At(0)) require.Equal(t, int64(resp.Frames[0].Fields[0].Len()), resp.Frames[1].Fields[1].At(0)) @@ -158,5 +158,5 @@ func TestTMScanIntegrationWithSize(t *testing.T) { } require.NotEqual(t, "0", resp.Frames[1].Fields[0].At(0)) - require.Equal(t, int64(10), resp.Frames[1].Fields[1].At(0)) + require.GreaterOrEqual(t, int64(10), resp.Frames[1].Fields[1].At(0)) } diff --git a/pkg/testing-utilities_test.go b/pkg/testing-utilities_test.go index a496814..a319823 100644 --- a/pkg/testing-utilities_test.go +++ b/pkg/testing-utilities_test.go @@ -119,6 +119,8 @@ func assignReceiver(to interface{}, from interface{}) { *(to.(*string)) = from.(string) case *pystats: *(to.(*pystats)) = from.(pystats) + case *xinfo: + *(to.(*xinfo)) = from.(xinfo) case *[]dumpregistrations: *(to.(*[]dumpregistrations)) = from.([]dumpregistrations) case interface{}: diff --git a/pkg/types.go b/pkg/types.go index 68b58fc..9ffdf17 100644 --- a/pkg/types.go +++ b/pkg/types.go @@ -38,23 +38,25 @@ type dataModel struct { * Query Model */ type queryModel struct { - Type string `json:"type"` - Query string `json:"query"` - Key string `json:"keyName"` - Field string `json:"field"` - Filter string `json:"filter"` - Command string `json:"command"` - Aggregation string `json:"aggregation"` - Bucket int `json:"bucket"` - Legend string `json:"legend"` - Value string `json:"value"` - Section string `json:"section"` - Size int `json:"size"` - Fill bool `json:"fill"` - Streaming bool `json:"streaming"` - CLI bool `json:"cli"` - Cursor string `json:"cursor"` - Match string `json:"match"` - Count int `json:"count"` - Samples int `json:"samples"` + Type string `json:"type"` + Query string `json:"query"` + Key string `json:"keyName"` + Field string `json:"field"` + Filter string `json:"filter"` + Command string `json:"command"` + Aggregation string `json:"aggregation"` + Bucket int `json:"bucket"` + Legend string `json:"legend"` + Value string `json:"value"` + Section string `json:"section"` + Size int `json:"size"` + Fill bool `json:"fill"` + Streaming bool `json:"streaming"` + CLI bool `json:"cli"` + Cursor string `json:"cursor"` + Match string `json:"match"` + Count int `json:"count"` + Samples int `json:"samples"` + Unblocking bool `json:"unblocking"` + Requirements string `json:"requirements"` } diff --git a/provisioning/dashboards/data-types.json b/provisioning/dashboards/data-types.json index dea7798..03bbfdd 100644 --- a/provisioning/dashboards/data-types.json +++ b/provisioning/dashboards/data-types.json @@ -1525,6 +1525,193 @@ "title": "SMEMBERS test:set", "type": "table" }, + { + "datasource": null, + "fieldConfig": { + "defaults": { + "custom": { + "align": null, + "filterable": false + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "dark-red", + "value": null + }, + { + "color": "dark-green", + "value": 3 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 19, + "x": 0, + "y": 22 + }, + "id": 57, + "options": { + "showHeader": true + }, + "pluginVersion": "7.3.7", + "targets": [ + { + "command": "xinfoStream", + "keyName": "test:stream", + "query": "", + "refId": "A", + "streaming": false, + "type": "command" + } + ], + "timeFrom": null, + "timeShift": null, + "title": "XINFO STREAM test:stream", + "type": "table" + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "fieldConfig": { + "defaults": { + "custom": { + "align": null, + "filterable": false + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "dark-red", + "value": null + }, + { + "color": "dark-green", + "value": 2 + } + ] + } + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 6, + "w": 5, + "x": 19, + "y": 22 + }, + "hiddenSeries": false, + "id": 58, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.3.7", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "command": "xinfoStream", + "keyName": "test:stream", + "query": "", + "refId": "A", + "streaming": true, + "type": "command" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "XINFO STREAM test:stream Streaming", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transformations": [ + { + "id": "organize", + "options": { + "excludeByName": { + "first-entry-fields": true, + "first-entry-id": true, + "groups": true, + "last-entry-fields": true, + "last-entry-id": true, + "last-generated-id": true, + "length": false, + "radix-tree-keys": true, + "radix-tree-nodes": true + }, + "indexByName": {}, + "renameByName": {} + } + } + ], + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:69", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:70", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "collapsed": false, "datasource": null, @@ -1532,7 +1719,7 @@ "h": 1, "w": 24, "x": 0, - "y": 22 + "y": 28 }, "id": 39, "panels": [], @@ -1568,7 +1755,7 @@ "h": 3, "w": 24, "x": 0, - "y": 23 + "y": 29 }, "id": 41, "options": { @@ -1677,7 +1864,7 @@ "h": 9, "w": 11, "x": 0, - "y": 26 + "y": 32 }, "hiddenSeries": false, "id": 44, @@ -1853,7 +2040,7 @@ "h": 9, "w": 13, "x": 11, - "y": 26 + "y": 32 }, "hiddenSeries": false, "id": 42, @@ -2000,7 +2187,7 @@ "h": 1, "w": 24, "x": 0, - "y": 35 + "y": 41 }, "id": 50, "panels": [], @@ -2033,7 +2220,7 @@ "h": 9, "w": 2, "x": 0, - "y": 36 + "y": 42 }, "id": 53, "options": { @@ -2095,7 +2282,7 @@ "h": 9, "w": 6, "x": 2, - "y": 36 + "y": 42 }, "hiddenSeries": false, "id": 54, @@ -2340,7 +2527,7 @@ "h": 9, "w": 16, "x": 8, - "y": 36 + "y": 42 }, "id": 56, "options": { diff --git a/src/redis/command.ts b/src/redis/command.ts index 3684425..54f69de 100644 --- a/src/redis/command.ts +++ b/src/redis/command.ts @@ -55,14 +55,12 @@ export const Commands = { label: 'TYPE', description: 'Returns the string representation of the type of the value stored at key', value: 'type', + }, + { + label: 'XINFO STREAM', + description: 'Returns general information about the stream stored at the specified key', + value: 'xinfoStream', }, - /** - * { - * label: 'XINFO STREAM', - * description: 'Returns general information about the stream stored at the specified key', - * value: 'xinfoStream', - * }, - */ { label: 'XLEN', description: 'Returns the number of entries inside a stream',