Skip to content

Commit

Permalink
[Feature] Support to do schema change in cloud mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Feb 18, 2024
1 parent 18737bf commit 0e5542d
Show file tree
Hide file tree
Showing 25 changed files with 1,371 additions and 366 deletions.
335 changes: 335 additions & 0 deletions be/src/cloud/cloud_schema_change_job.cpp

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions be/src/cloud/cloud_schema_change_job.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#pragma once

#include <memory>
#include "cloud/cloud_storage_engine.h"
#include "olap/rowset/rowset.h"
#include "olap/schema_change.h"
#include "cloud/cloud_tablet.h"
#include "olap/tablet_fwd.h"

namespace doris::cloud {

class CloudSchemaChangeJob {
public:
CloudSchemaChangeJob(CloudStorageEngine& cloud_storage_engine, std::string job_id, int64_t expiration);
~CloudSchemaChangeJob();

// This method is idempotent for a same request.
Status process_alter_tablet(const TAlterTabletReqV2& request);

private:
Status _convert_historical_rowsets(const SchemaChangeParams& sc_params);

private:
CloudStorageEngine& _cloud_storage_engine;
std::shared_ptr<CloudTablet> _base_tablet;
std::shared_ptr<CloudTablet> _new_tablet;
TabletSchemaSPtr _base_tablet_schema;
TabletSchemaSPtr _new_tablet_schema;
std::string _job_id;
std::vector<RowsetSharedPtr> _output_rowsets;
int64_t _output_cumulative_point;
int64_t _expiration;
};

} // namespace doris::cloud
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class CloudTablet final : public BaseTablet {
bool vertical) override;

Status capture_rs_readers(const Version& spec_version, std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) override;
bool skip_missing_version = false) override;

Status capture_consistent_rowsets_unlocked(
const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) const override;
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class BaseTablet {
int32_t schema_hash() const { return _tablet_meta->schema_hash(); }
KeysType keys_type() const { return _tablet_meta->tablet_schema()->keys_type(); }
size_t num_key_columns() const { return _tablet_meta->tablet_schema()->num_key_columns(); }
std::mutex& get_schema_change_lock() { return _schema_change_lock; }
bool enable_unique_key_merge_on_write() const {
#ifdef BE_TEST
if (_tablet_meta == nullptr) {
Expand Down Expand Up @@ -238,6 +239,9 @@ class BaseTablet {
// metrics of this tablet
std::shared_ptr<MetricEntity> _metric_entity;

protected:
std::mutex _schema_change_lock;

public:
IntCounter* query_scan_bytes = nullptr;
IntCounter* query_scan_rows = nullptr;
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

#include "common/config.h"
#include "common/logging.h"
#include "olap/base_tablet.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowid_conversion.h"
Expand Down
438 changes: 218 additions & 220 deletions be/src/olap/schema_change.cpp

Large diffs are not rendered by default.

152 changes: 91 additions & 61 deletions be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
#include "olap/rowset/rowset_reader.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/segment_v2/inverted_index_writer.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
#include "runtime/memory/mem_tracker.h"
Expand Down Expand Up @@ -84,8 +86,8 @@ class BlockChanger {
bool has_where() const { return _where_expr != nullptr; }

private:
Status _check_cast_valid(vectorized::ColumnPtr ref_column, vectorized::ColumnPtr new_column,
AlterTabletType type) const;
static Status _check_cast_valid(vectorized::ColumnPtr ref_column, vectorized::ColumnPtr new_column,
AlterTabletType type);

// @brief column-mapping specification of new schema
SchemaMapping _schema_mapping;
Expand All @@ -105,7 +107,7 @@ class SchemaChange {
virtual ~SchemaChange() = default;

virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet,
BaseTabletSPtr new_tablet, BaseTabletSPtr base_tablet,
TabletSchemaSPtr base_tablet_schema,
TabletSchemaSPtr new_tablet_schema) {
if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) {
Expand Down Expand Up @@ -140,7 +142,7 @@ class SchemaChange {
void _add_merged_rows(uint64_t merged_rows) { _merged_rows += merged_rows; }

virtual Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
BaseTabletSPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
TabletSchemaSPtr new_tablet_schema) {
return Status::NotSupported("inner process unsupported.");
}
Expand All @@ -167,16 +169,15 @@ class SchemaChange {

class LinkedSchemaChange : public SchemaChange {
public:
explicit LinkedSchemaChange(const BlockChanger& changer) : _changer(changer) {}
LinkedSchemaChange() = default;
~LinkedSchemaChange() override = default;

Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet,
BaseTabletSPtr new_tablet, BaseTabletSPtr base_tablet,
TabletSchemaSPtr base_tablet_schema,
TabletSchemaSPtr new_tablet_schema) override;

private:
const BlockChanger& _changer;
DISALLOW_COPY_AND_ASSIGN(LinkedSchemaChange);
};

Expand All @@ -186,7 +187,7 @@ class VSchemaChangeDirectly : public SchemaChange {

private:
Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
BaseTabletSPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
TabletSchemaSPtr new_tablet_schema) override;

bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const override {
Expand All @@ -196,26 +197,31 @@ class VSchemaChangeDirectly : public SchemaChange {
const BlockChanger& _changer;
};

// @breif schema change with sorting
class VSchemaChangeWithSorting : public SchemaChange {
class VBaseSchemaChangeWithSorting : public SchemaChange {
public:
VSchemaChangeWithSorting(const BlockChanger& changer, size_t memory_limitation);
~VSchemaChangeWithSorting() override = default;
VBaseSchemaChangeWithSorting(const BlockChanger& changer, size_t memory_limitation);
~VBaseSchemaChangeWithSorting() override = default;

private:
Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
BaseTabletSPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
TabletSchemaSPtr new_tablet_schema) override;

Result<std::pair<RowsetSharedPtr, PendingRowsetGuard>> _internal_sorting(
virtual Result<RowsetSharedPtr> _internal_sorting(
const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
const Version& temp_delta_versions, int64_t newest_write_timestamp,
TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type,
BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type,
SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema);

Status _external_sorting(std::vector<RowsetSharedPtr>& src_rowsets, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet, TabletSchemaSPtr new_tablet_schema);
BaseTabletSPtr new_tablet, TabletSchemaSPtr new_tablet_schema);

protected:
// for external sorting
// src_rowsets to store the rowset generated by internal sorting
std::vector<RowsetSharedPtr> _src_rowsets;
std::vector<PendingRowsetGuard> _pending_rs_guards; // just for local schema change

private:
bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const override {
return _changer.has_where() || SchemaChange::_check_row_nums(reader, writer);
}
Expand All @@ -226,72 +232,96 @@ class VSchemaChangeWithSorting : public SchemaChange {
std::unique_ptr<MemTracker> _mem_tracker;
};

class SchemaChangeHandler {
// @breif schema change with sorting
class VLocalSchemaChangeWithSorting : public VBaseSchemaChangeWithSorting {
public:
VLocalSchemaChangeWithSorting(const BlockChanger& changer, size_t memory_limitation
, StorageEngine& local_storage_engine) :
VBaseSchemaChangeWithSorting(changer, memory_limitation),
_local_storage_engine(local_storage_engine) {}
~VLocalSchemaChangeWithSorting() override = default;

Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
BaseTabletSPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
TabletSchemaSPtr new_tablet_schema) override;

Result<RowsetSharedPtr> _internal_sorting(
const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
const Version& temp_delta_versions, int64_t newest_write_timestamp,
BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type,
SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) override;
private:
StorageEngine& _local_storage_engine;
};

struct AlterMaterializedViewParam {
std::string column_name;
std::string origin_column_name;
std::shared_ptr<TExpr> expr;
};

struct SchemaChangeParams {
AlterTabletType alter_tablet_type;
bool enable_unique_key_merge_on_write;
std::vector<RowsetReaderSharedPtr> ref_rowset_readers;
DeleteHandler* delete_handler = nullptr;
std::unordered_map<std::string, AlterMaterializedViewParam> materialized_params_map;
DescriptorTbl* desc_tbl = nullptr;
ObjectPool pool;
int32_t be_exec_version;
};

class SchemaChangeJob {
public:
// schema change v2, it will not set alter task in base tablet
static Status process_alter_tablet_v2(const TAlterTabletReqV2& request);
static Status execute_schema_change_job(const TAlterTabletReqV2& request);
SchemaChangeJob(StorageEngine& local_storage_engine, const TAlterTabletReqV2& request);
Status process_alter_tablet(const TAlterTabletReqV2& request);

static std::unique_ptr<SchemaChange> get_sc_procedure(const BlockChanger& changer,
std::unique_ptr<SchemaChange> get_sc_procedure(const BlockChanger& changer,
bool sc_sorting, bool sc_directly) {
if (sc_sorting) {
return std::make_unique<VSchemaChangeWithSorting>(
changer, config::memory_limitation_per_thread_for_schema_change_bytes);
return std::make_unique<VLocalSchemaChangeWithSorting>(changer,
config::memory_limitation_per_thread_for_schema_change_bytes, _local_storage_engine);
}

if (sc_directly) {
return std::make_unique<VSchemaChangeDirectly>(changer);
}

return std::make_unique<LinkedSchemaChange>(changer);
return std::make_unique<LinkedSchemaChange>();
}

static bool tablet_in_converting(int64_t tablet_id);
bool tablet_in_converting(int64_t tablet_id);

static Status parse_request(const SchemaChangeParams& sc_params, TabletSchema* base_tablet_schema,
TabletSchema* new_tablet_schema, BlockChanger* changer,
bool* sc_sorting, bool* sc_directly);

private:
static Status _get_versions_to_be_changed(TabletSharedPtr base_tablet,
std::vector<Version>* versions_to_be_changed,
Status _get_versions_to_be_changed(std::vector<Version>* versions_to_be_changed,
RowsetSharedPtr* max_rowset);

struct AlterMaterializedViewParam {
std::string column_name;
std::string origin_column_name;
std::shared_ptr<TExpr> expr;
};

struct SchemaChangeParams {
AlterTabletType alter_tablet_type;
TabletSharedPtr base_tablet;
TabletSharedPtr new_tablet;
TabletSchemaSPtr base_tablet_schema = nullptr;
TabletSchemaSPtr new_tablet_schema = nullptr;
std::vector<RowsetReaderSharedPtr> ref_rowset_readers;
DeleteHandler* delete_handler = nullptr;
std::unordered_map<std::string, AlterMaterializedViewParam> materialized_params_map;
DescriptorTbl* desc_tbl = nullptr;
ObjectPool pool;
int32_t be_exec_version;
};

static Status _do_process_alter_tablet_v2(const TAlterTabletReqV2& request);

static Status _validate_alter_result(TabletSharedPtr new_tablet,
const TAlterTabletReqV2& request);

static Status _convert_historical_rowsets(const SchemaChangeParams& sc_params,
int64_t* real_alter_version);

static Status _parse_request(const SchemaChangeParams& sc_params, BlockChanger* changer,
bool* sc_sorting, bool* sc_directly);
Status _do_process_alter_tablet(const TAlterTabletReqV2& request);

Status _validate_alter_result(const TAlterTabletReqV2& request);

Status _convert_historical_rowsets(const SchemaChangeParams& sc_params,
int64_t* real_alter_version);

// Initialization Settings for creating a default value
static Status _init_column_mapping(ColumnMapping* column_mapping,
const TabletColumn& column_schema, const std::string& value);

static Status _calc_delete_bitmap_for_mow_table(TabletSharedPtr new_tablet,
int64_t alter_version);
Status _calc_delete_bitmap_for_mow_table(int64_t alter_version);

static std::shared_mutex _mutex;
static std::unordered_set<int64_t> _tablet_ids_in_converting;
static std::set<std::string> _supported_functions;
StorageEngine& _local_storage_engine;
TabletSharedPtr _base_tablet;
TabletSharedPtr _new_tablet;
TabletSchemaSPtr _base_tablet_schema;
TabletSchemaSPtr _new_tablet_schema;
std::shared_mutex _mutex;
std::unordered_set<int64_t> _tablet_ids_in_converting;
std::set<std::string> _supported_functions;
};
} // namespace doris
3 changes: 0 additions & 3 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,6 @@ class Tablet final : public BaseTablet {

std::shared_timed_mutex& get_migration_lock() { return _migration_lock; }

std::mutex& get_schema_change_lock() { return _schema_change_lock; }

std::mutex& get_build_inverted_index_lock() { return _build_inverted_index_lock; }

// operation for compaction
Expand Down Expand Up @@ -506,7 +504,6 @@ class Tablet final : public BaseTablet {
std::mutex _ingest_lock;
std::mutex _base_compaction_lock;
std::mutex _cumulative_compaction_lock;
std::mutex _schema_change_lock;
std::shared_timed_mutex _migration_lock;
std::mutex _build_inverted_index_lock;

Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1231,6 +1231,17 @@ std::vector<const TabletIndex*> TabletSchema::get_indexes_for_column(
return indexes_for_column;
}

void TabletSchema::update_tablet_columns(const TabletSchema& tablet_schema,
const std::vector<TColumn>& t_columns) {
copy_from(tablet_schema);
if (!t_columns.empty() && t_columns[0].col_unique_id >= 0) {
clear_columns();
for (const auto& column : t_columns) {
append_column(TabletColumn(column));
}
}
}

bool TabletSchema::has_inverted_index(const TabletColumn& col) const {
// TODO use more efficient impl
int32_t col_unique_id = col.unique_id();
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,9 @@ class TabletSchema {

std::shared_ptr<TabletSchema> copy_without_extracted_columns();

void update_tablet_columns(const TabletSchema& tablet_schema,
const std::vector<TColumn>& t_columns);

private:
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
friend bool operator!=(const TabletSchema& a, const TabletSchema& b);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/task/engine_alter_tablet_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Status EngineAlterTabletTask::execute() {
DorisMetrics::instance()->create_rollup_requests_total->increment(1);
Status res = Status::OK();
try {
res = SchemaChangeHandler::process_alter_tablet_v2(_alter_tablet_req);
res = SchemaChangeJob::execute_schema_change_job(_alter_tablet_req);
} catch (const Exception& e) {
res = e.to_status();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
Expand Down Expand Up @@ -81,7 +82,7 @@ protected void unlock() {
lock.unlock();
}

protected void addAlterJobV2(AlterJobV2 alterJob) {
protected void addAlterJobV2(AlterJobV2 alterJob) throws AnalysisException {
this.alterJobsV2.put(alterJob.getJobId(), alterJob);
LOG.info("add {} job {}", alterJob.getType(), alterJob.getJobId());
}
Expand Down Expand Up @@ -262,7 +263,7 @@ public void handleFinishAlterTask(AlterReplicaTask task) throws MetaNotFoundExce
}

// replay the alter job v2
public void replayAlterJobV2(AlterJobV2 alterJob) {
public void replayAlterJobV2(AlterJobV2 alterJob) throws AnalysisException {
AlterJobV2 existingJob = alterJobsV2.get(alterJob.getJobId());
if (existingJob == null) {
// This is the first time to replay the alter job, so just using the replayed alterJob to call replay();
Expand Down
Loading

0 comments on commit 0e5542d

Please sign in to comment.