From 4b3d145ae56877556d0db914a4353178e19c87d4 Mon Sep 17 00:00:00 2001 From: "kyle.cao" Date: Thu, 30 Dec 2021 15:44:33 +0800 Subject: [PATCH] add wrapper --- basic_example/parameter_example.go | 30 +---- client_test.go | 34 ++--- connection.go | 25 ++-- session.go | 202 +++++++++++++++++++++++++---- 4 files changed, 195 insertions(+), 96 deletions(-) diff --git a/basic_example/parameter_example.go b/basic_example/parameter_example.go index 59e6933b..f89bf69a 100644 --- a/basic_example/parameter_example.go +++ b/basic_example/parameter_example.go @@ -12,7 +12,6 @@ import ( "sync" nebulago "github.com/vesoft-inc/nebula-go/v2" - nebula "github.com/vesoft-inc/nebula-go/v2/nebula" ) const ( @@ -60,30 +59,11 @@ func main() { } } - var params map[string]*nebula.Value - params = make(map[string]*nebula.Value) - - var bVal bool = true - var iVal int64 = 3 - // bool - p1 := nebula.Value{BVal: &bVal} - // int - p2 := nebula.Value{IVal: &iVal} - // list - lSlice := []*nebula.Value{&p1, &p2} - var lVal nebula.NList - lVal.Values = lSlice - p3 := nebula.Value{LVal: &lVal} - // map - var nmap map[string]*nebula.Value = map[string]*nebula.Value{"a": &p1, "b": &p2} - var mVal nebula.NMap - mVal.Kvs = nmap - p4 := nebula.Value{MVal: &mVal} - - params["p1"] = &p1 - params["p2"] = &p2 - params["p3"] = &p3 - params["p4"] = &p4 + params := make(map[string]interface{}) + params["p1"] = true + params["p2"] = 3 + params["p3"] = []interface{}{true, 3} + params["p4"] = map[string]interface{}{"a":true, "b":3} // Extract data from the resultSet { diff --git a/client_test.go b/client_test.go index 7e772731..be38745a 100644 --- a/client_test.go +++ b/client_test.go @@ -49,8 +49,6 @@ var nebulaLog = DefaultLogger{} // Create default configs var testPoolConfig = GetDefaultConf() -var params map[string]*nebula.Value - // Before run `go test -v`, you should start a nebula server listening on 3699 port. // Using docker-compose is the easiest way and you can reference this file: // https://github.com/vesoft-inc/nebula/blob/master/docker/docker-compose.yaml @@ -992,8 +990,14 @@ func TestExecuteWithParameter(t *testing.T) { createTestDataSchema(t, session) // Load data loadTestData(t, session) + // p1:true p2:3 p3:[true,3] p4:{"a":true,"b":"Bob"} - prepareParameter() + params := make(map[string]interface{}) + params["p1"] = true + params["p2"] = 3 + params["p3"] = []interface{}{true, 3} + params["p4"] = map[string]interface{}{"a": true, "b": "Bob"} + // Simple result { resp, err := tryToExecuteWithParameter(session, "RETURN toBoolean($p1) and false, $p2+3, $p3[1]>3", params) @@ -1208,7 +1212,7 @@ func tryToExecute(session *Session, query string) (resp *ResultSet, err error) { return } -func tryToExecuteWithParameter(session *Session, query string, params map[string]*nebula.Value) (resp *ResultSet, err error) { +func tryToExecuteWithParameter(session *Session, query string, params map[string]interface{}) (resp *ResultSet, err error) { for i := 3; i > 0; i-- { resp, err = session.ExecuteWithParameter(query, params) if err == nil && resp.IsSucceed() { @@ -1309,28 +1313,6 @@ func loadTestData(t *testing.T, session *Session) { checkResultSet(t, query, resultSet) } -func prepareParameter() { - // p1:true p2:3 p3:[true,3] p4:{"a":true,"b":"Bob"} - params = make(map[string]*nebula.Value) - var bVal bool = true - var iVal int64 = 3 - p1 := nebula.Value{BVal: &bVal} - p2 := nebula.Value{IVal: &iVal} - p5 := nebula.Value{SVal: []byte("Bob")} - lSlice := []*nebula.Value{&p1, &p2} - var lVal nebula.NList - lVal.Values = lSlice - p3 := nebula.Value{LVal: &lVal} - var nmap map[string]*nebula.Value = map[string]*nebula.Value{"a": &p1, "b": &p5} - var mVal nebula.NMap - mVal.Kvs = nmap - p4 := nebula.Value{MVal: &mVal} - params["p1"] = &p1 - params["p2"] = &p2 - params["p3"] = &p3 - params["p4"] = &p4 -} - func dropSpace(t *testing.T, session *Session, spaceName string) { query := fmt.Sprintf("DROP SPACE IF EXISTS %s;", spaceName) resultSet, err := tryToExecute(session, query) diff --git a/connection.go b/connection.go index 16ecac3b..93bafe38 100644 --- a/connection.go +++ b/connection.go @@ -111,21 +111,7 @@ func (cn *connection) authenticate(username, password string) (*graph.AuthRespon } func (cn *connection) execute(sessionID int64, stmt string) (*graph.ExecutionResponse, error) { - resp, err := cn.graph.Execute(sessionID, []byte(stmt)) - if err != nil { - // reopen the connection if timeout - if _, ok := err.(thrift.TransportException); ok { - if err.(thrift.TransportException).TypeID() == thrift.TIMED_OUT { - reopenErr := cn.reopen() - if reopenErr != nil { - return nil, reopenErr - } - return cn.graph.Execute(sessionID, []byte(stmt)) - } - } - } - - return resp, err + return cn.executeWithParameter(sessionID, stmt, map[string]*nebula.Value{}) } func (cn *connection) executeWithParameter(sessionID int64, stmt string, params map[string]*nebula.Value) (*graph.ExecutionResponse, error) { @@ -145,8 +131,13 @@ func (cn *connection) executeWithParameter(sessionID int64, stmt string, params return resp, err } + func (cn *connection) executeJson(sessionID int64, stmt string) ([]byte, error) { - jsonResp, err := cn.graph.ExecuteJson(sessionID, []byte(stmt)) + return cn.ExecuteJsonWithParameter(sessionID, stmt, map[string]*nebula.Value{}) +} + +func (cn *connection) ExecuteJsonWithParameter(sessionID int64, stmt string, params map[string]*nebula.Value) ([]byte, error) { + jsonResp, err := cn.graph.ExecuteJsonWithParameter(sessionID, []byte(stmt), params) if err != nil { // reopen the connection if timeout if _, ok := err.(thrift.TransportException); ok { @@ -155,7 +146,7 @@ func (cn *connection) executeJson(sessionID int64, stmt string) ([]byte, error) if reopenErr != nil { return nil, reopenErr } - return cn.graph.ExecuteJson(sessionID, []byte(stmt)) + return cn.graph.ExecuteJsonWithParameter(sessionID, []byte(stmt), params) } } } diff --git a/session.go b/session.go index 0d9fd062..9c0d578d 100644 --- a/session.go +++ b/session.go @@ -61,41 +61,23 @@ func (session *Session) executeWithReconnect(f func() (interface{}, error)) (int } -// Execute returns the result of the given query as a ResultSet -func (session *Session) Execute(stmt string) (*ResultSet, error) { +// Execute returns the result of given query as a ResultSet +func (session *Session) ExecuteWithParameter(stmt string, params map[string]interface{}) (*ResultSet, error) { session.mu.Lock() defer session.mu.Unlock() if session.connection == nil { return nil, fmt.Errorf("failed to execute: Session has been released") } - - execFunc := func() (interface{}, error) { - resp, err := session.connection.execute(session.sessionID, stmt) - if err != nil { - return nil, err + paramsMap := make(map[string]*nebula.Value) + for k, v := range params { + nv, er := value2Nvalue(v) + if er != nil { + return nil, er } - resSet, err := genResultSet(resp, session.timezoneInfo) - if err != nil { - return nil, err - } - return resSet, nil - } - resp, err := session.executeWithReconnect(execFunc) - if err != nil { - return nil, err - } - return resp.(*ResultSet), err -} - -// Execute returns the result of given query as a ResultSet -func (session *Session) ExecuteWithParameter(stmt string, params map[string]*nebula.Value) (*ResultSet, error) { - session.mu.Lock() - defer session.mu.Unlock() - if session.connection == nil { - return nil, fmt.Errorf("failed to execute: Session has been released") + paramsMap[k] = nv } execFunc := func() (interface{}, error) { - resp, err := session.connection.executeWithParameter(session.sessionID, stmt, params) + resp, err := session.connection.executeWithParameter(session.sessionID, stmt, paramsMap) if err != nil { return nil, err } @@ -114,6 +96,11 @@ func (session *Session) ExecuteWithParameter(stmt string, params map[string]*neb } +// Execute returns the result of the given query as a ResultSet +func (session *Session) Execute(stmt string) (*ResultSet, error) { + return session.ExecuteWithParameter(stmt, map[string]interface{}{}) +} + // ExecuteJson returns the result of the given query as a json string // Date and Datetime will be returned in UTC // JSON struct: @@ -173,14 +160,84 @@ func (session *Session) ExecuteWithParameter(stmt string, params map[string]*neb // ] // } func (session *Session) ExecuteJson(stmt string) ([]byte, error) { + return session.ExecuteJsonWithParameter(stmt, map[string]interface{}{}) +} + +// ExecuteJson returns the result of the given query as a json string +// Date and Datetime will be returned in UTC +// JSON struct: +// { +// "results":[ +// { +// "columns":[ +// ], +// "data":[ +// { +// "row":[ +// "row-data" +// ], +// "meta":[ +// "metadata" +// ] +// } +// ], +// "latencyInUs":0, +// "spaceName":"", +// "planDesc ":{ +// "planNodeDescs":[ +// { +// "name":"", +// "id":0, +// "outputVar":"", +// "description":{ +// "key":"" +// }, +// "profiles":[ +// { +// "rows":1, +// "execDurationInUs":0, +// "totalDurationInUs":0, +// "otherStats":{} +// } +// ], +// "branchInfo":{ +// "isDoBranch":false, +// "conditionNodeId":-1 +// }, +// "dependencies":[] +// } +// ], +// "nodeIndexMap":{}, +// "format":"", +// "optimize_time_in_us":0 +// }, +// "comment ":"" +// } +// ], +// "errors":[ +// { +// "code": 0, +// "message": "" +// } +// ] +// } +func (session *Session) ExecuteJsonWithParameter(stmt string, params map[string]interface{}) ([]byte, error) { session.mu.Lock() defer session.mu.Unlock() if session.connection == nil { return nil, fmt.Errorf("failed to execute: Session has been released") } + paramsMap := make(map[string]*nebula.Value) + for k, v := range params { + nv, er := value2Nvalue(v) + if er != nil { + return nil, er + } + paramsMap[k] = nv + } execFunc := func() (interface{}, error) { - resp, err := session.connection.executeJson(session.sessionID, stmt) + resp, err := session.connection.ExecuteJsonWithParameter(session.sessionID, stmt, paramsMap) if err != nil { return nil, err } @@ -234,3 +291,92 @@ func (session *Session) GetSessionID() int64 { func IsError(resp *graph.ExecutionResponse) bool { return resp.GetErrorCode() != nebula.ErrorCode_SUCCEEDED } + +// construct Slice to nebula.NList +func Slice2Nlist(list []interface{}) (*nebula.NList, error) { + sv := []*nebula.Value{} + var ret nebula.NList + for _, item := range list { + nv, er := value2Nvalue(item) + if er != nil { + return nil, er + } + sv = append(sv, nv) + } + ret.Values = sv + return &ret, nil +} + +// construct map to nebula.NMap +func Map2Nmap(m map[string]interface{}) (*nebula.NMap, error) { + var ret nebula.NMap + kvs := map[string]*nebula.Value{} + for k, v := range m { + nv, err := value2Nvalue(v) + if err != nil { + return nil, err + } + kvs[k] = nv + } + ret.Kvs = kvs + return &ret, nil +} + +// construct go-type to nebula.Value +func value2Nvalue(any interface{}) (value *nebula.Value, err error) { + value = nebula.NewValue() + if v, ok := any.(bool); ok { + value.BVal = &v + } else if v, ok := any.(int); ok { + ival := int64(v) + value.IVal = &ival + } else if v, ok := any.(float64); ok { + if v == float64(int64(v)) { + iv := int64(v) + value.IVal = &iv + } else { + value.FVal = &v + } + } else if v, ok := any.(float32); ok { + if v == float32(int64(v)) { + iv := int64(v) + value.IVal = &iv + } else { + fval := float64(v) + value.FVal = &fval + } + } else if v, ok := any.(string); ok { + value.SVal = []byte(v) + } else if any == nil { + nval := nebula.NullType___NULL__ + value.NVal = &nval + } else if v, ok := any.([]interface{}); ok { + nv, er := Slice2Nlist([]interface{}(v)) + if er != nil { + err = er + } + value.LVal = nv + } else if v, ok := any.(map[string]interface{}); ok { + nv, er := Map2Nmap(map[string]interface{}(v)) + if er != nil { + err = er + } + value.MVal = nv + } else if v, ok := any.(nebula.Value); ok { + value = &v + } else if v, ok := any.(nebula.Date); ok { + value.SetDVal(&v) + } else if v, ok := any.(nebula.DateTime); ok { + value.SetDtVal(&v) + } else if v, ok := any.(nebula.Duration); ok { + value.SetDuVal(&v) + } else if v, ok := any.(nebula.Time); ok { + value.SetTVal(&v) + } else if v, ok := any.(nebula.Geography); ok { + value.SetGgVal(&v) + } else { + // unsupport other Value type, use this function carefully + err = fmt.Errorf("Only support convert boolean/float/int/string/map/list to nebula.Value but %T", any) + } + return +}