Skip to content

Commit

Permalink
feat: deploy with data (#3288)
Browse files Browse the repository at this point in the history
  • Loading branch information
dl239 committed Jun 29, 2023
1 parent feca8ef commit d5118ed
Show file tree
Hide file tree
Showing 33 changed files with 2,414 additions and 3,038 deletions.
9 changes: 9 additions & 0 deletions docs/en/reference/sql/deployment_manage/DEPLOY_STATEMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,15 @@ DEPLOY demo OPTIONS (SKIP_INDEX_CHECK="TRUE")
SELECT * FROM t1 LAST JOIN t2 ORDER BY t2.col3 ON t1.col1 = t2.col1;
```

### Synchronization/Asynchronization Settings
When executing deploy, you can set the synchronous/asynchronous mode through the `SYNC` option. The default value of `SYNC` is `true`, that is, the synchronous mode. If the relevant tables involved in the deploy statement have data and needs to add one or more indexs, executing deploy will initiate a job to execute a series of tasks such as loading data. In this case a job id will be returned if the `SYNC` option is set to `false`. You can get the job execution status by `SHOW JOBS FROM NAMESERVER LIKE '{job_id}'`

**Example**
```sql
deploy demo options(SYNC="false") SELECT t1.col1, t2.col2, sum(col4) OVER w1 as w1_col4_sum FROM t1 LAST JOIN t2 ORDER BY t2.col3 ON t1.col2 = t2.col2
WINDOW w1 AS (PARTITION BY t1.col2 ORDER BY t1.col3 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW);
```

## Relevant SQL

[USE DATABASE](../ddl/USE_DATABASE_STATEMENT.md)
Expand Down
10 changes: 10 additions & 0 deletions docs/zh/openmldb_sql/deployment_manage/DEPLOY_STATEMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ DeployOption
DeployOptionItem
::= 'LONG_WINDOWS' '=' LongWindowDefinitions
| 'SKIP_INDEX_CHECK' '=' string_literal
| 'SYNC' '=' string_literal
```

#### 长窗口优化
Expand Down Expand Up @@ -160,6 +161,15 @@ DEPLOY demo OPTIONS (SKIP_INDEX_CHECK="TRUE")
SELECT * FROM t1 LAST JOIN t2 ORDER BY t2.col3 ON t1.col1 = t2.col1;
```

### 设置同步/异步
执行deploy的时候可以通过`SYNC`选项来设置同步/异步模式, 默认为`true`即同步模式。如果deploy语句中涉及的相关表有数据,并且需要添加索引的情况下,执行deploy会发起数据加载等任务,如果`SYNC`选项设置为`false`就会返回一个任务id。可以通过`SHOW JOBS FROM NAMESERVER LIKE '{job_id}'`来查看任务执行状态。

**Example**
```sql
deploy demo options(SYNC="false") SELECT t1.col1, t2.col2, sum(col4) OVER w1 as w1_col4_sum FROM t1 LAST JOIN t2 ORDER BY t2.col3 ON t1.col2 = t2.col2
WINDOW w1 AS (PARTITION BY t1.col2 ORDER BY t1.col3 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW);
```

## 相关SQL

[USE DATABASE](../ddl/USE_DATABASE_STATEMENT.md)
Expand Down
53 changes: 45 additions & 8 deletions src/client/ns_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,19 @@ bool NsClient::MakeSnapshot(const std::string& name, const std::string& db, uint
return false;
}

bool NsClient::ShowOPStatus(::openmldb::nameserver::ShowOPStatusResponse& response, const std::string& name,
uint32_t pid, std::string& msg) {
base::Status NsClient::ShowOPStatus(uint64_t op_id, nameserver::ShowOPStatusResponse* response) {
::openmldb::nameserver::ShowOPStatusRequest request;
request.set_op_id(op_id);
bool ok = client_.SendRequest(&::openmldb::nameserver::NameServer_Stub::ShowOPStatus, &request, response,
FLAGS_request_timeout_ms, 1);
if (ok && response->code() == 0) {
return {};
}
return {base::ReturnCode::kError, response->msg()};
}

base::Status NsClient::ShowOPStatus(const std::string& name, uint32_t pid,
::openmldb::nameserver::ShowOPStatusResponse* response) {
::openmldb::nameserver::ShowOPStatusRequest request;
if (const std::string& db = GetDb(); !db.empty()) {
request.set_db(db);
Expand All @@ -199,13 +210,12 @@ bool NsClient::ShowOPStatus(::openmldb::nameserver::ShowOPStatusResponse& respon
if (pid != INVALID_PID) {
request.set_pid(pid);
}
bool ok = client_.SendRequest(&::openmldb::nameserver::NameServer_Stub::ShowOPStatus, &request, &response,
bool ok = client_.SendRequest(&::openmldb::nameserver::NameServer_Stub::ShowOPStatus, &request, response,
FLAGS_request_timeout_ms, 1);
msg = response.msg();
if (ok && response.code() == 0) {
return true;
if (ok && response->code() == 0) {
return {};
}
return false;
return {base::ReturnCode::kError, response->msg()};
}

bool NsClient::CancelOP(uint64_t op_id, std::string& msg) {
Expand Down Expand Up @@ -917,7 +927,7 @@ bool NsClient::AddIndex(const std::string& db_name,
}

base::Status NsClient::AddMultiIndex(const std::string& db, const std::string& table_name,
const std::vector<::openmldb::common::ColumnKey>& column_keys) {
const std::vector<::openmldb::common::ColumnKey>& column_keys, bool skip_load_data) {
::openmldb::nameserver::AddIndexRequest request;
::openmldb::nameserver::GeneralResponse response;
if (column_keys.empty()) {
Expand All @@ -929,6 +939,7 @@ base::Status NsClient::AddMultiIndex(const std::string& db, const std::string& t
}
request.set_name(table_name);
request.set_db(db);
request.set_skip_load_data(skip_load_data);
bool ok = client_.SendRequest(&::openmldb::nameserver::NameServer_Stub::AddIndex, &request, &response,
FLAGS_request_timeout_ms, 1);
if (ok && response.code() == 0) {
Expand Down Expand Up @@ -1065,5 +1076,31 @@ base::Status NsClient::ShowFunction(const std::string& name,
return {};
}

base::Status NsClient::DeploySQL(const ::openmldb::api::ProcedureInfo& sp_info,
const std::map<std::string, std::vector<::openmldb::common::ColumnKey>>& new_index_map,
uint64_t* op_id) {
if (new_index_map.empty()) {
return {base::ReturnCode::kError, "no index to add"};
}
nameserver::DeploySQLRequest request;
request.mutable_sp_info()->CopyFrom(sp_info);
for (const auto& kv : new_index_map) {
auto index = request.add_index();
index->set_name(kv.first);
index->set_db(sp_info.db_name());
for (const auto& column_key : kv.second) {
index->add_column_key()->CopyFrom(column_key);
}
}
nameserver::DeploySQLResponse response;
bool ok = client_.SendRequest(&::openmldb::nameserver::NameServer_Stub::DeploySQL, &request, &response,
FLAGS_request_timeout_ms, 1);
if (!ok || response.code() != 0) {
return {base::ReturnCode::kError, response.msg()};
}
*op_id = response.op_id();
return {};
}

} // namespace client
} // namespace openmldb
15 changes: 10 additions & 5 deletions src/client/ns_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ class NsClient : public Client {
bool MakeSnapshot(const std::string& name, const std::string& db, uint32_t pid, uint64_t end_offset,
std::string& msg); // NOLINT

bool ShowOPStatus(::openmldb::nameserver::ShowOPStatusResponse& response, // NOLINT
const std::string& name, uint32_t pid, std::string& msg); // NOLINT
base::Status ShowOPStatus(const std::string& name, uint32_t pid,
::openmldb::nameserver::ShowOPStatusResponse* response);

base::Status ShowOPStatus(uint64_t op_id, ::openmldb::nameserver::ShowOPStatusResponse* response);

bool CancelOP(uint64_t op_id, std::string& msg); // NOLINT

Expand Down Expand Up @@ -208,8 +210,7 @@ class NsClient : public Client {
bool RemoveReplicaCluster(const std::string& alias,
std::string& msg); // NOLINT

bool SwitchMode(const ::openmldb::nameserver::ServerMode& mode,
std::string& msg); // NOLINT
bool SwitchMode(const ::openmldb::nameserver::ServerMode& mode, std::string& msg); // NOLINT

bool AddIndex(const std::string& table_name, const ::openmldb::common::ColumnKey& column_key,
std::vector<openmldb::common::ColumnDesc>* cols,
Expand All @@ -222,7 +223,7 @@ class NsClient : public Client {
std::string& msg); // NOLINT

base::Status AddMultiIndex(const std::string& db, const std::string& table_name,
const std::vector<::openmldb::common::ColumnKey>& column_keys);
const std::vector<::openmldb::common::ColumnKey>& column_keys, bool skip_load_data);

bool DeleteIndex(const std::string& table_name, const std::string& idx_name,
std::string& msg); // NOLINT
Expand All @@ -246,6 +247,10 @@ class NsClient : public Client {

base::Status UpdateOfflineTableInfo(const nameserver::TableInfo& table_info);

base::Status DeploySQL(const ::openmldb::api::ProcedureInfo& sp_info,
const std::map<std::string, std::vector<::openmldb::common::ColumnKey>>& new_index_map,
uint64_t* op_id);

private:
::openmldb::RpcClient<::openmldb::nameserver::NameServer_Stub> client_;
std::string db_;
Expand Down
73 changes: 20 additions & 53 deletions src/client/tablet_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -958,10 +958,10 @@ bool TabletClient::DeleteIndex(uint32_t tid, uint32_t pid, const std::string& id

bool TabletClient::AddIndex(uint32_t tid, uint32_t pid, const ::openmldb::common::ColumnKey& column_key,
std::shared_ptr<TaskInfo> task_info) {
return AddMultiIndex(tid, pid, {column_key}, task_info).OK();
return AddMultiIndex(tid, pid, {column_key}, task_info);
}

base::Status TabletClient::AddMultiIndex(uint32_t tid, uint32_t pid,
bool TabletClient::AddMultiIndex(uint32_t tid, uint32_t pid,
const std::vector<::openmldb::common::ColumnKey>& column_keys,
std::shared_ptr<TaskInfo> task_info) {
::openmldb::api::AddIndexRequest request;
Expand All @@ -972,7 +972,7 @@ base::Status TabletClient::AddMultiIndex(uint32_t tid, uint32_t pid,
if (task_info) {
task_info->set_status(::openmldb::api::TaskStatus::kFailed);
}
return {base::ReturnCode::kError, "no column key"};
return false;
} else if (column_keys.size() == 1) {
request.mutable_column_key()->CopyFrom(column_keys[0]);
} else {
Expand All @@ -986,33 +986,11 @@ base::Status TabletClient::AddMultiIndex(uint32_t tid, uint32_t pid,
if (task_info) {
task_info->set_status(::openmldb::api::TaskStatus::kFailed);
}
return {base::ReturnCode::kError, response.msg()};
return false;
}
if (task_info) {
task_info->set_status(::openmldb::api::TaskStatus::kDone);
}
return {};
}

bool TabletClient::DumpIndexData(uint32_t tid, uint32_t pid, uint32_t partition_num,
const ::openmldb::common::ColumnKey& column_key, uint32_t idx,
std::shared_ptr<TaskInfo> task_info) {
::openmldb::api::DumpIndexDataRequest request;
::openmldb::api::GeneralResponse response;
request.set_tid(tid);
request.set_pid(pid);
request.set_partition_num(partition_num);
request.set_idx(idx);
::openmldb::common::ColumnKey* cur_column_key = request.mutable_column_key();
cur_column_key->CopyFrom(column_key);
if (task_info) {
request.mutable_task_info()->CopyFrom(*task_info);
}
bool ok = client_.SendRequest(&openmldb::api::TabletServer_Stub::DumpIndexData, &request, &response,
FLAGS_request_timeout_ms, 1);
if (!ok || response.code() != 0) {
return false;
}
return true;
}

Expand Down Expand Up @@ -1057,16 +1035,25 @@ bool TabletClient::LoadIndexData(uint32_t tid, uint32_t pid, uint32_t partition_
}

bool TabletClient::ExtractIndexData(uint32_t tid, uint32_t pid, uint32_t partition_num,
const ::openmldb::common::ColumnKey& column_key, uint32_t idx,
const std::vector<::openmldb::common::ColumnKey>& column_key,
uint64_t offset, bool dump_data,
std::shared_ptr<TaskInfo> task_info) {
if (column_key.empty()) {
if (task_info) {
task_info->set_status(::openmldb::api::TaskStatus::kFailed);
}
return false;
}
::openmldb::api::ExtractIndexDataRequest request;
::openmldb::api::GeneralResponse response;
request.set_tid(tid);
request.set_pid(pid);
request.set_partition_num(partition_num);
request.set_idx(idx);
::openmldb::common::ColumnKey* cur_column_key = request.mutable_column_key();
cur_column_key->CopyFrom(column_key);
request.set_offset(offset);
request.set_dump_data(dump_data);
for (const auto& cur_column_key : column_key) {
request.add_column_key()->CopyFrom(cur_column_key);
}
if (task_info) {
request.mutable_task_info()->CopyFrom(*task_info);
}
Expand All @@ -1078,25 +1065,6 @@ bool TabletClient::ExtractIndexData(uint32_t tid, uint32_t pid, uint32_t partiti
return true;
}

bool TabletClient::ExtractMultiIndexData(uint32_t tid, uint32_t pid, uint32_t partition_num,
const std::vector<::openmldb::common::ColumnKey>& column_key_vec) {
::openmldb::api::ExtractMultiIndexDataRequest request;
::openmldb::api::GeneralResponse response;
request.set_tid(tid);
request.set_pid(pid);
request.set_partition_num(partition_num);
for (const auto& column_key : column_key_vec) {
auto cur_column_key = request.add_column_key();
cur_column_key->CopyFrom(column_key);
}
bool ok = client_.SendRequest(&openmldb::api::TabletServer_Stub::ExtractMultiIndexData, &request, &response,
FLAGS_request_timeout_ms, 1);
if (!ok || response.code() != 0) {
return false;
}
return true;
}

bool TabletClient::CancelOP(const uint64_t op_id) {
::openmldb::api::CancelOPRequest request;
::openmldb::api::GeneralResponse response;
Expand Down Expand Up @@ -1140,15 +1108,14 @@ bool TabletClient::UpdateRealEndpointMap(const std::map<std::string, std::string
return true;
}

bool TabletClient::CreateProcedure(const openmldb::api::CreateProcedureRequest& sp_request, std::string& msg) {
base::Status TabletClient::CreateProcedure(const openmldb::api::CreateProcedureRequest& sp_request) {
openmldb::api::GeneralResponse response;
bool ok = client_.SendRequest(&::openmldb::api::TabletServer_Stub::CreateProcedure, &sp_request, &response,
sp_request.timeout_ms(), FLAGS_request_max_retry);
msg = response.msg();
if (!ok || response.code() != 0) {
return false;
return {base::ReturnCode::kError, response.msg()};
}
return true;
return {};
}

bool TabletClient::AsyncScan(const ::openmldb::api::ScanRequest& request,
Expand Down
15 changes: 4 additions & 11 deletions src/client/tablet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,10 @@ class TabletClient : public Client {
bool AddIndex(uint32_t tid, uint32_t pid, const ::openmldb::common::ColumnKey& column_key,
std::shared_ptr<TaskInfo> task_info);

base::Status AddMultiIndex(uint32_t tid, uint32_t pid,
bool AddMultiIndex(uint32_t tid, uint32_t pid,
const std::vector<::openmldb::common::ColumnKey>& column_keys,
std::shared_ptr<TaskInfo> task_info);

bool DumpIndexData(uint32_t tid, uint32_t pid, uint32_t partition_num,
const ::openmldb::common::ColumnKey& column_key, uint32_t idx,
std::shared_ptr<TaskInfo> task_info);

bool GetCatalog(uint64_t* version);

bool SendIndexData(uint32_t tid, uint32_t pid, const std::map<uint32_t, std::string>& pid_endpoint_map,
Expand All @@ -216,18 +212,15 @@ class TabletClient : public Client {
bool LoadIndexData(uint32_t tid, uint32_t pid, uint32_t partition_num, std::shared_ptr<TaskInfo> task_info);

bool ExtractIndexData(uint32_t tid, uint32_t pid, uint32_t partition_num,
const ::openmldb::common::ColumnKey& column_key, uint32_t idx,
const std::vector<::openmldb::common::ColumnKey>& column_key,
uint64_t offset, bool dump_data,
std::shared_ptr<TaskInfo> task_info);

bool ExtractMultiIndexData(uint32_t tid, uint32_t pid, uint32_t partition_num,
const std::vector<::openmldb::common::ColumnKey>& column_key_vec);

bool CancelOP(const uint64_t op_id);

bool UpdateRealEndpointMap(const std::map<std::string, std::string>& map);

bool CreateProcedure(const openmldb::api::CreateProcedureRequest& sp_request,
std::string& msg); // NOLINT
base::Status CreateProcedure(const openmldb::api::CreateProcedureRequest& sp_request);

bool CallProcedure(const std::string& db, const std::string& sp_name, const std::string& row,
brpc::Controller* cntl, openmldb::api::QueryResponse* response, bool is_debug,
Expand Down
7 changes: 3 additions & 4 deletions src/cmd/openmldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2717,7 +2717,6 @@ void HandleNSShowOPStatus(const std::vector<std::string>& parts, ::openmldb::cli
::baidu::common::TPrinter tp(row.size(), FLAGS_max_col_display_length);
tp.AddRow(row);
::openmldb::nameserver::ShowOPStatusResponse response;
std::string msg;
std::string name;
uint32_t pid = ::openmldb::client::INVALID_PID;
if (parts.size() > 1) {
Expand All @@ -2731,9 +2730,9 @@ void HandleNSShowOPStatus(const std::vector<std::string>& parts, ::openmldb::cli
return;
}
}
bool ok = client->ShowOPStatus(response, name, pid, msg);
if (!ok) {
std::cout << "Fail to show tablets. error msg: " << msg << std::endl;
auto status = client->ShowOPStatus(name, pid, &response);
if (!status.OK()) {
std::cout << "Fail to show tablets. error msg: " << status.GetMsg() << std::endl;
return;
}
for (int idx = 0; idx < response.op_status_size(); idx++) {
Expand Down
Loading

0 comments on commit d5118ed

Please sign in to comment.