Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Feb 20, 2024
1 parent 9250c49 commit c3677da
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 29 deletions.
32 changes: 13 additions & 19 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -687,19 +687,21 @@ Status SchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) {
"desc_tbl is not set. Maybe the FE version is not equal to the BE "
"version.");
}
if (_base_tablet == nullptr) {
return Status::Error<TABLE_NOT_FOUND>("fail to find base tablet. base_tablet={}",
request.base_tablet_id);
}
if (_new_tablet == nullptr) {
return Status::Error<TABLE_NOT_FOUND>("fail to find new tablet. new_tablet={}",
request.new_tablet_id);
}

LOG(INFO) << "begin to do request alter tablet: base_tablet_id=" << request.base_tablet_id
<< ", new_tablet_id=" << request.new_tablet_id
<< ", alter_version=" << request.alter_version;

TabletSharedPtr base_tablet =
_local_storage_engine.tablet_manager()->get_tablet(request.base_tablet_id);
if (base_tablet == nullptr) {
return Status::Error<TABLE_NOT_FOUND>("fail to find base tablet. base_tablet={}",
request.base_tablet_id);
}
// Lock schema_change_lock util schema change info is stored in tablet header
std::unique_lock<std::mutex> schema_change_lock(base_tablet->get_schema_change_lock(),
std::unique_lock<std::mutex> schema_change_lock(_base_tablet->get_schema_change_lock(),
std::try_to_lock);
if (!schema_change_lock.owns_lock()) {
return Status::Error<TRY_LOCK_FAILED>("failed to obtain schema change lock. base_tablet={}",
Expand All @@ -716,8 +718,8 @@ SchemaChangeJob::SchemaChangeJob(StorageEngine& local_storage_engine,
: _local_storage_engine(local_storage_engine) {
_base_tablet = _local_storage_engine.tablet_manager()->get_tablet(request.base_tablet_id);
_new_tablet = _local_storage_engine.tablet_manager()->get_tablet(request.new_tablet_id);
_base_tablet_schema = std::make_shared<TabletSchema>();
if (_base_tablet && _new_tablet) {
_base_tablet_schema = std::make_shared<TabletSchema>();
_base_tablet_schema->update_tablet_columns(*_base_tablet->tablet_schema(), request.columns);
// During a schema change, the extracted columns of a variant should not be included in the tablet schema.
// This is because the schema change for a variant needs to ignore the extracted columns.
Expand All @@ -733,15 +735,7 @@ SchemaChangeJob::SchemaChangeJob(StorageEngine& local_storage_engine,
// The admin should upgrade all BE and then upgrade FE.
// Should delete the old code after upgrade finished.
Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& request) {
if (_base_tablet == nullptr) {
return Status::Error<TABLE_NOT_FOUND>("fail to find base tablet. base_tablet={}",
request.base_tablet_id);
}
if (_new_tablet == nullptr) {
return Status::Error<TABLE_NOT_FOUND>("fail to find new tablet. new_tablet={}",
request.new_tablet_id);
}
Status res = Status::OK();
Status res;
signal::tablet_id = _base_tablet->get_table_id();

// check if tablet's state is not_ready, if it is ready, it means the tablet already finished
Expand Down Expand Up @@ -1078,7 +1072,7 @@ Status SchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc
}

// b. Generate historical data converter
auto sc_procedure = get_sc_procedure(changer, sc_sorting, sc_directly);
auto sc_procedure = _get_sc_procedure(changer, sc_sorting, sc_directly);

// c.Convert historical data
for (const auto& rs_reader : sc_params.ref_rowset_readers) {
Expand Down Expand Up @@ -1420,7 +1414,7 @@ Status SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version)
return Status::OK();
}

Status SchemaChangeJob::execute_schema_change_job(const TAlterTabletReqV2& request) {
Status execute_schema_change_job(const TAlterTabletReqV2& request) {
Status st;
if (config::is_cloud_mode()) {
DCHECK(request.__isset.job_id);
Expand Down
19 changes: 10 additions & 9 deletions be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,21 @@ struct SchemaChangeParams {
int32_t be_exec_version;
};

Status execute_schema_change_job(const TAlterTabletReqV2& request);

class SchemaChangeJob {
public:
static Status execute_schema_change_job(const TAlterTabletReqV2& request);
SchemaChangeJob(StorageEngine& local_storage_engine, const TAlterTabletReqV2& request);
Status process_alter_tablet(const TAlterTabletReqV2& request);

std::unique_ptr<SchemaChange> get_sc_procedure(const BlockChanger& changer, bool sc_sorting,
bool tablet_in_converting(int64_t tablet_id);

static Status parse_request(const SchemaChangeParams& sc_params,
TabletSchema* base_tablet_schema, TabletSchema* new_tablet_schema,
BlockChanger* changer, bool* sc_sorting, bool* sc_directly);

private:
std::unique_ptr<SchemaChange> _get_sc_procedure(const BlockChanger& changer, bool sc_sorting,
bool sc_directly) {
if (sc_sorting) {
return std::make_unique<VLocalSchemaChangeWithSorting>(
Expand All @@ -296,13 +304,6 @@ class SchemaChangeJob {
return std::make_unique<LinkedSchemaChange>();
}

bool tablet_in_converting(int64_t tablet_id);

static Status parse_request(const SchemaChangeParams& sc_params,
TabletSchema* base_tablet_schema, TabletSchema* new_tablet_schema,
BlockChanger* changer, bool* sc_sorting, bool* sc_directly);

private:
Status _get_versions_to_be_changed(std::vector<Version>* versions_to_be_changed,
RowsetSharedPtr* max_rowset);

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/task/engine_alter_tablet_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Status EngineAlterTabletTask::execute() {
DorisMetrics::instance()->create_rollup_requests_total->increment(1);
Status res = Status::OK();
try {
res = SchemaChangeJob::execute_schema_change_job(_alter_tablet_req);
res = execute_schema_change_job(_alter_tablet_req);
} catch (const Exception& e) {
res = e.to_status();
}
Expand Down

0 comments on commit c3677da

Please sign in to comment.