Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Aug 1, 2024
1 parent ffb0907 commit 61cf71c
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 10 deletions.
4 changes: 3 additions & 1 deletion be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ Status CloudBaseCompaction::prepare_compact() {
compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
cloud::StartTabletJobResponse resp;
auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
if (resp.has_alter_version()) {
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
}
if (!st.ok()) {
if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
// set last_sync_time to 0 to force sync tablet next time
Expand All @@ -113,7 +116,6 @@ Status CloudBaseCompaction::prepare_compact() {
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
cloud_tablet->set_alter_version(resp.alter_version());
return Status::InternalError(msg);
}
return st;
Expand Down
5 changes: 4 additions & 1 deletion be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,13 @@ Status CloudCumulativeCompaction::modify_rowsets() {

cloud::FinishTabletJobResponse resp;
auto st = _engine.meta_mgr().commit_tablet_job(job, &resp);
if (resp.has_alter_version()) {
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
}
if (!st.ok()) {
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
(dynamic_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. "
Expand All @@ -288,6 +290,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
}
return st;
}

auto& stats = resp.stats();
LOG(INFO) << "tablet stats=" << stats.ShortDebugString();
{
Expand Down
10 changes: 6 additions & 4 deletions cloud/src/meta-service/meta_service_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,10 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
txn->put(job_key, job_val);
INSTANCE_LOG(INFO) << "remove compaction job tabelt_id=" << tablet_id
<< " key=" << hex(job_key);

response->set_alter_version(recorded_job.has_schema_change() &&
recorded_job.schema_change().has_alter_version()
? recorded_job.schema_change().alter_version()
: -1);
need_commit = true;
}

Expand Down Expand Up @@ -1007,9 +1010,8 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
}

// MUST check initiator to let the retried BE commit this schema_change job.
if (request->action() == FinishTabletJobRequest::COMMIT &&
(schema_change.id() != recorded_schema_change.id() ||
schema_change.initiator() != recorded_schema_change.initiator())) {
if (schema_change.id() != recorded_schema_change.id() ||
schema_change.initiator() != recorded_schema_change.initiator()) {
SS << "unmatched job id or initiator, recorded_id=" << recorded_schema_change.id()
<< " given_id=" << schema_change.id()
<< " recorded_job=" << proto_to_json(recorded_schema_change)
Expand Down
10 changes: 7 additions & 3 deletions cloud/test/meta_service_job_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -687,8 +687,11 @@ TEST(MetaServiceJobTest, ProcessSchemaChangeArguments) {
recorded_sc->set_id("sc1");
recorded_sc->set_initiator("BE1");
job_val = recorded_job.SerializeAsString();
auto new_job_key =
job_tablet_key({instance_id, table_id, new_index_id, partition_id, new_tablet_id});
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->put(job_key, job_val);
txn->put(new_job_key, job_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
meta_service->finish_tablet_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().msg();
Expand Down Expand Up @@ -2342,12 +2345,12 @@ TEST(MetaServiceJobTest, DoCompactionWhenSC) {
StartTabletJobResponse res;
start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 7,
TabletCompactionJobPB::CUMULATIVE, res, {7, 10});
ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION_FAIL);
ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION);
res.Clear();

start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 7,
TabletCompactionJobPB::BASE, res, {0, 10});
ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION_FAIL);
ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION);
res.Clear();

start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 7,
Expand Down Expand Up @@ -2499,7 +2502,8 @@ TEST(MetaServiceJobTest, CancelSC) {
FinishTabletJobResponse finish_res;
finish_schema_change_job(meta_service.get(), tablet_id, new_tablet_id, "job_sc", "BE1", {},
finish_res, FinishTabletJobRequest::ABORT);
ASSERT_EQ(finish_res.status().code(), MetaServiceCode::OK);
ASSERT_NE(finish_res.status().msg().find("unmatched job id or initiator"),
std::string::npos);
}
{
std::unique_ptr<Transaction> txn;
Expand Down
2 changes: 1 addition & 1 deletion gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1340,7 +1340,7 @@ enum MetaServiceCode {
JOB_ALREADY_SUCCESS = 5002;
ROUTINE_LOAD_DATA_INCONSISTENT = 5003;
ROUTINE_LOAD_PROGRESS_NOT_FOUND = 5004;
JOB_CHECK_ALTER_VERSION_FAIL = 5005;
JOB_CHECK_ALTER_VERSION = 5005;

// Rate limit
MAX_QPS_LIMIT = 6001;
Expand Down
1 change: 1 addition & 0 deletions regression-test/pipeline/cloud_p0/conf/be_custom.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ save_load_error_log_to_s3 = true
enable_stream_load_record = true
stream_load_record_batch_size = 500
webserver_num_workers = 128
enable_new_tablet_do_compaction = true
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ suite('test_schema_change_with_compaction10') {
options.cloudMode = true
options.enableDebugPoints()
options.beConfigs += [ "enable_java_support=false" ]
options.beConfigs += [ "enable_new_tablet_do_compaction=true" ]
options.beConfigs += [ "disable_auto_compaction=true" ]
options.beNum = 1
docker(options) {
Expand Down
Loading

0 comments on commit 61cf71c

Please sign in to comment.