Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api-server): add query parameters to the sql query api #2277

Merged
merged 13 commits into from
Sep 15, 2022
12 changes: 10 additions & 2 deletions docs/en/quickstart/rest_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ The request body example:
```json
{
"mode": "online",
"sql": "select 1"
"sql": "SELECT c1, c2, c3 FROM demo WHERE c1 = ? AND c2 = ?",
qsliu2017 marked this conversation as resolved.
Show resolved Hide resolved
"parameter": {
"schema": ["Int32", "String"],
qsliu2017 marked this conversation as resolved.
Show resolved Hide resolved
"data": [1, "aaa"]
}
}
```

Expand All @@ -95,7 +99,11 @@ The response:
```json
{
"code":0,
"msg":"ok"
"msg":"ok",
"result": {
qsliu2017 marked this conversation as resolved.
Show resolved Hide resolved
"schema": ["Int32", "String", "Float"],
"data": [[1, "aaa", 1.2], [1, "aaa", 3.4]]
}
}
```

Expand Down
34 changes: 34 additions & 0 deletions docs/zh/quickstart/rest_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,37 @@ response:
}
}
```

## 查询

The request URL: http://ip:port/dbs/{db_name}

HTTP method: POST

The request body example:

```json
{
"mode": "online",
"sql": "SELECT c1, c2, c3 FROM demo WHERE c1 = ? AND c2 = ?",
"parameter": {
"schema": ["Int32", "String"],
"data": [1, "aaa"]
}
}
```

mode: "offsync", "offasync", "online"

The response:

```json
{
"code":0,
"msg":"ok",
"result": {
"schema": ["Int32", "String", "Float"],
"data": [[1, "aaa", 1.2], [1, "aaa", 3.4]]
}
}
```
300 changes: 246 additions & 54 deletions src/apiserver/api_server_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,51 +104,52 @@ std::map<std::string, ExecContext> mode_map{
{"offsync", {false, true}}, {"offasync", {false, false}}, {"online", {true, false}}};

void APIServerImpl::RegisterQuery() {
provider_.post("/dbs/:db_name",
[this](const InterfaceProvider::Params& param, const butil::IOBuf& req_body, JsonWriter& writer) {
auto resp = GeneralResp();
auto db_it = param.find("db_name");
if (db_it == param.end()) {
writer << resp.Set("url has no db_name");
return;
}
auto db = db_it->second;

// default mode is offsync
QueryReq req;
JsonReader query_reader(req_body.to_string().c_str());
query_reader >> req;
if (!query_reader) {
writer << resp.Set("Json parse failed, " + req_body.to_string());
return;
}
auto mode = boost::to_lower_copy(req.mode);
auto it = mode_map.find(mode);
if (it == mode_map.end()) {
writer << resp.Set("Invalid mode " + mode);
return;
}
ExecContext ctx = it->second;

const auto& sql = req.sql;
VLOG(1) << "post [" << ctx.ToString() << "] query on db [" << db << "], sql: " << sql;
// TODO(hw): if api server supports standalone, we should check if cluster mode here

hybridse::sdk::Status status;
// TODO(hw): if sql is not a query, it may be a ddl, we use ExecuteSQL to execute it before we
// supports ddl http api. It's useful for api server tests(We can create table when we only
// connect to the api server).
auto rs = sql_router_->ExecuteSQL(db, sql, ctx.is_online, ctx.is_sync, ctx.job_timeout, &status);
if (!status.IsOK()) {
writer << resp.Set(status.code, status.msg);
LOG(WARNING) << "failed at: code " << status.code << ", msg " << status.msg;
return;
}

QueryResp query_resp;
query_resp.rs = rs;
writer << query_resp;
});
provider_.post("/dbs/:db_name", [this](const InterfaceProvider::Params& param, const butil::IOBuf& req_body,
JsonWriter& writer) {
auto resp = GeneralResp();
auto db_it = param.find("db_name");
if (db_it == param.end()) {
writer << resp.Set("url has no db_name");
return;
}
auto db = db_it->second;

// default mode is offsync
QueryReq req;
JsonReader query_reader(req_body.to_string().c_str());
query_reader >> req;
if (!query_reader) {
writer << resp.Set("Json parse failed, " + req_body.to_string());
return;
}
auto mode = boost::to_lower_copy(req.mode);
auto it = mode_map.find(mode);
if (it == mode_map.end()) {
writer << resp.Set("Invalid mode " + mode);
return;
}
ExecContext ctx = it->second;

const auto& sql = req.sql;
const auto parameter = req.parameter;
VLOG(1) << "post [" << ctx.ToString() << "] query on db [" << db << "], sql: " << sql;
// TODO(hw): if api server supports standalone, we should check if cluster mode here

hybridse::sdk::Status status;
// TODO(hw): if sql is not a query, it may be a ddl, we use ExecuteSQL to execute it before we
// supports ddl http api. It's useful for api server tests(We can create table when we only
// connect to the api server).
auto rs = sql_router_->ExecuteSQL(db, sql, parameter, ctx.is_online, ctx.is_sync, ctx.job_timeout, &status);
if (!status.IsOK()) {
writer << resp.Set(status.code, status.msg);
LOG(WARNING) << "failed at: code " << status.code << ", msg " << status.msg;
return;
}

QueryResp query_resp;
query_resp.rs = rs;
writer << query_resp;
});
}

bool APIServerImpl::Json2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v,
Expand Down Expand Up @@ -632,6 +633,148 @@ std::string APIServerImpl::InnerTypeTransform(const std::string& s) {
return out;
}

JsonReader& operator&(JsonReader& ar, QueryReq& s) { // NOLINT
ar.StartObject();
// mode is not optional
ar.Member("mode") & s.mode;
ar.Member("sql") & s.sql;
if (ar.HasMember("parameters")) {
ar.Member("parameters") & s.parameter;
}
return ar.EndObject();
}

JsonReader& operator&(JsonReader& ar, std::shared_ptr<openmldb::sdk::SQLRequestRow>& parameter) { // NOLINT
ar.StartObject(); // start "parameter"
if (!ar.HasMember("schema") || !ar.HasMember("data")) return ar;

::hybridse::vm::Schema schema;
{
ar.Member("schema");
size_t size;
ar.StartArray(&size); // start "schema"
for (auto i = 0; i < size; i++) {
std::string type;
ar& type;
// uppercase
std::transform(type.begin(), type.end(), type.begin(), [](unsigned char c) { return std::toupper(c); });

auto col = schema.Add();
if (type == "BOOL") {
col->set_type(::hybridse::type::kBool);
} else if (type == "SMALLINT" || type == "INT16") {
col->set_type(::hybridse::type::kInt16);
} else if (type == "INT" || type == "INT32") {
col->set_type(::hybridse::type::kInt32);
} else if (type == "BIGINT" || type == "INT64") {
col->set_type(::hybridse::type::kInt64);
} else if (type == "FLOAT") {
col->set_type(::hybridse::type::kFloat);
} else if (type == "DOUBLE") {
col->set_type(::hybridse::type::kDouble);
} else if (type == "STRING") {
col->set_type(::hybridse::type::kVarchar);
} else if (type == "DATE") {
col->set_type(::hybridse::type::kDate);
} else if (type == "TIMESTAMP") {
col->set_type(::hybridse::type::kTimestamp);
} else {
return ar;
}
}
ar.EndArray(); // end "schema"
}

int32_t str_length = 0;
{
ar.Member("data");
size_t size;
ar.StartArray(&size); // start first iter "data"
if (size != schema.size()) return ar;

for (auto col = schema.begin(); col != schema.end(); col++) {
if (col->type() == ::hybridse::type::kVarchar) {
std::string str;
ar& str;
str_length += str.length();
} else {
ar.Next();
}
}
ar.EndArray(); // end first iter "data"
}
{
::hybridse::sdk::SchemaImpl* schema_impl = new ::hybridse::sdk::SchemaImpl(schema);
parameter.reset(new openmldb::sdk::SQLRequestRow(std::shared_ptr<::hybridse::sdk::Schema>(schema_impl),
std::set<std::string>({})));

ar.Member("data");
size_t size;
ar.StartArray(&size); // start second iter "data"
if (!parameter->Init(str_length)) return ar;

for (auto col = schema.begin(); col != schema.end(); col++) {
bool ok;
switch (col->type()) {
case ::hybridse::type::kBool: {
bool b;
ar& b;
ok = parameter->AppendBool(b);
} break;
case ::hybridse::type::kInt16: {
int16_t i;
ar& i;
ok = parameter->AppendInt16(i);
} break;
case ::hybridse::type::kInt32: {
int32_t i;
ar& i;
ok = parameter->AppendInt32(i);
} break;
case ::hybridse::type::kInt64: {
int64_t i;
ar& i;
ok = parameter->AppendInt64(i);
} break;
case ::hybridse::type::kFloat: {
double f;
ar& f;
ok = parameter->AppendFloat(f);
} break;
case ::hybridse::type::kDouble: {
double d;
ar& d;
ok = parameter->AppendDouble(d);
} break;
case ::hybridse::type::kVarchar: {
std::string s;
ar& s;
ok = parameter->AppendString(s.c_str(), s.length());
} break;
case ::hybridse::type::kDate: {
int32_t date;
ar& date;
ok = parameter->AppendDate(date);
} break;
case ::hybridse::type::kTimestamp: {
int64_t timestamp;
ar& timestamp;
ok = parameter->AppendTimestamp(timestamp);
} break;
default:
ok = false;
}
if (!ok) return ar;
}

if (!parameter->Build()) return ar;

ar.EndArray(); // end second iter "data"
}

return ar.EndObject(); // end "parameter"
}

void WriteSchema(JsonWriter& ar, const std::string& name, const hybridse::sdk::Schema& schema, // NOLINT
bool only_const) {
ar.Member(name.c_str());
Expand Down Expand Up @@ -970,19 +1113,68 @@ JsonWriter& operator&(JsonWriter& ar, QueryResp& s) { // NOLINT
ar.Member("code") & s.code;
ar.Member("msg") & s.msg;
if (s.rs) {
ar.Member("data");
ar.StartArray();
auto& rs = s.rs;
rs->Reset();
auto& schema = *rs->GetSchema();
while (rs->Next()) {
ar.Member("result");
ar.StartObject(); // start result
{
ar.Member("schema");
ar.StartArray(); // start schema
auto& rs = s.rs;
rs->Reset();
auto& schema = *rs->GetSchema();
for (auto n = schema.GetColumnCnt(), i = 0; i < n; i++) {
std::string type;
switch (schema.GetColumnType(i)) {
case hybridse::sdk::kTypeBool:
type = "Bool";
break;
case hybridse::sdk::kTypeInt16:
type = "Int16";
break;
case hybridse::sdk::kTypeInt32:
type = "Int32";
break;
case hybridse::sdk::kTypeInt64:
type = "Int64";
break;
case hybridse::sdk::kTypeFloat:
type = "Float";
break;
case hybridse::sdk::kTypeDouble:
type = "Double";
break;
case hybridse::sdk::kTypeString:
type = "String";
break;
case hybridse::sdk::kTypeDate:
type = "Date";
break;
case hybridse::sdk::kTypeTimestamp:
type = "Timestamp";
break;
default:
type = "Unknown";
break;
}
ar& type;
}
ar.EndArray(); // end schema
}
{
ar.Member("data");
ar.StartArray();
for (decltype(schema.GetColumnCnt()) i = 0; i < schema.GetColumnCnt(); i++) {
WriteValue(ar, rs, i);
auto& rs = s.rs;
rs->Reset();
auto& schema = *rs->GetSchema();
while (rs->Next()) {
ar.StartArray();
for (decltype(schema.GetColumnCnt()) i = 0; i < schema.GetColumnCnt(); i++) {
WriteValue(ar, rs, i);
}
ar.EndArray();
}
ar.EndArray();
}
ar.EndArray();
ar.EndObject(); // end result
}
return ar.EndObject();
}
Expand Down
Loading