Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch get #14

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,12 +510,13 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
return Status::InternalError("failed to get rowset meta: {}", resp.status().msg());
}
if (latency > 100 * 1000) { // 100ms
LOG(INFO) << "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size()
LOG(INFO) << "finish get_rowset rpc. tablet_id=" << tablet_id
<< ", rowset_meta.size()=" << resp.rowset_meta().size()
<< ", latency=" << latency << "us";
} else {
LOG_EVERY_N(INFO, 100)
<< "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size()
<< ", latency=" << latency << "us";
LOG_EVERY_N(INFO, 100) << "finish get_rowset rpc. tablet_id=" << tablet_id
<< ", rowset_meta.size()=" << resp.rowset_meta().size()
<< ", latency=" << latency << "us";
}

int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
Expand Down
12 changes: 12 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -860,16 +860,21 @@ Status CloudTablet::calc_delete_bitmap_for_compaction(
}

// 2. calc delete bitmap for incremental data
int64_t t1 = MonotonicMicros();
RETURN_IF_ERROR(_engine.meta_mgr().get_delete_bitmap_update_lock(
*this, COMPACTION_DELETE_BITMAP_LOCK_ID, initiator));
int64_t t2 = MonotonicMicros();
RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
int64_t t3 = MonotonicMicros();

calc_compaction_output_rowset_delete_bitmap(
input_rowsets, rowid_conversion, version.second, UINT64_MAX, missed_rows.get(),
location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
int64_t t4 = MonotonicMicros();
if (location_map) {
RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
}
int64_t t5 = MonotonicMicros();
if (missed_rows) {
DCHECK_EQ(missed_rows->size(), missed_rows_size);
if (missed_rows->size() != missed_rows_size) {
Expand All @@ -881,6 +886,13 @@ Status CloudTablet::calc_delete_bitmap_for_compaction(
// 3. store delete bitmap
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, -1, initiator,
output_rowset_delete_bitmap.get()));
int64_t t6 = MonotonicMicros();
LOG(INFO) << "calc_delete_bitmap_for_compaction, tablet_id=" << tablet_id()
<< ", get lock cost " << (t2 - t1)
<< " us, sync rowsets cost " << (t3 - t2)
<< " us, calc delete bitmap cost " << (t4 - t3)
<< " us, check rowid conversion cost " << (t5 - t4)
<< " us, store delete bitmap cost " << (t6 - t5) << " us";
return Status::OK();
}

Expand Down
4 changes: 4 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,4 +252,8 @@ CONF_Bool(enable_check_instance_id, "true");

// Check if ip eq 127.0.0.1, ms/recycler exit
CONF_Bool(enable_loopback_address_for_ms, "false");

// Used for get_delete_bitmap_update_lock
CONF_mBool(enable_batch_get_mow_tablet_stats, "true");
CONF_mInt32(max_mow_tablet_stat_num_per_batch, "1000");
} // namespace doris::cloud::config
134 changes: 91 additions & 43 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,10 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
}

constexpr size_t BATCH_SIZE = 500;
std::vector<std::string> version_keys;
std::vector<std::optional<std::string>> version_values;
version_keys.reserve(BATCH_SIZE);
version_values.reserve(BATCH_SIZE);
std::vector<std::string> stats_keys;
std::vector<std::optional<std::string>> stats_values;
stats_keys.reserve(BATCH_SIZE);
stats_values.reserve(BATCH_SIZE);

while ((code == MetaServiceCode::OK || code == MetaServiceCode::KV_TXN_TOO_OLD) &&
response->versions_size() < num_acquired) {
Expand All @@ -358,8 +358,8 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr

for (size_t i = response->versions_size(); i < num_acquired; i += BATCH_SIZE) {
size_t limit = (i + BATCH_SIZE < num_acquired) ? i + BATCH_SIZE : num_acquired;
version_keys.clear();
version_values.clear();
stats_keys.clear();
stats_values.clear();
for (size_t j = i; j < limit; j++) {
int64_t db_id = request->db_ids(j);
int64_t table_id = request->table_ids(j);
Expand All @@ -370,10 +370,10 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
int64_t partition_id = request->partition_ids(j);
partition_version_key({instance_id, db_id, table_id, partition_id}, &ver_key);
}
version_keys.push_back(std::move(ver_key));
stats_keys.push_back(std::move(ver_key));
}

err = txn->batch_get(&version_values, version_keys);
err = txn->batch_get(&stats_values, stats_keys);
TEST_SYNC_POINT_CALLBACK("batch_get_version_err", &err);
if (err == TxnErrorCode::TXN_TOO_OLD) {
// txn too old, fallback to non-snapshot versions.
Expand All @@ -387,7 +387,7 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
break;
}

for (auto&& value : version_values) {
for (auto&& value : stats_values) {
if (!value.has_value()) {
// return -1 if the target version is not exists.
response->add_versions(-1);
Expand Down Expand Up @@ -2222,47 +2222,95 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl
// these steps can be done in different fdb txns

StopWatch read_stats_sw;
err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to init txn";
return;
}
if (!config::enable_batch_get_mow_tablet_stats) {
err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to init txn";
return;
}

for (const auto& tablet_idx : request->tablet_indexes()) {
TabletStatsPB tablet_stat;
std::string stats_key =
stats_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_idx.tablet_id()});
std::string stats_val;
TxnErrorCode err = txn->get(stats_key, &stats_val);
TEST_SYNC_POINT_CALLBACK(
"get_delete_bitmap_update_lock.get_compaction_cnts_inject_error", &err);
if (err == TxnErrorCode::TXN_TOO_OLD) {
code = MetaServiceCode::OK;
err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "failed to init txn when get tablet stats";
msg = ss.str();
return;
}
err = txn->get(stats_key, &stats_val);
}
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err,
tablet_idx.tablet_id());
return;
}
if (!tablet_stat.ParseFromArray(stats_val.data(), stats_val.size())) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("marformed tablet stats value, key={}", hex(stats_key));
return;
}
response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt());
response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt());
response->add_cumulative_points(tablet_stat.cumulative_point());
}
} else {
int32_t batch_size = config::max_mow_tablet_stat_num_per_batch;
std::vector<std::string> stats_keys;
std::vector<std::optional<std::string>> stats_values;
stats_keys.reserve(batch_size);
stats_values.reserve(batch_size);
size_t num_acquired = request->tablet_indexes_size();

for (size_t i = response->base_compaction_cnts_size(); i < num_acquired; i += batch_size) {
size_t limit = (i + batch_size < num_acquired) ? i + batch_size : num_acquired;
stats_keys.clear();
stats_values.clear();
for (size_t j = i; j < limit; j++) {
auto& tablet_idx = request->tablet_indexes(j);
std::string stats_key =
stats_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_idx.tablet_id()});
stats_keys.push_back(std::move(stats_key));
}

for (const auto& tablet_idx : request->tablet_indexes()) {
TabletStatsPB tablet_stat;
std::string stats_key =
stats_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_idx.tablet_id()});
std::string stats_val;
TxnErrorCode err = txn->get(stats_key, &stats_val);
TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_update_lock.get_compaction_cnts_inject_error",
&err);
if (err == TxnErrorCode::TXN_TOO_OLD) {
code = MetaServiceCode::OK;
err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "failed to init txn when get tablet stats";
msg = ss.str();
msg = "failed to init txn";
return;
}
err = txn->get(stats_key, &stats_val);
}
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err,
tablet_idx.tablet_id());
return;
}
if (!tablet_stat.ParseFromArray(stats_val.data(), stats_val.size())) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("marformed tablet stats value, key={}", hex(stats_key));
return;

err = txn->batch_get(&stats_values, stats_keys);
if (err != TxnErrorCode::TXN_OK) {
msg = fmt::format("failed to batch get tablet stats, index={}, err={}", i, err);
code = cast_as<ErrCategory::READ>(err);
return;
}

for (auto&& value : stats_values) {
TabletStatsPB tablet_stat;
if (!tablet_stat.ParseFromString(*value)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "marformed tablet stats value";
return;
}
response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt());
response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt());
response->add_cumulative_points(tablet_stat.cumulative_point());
}
}
response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt());
response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt());
response->add_cumulative_points(tablet_stat.cumulative_point());
}

read_stats_sw.pause();
Expand Down
7 changes: 7 additions & 0 deletions cloud/src/meta-service/meta_service_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ void begin_rpc(std::string_view func_name, brpc::Controller* ctrl, const Request
} else if constexpr (std::is_same_v<Request, RemoveDeleteBitmapRequest>) {
LOG(INFO) << "begin " << func_name << " from " << ctrl->remote_side()
<< " tablet_id=" << req->tablet_id() << " rowset_size=" << req->rowset_ids_size();
} else if constexpr (std::is_same_v<Request, GetDeleteBitmapUpdateLockRequest>) {
LOG(INFO) << "begin " << func_name << " from " << ctrl->remote_side()
<< ", table_id=" << req->table_id() << ", lock_id=" << req->lock_id()
<< ", initiator=" << req->initiator() << ", expiration=" << req->expiration()
<< ", require_compaction_stats=" << req->require_compaction_stats();
} else {
LOG(INFO) << "begin " << func_name << " from " << ctrl->remote_side()
<< " request=" << req->ShortDebugString();
Expand Down Expand Up @@ -124,6 +129,8 @@ void finish_rpc(std::string_view func_name, brpc::Controller* ctrl, Response* re
res->clear_cumulative_compaction_cnts();
res->clear_cumulative_points();
}
LOG(INFO) << "finish " << func_name << " from " << ctrl->remote_side()
<< " status=" << res->status().ShortDebugString();
} else if constexpr (std::is_same_v<Response, GetObjStoreInfoResponse> ||
std::is_same_v<Response, GetStageResponse>) {
std::string debug_string = res->DebugString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ public void commitTransaction(long dbId, List<Table> tableList, long transaction
throws UserException {
List<OlapTable> mowTableList = getMowTableList(tableList, tabletCommitInfos);
try {
LOG.info("try to commit transaction, transactionId: {}", transactionId);
LOG.info("try to commit transaction, transactionId: {}, tableIds: {}", transactionId,
tableList.stream().map(Table::getId).collect(Collectors.toList()));
Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos = null;
if (!mowTableList.isEmpty()) {
DeleteBitmapUpdateLockContext lockContext = new DeleteBitmapUpdateLockContext();
Expand Down Expand Up @@ -922,11 +923,9 @@ private void getDeleteBitmapUpdateLock(long transactionId, List<OlapTable> mowTa
totalRetryTime += retryTime;
}
stopWatch.stop();
if (totalRetryTime > 0 || stopWatch.getTime() > 20) {
LOG.info("get delete bitmap lock successfully. txns: {}. totalRetryTime: {}. "
+ "partitionSize: {}. time cost: {} ms.", transactionId, totalRetryTime,
lockContext.getTableToPartitions().size(), stopWatch.getTime());
}
LOG.info("get delete bitmap lock successfully. txnId: {}. totalRetryTime: {}. "
+ "tableSize: {}. cost: {} ms.", transactionId, totalRetryTime,
lockContext.getTableToPartitions().size(), stopWatch.getTime());
}

private void removeDeleteBitmapUpdateLock(List<OlapTable> tableList, long transactionId) {
Expand Down
Loading