Skip to content

Commit

Permalink
[fix](cluster key) cluster key support vertical_segment_writer (apach…
Browse files Browse the repository at this point in the history
…e#38538)

The mow cluster key does not support vertical_segment_writer.
  • Loading branch information
mymeiyi authored Aug 19, 2024
1 parent 0ae9f11 commit 74aa0ca
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 110 deletions.
3 changes: 1 addition & 2 deletions be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
RETURN_IF_ERROR(_parse_variant_columns(flush_block));
}
bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;
if (config::enable_vertical_segment_writer &&
_context.tablet_schema->cluster_key_idxes().empty()) {
if (config::enable_vertical_segment_writer) {
std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_add_rows(writer, &flush_block, 0, flush_block.rows()));
Expand Down
94 changes: 49 additions & 45 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,36 +94,36 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id,
_mem_tracker(std::make_unique<MemTracker>(segment_mem_tracker_name(segment_id))),
_mow_context(std::move(opts.mow_ctx)) {
CHECK_NOTNULL(file_writer);
_num_key_columns = _tablet_schema->num_key_columns();
_num_sort_key_columns = _tablet_schema->num_key_columns();
_num_short_key_columns = _tablet_schema->num_short_key_columns();
if (_tablet_schema->cluster_key_idxes().empty()) {
DCHECK(_num_key_columns >= _num_short_key_columns)
if (!_is_mow_with_cluster_key()) {
DCHECK(_num_sort_key_columns >= _num_short_key_columns)
<< ", table_id=" << _tablet_schema->table_id()
<< ", num_key_columns=" << _num_key_columns
<< ", num_key_columns=" << _num_sort_key_columns
<< ", num_short_key_columns=" << _num_short_key_columns
<< ", cluster_key_columns=" << _tablet_schema->cluster_key_idxes().size();
}
for (size_t cid = 0; cid < _num_key_columns; ++cid) {
for (size_t cid = 0; cid < _num_sort_key_columns; ++cid) {
const auto& column = _tablet_schema->column(cid);
_key_coders.push_back(get_key_coder(column.type()));
_key_index_size.push_back(column.index_length());
}
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
if (_is_mow()) {
// encode the sequence id into the primary key index
if (_tablet_schema->has_sequence_col()) {
const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx());
_seq_coder = get_key_coder(column.type());
}
// encode the rowid into the primary key index
if (!_tablet_schema->cluster_key_idxes().empty()) {
if (_is_mow_with_cluster_key()) {
const auto* type_info = get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT>();
_rowid_coder = get_key_coder(type_info->type());
// primary keys
_primary_key_coders.swap(_key_coders);
// cluster keys
_key_coders.clear();
_key_index_size.clear();
_num_key_columns = _tablet_schema->cluster_key_idxes().size();
_num_sort_key_columns = _tablet_schema->cluster_key_idxes().size();
for (auto cid : _tablet_schema->cluster_key_idxes()) {
const auto& column = _tablet_schema->column(cid);
_key_coders.push_back(get_key_coder(column.type()));
Expand Down Expand Up @@ -284,14 +284,14 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) {

// we don't need the short key index for unique key merge on write table.
if (_has_key) {
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
if (_is_mow()) {
size_t seq_col_length = 0;
if (_tablet_schema->has_sequence_col()) {
seq_col_length =
_tablet_schema->column(_tablet_schema->sequence_col_idx()).length() + 1;
}
size_t rowid_length = 0;
if (!_tablet_schema->cluster_key_idxes().empty()) {
if (_is_mow_with_cluster_key()) {
rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH;
_short_key_index_builder.reset(
new ShortKeyIndexBuilder(_segment_id, _opts.num_rows_per_block));
Expand Down Expand Up @@ -478,7 +478,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
block->columns(), _tablet_schema->num_key_columns(),
_tablet_schema->num_columns()));
}
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write);
DCHECK(_is_mow());

DCHECK(_opts.rowset_ctx->partial_update_info);
// find missing column cids
Expand Down Expand Up @@ -507,7 +507,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
if (!converted_result.first.ok()) {
return converted_result.first;
}
if (cid < _num_key_columns) {
if (cid < _num_sort_key_columns) {
key_columns.push_back(converted_result.second);
} else if (_tablet_schema->has_sequence_col() &&
cid == _tablet_schema->sequence_col_idx()) {
Expand Down Expand Up @@ -906,19 +906,9 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
converted_result.second->get_data(), num_rows));
}
if (_has_key) {
// for now we don't need to query short key index for CLUSTER BY feature,
// but we still write the index for future usage.
bool need_primary_key_indexes = (_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write);
bool need_short_key_indexes =
!need_primary_key_indexes ||
(need_primary_key_indexes && !_tablet_schema->cluster_key_idxes().empty());
if (need_primary_key_indexes && !need_short_key_indexes) { // mow table without cluster keys
RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column,
num_rows, false));
} else if (!need_primary_key_indexes && need_short_key_indexes) { // other tables
RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos));
} else if (need_primary_key_indexes && need_short_key_indexes) { // mow with cluster keys
if (_is_mow_with_cluster_key()) {
// for now we don't need to query short key index for CLUSTER BY feature,
// but we still write the index for future usage.
// 1. generate primary key index, the key_columns is primary_key_columns
RETURN_IF_ERROR(_generate_primary_key_index(_primary_key_coders, key_columns,
seq_column, num_rows, true));
Expand All @@ -938,6 +928,11 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
}
}
RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos));
} else if (_is_mow()) {
RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column,
num_rows, false));
} else {
RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos));
}
}

Expand Down Expand Up @@ -967,8 +962,9 @@ int64_t SegmentWriter::max_row_to_add(size_t row_avg_size_in_bytes) {
std::string SegmentWriter::_full_encode_keys(
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos,
bool null_first) {
assert(_key_index_size.size() == _num_key_columns);
assert(key_columns.size() == _num_key_columns && _key_coders.size() == _num_key_columns);
assert(_key_index_size.size() == _num_sort_key_columns);
assert(key_columns.size() == _num_sort_key_columns &&
_key_coders.size() == _num_sort_key_columns);
return _full_encode_keys(_key_coders, key_columns, pos, null_first);
}

Expand Down Expand Up @@ -1047,15 +1043,18 @@ Status SegmentWriter::append_row(const RowType& row) {
RETURN_IF_ERROR(_column_writers[cid]->append(cell));
}
std::string full_encoded_key;
encode_key<RowType, true>(&full_encoded_key, row, _num_key_columns);
encode_key<RowType, true>(&full_encoded_key, row, _num_sort_key_columns);
if (_tablet_schema->has_sequence_col()) {
full_encoded_key.push_back(KEY_NORMAL_MARKER);
auto cid = _tablet_schema->sequence_col_idx();
auto cell = row.cell(cid);
row.schema()->column(cid)->full_encode_ascending(cell.cell_ptr(), &full_encoded_key);
}

if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
if (_is_mow_with_cluster_key()) {
return Status::InternalError(
"SegmentWriter::append_row does not support mow tables with cluster key");
} else if (_is_mow()) {
RETURN_IF_ERROR(_primary_key_index_builder->add_item(full_encoded_key));
} else {
// At the beginning of one block, so add a short key index entry
Expand All @@ -1082,7 +1081,9 @@ uint64_t SegmentWriter::estimate_segment_size() {
for (auto& column_writer : _column_writers) {
size += column_writer->estimate_buffer_size();
}
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
if (_is_mow_with_cluster_key()) {
size += _primary_key_index_builder->size() + _short_key_index_builder->size();
} else if (_is_mow()) {
size += _primary_key_index_builder->size();
} else {
size += _short_key_index_builder->size();
Expand Down Expand Up @@ -1126,19 +1127,17 @@ Status SegmentWriter::finalize_columns_index(uint64_t* index_size) {

*index_size = _file_writer->bytes_appended() - index_start;
if (_has_key) {
bool write_short_key_index = _tablet_schema->keys_type() != UNIQUE_KEYS ||
(_tablet_schema->keys_type() == UNIQUE_KEYS &&
!_opts.enable_unique_key_merge_on_write) ||
(_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write &&
!_tablet_schema->cluster_key_idxes().empty());
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
if (_is_mow_with_cluster_key()) {
RETURN_IF_ERROR(_write_short_key_index());
*index_size = _file_writer->bytes_appended() - index_start;
RETURN_IF_ERROR(_write_primary_key_index());
*index_size += _primary_key_index_builder->disk_size();
} else if (_is_mow()) {
RETURN_IF_ERROR(_write_primary_key_index());
// IndexedColumnWriter write data pages mixed with segment data, we should use
// the stat from primary key index builder.
*index_size += _primary_key_index_builder->disk_size();
}
if (write_short_key_index) {
} else {
RETURN_IF_ERROR(_write_short_key_index());
*index_size = _file_writer->bytes_appended() - index_start;
}
Expand Down Expand Up @@ -1297,14 +1296,12 @@ Status SegmentWriter::_write_raw_data(const std::vector<Slice>& slices) {
}

Slice SegmentWriter::min_encoded_key() {
return (_primary_key_index_builder == nullptr || !_tablet_schema->cluster_key_idxes().empty())
? Slice(_min_key.data(), _min_key.size())
: _primary_key_index_builder->min_key();
return (_primary_key_index_builder == nullptr) ? Slice(_min_key.data(), _min_key.size())
: _primary_key_index_builder->min_key();
}
Slice SegmentWriter::max_encoded_key() {
return (_primary_key_index_builder == nullptr || !_tablet_schema->cluster_key_idxes().empty())
? Slice(_max_key.data(), _max_key.size())
: _primary_key_index_builder->max_key();
return (_primary_key_index_builder == nullptr) ? Slice(_max_key.data(), _max_key.size())
: _primary_key_index_builder->max_key();
}

void SegmentWriter::set_min_max_key(const Slice& key) {
Expand Down Expand Up @@ -1400,5 +1397,12 @@ int64_t SegmentWriter::get_inverted_index_total_size() {
return 0;
}

inline bool SegmentWriter::_is_mow() {
return _tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write;
}

inline bool SegmentWriter::_is_mow_with_cluster_key() {
return _is_mow() && !_tablet_schema->cluster_key_idxes().empty();
}
} // namespace segment_v2
} // namespace doris
6 changes: 5 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ class SegmentWriter {
vectorized::IOlapColumnDataAccessor* seq_column, size_t num_rows, bool need_sort);
Status _generate_short_key_index(std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
size_t num_rows, const std::vector<size_t>& short_key_pos);
bool _is_mow();
bool _is_mow_with_cluster_key();

private:
uint32_t _segment_id;
Expand All @@ -196,7 +198,9 @@ class SegmentWriter {
io::FileWriter* _file_writer = nullptr;
std::unique_ptr<InvertedIndexFileWriter> _inverted_index_file_writer;
SegmentFooterPB _footer;
size_t _num_key_columns;
// for mow tables with cluster key, the sort key is the cluster keys not unique keys
// for other tables, the sort key is the keys
size_t _num_sort_key_columns;
size_t _num_short_key_columns;
InvertedIndexFileInfo _inverted_index_file_info;
std::unique_ptr<ShortKeyIndexBuilder> _short_key_index_builder;
Expand Down
Loading

0 comments on commit 74aa0ca

Please sign in to comment.