Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Jul 30, 2024
1 parent 3d026a8 commit b5c85f3
Show file tree
Hide file tree
Showing 29 changed files with 1,472 additions and 65 deletions.
16 changes: 12 additions & 4 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,20 @@ Status CloudBaseCompaction::prepare_compact() {
// tablet not found
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION_FAIL) {
(dynamic_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. "
"failed in schema change. The input version end must "
"less than or equal to alter_version."
"current alter version in BE is not correct."
"input_version_start="
<< compaction_job->input_versions(0)
<< " input_version_end=" << compaction_job->input_versions(1)
<< " current alter_version=" << cloud_tablet->alter_version()
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
cloud_tablet->set_alter_version(resp.alter_version());
return Status::InternalError(msg);
}
return st;
Expand Down Expand Up @@ -329,16 +333,20 @@ Status CloudBaseCompaction::modify_rowsets() {
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION_FAIL) {
(dynamic_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. "
"failed in schema change. The input version end must "
"less than or equal to alter_version."
"current alter version in BE is not correct."
"input_version_start="
<< compaction_job->input_versions(0)
<< " input_version_end=" << compaction_job->input_versions(1)
<< " current alter_version=" << cloud_tablet->alter_version()
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
cloud_tablet->set_alter_version(resp.alter_version());
return Status::InternalError(msg);
}
return st;
Expand Down
6 changes: 3 additions & 3 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine,
CloudCumulativeCompaction::~CloudCumulativeCompaction() = default;

Status CloudCumulativeCompaction::prepare_compact() {
if (_tablet->tablet_state() != TABLET_RUNNING &&
if (_tablet->tablet_state() != TABLET_RUNNING && config::enable_new_tablet_do_compaction &&
dynamic_cast<CloudTablet*>(_tablet.get())->alter_version() == -1) {
return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id());
}
Expand Down Expand Up @@ -114,8 +114,8 @@ Status CloudCumulativeCompaction::prepare_compact() {

compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
// Set input version range to let meta-service judge version range conflict
compaction_job->set_judge_input_versions_range(config::enable_parallel_cumu_compaction);
// Set input version range to let meta-service check version range conflict
compaction_job->set_check_input_versions_range(config::enable_parallel_cumu_compaction);
cloud::StartTabletJobResponse resp;
st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
if (!st.ok()) {
Expand Down
18 changes: 10 additions & 8 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,20 +154,22 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
};
if (_version != max_version + 1 || should_sync_rowsets_produced_by_compaction()) {
auto sync_st = tablet->sync_rowsets();
if (sync_st.is<ErrorCode::INVALID_TABLET_STATE>()) [[unlikely]] {
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
"tablet_id: "
<< _tablet_id << " txn_id: " << _transaction_id
<< ", request_version=" << _version;
return sync_st;
}
if (!sync_st.ok()) {
LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id
<< ", txn_id=" << _transaction_id << ", status=" << sync_st;
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, sync_st);
return sync_st;
}
if (tablet->tablet_state() != TABLET_RUNNING) [[unlikely]] {
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
"tablet_id: "
<< _tablet_id << " txn_id: " << _transaction_id
<< ", request_version=" << _version;
return Status::Error<ErrorCode::INVALID_TABLET_STATE>(
"invalid tablet state {}. tablet_id={}", tablet->tablet_state(),
tablet->tablet_id());
}
}
auto sync_rowset_time_us = MonotonicMicros() - t2;
max_version = tablet->max_version_unlocked();
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,8 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
tablet->last_sync_time_s = now;

if (tablet->enable_unique_key_merge_on_write()) {
if (tablet->enable_unique_key_merge_on_write() &&
tablet->tablet_state() == TABLET_RUNNING) {
DeleteBitmap delete_bitmap(tablet_id);
int64_t old_max_version = req.start_version() - 1;
auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(),
Expand Down
53 changes: 53 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version,
// There are only two tablet_states RUNNING and NOT_READY in cloud mode
// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) {
RETURN_IF_ERROR(sync_if_not_running());

if (query_version > 0) {
std::shared_lock rlock(_meta_lock);
if (_max_version >= query_version) {
Expand All @@ -133,6 +135,57 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data)
return st;
}

// Sync tablet meta and all rowset meta if not running.
// This could happen when BE didn't finish schema change job and another BE committed this schema change job.
// It should be a quite rare situation.
Status CloudTablet::sync_if_not_running() {
if (tablet_state() == TABLET_RUNNING) {
return Status::OK();
}

// Serially execute sync to reduce unnecessary network overhead
std::lock_guard lock(_sync_meta_lock);

{
std::shared_lock rlock(_meta_lock);
if (tablet_state() == TABLET_RUNNING) {
return Status::OK();
}
}

TabletMetaSharedPtr tablet_meta;
auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
if (!st.ok()) {
if (st.is<ErrorCode::NOT_FOUND>()) {
clear_cache();
}
return st;
}

if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] {
// MoW may go to here when load while schema change
return Status::OK();
}

TimestampedVersionTracker empty_tracker;
{
std::lock_guard wlock(_meta_lock);
RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING));
_rs_version_map.clear();
_stale_rs_version_map.clear();
std::swap(_timestamped_version_tracker, empty_tracker);
_tablet_meta->clear_rowsets();
_tablet_meta->clear_stale_rowset();
_max_version = -1;
}

st = _engine.meta_mgr().sync_tablet_rowsets(this);
if (st.is<ErrorCode::NOT_FOUND>()) {
clear_cache();
}
return st;
}

TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
std::shared_lock rdlock(_meta_lock);
TabletSchemaSPtr target_schema;
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ class CloudTablet final : public BaseTablet {

static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);

Status sync_if_not_running();

CloudStorageEngine& _engine;

// this mutex MUST ONLY be used when sync meta
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,6 @@ DEFINE_mBool(save_load_error_log_to_s3, "false");

DEFINE_mInt32(sync_load_for_tablets_thread, "32");

DEFINE_mBool(enable_new_tablet_do_compaction, "true");

} // namespace doris::config
1 change: 1 addition & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ DECLARE_mInt32(tablet_sync_interval_s);

// Cloud compaction config
DECLARE_mInt64(min_compaction_failure_interval_ms);
DECLARE_mBool(enable_new_tablet_do_compaction);
// For cloud read/write separate mode
DECLARE_mInt64(base_compaction_freeze_interval_s);
DECLARE_mInt64(cu_compaction_freeze_interval_s);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1437,7 +1437,7 @@ Status BaseTablet::update_delete_bitmap_without_lock(
<< ", rnd:" << rnd << ", percent: " << percent;
}
});
int64_t cur_version = rowset->end_version();
int64_t cur_version = rowset->start_version();
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));

Expand Down
1 change: 0 additions & 1 deletion be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <memory>
#include <mutex>
#include <roaring/roaring.hh>
#include <thread>
#include <tuple>
#include <utility>

Expand Down
84 changes: 64 additions & 20 deletions cloud/src/meta-service/meta_service_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,22 @@ namespace doris::cloud {
static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1;
static constexpr int SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID = -2;

// check compaction input_versions are valid during schema change.
// If the schema change job doesnt have alter version, it dont need to check
// because the schema change job is come from old version BE.
// we will check they in prepare compaction and commit compaction.
// 1. When if base compaction, we need to guarantee the end version
// is less than or equal to alter_version.
// 2. When if cu compaction, we need to guarantee the start version
// is large than alter_version.
bool check_compaction_input_verions(const TabletCompactionJobPB& compaction,
const TabletJobInfoPB& job_pb) {
if (!job_pb.has_schema_change() || !job_pb.schema_change().has_alter_version()) return true;
// compaction need to know [start_version, end_version]
DCHECK_EQ(compaction.input_versions_size(), 2) << proto_to_json(compaction);
DCHECK_LE(compaction.input_versions(0), compaction.input_versions(1))
<< proto_to_json(compaction);

int64_t alter_version = job_pb.schema_change().alter_version();
return (compaction.type() == TabletCompactionJobPB_CompactionType_BASE &&
compaction.input_versions(1) <= alter_version) ||
Expand Down Expand Up @@ -136,7 +147,7 @@ void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringst
}
while (err == TxnErrorCode::TXN_OK) {
job_pb.ParseFromString(job_val);
if (job_pb.has_schema_change() && !check_compaction_input_verions(compaction, job_pb)) {
if (!check_compaction_input_verions(compaction, job_pb)) {
SS << "Check compaction input versions failed in schema change. input_version_start="
<< compaction.input_versions(0)
<< " input_version_end=" << compaction.input_versions(1)
Expand Down Expand Up @@ -176,8 +187,10 @@ void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringst
// for MOW table, so priority should be given to performing full
// compaction operations and canceling other types of compaction.
compactions.Clear();
} else if (!compaction.has_judge_input_versions_range() ||
!compaction.judge_input_versions_range()) {
} else if ((!compaction.has_check_input_versions_range() &&
compaction.input_versions().empty()) ||
(compaction.has_check_input_versions_range() &&
!compaction.check_input_versions_range())) {
// Unknown input version range, doesn't support parallel compaction of same type
for (auto& c : compactions) {
if (c.type() != compaction.type() && c.type() != TabletCompactionJobPB::FULL)
Expand Down Expand Up @@ -316,8 +329,10 @@ void start_schema_change_job(MetaServiceCode& code, std::string& msg, std::strin
err = txn->get(job_key, &job_val);
if (err == TxnErrorCode::TXN_OK) {
job_pb.ParseFromString(job_val);
if (job_pb.has_schema_change() && job_pb.schema_change().id() == schema_change.id() &&
if (job_pb.has_schema_change() && job_pb.schema_change().has_alter_version() &&
job_pb.schema_change().id() == schema_change.id() &&
job_pb.schema_change().initiator() == schema_change.initiator()) {
TEST_SYNC_POINT_CALLBACK("restart_compaction_job");
response->set_alter_version(job_pb.schema_change().alter_version());
return;
}
Expand Down Expand Up @@ -596,7 +611,7 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
}

bool abort_compaction = false;
if (recorded_job.has_schema_change() && request->action() == FinishTabletJobRequest::COMMIT &&
if (request->action() == FinishTabletJobRequest::COMMIT &&
!check_compaction_input_verions(compaction, recorded_job)) {
SS << "Check compaction input versions failed in schema change. input_version_start="
<< compaction.input_versions(0) << " input_version_end=" << compaction.input_versions(1)
Expand Down Expand Up @@ -941,8 +956,6 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str

auto new_tablet_key = meta_tablet_key(
{instance_id, new_table_id, new_index_id, new_partition_id, new_tablet_id});
auto new_tablet_job_key = job_tablet_key(
{instance_id, new_table_id, new_index_id, new_partition_id, new_tablet_id});
std::string new_tablet_val;
doris::TabletMetaCloudPB new_tablet_meta;
TxnErrorCode err = txn->get(new_tablet_key, &new_tablet_val);
Expand Down Expand Up @@ -994,8 +1007,9 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
}

// MUST check initiator to let the retried BE commit this schema_change job.
if (schema_change.id() != recorded_schema_change.id() ||
schema_change.initiator() != recorded_schema_change.initiator()) {
if (request->action() == FinishTabletJobRequest::COMMIT &&
(schema_change.id() != recorded_schema_change.id() ||
schema_change.initiator() != recorded_schema_change.initiator())) {
SS << "unmatched job id or initiator, recorded_id=" << recorded_schema_change.id()
<< " given_id=" << schema_change.id()
<< " recorded_job=" << proto_to_json(recorded_schema_change)
Expand All @@ -1013,20 +1027,48 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
return;
}

auto new_tablet_job_key = job_tablet_key(
{instance_id, new_table_id, new_index_id, new_partition_id, new_tablet_id});

std::string new_tablet_job_val;
err = txn->get(new_tablet_job_key, &new_tablet_job_val);
if (err != TxnErrorCode::TXN_OK) {
SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : "internal error,")
<< " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
<< " job=" << proto_to_json(request->job()) << " err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::INVALID_ARGUMENT
: cast_as<ErrCategory::READ>(err);
return;
}
TabletJobInfoPB new_recorded_job;
if (!new_recorded_job.ParseFromString(new_tablet_job_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed new tablet recorded job";
return;
}

//==========================================================================
// Abort
//==========================================================================
if (request->action() == FinishTabletJobRequest::ABORT) {
// TODO(cyx)
// remove schema change
recorded_job.clear_schema_change();
auto job_val = recorded_job.SerializeAsString();
txn->put(job_key, job_val);
txn->remove(new_tablet_job_key);
INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
<< " key=" << hex(job_key);

need_commit = true;
if (schema_change.new_tablet_idx().index_id() ==
recorded_schema_change.new_tablet_idx().index_id() &&
schema_change.new_tablet_idx().tablet_id() ==
recorded_schema_change.new_tablet_idx().tablet_id()) {
// TODO(cyx)
// remove schema change
recorded_job.clear_schema_change();
new_recorded_job.clear_schema_change();
auto job_val = recorded_job.SerializeAsString();
new_tablet_job_val = new_recorded_job.SerializeAsString();
txn->put(job_key, job_val);
txn->put(new_tablet_job_key, new_tablet_job_val);
INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
<< " key=" << hex(job_key);

need_commit = true;
}
return;
}

Expand Down Expand Up @@ -1185,9 +1227,11 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
// remove schema_change job
//==========================================================================
recorded_job.clear_schema_change();
new_recorded_job.clear_schema_change();
auto job_val = recorded_job.SerializeAsString();
txn->put(job_key, job_val);
txn->remove(new_tablet_job_key);
new_tablet_job_val = new_recorded_job.SerializeAsString();
txn->put(new_tablet_job_key, new_tablet_job_val);
INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
<< " key=" << hex(job_key);

Expand Down
Loading

0 comments on commit b5c85f3

Please sign in to comment.