Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: deploy with data #3288

Merged
merged 28 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
804aa73
refact: refact create task function
dl239 Apr 21, 2023
ecee416
refact: update
dl239 Apr 21, 2023
157ee9b
refact: update
dl239 Apr 23, 2023
4a57b63
fix: fix bug
dl239 Apr 24, 2023
9c5c0d5
fix: fix bug
dl239 Apr 25, 2023
3ef2c11
fix: fix delreplica
dl239 Apr 25, 2023
5c25e45
refact: rm CreateAddReplicaRemoteTask
dl239 Apr 25, 2023
cf0f769
fix: fix loadtable
dl239 Apr 25, 2023
fa0b73f
Merge branch 'main' of github.com:dl239/OpenMLDB into feat/deloy
dl239 May 4, 2023
54d0dfd
Merge branch 'main' of github.com:dl239/OpenMLDB into feat/deploy-1
dl239 May 4, 2023
5d15465
Merge branch 'feat/deloy' of github.com:dl239/OpenMLDB into feat/depl…
dl239 May 4, 2023
5e4ff91
feat: add DumpAndExtractIndexData
dl239 May 5, 2023
61e21e8
refact: refact makesnapshot
dl239 May 5, 2023
da3d6d9
refact: refact makesnapshot
dl239 May 5, 2023
cfc325f
feat: refact addindex
dl239 May 15, 2023
df6e8bc
refact: fix addindex
dl239 May 17, 2023
21ab117
fix: fix standalone addindex
dl239 May 21, 2023
24b6b26
feat: add task.cc
dl239 May 21, 2023
e9d5bd1
Merge remote-tracking branch 'origin' into feat/deloy
dl239 May 21, 2023
c8b4e48
merge main
dl239 May 21, 2023
7bf1d98
fix: fix deploy bug
dl239 May 22, 2023
2b8e20d
fix: fix test case
dl239 May 23, 2023
0042514
feat: add max wait time
dl239 May 24, 2023
9953abf
refact: update proto
dl239 May 24, 2023
94cd1a9
fix: fix get value
dl239 May 26, 2023
0cf49fd
merge main
dl239 Jun 2, 2023
c45c2f0
refact: update createtask
dl239 Jun 2, 2023
1f6ca99
docs: add docs
dl239 Jun 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/en/reference/sql/deployment_manage/DEPLOY_STATEMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,15 @@ DEPLOY demo OPTIONS (SKIP_INDEX_CHECK="TRUE")
SELECT * FROM t1 LAST JOIN t2 ORDER BY t2.col3 ON t1.col1 = t2.col1;
```

### Synchronization/Asynchronization Settings
When executing deploy, you can set the synchronous/asynchronous mode through the `SYNC` option. The default value of `SYNC` is `true`, that is, the synchronous mode. If the relevant tables involved in the deploy statement have data and needs to add one or more indexs, executing deploy will initiate a job to execute a series of tasks such as loading data. In this case a job id will be returned if the `SYNC` option is set to `false`. You can get the job execution status by `SHOW JOBS FROM NAMESERVER LIKE '{job_id}'`

**Example**
```sql
deploy demo options(SYNC="false") SELECT t1.col1, t2.col2, sum(col4) OVER w1 as w1_col4_sum FROM t1 LAST JOIN t2 ORDER BY t2.col3 ON t1.col2 = t2.col2
WINDOW w1 AS (PARTITION BY t1.col2 ORDER BY t1.col3 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW);
```

## Relevant SQL

[USE DATABASE](../ddl/USE_DATABASE_STATEMENT.md)
Expand Down
10 changes: 10 additions & 0 deletions docs/zh/openmldb_sql/deployment_manage/DEPLOY_STATEMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ DeployOption
DeployOptionItem
::= 'LONG_WINDOWS' '=' LongWindowDefinitions
| 'SKIP_INDEX_CHECK' '=' string_literal
| 'SYNC' '=' string_literal
```

#### 长窗口优化
Expand Down Expand Up @@ -160,6 +161,15 @@ DEPLOY demo OPTIONS (SKIP_INDEX_CHECK="TRUE")
SELECT * FROM t1 LAST JOIN t2 ORDER BY t2.col3 ON t1.col1 = t2.col1;
```

### 设置同步/异步
执行deploy的时候可以通过`SYNC`选项来设置同步/异步模式, 默认为`true`即同步模式。如果deploy语句中涉及的相关表有数据,并且需要添加索引的情况下,执行deploy会发起数据加载等任务,如果`SYNC`选项设置为`false`就会返回一个任务id。可以通过`SHOW JOBS FROM NAMESERVER LIKE '{job_id}'`来查看任务执行状态。

**Example**
```sql
deploy demo options(SYNC="false") SELECT t1.col1, t2.col2, sum(col4) OVER w1 as w1_col4_sum FROM t1 LAST JOIN t2 ORDER BY t2.col3 ON t1.col2 = t2.col2
WINDOW w1 AS (PARTITION BY t1.col2 ORDER BY t1.col3 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW);
```

## 相关SQL

[USE DATABASE](../ddl/USE_DATABASE_STATEMENT.md)
Expand Down
53 changes: 45 additions & 8 deletions src/client/ns_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,19 @@ bool NsClient::MakeSnapshot(const std::string& name, const std::string& db, uint
return false;
}

bool NsClient::ShowOPStatus(::openmldb::nameserver::ShowOPStatusResponse& response, const std::string& name,
uint32_t pid, std::string& msg) {
base::Status NsClient::ShowOPStatus(uint64_t op_id, nameserver::ShowOPStatusResponse* response) {
::openmldb::nameserver::ShowOPStatusRequest request;
request.set_op_id(op_id);
bool ok = client_.SendRequest(&::openmldb::nameserver::NameServer_Stub::ShowOPStatus, &request, response,
FLAGS_request_timeout_ms, 1);
if (ok && response->code() == 0) {
return {};
}
return {base::ReturnCode::kError, response->msg()};
}

base::Status NsClient::ShowOPStatus(const std::string& name, uint32_t pid,
::openmldb::nameserver::ShowOPStatusResponse* response) {
::openmldb::nameserver::ShowOPStatusRequest request;
if (const std::string& db = GetDb(); !db.empty()) {
request.set_db(db);
Expand All @@ -199,13 +210,12 @@ bool NsClient::ShowOPStatus(::openmldb::nameserver::ShowOPStatusResponse& respon
if (pid != INVALID_PID) {
request.set_pid(pid);
}
bool ok = client_.SendRequest(&::openmldb::nameserver::NameServer_Stub::ShowOPStatus, &request, &response,
bool ok = client_.SendRequest(&::openmldb::nameserver::NameServer_Stub::ShowOPStatus, &request, response,
FLAGS_request_timeout_ms, 1);
msg = response.msg();
if (ok && response.code() == 0) {
return true;
if (ok && response->code() == 0) {
return {};
}
return false;
return {base::ReturnCode::kError, response->msg()};
}

bool NsClient::CancelOP(uint64_t op_id, std::string& msg) {
Expand Down Expand Up @@ -917,7 +927,7 @@ bool NsClient::AddIndex(const std::string& db_name,
}

base::Status NsClient::AddMultiIndex(const std::string& db, const std::string& table_name,
const std::vector<::openmldb::common::ColumnKey>& column_keys) {
const std::vector<::openmldb::common::ColumnKey>& column_keys, bool skip_load_data) {
::openmldb::nameserver::AddIndexRequest request;
::openmldb::nameserver::GeneralResponse response;
if (column_keys.empty()) {
Expand All @@ -929,6 +939,7 @@ base::Status NsClient::AddMultiIndex(const std::string& db, const std::string& t
}
request.set_name(table_name);
request.set_db(db);
request.set_skip_load_data(skip_load_data);
bool ok = client_.SendRequest(&::openmldb::nameserver::NameServer_Stub::AddIndex, &request, &response,
FLAGS_request_timeout_ms, 1);
if (ok && response.code() == 0) {
Expand Down Expand Up @@ -1065,5 +1076,31 @@ base::Status NsClient::ShowFunction(const std::string& name,
return {};
}

base::Status NsClient::DeploySQL(const ::openmldb::api::ProcedureInfo& sp_info,
const std::map<std::string, std::vector<::openmldb::common::ColumnKey>>& new_index_map,
uint64_t* op_id) {
if (new_index_map.empty()) {
return {base::ReturnCode::kError, "no index to add"};
}
nameserver::DeploySQLRequest request;
request.mutable_sp_info()->CopyFrom(sp_info);
for (const auto& kv : new_index_map) {
auto index = request.add_index();
index->set_name(kv.first);
index->set_db(sp_info.db_name());
for (const auto& column_key : kv.second) {
index->add_column_key()->CopyFrom(column_key);
}
}
nameserver::DeploySQLResponse response;
bool ok = client_.SendRequest(&::openmldb::nameserver::NameServer_Stub::DeploySQL, &request, &response,
FLAGS_request_timeout_ms, 1);
if (!ok || response.code() != 0) {
return {base::ReturnCode::kError, response.msg()};
}
*op_id = response.op_id();
return {};
}

} // namespace client
} // namespace openmldb
15 changes: 10 additions & 5 deletions src/client/ns_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ class NsClient : public Client {
bool MakeSnapshot(const std::string& name, const std::string& db, uint32_t pid, uint64_t end_offset,
std::string& msg); // NOLINT

bool ShowOPStatus(::openmldb::nameserver::ShowOPStatusResponse& response, // NOLINT
const std::string& name, uint32_t pid, std::string& msg); // NOLINT
base::Status ShowOPStatus(const std::string& name, uint32_t pid,
::openmldb::nameserver::ShowOPStatusResponse* response);

base::Status ShowOPStatus(uint64_t op_id, ::openmldb::nameserver::ShowOPStatusResponse* response);

bool CancelOP(uint64_t op_id, std::string& msg); // NOLINT

Expand Down Expand Up @@ -208,8 +210,7 @@ class NsClient : public Client {
bool RemoveReplicaCluster(const std::string& alias,
std::string& msg); // NOLINT

bool SwitchMode(const ::openmldb::nameserver::ServerMode& mode,
std::string& msg); // NOLINT
bool SwitchMode(const ::openmldb::nameserver::ServerMode& mode, std::string& msg); // NOLINT

bool AddIndex(const std::string& table_name, const ::openmldb::common::ColumnKey& column_key,
std::vector<openmldb::common::ColumnDesc>* cols,
Expand All @@ -222,7 +223,7 @@ class NsClient : public Client {
std::string& msg); // NOLINT

base::Status AddMultiIndex(const std::string& db, const std::string& table_name,
const std::vector<::openmldb::common::ColumnKey>& column_keys);
const std::vector<::openmldb::common::ColumnKey>& column_keys, bool skip_load_data);

bool DeleteIndex(const std::string& table_name, const std::string& idx_name,
std::string& msg); // NOLINT
Expand All @@ -246,6 +247,10 @@ class NsClient : public Client {

base::Status UpdateOfflineTableInfo(const nameserver::TableInfo& table_info);

base::Status DeploySQL(const ::openmldb::api::ProcedureInfo& sp_info,
const std::map<std::string, std::vector<::openmldb::common::ColumnKey>>& new_index_map,
uint64_t* op_id);

private:
::openmldb::RpcClient<::openmldb::nameserver::NameServer_Stub> client_;
std::string db_;
Expand Down
73 changes: 20 additions & 53 deletions src/client/tablet_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -958,10 +958,10 @@ bool TabletClient::DeleteIndex(uint32_t tid, uint32_t pid, const std::string& id

bool TabletClient::AddIndex(uint32_t tid, uint32_t pid, const ::openmldb::common::ColumnKey& column_key,
std::shared_ptr<TaskInfo> task_info) {
return AddMultiIndex(tid, pid, {column_key}, task_info).OK();
return AddMultiIndex(tid, pid, {column_key}, task_info);
}

base::Status TabletClient::AddMultiIndex(uint32_t tid, uint32_t pid,
bool TabletClient::AddMultiIndex(uint32_t tid, uint32_t pid,
const std::vector<::openmldb::common::ColumnKey>& column_keys,
std::shared_ptr<TaskInfo> task_info) {
::openmldb::api::AddIndexRequest request;
Expand All @@ -972,7 +972,7 @@ base::Status TabletClient::AddMultiIndex(uint32_t tid, uint32_t pid,
if (task_info) {
task_info->set_status(::openmldb::api::TaskStatus::kFailed);
}
return {base::ReturnCode::kError, "no column key"};
return false;
} else if (column_keys.size() == 1) {
request.mutable_column_key()->CopyFrom(column_keys[0]);
} else {
Expand All @@ -986,33 +986,11 @@ base::Status TabletClient::AddMultiIndex(uint32_t tid, uint32_t pid,
if (task_info) {
task_info->set_status(::openmldb::api::TaskStatus::kFailed);
}
return {base::ReturnCode::kError, response.msg()};
return false;
}
if (task_info) {
task_info->set_status(::openmldb::api::TaskStatus::kDone);
}
return {};
}

bool TabletClient::DumpIndexData(uint32_t tid, uint32_t pid, uint32_t partition_num,
const ::openmldb::common::ColumnKey& column_key, uint32_t idx,
std::shared_ptr<TaskInfo> task_info) {
::openmldb::api::DumpIndexDataRequest request;
::openmldb::api::GeneralResponse response;
request.set_tid(tid);
request.set_pid(pid);
request.set_partition_num(partition_num);
request.set_idx(idx);
::openmldb::common::ColumnKey* cur_column_key = request.mutable_column_key();
cur_column_key->CopyFrom(column_key);
if (task_info) {
request.mutable_task_info()->CopyFrom(*task_info);
}
bool ok = client_.SendRequest(&openmldb::api::TabletServer_Stub::DumpIndexData, &request, &response,
FLAGS_request_timeout_ms, 1);
if (!ok || response.code() != 0) {
return false;
}
return true;
}

Expand Down Expand Up @@ -1057,16 +1035,25 @@ bool TabletClient::LoadIndexData(uint32_t tid, uint32_t pid, uint32_t partition_
}

bool TabletClient::ExtractIndexData(uint32_t tid, uint32_t pid, uint32_t partition_num,
const ::openmldb::common::ColumnKey& column_key, uint32_t idx,
const std::vector<::openmldb::common::ColumnKey>& column_key,
uint64_t offset, bool dump_data,
std::shared_ptr<TaskInfo> task_info) {
if (column_key.empty()) {
if (task_info) {
task_info->set_status(::openmldb::api::TaskStatus::kFailed);
}
return false;
}
::openmldb::api::ExtractIndexDataRequest request;
::openmldb::api::GeneralResponse response;
request.set_tid(tid);
request.set_pid(pid);
request.set_partition_num(partition_num);
request.set_idx(idx);
::openmldb::common::ColumnKey* cur_column_key = request.mutable_column_key();
cur_column_key->CopyFrom(column_key);
request.set_offset(offset);
request.set_dump_data(dump_data);
for (const auto& cur_column_key : column_key) {
request.add_column_key()->CopyFrom(cur_column_key);
}
if (task_info) {
request.mutable_task_info()->CopyFrom(*task_info);
}
Expand All @@ -1078,25 +1065,6 @@ bool TabletClient::ExtractIndexData(uint32_t tid, uint32_t pid, uint32_t partiti
return true;
}

bool TabletClient::ExtractMultiIndexData(uint32_t tid, uint32_t pid, uint32_t partition_num,
const std::vector<::openmldb::common::ColumnKey>& column_key_vec) {
::openmldb::api::ExtractMultiIndexDataRequest request;
::openmldb::api::GeneralResponse response;
request.set_tid(tid);
request.set_pid(pid);
request.set_partition_num(partition_num);
for (const auto& column_key : column_key_vec) {
auto cur_column_key = request.add_column_key();
cur_column_key->CopyFrom(column_key);
}
bool ok = client_.SendRequest(&openmldb::api::TabletServer_Stub::ExtractMultiIndexData, &request, &response,
FLAGS_request_timeout_ms, 1);
if (!ok || response.code() != 0) {
return false;
}
return true;
}

bool TabletClient::CancelOP(const uint64_t op_id) {
::openmldb::api::CancelOPRequest request;
::openmldb::api::GeneralResponse response;
Expand Down Expand Up @@ -1140,15 +1108,14 @@ bool TabletClient::UpdateRealEndpointMap(const std::map<std::string, std::string
return true;
}

bool TabletClient::CreateProcedure(const openmldb::api::CreateProcedureRequest& sp_request, std::string& msg) {
base::Status TabletClient::CreateProcedure(const openmldb::api::CreateProcedureRequest& sp_request) {
openmldb::api::GeneralResponse response;
bool ok = client_.SendRequest(&::openmldb::api::TabletServer_Stub::CreateProcedure, &sp_request, &response,
sp_request.timeout_ms(), FLAGS_request_max_retry);
msg = response.msg();
if (!ok || response.code() != 0) {
return false;
return {base::ReturnCode::kError, response.msg()};
}
return true;
return {};
}

bool TabletClient::AsyncScan(const ::openmldb::api::ScanRequest& request,
Expand Down
15 changes: 4 additions & 11 deletions src/client/tablet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,10 @@ class TabletClient : public Client {
bool AddIndex(uint32_t tid, uint32_t pid, const ::openmldb::common::ColumnKey& column_key,
std::shared_ptr<TaskInfo> task_info);

base::Status AddMultiIndex(uint32_t tid, uint32_t pid,
bool AddMultiIndex(uint32_t tid, uint32_t pid,
const std::vector<::openmldb::common::ColumnKey>& column_keys,
std::shared_ptr<TaskInfo> task_info);

bool DumpIndexData(uint32_t tid, uint32_t pid, uint32_t partition_num,
const ::openmldb::common::ColumnKey& column_key, uint32_t idx,
std::shared_ptr<TaskInfo> task_info);

bool GetCatalog(uint64_t* version);

bool SendIndexData(uint32_t tid, uint32_t pid, const std::map<uint32_t, std::string>& pid_endpoint_map,
Expand All @@ -216,18 +212,15 @@ class TabletClient : public Client {
bool LoadIndexData(uint32_t tid, uint32_t pid, uint32_t partition_num, std::shared_ptr<TaskInfo> task_info);

bool ExtractIndexData(uint32_t tid, uint32_t pid, uint32_t partition_num,
const ::openmldb::common::ColumnKey& column_key, uint32_t idx,
const std::vector<::openmldb::common::ColumnKey>& column_key,
uint64_t offset, bool dump_data,
std::shared_ptr<TaskInfo> task_info);

bool ExtractMultiIndexData(uint32_t tid, uint32_t pid, uint32_t partition_num,
const std::vector<::openmldb::common::ColumnKey>& column_key_vec);

bool CancelOP(const uint64_t op_id);

bool UpdateRealEndpointMap(const std::map<std::string, std::string>& map);

bool CreateProcedure(const openmldb::api::CreateProcedureRequest& sp_request,
std::string& msg); // NOLINT
base::Status CreateProcedure(const openmldb::api::CreateProcedureRequest& sp_request);

bool CallProcedure(const std::string& db, const std::string& sp_name, const std::string& row,
brpc::Controller* cntl, openmldb::api::QueryResponse* response, bool is_debug,
Expand Down
7 changes: 3 additions & 4 deletions src/cmd/openmldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2717,7 +2717,6 @@ void HandleNSShowOPStatus(const std::vector<std::string>& parts, ::openmldb::cli
::baidu::common::TPrinter tp(row.size(), FLAGS_max_col_display_length);
tp.AddRow(row);
::openmldb::nameserver::ShowOPStatusResponse response;
std::string msg;
std::string name;
uint32_t pid = ::openmldb::client::INVALID_PID;
if (parts.size() > 1) {
Expand All @@ -2731,9 +2730,9 @@ void HandleNSShowOPStatus(const std::vector<std::string>& parts, ::openmldb::cli
return;
}
}
bool ok = client->ShowOPStatus(response, name, pid, msg);
if (!ok) {
std::cout << "Fail to show tablets. error msg: " << msg << std::endl;
auto status = client->ShowOPStatus(name, pid, &response);
if (!status.OK()) {
std::cout << "Fail to show tablets. error msg: " << status.GetMsg() << std::endl;
return;
}
for (int idx = 0; idx < response.op_status_size(); idx++) {
Expand Down
Loading