Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit e5ead33dd247a6b8e301209b86f19a2089103551
Author: plat1ko <[email protected]>
Date:   Mon Jun 20 15:37:21 2022 +0800

    temporarily avoid src/io conflicts with doris/master

commit 2ffee9d8906af61c40203a71fd0c30db14fbb47a
Author: plat1ko <[email protected]>
Date:   Fri Jun 17 20:59:41 2022 +0800

    (cold_on_s3) Report remote data size in tablets (apache#195)

    * be report tablet remote size

    * update TTabletStat

    * fe report tablet remote data size

commit ee0d856685c1f089f585d032f26203d4397dc3e7
Author: deardeng <[email protected]>
Date:   Fri Jun 17 16:06:23 2022 +0800

    (cold_on_s3) support modify partition add storage policy

commit bbf81a905ff3bda38f80f629755ad983c6f6d556
Author: deardeng <[email protected]>
Date:   Fri Jun 17 10:24:14 2022 +0800

    (cold_on_s3) support alter table add storage policy

commit 1c662b90d59ef8a870ebb7cc89801433fdab1f8a
Author: plat1ko <[email protected]>
Date:   Thu Jun 16 16:01:02 2022 +0800

    (cold_on_s3) directly upload rowset exceeded ttl (apache#186)

    * directly upload rowset exceed ttl

    * clang-format

commit f17709819a2a51b2783cceee2b32451740f95256
Author: plat1ko <[email protected]>
Date:   Thu Jun 16 11:39:08 2022 +0800

    (cold_on_s3) Implement and rewrite some filesystem methods (apache#183)

    * remove temporary test code

    * no need to  send fs to LocalFileWriter ctor

    * rewrite some fs methods

    * fix opened fd leak

commit 1f7a09d90e38a16574f841701fa04b8164c5155a
Author: Luwei <[email protected]>
Date:   Tue Jun 14 18:07:52 2022 +0800

    (cold_on_s3) fix log printing (apache#176)

commit 41b2ca65c6340e9432e58d53816b380bdfa9e9b7
Author: Luwei <[email protected]>
Date:   Tue Jun 14 17:02:48 2022 +0800

    (cold_on_s3) fix rowset write timestamp bug (apache#175)

commit f987fdffdc9d3245e44f226ea19fa489a3951652
Author: deardeng <[email protected]>
Date:   Tue Jun 14 16:14:56 2022 +0800

    (cold_on_s3) fix be core in file cache and fix be report remote capacity error (apache#174)

commit 4190cd3784a7c308afdc0f29878a40bc476999f5
Author: deardeng <[email protected]>
Date:   Tue Jun 14 11:17:23 2022 +0800

    (cold_on_s3) fix compile error and thread pool core

commit f76186517b65b8bf70dc2fa456f2ec32d791e4e7
Author: plat1ko <[email protected]>
Date:   Mon Jun 13 19:15:37 2022 +0800

    (cold_on_s3) Revert FilePathDesc (apache#136)

    * change cooldown pick rowset strategy; add/modify some log

    * revert FilePathDesc

    * return unsupport error to avoid undefined behavior

    * patch to reduce path copy

    * fix be ut

commit 9ddac6074840417b7a73a4eb8c3f4da00f97a4d1
Author: Luwei <[email protected]>
Date:   Mon Jun 13 16:07:36 2022 +0800

    (cold_on_s3) Add ut for s3 read (apache#147)

commit d325739ab1ba4e13539ec0b6401ebc13a2dad851
Author: Luwei <[email protected]>
Date:   Mon Jun 13 16:07:21 2022 +0800

    (cold_on_s3) Add metrics for file system operation (apache#146)

    * Add metrics for file system operation

    * fix status code

commit b42bb65cc24870f78f3f4a5c7682d0983eb7ac8b
Author: deardeng <[email protected]>
Date:   Mon Jun 13 12:53:10 2022 +0800

    (cold_on_s3) support show proc backends return remote used capacity

commit fdffe69455fe867d3d95a85311684664abaaf5bb
Author: deardeng <[email protected]>
Date:   Fri Jun 10 16:08:09 2022 +0800

    (cold_on_s3) support storage policy use default policy

commit 1921e75bce6b85bac61f278b3077a47c916aab00
Author: deardeng <[email protected]>
Date:   Thu Jun 9 10:40:46 2022 +0800

    (cold_on_s3) create table support different partition use their own storage policy

commit ebb0b585a1dbc8fae146cc742f8000b1b4033507
Author: plat1ko <[email protected]>
Date:   Fri Jun 10 22:28:40 2022 +0800

    (cold_on_s3) Implement some fs methods (apache#163)

    * implement some fs methods

    * add more log info

commit 89e820d3f1a9f6b4680bf445ff60a825c315bcab
Author: Lei Zhang <[email protected]>
Date:   Thu Jun 9 23:12:58 2022 +0800

    [fix](be) fix asan be set_storage_medium core (apache#9986) (apache#9987)

    apache#9986

commit ffe2504e24c9afb06b7e843b455f341f1e8864d7
Author: plat1ko <[email protected]>
Date:   Thu Jun 9 16:17:06 2022 +0800

    [hot-fix](cold_on_s3) do not skip 0 segment rowset (apache#143)

commit f8cd4a12a838d2ee25d0ec72acc3d346f8c24db9
Author: Luwei <[email protected]>
Date:   Wed Jun 8 16:09:35 2022 +0800

     (cold_on_s3) Optimize cooldown policy (apache#113)

    * Optimize cooldown policy

    * fix ut

    * remove useless log

    * add ut

    * add ut

    * rename var

    * fix typo and ut

commit 08aae82daae19d9d7dc4bc0dd294def6d1ea6157
Author: plat1ko <[email protected]>
Date:   Wed Jun 8 16:02:41 2022 +0800

    provide quick_cooldown switch for regression test (apache#132)

commit 704af00135ff3aacfe746ff617efebfe19e7c53a
Author: deardeng <[email protected]>
Date:   Tue Jun 7 18:05:01 2022 +0800

    (cold_on_s3) fix multiple fe synchronization resource storage policy exit

commit 58f09c47d4128d6cfe4959a61a0d0e3bde0cbc82
Author: platoneko <[email protected]>
Date:   Thu Jun 2 11:38:46 2022 +0800

    (cold_on_s3) fix move bug

commit cb4d4d766828103355c47dff8cc37c018c767eb5
Author: deardeng <[email protected]>
Date:   Thu Jun 2 11:24:16 2022 +0800

    (cold_on_s3) fix clang report '_exec_env' is not used

commit 6b8f6f2b62e01abfdd30b5c42b9c72d554ec5f4e
Author: platoneko <[email protected]>
Date:   Thu Jun 2 00:05:42 2022 +0800

    (cold_on_s3) reformat be code to make ci happy

commit 4168d9fb1c2ddad57cff7afb9da20272f1a0d389
Author: platoneko <[email protected]>
Date:   Wed Jun 1 23:44:56 2022 +0800

    (cold_on_s3) fix be ut

commit 110fc7421ceba32a8cf46b237e65309d94a60752
Author: platoneko <[email protected]>
Date:   Wed Jun 1 22:36:37 2022 +0800

    (cold_on_s3) fix fe compile error

commit a74f84a7f5980c5c7cc5d5c80b0bfd379cfcf11d
Author: plat1ko <[email protected]>
Date:   Thu May 26 11:15:31 2022 +0800

    [Enhancement](cold_on_s3) refresh fs map (apache#67)

    * refresh fs map

    * fetch cooldown ttl from storage policy

commit 71a4f364133e1686d0e86a04ab785a64c47ae701
Author: plat1ko <[email protected]>
Date:   Tue May 24 21:28:06 2022 +0800

    (cold_on_s3) Do not compact remote rowsets; modify pick cooldown rowset strategy (apache#53)

    * time from the beginning of upload

    * compact to appropriate state before upload

    * only do compaction on local rowsets

commit b434d2c9a9c4f4d9d81b4fd3bbabb6cd913e7262
Author: plat1ko <[email protected]>
Date:   Tue May 24 19:22:25 2022 +0800

    (cold_on_s3) use noncopy iostream for s3 get object (apache#54)

commit 606b98217eb261b9001da77bf766b52c16efb426
Author: plat1ko <[email protected]>
Date:   Tue May 24 18:42:09 2022 +0800

    (cold_on_s3) copy remote rowset meta to new tablet meta (apache#52)

commit e218f051e685e27a1392befb40ee7fef832a1515
Author: deardeng <[email protected]>
Date:   Tue May 24 12:58:43 2022 +0800

    (cold_on_s3) support storage policy change fix thrift merge redefine

commit 152fa1856fb3034afcf0b3dc4272b0bafac8f630
Author: deardeng <[email protected]>
Date:   Tue May 24 11:45:13 2022 +0800

    (cold_on_s3) support storage policy change and create table add storage policy name (apache#39)

    * [feature](cold-hot)support storage policy change and create table add storage policy name

    * support storage policy change and create table add storage policy name, check resource policy exist when create table.

    * when create table use storage policy check it exist

    * support storage policy change and create table add storage policy name and  when create table use storage policy check it exist

    * support storage policy change fix some bug.

    * fix some code style

commit 77101dd7f563e1d0ddd03fbe6b35b4196811a87e
Author: deardeng <[email protected]>
Date:   Sat May 21 07:37:43 2022 -0700

    (cold_on_s3) support storage policy (apache#28)

    * [feature](cold-hot)support storage policy

    * fix review

    * fix review v1

commit 9923d59e97c25e6d6fb54bf09ed6e0a00a5704c3
Author: plat1ko <[email protected]>
Date:   Fri May 20 16:22:36 2022 +0800

    (cold_on_s3) Batch upload to improve upload speed (apache#38)

    * test upload duration

    * use s3 transfer

    * batch upload

    * some fix in s3 filesystem

commit 29656e73f33e94bb85a954eb31dca9e6eea33cf4
Author: Luwei <[email protected]>
Date:   Wed May 18 15:08:40 2022 +0800

    (cold_on_s3) Implement periodic uploading of rowsets to remote storage (apache#18)

    * Implement periodic uploading of rowsets to remote storage

    * add queue size check

    * add test config for cooldwon

    * modify read path

commit c35ed8bc87d212d442f1b57be0340230de4fb619
Author: plat1ko <[email protected]>
Date:   Tue May 17 17:22:19 2022 +0800

    (cold_on_s3) fe distinguish drop partition (apache#26)

commit e216b0320d71d04a4fde829f13fd8c8bfa94854b
Author: deardeng <[email protected]>
Date:   Mon May 16 21:02:51 2022 -0700

    (cold_on_s3) support clone task don't download file in s3 (apache#15)

    * apache#12 support clone task don't download file in s3

    * apache#12 add clone ut

    * fix code format

    * fix commit apache#12

commit be7521900726c9cd401e3a6503177e0365fc8a9a
Author: plat1ko <[email protected]>
Date:   Tue May 17 12:01:32 2022 +0800

    [Enhancement](cold_on_s3) FE distinguish drop table or drop replica;  disk capacity distinguish local or remote (apache#24)

    * disk capacity differentiates between local and remote

    * fe drop table

commit 414a1cb0c5d9777b6eb0b2ad5d7557dc1bf4a243
Author: plat1ko <[email protected]>
Date:   Mon May 16 11:55:42 2022 +0800

    [Enhancement](cold_on_s3) Physically remove remote rowsets when drop table/partition (apache#14)

    * update is_local logical

    * remove remote rowsets when drop table/partition

commit ed7cf258051716ff99c37812a7fa63ac1fc69f3f
Author: Luwei <[email protected]>
Date:   Mon May 16 11:17:30 2022 +0800

    (cold_on_s3) Implement reading rowset data from remote storage (apache#11)

    * adjust io stack

    * support s3

    * refine code

    * fix ut

    * remove useless code

    * rename rblock

    * fix err msg

commit da6ca91f22d30e9d16e4dd70dbcad30714d321c9
Author: plat1ko <[email protected]>
Date:   Wed May 11 16:29:54 2022 +0800

    (cold_on_s3) Implement tablet cooldown (apache#10)

    * tablet cooldown

    * add global_local_filesystem to keep compatible

    * test tablet cooldown

    * revert engine_storage_migration_task_test.cpp

commit 83f4a3bfcd70cd747941338cd89148fb69d272ae
Author: platoneko <[email protected]>
Date:   Sun May 1 12:06:41 2022 +0800

    (cold_on_s3) Rowset write path using fs (apache#7)

    * replace WritableBlock with FileWriter

    * local file system

    * set fs when load dir

    * fix be ut

commit 3585fe6
Author: Xinyi Zou <[email protected]>
Date:   Tue Jun 7 11:31:49 2022 +0800

    [fix][mem tracker] Fix logout load task mem tracker dcheck fail (apache#9943)

    * fix tracker 0602

    * fix format

commit 429a41d
Author: Xinyi Zou <[email protected]>
Date:   Tue May 31 19:12:42 2022 +0800

    [bugfix] Fix BE core about vectorized join build thread memtracker switch, and FileStat duplicate

commit 89d3a36
Author: Pxl <[email protected]>
Date:   Thu May 26 16:51:01 2022 +0800

    [Bug] [Vectorized] add padding when load char type data (apache#9734)

Co-authored-by: Luwei <[email protected]>
Co-authored-by: deardeng <[email protected]>
  • Loading branch information
3 people committed Jun 20, 2022
1 parent a52f40e commit 4491627
Show file tree
Hide file tree
Showing 204 changed files with 4,596 additions and 1,550 deletions.
2 changes: 2 additions & 0 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ set(KRB5_LIBS
set(AWS_LIBS
aws-sdk-s3
aws-sdk-core
aws-sdk-transfer
aws-checksums
aws-c-io
aws-c-event-stream
Expand Down Expand Up @@ -727,6 +728,7 @@ add_subdirectory(${SRC_DIR}/geo)
add_subdirectory(${SRC_DIR}/io)
add_subdirectory(${SRC_DIR}/gutil)
add_subdirectory(${SRC_DIR}/http)
add_subdirectory(${SRC_DIR}/io)
add_subdirectory(${SRC_DIR}/olap)
add_subdirectory(${SRC_DIR}/runtime)
add_subdirectory(${SRC_DIR}/service)
Expand Down
9 changes: 5 additions & 4 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
: _exec_env(exec_env), _master_info(master_info), _topic_subscriber(new TopicSubscriber()) {
for (auto& path : exec_env->store_paths()) {
try {
string dpp_download_path_str = path.path + DPP_PREFIX;
string dpp_download_path_str = path.path + "/" + DPP_PREFIX;
std::filesystem::path dpp_download_path(dpp_download_path_str);
if (std::filesystem::exists(dpp_download_path)) {
std::filesystem::remove_all(dpp_download_path);
Expand Down Expand Up @@ -90,7 +90,8 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
CREATE_AND_START_THREAD(REPORT_DISK_STATE, _report_disk_state_workers);
CREATE_AND_START_THREAD(REPORT_OLAP_TABLE, _report_tablet_workers);
CREATE_AND_START_POOL(SUBMIT_TABLE_COMPACTION, _submit_table_compaction_workers);
CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE_V2, _storage_medium_migrate_v2_workers);
CREATE_AND_START_POOL(REFRESH_STORAGE_POLICY, _storage_refresh_policy_workers);
CREATE_AND_START_POOL(UPDATE_STORAGE_POLICY, _storage_update_policy_workers);
#undef CREATE_AND_START_POOL
#undef CREATE_AND_START_THREAD

Expand Down Expand Up @@ -153,8 +154,8 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers,
update_tablet_meta_info_req);
HANDLE_TYPE(TTaskType::COMPACTION, _submit_table_compaction_workers, compaction_req);
HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE_V2, _storage_medium_migrate_v2_workers,
storage_migration_req_v2);
HANDLE_TYPE(TTaskType::NOTIFY_UPDATE_STORAGE_POLICY, _storage_update_policy_workers,
update_policy);

case TTaskType::REALTIME_PUSH:
case TTaskType::PUSH:
Expand Down
4 changes: 2 additions & 2 deletions be/src/agent/agent_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ class AgentServer {

std::unique_ptr<TaskWorkerPool> _submit_table_compaction_workers;

std::unique_ptr<TaskWorkerPool> _storage_medium_migrate_v2_workers;

std::unique_ptr<TaskWorkerPool> _storage_refresh_policy_workers;
std::unique_ptr<TaskWorkerPool> _storage_update_policy_workers;
std::unique_ptr<TopicSubscriber> _topic_subscriber;
};

Expand Down
224 changes: 107 additions & 117 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@
#include "olap/olap_common.h"
#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
#include "olap/storage_policy_mgr.h"
#include "olap/tablet.h"
#include "olap/task/engine_alter_tablet_task.h"
#include "olap/task/engine_batch_load_task.h"
#include "olap/task/engine_checksum_task.h"
#include "olap/task/engine_clone_task.h"
#include "olap/task/engine_publish_version_task.h"
#include "olap/task/engine_storage_migration_task.h"
#include "olap/task/engine_storage_migration_task_v2.h"
#include "olap/utils.h"
#include "runtime/exec_env.h"
#include "runtime/snapshot_loader.h"
Expand Down Expand Up @@ -193,9 +193,14 @@ void TaskWorkerPool::start() {
cb = std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback,
this);
break;
case TaskWorkerType::STORAGE_MEDIUM_MIGRATE_V2:
case TaskWorkerType::REFRESH_STORAGE_POLICY:
_worker_count = 1;
cb = std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_v2_worker_thread_callback,
cb = std::bind<void>(
&TaskWorkerPool::_storage_refresh_storage_policy_worker_thread_callback, this);
break;
case TaskWorkerType::UPDATE_STORAGE_POLICY:
_worker_count = 1;
cb = std::bind<void>(&TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback,
this);
break;
default:
Expand Down Expand Up @@ -361,6 +366,7 @@ void TaskWorkerPool::_create_tablet_worker_thread_callback() {
TStatus task_status;

std::vector<TTabletInfo> finish_tablet_infos;
LOG(INFO) << "create tablet: " << create_tablet_req;
Status create_status = _env->storage_engine()->create_tablet(create_tablet_req);
if (!create_status.ok()) {
LOG(WARNING) << "create table failed. status: " << create_status
Expand Down Expand Up @@ -437,6 +443,11 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
StorageEngine::instance()->txn_manager()->force_rollback_tablet_related_txns(
dropped_tablet->data_dir()->get_meta(), drop_tablet_req.tablet_id,
drop_tablet_req.schema_hash, dropped_tablet->tablet_uid());
// We remove remote rowset directly.
// TODO(cyx): do remove in background
if (drop_tablet_req.is_drop_table_or_partition) {
dropped_tablet->remove_all_remote_rowsets();
}
} else {
status_code = TStatusCode::NOT_FOUND;
error_msgs.push_back(err);
Expand Down Expand Up @@ -854,8 +865,15 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
tablet->set_partition_id(tablet_meta_info.partition_id);
break;
case TTabletMetaType::INMEMORY:
tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory(
tablet_meta_info.is_in_memory);
if (tablet_meta_info.storage_policy.empty()) {
tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory(
tablet_meta_info.is_in_memory);
} else {
LOG(INFO) << "set tablet cooldown resource "
<< tablet_meta_info.storage_policy;
tablet->tablet_meta()->set_cooldown_resource(
tablet_meta_info.storage_policy);
}
break;
}
}
Expand Down Expand Up @@ -1036,8 +1054,8 @@ Status TaskWorkerPool::_check_migrate_request(const TStorageMediumMigrateReq& re
return Status::OLAPInternalError(OLAP_REQUEST_FAILED);
}

// check disk capacity
int64_t tablet_size = tablet->tablet_footprint();
// check local disk capacity
int64_t tablet_size = tablet->tablet_local_size();
if ((*dest_store)->reach_capacity_limit(tablet_size)) {
LOG(WARNING) << "reach the capacity limit of path: " << (*dest_store)->path()
<< ", tablet size: " << tablet_size;
Expand Down Expand Up @@ -1171,14 +1189,15 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() {
map<string, TDisk> disks;
for (auto& root_path_info : data_dir_infos) {
TDisk disk;
disk.__set_root_path(root_path_info.path_desc.filepath);
disk.__set_root_path(root_path_info.path);
disk.__set_path_hash(root_path_info.path_hash);
disk.__set_storage_medium(root_path_info.storage_medium);
disk.__set_disk_total_capacity(root_path_info.disk_capacity);
disk.__set_data_used_capacity(root_path_info.data_used_capacity);
disk.__set_data_used_capacity(root_path_info.local_used_capacity);
disk.__set_remote_used_capacity(root_path_info.remote_used_capacity);
disk.__set_disk_available_capacity(root_path_info.available);
disk.__set_used(root_path_info.is_used);
disks[root_path_info.path_desc.filepath] = disk;
disks[root_path_info.path] = disk;
}
request.__set_disks(disks);
_handle_report(request, ReportType::DISK);
Expand Down Expand Up @@ -1565,7 +1584,6 @@ Status TaskWorkerPool::_move_dir(const TTabletId tablet_id, const std::string& s
return Status::InvalidArgument("Could not find tablet");
}

std::string dest_tablet_dir = tablet->tablet_path_desc().filepath;
SnapshotLoader loader(_env, job_id, tablet_id);
Status status = loader.move(src, tablet, overwrite);

Expand Down Expand Up @@ -1683,9 +1701,64 @@ void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
}
}

void TaskWorkerPool::_storage_medium_migrate_v2_worker_thread_callback() {
void TaskWorkerPool::_storage_refresh_storage_policy_worker_thread_callback() {
while (_is_work) {
_is_doing_work = false;
// wait at most report_task_interval_seconds, or being notified
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait_for(
worker_thread_lock,
std::chrono::seconds(config::storage_refresh_storage_policy_task_interval_seconds));
if (!_is_work) {
break;
}

if (_master_info.network_address.port == 0) {
// port == 0 means not received heartbeat yet
// sleep a short time and try again
LOG(INFO)
<< "waiting to receive first heartbeat from frontend before doing task report";
continue;
}

_is_doing_work = true;

TGetStoragePolicyResult result;
Status status = _master_client->refresh_storage_policy(&result);
if (!status.ok()) {
LOG(WARNING) << "refresh storage policy status not ok. ";
} else if (result.status.status_code != TStatusCode::OK) {
LOG(WARNING) << "refresh storage policy result status status_code not ok. ";
} else {
// update storage policy mgr.
StoragePolicyMgr* spm = ExecEnv::GetInstance()->storage_policy_mgr();
for (const auto& iter : result.result_entrys) {
shared_ptr<StoragePolicy> policy_ptr = make_shared<StoragePolicy>();
policy_ptr->storage_policy_name = iter.policy_name;
policy_ptr->cooldown_datetime = iter.cooldown_datetime;
policy_ptr->cooldown_ttl = iter.cooldown_ttl;
policy_ptr->s3_endpoint = iter.s3_storage_param.s3_endpoint;
policy_ptr->s3_region = iter.s3_storage_param.s3_region;
policy_ptr->s3_ak = iter.s3_storage_param.s3_ak;
policy_ptr->s3_sk = iter.s3_storage_param.s3_sk;
policy_ptr->root_path = iter.s3_storage_param.root_path;
policy_ptr->bucket = iter.s3_storage_param.bucket;
policy_ptr->s3_conn_timeout_ms = iter.s3_storage_param.s3_conn_timeout_ms;
policy_ptr->s3_max_conn = iter.s3_storage_param.s3_max_conn;
policy_ptr->s3_request_timeout_ms = iter.s3_storage_param.s3_request_timeout_ms;
policy_ptr->md5_sum = iter.md5_checksum;

LOG(INFO) << "refresh storage policy task, policy " << *policy_ptr;
spm->periodic_put(iter.policy_name, std::move(policy_ptr));
}
}
}
}

void TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
TGetStoragePolicy get_storage_policy_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
Expand All @@ -1696,115 +1769,32 @@ void TaskWorkerPool::_storage_medium_migrate_v2_worker_thread_callback() {
}

agent_task_req = _tasks.front();
get_storage_policy_req = agent_task_req.update_policy;
_tasks.pop_front();
}
int64_t signature = agent_task_req.signature;
LOG(INFO) << "get migration table v2 task, signature: " << agent_task_req.signature;
bool is_task_timeout = false;
if (agent_task_req.__isset.recv_time) {
int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time;
if (time_elapsed > config::report_task_interval_seconds * 20) {
LOG(INFO) << "task elapsed " << time_elapsed
<< " seconds since it is inserted to queue, it is timeout";
is_task_timeout = true;
}
}
if (!is_task_timeout) {
TFinishTaskRequest finish_task_request;
TTaskType::type task_type = agent_task_req.task_type;
switch (task_type) {
case TTaskType::STORAGE_MEDIUM_MIGRATE_V2:
_storage_medium_migrate_v2(agent_task_req, signature, task_type,
&finish_task_request);
break;
default:
// pass
break;
}
_finish_task(finish_task_request);
}
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}

void TaskWorkerPool::_storage_medium_migrate_v2(const TAgentTaskRequest& agent_task_req,
int64_t signature, const TTaskType::type task_type,
TFinishTaskRequest* finish_task_request) {
Status status = Status::OK();
TStatus task_status;
std::vector<string> error_msgs;

string process_name;
switch (task_type) {
case TTaskType::STORAGE_MEDIUM_MIGRATE_V2:
process_name = "StorageMediumMigrationV2";
break;
default:
std::string task_name;
EnumToString(TTaskType, task_type, task_name);
LOG(WARNING) << "Storage medium migration v2 type invalid. type: " << task_name
<< ", signature: " << signature;
status = Status::NotSupported("Storage medium migration v2 type invalid");
break;
}

// Check last storage medium migration v2 status, if failed delete tablet file
// Do not need to adjust delete success or not
// Because if delete failed task will failed
TTabletId new_tablet_id;
TSchemaHash new_schema_hash = 0;
if (status.ok()) {
new_tablet_id = agent_task_req.storage_migration_req_v2.new_tablet_id;
new_schema_hash = agent_task_req.storage_migration_req_v2.new_schema_hash;
EngineStorageMigrationTaskV2 engine_task(agent_task_req.storage_migration_req_v2);
Status sc_status = _env->storage_engine()->execute_task(&engine_task);
if (!sc_status.ok()) {
if (sc_status.precise_code() == OLAP_ERR_DATA_QUALITY_ERR) {
error_msgs.push_back("The data quality does not satisfy, please check your data. ");
}
status = sc_status;
} else {
status = Status::OK();
}
}

if (status.ok()) {
++_s_report_version;
LOG(INFO) << process_name << " finished. signature: " << signature;
}

// Return result to fe
finish_task_request->__set_backend(_backend);
finish_task_request->__set_report_version(_s_report_version);
finish_task_request->__set_task_type(task_type);
finish_task_request->__set_signature(signature);

std::vector<TTabletInfo> finish_tablet_infos;
if (status.ok()) {
TTabletInfo tablet_info;
status = _get_tablet_info(new_tablet_id, new_schema_hash, signature, &tablet_info);

if (!status.ok()) {
LOG(WARNING) << process_name << " success, but get new tablet info failed."
<< "tablet_id: " << new_tablet_id << ", schema_hash: " << new_schema_hash
<< ", signature: " << signature;
} else {
finish_tablet_infos.push_back(tablet_info);
}
}

if (status.ok()) {
finish_task_request->__set_finish_tablet_infos(finish_tablet_infos);
LOG(INFO) << process_name << " success. signature: " << signature;
error_msgs.push_back(process_name + " success");
} else {
LOG(WARNING) << process_name << " failed. signature: " << signature;
error_msgs.push_back(process_name + " failed");
error_msgs.push_back("status: " + status.to_string());
StoragePolicyMgr* spm = ExecEnv::GetInstance()->storage_policy_mgr();
shared_ptr<StoragePolicy> policy_ptr = make_shared<StoragePolicy>();
policy_ptr->storage_policy_name = get_storage_policy_req.policy_name;
policy_ptr->cooldown_datetime = get_storage_policy_req.cooldown_datetime;
policy_ptr->cooldown_ttl = get_storage_policy_req.cooldown_ttl;
policy_ptr->s3_endpoint = get_storage_policy_req.s3_storage_param.s3_endpoint;
policy_ptr->s3_region = get_storage_policy_req.s3_storage_param.s3_region;
policy_ptr->s3_ak = get_storage_policy_req.s3_storage_param.s3_ak;
policy_ptr->s3_sk = get_storage_policy_req.s3_storage_param.s3_sk;
policy_ptr->root_path = get_storage_policy_req.s3_storage_param.root_path;
policy_ptr->bucket = get_storage_policy_req.s3_storage_param.bucket;
policy_ptr->s3_conn_timeout_ms = get_storage_policy_req.s3_storage_param.s3_conn_timeout_ms;
policy_ptr->s3_max_conn = get_storage_policy_req.s3_storage_param.s3_max_conn;
policy_ptr->s3_request_timeout_ms =
get_storage_policy_req.s3_storage_param.s3_request_timeout_ms;
policy_ptr->md5_sum = get_storage_policy_req.md5_checksum;

LOG(INFO) << "get storage update policy task, update policy " << *policy_ptr;

spm->update(get_storage_policy_req.policy_name, std::move(policy_ptr));
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
task_status.__set_status_code(status.code());
task_status.__set_error_msgs(error_msgs);
finish_task_request->__set_task_status(task_status);
}

} // namespace doris
Loading

0 comments on commit 4491627

Please sign in to comment.