diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index e4d1c1912153ecb..346ebb6c74a6509 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -5,7 +5,6 @@ #include "cloud/cloud_tablet_mgr.h" #include "cloud/cloud_meta_mgr.h" -// #include "cloud/utils.h" #include "common/status.h" #include "olap/delete_handler.h" #include "olap/rowset/beta_rowset.h" @@ -17,7 +16,7 @@ #include "olap/tablet_meta.h" #include "service/backend_options.h" -namespace doris::cloud { +namespace doris { using namespace ErrorCode; static constexpr int ALTER_TABLE_BATCH_SIZE = 4096; @@ -332,4 +331,4 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam } return Status::OK(); } -} // namespace doris::cloud +} // namespace doris diff --git a/be/src/cloud/cloud_schema_change_job.h b/be/src/cloud/cloud_schema_change_job.h index 7dd89b56fb6259f..4286dcd94cc30ca 100644 --- a/be/src/cloud/cloud_schema_change_job.h +++ b/be/src/cloud/cloud_schema_change_job.h @@ -7,7 +7,7 @@ #include "cloud/cloud_tablet.h" #include "olap/tablet_fwd.h" -namespace doris::cloud { +namespace doris { class CloudSchemaChangeJob { public: @@ -28,8 +28,8 @@ class CloudSchemaChangeJob { TabletSchemaSPtr _new_tablet_schema; std::string _job_id; std::vector _output_rowsets; - int64_t _output_cumulative_point; + int64_t _output_cumulative_point = 0; int64_t _expiration; }; -} // namespace doris::cloud +} // namespace doris diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index c32054dd7b94b7c..a078f6424693780 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -26,6 +26,7 @@ #include #include +#include "cloud/cloud_schema_change_job.h" #include "cloud/config.h" #include "common/logging.h" #include "common/signal_handler.h" @@ -505,6 +506,7 @@ Status VBaseSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset BaseTabletSPtr new_tablet, TabletSchemaSPtr base_tablet_schema, TabletSchemaSPtr new_tablet_schema) { + LOG_INFO("lightman VBaseSchemaChangeWithSorting::_inner_process"); // for internal sorting std::vector> blocks; @@ -512,7 +514,7 @@ Status VBaseSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap(); int64_t newest_write_timestamp = rowset->newest_write_timestamp(); _temp_delta_versions.first = _temp_delta_versions.second; - + _src_rowsets.clear(); // init _src_rowsets auto create_rowset = [&]() -> Status { if (blocks.empty()) { return Status::OK(); @@ -586,6 +588,37 @@ Result VBaseSchemaChangeWithSorting::_internal_sorting( const std::vector>& blocks, const Version& version, int64_t newest_write_timestamp, BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type, SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) { + LOG_INFO("lightman VBaseSchemaChangeWithSorting::_inner_process"); + uint64_t merged_rows = 0; + MultiBlockMerger merger(new_tablet); + RowsetWriterContext context; + context.version = version; + context.rowset_state = VISIBLE; + context.segments_overlap = segments_overlap; + context.tablet_schema = new_tablet_schema; + context.original_tablet_schema = new_tablet_schema; + context.newest_write_timestamp = newest_write_timestamp; + context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE; + std::unique_ptr rowset_writer; + // TODO(plat1ko): Use monad op + if (auto result = new_tablet->create_rowset_writer(context, false); !result.has_value()) + [[unlikely]] { + return unexpected(std::move(result).error()); + } else { + rowset_writer = std::move(result).value(); + } + RETURN_IF_ERROR_RESULT(merger.merge(blocks, rowset_writer.get(), &merged_rows)); + _add_merged_rows(merged_rows); + RowsetSharedPtr rowset; + RETURN_IF_ERROR_RESULT(rowset_writer->build(rowset)); + return rowset; +} + +Result VLocalSchemaChangeWithSorting::_internal_sorting( + const std::vector>& blocks, const Version& version, + int64_t newest_write_timestamp, BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type, + SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) { + uint64_t merged_rows = 0; MultiBlockMerger merger(new_tablet); RowsetWriterContext context; @@ -604,7 +637,10 @@ Result VBaseSchemaChangeWithSorting::_internal_sorting( } else { rowset_writer = std::move(result).value(); } + auto guard = _local_storage_engine.pending_local_rowsets().add(context.rowset_id); + _pending_rs_guards.push_back(std::move(guard)); RETURN_IF_ERROR_RESULT(merger.merge(blocks, rowset_writer.get(), &merged_rows)); + LOG_INFO("lightman _internal_sorting").tag("merged_rows", merged_rows); _add_merged_rows(merged_rows); RowsetSharedPtr rowset; RETURN_IF_ERROR_RESULT(rowset_writer->build(rowset)); @@ -625,6 +661,8 @@ Status VBaseSchemaChangeWithSorting::_external_sorting(vector& Merger::Statistics stats; RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, ReaderType::READER_ALTER_TABLE, *new_tablet_schema, rs_readers, rowset_writer, &stats)); + LOG_INFO("lightman _external_sorting").tag("merged_rows", stats.merged_rows) + .tag("src_rowsets len", src_rowsets.size()); _add_merged_rows(stats.merged_rows); _add_filtered_rows(stats.filtered_rows); return Status::OK(); @@ -639,24 +677,12 @@ Status VLocalSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowse _local_storage_engine.add_unused_rowset(row_set); } }}; + _pending_rs_guards.clear(); + LOG_INFO("lightman VLocalSchemaChangeWithSorting::_inner_process"); return VBaseSchemaChangeWithSorting::_inner_process(rowset_reader, rowset_writer, new_tablet, base_tablet_schema, new_tablet_schema); } -Result VLocalSchemaChangeWithSorting::_internal_sorting( - const std::vector>& blocks, - const Version& temp_delta_versions, int64_t newest_write_timestamp, - BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type, - SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) { - auto res = VBaseSchemaChangeWithSorting::_internal_sorting(blocks, temp_delta_versions, newest_write_timestamp, - new_tablet, new_rowset_type, segments_overlap, new_tablet_schema); - if (res.has_value()) { - auto guard = _local_storage_engine.pending_local_rowsets().add(res.value()->rowset_id()); - _pending_rs_guards.push_back(std::move(guard)); - } - return res; -} - Status SchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) { if (!request.__isset.desc_tbl) { return Status::Error( @@ -762,8 +788,6 @@ Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& reques // delete handlers for new tablet DeleteHandler delete_handler; std::vector return_columns; - // Create a new tablet schema, should merge with dropped columns in light weight schema change - TabletSchemaSPtr base_tablet_schema = std::make_shared(); // Use tablet schema directly from base tablet, they are the newest schema, not contain // dropped column during light weight schema change. @@ -858,10 +882,10 @@ Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& reques if (!rs_meta->has_delete_predicate() || rs_meta->start_version() > end_version) { continue; } - base_tablet_schema->merge_dropped_columns(*rs_meta->tablet_schema()); + _base_tablet_schema->merge_dropped_columns(*rs_meta->tablet_schema()); del_preds.push_back(rs_meta); } - res = delete_handler.init(base_tablet_schema, del_preds, end_version); + res = delete_handler.init(_base_tablet_schema, del_preds, end_version); if (!res) { LOG(WARNING) << "init delete handler failed. base_tablet=" << _base_tablet->tablet_id() << ", end_version=" << end_version; @@ -1060,7 +1084,7 @@ Status SchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc // c.Convert historical data for (const auto& rs_reader : sc_params.ref_rowset_readers) { - VLOG_TRACE << "begin to convert a history rowset. version=" << rs_reader->version().first + LOG(INFO) << "lightman begin to convert a history rowset. version=" << rs_reader->version().first << "-" << rs_reader->version().second; // set status for monitor @@ -1402,7 +1426,12 @@ Status SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version) Status SchemaChangeJob::execute_schema_change_job(const TAlterTabletReqV2& request) { Status st; - if (!config::is_cloud_mode()) { + if (config::is_cloud_mode()) { + DCHECK(request.__isset.job_id); + CloudSchemaChangeJob job(ExecEnv::GetInstance()->storage_engine().to_cloud(), + std::to_string(request.job_id), request.expiration); + st = job.process_alter_tablet(request); + } else { SchemaChangeJob job(ExecEnv::GetInstance()->storage_engine().to_local(), request); st = job.process_alter_tablet(request); } diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index adda1ce7bef8db2..c93a9b3b076eaa4 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -117,7 +117,8 @@ class SchemaChange { _filtered_rows = 0; _merged_rows = 0; - + LOG_INFO("lightman SchemaChange::process").tag("base_tablet id", base_tablet->tablet_id()) + .tag("new_tablet id", new_tablet->tablet_id()); RETURN_IF_ERROR(_inner_process(rowset_reader, rowset_writer, new_tablet, base_tablet_schema, new_tablet_schema)); @@ -219,7 +220,6 @@ class VBaseSchemaChangeWithSorting : public SchemaChange { // for external sorting // src_rowsets to store the rowset generated by internal sorting std::vector _src_rowsets; - std::vector _pending_rs_guards; // just for local schema change private: bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const override { @@ -233,7 +233,8 @@ class VBaseSchemaChangeWithSorting : public SchemaChange { }; // @breif schema change with sorting -class VLocalSchemaChangeWithSorting : public VBaseSchemaChangeWithSorting { +// Mixin for local StorageEngine +class VLocalSchemaChangeWithSorting final : public VBaseSchemaChangeWithSorting { public: VLocalSchemaChangeWithSorting(const BlockChanger& changer, size_t memory_limitation , StorageEngine& local_storage_engine) : @@ -252,6 +253,7 @@ class VLocalSchemaChangeWithSorting : public VBaseSchemaChangeWithSorting { SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) override; private: StorageEngine& _local_storage_engine; + std::vector _pending_rs_guards; }; struct AlterMaterializedViewParam {