Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change executeWithParameter signature #179

Merged
merged 1 commit into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 5 additions & 25 deletions basic_example/parameter_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sync"

nebulago "github.com/vesoft-inc/nebula-go/v2"
nebula "github.com/vesoft-inc/nebula-go/v2/nebula"
)

const (
Expand Down Expand Up @@ -60,30 +59,11 @@ func main() {
}
}

var params map[string]*nebula.Value
params = make(map[string]*nebula.Value)

var bVal bool = true
var iVal int64 = 3
// bool
p1 := nebula.Value{BVal: &bVal}
// int
p2 := nebula.Value{IVal: &iVal}
// list
lSlice := []*nebula.Value{&p1, &p2}
var lVal nebula.NList
lVal.Values = lSlice
p3 := nebula.Value{LVal: &lVal}
// map
var nmap map[string]*nebula.Value = map[string]*nebula.Value{"a": &p1, "b": &p2}
var mVal nebula.NMap
mVal.Kvs = nmap
p4 := nebula.Value{MVal: &mVal}

params["p1"] = &p1
params["p2"] = &p2
params["p3"] = &p3
params["p4"] = &p4
params := make(map[string]interface{})
params["p1"] = true
params["p2"] = 3
params["p3"] = []interface{}{true, 3}
params["p4"] = map[string]interface{}{"a":true, "b":3}

// Extract data from the resultSet
{
Expand Down
34 changes: 8 additions & 26 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ var nebulaLog = DefaultLogger{}
// Create default configs
var testPoolConfig = GetDefaultConf()

var params map[string]*nebula.Value

// Before run `go test -v`, you should start a nebula server listening on 3699 port.
// Using docker-compose is the easiest way and you can reference this file:
// https://github.com/vesoft-inc/nebula/blob/master/docker/docker-compose.yaml
Expand Down Expand Up @@ -992,8 +990,14 @@ func TestExecuteWithParameter(t *testing.T) {
createTestDataSchema(t, session)
// Load data
loadTestData(t, session)

// p1:true p2:3 p3:[true,3] p4:{"a":true,"b":"Bob"}
prepareParameter()
params := make(map[string]interface{})
params["p1"] = true
params["p2"] = 3
params["p3"] = []interface{}{true, 3}
params["p4"] = map[string]interface{}{"a": true, "b": "Bob"}

// Simple result
{
resp, err := tryToExecuteWithParameter(session, "RETURN toBoolean($p1) and false, $p2+3, $p3[1]>3", params)
Expand Down Expand Up @@ -1208,7 +1212,7 @@ func tryToExecute(session *Session, query string) (resp *ResultSet, err error) {
return
}

func tryToExecuteWithParameter(session *Session, query string, params map[string]*nebula.Value) (resp *ResultSet, err error) {
func tryToExecuteWithParameter(session *Session, query string, params map[string]interface{}) (resp *ResultSet, err error) {
for i := 3; i > 0; i-- {
resp, err = session.ExecuteWithParameter(query, params)
if err == nil && resp.IsSucceed() {
Expand Down Expand Up @@ -1309,28 +1313,6 @@ func loadTestData(t *testing.T, session *Session) {
checkResultSet(t, query, resultSet)
}

func prepareParameter() {
// p1:true p2:3 p3:[true,3] p4:{"a":true,"b":"Bob"}
params = make(map[string]*nebula.Value)
var bVal bool = true
var iVal int64 = 3
p1 := nebula.Value{BVal: &bVal}
p2 := nebula.Value{IVal: &iVal}
p5 := nebula.Value{SVal: []byte("Bob")}
lSlice := []*nebula.Value{&p1, &p2}
var lVal nebula.NList
lVal.Values = lSlice
p3 := nebula.Value{LVal: &lVal}
var nmap map[string]*nebula.Value = map[string]*nebula.Value{"a": &p1, "b": &p5}
var mVal nebula.NMap
mVal.Kvs = nmap
p4 := nebula.Value{MVal: &mVal}
params["p1"] = &p1
params["p2"] = &p2
params["p3"] = &p3
params["p4"] = &p4
}

func dropSpace(t *testing.T, session *Session, spaceName string) {
query := fmt.Sprintf("DROP SPACE IF EXISTS %s;", spaceName)
resultSet, err := tryToExecute(session, query)
Expand Down
25 changes: 8 additions & 17 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,21 +111,7 @@ func (cn *connection) authenticate(username, password string) (*graph.AuthRespon
}

func (cn *connection) execute(sessionID int64, stmt string) (*graph.ExecutionResponse, error) {
resp, err := cn.graph.Execute(sessionID, []byte(stmt))
if err != nil {
// reopen the connection if timeout
if _, ok := err.(thrift.TransportException); ok {
if err.(thrift.TransportException).TypeID() == thrift.TIMED_OUT {
reopenErr := cn.reopen()
if reopenErr != nil {
return nil, reopenErr
}
return cn.graph.Execute(sessionID, []byte(stmt))
}
}
}

return resp, err
return cn.executeWithParameter(sessionID, stmt, map[string]*nebula.Value{})
}

func (cn *connection) executeWithParameter(sessionID int64, stmt string, params map[string]*nebula.Value) (*graph.ExecutionResponse, error) {
Expand All @@ -145,8 +131,13 @@ func (cn *connection) executeWithParameter(sessionID int64, stmt string, params

return resp, err
}

func (cn *connection) executeJson(sessionID int64, stmt string) ([]byte, error) {
jsonResp, err := cn.graph.ExecuteJson(sessionID, []byte(stmt))
return cn.ExecuteJsonWithParameter(sessionID, stmt, map[string]*nebula.Value{})
}

func (cn *connection) ExecuteJsonWithParameter(sessionID int64, stmt string, params map[string]*nebula.Value) ([]byte, error) {
jsonResp, err := cn.graph.ExecuteJsonWithParameter(sessionID, []byte(stmt), params)
if err != nil {
// reopen the connection if timeout
if _, ok := err.(thrift.TransportException); ok {
Expand All @@ -155,7 +146,7 @@ func (cn *connection) executeJson(sessionID int64, stmt string) ([]byte, error)
if reopenErr != nil {
return nil, reopenErr
}
return cn.graph.ExecuteJson(sessionID, []byte(stmt))
return cn.graph.ExecuteJsonWithParameter(sessionID, []byte(stmt), params)
}
}
}
Expand Down
202 changes: 174 additions & 28 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,41 +61,23 @@ func (session *Session) executeWithReconnect(f func() (interface{}, error)) (int

}

// Execute returns the result of the given query as a ResultSet
func (session *Session) Execute(stmt string) (*ResultSet, error) {
// Execute returns the result of given query as a ResultSet
Aiee marked this conversation as resolved.
Show resolved Hide resolved
func (session *Session) ExecuteWithParameter(stmt string, params map[string]interface{}) (*ResultSet, error) {
session.mu.Lock()
defer session.mu.Unlock()
if session.connection == nil {
return nil, fmt.Errorf("failed to execute: Session has been released")
}

execFunc := func() (interface{}, error) {
resp, err := session.connection.execute(session.sessionID, stmt)
if err != nil {
return nil, err
paramsMap := make(map[string]*nebula.Value)
for k, v := range params {
nv, er := value2Nvalue(v)
if er != nil {
return nil, er
}
resSet, err := genResultSet(resp, session.timezoneInfo)
if err != nil {
return nil, err
}
return resSet, nil
}
resp, err := session.executeWithReconnect(execFunc)
if err != nil {
return nil, err
}
return resp.(*ResultSet), err
}

// Execute returns the result of given query as a ResultSet
func (session *Session) ExecuteWithParameter(stmt string, params map[string]*nebula.Value) (*ResultSet, error) {
session.mu.Lock()
defer session.mu.Unlock()
if session.connection == nil {
return nil, fmt.Errorf("failed to execute: Session has been released")
paramsMap[k] = nv
}
execFunc := func() (interface{}, error) {
resp, err := session.connection.executeWithParameter(session.sessionID, stmt, params)
resp, err := session.connection.executeWithParameter(session.sessionID, stmt, paramsMap)
if err != nil {
return nil, err
}
Expand All @@ -114,6 +96,11 @@ func (session *Session) ExecuteWithParameter(stmt string, params map[string]*neb

}

// Execute returns the result of the given query as a ResultSet
func (session *Session) Execute(stmt string) (*ResultSet, error) {
return session.ExecuteWithParameter(stmt, map[string]interface{}{})
}

// ExecuteJson returns the result of the given query as a json string
// Date and Datetime will be returned in UTC
// JSON struct:
Expand Down Expand Up @@ -173,14 +160,84 @@ func (session *Session) ExecuteWithParameter(stmt string, params map[string]*neb
// ]
// }
func (session *Session) ExecuteJson(stmt string) ([]byte, error) {
return session.ExecuteJsonWithParameter(stmt, map[string]interface{}{})
}

// ExecuteJson returns the result of the given query as a json string
// Date and Datetime will be returned in UTC
// JSON struct:
// {
// "results":[
// {
// "columns":[
// ],
// "data":[
// {
// "row":[
// "row-data"
// ],
// "meta":[
// "metadata"
// ]
// }
// ],
// "latencyInUs":0,
// "spaceName":"",
// "planDesc ":{
// "planNodeDescs":[
// {
// "name":"",
// "id":0,
// "outputVar":"",
// "description":{
// "key":""
// },
// "profiles":[
// {
// "rows":1,
// "execDurationInUs":0,
// "totalDurationInUs":0,
// "otherStats":{}
// }
// ],
// "branchInfo":{
// "isDoBranch":false,
// "conditionNodeId":-1
// },
// "dependencies":[]
// }
// ],
// "nodeIndexMap":{},
// "format":"",
// "optimize_time_in_us":0
// },
// "comment ":""
// }
// ],
// "errors":[
// {
// "code": 0,
// "message": ""
// }
// ]
// }
func (session *Session) ExecuteJsonWithParameter(stmt string, params map[string]interface{}) ([]byte, error) {
session.mu.Lock()
defer session.mu.Unlock()
if session.connection == nil {
return nil, fmt.Errorf("failed to execute: Session has been released")
}

paramsMap := make(map[string]*nebula.Value)
for k, v := range params {
nv, er := value2Nvalue(v)
if er != nil {
return nil, er
}
paramsMap[k] = nv
}
execFunc := func() (interface{}, error) {
resp, err := session.connection.executeJson(session.sessionID, stmt)
resp, err := session.connection.ExecuteJsonWithParameter(session.sessionID, stmt, paramsMap)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -234,3 +291,92 @@ func (session *Session) GetSessionID() int64 {
func IsError(resp *graph.ExecutionResponse) bool {
return resp.GetErrorCode() != nebula.ErrorCode_SUCCEEDED
}

// construct Slice to nebula.NList
func Slice2Nlist(list []interface{}) (*nebula.NList, error) {
sv := []*nebula.Value{}
var ret nebula.NList
for _, item := range list {
nv, er := value2Nvalue(item)
if er != nil {
return nil, er
}
sv = append(sv, nv)
}
ret.Values = sv
return &ret, nil
}

// construct map to nebula.NMap
func Map2Nmap(m map[string]interface{}) (*nebula.NMap, error) {
var ret nebula.NMap
kvs := map[string]*nebula.Value{}
for k, v := range m {
nv, err := value2Nvalue(v)
if err != nil {
return nil, err
}
kvs[k] = nv
}
ret.Kvs = kvs
return &ret, nil
}
Aiee marked this conversation as resolved.
Show resolved Hide resolved

// construct go-type to nebula.Value
func value2Nvalue(any interface{}) (value *nebula.Value, err error) {
value = nebula.NewValue()
if v, ok := any.(bool); ok {
value.BVal = &v
} else if v, ok := any.(int); ok {
ival := int64(v)
value.IVal = &ival
} else if v, ok := any.(float64); ok {
if v == float64(int64(v)) {
iv := int64(v)
value.IVal = &iv
} else {
value.FVal = &v
}
} else if v, ok := any.(float32); ok {
if v == float32(int64(v)) {
iv := int64(v)
value.IVal = &iv
} else {
fval := float64(v)
value.FVal = &fval
}
} else if v, ok := any.(string); ok {
value.SVal = []byte(v)
} else if any == nil {
nval := nebula.NullType___NULL__
value.NVal = &nval
} else if v, ok := any.([]interface{}); ok {
nv, er := Slice2Nlist([]interface{}(v))
if er != nil {
err = er
}
value.LVal = nv
} else if v, ok := any.(map[string]interface{}); ok {
nv, er := Map2Nmap(map[string]interface{}(v))
if er != nil {
err = er
}
value.MVal = nv
} else if v, ok := any.(nebula.Value); ok {
value = &v
} else if v, ok := any.(nebula.Date); ok {
value.SetDVal(&v)
} else if v, ok := any.(nebula.DateTime); ok {
value.SetDtVal(&v)
} else if v, ok := any.(nebula.Duration); ok {
value.SetDuVal(&v)
} else if v, ok := any.(nebula.Time); ok {
value.SetTVal(&v)
} else if v, ok := any.(nebula.Geography); ok {
value.SetGgVal(&v)
} else {
// unsupport other Value type, use this function carefully
err = fmt.Errorf("Only support convert boolean/float/int/string/map/list to nebula.Value but %T", any)
}
return
}