diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp index 044a64ff95786f8..23d4850d0002e8e 100644 --- a/be/src/cloud/cloud_base_compaction.cpp +++ b/be/src/cloud/cloud_base_compaction.cpp @@ -92,6 +92,9 @@ Status CloudBaseCompaction::prepare_compact() { compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4); cloud::StartTabletJobResponse resp; auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp); + if (resp.has_alter_version()) { + (static_cast(_tablet.get()))->set_alter_version(resp.alter_version()); + } if (!st.ok()) { if (resp.status().code() == cloud::STALE_TABLET_CACHE) { // set last_sync_time to 0 to force sync tablet next time @@ -113,7 +116,6 @@ Status CloudBaseCompaction::prepare_compact() { << " schema_change_alter_version=" << resp.alter_version(); std::string msg = ss.str(); LOG(WARNING) << msg; - cloud_tablet->set_alter_version(resp.alter_version()); return Status::InternalError(msg); } return st; diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 0d01a4e5f58b48c..ea6062309f28c7f 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -270,11 +270,13 @@ Status CloudCumulativeCompaction::modify_rowsets() { cloud::FinishTabletJobResponse resp; auto st = _engine.meta_mgr().commit_tablet_job(job, &resp); + if (resp.has_alter_version()) { + (static_cast(_tablet.get()))->set_alter_version(resp.alter_version()); + } if (!st.ok()) { if (resp.status().code() == cloud::TABLET_NOT_FOUND) { cloud_tablet()->clear_cache(); } else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) { - (dynamic_cast(_tablet.get()))->set_alter_version(resp.alter_version()); std::stringstream ss; ss << "failed to prepare cumu compaction. Check compaction input versions " "failed in schema change. " @@ -288,6 +290,7 @@ Status CloudCumulativeCompaction::modify_rowsets() { } return st; } + auto& stats = resp.stats(); LOG(INFO) << "tablet stats=" << stats.ShortDebugString(); { diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp index ba56d5c5a0b93e5..f63a75f55b89139 100644 --- a/cloud/src/meta-service/meta_service_job.cpp +++ b/cloud/src/meta-service/meta_service_job.cpp @@ -919,7 +919,10 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string txn->put(job_key, job_val); INSTANCE_LOG(INFO) << "remove compaction job tabelt_id=" << tablet_id << " key=" << hex(job_key); - + response->set_alter_version(recorded_job.has_schema_change() && + recorded_job.schema_change().has_alter_version() + ? recorded_job.schema_change().alter_version() + : -1); need_commit = true; } @@ -1007,9 +1010,8 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str } // MUST check initiator to let the retried BE commit this schema_change job. - if (request->action() == FinishTabletJobRequest::COMMIT && - (schema_change.id() != recorded_schema_change.id() || - schema_change.initiator() != recorded_schema_change.initiator())) { + if (schema_change.id() != recorded_schema_change.id() || + schema_change.initiator() != recorded_schema_change.initiator()) { SS << "unmatched job id or initiator, recorded_id=" << recorded_schema_change.id() << " given_id=" << schema_change.id() << " recorded_job=" << proto_to_json(recorded_schema_change) diff --git a/cloud/test/meta_service_job_test.cpp b/cloud/test/meta_service_job_test.cpp index def9fb11ed8fec3..f0323eebb790bea 100644 --- a/cloud/test/meta_service_job_test.cpp +++ b/cloud/test/meta_service_job_test.cpp @@ -687,8 +687,11 @@ TEST(MetaServiceJobTest, ProcessSchemaChangeArguments) { recorded_sc->set_id("sc1"); recorded_sc->set_initiator("BE1"); job_val = recorded_job.SerializeAsString(); + auto new_job_key = + job_tablet_key({instance_id, table_id, new_index_id, partition_id, new_tablet_id}); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); txn->put(job_key, job_val); + txn->put(new_job_key, job_val); ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); meta_service->finish_tablet_job(&cntl, &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().msg(); @@ -2342,12 +2345,12 @@ TEST(MetaServiceJobTest, DoCompactionWhenSC) { StartTabletJobResponse res; start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 7, TabletCompactionJobPB::CUMULATIVE, res, {7, 10}); - ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION_FAIL); + ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION); res.Clear(); start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 7, TabletCompactionJobPB::BASE, res, {0, 10}); - ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION_FAIL); + ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION); res.Clear(); start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 7, @@ -2499,7 +2502,8 @@ TEST(MetaServiceJobTest, CancelSC) { FinishTabletJobResponse finish_res; finish_schema_change_job(meta_service.get(), tablet_id, new_tablet_id, "job_sc", "BE1", {}, finish_res, FinishTabletJobRequest::ABORT); - ASSERT_EQ(finish_res.status().code(), MetaServiceCode::OK); + ASSERT_NE(finish_res.status().msg().find("unmatched job id or initiator"), + std::string::npos); } { std::unique_ptr txn; diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 06850f7db3aeaf1..f8acd97d05fb382 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -1340,7 +1340,7 @@ enum MetaServiceCode { JOB_ALREADY_SUCCESS = 5002; ROUTINE_LOAD_DATA_INCONSISTENT = 5003; ROUTINE_LOAD_PROGRESS_NOT_FOUND = 5004; - JOB_CHECK_ALTER_VERSION_FAIL = 5005; + JOB_CHECK_ALTER_VERSION = 5005; // Rate limit MAX_QPS_LIMIT = 6001; diff --git a/regression-test/pipeline/cloud_p0/conf/be_custom.conf b/regression-test/pipeline/cloud_p0/conf/be_custom.conf index 9f2967b1972c116..1da9c9992d5f935 100644 --- a/regression-test/pipeline/cloud_p0/conf/be_custom.conf +++ b/regression-test/pipeline/cloud_p0/conf/be_custom.conf @@ -33,3 +33,4 @@ save_load_error_log_to_s3 = true enable_stream_load_record = true stream_load_record_batch_size = 500 webserver_num_workers = 128 +enable_new_tablet_do_compaction = true diff --git a/regression-test/suites/cloud_p0/schema_change/compaction10/test_schema_change_with_compaction10.groovy b/regression-test/suites/cloud_p0/schema_change/compaction10/test_schema_change_with_compaction10.groovy index 6fc8003527dc026..b393979d44218af 100644 --- a/regression-test/suites/cloud_p0/schema_change/compaction10/test_schema_change_with_compaction10.groovy +++ b/regression-test/suites/cloud_p0/schema_change/compaction10/test_schema_change_with_compaction10.groovy @@ -25,6 +25,7 @@ suite('test_schema_change_with_compaction10') { options.cloudMode = true options.enableDebugPoints() options.beConfigs += [ "enable_java_support=false" ] + options.beConfigs += [ "enable_new_tablet_do_compaction=true" ] options.beConfigs += [ "disable_auto_compaction=true" ] options.beNum = 1 docker(options) { diff --git a/regression-test/suites/cloud_p0/schema_change/compaction11/test_schema_change_with_compaction11.groovy b/regression-test/suites/cloud_p0/schema_change/compaction11/test_schema_change_with_compaction11.groovy new file mode 100644 index 000000000000000..a2df1540bda1d7c --- /dev/null +++ b/regression-test/suites/cloud_p0/schema_change/compaction11/test_schema_change_with_compaction11.groovy @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException +import org.apache.doris.regression.util.DebugPoint +import org.apache.doris.regression.util.NodeType + +suite('test_schema_change_with_compaction11') { + def options = new ClusterOptions() + options.cloudMode = true + options.enableDebugPoints() + options.beConfigs += [ "enable_java_support=false" ] + options.beConfigs += [ "enable_new_tablet_do_compaction=false" ] + options.beConfigs += [ "disable_auto_compaction=true" ] + options.beNum = 1 + docker(options) { + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}", + |"provider" = "${getS3Provider()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" + sql new File("""${context.file.parent}/../ddl/date_delete.sql""").text + def load_date_once = { String table -> + def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() + def loadLabel = table + "_" + uniqueID + // load data from cos + def loadSql = new File("""${context.file.parent}/../ddl/${table}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties + sql loadSql + + // check load state + while (true) { + def stateResult = sql "show load where Label = '${loadLabel}'" + def loadState = stateResult[stateResult.size() - 1][2].toString() + if ("CANCELLED".equalsIgnoreCase(loadState)) { + throw new IllegalStateException("load ${loadLabel} failed.") + } else if ("FINISHED".equalsIgnoreCase(loadState)) { + break + } + sleep(5000) + } + } + + sql new File("""${context.file.parent}/../ddl/date_unique_create.sql""").text + def injectName = 'CloudSchemaChangeJob.process_alter_tablet.sleep' + def injectBe = null + def backends = sql_return_maparray('show backends') + def array = sql_return_maparray("SHOW TABLETS FROM date") + def injectBeId = array[0].BackendId + def originTabletId = array[0].TabletId + injectBe = backends.stream().filter(be -> be.BackendId == injectBeId).findFirst().orElse(null) + assertNotNull(injectBe) + + def load_delete_compaction = { + load_date_once("date"); + sql "delete from date where d_datekey < 19900000" + sql "select count(*) from date" + // cu compaction + logger.info("run compaction:" + originTabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + boolean running = true + do { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + try { + load_delete_compaction() + load_delete_compaction() + load_delete_compaction() + + load_date_once("date"); + + sleep(1000) + GetDebugPoint().enableDebugPointForAllBEs(injectName) + sql "ALTER TABLE date MODIFY COLUMN d_holidayfl bigint(11)" + sleep(5000) + array = sql_return_maparray("SHOW TABLETS FROM date") + + for (int i = 0; i < 5; i++) { + load_date_once("date"); + } + + // base compaction + logger.info("run compaction:" + originTabletId) + (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + + + // wait for all compactions done + boolean running = true + while (running) { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + def newTabletId = array[1].TabletId + logger.info("run compaction:" + newTabletId) + (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("invalid tablet state.")) + + + // cu compaction + tabletId = array[0].TabletId + logger.info("run compaction:" + tabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + + running = true + do { + Thread.sleep(100) + tabletId = array[0].TabletId + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + + // new tablet cannot do cu compaction + tabletId = array[1].TabletId + logger.info("run compaction:" + tabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("invalid tablet state.")) + + } finally { + if (injectBe != null) { + GetDebugPoint().disableDebugPointForAllBEs(injectName) + } + int max_try_time = 3000 + while (max_try_time--){ + result = getJobState("date") + if (result == "FINISHED" || result == "CANCELLED") { + sleep(3000) + break + } else { + sleep(100) + if (max_try_time < 1){ + assertEquals(1,2) + } + } + } + assertEquals(result, "FINISHED"); + def count = sql """ select count(*) from date; """ + assertEquals(count[0][0], 2556); + // check rowsets + logger.info("run show:" + originTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-7]")) + assertTrue(out.contains("[8-8]")) + assertTrue(out.contains("[9-13]")) + + logger.info("run show:" + newTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertTrue(out.contains("[7-7]")) + assertTrue(out.contains("[8-8]")) + assertTrue(out.contains("[9-9]")) + assertTrue(out.contains("[13-13]")) + + // base compaction + logger.info("run compaction:" + newTabletId) + (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + + + // wait for all compactions done + boolean running = true + while (running) { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + + // cu compaction + logger.info("run compaction:" + newTabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + + + // wait for all compactions done + running = true + while (running) { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + + logger.info("run show:" + newTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-7]")) + assertTrue(out.contains("[8-13]")) + + for (int i = 0; i < 4; i++) { + load_date_once("date"); + } + + sql """ select count(*) from date """ + + logger.info("run compaction:" + newTabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + + // wait for all compactions done + running = true + while (running) { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + + logger.info("run show:" + newTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-7]")) + assertTrue(out.contains("[8-17]")) + } + } +} \ No newline at end of file diff --git a/regression-test/suites/cloud_p0/schema_change/compaction5/test_schema_change_with_compaction5.groovy b/regression-test/suites/cloud_p0/schema_change/compaction5/test_schema_change_with_compaction5.groovy index c338dac907b2456..f5028ff9e818c3c 100644 --- a/regression-test/suites/cloud_p0/schema_change/compaction5/test_schema_change_with_compaction5.groovy +++ b/regression-test/suites/cloud_p0/schema_change/compaction5/test_schema_change_with_compaction5.groovy @@ -26,6 +26,7 @@ suite('test_schema_change_with_compaction5', 'nonConcurrent') { options.enableDebugPoints() options.beConfigs += [ "enable_java_support=false" ] options.beConfigs += [ "disable_auto_compaction=true" ] + options.beConfigs += [ "enable_new_tablet_do_compaction=true" ] options.beNum = 1 docker(options) { def getJobState = { tableName -> diff --git a/regression-test/suites/cloud_p0/schema_change/compaction6/test_schema_change_with_compaction6.groovy b/regression-test/suites/cloud_p0/schema_change/compaction6/test_schema_change_with_compaction6.groovy index 245dbe46b714c62..951535433d13621 100644 --- a/regression-test/suites/cloud_p0/schema_change/compaction6/test_schema_change_with_compaction6.groovy +++ b/regression-test/suites/cloud_p0/schema_change/compaction6/test_schema_change_with_compaction6.groovy @@ -26,6 +26,7 @@ suite('test_schema_change_with_compaction6', 'nonConcurrent') { options.enableDebugPoints() options.beConfigs += [ "enable_java_support=false" ] options.beConfigs += [ "disable_auto_compaction=true" ] + options.beConfigs += [ "enable_new_tablet_do_compaction=true" ] options.beNum = 1 docker(options) { def getJobState = { tableName -> diff --git a/regression-test/suites/cloud_p0/schema_change/compaction9/test_schema_change_with_compaction9.groovy b/regression-test/suites/cloud_p0/schema_change/compaction9/test_schema_change_with_compaction9.groovy index 6cb47e01f4b62c1..83c549eefc5abd6 100644 --- a/regression-test/suites/cloud_p0/schema_change/compaction9/test_schema_change_with_compaction9.groovy +++ b/regression-test/suites/cloud_p0/schema_change/compaction9/test_schema_change_with_compaction9.groovy @@ -26,6 +26,7 @@ suite('test_schema_change_with_compaction9') { options.enableDebugPoints() options.beConfigs += [ "enable_java_support=false" ] options.beConfigs += [ "disable_auto_compaction=true" ] + options.beConfigs += [ "enable_new_tablet_do_compaction=true" ] options.beNum = 1 docker(options) { def getJobState = { tableName ->