Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Feb 19, 2024
1 parent 0e5542d commit 53582fa
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 30 deletions.
5 changes: 2 additions & 3 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

#include "cloud/cloud_tablet_mgr.h"
#include "cloud/cloud_meta_mgr.h"
// #include "cloud/utils.h"
#include "common/status.h"
#include "olap/delete_handler.h"
#include "olap/rowset/beta_rowset.h"
Expand All @@ -17,7 +16,7 @@
#include "olap/tablet_meta.h"
#include "service/backend_options.h"

namespace doris::cloud {
namespace doris {
using namespace ErrorCode;

static constexpr int ALTER_TABLE_BATCH_SIZE = 4096;
Expand Down Expand Up @@ -332,4 +331,4 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
}
return Status::OK();
}
} // namespace doris::cloud
} // namespace doris
6 changes: 3 additions & 3 deletions be/src/cloud/cloud_schema_change_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "cloud/cloud_tablet.h"
#include "olap/tablet_fwd.h"

namespace doris::cloud {
namespace doris {

class CloudSchemaChangeJob {
public:
Expand All @@ -28,8 +28,8 @@ class CloudSchemaChangeJob {
TabletSchemaSPtr _new_tablet_schema;
std::string _job_id;
std::vector<RowsetSharedPtr> _output_rowsets;
int64_t _output_cumulative_point;
int64_t _output_cumulative_point = 0;
int64_t _expiration;
};

} // namespace doris::cloud
} // namespace doris
71 changes: 50 additions & 21 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <tuple>
#include <utility>

#include "cloud/cloud_schema_change_job.h"
#include "cloud/config.h"
#include "common/logging.h"
#include "common/signal_handler.h"
Expand Down Expand Up @@ -505,14 +506,15 @@ Status VBaseSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset
BaseTabletSPtr new_tablet,
TabletSchemaSPtr base_tablet_schema,
TabletSchemaSPtr new_tablet_schema) {
LOG_INFO("lightman VBaseSchemaChangeWithSorting::_inner_process");
// for internal sorting
std::vector<std::unique_ptr<vectorized::Block>> blocks;

RowsetSharedPtr rowset = rowset_reader->rowset();
SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap();
int64_t newest_write_timestamp = rowset->newest_write_timestamp();
_temp_delta_versions.first = _temp_delta_versions.second;

_src_rowsets.clear(); // init _src_rowsets
auto create_rowset = [&]() -> Status {
if (blocks.empty()) {
return Status::OK();
Expand Down Expand Up @@ -586,6 +588,37 @@ Result<RowsetSharedPtr> VBaseSchemaChangeWithSorting::_internal_sorting(
const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version,
int64_t newest_write_timestamp, BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type,
SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) {
LOG_INFO("lightman VBaseSchemaChangeWithSorting::_inner_process");
uint64_t merged_rows = 0;
MultiBlockMerger merger(new_tablet);
RowsetWriterContext context;
context.version = version;
context.rowset_state = VISIBLE;
context.segments_overlap = segments_overlap;
context.tablet_schema = new_tablet_schema;
context.original_tablet_schema = new_tablet_schema;
context.newest_write_timestamp = newest_write_timestamp;
context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;
std::unique_ptr<RowsetWriter> rowset_writer;
// TODO(plat1ko): Use monad op
if (auto result = new_tablet->create_rowset_writer(context, false); !result.has_value())
[[unlikely]] {
return unexpected(std::move(result).error());
} else {
rowset_writer = std::move(result).value();
}
RETURN_IF_ERROR_RESULT(merger.merge(blocks, rowset_writer.get(), &merged_rows));
_add_merged_rows(merged_rows);
RowsetSharedPtr rowset;
RETURN_IF_ERROR_RESULT(rowset_writer->build(rowset));
return rowset;
}

Result<RowsetSharedPtr> VLocalSchemaChangeWithSorting::_internal_sorting(
const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version,
int64_t newest_write_timestamp, BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type,
SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) {

uint64_t merged_rows = 0;
MultiBlockMerger merger(new_tablet);
RowsetWriterContext context;
Expand All @@ -604,7 +637,10 @@ Result<RowsetSharedPtr> VBaseSchemaChangeWithSorting::_internal_sorting(
} else {
rowset_writer = std::move(result).value();
}
auto guard = _local_storage_engine.pending_local_rowsets().add(context.rowset_id);
_pending_rs_guards.push_back(std::move(guard));
RETURN_IF_ERROR_RESULT(merger.merge(blocks, rowset_writer.get(), &merged_rows));
LOG_INFO("lightman _internal_sorting").tag("merged_rows", merged_rows);
_add_merged_rows(merged_rows);
RowsetSharedPtr rowset;
RETURN_IF_ERROR_RESULT(rowset_writer->build(rowset));
Expand All @@ -625,6 +661,8 @@ Status VBaseSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>&
Merger::Statistics stats;
RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, ReaderType::READER_ALTER_TABLE,
*new_tablet_schema, rs_readers, rowset_writer, &stats));
LOG_INFO("lightman _external_sorting").tag("merged_rows", stats.merged_rows)
.tag("src_rowsets len", src_rowsets.size());
_add_merged_rows(stats.merged_rows);
_add_filtered_rows(stats.filtered_rows);
return Status::OK();
Expand All @@ -639,24 +677,12 @@ Status VLocalSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowse
_local_storage_engine.add_unused_rowset(row_set);
}
}};
_pending_rs_guards.clear();
LOG_INFO("lightman VLocalSchemaChangeWithSorting::_inner_process");
return VBaseSchemaChangeWithSorting::_inner_process(rowset_reader, rowset_writer, new_tablet,
base_tablet_schema, new_tablet_schema);
}

// Local-storage variant of the internal sorting step.
// Forwards all arguments to VBaseSchemaChangeWithSorting::_internal_sorting and, when
// that call yields a rowset, registers the rowset's id with the local storage engine's
// pending-rowset collection. The returned guard is kept in _pending_rs_guards.
// If the base call fails, its error result is returned unchanged and nothing is registered.
Result<RowsetSharedPtr> VLocalSchemaChangeWithSorting::_internal_sorting(
        const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
        const Version& temp_delta_versions, int64_t newest_write_timestamp,
        BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type,
        SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) {
    auto sorted = VBaseSchemaChangeWithSorting::_internal_sorting(
            blocks, temp_delta_versions, newest_write_timestamp, new_tablet, new_rowset_type,
            segments_overlap, new_tablet_schema);
    if (!sorted.has_value()) {
        // Propagate the failure as-is; there is no rowset to guard.
        return sorted;
    }

    // Hold on to the guard for the freshly built rowset.
    _pending_rs_guards.push_back(
            _local_storage_engine.pending_local_rowsets().add(sorted.value()->rowset_id()));
    return sorted;
}

Status SchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) {
if (!request.__isset.desc_tbl) {
return Status::Error<INVALID_ARGUMENT>(
Expand Down Expand Up @@ -762,8 +788,6 @@ Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& reques
// delete handlers for new tablet
DeleteHandler delete_handler;
std::vector<ColumnId> return_columns;
// Create a new tablet schema, should merge with dropped columns in light weight schema change
TabletSchemaSPtr base_tablet_schema = std::make_shared<TabletSchema>();

// Use the tablet schema directly from the base tablet: it is the newest schema and does
// not contain columns dropped during light-weight schema change.
Expand Down Expand Up @@ -858,10 +882,10 @@ Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& reques
if (!rs_meta->has_delete_predicate() || rs_meta->start_version() > end_version) {
continue;
}
base_tablet_schema->merge_dropped_columns(*rs_meta->tablet_schema());
_base_tablet_schema->merge_dropped_columns(*rs_meta->tablet_schema());
del_preds.push_back(rs_meta);
}
res = delete_handler.init(base_tablet_schema, del_preds, end_version);
res = delete_handler.init(_base_tablet_schema, del_preds, end_version);
if (!res) {
LOG(WARNING) << "init delete handler failed. base_tablet="
<< _base_tablet->tablet_id() << ", end_version=" << end_version;
Expand Down Expand Up @@ -1060,7 +1084,7 @@ Status SchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc

// c.Convert historical data
for (const auto& rs_reader : sc_params.ref_rowset_readers) {
VLOG_TRACE << "begin to convert a history rowset. version=" << rs_reader->version().first
LOG(INFO) << "lightman begin to convert a history rowset. version=" << rs_reader->version().first
<< "-" << rs_reader->version().second;

// set status for monitor
Expand Down Expand Up @@ -1402,7 +1426,12 @@ Status SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version)

Status SchemaChangeJob::execute_schema_change_job(const TAlterTabletReqV2& request) {
Status st;
if (!config::is_cloud_mode()) {
if (config::is_cloud_mode()) {
DCHECK(request.__isset.job_id);
CloudSchemaChangeJob job(ExecEnv::GetInstance()->storage_engine().to_cloud(),
std::to_string(request.job_id), request.expiration);
st = job.process_alter_tablet(request);
} else {
SchemaChangeJob job(ExecEnv::GetInstance()->storage_engine().to_local(), request);
st = job.process_alter_tablet(request);
}
Expand Down
8 changes: 5 additions & 3 deletions be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class SchemaChange {

_filtered_rows = 0;
_merged_rows = 0;

LOG_INFO("lightman SchemaChange::process").tag("base_tablet id", base_tablet->tablet_id())
.tag("new_tablet id", new_tablet->tablet_id());
RETURN_IF_ERROR(_inner_process(rowset_reader, rowset_writer, new_tablet, base_tablet_schema,
new_tablet_schema));

Expand Down Expand Up @@ -219,7 +220,6 @@ class VBaseSchemaChangeWithSorting : public SchemaChange {
// for external sorting
// src_rowsets to store the rowset generated by internal sorting
std::vector<RowsetSharedPtr> _src_rowsets;
std::vector<PendingRowsetGuard> _pending_rs_guards; // just for local schema change

private:
bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const override {
Expand All @@ -233,7 +233,8 @@ class VBaseSchemaChangeWithSorting : public SchemaChange {
};

// @brief schema change with sorting
class VLocalSchemaChangeWithSorting : public VBaseSchemaChangeWithSorting {
// Mixin for local StorageEngine
class VLocalSchemaChangeWithSorting final : public VBaseSchemaChangeWithSorting {
public:
VLocalSchemaChangeWithSorting(const BlockChanger& changer, size_t memory_limitation
, StorageEngine& local_storage_engine) :
Expand All @@ -252,6 +253,7 @@ class VLocalSchemaChangeWithSorting : public VBaseSchemaChangeWithSorting {
SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) override;
private:
StorageEngine& _local_storage_engine;
std::vector<PendingRowsetGuard> _pending_rs_guards;
};

struct AlterMaterializedViewParam {
Expand Down

0 comments on commit 53582fa

Please sign in to comment.