diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
new file mode 100644
index 000000000000000..e4d1c1912153ecb
--- /dev/null
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -0,0 +1,335 @@
+#include "cloud/cloud_schema_change_job.h"
+
+#include <memory>
+#include <numeric>
+
+#include "cloud/cloud_tablet_mgr.h"
+#include "cloud/cloud_meta_mgr.h"
+// #include "cloud/utils.h"
+#include "common/status.h"
+#include "olap/delete_handler.h"
+#include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/segment_v2/inverted_index_desc.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
+#include "olap/tablet_fwd.h"
+#include "olap/tablet_meta.h"
+#include "service/backend_options.h"
+
+namespace doris::cloud {
+using namespace ErrorCode;
+
+static constexpr int ALTER_TABLE_BATCH_SIZE = 4096;
+
+static std::unique_ptr<SchemaChange> get_sc_procedure(const BlockChanger& changer,
+                                                      bool sc_sorting) {
+    if (sc_sorting) {
+        return std::make_unique<VBaseSchemaChangeWithSorting>(
+                changer, config::memory_limitation_per_thread_for_schema_change_bytes);
+    }
+    // else sc_directly
+    return std::make_unique<VSchemaChangeDirectly>(changer);
+}
+
+CloudSchemaChangeJob::CloudSchemaChangeJob(CloudStorageEngine& cloud_storage_engine,
+                                           std::string job_id, int64_t expiration)
+        : _cloud_storage_engine(cloud_storage_engine),
+          _job_id(std::move(job_id)),
+          _expiration(expiration) {}
+
+CloudSchemaChangeJob::~CloudSchemaChangeJob() = default;
+
+Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) {
+    LOG(INFO) << "Begin to alter tablet. base_tablet_id=" << request.base_tablet_id
+              << ", new_tablet_id=" << request.new_tablet_id
+              << ", alter_version=" << request.alter_version << ", job_id=" << _job_id;
+
+    // new tablet has to exist
+    _new_tablet = DORIS_TRY(_cloud_storage_engine.tablet_mgr().get_tablet(request.new_tablet_id));
+    if (_new_tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(INFO) << "schema change job has already finished. base_tablet_id="
+                  << request.base_tablet_id << ", new_tablet_id=" << request.new_tablet_id
+                  << ", alter_version=" << request.alter_version << ", job_id=" << _job_id;
+        return Status::OK();
+    }
+
+    _base_tablet = DORIS_TRY(_cloud_storage_engine.tablet_mgr().get_tablet(request.base_tablet_id));
+    std::unique_lock schema_change_lock(_base_tablet->get_schema_change_lock(),
+                                        std::try_to_lock);
+    if (!schema_change_lock.owns_lock()) {
+        LOG(WARNING) << "Failed to obtain schema change lock. base_tablet="
+                     << request.base_tablet_id;
+        return Status::Error<TRY_LOCK_FAILED>(
+                "Failed to obtain schema change lock. base_tablet={}", request.base_tablet_id);
+    }
+    // MUST sync rowsets before capturing rowset readers and building DeleteHandler
+    RETURN_IF_ERROR(_base_tablet->sync_rowsets(request.alter_version));
+    // ATTN: Only convert rowsets of version larger than 1, MUST let the new tablet cache have rowset [0-1]
+    _output_cumulative_point = _base_tablet->cumulative_layer_point();
+
+    std::vector<RowSetSplits> rs_splits;
+    int64_t base_max_version = _base_tablet->max_version_unlocked();
+    if (request.alter_version > 1) {
+        // [0-1] is a placeholder rowset, no need to convert
+        RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, base_max_version}, &rs_splits));
+    }
+    // FIXME(cyx): Should trigger compaction on base_tablet if there are too many rowsets to convert.
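+    // If alter_version <= 1, rs_splits stays empty: the job converts nothing and later takes
+    // the empty-readers branch in _convert_historical_rowsets, recording alter_version = 1.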
+
+    // Create a new tablet schema, should merge with dropped columns in light weight schema change
+    _base_tablet_schema = std::make_shared<TabletSchema>();
+    _base_tablet_schema->update_tablet_columns(*_base_tablet->tablet_schema(), request.columns);
+
+    // delete handlers to filter out deleted rows
+    DeleteHandler delete_handler;
+    std::vector<RowsetMetaSharedPtr> delete_predicates;
+    for (auto& split : rs_splits) {
+        auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
+        if (rs_meta->has_delete_predicate()) {
+            _base_tablet_schema->merge_dropped_columns(*rs_meta->tablet_schema());
+            delete_predicates.push_back(rs_meta);
+        }
+    }
+    RETURN_IF_ERROR(delete_handler.init(_base_tablet_schema, delete_predicates, base_max_version));
+
+    std::vector<uint32_t> return_columns;
+    return_columns.resize(_base_tablet_schema->num_columns());
+    std::iota(return_columns.begin(), return_columns.end(), 0);
+
+    // reader_context is a stack variable, so its lifetime MUST be the same as rs_readers
+    RowsetReaderContext reader_context;
+    reader_context.reader_type = ReaderType::READER_ALTER_TABLE;
+    reader_context.tablet_schema = _base_tablet_schema;
+    reader_context.need_ordered_result = true;
+    reader_context.delete_handler = &delete_handler;
+    reader_context.return_columns = &return_columns;
+    reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
+    reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS;
+    reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
+    reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap();
+    reader_context.version = Version(0, base_max_version);
+
+    for (auto& split : rs_splits) {
+        RETURN_IF_ERROR(split.rs_reader->init(&reader_context));
+    }
+
+    SchemaChangeParams sc_params;
+
+    RETURN_IF_ERROR(DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl));
+    sc_params.ref_rowset_readers.reserve(rs_splits.size());
+    for (RowSetSplits& split : rs_splits) {
+        sc_params.ref_rowset_readers.emplace_back(std::move(split.rs_reader));
+    }
+    sc_params.delete_handler = &delete_handler;
+    sc_params.be_exec_version = request.be_exec_version;
+    DCHECK(request.__isset.alter_tablet_type);
+    switch (request.alter_tablet_type) {
+    case TAlterTabletType::SCHEMA_CHANGE:
+        sc_params.alter_tablet_type = AlterTabletType::SCHEMA_CHANGE;
+        break;
+    case TAlterTabletType::ROLLUP:
+        sc_params.alter_tablet_type = AlterTabletType::ROLLUP;
+        break;
+    case TAlterTabletType::MIGRATION:
+        sc_params.alter_tablet_type = AlterTabletType::MIGRATION;
+        break;
+    }
+    if (!request.__isset.materialized_view_params) {
+        return _convert_historical_rowsets(sc_params);
+    }
+    for (auto item : request.materialized_view_params) {
+        AlterMaterializedViewParam mv_param;
+        mv_param.column_name = item.column_name;
+        /*
+         * origin_column_name is always set now, but it may be missing for some
+         * materialized view functions, e.g. count(1)
+         */
+        if (item.__isset.origin_column_name) {
+            mv_param.origin_column_name = item.origin_column_name;
+        }
+
+        if (item.__isset.mv_expr) {
+            mv_param.expr = std::make_shared<TExpr>(item.mv_expr);
+        }
+        sc_params.materialized_params_map.insert(std::make_pair(item.column_name, mv_param));
+    }
+    sc_params.enable_unique_key_merge_on_write = _new_tablet->enable_unique_key_merge_on_write();
+    return _convert_historical_rowsets(sc_params);
+}
+
+Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc_params) {
+    LOG(INFO) << "Begin to convert historical rowsets for new_tablet from base_tablet. base_tablet="
+              << _base_tablet->tablet_id() << ", new_tablet=" << _new_tablet->tablet_id()
+              << ", job_id=" << _job_id;
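+    // Conversion runs as a three-step protocol against the meta service: (1) prepare_tablet_job
+    // registers the job, (2) each converted rowset is prepared/committed individually, and
+    // (3) commit_tablet_job publishes the result. The JOB_ALREADY_SUCCESS and ALREADY_EXIST
+    // responses handled below let a retried job with the same job_id resume instead of failing.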
base_tablet=" + << _base_tablet->tablet_id() + << ", new_tablet=" << _new_tablet->tablet_id() << ", job_id=" << _job_id; + + // Add filter information in change, and filter column information will be set in _parse_request + // And filter some data every time the row block changes + BlockChanger changer(_new_tablet->tablet_schema(), *sc_params.desc_tbl); + + bool sc_sorting = false; + bool sc_directly = false; + + // 1. Parse the Alter request and convert it into an internal representation + RETURN_IF_ERROR( + SchemaChangeJob::parse_request(sc_params, _base_tablet_schema.get(), _new_tablet_schema.get(), + &changer, &sc_sorting, &sc_directly)); + if (!sc_sorting && !sc_directly && sc_params.alter_tablet_type == AlterTabletType::ROLLUP) { + LOG(INFO) << "Don't support to add materialized view by linked schema change"; + return Status::InternalError( + "Don't support to add materialized view by linked schema change"); + } + + // 2. Generate historical data converter + auto sc_procedure = get_sc_procedure(changer, sc_sorting); + + cloud::TabletJobInfoPB job; + auto* idx = job.mutable_idx(); + idx->set_tablet_id(_base_tablet->tablet_id()); + idx->set_table_id(_base_tablet->table_id()); + idx->set_index_id(_base_tablet->index_id()); + idx->set_partition_id(_base_tablet->partition_id()); + auto* sc_job = job.mutable_schema_change(); + sc_job->set_id(_job_id); + sc_job->set_initiator(BackendOptions::get_localhost() + ':' + + std::to_string(config::heartbeat_service_port)); + auto* new_tablet_idx = sc_job->mutable_new_tablet_idx(); + new_tablet_idx->set_tablet_id(_new_tablet->tablet_id()); + new_tablet_idx->set_table_id(_new_tablet->table_id()); + new_tablet_idx->set_index_id(_new_tablet->index_id()); + new_tablet_idx->set_partition_id(_new_tablet->partition_id()); + cloud::StartTabletJobResponse start_resp; + auto st = _cloud_storage_engine.meta_mgr().prepare_tablet_job(job, &start_resp); + if (!st.ok()) { + if (start_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) { + st = _new_tablet->sync_rowsets(); + if (!st.ok()) { + LOG_WARNING("failed to sync new tablet") + .tag("tablet_id", _new_tablet->tablet_id()) + .error(st); + } + return Status::OK(); + } + return st; + } + + // 3. Convert historical data + for (const auto& rs_reader : sc_params.ref_rowset_readers) { + VLOG_TRACE << "Begin to convert a history rowset. version=" << rs_reader->version(); + + RowsetWriterContext context; + context.txn_id = rs_reader->rowset()->txn_id(); + context.txn_expiration = _expiration; + context.version = rs_reader->version(); + context.rowset_state = VISIBLE; + context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap(); + context.tablet_schema = _new_tablet->tablet_schema(); + context.newest_write_timestamp = rs_reader->newest_write_timestamp(); + context.fs = _cloud_storage_engine.latest_fs(); + auto rowset_writer = DORIS_TRY(_new_tablet->create_rowset_writer(context, false)); + + RowsetMetaSharedPtr existed_rs_meta; + auto st = _cloud_storage_engine.meta_mgr().prepare_rowset(*rowset_writer->rowset_meta(), true, + &existed_rs_meta); + if (!st.ok()) { + if (st.is()) { + LOG(INFO) << "Rowset " << rs_reader->version() << " has already existed in tablet " + << _new_tablet->tablet_id(); + // Add already committed rowset to _output_rowsets. 
+                DCHECK(existed_rs_meta != nullptr);
+                RowsetSharedPtr rowset;
+                // schema is nullptr implies using RowsetMeta.tablet_schema
+                RETURN_IF_ERROR(RowsetFactory::create_rowset(nullptr, _new_tablet->tablet_path(),
+                                                             existed_rs_meta, &rowset));
+                _output_rowsets.push_back(std::move(rowset));
+                continue;
+            } else {
+                return st;
+            }
+        }
+
+        RETURN_IF_ERROR(sc_procedure->process(rs_reader, rowset_writer.get(), _new_tablet,
+                                              _base_tablet, _base_tablet_schema,
+                                              _new_tablet_schema));
+
+        RowsetSharedPtr new_rowset;
+        st = rowset_writer->build(new_rowset);
+        if (!st.ok()) {
+            return Status::InternalError("failed to build rowset, version=[{}-{}] status={}",
+                                         rs_reader->version().first, rs_reader->version().second,
+                                         st.to_string());
+        }
+
+        st = _cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(), true,
+                                                            &existed_rs_meta);
+        if (!st.ok()) {
+            if (st.is<ALREADY_EXIST>()) {
+                LOG(INFO) << "Rowset " << rs_reader->version() << " has already existed in tablet "
+                          << _new_tablet->tablet_id();
+                // Add already committed rowset to _output_rowsets.
+                DCHECK(existed_rs_meta != nullptr);
+                RowsetSharedPtr rowset;
+                // schema is nullptr implies using RowsetMeta.tablet_schema
+                RETURN_IF_ERROR(RowsetFactory::create_rowset(nullptr, _new_tablet->tablet_path(),
+                                                             existed_rs_meta, &rowset));
+                _output_rowsets.push_back(std::move(rowset));
+                continue;
+            } else {
+                return st;
+            }
+        }
+        _output_rowsets.push_back(std::move(new_rowset));
+
+        VLOG_TRACE << "Successfully convert a history version " << rs_reader->version();
+    }
+
+    if (sc_params.ref_rowset_readers.empty()) {
+        sc_job->set_alter_version(1); // no rowset to convert implies alter_version == 1
+    } else {
+        int64_t num_output_rows = 0;
+        int64_t size_output_rowsets = 0;
+        int64_t num_output_segments = 0;
+        for (auto& rs : _output_rowsets) {
+            sc_job->add_txn_ids(rs->txn_id());
+            sc_job->add_output_versions(rs->end_version());
+            num_output_rows += rs->num_rows();
+            size_output_rowsets += rs->data_disk_size();
+            num_output_segments += rs->num_segments();
+        }
+        sc_job->set_num_output_rows(num_output_rows);
+        sc_job->set_size_output_rowsets(size_output_rowsets);
+        sc_job->set_num_output_segments(num_output_segments);
+        sc_job->set_num_output_rowsets(_output_rowsets.size());
+        sc_job->set_alter_version(_output_rowsets.back()->end_version());
+    }
+    _output_cumulative_point = std::min(_output_cumulative_point, sc_job->alter_version() + 1);
+    sc_job->set_output_cumulative_point(_output_cumulative_point);
+
+    // TODO(Lchangliang): process delete bitmap if the table is MOW
+
+    cloud::FinishTabletJobResponse finish_resp;
+    st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp);
+    if (!st.ok()) {
+        if (finish_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) {
+            st = _new_tablet->sync_rowsets();
+            if (!st.ok()) {
+                LOG_WARNING("failed to sync new tablet")
+                        .tag("tablet_id", _new_tablet->tablet_id())
+                        .error(st);
+            }
+            return Status::OK();
+        }
+        return st;
+    }
+    const auto& stats = finish_resp.stats();
+    {
+        std::unique_lock wlock(_new_tablet->get_header_lock());
+        // new_tablet's state MUST be `TABLET_NOTREADY`, because we won't sync a new tablet in schema change job
+        DCHECK(_new_tablet->tablet_state() == TABLET_NOTREADY);
+        if (_new_tablet->tablet_state() != TABLET_NOTREADY) [[unlikely]] {
+            LOG(ERROR) << "invalid tablet state, tablet_id=" << _new_tablet->tablet_id();
+            return Status::InternalError("invalid tablet state, tablet_id={}",
+                                         _new_tablet->tablet_id());
+        }
+        _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock);
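+        // These stats come from the meta service's view of the finished job; they are
+        // approximate on this BE until the next rowset sync refreshes them.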
+        _new_tablet->set_cumulative_layer_point(_output_cumulative_point);
+        _new_tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
+                                             stats.num_rows(), stats.data_size());
+        RETURN_IF_ERROR(_new_tablet->set_tablet_state(TABLET_RUNNING));
+    }
+    return Status::OK();
+}
+} // namespace doris::cloud
diff --git a/be/src/cloud/cloud_schema_change_job.h b/be/src/cloud/cloud_schema_change_job.h
new file mode 100644
index 000000000000000..7dd89b56fb6259f
--- /dev/null
+++ b/be/src/cloud/cloud_schema_change_job.h
@@ -0,0 +1,35 @@
+#pragma once
+
+#include
+#include "cloud/cloud_storage_engine.h"
+#include "olap/rowset/rowset.h"
+#include "olap/schema_change.h"
+#include "cloud/cloud_tablet.h"
+#include "olap/tablet_fwd.h"
+
+namespace doris::cloud {
+
+class CloudSchemaChangeJob {
+public:
+    CloudSchemaChangeJob(CloudStorageEngine& cloud_storage_engine, std::string job_id,
+                         int64_t expiration);
+    ~CloudSchemaChangeJob();
+
+    // This method is idempotent for the same request.
+    Status process_alter_tablet(const TAlterTabletReqV2& request);
+
+private:
+    Status _convert_historical_rowsets(const SchemaChangeParams& sc_params);
+
+private:
+    CloudStorageEngine& _cloud_storage_engine;
+    std::shared_ptr<CloudTablet> _base_tablet;
+    std::shared_ptr<CloudTablet> _new_tablet;
+    TabletSchemaSPtr _base_tablet_schema;
+    TabletSchemaSPtr _new_tablet_schema;
+    std::string _job_id;
+    std::vector<RowsetSharedPtr> _output_rowsets;
+    int64_t _output_cumulative_point;
+    int64_t _expiration;
+};
+
+} // namespace doris::cloud
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index 10e1390496140a7..1e8a2cb08ecc8fb 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -35,7 +35,7 @@ class CloudTablet final : public BaseTablet {
                           bool vertical) override;
 
     Status capture_rs_readers(const Version& spec_version, std::vector<RowSetSplits>* rs_splits,
-                              bool skip_missing_version) override;
+                              bool skip_missing_version = false) override;
 
     Status capture_consistent_rowsets_unlocked(
             const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) const override;
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 5152f0a541d0f14..943c257de645e35 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -58,6 +58,7 @@ class BaseTablet {
     int32_t schema_hash() const { return _tablet_meta->schema_hash(); }
     KeysType keys_type() const { return _tablet_meta->tablet_schema()->keys_type(); }
     size_t num_key_columns() const { return _tablet_meta->tablet_schema()->num_key_columns(); }
+    std::mutex& get_schema_change_lock() { return _schema_change_lock; }
     bool enable_unique_key_merge_on_write() const {
 #ifdef BE_TEST
         if (_tablet_meta == nullptr) {
@@ -238,6 +239,9 @@ class BaseTablet {
     // metrics of this tablet
     std::shared_ptr<MetricEntity> _metric_entity;
 
+protected:
+    std::mutex _schema_change_lock;
+
 public:
     IntCounter* query_scan_bytes = nullptr;
     IntCounter* query_scan_rows = nullptr;
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 33dd99ca7b6fe8d..f316372097822b1 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -33,6 +33,7 @@
 
 #include "common/config.h"
 #include "common/logging.h"
+#include "olap/base_tablet.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 #include "olap/rowid_conversion.h"
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index d3005c7cb3af985..c32054dd7b94b7c 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -20,19 +20,23 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
 #include
+#include "cloud/config.h" #include "common/logging.h" #include "common/signal_handler.h" #include "common/status.h" +#include "exec/schema_scanner/schema_metadata_name_ids_scanner.h" #include "gutil/hash/hash.h" #include "gutil/integral_types.h" #include "gutil/strings/numbers.h" #include "io/fs/file_system.h" #include "io/io_common.h" +#include "olap/base_tablet.h" #include "olap/data_dir.h" #include "olap/delete_handler.h" #include "olap/field.h" @@ -60,6 +64,7 @@ #include "olap/types.h" #include "olap/utils.h" #include "olap/wrapper_field.h" +#include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" #include "runtime/runtime_state.h" #include "util/debug_points.h" @@ -85,7 +90,7 @@ constexpr int ALTER_TABLE_BATCH_SIZE = 4064; class MultiBlockMerger { public: - MultiBlockMerger(TabletSharedPtr tablet) : _tablet(tablet), _cmp(tablet) {} + MultiBlockMerger(BaseTabletSPtr tablet) : _tablet(tablet), _cmp(*tablet) {} Status merge(const std::vector>& blocks, RowsetWriter* rowset_writer, uint64_t* merged_rows) { @@ -224,7 +229,7 @@ class MultiBlockMerger { }; struct RowRefComparator { - RowRefComparator(TabletSharedPtr tablet) : _num_columns(tablet->num_key_columns()) {} + RowRefComparator(const BaseTablet& tablet) : _num_columns(tablet.num_key_columns()) {} int compare(const RowRef& lhs, const RowRef& rhs) const { return lhs.block->compare_at(lhs.position, rhs.position, _num_columns, *rhs.block, -1); @@ -237,7 +242,7 @@ class MultiBlockMerger { const size_t _num_columns; }; - TabletSharedPtr _tablet; + BaseTabletSPtr _tablet; RowRefComparator _cmp; }; @@ -364,7 +369,7 @@ Status BlockChanger::change_block(vectorized::Block* ref_block, // This check is to prevent schema-change from causing data loss Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column, vectorized::ColumnPtr new_column, - AlterTabletType type) const { + AlterTabletType type) { if (ref_column->size() != new_column->size()) { return Status::InternalError( "column size is changed, ref_column_size={}, new_column_size={}", @@ -429,51 +434,39 @@ Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column, } Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, - TabletSharedPtr new_tablet, TabletSharedPtr base_tablet, + BaseTabletSPtr new_tablet, BaseTabletSPtr base_tablet, TabletSchemaSPtr base_tablet_schema, TabletSchemaSPtr new_tablet_schema) { - // In some cases, there may be more than one type of rowset in a tablet, - // in which case the conversion cannot be done directly by linked schema change, - // but requires direct schema change to rewrite the data. - if (rowset_reader->type() != rowset_writer->type()) { - LOG(INFO) << "the type of rowset " << rowset_reader->rowset()->rowset_id() - << " in base tablet is not same as type " << rowset_writer->type() - << ", use direct schema change."; - return SchemaChangeHandler::get_sc_procedure(_changer, false, true) - ->process(rowset_reader, rowset_writer, new_tablet, base_tablet, base_tablet_schema, - new_tablet_schema); - } else { - Status status = rowset_writer->add_rowset_for_linked_schema_change(rowset_reader->rowset()); - if (!status) { - LOG(WARNING) << "fail to convert rowset." - << ", new_tablet=" << new_tablet->tablet_id() - << ", version=" << rowset_writer->version().first << "-" - << rowset_writer->version().second << ", error status " << status; - return status; - } - // copy delete bitmap to new tablet. 
-        if (new_tablet->keys_type() == UNIQUE_KEYS &&
-            new_tablet->enable_unique_key_merge_on_write()) {
-            DeleteBitmap origin_delete_bitmap(base_tablet->tablet_id());
-            base_tablet->tablet_meta()->delete_bitmap().subset(
-                    {rowset_reader->rowset()->rowset_id(), 0, 0},
-                    {rowset_reader->rowset()->rowset_id(), UINT32_MAX, INT64_MAX},
-                    &origin_delete_bitmap);
-            for (auto& iter : origin_delete_bitmap.delete_bitmap) {
-                int ret = new_tablet->tablet_meta()->delete_bitmap().set(
-                        {rowset_writer->rowset_id(), std::get<1>(iter.first),
-                         std::get<2>(iter.first)},
-                        iter.second);
-                DCHECK(ret == 1);
-            }
+    Status status = rowset_writer->add_rowset_for_linked_schema_change(rowset_reader->rowset());
+    if (!status) {
+        LOG(WARNING) << "fail to convert rowset."
+                     << ", new_tablet=" << new_tablet->tablet_id()
+                     << ", version=" << rowset_writer->version().first << "-"
+                     << rowset_writer->version().second << ", error status " << status;
+        return status;
+    }
+    // copy delete bitmap to new tablet.
+    if (new_tablet->keys_type() == UNIQUE_KEYS &&
+        new_tablet->enable_unique_key_merge_on_write()) {
+        DeleteBitmap origin_delete_bitmap(base_tablet->tablet_id());
+        base_tablet->tablet_meta()->delete_bitmap().subset(
+                {rowset_reader->rowset()->rowset_id(), 0, 0},
+                {rowset_reader->rowset()->rowset_id(), UINT32_MAX, INT64_MAX},
+                &origin_delete_bitmap);
+        for (auto& iter : origin_delete_bitmap.delete_bitmap) {
+            int ret = new_tablet->tablet_meta()->delete_bitmap().set(
+                    {rowset_writer->rowset_id(), std::get<1>(iter.first),
+                     std::get<2>(iter.first)},
+                    iter.second);
+            DCHECK(ret == 1);
         }
-        return Status::OK();
     }
+    return Status::OK();
 }
 
 Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                              RowsetWriter* rowset_writer,
-                                             TabletSharedPtr new_tablet,
+                                             BaseTabletSPtr new_tablet,
                                              TabletSchemaSPtr base_tablet_schema,
                                              TabletSchemaSPtr new_tablet_schema) {
     bool eof = false;
@@ -498,7 +491,7 @@ Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader
     return Status::OK();
 }
 
-VSchemaChangeWithSorting::VSchemaChangeWithSorting(const BlockChanger& changer,
+VBaseSchemaChangeWithSorting::VBaseSchemaChangeWithSorting(const BlockChanger& changer,
                                                    size_t memory_limitation)
         : _changer(changer),
           _memory_limitation(memory_limitation),
@@ -507,26 +500,14 @@
           fmt::format("VSchemaChangeWithSorting:changer={}", std::to_string(int64(&changer))));
 }
 
-Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
+Status VBaseSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                                 RowsetWriter* rowset_writer,
-                                                TabletSharedPtr new_tablet,
+                                                BaseTabletSPtr new_tablet,
                                                 TabletSchemaSPtr base_tablet_schema,
                                                 TabletSchemaSPtr new_tablet_schema) {
     // for internal sorting
     std::vector<std::unique_ptr<vectorized::Block>> blocks;
 
-    // for external sorting
-    // src_rowsets to store the rowset generated by internal sorting
-    std::vector<RowsetSharedPtr> src_rowsets;
-    std::vector<PendingRowsetGuard> pending_rs_guards;
-
-    Defer defer {[&]() {
-        // remove the intermediate rowsets generated by internal sorting
-        for (auto& row_set : src_rowsets) {
-            ExecEnv::GetInstance()->storage_engine().to_local().add_unused_rowset(row_set);
-        }
-    }};
-
     RowsetSharedPtr rowset = rowset_reader->rowset();
     SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap();
     int64_t newest_write_timestamp = rowset->newest_write_timestamp();
@@ -537,12 +518,11 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
             return Status::OK();
         }
 
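+        // The spill rowset produced by _internal_sorting is now recorded in the member
+        // _src_rowsets rather than a local, so each subclass decides how the intermediate
+        // rowsets are cleaned up (see the Defer in VLocalSchemaChangeWithSorting).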
-        auto [rowset, guard] = DORIS_TRY(_internal_sorting(
+        auto rowset = DORIS_TRY(_internal_sorting(
                 blocks, Version(_temp_delta_versions.second, _temp_delta_versions.second),
                 newest_write_timestamp, new_tablet, BETA_ROWSET, segments_overlap,
                 new_tablet_schema));
-        src_rowsets.push_back(std::move(rowset));
-        pending_rs_guards.push_back(std::move(guard));
+        _src_rowsets.push_back(std::move(rowset));
 
         for (auto& block : blocks) {
             _mem_tracker->release(block->allocated_bytes());
         }
@@ -592,19 +572,19 @@
 
     RETURN_IF_ERROR(create_rowset());
 
-    if (src_rowsets.empty()) {
+    if (_src_rowsets.empty()) {
         RETURN_IF_ERROR(rowset_writer->flush());
     } else {
         RETURN_IF_ERROR(
-                _external_sorting(src_rowsets, rowset_writer, new_tablet, new_tablet_schema));
+                _external_sorting(_src_rowsets, rowset_writer, new_tablet, new_tablet_schema));
     }
 
     return Status::OK();
}
 
-Result<std::pair<RowsetSharedPtr, PendingRowsetGuard>> VSchemaChangeWithSorting::_internal_sorting(
+Result<RowsetSharedPtr> VBaseSchemaChangeWithSorting::_internal_sorting(
        const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version,
-        int64_t newest_write_timestamp, TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type,
+        int64_t newest_write_timestamp, BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type,
        SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) {
    uint64_t merged_rows = 0;
    MultiBlockMerger merger(new_tablet);
@@ -624,18 +604,16 @@
    } else {
        rowset_writer = std::move(result).value();
    }
-    auto guard = ExecEnv::GetInstance()->storage_engine().to_local().pending_local_rowsets().add(
-            context.rowset_id);
 
    RETURN_IF_ERROR_RESULT(merger.merge(blocks, rowset_writer.get(), &merged_rows));
    _add_merged_rows(merged_rows);
    RowsetSharedPtr rowset;
    RETURN_IF_ERROR_RESULT(rowset_writer->build(rowset));
-    return std::make_pair(std::move(rowset), std::move(guard));
+    return rowset;
}
 
-Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_rowsets,
+Status VBaseSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_rowsets,
                                                   RowsetWriter* rowset_writer,
-                                                   TabletSharedPtr new_tablet,
+                                                   BaseTabletSPtr new_tablet,
                                                   TabletSchemaSPtr new_tablet_schema) {
    std::vector<RowsetReaderSharedPtr> rs_readers;
    for (auto& rowset : src_rowsets) {
@@ -652,7 +630,34 @@
    return Status::OK();
}
 
-Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& request) {
+Status VLocalSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
+                                                     RowsetWriter* rowset_writer,
+                                                     BaseTabletSPtr new_tablet,
+                                                     TabletSchemaSPtr base_tablet_schema,
+                                                     TabletSchemaSPtr new_tablet_schema) {
+    Defer defer {[&]() {
+        // remove the intermediate rowsets generated by internal sorting
+        for (auto& row_set : _src_rowsets) {
+            _local_storage_engine.add_unused_rowset(row_set);
+        }
+    }};
+    return VBaseSchemaChangeWithSorting::_inner_process(rowset_reader, rowset_writer, new_tablet,
+                                                        base_tablet_schema, new_tablet_schema);
+}
+
+Result<RowsetSharedPtr> VLocalSchemaChangeWithSorting::_internal_sorting(
+        const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
+        const Version& temp_delta_versions, int64_t newest_write_timestamp,
+        BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type,
+        SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) {
+    auto res = VBaseSchemaChangeWithSorting::_internal_sorting(
+            blocks, temp_delta_versions, newest_write_timestamp, new_tablet, new_rowset_type,
+            segments_overlap, new_tablet_schema);
+    if (res.has_value()) {
+        auto guard = _local_storage_engine.pending_local_rowsets().add(res.value()->rowset_id());
+        _pending_rs_guards.push_back(std::move(guard));
+    }
+    return res;
+}
+
+Status SchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) {
     if (!request.__isset.desc_tbl) {
         return Status::Error<INVALID_ARGUMENT>(
                 "desc_tbl is not set. Maybe the FE version is not equal to the BE "
@@ -664,7 +669,7 @@
               << ", alter_version=" << request.alter_version;
 
     TabletSharedPtr base_tablet =
-            ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager()->get_tablet(
+            _local_storage_engine.tablet_manager()->get_tablet(
                     request.base_tablet_id);
     if (base_tablet == nullptr) {
         return Status::Error<TABLE_NOT_FOUND>("fail to find base tablet. base_tablet={}",
                                               request.base_tablet_id);
     }
@@ -678,71 +683,74 @@
                                               request.base_tablet_id);
     }
 
-    Status res = _do_process_alter_tablet_v2(request);
+    Status res = _do_process_alter_tablet(request);
     LOG(INFO) << "finished alter tablet process, res=" << res;
     return res;
 }
 
-std::shared_mutex SchemaChangeHandler::_mutex;
-std::unordered_set<int64_t> SchemaChangeHandler::_tablet_ids_in_converting;
+SchemaChangeJob::SchemaChangeJob(StorageEngine& local_storage_engine,
+                                 const TAlterTabletReqV2& request)
+        : _local_storage_engine(local_storage_engine) {
+    _base_tablet = _local_storage_engine.tablet_manager()->get_tablet(request.base_tablet_id);
+    _new_tablet = _local_storage_engine.tablet_manager()->get_tablet(request.new_tablet_id);
+    _base_tablet_schema = std::make_shared<TabletSchema>();
+    if (_base_tablet && _new_tablet) {
+        _base_tablet_schema->update_tablet_columns(*_base_tablet->tablet_schema(), request.columns);
+        // During a schema change, the extracted columns of a variant should not be included in the tablet schema.
+        // This is because the schema change for a variant needs to ignore the extracted columns.
+        // Otherwise, the schema types in different rowsets might be inconsistent. When performing a schema change,
+        // the complete variant is constructed by reading all the sub-columns of the variant.
+        _new_tablet_schema = _new_tablet->tablet_schema()->copy_without_extracted_columns();
+    }
+}
 
 // In the past schema change and rollup will create new tablet and will wait for txns starting before the task to finished
 // It will cost a lot of time to wait and the task is very difficult to understand.
 // In alter task v2, FE will call BE to create tablet and send an alter task to BE to convert historical data.
 // The admin should upgrade all BE and then upgrade FE.
 // Should delete the old code after upgrade finished.
-Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& request) {
-    Status res = Status::OK();
-    TabletSharedPtr base_tablet =
-            ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager()->get_tablet(
-                    request.base_tablet_id);
-    if (base_tablet == nullptr) {
+Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& request) {
+    if (_base_tablet == nullptr) {
         return Status::Error<TABLE_NOT_FOUND>("fail to find base tablet. base_tablet={}",
                                               request.base_tablet_id);
     }
-
-    signal::tablet_id = base_tablet->get_table_id();
-
-    // new tablet has to exist
-    TabletSharedPtr new_tablet =
-            ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager()->get_tablet(
-                    request.new_tablet_id);
-    if (new_tablet == nullptr) {
+    if (_new_tablet == nullptr) {
         return Status::Error<TABLE_NOT_FOUND>("fail to find new tablet. new_tablet={}",
                                               request.new_tablet_id);
     }
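+    // Both tablets were resolved in the constructor; either lookup may have returned null
+    // (e.g. the tablet was dropped before the job ran), hence the explicit checks above.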
new_tablet={}", request.new_tablet_id); } + Status res = Status::OK(); + signal::tablet_id = _base_tablet->get_table_id(); // check if tablet's state is not_ready, if it is ready, it means the tablet already finished // check whether the tablet's max continuous version == request.version - if (new_tablet->tablet_state() != TABLET_NOTREADY) { - res = _validate_alter_result(new_tablet, request); - LOG(INFO) << "tablet's state=" << new_tablet->tablet_state() + if (_new_tablet->tablet_state() != TABLET_NOTREADY) { + res = _validate_alter_result(request); + LOG(INFO) << "tablet's state=" << _new_tablet->tablet_state() << " the convert job already finished, check its version" << " res=" << res; return res; } - new_tablet->set_alter_failed(false); - Defer defer([&new_tablet] { + _new_tablet->set_alter_failed(false); + Defer defer([this] { // if tablet state is not TABLET_RUNNING when return, indicates that alter has failed. - if (new_tablet->tablet_state() != TABLET_RUNNING) { - new_tablet->set_alter_failed(true); + if (_new_tablet->tablet_state() != TABLET_RUNNING) { + _new_tablet->set_alter_failed(true); } }); LOG(INFO) << "finish to validate alter tablet request. begin to convert data from base tablet " "to new tablet" - << " base_tablet=" << base_tablet->tablet_id() - << " new_tablet=" << new_tablet->tablet_id(); + << " base_tablet=" << _base_tablet->tablet_id() + << " new_tablet=" << _new_tablet->tablet_id(); - std::shared_lock base_migration_rlock(base_tablet->get_migration_lock(), std::try_to_lock); + std::shared_lock base_migration_rlock(_base_tablet->get_migration_lock(), std::try_to_lock); if (!base_migration_rlock.owns_lock()) { return Status::Error( - "SchemaChangeHandler::_do_process_alter_tablet_v2 get lock failed"); + "SchemaChangeJob::_do_process_alter_tablet get lock failed"); } - std::shared_lock new_migration_rlock(new_tablet->get_migration_lock(), std::try_to_lock); + std::shared_lock new_migration_rlock(_new_tablet->get_migration_lock(), std::try_to_lock); if (!new_migration_rlock.owns_lock()) { return Status::Error( - "SchemaChangeHandler::_do_process_alter_tablet_v2 get lock failed"); + "SchemaChangeJob::_do_process_alter_tablet get lock failed"); } std::vector versions_to_be_changed; @@ -756,18 +764,12 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& std::vector return_columns; // Create a new tablet schema, should merge with dropped columns in light weight schema change TabletSchemaSPtr base_tablet_schema = std::make_shared(); - base_tablet_schema->copy_from(*base_tablet->tablet_schema()); - if (!request.columns.empty() && request.columns[0].col_unique_id >= 0) { - base_tablet_schema->clear_columns(); - for (const auto& column : request.columns) { - base_tablet_schema->append_column(TabletColumn(column)); - } - } + // Use tablet schema directly from base tablet, they are the newest schema, not contain // dropped column during light weight schema change. // But the tablet schema in base tablet maybe not the latest from FE, so that if fe pass through // a tablet schema, then use request schema. - size_t num_cols = request.columns.empty() ? base_tablet->tablet_schema()->num_columns() + size_t num_cols = request.columns.empty() ? 
     return_columns.resize(num_cols);
     for (int i = 0; i < num_cols; ++i) {
         return_columns[i] = i;
     }
@@ -777,16 +779,16 @@
 
     // begin to find deltas to convert from base tablet to new tablet so that
     // obtain base tablet and new tablet's push lock and header write lock to prevent loading data
     {
-        std::lock_guard base_tablet_lock(base_tablet->get_push_lock());
-        std::lock_guard new_tablet_lock(new_tablet->get_push_lock());
-        std::lock_guard base_tablet_wlock(base_tablet->get_header_lock());
+        std::lock_guard base_tablet_lock(_base_tablet->get_push_lock());
+        std::lock_guard new_tablet_lock(_new_tablet->get_push_lock());
+        std::lock_guard base_tablet_wlock(_base_tablet->get_header_lock());
         SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
-        std::lock_guard new_tablet_wlock(new_tablet->get_header_lock());
+        std::lock_guard new_tablet_wlock(_new_tablet->get_header_lock());
 
         do {
             RowsetSharedPtr max_rowset;
             // get history data to be converted and it will check if there is hold in base tablet
-            if (!_get_versions_to_be_changed(base_tablet, &versions_to_be_changed, &max_rowset)) {
+            if (!_get_versions_to_be_changed(&versions_to_be_changed, &max_rowset)) {
                 LOG(WARNING) << "fail to get version to be changed. res=" << res;
                 break;
             }
@@ -802,10 +804,10 @@
             // before calculating version_to_be_changed,
             // remove all data from new tablet, prevent to rewrite data(those double pushed when wait)
             LOG(INFO) << "begin to remove all data from new tablet to prevent rewrite."
-                      << " new_tablet=" << new_tablet->tablet_id();
+                      << " new_tablet=" << _new_tablet->tablet_id();
             std::vector<RowsetSharedPtr> rowsets_to_delete;
             std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
-            new_tablet->acquire_version_and_rowsets(&version_rowsets);
+            _new_tablet->acquire_version_and_rowsets(&version_rowsets);
             std::sort(version_rowsets.begin(), version_rowsets.end(),
                       [](const std::pair<Version, RowsetSharedPtr>& l,
                          const std::pair<Version, RowsetSharedPtr>& r) {
@@ -825,15 +827,15 @@
                 }
             }
             std::vector<RowsetSharedPtr> empty_vec;
-            RETURN_IF_ERROR(new_tablet->modify_rowsets(empty_vec, rowsets_to_delete));
+            RETURN_IF_ERROR(_new_tablet->modify_rowsets(empty_vec, rowsets_to_delete));
             // inherit cumulative_layer_point from base_tablet
             // check if new_tablet.ce_point > base_tablet.ce_point?
-            new_tablet->set_cumulative_layer_point(-1);
+            _new_tablet->set_cumulative_layer_point(-1);
             // save tablet meta
-            new_tablet->save_meta();
+            _new_tablet->save_meta();
             for (auto& rowset : rowsets_to_delete) {
                 // do not call rowset.remove directly, using gc thread to delete it
-                ExecEnv::GetInstance()->storage_engine().to_local().add_unused_rowset(rowset);
+                _local_storage_engine.add_unused_rowset(rowset);
             }
 
             // init one delete handler
@@ -843,7 +845,7 @@
 
             // acquire data sources correspond to history versions
             RETURN_IF_ERROR(
-                    base_tablet->capture_rs_readers_unlocked(versions_to_be_changed, &rs_splits));
+                    _base_tablet->capture_rs_readers_unlocked(versions_to_be_changed, &rs_splits));
             if (rs_splits.empty()) {
                 res = Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>(
                         "fail to acquire all data sources. version_num={}, data_source_num={}",
                        versions_to_be_changed.size(), rs_splits.size());
                break;
            }
@@ -862,24 +864,24 @@
            res = delete_handler.init(base_tablet_schema, del_preds, end_version);
            if (!res) {
                LOG(WARNING) << "init delete handler failed. base_tablet="
-                             << base_tablet->tablet_id() << ", end_version=" << end_version;
+                             << _base_tablet->tablet_id() << ", end_version=" << end_version;
                break;
            }
 
            reader_context.reader_type = ReaderType::READER_ALTER_TABLE;
-            reader_context.tablet_schema = base_tablet_schema;
+            reader_context.tablet_schema = _base_tablet_schema;
            reader_context.need_ordered_result = true;
            reader_context.delete_handler = &delete_handler;
            reader_context.return_columns = &return_columns;
            reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
-            reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
+            reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS;
            reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
-            reader_context.delete_bitmap = &base_tablet->tablet_meta()->delete_bitmap();
+            reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap();
            reader_context.version = Version(0, end_version);
 
            for (auto& rs_split : rs_splits) {
                res = rs_split.rs_reader->init(&reader_context);
                if (!res) {
-                    LOG(WARNING) << "failed to init rowset reader: " << base_tablet->tablet_id();
+                    LOG(WARNING) << "failed to init rowset reader: " << _base_tablet->tablet_id();
                    break;
                }
            }
@@ -894,19 +896,11 @@
 
        RETURN_IF_ERROR(
                DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl));
-        sc_params.base_tablet = base_tablet;
-        sc_params.new_tablet = new_tablet;
-        // During a schema change, the extracted columns of a variant should not be included in the tablet schema.
-        // This is because the schema change for a variant needs to ignore the extracted columns.
-        // Otherwise, the schema types in different rowsets might be inconsistent. When performing a schema change,
-        // the complete variant is constructed by reading all the sub-columns of the variant.
-        sc_params.new_tablet_schema = new_tablet->tablet_schema()->copy_without_extracted_columns();
        sc_params.ref_rowset_readers.reserve(rs_splits.size());
        for (RowSetSplits& split : rs_splits) {
            sc_params.ref_rowset_readers.emplace_back(split.rs_reader);
        }
        sc_params.delete_handler = &delete_handler;
-        sc_params.base_tablet_schema = base_tablet_schema;
        sc_params.be_exec_version = request.be_exec_version;
        DCHECK(request.__isset.alter_tablet_type);
        switch (request.alter_tablet_type) {
@@ -934,68 +928,68 @@
        }
        {
            std::lock_guard wrlock(_mutex);
-            _tablet_ids_in_converting.insert(new_tablet->tablet_id());
+            _tablet_ids_in_converting.insert(_new_tablet->tablet_id());
        }
        int64_t real_alter_version = 0;
+        sc_params.enable_unique_key_merge_on_write = _new_tablet->enable_unique_key_merge_on_write();
        res = _convert_historical_rowsets(sc_params, &real_alter_version);
        {
            std::lock_guard wrlock(_mutex);
-            _tablet_ids_in_converting.erase(new_tablet->tablet_id());
+            _tablet_ids_in_converting.erase(_new_tablet->tablet_id());
        }
        if (!res) {
            break;
        }
-        if (new_tablet->keys_type() == UNIQUE_KEYS &&
-            new_tablet->enable_unique_key_merge_on_write()) {
-            res = _calc_delete_bitmap_for_mow_table(new_tablet, real_alter_version);
+        if (_new_tablet->keys_type() == UNIQUE_KEYS &&
+            _new_tablet->enable_unique_key_merge_on_write()) {
+            res = _calc_delete_bitmap_for_mow_table(real_alter_version);
            if (!res) {
                break;
            }
        } else {
            // set state to ready
-            std::lock_guard new_wlock(new_tablet->get_header_lock());
+            std::lock_guard new_wlock(_new_tablet->get_header_lock());
            SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
-            res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
+            res = _new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
            if (!res) {
                break;
            }
-            new_tablet->save_meta();
+            _new_tablet->save_meta();
        }
    } while (false);
 
    if (res) {
        // _validate_alter_result should be outside the above while loop.
        // to avoid requiring the header lock twice.
-        res = _validate_alter_result(new_tablet, request);
+        res = _validate_alter_result(request);
    }
 
    // if failed convert history data, then just remove the new tablet
    if (!res) {
-        LOG(WARNING) << "failed to alter tablet. base_tablet=" << base_tablet->tablet_id()
-                     << ", drop new_tablet=" << new_tablet->tablet_id();
+        LOG(WARNING) << "failed to alter tablet. base_tablet=" << _base_tablet->tablet_id()
+                     << ", drop new_tablet=" << _new_tablet->tablet_id();
        // do not drop the new tablet and its data. GC thread will
    }
 
    return res;
}
 
-bool SchemaChangeHandler::tablet_in_converting(int64_t tablet_id) {
+bool SchemaChangeJob::tablet_in_converting(int64_t tablet_id) {
    std::shared_lock rdlock(_mutex);
    return _tablet_ids_in_converting.find(tablet_id) != _tablet_ids_in_converting.end();
}
 
-Status SchemaChangeHandler::_get_versions_to_be_changed(
-        TabletSharedPtr base_tablet, std::vector<Version>* versions_to_be_changed,
+Status SchemaChangeJob::_get_versions_to_be_changed(std::vector<Version>* versions_to_be_changed,
                                                    RowsetSharedPtr* max_rowset) {
-    RowsetSharedPtr rowset = base_tablet->get_rowset_with_max_version();
+    RowsetSharedPtr rowset = _base_tablet->get_rowset_with_max_version();
    if (rowset == nullptr) {
        return Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>("Tablet has no version. base_tablet={}",
-                                                          base_tablet->tablet_id());
+                                                          _base_tablet->tablet_id());
    }
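    // Versions [0, max] must form a continuous range; capture_consistent_versions_unlocked
    // fails if the base tablet has a version hole, which aborts the whole alter job.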
base_tablet={}", - base_tablet->tablet_id()); + _base_tablet->tablet_id()); } *max_rowset = rowset; - RETURN_IF_ERROR(base_tablet->capture_consistent_versions_unlocked( + RETURN_IF_ERROR(_base_tablet->capture_consistent_versions_unlocked( Version(0, rowset->version().second), versions_to_be_changed, false, false)); return Status::OK(); @@ -1003,11 +997,11 @@ Status SchemaChangeHandler::_get_versions_to_be_changed( // The `real_alter_version` parameter indicates that the version of [0-real_alter_version] is // converted from a base tablet, only used for the mow table now. -Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams& sc_params, +Status SchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc_params, int64_t* real_alter_version) { LOG(INFO) << "begin to convert historical rowsets for new_tablet from base_tablet." - << " base_tablet=" << sc_params.base_tablet->tablet_id() - << ", new_tablet=" << sc_params.new_tablet->tablet_id(); + << " base_tablet=" << _base_tablet->tablet_id() + << ", new_tablet=" << _new_tablet->tablet_id(); // find end version int32_t end_version = -1; @@ -1017,35 +1011,36 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams } } - // Add filter information in change, and filter column information will be set in _parse_request + // Add filter information in change, and filter column information will be set in parse_request // And filter some data every time the row block changes - BlockChanger changer(sc_params.new_tablet_schema, *sc_params.desc_tbl); + BlockChanger changer(_new_tablet_schema, *sc_params.desc_tbl); bool sc_sorting = false; bool sc_directly = false; // a.Parse the Alter request and convert it into an internal representation - Status res = _parse_request(sc_params, &changer, &sc_sorting, &sc_directly); + Status res = parse_request(sc_params, _base_tablet_schema.get(), + _new_tablet_schema.get(), &changer, &sc_sorting, &sc_directly); LOG(INFO) << "schema change type, sc_sorting: " << sc_sorting << ", sc_directly: " << sc_directly - << ", base_tablet=" << sc_params.base_tablet->tablet_id() - << ", new_tablet=" << sc_params.new_tablet->tablet_id(); + << ", base_tablet=" << _base_tablet->tablet_id() + << ", new_tablet=" << _new_tablet->tablet_id(); auto process_alter_exit = [&]() -> Status { { // save tablet meta here because rowset meta is not saved during add rowset - std::lock_guard new_wlock(sc_params.new_tablet->get_header_lock()); + std::lock_guard new_wlock(_new_tablet->get_header_lock()); SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); - sc_params.new_tablet->save_meta(); + _new_tablet->save_meta(); } if (res) { Version test_version(0, end_version); - res = sc_params.new_tablet->check_version_integrity(test_version); + res = _new_tablet->check_version_integrity(test_version); } LOG(INFO) << "finish converting rowsets for new_tablet from base_tablet. 
" - << "base_tablet=" << sc_params.base_tablet->tablet_id() - << ", new_tablet=" << sc_params.new_tablet->tablet_id(); + << "base_tablet=" << _base_tablet->tablet_id() + << ", new_tablet=" << _new_tablet->tablet_id(); return res; }; @@ -1071,19 +1066,18 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams // set status for monitor // As long as there is a new_table as running, ref table is set as running // NOTE If the first sub_table fails first, it will continue to go as normal here - TabletSharedPtr new_tablet = sc_params.new_tablet; // When tablet create new rowset writer, it may change rowset type, in this case // linked schema change will not be used. RowsetWriterContext context; context.version = rs_reader->version(); context.rowset_state = VISIBLE; context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap(); - context.tablet_schema = sc_params.new_tablet_schema; - context.original_tablet_schema = sc_params.new_tablet_schema; + context.tablet_schema = _new_tablet_schema; + context.original_tablet_schema = _new_tablet_schema; context.newest_write_timestamp = rs_reader->newest_write_timestamp(); context.fs = rs_reader->rowset()->rowset_meta()->fs(); context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE; - auto result = new_tablet->create_rowset_writer(context, false); + auto result = _new_tablet->create_rowset_writer(context, false); if (!result.has_value()) { res = Status::Error("create_rowset_writer failed, reason={}", result.error().to_string()); @@ -1091,11 +1085,11 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams } auto rowset_writer = std::move(result).value(); auto pending_rs_guard = - ExecEnv::GetInstance()->storage_engine().to_local().add_pending_rowset(context); + _local_storage_engine.add_pending_rowset(context); - if (res = sc_procedure->process(rs_reader, rowset_writer.get(), sc_params.new_tablet, - sc_params.base_tablet, sc_params.base_tablet_schema, - sc_params.new_tablet_schema); + if (res = sc_procedure->process(rs_reader, rowset_writer.get(), _new_tablet, + _base_tablet, _base_tablet_schema, + _new_tablet_schema); !res) { LOG(WARNING) << "failed to process the version." << " version=" << rs_reader->version().first << "-" @@ -1104,28 +1098,28 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams } // Add the new version of the data to the header // In order to prevent the occurrence of deadlock, we must first lock the old table, and then lock the new table - std::lock_guard lock(sc_params.new_tablet->get_push_lock()); + std::lock_guard lock(_new_tablet->get_push_lock()); RowsetSharedPtr new_rowset; if (!(res = rowset_writer->build(new_rowset)).ok()) { LOG(WARNING) << "failed to build rowset, exit alter process"; return process_alter_exit(); } - res = sc_params.new_tablet->add_rowset(new_rowset); + res = _new_tablet->add_rowset(new_rowset); if (res.is()) { LOG(WARNING) << "version already exist, version revert occurred. " - << "tablet=" << sc_params.new_tablet->tablet_id() << ", version='" + << "tablet=" << _new_tablet->tablet_id() << ", version='" << rs_reader->version().first << "-" << rs_reader->version().second; - ExecEnv::GetInstance()->storage_engine().to_local().add_unused_rowset(new_rowset); + _local_storage_engine.add_unused_rowset(new_rowset); res = Status::OK(); } else if (!res) { LOG(WARNING) << "failed to register new version. 
" - << " tablet=" << sc_params.new_tablet->tablet_id() + << " tablet=" << _new_tablet->tablet_id() << ", version=" << rs_reader->version().first << "-" << rs_reader->version().second; - ExecEnv::GetInstance()->storage_engine().to_local().add_unused_rowset(new_rowset); + _local_storage_engine.add_unused_rowset(new_rowset); return process_alter_exit(); } else { - VLOG_NOTICE << "register new version. tablet=" << sc_params.new_tablet->tablet_id() + VLOG_NOTICE << "register new version. tablet=" << _new_tablet->tablet_id() << ", version=" << rs_reader->version().first << "-" << rs_reader->version().second; } @@ -1144,19 +1138,17 @@ static const std::string WHERE_SIGN_LOWER = to_lower("__DORIS_WHERE_SIGN__"); // @static // Analyze the mapping of the column and the mapping of the filter key -Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params, +Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params, + TabletSchema* base_tablet_schema, + TabletSchema* new_tablet_schema, BlockChanger* changer, bool* sc_sorting, bool* sc_directly) { changer->set_type(sc_params.alter_tablet_type); changer->set_compatible_version(sc_params.be_exec_version); - TabletSharedPtr base_tablet = sc_params.base_tablet; - TabletSharedPtr new_tablet = sc_params.new_tablet; - TabletSchemaSPtr base_tablet_schema = sc_params.base_tablet_schema; const std::unordered_map& materialized_function_map = sc_params.materialized_params_map; DescriptorTbl desc_tbl = *sc_params.desc_tbl; - TabletSchemaSPtr new_tablet_schema = sc_params.new_tablet_schema; // set column mapping for (int i = 0, new_schema_size = new_tablet_schema->num_columns(); i < new_schema_size; ++i) { @@ -1208,8 +1200,7 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params, LOG(INFO) << "A column with default value will be added after schema changing. " << "column=" << new_column.name() - << ", default_value=" << new_column.default_value() << " to table " - << new_tablet->get_table_id(); + << ", default_value=" << new_column.default_value(); } if (materialized_function_map.contains(WHERE_SIGN_LOWER)) { @@ -1219,7 +1210,7 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params, // If the reference sequence of the Key column is out of order, it needs to be reordered int num_default_value = 0; - for (int i = 0, new_schema_size = new_tablet->num_key_columns(); i < new_schema_size; ++i) { + for (int i = 0, new_schema_size = new_tablet_schema->num_key_columns(); i < new_schema_size; ++i) { ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i); if (!column_mapping->has_reference()) { @@ -1248,7 +1239,7 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params, // old keys: A B C D // new keys: A B if (new_tablet_schema->keys_type() != KeysType::DUP_KEYS && - new_tablet->num_key_columns() < base_tablet_schema->num_key_columns()) { + new_tablet_schema->num_key_columns() < base_tablet_schema->num_key_columns()) { // this is a table with aggregate key type, and num of key columns in new schema // is less, which means the data in new tablet should be more aggregated. // so we use sorting schema change to sort and merge the data. 
@@ -1261,13 +1252,13 @@
        return Status::OK();
    }
 
-    if (new_tablet->enable_unique_key_merge_on_write() &&
-        new_tablet->num_key_columns() > base_tablet_schema->num_key_columns()) {
+    if (sc_params.enable_unique_key_merge_on_write &&
+        new_tablet_schema->num_key_columns() > base_tablet_schema->num_key_columns()) {
        *sc_directly = true;
        return Status::OK();
    }
 
-    if (base_tablet_schema->num_short_key_columns() != new_tablet->num_short_key_columns()) {
+    if (base_tablet_schema->num_short_key_columns() != new_tablet_schema->num_short_key_columns()) {
        // the number of short_keys changed, can't do linked schema change
        *sc_directly = true;
        return Status::OK();
    }
@@ -1279,7 +1270,7 @@
        return Status::OK();
    }
 
-    for (size_t i = 0; i < new_tablet->num_columns(); ++i) {
+    for (size_t i = 0; i < new_tablet_schema->num_columns(); ++i) {
        ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
        if (column_mapping->expr != nullptr) {
            *sc_directly = true;
@@ -1302,7 +1293,7 @@
    return Status::OK();
}
 
-Status SchemaChangeHandler::_init_column_mapping(ColumnMapping* column_mapping,
+Status SchemaChangeJob::_init_column_mapping(ColumnMapping* column_mapping,
                                             const TabletColumn& column_schema,
                                             const std::string& value) {
    if (auto field = WrapperField::create(column_schema); field.has_value()) {
@@ -1321,11 +1312,10 @@
    return Status::OK();
}
 
-Status SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_tablet,
-                                                   const TAlterTabletReqV2& request) {
+Status SchemaChangeJob::_validate_alter_result(const TAlterTabletReqV2& request) {
    Version max_continuous_version = {-1, 0};
-    new_tablet->max_continuous_version_from_beginning(&max_continuous_version);
-    LOG(INFO) << "find max continuous version of tablet=" << new_tablet->tablet_id()
+    _new_tablet->max_continuous_version_from_beginning(&max_continuous_version);
+    LOG(INFO) << "find max continuous version of tablet=" << _new_tablet->tablet_id()
              << ", start_version=" << max_continuous_version.first
              << ", end_version=" << max_continuous_version.second;
    if (max_continuous_version.second < request.alter_version) {
@@ -1335,14 +1325,14 @@
 
    std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
    {
-        std::shared_lock rdlock(new_tablet->get_header_lock());
-        new_tablet->acquire_version_and_rowsets(&version_rowsets);
+        std::shared_lock rdlock(_new_tablet->get_header_lock());
+        _new_tablet->acquire_version_and_rowsets(&version_rowsets);
    }
    for (auto& pair : version_rowsets) {
        RowsetSharedPtr rowset = pair.second;
        if (!rowset->check_file_exist()) {
            return Status::Error<FILE_NOT_EXIST>(
-                    "SchemaChangeHandler::_validate_alter_result meet invalid rowset");
+                    "SchemaChangeJob::_validate_alter_result meet invalid rowset");
        }
    }
    return Status::OK();
}
@@ -1358,57 +1348,65 @@ Status SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_tablet,
// incremental rowsets.
// 4. Switch the tablet status to TABLET_RUNNING. The newly imported
// data will calculate delete bitmap.
-Status SchemaChangeHandler::_calc_delete_bitmap_for_mow_table(TabletSharedPtr new_tablet,
-                                                              int64_t alter_version) {
-    DBUG_EXECUTE_IF("SchemaChangeHandler._calc_delete_bitmap_for_mow_table.random_failed", {
+Status SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version) {
+    DBUG_EXECUTE_IF("SchemaChangeJob._calc_delete_bitmap_for_mow_table.random_failed", {
        if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
-            LOG_WARNING("SchemaChangeHandler._calc_delete_bitmap_for_mow_table.random_failed");
+            LOG_WARNING("SchemaChangeJob._calc_delete_bitmap_for_mow_table.random_failed");
            return Status::InternalError("debug schema change calc delete bitmap random failed");
        }
    });
    // can't do compaction when calc delete bitmap, if the rowset being calculated does
    // a compaction, it may cause the delete bitmap to be missed.
-    std::lock_guard base_compaction_lock(new_tablet->get_base_compaction_lock());
-    std::lock_guard cumu_compaction_lock(new_tablet->get_cumulative_compaction_lock());
+    std::lock_guard base_compaction_lock(_new_tablet->get_base_compaction_lock());
+    std::lock_guard cumu_compaction_lock(_new_tablet->get_cumulative_compaction_lock());
 
    // step 2
-    int64_t max_version = new_tablet->max_version().second;
+    int64_t max_version = _new_tablet->max_version().second;
    std::vector<RowsetSharedPtr> rowsets;
    if (alter_version < max_version) {
        LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of "
                  << "double write rowsets for version: " << alter_version + 1 << "-" << max_version
-                  << " new_tablet=" << new_tablet->tablet_id();
-        std::shared_lock rlock(new_tablet->get_header_lock());
-        RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets_unlocked(
+                  << " new_tablet=" << _new_tablet->tablet_id();
+        std::shared_lock rlock(_new_tablet->get_header_lock());
+        RETURN_IF_ERROR(_new_tablet->capture_consistent_rowsets_unlocked(
                {alter_version + 1, max_version}, &rowsets));
    }
    for (auto rowset_ptr : rowsets) {
-        std::lock_guard rwlock(new_tablet->get_rowset_update_lock());
-        std::shared_lock rlock(new_tablet->get_header_lock());
-        RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(new_tablet, rowset_ptr));
+        std::lock_guard rwlock(_new_tablet->get_rowset_update_lock());
+        std::shared_lock rlock(_new_tablet->get_header_lock());
+        RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(_new_tablet, rowset_ptr));
    }
 
    // step 3
-    std::lock_guard rwlock(new_tablet->get_rowset_update_lock());
-    std::lock_guard new_wlock(new_tablet->get_header_lock());
+    std::lock_guard rwlock(_new_tablet->get_rowset_update_lock());
+    std::lock_guard new_wlock(_new_tablet->get_header_lock());
    SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
-    int64_t new_max_version = new_tablet->max_version_unlocked();
+    int64_t new_max_version = _new_tablet->max_version_unlocked();
    rowsets.clear();
    if (max_version < new_max_version) {
        LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of "
                  << "incremental rowsets for version: " << max_version + 1 << "-"
-                  << new_max_version << " new_tablet=" << new_tablet->tablet_id();
-        RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets_unlocked(
+                  << new_max_version << " new_tablet=" << _new_tablet->tablet_id();
+        RETURN_IF_ERROR(_new_tablet->capture_consistent_rowsets_unlocked(
                {max_version + 1, new_max_version}, &rowsets));
    }
    for (auto&& rowset_ptr : rowsets) {
-        RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(new_tablet, rowset_ptr));
+        RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(_new_tablet, rowset_ptr));
    }
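+    // NOTE: the rowset update lock and header write lock taken in step 3 are still held here,
+    // so no new rowset can land between the bitmap calculation above and the state switch below.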
rowset_ptr)); } // step 4 - RETURN_IF_ERROR(new_tablet->set_tablet_state(TabletState::TABLET_RUNNING)); - new_tablet->save_meta(); + RETURN_IF_ERROR(_new_tablet->set_tablet_state(TabletState::TABLET_RUNNING)); + _new_tablet->save_meta(); return Status::OK(); } +Status SchemaChangeJob::execute_schema_change_job(const TAlterTabletReqV2& request) { + Status st; + if (!config::is_cloud_mode()) { + SchemaChangeJob job(ExecEnv::GetInstance()->storage_engine().to_local(), request); + st = job.process_alter_tablet(request); + } + return st; +} + } // namespace doris diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index 949c1d5514ca19d..adda1ce7bef8db2 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -44,7 +44,9 @@ #include "olap/rowset/rowset_reader.h" #include "olap/rowset/rowset_writer.h" #include "olap/rowset/segment_v2/inverted_index_writer.h" +#include "olap/storage_engine.h" #include "olap/tablet.h" +#include "olap/tablet_fwd.h" #include "olap/tablet_schema.h" #include "runtime/descriptors.h" #include "runtime/memory/mem_tracker.h" @@ -84,8 +86,8 @@ class BlockChanger { bool has_where() const { return _where_expr != nullptr; } private: - Status _check_cast_valid(vectorized::ColumnPtr ref_column, vectorized::ColumnPtr new_column, - AlterTabletType type) const; + static Status _check_cast_valid(vectorized::ColumnPtr ref_column, vectorized::ColumnPtr new_column, + AlterTabletType type); // @brief column-mapping specification of new schema SchemaMapping _schema_mapping; @@ -105,7 +107,7 @@ class SchemaChange { virtual ~SchemaChange() = default; virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, - TabletSharedPtr new_tablet, TabletSharedPtr base_tablet, + BaseTabletSPtr new_tablet, BaseTabletSPtr base_tablet, TabletSchemaSPtr base_tablet_schema, TabletSchemaSPtr new_tablet_schema) { if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) { @@ -140,7 +142,7 @@ class SchemaChange { void _add_merged_rows(uint64_t merged_rows) { _merged_rows += merged_rows; } virtual Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, - TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema, + BaseTabletSPtr new_tablet, TabletSchemaSPtr base_tablet_schema, TabletSchemaSPtr new_tablet_schema) { return Status::NotSupported("inner process unsupported."); } @@ -167,16 +169,15 @@ class SchemaChange { class LinkedSchemaChange : public SchemaChange { public: - explicit LinkedSchemaChange(const BlockChanger& changer) : _changer(changer) {} + LinkedSchemaChange() = default; ~LinkedSchemaChange() override = default; Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, - TabletSharedPtr new_tablet, TabletSharedPtr base_tablet, + BaseTabletSPtr new_tablet, BaseTabletSPtr base_tablet, TabletSchemaSPtr base_tablet_schema, TabletSchemaSPtr new_tablet_schema) override; private: - const BlockChanger& _changer; DISALLOW_COPY_AND_ASSIGN(LinkedSchemaChange); }; @@ -186,7 +187,7 @@ class VSchemaChangeDirectly : public SchemaChange { private: Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, - TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema, + BaseTabletSPtr new_tablet, TabletSchemaSPtr base_tablet_schema, TabletSchemaSPtr new_tablet_schema) override; bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const override { @@ -196,26 +197,31 @@ class VSchemaChangeDirectly : 
public SchemaChange { const BlockChanger& _changer; }; -// @breif schema change with sorting -class VSchemaChangeWithSorting : public SchemaChange { +class VBaseSchemaChangeWithSorting : public SchemaChange { public: - VSchemaChangeWithSorting(const BlockChanger& changer, size_t memory_limitation); - ~VSchemaChangeWithSorting() override = default; + VBaseSchemaChangeWithSorting(const BlockChanger& changer, size_t memory_limitation); + ~VBaseSchemaChangeWithSorting() override = default; -private: Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, - TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema, + BaseTabletSPtr new_tablet, TabletSchemaSPtr base_tablet_schema, TabletSchemaSPtr new_tablet_schema) override; - Result> _internal_sorting( + virtual Result _internal_sorting( const std::vector>& blocks, const Version& temp_delta_versions, int64_t newest_write_timestamp, - TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type, + BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type, SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema); Status _external_sorting(std::vector& src_rowsets, RowsetWriter* rowset_writer, - TabletSharedPtr new_tablet, TabletSchemaSPtr new_tablet_schema); + BaseTabletSPtr new_tablet, TabletSchemaSPtr new_tablet_schema); +protected: + // for external sorting: + // _src_rowsets stores the rowsets generated by internal sorting + std::vector _src_rowsets; + std::vector _pending_rs_guards; // just for local schema change + +private: bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const override { return _changer.has_where() || SchemaChange::_check_row_nums(reader, writer); } @@ -226,72 +232,96 @@ class VSchemaChangeWithSorting : public SchemaChange { std::unique_ptr _mem_tracker; }; -class SchemaChangeHandler { +// @brief schema change with sorting +class VLocalSchemaChangeWithSorting : public VBaseSchemaChangeWithSorting { +public: + VLocalSchemaChangeWithSorting(const BlockChanger& changer, size_t memory_limitation, + StorageEngine& local_storage_engine) : + VBaseSchemaChangeWithSorting(changer, memory_limitation), + _local_storage_engine(local_storage_engine) {} + ~VLocalSchemaChangeWithSorting() override = default; + + Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, + BaseTabletSPtr new_tablet, TabletSchemaSPtr base_tablet_schema, + TabletSchemaSPtr new_tablet_schema) override; + + Result _internal_sorting( + const std::vector>& blocks, + const Version& temp_delta_versions, int64_t newest_write_timestamp, + BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type, + SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) override; +private: + StorageEngine& _local_storage_engine; +}; + +struct AlterMaterializedViewParam { + std::string column_name; + std::string origin_column_name; + std::shared_ptr expr; +}; + +struct SchemaChangeParams { + AlterTabletType alter_tablet_type; + bool enable_unique_key_merge_on_write = false; + std::vector ref_rowset_readers; + DeleteHandler* delete_handler = nullptr; + std::unordered_map materialized_params_map; + DescriptorTbl* desc_tbl = nullptr; + ObjectPool pool; + int32_t be_exec_version; +}; + +class SchemaChangeJob { public: - // schema change v2, it will not set alter task in base tablet - static Status process_alter_tablet_v2(const TAlterTabletReqV2& request); + static Status execute_schema_change_job(const TAlterTabletReqV2& request); + SchemaChangeJob(StorageEngine&
local_storage_engine, const TAlterTabletReqV2& request); + Status process_alter_tablet(const TAlterTabletReqV2& request); - static std::unique_ptr get_sc_procedure(const BlockChanger& changer, + std::unique_ptr get_sc_procedure(const BlockChanger& changer, bool sc_sorting, bool sc_directly) { if (sc_sorting) { - return std::make_unique( - changer, config::memory_limitation_per_thread_for_schema_change_bytes); + return std::make_unique(changer, + config::memory_limitation_per_thread_for_schema_change_bytes, _local_storage_engine); } if (sc_directly) { return std::make_unique(changer); } - return std::make_unique(changer); + return std::make_unique(); } - static bool tablet_in_converting(int64_t tablet_id); + bool tablet_in_converting(int64_t tablet_id); + + static Status parse_request(const SchemaChangeParams& sc_params, TabletSchema* base_tablet_schema, + TabletSchema* new_tablet_schema, BlockChanger* changer, + bool* sc_sorting, bool* sc_directly); private: - static Status _get_versions_to_be_changed(TabletSharedPtr base_tablet, - std::vector* versions_to_be_changed, + Status _get_versions_to_be_changed(std::vector* versions_to_be_changed, RowsetSharedPtr* max_rowset); - struct AlterMaterializedViewParam { - std::string column_name; - std::string origin_column_name; - std::shared_ptr expr; - }; - - struct SchemaChangeParams { - AlterTabletType alter_tablet_type; - TabletSharedPtr base_tablet; - TabletSharedPtr new_tablet; - TabletSchemaSPtr base_tablet_schema = nullptr; - TabletSchemaSPtr new_tablet_schema = nullptr; - std::vector ref_rowset_readers; - DeleteHandler* delete_handler = nullptr; - std::unordered_map materialized_params_map; - DescriptorTbl* desc_tbl = nullptr; - ObjectPool pool; - int32_t be_exec_version; - }; - - static Status _do_process_alter_tablet_v2(const TAlterTabletReqV2& request); - - static Status _validate_alter_result(TabletSharedPtr new_tablet, - const TAlterTabletReqV2& request); - - static Status _convert_historical_rowsets(const SchemaChangeParams& sc_params, - int64_t* real_alter_version); - static Status _parse_request(const SchemaChangeParams& sc_params, BlockChanger* changer, - bool* sc_sorting, bool* sc_directly); + Status _do_process_alter_tablet(const TAlterTabletReqV2& request); + + Status _validate_alter_result(const TAlterTabletReqV2& request); + + Status _convert_historical_rowsets(const SchemaChangeParams& sc_params, + int64_t* real_alter_version); // Initialization Settings for creating a default value static Status _init_column_mapping(ColumnMapping* column_mapping, const TabletColumn& column_schema, const std::string& value); - static Status _calc_delete_bitmap_for_mow_table(TabletSharedPtr new_tablet, - int64_t alter_version); + Status _calc_delete_bitmap_for_mow_table(int64_t alter_version); - static std::shared_mutex _mutex; - static std::unordered_set _tablet_ids_in_converting; - static std::set _supported_functions; + StorageEngine& _local_storage_engine; + TabletSharedPtr _base_tablet; + TabletSharedPtr _new_tablet; + TabletSchemaSPtr _base_tablet_schema; + TabletSchemaSPtr _new_tablet_schema; + std::shared_mutex _mutex; + std::unordered_set _tablet_ids_in_converting; + std::set _supported_functions; }; } // namespace doris diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 551b242238159e5..aea9dade3109f31 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -184,8 +184,6 @@ class Tablet final : public BaseTablet { std::shared_timed_mutex& get_migration_lock() { return _migration_lock; } - std::mutex& 
get_schema_change_lock() { return _schema_change_lock; } - std::mutex& get_build_inverted_index_lock() { return _build_inverted_index_lock; } // operation for compaction @@ -506,7 +504,6 @@ class Tablet final : public BaseTablet { std::mutex _ingest_lock; std::mutex _base_compaction_lock; std::mutex _cumulative_compaction_lock; - std::mutex _schema_change_lock; std::shared_timed_mutex _migration_lock; std::mutex _build_inverted_index_lock; diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 79806c4703e58c7..e54cf99eac37cd8 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -1231,6 +1231,17 @@ std::vector TabletSchema::get_indexes_for_column( return indexes_for_column; } +void TabletSchema::update_tablet_columns(const TabletSchema& tablet_schema, + const std::vector& t_columns) { + copy_from(tablet_schema); + if (!t_columns.empty() && t_columns[0].col_unique_id >= 0) { + clear_columns(); + for (const auto& column : t_columns) { + append_column(TabletColumn(column)); + } + } +} + bool TabletSchema::has_inverted_index(const TabletColumn& col) const { // TODO use more efficient impl int32_t col_unique_id = col.unique_id(); diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index d4a1919101a130b..91193b998f9e407 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -401,6 +401,9 @@ class TabletSchema { std::shared_ptr copy_without_extracted_columns(); + void update_tablet_columns(const TabletSchema& tablet_schema, + const std::vector& t_columns); + private: friend bool operator==(const TabletSchema& a, const TabletSchema& b); friend bool operator!=(const TabletSchema& a, const TabletSchema& b); diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp index 30c3e95809d766d..6f1998c548d78dd 100644 --- a/be/src/olap/task/engine_alter_tablet_task.cpp +++ b/be/src/olap/task/engine_alter_tablet_task.cpp @@ -48,7 +48,7 @@ Status EngineAlterTabletTask::execute() { DorisMetrics::instance()->create_rollup_requests_total->increment(1); Status res = Status::OK(); try { - res = SchemaChangeHandler::process_alter_tablet_v2(_alter_tablet_req); + res = SchemaChangeJob::execute_schema_change_job(_alter_tablet_req); } catch (const Exception& e) { res = e.to_status(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java index 43ad9de5ffd1cd5..362d84caed85adb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Tablet; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; @@ -81,7 +82,7 @@ protected void unlock() { lock.unlock(); } - protected void addAlterJobV2(AlterJobV2 alterJob) { + protected void addAlterJobV2(AlterJobV2 alterJob) throws AnalysisException { this.alterJobsV2.put(alterJob.getJobId(), alterJob); LOG.info("add {} job {}", alterJob.getType(), alterJob.getJobId()); } @@ -262,7 +263,7 @@ public void handleFinishAlterTask(AlterReplicaTask task) throws MetaNotFoundExce } // replay the alter job v2 - public void replayAlterJobV2(AlterJobV2 alterJob) { + public void replayAlterJobV2(AlterJobV2 alterJob) throws 
AnalysisException { AlterJobV2 existingJob = alterJobsV2.get(alterJob.getJobId()); if (existingJob == null) { // This is the first time to replay the alter job, so just using the replayed alterJob to call replay(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java index a863b7775b0d161..01f529d5034fefd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java @@ -89,6 +89,8 @@ public enum JobType { protected long timeoutMs = -1; @SerializedName(value = "rawSql") protected String rawSql; + @SerializedName(value = "cloudClusterName") + protected String cloudClusterName = ""; // The job will wait all transactions before this txn id finished, then send the schema_change/rollup tasks. @SerializedName(value = "watershedTxnId") @@ -113,6 +115,22 @@ protected AlterJobV2(JobType type) { this.type = type; } + public String getCloudClusterName() { + return cloudClusterName; + } + + public void setCloudClusterName(final String clusterName) { + cloudClusterName = clusterName; + } + + protected void sleepSeveralSeconds() { + try { + Thread.sleep(10000); + } catch (InterruptedException ie) { + LOG.warn("ignore InterruptedException"); + } + } + public long getJobId() { return jobId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2Factory.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2Factory.java new file mode 100644 index 000000000000000..79503e7e6ec4ec1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2Factory.java @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
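+// AlterJobV2Factory centralizes the choice between the local jobs
+// (SchemaChangeJobV2 / RollupJobV2) and their cloud variants, based on
+// Config.isCloudMode(). rebuildAlterJobV2() additionally re-wraps jobs that
+// were serialized without the cloud flag, so edit-log replay on a cloud
+// deployment still ends up with the cloud behavior.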
+ +package org.apache.doris.alter; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.qe.OriginStatement; + +import java.util.List; + +public class AlterJobV2Factory { + public static SchemaChangeJobV2 createSchemaChangeJobV2(String rawSql, long jobId, long dbId, + long tableId, String tableName, long timeoutMs) { + if (Config.isCloudMode()) { + return new CloudSchemaChangeJobV2(rawSql, jobId, dbId, tableId, tableName, timeoutMs); + } else { + return new SchemaChangeJobV2(rawSql, jobId, dbId, tableId, tableName, timeoutMs, false); + } + } + + public static RollupJobV2 createRollupJobV2(String rawSql, long jobId, long dbId, long tableId, + String tableName, long timeoutMs, long baseIndexId, + long rollupIndexId, String baseIndexName, String rollupIndexName, List rollupSchema, + Column whereColumn, + int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType, + short rollupShortKeyColumnCount, + OriginStatement origStmt) throws AnalysisException { + if (Config.isCloudMode()) { + return new CloudRollupJobV2(rawSql, jobId, dbId, tableId, tableName, timeoutMs, baseIndexId, + rollupIndexId, baseIndexName, rollupIndexName, rollupSchema, whereColumn, + baseSchemaHash, rollupSchemaHash, rollupKeysType, rollupShortKeyColumnCount, origStmt); + } else { + return new RollupJobV2(rawSql, jobId, dbId, tableId, tableName, timeoutMs, baseIndexId, + rollupIndexId, baseIndexName, rollupIndexName, rollupSchema, whereColumn, + baseSchemaHash, rollupSchemaHash, rollupKeysType, rollupShortKeyColumnCount, origStmt, false); + } + } + + public static AlterJobV2 rebuildAlterJobV2(AlterJobV2 job) throws AnalysisException { + if (Config.isCloudMode()) { + if (job.getType() == AlterJobV2.JobType.SCHEMA_CHANGE && !((SchemaChangeJobV2) job).isCloudSchemaChange()) { + job = new CloudSchemaChangeJobV2((SchemaChangeJobV2) job); + } else if (job.getType() == AlterJobV2.JobType.ROLLUP && !((RollupJobV2) job).isCloudRollup()) { + job = new CloudRollupJobV2((RollupJobV2) job); + } + } + return job; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java new file mode 100644 index 000000000000000..b7b38d9670048d9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
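+// Cloud variant of RollupJobV2. Rather than sending create-replica agent tasks
+// to BEs, it registers the rollup index with the meta-service
+// (prepare/commit/dropMaterializedIndex) and creates tablets through
+// sendCreateTabletsRpc(); checkTableStable() is short-circuited to true since
+// the local replica-stability check does not apply here.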
+ +package org.apache.doris.alter; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.OlapTable.OlapTableState; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.cloud.datasource.CloudInternalCatalog; +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.proto.OlapFile; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.OriginStatement; +import org.apache.doris.task.AgentTask; +import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.thrift.TTabletType; +import org.apache.doris.thrift.TTaskType; + +import com.google.common.base.Preconditions; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class CloudRollupJobV2 extends RollupJobV2 { + private static final Logger LOG = LogManager.getLogger(CloudRollupJobV2.class); + + public CloudRollupJobV2(RollupJobV2 job) throws AnalysisException { + this(job.rawSql, job.jobId, job.dbId, job.tableId, job.tableName, job.timeoutMs, job.baseIndexId, + job.rollupIndexId, job.baseIndexName, job.rollupIndexName, job.rollupSchema, job.whereColumn, + job.baseSchemaHash, job.rollupSchemaHash, job.rollupKeysType, + job.rollupShortKeyColumnCount, job.origStmt); + } + + private CloudRollupJobV2() {} + + // Do not call this constructor directly; use AlterJobV2Factory instead + public CloudRollupJobV2(String rawSql, long jobId, long dbId, long tableId, String tableName, long timeoutMs, + long baseIndexId, + long rollupIndexId, String baseIndexName, String rollupIndexName, List rollupSchema, + Column whereColumn, + int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType, + short rollupShortKeyColumnCount, + OriginStatement origStmt) throws AnalysisException { + super(rawSql, jobId, dbId, tableId, tableName, timeoutMs, baseIndexId, + rollupIndexId, baseIndexName, rollupIndexName, rollupSchema, whereColumn, + baseSchemaHash, rollupSchemaHash, rollupKeysType, rollupShortKeyColumnCount, origStmt, true); + ConnectContext context = ConnectContext.get(); + if (context != null) { + LOG.debug("rollup job add cloud cluster, context not null, cluster: {}", context.getCloudCluster()); + setCloudClusterName(context.getCloudCluster()); + } + LOG.debug("rollup job add cloud cluster, context {}", context); + } + + @Override + protected void commitRollupIndex() throws AlterCancelException { + List rollupIndexList = new ArrayList(); + rollupIndexList.add(rollupIndexId); + try { + ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) + .commitMaterializedIndex(tableId, rollupIndexList); + } catch (Exception e) { + // pass the throwable as the last argument so the stack trace is logged + LOG.warn("commitMaterializedIndex exception:", e); + throw new AlterCancelException(e.getMessage()); + } + + LOG.info("commitRollupIndex finished, dbId:{}, tableId:{}, jobId:{}, rollupIndexList:{}", + dbId, tableId, jobId, rollupIndexList); + } + + @Override + protected void postProcessRollupIndex() { + List rollupIndexList = new ArrayList(); + rollupIndexList.add(rollupIndexId); + long tryTimes = 1;
+ while (true) { + try { + ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) + .dropMaterializedIndex(tableId, rollupIndexList); + break; + } catch (Exception e) { + LOG.warn("tryTimes:{}, postProcessRollupIndex exception:", tryTimes, e); + } + sleepSeveralSeconds(); + tryTimes++; + } + + LOG.info("postProcessRollupIndex finished, dbId:{}, tableId:{}, jobId:{}, rollupIndexList:{}", + dbId, tableId, jobId, rollupIndexList); + } + + @Override + protected void createRollupReplica() throws AlterCancelException { + Database db = Env.getCurrentInternalCatalog() + .getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist")); + + // 1. create rollup replicas + OlapTable tbl; + try { + tbl = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP); + } catch (MetaNotFoundException e) { + throw new AlterCancelException(e.getMessage()); + } + + long expiration = (createTimeMs + timeoutMs) / 1000; + tbl.readLock(); + try { + Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); + try { + List rollupIndexList = new ArrayList(); + rollupIndexList.add(rollupIndexId); + ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) + .prepareMaterializedIndex(tbl.getId(), rollupIndexList, expiration); + for (Map.Entry entry : this.partitionIdToRollupIndex.entrySet()) { + long partitionId = entry.getKey(); + Partition partition = tbl.getPartition(partitionId); + if (partition == null) { + continue; + } + TTabletType tabletType = tbl.getPartitionInfo().getTabletType(partitionId); + MaterializedIndex rollupIndex = entry.getValue(); + Cloud.CreateTabletsRequest.Builder requestBuilder = + Cloud.CreateTabletsRequest.newBuilder(); + for (Tablet rollupTablet : rollupIndex.getTablets()) { + OlapFile.TabletMetaCloudPB.Builder builder = + ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) + .createTabletMetaBuilder(tableId, rollupIndexId, + partitionId, rollupTablet, tabletType, rollupSchemaHash, + rollupKeysType, rollupShortKeyColumnCount, tbl.getCopiedBfColumns(), + tbl.getBfFpp(), null, rollupSchema, + tbl.getDataSortInfo(), tbl.getCompressionType(), tbl.getStoragePolicy(), + tbl.isInMemory(), true, + tbl.getName(), tbl.getTTLSeconds(), + tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(), + tbl.getBaseSchemaVersion()); + requestBuilder.addTabletMetas(builder); + } // end for rollupTablets + ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) + .sendCreateTabletsRpc(requestBuilder); + } + } catch (Exception e) { + LOG.warn("createRollupReplica exception:", e); + throw new AlterCancelException(e.getMessage()); + } + } finally { + tbl.readUnlock(); + } + + // create all rollup replicas success.
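+ // NOTE: prepareMaterializedIndex() above only registers the index with an
+ // expiration; it is committed to the meta-service later by commitRollupIndex()
+ // in runRunningJob(), and postProcessRollupIndex() drops it again if the job
+ // is cancelled first.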
+ // add rollup index to catalog + tbl.writeLockOrAlterCancelException(); + try { + Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); + addRollupIndexToCatalog(tbl); + } finally { + tbl.writeUnlock(); + } + } + + @Override + protected void checkCloudClusterName(List tasks) throws AlterCancelException { + if (((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getCloudClusterIdByName(cloudClusterName) == null) { + for (AgentTask task : tasks) { + task.setFinished(true); + AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature()); + } + StringBuilder sb = new StringBuilder("cloud cluster("); + sb.append(cloudClusterName); + sb.append(") has been removed, jobId="); + sb.append(jobId); + String msg = sb.toString(); + LOG.warn(msg); + throw new AlterCancelException(msg); + } + } + + @Override + protected boolean checkTableStable(Database db) throws AlterCancelException { + return true; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java new file mode 100644 index 000000000000000..efea337214e463f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
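+// Cloud variant of SchemaChangeJobV2. Shadow indexes are registered with the
+// meta-service with an expiration (prepareMaterializedIndex) and their tablets
+// are created through sendCreateTabletsRpc() instead of BE agent tasks; on
+// success the shadow indexes are committed and the origin indexes dropped, on
+// cancel the shadow indexes are dropped, each with best-effort retries in
+// dropIndex().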
+ +package org.apache.doris.alter; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.OlapTable.OlapTableState; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.cloud.datasource.CloudInternalCatalog; +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.proto.OlapFile; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.task.AgentTask; +import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.thrift.TTaskType; + +import com.google.common.base.Preconditions; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class CloudSchemaChangeJobV2 extends SchemaChangeJobV2 { + private static final Logger LOG = LogManager.getLogger(CloudSchemaChangeJobV2.class); + + public CloudSchemaChangeJobV2(SchemaChangeJobV2 job) { + this(job.rawSql, job.jobId, job.dbId, job.tableId, job.tableName, job.timeoutMs); + } + + public CloudSchemaChangeJobV2(String rawSql, long jobId, long dbId, long tableId, + String tableName, long timeoutMs) { + super(rawSql, jobId, dbId, tableId, tableName, timeoutMs, true); + ConnectContext context = ConnectContext.get(); + if (context != null) { + LOG.debug("schema change job add cloud cluster, context not null, cluster: {}", + context.getCloudCluster()); + setCloudClusterName(context.getCloudCluster()); + } + LOG.debug("schema change job add cloud cluster, context {}", context); + } + + private CloudSchemaChangeJobV2() {} + + @Override + protected void commitShadowIndex() throws AlterCancelException { + List shadowIdxList = + indexIdMap.keySet().stream().collect(Collectors.toList()); + try { + ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) + .commitMaterializedIndex(tableId, + shadowIdxList); + } catch (Exception e) { + LOG.warn("commitMaterializedIndex exception:", e); + throw new AlterCancelException(e.getMessage()); + } + LOG.info("commitShadowIndex finished, dbId:{}, tableId:{}, jobId:{}, shadowIdxList:{}", + dbId, tableId, jobId, shadowIdxList); + } + + @Override + protected void postProcessShadowIndex() { + List shadowIdxList = indexIdMap.keySet().stream().collect(Collectors.toList()); + dropIndex(shadowIdxList); + } + + @Override + protected void postProcessOriginIndex() { + List originIdxList = indexIdMap.values().stream().collect(Collectors.toList()); + dropIndex(originIdxList); + } + + private void dropIndex(List idxList) { + int tryTimes = 1; + while (true) { + try { + ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) + .dropMaterializedIndex(tableId, idxList); + break; + } catch (Exception e) { + LOG.warn("tryTimes:{}, dropIndex exception:", tryTimes, e); + } + sleepSeveralSeconds(); + tryTimes++; + } + + LOG.info("dropIndex finished, dbId:{}, tableId:{}, jobId:{}, IdxList:{}", + dbId, tableId, jobId, idxList); + } + + @Override + protected void createShadowIndexReplica() throws AlterCancelException { + Database db = Env.getCurrentInternalCatalog() + .getDbOrException(dbId, s -> new AlterCancelException("Database " + s
+ " does not exist")); + + // 1. create replicas + OlapTable tbl; + try { + tbl = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP); + } catch (MetaNotFoundException e) { + throw new AlterCancelException(e.getMessage()); + } + + Long expiration = (createTimeMs + timeoutMs) / 1000; + tbl.readLock(); + try { + Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE); + try { + List shadowIdxList = indexIdMap.keySet().stream().collect(Collectors.toList()); + ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) + .prepareMaterializedIndex(tableId, shadowIdxList, + expiration); + + for (long partitionId : partitionIndexMap.rowKeySet()) { + Partition partition = tbl.getPartition(partitionId); + if (partition == null) { + continue; + } + Map shadowIndexMap = partitionIndexMap.row(partitionId); + for (Map.Entry entry : shadowIndexMap.entrySet()) { + long shadowIdxId = entry.getKey(); + MaterializedIndex shadowIdx = entry.getValue(); + + short shadowShortKeyColumnCount = indexShortKeyMap.get(shadowIdxId); + List shadowSchema = indexSchemaMap.get(shadowIdxId); + int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash; + int shadowSchemaVersion = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaVersion; + long originIndexId = indexIdMap.get(shadowIdxId); + KeysType originKeysType = tbl.getKeysTypeByIndexId(originIndexId); + + Cloud.CreateTabletsRequest.Builder requestBuilder = + Cloud.CreateTabletsRequest.newBuilder(); + for (Tablet shadowTablet : shadowIdx.getTablets()) { + OlapFile.TabletMetaCloudPB.Builder builder = + ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) + .createTabletMetaBuilder(tableId, shadowIdxId, + partitionId, shadowTablet, tbl.getPartitionInfo().getTabletType(partitionId), + shadowSchemaHash, originKeysType, shadowShortKeyColumnCount, bfColumns, + bfFpp, indexes, shadowSchema, tbl.getDataSortInfo(), tbl.getCompressionType(), + tbl.getStoragePolicy(), tbl.isInMemory(), true, + tbl.getName(), tbl.getTTLSeconds(), + tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(), + shadowSchemaVersion); + requestBuilder.addTabletMetas(builder); + } // end for rollupTablets + ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) + .sendCreateTabletsRpc(requestBuilder); + } + } + } catch (Exception e) { + LOG.warn("createCloudShadowIndexReplica Exception:", e); + throw new AlterCancelException(e.getMessage()); + } + + } finally { + tbl.readUnlock(); + } + + // create all replicas success. 
+ // add all shadow indexes to catalog + tbl.writeLockOrAlterCancelException(); + try { + Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE); + addShadowIndexToCatalog(tbl); + } finally { + tbl.writeUnlock(); + } + } + + @Override + protected void checkCloudClusterName(List tasks) throws AlterCancelException { + if (((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getCloudClusterIdByName(cloudClusterName) == null) { + for (AgentTask task : tasks) { + task.setFinished(true); + AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature()); + } + StringBuilder sb = new StringBuilder("cloud cluster("); + sb.append(cloudClusterName); + sb.append(") has been removed, jobId="); + sb.append(jobId); + String msg = sb.toString(); + LOG.warn(msg); + throw new AlterCancelException(msg); + } + } + + @Override + protected boolean checkTableStable(Database db) throws AlterCancelException { + return true; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index 347da5f1a497d3b..1072bb89e202148 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -111,12 +111,13 @@ public MaterializedViewHandler() { private Map> tableRunningJobMap = new ConcurrentHashMap<>(); @Override - public void addAlterJobV2(AlterJobV2 alterJob) { + public void addAlterJobV2(AlterJobV2 alterJob) throws AnalysisException { + alterJob = AlterJobV2Factory.rebuildAlterJobV2(alterJob); super.addAlterJobV2(alterJob); addAlterJobV2ToTableNotFinalStateJobMap(alterJob); } - protected void batchAddAlterJobV2(List alterJobV2List) { + protected void batchAddAlterJobV2(List alterJobV2List) throws AnalysisException { for (AlterJobV2 alterJobV2 : alterJobV2List) { addAlterJobV2(alterJobV2); } @@ -368,7 +369,8 @@ private RollupJobV2 createMaterializedViewJob(String rawSql, String mvName, Stri IdGeneratorBuffer idGeneratorBuffer = env.getIdGeneratorBuffer(bufferSize); long jobId = idGeneratorBuffer.getNextId(); long mvIndexId = idGeneratorBuffer.getNextId(); - RollupJobV2 mvJob = new RollupJobV2(rawSql, jobId, dbId, tableId, olapTable.getName(), timeoutMs, + RollupJobV2 mvJob = AlterJobV2Factory.createRollupJobV2( + rawSql, jobId, dbId, tableId, olapTable.getName(), timeoutMs, baseIndexId, mvIndexId, baseIndexName, mvName, mvColumns, whereColumn, baseSchemaHash, mvSchemaHash, mvKeysType, mvShortKeyColumnCount, origStmt); @@ -1076,7 +1078,8 @@ private void changeTableStatus(long dbId, long tableId, OlapTableState olapTable // replay the alter job v2 @Override - public void replayAlterJobV2(AlterJobV2 alterJob) { + public void replayAlterJobV2(AlterJobV2 alterJob) throws AnalysisException { + alterJob = AlterJobV2Factory.rebuildAlterJobV2(alterJob); super.replayAlterJobV2(alterJob); if (!alterJob.isDone()) { addAlterJobV2ToTableNotFinalStateJobMap(alterJob); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 4fe12d49b06a17a..cc2cd93a85a85f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -101,38 +101,41 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { @SerializedName(value = "partitionIdToBaseRollupTabletIdMap") private Map> 
partitionIdToBaseRollupTabletIdMap = Maps.newHashMap(); @SerializedName(value = "partitionIdToRollupIndex") - private Map partitionIdToRollupIndex = Maps.newHashMap(); + protected Map partitionIdToRollupIndex = Maps.newHashMap(); // rollup and base schema info @SerializedName(value = "baseIndexId") - private long baseIndexId; + protected long baseIndexId; @SerializedName(value = "rollupIndexId") - private long rollupIndexId; + protected long rollupIndexId; @SerializedName(value = "baseIndexName") - private String baseIndexName; + protected String baseIndexName; @SerializedName(value = "rollupIndexName") - private String rollupIndexName; + protected String rollupIndexName; @SerializedName(value = "rollupSchema") - private List rollupSchema = Lists.newArrayList(); + protected List rollupSchema = Lists.newArrayList(); @SerializedName(value = "whereColumn") - private Column whereColumn; + protected Column whereColumn; @SerializedName(value = "baseSchemaHash") - private int baseSchemaHash; + protected int baseSchemaHash; @SerializedName(value = "rollupSchemaHash") - private int rollupSchemaHash; + protected int rollupSchemaHash; @SerializedName(value = "rollupKeysType") - private KeysType rollupKeysType; + protected KeysType rollupKeysType; @SerializedName(value = "rollupShortKeyColumnCount") - private short rollupShortKeyColumnCount; + protected short rollupShortKeyColumnCount; @SerializedName(value = "origStmt") - private OriginStatement origStmt; + protected OriginStatement origStmt; // optional @SerializedName(value = "storageFormat") private TStorageFormat storageFormat = TStorageFormat.DEFAULT; + @SerializedName(value = "isCloudRollup") + protected boolean isCloudRollup = false; + // save all create rollup tasks private AgentBatchTask rollupBatchTask = new AgentBatchTask(); // save failed task after retry three times, tabletId -> agentTask @@ -140,17 +143,18 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { private Analyzer analyzer; - private RollupJobV2() { + protected RollupJobV2() { super(JobType.ROLLUP); } + // Don't call it directly, use AlterJobV2Factory to replace public RollupJobV2(String rawSql, long jobId, long dbId, long tableId, String tableName, long timeoutMs, long baseIndexId, long rollupIndexId, String baseIndexName, String rollupIndexName, List rollupSchema, Column whereColumn, int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType, short rollupShortKeyColumnCount, - OriginStatement origStmt) throws AnalysisException { + OriginStatement origStmt, boolean isCloudRollup) throws AnalysisException { super(rawSql, jobId, JobType.ROLLUP, dbId, tableId, tableName, timeoutMs); this.baseIndexId = baseIndexId; @@ -167,9 +171,14 @@ public RollupJobV2(String rawSql, long jobId, long dbId, long tableId, String ta this.rollupShortKeyColumnCount = rollupShortKeyColumnCount; this.origStmt = origStmt; + this.isCloudRollup = isCloudRollup; initAnalyzer(); } + public boolean isCloudRollup() { + return isCloudRollup; + } + public void addTabletIdMap(long partitionId, long rollupTabletId, long baseTabletId) { Map tabletIdMap = partitionIdToBaseRollupTabletIdMap .computeIfAbsent(partitionId, k -> Maps.newHashMap()); @@ -196,23 +205,9 @@ private void initAnalyzer() throws AnalysisException { analyzer = new Analyzer(Env.getCurrentEnv(), connectContext); } - /** - * runPendingJob(): - * 1. Create all rollup replicas and wait them finished. - * 2. 
After creating done, add this shadow rollup index to catalog, user can not see this - * rollup, but internal load process will generate data for this rollup index. - * 3. Get a new transaction id, then set job's state to WAITING_TXN - */ - @Override - protected void runPendingJob() throws Exception { - Preconditions.checkState(jobState == JobState.PENDING, jobState); - - LOG.info("begin to send create rollup replica tasks. job: {}", jobId); + protected void createRollupReplica() throws AlterCancelException { Database db = Env.getCurrentInternalCatalog() .getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist")); - if (!checkTableStable(db)) { - return; - } // 1. create rollup replicas AgentBatchTask batchTask = new AgentBatchTask(); @@ -331,6 +326,26 @@ protected void runPendingJob() throws Exception { } finally { tbl.writeUnlock(); } + } + + /** + * runPendingJob(): + * 1. Create all rollup replicas and wait them finished. + * 2. After creating done, add this shadow rollup index to catalog, user can not see this + * rollup, but internal load process will generate data for this rollup index. + * 3. Get a new transaction id, then set job's state to WAITING_TXN + */ + @Override + protected void runPendingJob() throws Exception { + Preconditions.checkState(jobState == JobState.PENDING, jobState); + + LOG.info("begin to send create rollup replica tasks. job: {}", jobId); + Database db = Env.getCurrentInternalCatalog() + .getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist")); + if (!checkTableStable(db)) { + return; + } + createRollupReplica(); this.watershedTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId(); this.jobState = JobState.WAITING_TXN; @@ -340,7 +355,7 @@ protected void runPendingJob() throws Exception { LOG.info("transfer rollup job {} state to {}, watershed txn id: {}", jobId, this.jobState, watershedTxnId); } - private void addRollupIndexToCatalog(OlapTable tbl) { + protected void addRollupIndexToCatalog(OlapTable tbl) { for (Partition partition : tbl.getPartitions()) { long partitionId = partition.getId(); MaterializedIndex rollupIndex = this.partitionIdToRollupIndex.get(partitionId); @@ -507,9 +522,11 @@ protected void runRunningJob() throws AlterCancelException { } catch (MetaNotFoundException e) { throw new AlterCancelException(e.getMessage()); } + LOG.debug("jobId:{}, cloudClusterName:{}", jobId, cloudClusterName); if (!rollupBatchTask.isFinished()) { LOG.info("rollup tasks not finished. 
job: {}", jobId); List tasks = rollupBatchTask.getUnfinishedTasks(2000); + checkCloudClusterName(tasks); for (AgentTask task : tasks) { if (task.getFailedTimes() > 0) { task.setFinished(true); @@ -574,7 +591,7 @@ protected void runRunningJob() throws AlterCancelException { } } // end for tablets } // end for partitions - + commitRollupIndex(); onFinished(tbl); } finally { tbl.writeUnlock(); @@ -629,6 +646,9 @@ protected boolean cancelImpl(String errMsg) { this.errMsg = errMsg; this.finishedTimeMs = System.currentTimeMillis(); Env.getCurrentEnv().getEditLog().logAlterJob(this); + // try best to drop roll index, when job is cancelled + postProcessRollupIndex(); + LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg); return true; } @@ -758,6 +778,8 @@ private void replayRunningJob(RollupJobV2 replayedJob) { */ private void replayCancelled(RollupJobV2 replayedJob) { cancelInternal(); + // try best to drop roll index, when job is cancelled + postProcessRollupIndex(); this.jobState = JobState.CANCELLED; this.finishedTimeMs = replayedJob.finishedTimeMs; this.errMsg = replayedJob.errMsg; @@ -879,6 +901,12 @@ public void gsonPostProcess() throws IOException { setColumnsDefineExpr(stmt.getMVColumnItemList()); } + protected void commitRollupIndex() throws AlterCancelException {} + + protected void postProcessRollupIndex() {} + + protected void checkCloudClusterName(List tasks) throws AlterCancelException {} + @Override public String toJson() { return GsonUtils.GSON.toJson(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index bf0d1fa8a1289dc..286b414121f5cd6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -1492,7 +1492,7 @@ private void createJob(String rawSql, long dbId, OlapTable olapTable, Map> indexSchemaMap, List indexes, List alterIndexes, boolean isDropIndex, long jobId, boolean isReplay) - throws DdlException { + throws DdlException, AnalysisException { LOG.debug("indexSchemaMap:{}, indexes:{}", indexSchemaMap, indexes); // for bitmapIndex @@ -2654,7 +2656,8 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o //for compatibility, we need create a finished state schema change job v2 - SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(rawSql, jobId, db.getId(), olapTable.getId(), + SchemaChangeJobV2 schemaChangeJob = AlterJobV2Factory.createSchemaChangeJobV2( + rawSql, jobId, db.getId(), olapTable.getId(), olapTable.getName(), 1000); for (Map.Entry> entry : changedIndexIdToSchema.entrySet()) { @@ -2717,7 +2720,8 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o } } - public void replayModifyTableLightSchemaChange(TableAddOrDropColumnsInfo info) throws MetaNotFoundException { + public void replayModifyTableLightSchemaChange(TableAddOrDropColumnsInfo info) + throws MetaNotFoundException, AnalysisException { LOG.debug("info:{}", info); long dbId = info.getDbId(); long tableId = info.getTableId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index af334c0a23e1805..32ca318835f6bab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -90,13 +90,13 @@ public class 
SchemaChangeJobV2 extends AlterJobV2 { // partition id -> (shadow index id -> (shadow tablet id -> origin tablet id)) @SerializedName(value = "partitionIndexTabletMap") - private Table> partitionIndexTabletMap = HashBasedTable.create(); + protected Table> partitionIndexTabletMap = HashBasedTable.create(); // partition id -> (shadow index id -> shadow index)) @SerializedName(value = "partitionIndexMap") - private Table partitionIndexMap = HashBasedTable.create(); + protected Table partitionIndexMap = HashBasedTable.create(); // shadow index id -> origin index id @SerializedName(value = "indexIdMap") - private Map indexIdMap = Maps.newHashMap(); + protected Map indexIdMap = Maps.newHashMap(); // partition id -> origin index id @SerializedName(value = "partitionOriginIndexIdMap") private Map partitionOriginIndexIdMap = Maps.newHashMap(); @@ -105,42 +105,52 @@ public class SchemaChangeJobV2 extends AlterJobV2 { private Map indexIdToName = Maps.newHashMap(); // shadow index id -> index schema @SerializedName(value = "indexSchemaMap") - private Map> indexSchemaMap = Maps.newHashMap(); + protected Map> indexSchemaMap = Maps.newHashMap(); // shadow index id -> (shadow index schema version : schema hash) @SerializedName(value = "indexSchemaVersionAndHashMap") - private Map indexSchemaVersionAndHashMap = Maps.newHashMap(); + protected Map indexSchemaVersionAndHashMap = Maps.newHashMap(); // shadow index id -> shadow index short key count @SerializedName(value = "indexShortKeyMap") - private Map indexShortKeyMap = Maps.newHashMap(); + protected Map indexShortKeyMap = Maps.newHashMap(); // bloom filter info @SerializedName(value = "hasBfChange") private boolean hasBfChange; @SerializedName(value = "bfColumns") - private Set bfColumns = null; + protected Set bfColumns = null; @SerializedName(value = "bfFpp") - private double bfFpp = 0; + protected double bfFpp = 0; // alter index info @SerializedName(value = "indexChange") private boolean indexChange = false; @SerializedName(value = "indexes") - private List indexes = null; + protected List indexes = null; @SerializedName(value = "storageFormat") private TStorageFormat storageFormat = TStorageFormat.DEFAULT; + @SerializedName(value = "isCloudSchemaChange") + private boolean isCloudSchemaChange = false; + // save all schema change tasks private AgentBatchTask schemaChangeBatchTask = new AgentBatchTask(); // save failed task after retry three times, tabletId -> agentTask private Map> failedAgentTasks = Maps.newHashMap(); - private SchemaChangeJobV2() { + protected SchemaChangeJobV2() { super(JobType.SCHEMA_CHANGE); } - public SchemaChangeJobV2(String rawSql, long jobId, long dbId, long tableId, String tableName, long timeoutMs) { + // Don't call it directly, use AlterJobV2Factory to replace + public SchemaChangeJobV2(String rawSql, long jobId, long dbId, long tableId, String tableName, long timeoutMs, + boolean isCloudSchemaChange) { super(rawSql, jobId, JobType.SCHEMA_CHANGE, dbId, tableId, tableName, timeoutMs); + this.isCloudSchemaChange = isCloudSchemaChange; + } + + public boolean isCloudSchemaChange() { + return isCloudSchemaChange; } public void addTabletIdMap(long partitionId, long shadowIdxId, long shadowTabletId, long originTabletId) { @@ -197,17 +207,7 @@ private void pruneMeta() { partitionOriginIndexIdMap.clear(); } - /** - * runPendingJob(): - * 1. Create all replicas of all shadow indexes and wait them finished. - * 2. 
After creating done, add the shadow indexes to catalog, user can not see this - * shadow index, but internal load process will generate data for these indexes. - * 3. Get a new transaction id, then set job's state to WAITING_TXN - */ - @Override - protected void runPendingJob() throws Exception { - Preconditions.checkState(jobState == JobState.PENDING, jobState); - LOG.info("begin to send create replica tasks. job: {}", jobId); + protected void createShadowIndexReplica() throws AlterCancelException { Database db = Env.getCurrentInternalCatalog() .getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist")); @@ -299,7 +299,6 @@ protected void runPendingJob() throws Exception { } finally { tbl.readUnlock(); } - if (!FeConstants.runningUnitTest) { // send all tasks and wait them finished AgentTaskQueue.addBatchTask(batchTask); @@ -323,7 +322,7 @@ protected void runPendingJob() throws Exception { } else { // only show at most 3 results List subList = countDownLatch.getLeftMarks().stream().limit(3) - .map(item -> "(backendId = " + item.getKey() + ", tabletId = " + item.getValue() + ")") + .map(item -> "(backendId = " + item.getKey() + ", tabletId = " + item.getValue() + ")") .collect(Collectors.toList()); errMsg = "Error replicas:" + Joiner.on(", ").join(subList); } @@ -341,6 +340,27 @@ protected void runPendingJob() throws Exception { } finally { tbl.writeUnlock(); } + } + + /** + * runPendingJob(): + * 1. Create all replicas of all shadow indexes and wait them finished. + * 2. After creating done, add the shadow indexes to catalog, user can not see this + * shadow index, but internal load process will generate data for these indexes. + * 3. Get a new transaction id, then set job's state to WAITING_TXN + */ + @Override + protected void runPendingJob() throws Exception { + Preconditions.checkState(jobState == JobState.PENDING, jobState); + LOG.info("begin to send create replica tasks. job: {}", jobId); + Database db = Env.getCurrentInternalCatalog() + .getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist")); + + if (!checkTableStable(db)) { + return; + } + + createShadowIndexReplica(); this.watershedTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId(); this.jobState = JobState.WAITING_TXN; @@ -351,7 +371,7 @@ protected void runPendingJob() throws Exception { jobId, this.jobState, watershedTxnId); } - private void addShadowIndexToCatalog(OlapTable tbl) { + protected void addShadowIndexToCatalog(OlapTable tbl) { for (long partitionId : partitionIndexMap.rowKeySet()) { Partition partition = tbl.getPartition(partitionId); if (partition == null) { @@ -515,6 +535,7 @@ protected void runRunningJob() throws AlterCancelException { if (!schemaChangeBatchTask.isFinished()) { LOG.info("schema change tasks not finished. 
job: {}", jobId); List tasks = schemaChangeBatchTask.getUnfinishedTasks(2000); + checkCloudClusterName(tasks); for (AgentTask task : tasks) { if (task.getFailedTimes() > 0) { task.setFinished(true); @@ -582,6 +603,7 @@ protected void runRunningJob() throws AlterCancelException { } // end for tablets } } // end for partitions + commitShadowIndex(); // all partitions are good onFinished(tbl); } finally { @@ -597,6 +619,8 @@ protected void runRunningJob() throws AlterCancelException { changeTableState(dbId, tableId, OlapTableState.NORMAL); LOG.info("set table's state to NORMAL, table id: {}, job id: {}", tableId, jobId); + // try best to drop origin index + postProcessOriginIndex(); } private void waitWalFinished() { @@ -729,6 +753,8 @@ protected synchronized boolean cancelImpl(String errMsg) { changeTableState(dbId, tableId, OlapTableState.NORMAL); LOG.info("set table's state to NORMAL when cancel, table id: {}, job id: {}", tableId, jobId); + // try best to drop shadow index, when job is cancelled + postProcessShadowIndex(); return true; } @@ -852,6 +878,7 @@ private void replayRunningJob(SchemaChangeJobV2 replayedJob) { } } + postProcessOriginIndex(); jobState = JobState.FINISHED; this.finishedTimeMs = replayedJob.finishedTimeMs; LOG.info("replay finished schema change job: {} table id: {}", jobId, tableId); @@ -864,6 +891,8 @@ private void replayRunningJob(SchemaChangeJobV2 replayedJob) { */ private void replayCancelled(SchemaChangeJobV2 replayedJob) { cancelInternal(); + // try best to drop shadow index + postProcessShadowIndex(); this.jobState = JobState.CANCELLED; this.finishedTimeMs = replayedJob.finishedTimeMs; this.errMsg = replayedJob.errMsg; @@ -962,6 +991,14 @@ private void changeTableState(long dbId, long tableId, OlapTableState olapTableS } } + protected void commitShadowIndex() throws AlterCancelException {} + + protected void postProcessShadowIndex() {} + + protected void postProcessOriginIndex() {} + + protected void checkCloudClusterName(List tasks) throws AlterCancelException {} + @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this, AlterJobV2.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 0d2b64feb647531..c9aae71d83733ac 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1982,7 +1982,8 @@ public long loadSyncJobs(DataInputStream dis, long checksum) throws IOException, return checksum; } - public long loadAlterJob(DataInputStream dis, long checksum) throws IOException { + public long loadAlterJob(DataInputStream dis, long checksum) + throws IOException, AnalysisException { long newChecksum = checksum; for (JobType type : JobType.values()) { newChecksum = loadAlterJob(dis, newChecksum, type); @@ -1991,7 +1992,8 @@ public long loadAlterJob(DataInputStream dis, long checksum) throws IOException return newChecksum; } - public long loadAlterJob(DataInputStream dis, long checksum, JobType type) throws IOException { + public long loadAlterJob(DataInputStream dis, long checksum, JobType type) + throws IOException, AnalysisException { // alter jobs int size = dis.readInt(); long newChecksum = checksum ^ size; diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index 1a65faf5a94b7e7..130c0262e7d83cb 100644 --- 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 0d2b64feb647531..c9aae71d83733ac 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1982,7 +1982,8 @@ public long loadSyncJobs(DataInputStream dis, long checksum) throws IOException,
         return checksum;
     }
 
-    public long loadAlterJob(DataInputStream dis, long checksum) throws IOException {
+    public long loadAlterJob(DataInputStream dis, long checksum)
+            throws IOException, AnalysisException {
         long newChecksum = checksum;
         for (JobType type : JobType.values()) {
             newChecksum = loadAlterJob(dis, newChecksum, type);
@@ -1991,7 +1992,8 @@ public long loadAlterJob(DataInputStream dis, long checksum) throws IOException
         return newChecksum;
     }
 
-    public long loadAlterJob(DataInputStream dis, long checksum, JobType type) throws IOException {
+    public long loadAlterJob(DataInputStream dis, long checksum, JobType type)
+            throws IOException, AnalysisException {
         // alter jobs
         int size = dis.readInt();
         long newChecksum = checksum ^ size;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 1a65faf5a94b7e7..130c0262e7d83cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -164,7 +164,7 @@ protected Partition createPartitionWithIndices(long dbId, long tableId, String t
         return partition;
     }
 
-    private OlapFile.TabletMetaCloudPB.Builder createTabletMetaBuilder(long tableId, long indexId,
+    public OlapFile.TabletMetaCloudPB.Builder createTabletMetaBuilder(long tableId, long indexId,
             long partitionId, Tablet tablet, TTabletType tabletType, int schemaHash, KeysType keysType,
             short shortKeyColumnCount, Set<String> bfColumns, double bfFpp, List<Index> indexes,
             List<Column> schemaColumns, DataSortInfo dataSortInfo, TCompressionType compressionType,
@@ -336,7 +336,7 @@ private void createCloudTablets(MaterializedIndex index, ReplicaState replicaSta
     protected void beforeCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds)
             throws DdlException {
         if (partitionIds == null) {
-            prepareMaterializedIndex(tableId, indexIds);
+            prepareMaterializedIndex(tableId, indexIds, 0);
         } else {
             preparePartition(dbId, tableId, partitionIds, indexIds);
         }
@@ -419,12 +419,13 @@ private void commitPartition(long tableId, List partitionIds, List i
         }
     }
 
-    private void prepareMaterializedIndex(Long tableId, List<Long> indexIds) throws DdlException {
+    // if `expiration` = 0, recycler will delete uncommitted indexes in `retention_seconds`
+    public void prepareMaterializedIndex(Long tableId, List<Long> indexIds, long expiration) throws DdlException {
         Cloud.IndexRequest.Builder indexRequestBuilder = Cloud.IndexRequest.newBuilder();
         indexRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
         indexRequestBuilder.addAllIndexIds(indexIds);
         indexRequestBuilder.setTableId(tableId);
-        indexRequestBuilder.setExpiration(0);
+        indexRequestBuilder.setExpiration(expiration);
         final Cloud.IndexRequest indexRequest = indexRequestBuilder.build();
 
         Cloud.IndexResponse response = null;
@@ -450,7 +451,7 @@ private void prepareMaterializedIndex(Long tableId, List indexIds) throws
         }
     }
 
-    private void commitMaterializedIndex(Long tableId, List<Long> indexIds) throws DdlException {
+    public void commitMaterializedIndex(Long tableId, List<Long> indexIds) throws DdlException {
         Cloud.IndexRequest.Builder indexRequestBuilder = Cloud.IndexRequest.newBuilder();
         indexRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
         indexRequestBuilder.addAllIndexIds(indexIds);
@@ -480,7 +481,7 @@ private void commitMaterializedIndex(Long tableId, List indexIds) throws D
         }
     }
 
-    private void sendCreateTabletsRpc(Cloud.CreateTabletsRequest.Builder requestBuilder) throws DdlException {
+    public void sendCreateTabletsRpc(Cloud.CreateTabletsRequest.Builder requestBuilder) throws DdlException {
         requestBuilder.setCloudUniqueId(Config.cloud_unique_id);
         Cloud.CreateTabletsRequest createTabletsReq = requestBuilder.build();
 
@@ -625,7 +626,7 @@ private void dropCloudPartition(long dbId, long tableId, List partitionIds
         }
     }
 
-    private void dropMaterializedIndex(Long tableId, List<Long> indexIds) throws DdlException {
+    public void dropMaterializedIndex(Long tableId, List<Long> indexIds) throws DdlException {
         Cloud.IndexRequest.Builder indexRequestBuilder = Cloud.IndexRequest.newBuilder();
         indexRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
         indexRequestBuilder.addAllIndexIds(indexIds);
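The new `expiration` parameter changes the recycling contract for uncommitted indexes: a long-running job can hand the meta service a deadline after which the recycler may reclaim prepared-but-never-committed shadow indexes, while passing `0` keeps the old behavior of falling back to the recycler's `retention_seconds`. A sketch of the intended call sequence; the timeout arithmetic and the seconds-since-epoch unit are assumptions, only the three catalog methods come from this diff:

// Illustrative lifecycle of a shadow index in cloud mode.
void shadowIndexLifecycle(long tableId, List<Long> shadowIndexIds,
        long jobTimeoutSeconds) throws DdlException {
    CloudInternalCatalog catalog = (CloudInternalCatalog) Env.getCurrentInternalCatalog();

    // 1. Prepare with an absolute deadline (assumed seconds since epoch) so a
    //    job that dies before committing does not leak meta-service state.
    long expiration = System.currentTimeMillis() / 1000 + jobTimeoutSeconds;
    catalog.prepareMaterializedIndex(tableId, shadowIndexIds, expiration);

    // 2a. Success path: committing marks the indexes permanent, so the
    //     recycler no longer treats them as uncommitted.
    catalog.commitMaterializedIndex(tableId, shadowIndexIds);

    // 2b. Cancel path: drop explicitly. If neither call ever happens, the
    //     recycler reclaims the uncommitted indexes once `expiration` passes
    //     (or after `retention_seconds` when expiration is 0).
    catalog.dropMaterializedIndex(tableId, shadowIndexIds);
}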
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 7718355a7ae5861..e97ede95c6ef9f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -18,6 +18,8 @@
 package org.apache.doris.persist.gson;
 
 import org.apache.doris.alter.AlterJobV2;
+import org.apache.doris.alter.CloudRollupJobV2;
+import org.apache.doris.alter.CloudSchemaChangeJobV2;
 import org.apache.doris.alter.RollupJobV2;
 import org.apache.doris.alter.SchemaChangeJobV2;
 import org.apache.doris.catalog.AggStateType;
@@ -199,7 +201,9 @@ public class GsonUtils {
     private static RuntimeTypeAdapterFactory<AlterJobV2> alterJobV2TypeAdapterFactory = RuntimeTypeAdapterFactory
             .of(AlterJobV2.class, "clazz")
             .registerSubtype(RollupJobV2.class, RollupJobV2.class.getSimpleName())
-            .registerSubtype(SchemaChangeJobV2.class, SchemaChangeJobV2.class.getSimpleName());
+            .registerSubtype(SchemaChangeJobV2.class, SchemaChangeJobV2.class.getSimpleName())
+            .registerSubtype(CloudSchemaChangeJobV2.class, CloudSchemaChangeJobV2.class.getSimpleName())
+            .registerSubtype(CloudRollupJobV2.class, CloudRollupJobV2.class.getSimpleName());
 
     // runtime adapter for class "SyncJob"
     private static RuntimeTypeAdapterFactory<SyncJob> syncJobTypeAdapterFactory = RuntimeTypeAdapterFactory
diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
index d4cdadf53bde2ec..2a47520f5c5e941 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
@@ -327,7 +327,8 @@ public void testSerializeOfRollupJob(@Mocked CreateMaterializedViewStmt stmt)
         Column column = new Column(mvColumnName, Type.BITMAP, false, AggregateType.BITMAP_UNION, false, "1", "");
         columns.add(column);
 
-        RollupJobV2 rollupJobV2 = new RollupJobV2("", 1, 1, 1, "test", 1, 1, 1, "test", "rollup", columns, null, 1, 1,
+        RollupJobV2 rollupJobV2 = AlterJobV2Factory.createRollupJobV2(
+                "", 1, 1, 1, "test", 1, 1, 1, "test", "rollup", columns, null, 1, 1,
                 KeysType.AGG_KEYS, keysCount,
                 new OriginStatement("create materialized view rollup as select bitmap_union(to_bitmap(c1)) from test",
                         0));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
index 51b3bd3a8aaee0c..5e99599b041e201 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
@@ -399,7 +399,8 @@ public void testSerializeOfSchemaChangeJob() throws IOException {
         file.deleteOnExit();
         DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
 
-        SchemaChangeJobV2 schemaChangeJobV2 = new SchemaChangeJobV2("", 1, 1, 1, "test", 600000);
+        SchemaChangeJobV2 schemaChangeJobV2 = AlterJobV2Factory.createSchemaChangeJobV2(
+                "", 1, 1, 1, "test", 600000);
         schemaChangeJobV2.setStorageFormat(TStorageFormat.V2);
         Deencapsulation.setField(schemaChangeJobV2, "jobState", AlterJobV2.JobState.FINISHED);
         Map indexSchemaVersionAndHashMap = Maps.newHashMap();
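Registering the cloud subclasses with the `RuntimeTypeAdapterFactory` makes the `"clazz"` discriminator round-trip: GSON writes the concrete class name on serialization and instantiates the matching subclass when the edit log is replayed, so a persisted `CloudSchemaChangeJobV2` no longer fails to deserialize as the base type. The updated tests construct jobs through `AlterJobV2Factory`, which this diff implies is the single point that chooses between the local and cloud implementations. A hedged sketch of such a factory; the `Config.isCloudMode()` check and the constructor pass-through are assumptions:

// Hypothetical shape of AlterJobV2Factory; the factory name and the two
// create* methods appear in the tests above, but this body is a sketch.
public class AlterJobV2Factory {
    public static SchemaChangeJobV2 createSchemaChangeJobV2(String rawSql, long jobId,
            long dbId, long tableId, String tableName, long timeoutMs) {
        return Config.isCloudMode()
                ? new CloudSchemaChangeJobV2(rawSql, jobId, dbId, tableId, tableName, timeoutMs)
                : new SchemaChangeJobV2(rawSql, jobId, dbId, tableId, tableName, timeoutMs);
    }
}

Routing construction through one factory keeps every call site (and the serialization tests) agnostic of the deployment mode, while the GSON registration guarantees that whichever subclass the factory produces survives an edit-log round trip.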