Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Aug 1, 2024
1 parent 2941096 commit ffb0907
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 25 deletions.
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ Status CloudBaseCompaction::prepare_compact() {
} else if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
// tablet not found
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION_FAIL) {
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
Expand Down Expand Up @@ -332,7 +332,7 @@ Status CloudBaseCompaction::modify_rowsets() {
if (!st.ok()) {
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION_FAIL) {
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
Expand Down
11 changes: 6 additions & 5 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine,
CloudCumulativeCompaction::~CloudCumulativeCompaction() = default;

Status CloudCumulativeCompaction::prepare_compact() {
if (_tablet->tablet_state() != TABLET_RUNNING && config::enable_new_tablet_do_compaction &&
dynamic_cast<CloudTablet*>(_tablet.get())->alter_version() == -1) {
if (_tablet->tablet_state() != TABLET_RUNNING &&
(!config::enable_new_tablet_do_compaction ||
static_cast<CloudTablet*>(_tablet.get())->alter_version() == -1)) {
return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id());
}

Expand Down Expand Up @@ -142,8 +143,8 @@ Status CloudCumulativeCompaction::prepare_compact() {
.tag("msg", resp.status().msg());
return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable versions");
}
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION_FAIL) {
(dynamic_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. "
Expand Down Expand Up @@ -272,7 +273,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
if (!st.ok()) {
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION_FAIL) {
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
(dynamic_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
tablet->last_sync_time_s = now;

// If is mow, the tablet has no delete bitmap in base rowsets.
// So dont need to sync it.
if (tablet->enable_unique_key_merge_on_write() &&
tablet->tablet_state() == TABLET_RUNNING) {
DeleteBitmap delete_bitmap(tablet_id);
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ DEFINE_mBool(save_load_error_log_to_s3, "false");

DEFINE_mInt32(sync_load_for_tablets_thread, "32");

DEFINE_mBool(enable_new_tablet_do_compaction, "true");
DEFINE_mBool(enable_new_tablet_do_compaction, "false");

} // namespace doris::config
33 changes: 16 additions & 17 deletions cloud/src/meta-service/meta_service_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,13 @@ static constexpr int SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID = -2;
bool check_compaction_input_verions(const TabletCompactionJobPB& compaction,
const TabletJobInfoPB& job_pb) {
if (!job_pb.has_schema_change() || !job_pb.schema_change().has_alter_version()) return true;
// compaction need to know [start_version, end_version]
DCHECK_EQ(compaction.input_versions_size(), 2) << proto_to_json(compaction);
DCHECK_LE(compaction.input_versions(0), compaction.input_versions(1))
<< proto_to_json(compaction);
if (compaction.input_versions_size() != 2 ||
compaction.input_versions(0) > compaction.input_versions(1)) {
LOG(WARNING) << "The compaction need to know [start_version, end_version], and \
the start_version should LE end_version. \n"
<< proto_to_json(compaction);
return false;
}

int64_t alter_version = job_pb.schema_change().alter_version();
return (compaction.type() == TabletCompactionJobPB_CompactionType_BASE &&
Expand Down Expand Up @@ -154,7 +157,7 @@ void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringst
<< " schema_change_alter_version=" << job_pb.schema_change().alter_version();
msg = ss.str();
INSTANCE_LOG(INFO) << msg;
code = MetaServiceCode::JOB_CHECK_ALTER_VERSION_FAIL;
code = MetaServiceCode::JOB_CHECK_ALTER_VERSION;
response->set_alter_version(job_pb.schema_change().alter_version());
return;
}
Expand Down Expand Up @@ -327,16 +330,6 @@ void start_schema_change_job(MetaServiceCode& code, std::string& msg, std::strin
std::string job_val;
TabletJobInfoPB job_pb;
err = txn->get(job_key, &job_val);
if (err == TxnErrorCode::TXN_OK) {
job_pb.ParseFromString(job_val);
if (job_pb.has_schema_change() && job_pb.schema_change().has_alter_version() &&
job_pb.schema_change().id() == schema_change.id() &&
job_pb.schema_change().initiator() == schema_change.initiator()) {
TEST_SYNC_POINT_CALLBACK("restart_compaction_job");
response->set_alter_version(job_pb.schema_change().alter_version());
return;
}
}
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
SS << "failed to get tablet job, instance_id=" << instance_id << " tablet_id=" << tablet_id
<< " key=" << hex(job_key) << " err=" << err;
Expand All @@ -349,6 +342,13 @@ void start_schema_change_job(MetaServiceCode& code, std::string& msg, std::strin
msg = "pb deserialization failed";
return;
}
if (job_pb.has_schema_change() && job_pb.schema_change().has_alter_version() &&
job_pb.schema_change().id() == schema_change.id() &&
job_pb.schema_change().initiator() == schema_change.initiator()) {
TEST_SYNC_POINT_CALLBACK("restart_compaction_job");
response->set_alter_version(job_pb.schema_change().alter_version());
return;
}
job_pb.mutable_idx()->CopyFrom(request->job().idx());
// FE can ensure that a tablet does not have more than one schema_change job at the same time,
// so we can directly preempt previous schema_change job.
Expand Down Expand Up @@ -620,7 +620,7 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
INSTANCE_LOG(INFO) << msg;
abort_compaction = true;
response->set_alter_version(recorded_job.schema_change().alter_version());
code = MetaServiceCode::JOB_CHECK_ALTER_VERSION_FAIL;
code = MetaServiceCode::JOB_CHECK_ALTER_VERSION;
}

//==========================================================================
Expand Down Expand Up @@ -1056,7 +1056,6 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
recorded_schema_change.new_tablet_idx().index_id() &&
schema_change.new_tablet_idx().tablet_id() ==
recorded_schema_change.new_tablet_idx().tablet_id()) {
// TODO(cyx)
// remove schema change
recorded_job.clear_schema_change();
new_recorded_job.clear_schema_change();
Expand Down

0 comments on commit ffb0907

Please sign in to comment.