Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk_load): meta server adds bulk load ingestion concurrent count restriction #829

Merged
merged 10 commits into from
May 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 78 additions & 28 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ DSN_DEFINE_uint32("meta_server",
"failed");
DSN_TAG_VARIABLE(bulk_load_max_rollback_times, FT_MUTABLE);

// Upper bound on how many partitions of a single app may be executing
// ingestion at the same time on the meta server; requests beyond the cap
// are retried later. Tunable at runtime (FT_MUTABLE).
DSN_DEFINE_uint32("meta_server",
bulk_load_ingestion_concurrent_count,
4,
"max partition_count executing ingestion at the same time");
DSN_TAG_VARIABLE(bulk_load_ingestion_concurrent_count, FT_MUTABLE);

bulk_load_service::bulk_load_service(meta_service *meta_svc, const std::string &bulk_load_dir)
: _meta_svc(meta_svc), _state(meta_svc->get_server_state()), _bulk_load_root(bulk_load_dir)
{
Expand Down Expand Up @@ -210,6 +216,7 @@ void bulk_load_service::do_start_app_bulk_load(std::shared_ptr<app_state> app,
zauto_write_lock l(_lock);
_bulk_load_app_id.insert(app->app_id);
_apps_in_progress_count[app->app_id] = app->partition_count;
_apps_ingesting_count[app->app_id] = 0;
}
create_app_bulk_load_dir(
app->app_name, app->app_id, app->partition_count, std::move(rpc));
Expand Down Expand Up @@ -578,6 +585,7 @@ void bulk_load_service::handle_app_ingestion(const bulk_load_response &response,
app_name,
pid,
kv.first.to_string());
decrease_app_ingestion_count(pid);
handle_bulk_load_failed(pid.get_app_id());
return;
}
Expand All @@ -597,6 +605,7 @@ void bulk_load_service::handle_app_ingestion(const bulk_load_response &response,

if (response.is_group_ingestion_finished) {
ddebug_f("app({}) partition({}) ingestion files succeed", app_name, pid);
decrease_app_ingestion_count(pid);
update_partition_status_on_remote_storage(app_name, pid, bulk_load_status::BLS_SUCCEED);
}
}
Expand Down Expand Up @@ -995,6 +1004,11 @@ void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk
_app_bulk_load_info[app_id] = ainfo;
_apps_pending_sync_flag[app_id] = false;
_apps_in_progress_count[app_id] = partition_count;
// when rolling back from BLS_INGESTING to BLS_DOWNLOADING, the app's ingesting count must be reset to zero
if (old_status == bulk_load_status::BLS_INGESTING &&
new_status == bulk_load_status::BLS_DOWNLOADING) {
_apps_ingesting_count[app_id] = 0;
}
}

ddebug_f("update app({}) status from {} to {}",
Expand All @@ -1004,12 +1018,7 @@ void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk

if (new_status == bulk_load_status::BLS_INGESTING) {
for (int i = 0; i < partition_count; ++i) {
tasking::enqueue(LPC_BULK_LOAD_INGESTION,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_ingestion,
this,
ainfo.app_name,
gpid(app_id, i)));
partition_ingestion(ainfo.app_name, gpid(app_id, i));
}
}

Expand All @@ -1024,10 +1033,41 @@ void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk
}
}

// ThreadPool: THREAD_POOL_DEFAULT
// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::partition_ingestion(const std::string &app_name, const gpid &pid)
{
FAIL_POINT_INJECT_F("meta_bulk_load_partition_ingestion", [](dsn::string_view) {});
FAIL_POINT_INJECT_F("meta_bulk_load_partition_ingestion", [=](dsn::string_view) {
if (_apps_ingesting_count[pid.get_app_id()] < FLAGS_bulk_load_ingestion_concurrent_count) {
_apps_ingesting_count[pid.get_app_id()]++;
}
});

{
zauto_read_lock l(_lock);
if (_apps_ingesting_count[pid.get_app_id()] >= FLAGS_bulk_load_ingestion_concurrent_count) {
dwarn_f("app({}) has already {} partitions executing ingestion, partition({}) will "
"wait and try it later",
app_name,
_apps_ingesting_count[pid.get_app_id()],
pid);
tasking::enqueue(
LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_ingestion, this, app_name, pid),
pid.thread_hash(),
std::chrono::seconds(5));
return;
}
}

auto app_status = get_app_bulk_load_status(pid.get_app_id());
if (app_status != bulk_load_status::BLS_INGESTING) {
dwarn_f("app({}) current status is {}, partition({}), ignore it",
app_name,
dsn::enum_to_string(app_status),
pid);
return;
}

rpc_address primary_addr;
{
Expand All @@ -1037,20 +1077,15 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
dwarn_f("app(name={}, id={}) is not existed, set bulk load failed",
app_name,
pid.get_app_id());
tasking::enqueue(
LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(
&bulk_load_service::handle_app_unavailable, this, pid.get_app_id(), app_name));

handle_app_unavailable(pid.get_app_id(), app_name);
return;
}
primary_addr = app->partitions[pid.get_partition_index()].primary;
}

if (primary_addr.is_invalid()) {
dwarn_f("app({}) partition({}) primary is invalid, try it later", app_name, pid);
tasking::enqueue(LPC_BULK_LOAD_INGESTION,
tasking::enqueue(LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_ingestion, this, app_name, pid),
pid.thread_hash(),
Expand All @@ -1062,20 +1097,36 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
derror_f("app({}) partition({}) doesn't have bulk load metadata, set bulk load failed",
app_name,
pid);
tasking::enqueue(
LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::handle_bulk_load_failed, this, pid.get_app_id()));
handle_bulk_load_failed(pid.get_app_id());
return;
}

tasking::enqueue(
LPC_BULK_LOAD_INGESTION,
_meta_svc->tracker(),
std::bind(&bulk_load_service::send_ingestion_request, this, app_name, pid, primary_addr));
{
zauto_write_lock l(_lock);
_apps_ingesting_count[pid.get_app_id()]++;
ddebug_f("send ingest_request to node({}), app({}) partition({}), ingestion_count({})",
primary_addr.to_string(),
app_name,
pid,
_apps_ingesting_count[pid.get_app_id()]);
}
}

// ThreadPool: THREAD_POOL_DEFAULT
void bulk_load_service::send_ingestion_request(const std::string &app_name,
const gpid &pid,
const rpc_address &primary_addr)
{
ingestion_request req;
req.app_name = app_name;
{
zauto_read_lock l(_lock);
req.metadata = _partition_bulk_load_info[pid].metadata;
}

// create a client request, whose gpid field in header should be pid
message_ex *msg = message_ex::create_request(dsn::apps::RPC_RRDB_RRDB_BULK_LOAD,
0,
Expand All @@ -1090,10 +1141,6 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
[this, app_name, pid](error_code err, ingestion_response &&resp) {
on_partition_ingestion_reply(err, std::move(resp), app_name, pid);
});
ddebug_f("send ingest_request to node({}), app({}) partition({})",
primary_addr.to_string(),
app_name,
pid);
_meta_svc->send_request(msg, primary_addr, rpc_callback);
}

Expand All @@ -1103,6 +1150,10 @@ void bulk_load_service::on_partition_ingestion_reply(error_code err,
const std::string &app_name,
const gpid &pid)
{
if (err != ERR_OK || resp.err != ERR_OK || resp.rocksdb_error != ERR_OK) {
decrease_app_ingestion_count(pid);
}

if (err == ERR_NO_NEED_OPERATE) {
dwarn_f(
"app({}) partition({}) has already executing ingestion, ignore this repeated request",
Expand Down Expand Up @@ -1207,6 +1258,7 @@ void bulk_load_service::reset_local_bulk_load_states(int32_t app_id, const std::
erase_map_elem_by_id(app_id, _partitions_cleaned_up);
_apps_rolling_back.erase(app_id);
_apps_rollback_count.erase(app_id);
_apps_ingesting_count.erase(app_id);
_apps_cleaning_up.erase(app_id);
_bulk_load_app_id.erase(app_id);
ddebug_f("reset local app({}) bulk load context", app_name);
Expand Down Expand Up @@ -1679,6 +1731,7 @@ void bulk_load_service::do_continue_app_bulk_load(
zauto_write_lock l(_lock);
_apps_in_progress_count[app_id] = in_progress_partition_count;
_apps_rollback_count[app_id] = 0;
_apps_ingesting_count[app_id] = 0;
}

// if app is paused, no need to send bulk_load_request, just return
Expand Down Expand Up @@ -1713,10 +1766,7 @@ void bulk_load_service::do_continue_app_bulk_load(
gpid pid = gpid(app_id, i);
partition_bulk_load(ainfo.app_name, pid);
if (app_status == bulk_load_status::BLS_INGESTING) {
tasking::enqueue(
LPC_BULK_LOAD_INGESTION,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_ingestion, this, ainfo.app_name, pid));
partition_ingestion(ainfo.app_name, pid);
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions src/meta/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace dsn {
namespace replication {

DSN_DECLARE_uint32(bulk_load_max_rollback_times);
DSN_DECLARE_uint32(bulk_load_ingestion_concurrent_count);

///
/// bulk load path on remote storage:
Expand Down Expand Up @@ -172,6 +173,10 @@ class bulk_load_service
// create ingestion_request and send it to primary
void partition_ingestion(const std::string &app_name, const gpid &pid);

void send_ingestion_request(const std::string &app_name,
const gpid &pid,
const rpc_address &primary_addr);

void on_partition_ingestion_reply(error_code err,
const ingestion_response &&resp,
const std::string &app_name,
Expand Down Expand Up @@ -370,6 +375,15 @@ class bulk_load_service
return (_bulk_load_app_id.find(app_id) != _bulk_load_app_id.end());
}

// Decrement the in-flight ingestion counter of the app owning |pid|.
// Apps with no tracked counter (e.g. already cleaned up) are left untouched.
inline void decrease_app_ingestion_count(const gpid &pid)
{
    zauto_write_lock l(_lock);
    const auto id = pid.get_app_id();
    const auto iter = _apps_ingesting_count.find(id);
    if (iter != _apps_ingesting_count.end()) {
        --iter->second;
    }
}

private:
friend class bulk_load_service_test;
friend class meta_bulk_load_http_test;
Expand Down Expand Up @@ -408,6 +422,8 @@ class bulk_load_service
std::unordered_map<app_id, bool> _apps_rolling_back;
// Used for restrict bulk load rollback count
std::unordered_map<app_id, int32_t> _apps_rollback_count;
// app_id -> ingesting partition count
std::unordered_map<app_id, int32_t> _apps_ingesting_count;
};

} // namespace replication
Expand Down
Loading