From 74aa0ca63adfd93a40c0d42d2c0c954bb8a5a899 Mon Sep 17 00:00:00 2001 From: meiyi Date: Mon, 19 Aug 2024 22:27:20 +0800 Subject: [PATCH] [fix](cluster key) cluster key support vertical_segment_writer (#38538) The mow cluster key does not support vertical_segment_writer. --- be/src/olap/rowset/segment_creator.cpp | 3 +- .../olap/rowset/segment_v2/segment_writer.cpp | 94 +++---- .../olap/rowset/segment_v2/segment_writer.h | 6 +- .../segment_v2/vertical_segment_writer.cpp | 229 +++++++++++++----- .../segment_v2/vertical_segment_writer.h | 24 +- .../test_mow_with_null_sequence.groovy | 2 +- .../test_pk_uk_case.groovy | 6 +- 7 files changed, 254 insertions(+), 110 deletions(-) diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index d969f5b390473e..b7cbd8b6022616 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -68,8 +68,7 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_ RETURN_IF_ERROR(_parse_variant_columns(flush_block)); } bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024; - if (config::enable_vertical_segment_writer && - _context.tablet_schema->cluster_key_idxes().empty()) { + if (config::enable_vertical_segment_writer) { std::unique_ptr writer; RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression)); RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_add_rows(writer, &flush_block, 0, flush_block.rows())); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 42e625746f3543..a450f8ffd99b9e 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -94,28 +94,28 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id, _mem_tracker(std::make_unique(segment_mem_tracker_name(segment_id))), _mow_context(std::move(opts.mow_ctx)) { CHECK_NOTNULL(file_writer); - _num_key_columns = _tablet_schema->num_key_columns(); + _num_sort_key_columns = _tablet_schema->num_key_columns(); _num_short_key_columns = _tablet_schema->num_short_key_columns(); - if (_tablet_schema->cluster_key_idxes().empty()) { - DCHECK(_num_key_columns >= _num_short_key_columns) + if (!_is_mow_with_cluster_key()) { + DCHECK(_num_sort_key_columns >= _num_short_key_columns) << ", table_id=" << _tablet_schema->table_id() - << ", num_key_columns=" << _num_key_columns + << ", num_key_columns=" << _num_sort_key_columns << ", num_short_key_columns=" << _num_short_key_columns << ", cluster_key_columns=" << _tablet_schema->cluster_key_idxes().size(); } - for (size_t cid = 0; cid < _num_key_columns; ++cid) { + for (size_t cid = 0; cid < _num_sort_key_columns; ++cid) { const auto& column = _tablet_schema->column(cid); _key_coders.push_back(get_key_coder(column.type())); _key_index_size.push_back(column.index_length()); } - if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { + if (_is_mow()) { // encode the sequence id into the primary key index if (_tablet_schema->has_sequence_col()) { const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx()); _seq_coder = get_key_coder(column.type()); } // encode the rowid into the primary key index - if (!_tablet_schema->cluster_key_idxes().empty()) { + if (_is_mow_with_cluster_key()) { const auto* type_info = get_scalar_type_info(); _rowid_coder = get_key_coder(type_info->type()); // primary keys @@ -123,7 +123,7 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id, // cluster keys _key_coders.clear(); _key_index_size.clear(); - _num_key_columns = _tablet_schema->cluster_key_idxes().size(); + _num_sort_key_columns = _tablet_schema->cluster_key_idxes().size(); for (auto cid : _tablet_schema->cluster_key_idxes()) { const auto& column = _tablet_schema->column(cid); _key_coders.push_back(get_key_coder(column.type())); @@ -284,14 +284,14 @@ Status SegmentWriter::init(const std::vector& col_ids, bool has_key) { // we don't need the short key index for unique key merge on write table. if (_has_key) { - if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { + if (_is_mow()) { size_t seq_col_length = 0; if (_tablet_schema->has_sequence_col()) { seq_col_length = _tablet_schema->column(_tablet_schema->sequence_col_idx()).length() + 1; } size_t rowid_length = 0; - if (!_tablet_schema->cluster_key_idxes().empty()) { + if (_is_mow_with_cluster_key()) { rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH; _short_key_index_builder.reset( new ShortKeyIndexBuilder(_segment_id, _opts.num_rows_per_block)); @@ -478,7 +478,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* block->columns(), _tablet_schema->num_key_columns(), _tablet_schema->num_columns())); } - DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write); + DCHECK(_is_mow()); DCHECK(_opts.rowset_ctx->partial_update_info); // find missing column cids @@ -507,7 +507,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* if (!converted_result.first.ok()) { return converted_result.first; } - if (cid < _num_key_columns) { + if (cid < _num_sort_key_columns) { key_columns.push_back(converted_result.second); } else if (_tablet_schema->has_sequence_col() && cid == _tablet_schema->sequence_col_idx()) { @@ -906,19 +906,9 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po converted_result.second->get_data(), num_rows)); } if (_has_key) { - // for now we don't need to query short key index for CLUSTER BY feature, - // but we still write the index for future usage. - bool need_primary_key_indexes = (_tablet_schema->keys_type() == UNIQUE_KEYS && - _opts.enable_unique_key_merge_on_write); - bool need_short_key_indexes = - !need_primary_key_indexes || - (need_primary_key_indexes && !_tablet_schema->cluster_key_idxes().empty()); - if (need_primary_key_indexes && !need_short_key_indexes) { // mow table without cluster keys - RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column, - num_rows, false)); - } else if (!need_primary_key_indexes && need_short_key_indexes) { // other tables - RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos)); - } else if (need_primary_key_indexes && need_short_key_indexes) { // mow with cluster keys + if (_is_mow_with_cluster_key()) { + // for now we don't need to query short key index for CLUSTER BY feature, + // but we still write the index for future usage. // 1. generate primary key index, the key_columns is primary_key_columns RETURN_IF_ERROR(_generate_primary_key_index(_primary_key_coders, key_columns, seq_column, num_rows, true)); @@ -938,6 +928,11 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po } } RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos)); + } else if (_is_mow()) { + RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column, + num_rows, false)); + } else { + RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos)); } } @@ -967,8 +962,9 @@ int64_t SegmentWriter::max_row_to_add(size_t row_avg_size_in_bytes) { std::string SegmentWriter::_full_encode_keys( const std::vector& key_columns, size_t pos, bool null_first) { - assert(_key_index_size.size() == _num_key_columns); - assert(key_columns.size() == _num_key_columns && _key_coders.size() == _num_key_columns); + assert(_key_index_size.size() == _num_sort_key_columns); + assert(key_columns.size() == _num_sort_key_columns && + _key_coders.size() == _num_sort_key_columns); return _full_encode_keys(_key_coders, key_columns, pos, null_first); } @@ -1047,7 +1043,7 @@ Status SegmentWriter::append_row(const RowType& row) { RETURN_IF_ERROR(_column_writers[cid]->append(cell)); } std::string full_encoded_key; - encode_key(&full_encoded_key, row, _num_key_columns); + encode_key(&full_encoded_key, row, _num_sort_key_columns); if (_tablet_schema->has_sequence_col()) { full_encoded_key.push_back(KEY_NORMAL_MARKER); auto cid = _tablet_schema->sequence_col_idx(); @@ -1055,7 +1051,10 @@ Status SegmentWriter::append_row(const RowType& row) { row.schema()->column(cid)->full_encode_ascending(cell.cell_ptr(), &full_encoded_key); } - if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { + if (_is_mow_with_cluster_key()) { + return Status::InternalError( + "SegmentWriter::append_row does not support mow tables with cluster key"); + } else if (_is_mow()) { RETURN_IF_ERROR(_primary_key_index_builder->add_item(full_encoded_key)); } else { // At the beginning of one block, so add a short key index entry @@ -1082,7 +1081,9 @@ uint64_t SegmentWriter::estimate_segment_size() { for (auto& column_writer : _column_writers) { size += column_writer->estimate_buffer_size(); } - if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { + if (_is_mow_with_cluster_key()) { + size += _primary_key_index_builder->size() + _short_key_index_builder->size(); + } else if (_is_mow()) { size += _primary_key_index_builder->size(); } else { size += _short_key_index_builder->size(); @@ -1126,19 +1127,17 @@ Status SegmentWriter::finalize_columns_index(uint64_t* index_size) { *index_size = _file_writer->bytes_appended() - index_start; if (_has_key) { - bool write_short_key_index = _tablet_schema->keys_type() != UNIQUE_KEYS || - (_tablet_schema->keys_type() == UNIQUE_KEYS && - !_opts.enable_unique_key_merge_on_write) || - (_tablet_schema->keys_type() == UNIQUE_KEYS && - _opts.enable_unique_key_merge_on_write && - !_tablet_schema->cluster_key_idxes().empty()); - if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { + if (_is_mow_with_cluster_key()) { + RETURN_IF_ERROR(_write_short_key_index()); + *index_size = _file_writer->bytes_appended() - index_start; + RETURN_IF_ERROR(_write_primary_key_index()); + *index_size += _primary_key_index_builder->disk_size(); + } else if (_is_mow()) { RETURN_IF_ERROR(_write_primary_key_index()); // IndexedColumnWriter write data pages mixed with segment data, we should use // the stat from primary key index builder. *index_size += _primary_key_index_builder->disk_size(); - } - if (write_short_key_index) { + } else { RETURN_IF_ERROR(_write_short_key_index()); *index_size = _file_writer->bytes_appended() - index_start; } @@ -1297,14 +1296,12 @@ Status SegmentWriter::_write_raw_data(const std::vector& slices) { } Slice SegmentWriter::min_encoded_key() { - return (_primary_key_index_builder == nullptr || !_tablet_schema->cluster_key_idxes().empty()) - ? Slice(_min_key.data(), _min_key.size()) - : _primary_key_index_builder->min_key(); + return (_primary_key_index_builder == nullptr) ? Slice(_min_key.data(), _min_key.size()) + : _primary_key_index_builder->min_key(); } Slice SegmentWriter::max_encoded_key() { - return (_primary_key_index_builder == nullptr || !_tablet_schema->cluster_key_idxes().empty()) - ? Slice(_max_key.data(), _max_key.size()) - : _primary_key_index_builder->max_key(); + return (_primary_key_index_builder == nullptr) ? Slice(_max_key.data(), _max_key.size()) + : _primary_key_index_builder->max_key(); } void SegmentWriter::set_min_max_key(const Slice& key) { @@ -1400,5 +1397,12 @@ int64_t SegmentWriter::get_inverted_index_total_size() { return 0; } +inline bool SegmentWriter::_is_mow() { + return _tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write; +} + +inline bool SegmentWriter::_is_mow_with_cluster_key() { + return _is_mow() && !_tablet_schema->cluster_key_idxes().empty(); +} } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 32723e72fb0e71..3cdb71a45d7b15 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -184,6 +184,8 @@ class SegmentWriter { vectorized::IOlapColumnDataAccessor* seq_column, size_t num_rows, bool need_sort); Status _generate_short_key_index(std::vector& key_columns, size_t num_rows, const std::vector& short_key_pos); + bool _is_mow(); + bool _is_mow_with_cluster_key(); private: uint32_t _segment_id; @@ -196,7 +198,9 @@ class SegmentWriter { io::FileWriter* _file_writer = nullptr; std::unique_ptr _inverted_index_file_writer; SegmentFooterPB _footer; - size_t _num_key_columns; + // for mow tables with cluster key, the sort key is the cluster keys not unique keys + // for other tables, the sort key is the keys + size_t _num_sort_key_columns; size_t _num_short_key_columns; InvertedIndexFileInfo _inverted_index_file_info; std::unique_ptr _short_key_index_builder; diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index ecb248b7fe9d4d..c14f3b557d7f2a 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -98,19 +98,42 @@ VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32 vertical_segment_writer_mem_tracker_name(segment_id))), _mow_context(std::move(opts.mow_ctx)) { CHECK_NOTNULL(file_writer); - _num_key_columns = _tablet_schema->num_key_columns(); + _num_sort_key_columns = _tablet_schema->num_key_columns(); _num_short_key_columns = _tablet_schema->num_short_key_columns(); - DCHECK(_num_key_columns >= _num_short_key_columns); - for (size_t cid = 0; cid < _num_key_columns; ++cid) { + if (!_is_mow_with_cluster_key()) { + DCHECK(_num_sort_key_columns >= _num_short_key_columns) + << ", table_id=" << _tablet_schema->table_id() + << ", num_key_columns=" << _num_sort_key_columns + << ", num_short_key_columns=" << _num_short_key_columns + << ", cluster_key_columns=" << _tablet_schema->cluster_key_idxes().size(); + } + for (size_t cid = 0; cid < _num_sort_key_columns; ++cid) { const auto& column = _tablet_schema->column(cid); _key_coders.push_back(get_key_coder(column.type())); _key_index_size.push_back(column.index_length()); } // encode the sequence id into the primary key index - if (_tablet_schema->has_sequence_col() && _tablet_schema->keys_type() == UNIQUE_KEYS && - _opts.enable_unique_key_merge_on_write) { - const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx()); - _seq_coder = get_key_coder(column.type()); + if (_is_mow()) { + if (_tablet_schema->has_sequence_col()) { + const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx()); + _seq_coder = get_key_coder(column.type()); + } + // encode the rowid into the primary key index + if (_is_mow_with_cluster_key()) { + const auto* type_info = get_scalar_type_info(); + _rowid_coder = get_key_coder(type_info->type()); + // primary keys + _primary_key_coders.swap(_key_coders); + // cluster keys + _key_coders.clear(); + _key_index_size.clear(); + _num_sort_key_columns = _tablet_schema->cluster_key_idxes().size(); + for (auto cid : _tablet_schema->cluster_key_idxes()) { + const auto& column = _tablet_schema->column(cid); + _key_coders.push_back(get_key_coder(column.type())); + _key_index_size.push_back(column.index_length()); + } + } } if (_tablet_schema->has_inverted_index()) { _inverted_index_file_writer = std::make_unique( @@ -248,14 +271,14 @@ Status VerticalSegmentWriter::init() { _olap_data_convertor->reserve(_tablet_schema->num_columns()); _column_writers.reserve(_tablet_schema->columns().size()); // we don't need the short key index for unique key merge on write table. - if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { + if (_is_mow()) { size_t seq_col_length = 0; if (_tablet_schema->has_sequence_col()) { seq_col_length = _tablet_schema->column(_tablet_schema->sequence_col_idx()).length() + 1; } size_t rowid_length = 0; - if (!_tablet_schema->cluster_key_idxes().empty()) { + if (_is_mow_with_cluster_key()) { rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH; _short_key_index_builder.reset( new ShortKeyIndexBuilder(_segment_id, _opts.num_rows_per_block)); @@ -318,7 +341,7 @@ void VerticalSegmentWriter::_serialize_block_to_row_column(vectorized::Block& bl // 3. set columns to data convertor and then write all columns Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& data, vectorized::Block& full_block) { - DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write); + DCHECK(_is_mow()); DCHECK(_opts.rowset_ctx->partial_update_info != nullptr); // create full block and fill with input columns @@ -344,7 +367,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da if (!status.ok()) { return status; } - if (cid < _num_key_columns) { + if (cid < _num_sort_key_columns) { key_columns.push_back(column); } else if (_tablet_schema->has_sequence_col() && cid == _tablet_schema->sequence_col_idx()) { @@ -846,6 +869,7 @@ Status VerticalSegmentWriter::write_batch() { std::vector key_columns; vectorized::IOlapColumnDataAccessor* seq_column = nullptr; + std::map cid_to_column; for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) { RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema)); for (auto& data : _batched_blocks) { @@ -857,12 +881,18 @@ Status VerticalSegmentWriter::write_batch() { if (!status.ok()) { return status; } - if (cid < _num_key_columns) { + if (cid < _tablet_schema->num_key_columns()) { key_columns.push_back(column); - } else if (_tablet_schema->has_sequence_col() && - cid == _tablet_schema->sequence_col_idx()) { + } + if (_tablet_schema->has_sequence_col() && cid == _tablet_schema->sequence_col_idx()) { seq_column = column; } + if (_is_mow_with_cluster_key() && + std::find(_tablet_schema->cluster_key_idxes().begin(), + _tablet_schema->cluster_key_idxes().end(), + cid) != _tablet_schema->cluster_key_idxes().end()) { + cid_to_column[cid] = column; + } RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(), data.num_rows)); _olap_data_convertor->clear_source_content(); @@ -878,44 +908,7 @@ Status VerticalSegmentWriter::write_batch() { for (auto& data : _batched_blocks) { _olap_data_convertor->set_source_content(data.block, data.row_pos, data.num_rows); - // find all row pos for short key indexes - std::vector short_key_pos; - // We build a short key index every `_opts.num_rows_per_block` rows. Specifically, we - // build a short key index using 1st rows for first block and `_short_key_row_pos - _row_count` - // for next blocks. - if (_short_key_row_pos == 0 && _num_rows_written == 0) { - short_key_pos.push_back(0); - } - while (_short_key_row_pos + _opts.num_rows_per_block < _num_rows_written + data.num_rows) { - _short_key_row_pos += _opts.num_rows_per_block; - short_key_pos.push_back(_short_key_row_pos - _num_rows_written); - } - if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { - // create primary indexes - std::string last_key; - for (size_t pos = 0; pos < data.num_rows; pos++) { - std::string key = _full_encode_keys(key_columns, pos); - _maybe_invalid_row_cache(key); - if (_tablet_schema->has_sequence_col()) { - _encode_seq_column(seq_column, pos, &key); - } - DCHECK(key.compare(last_key) > 0) - << "found duplicate key or key is not sorted! current key: " << key - << ", last key" << last_key; - RETURN_IF_ERROR(_primary_key_index_builder->add_item(key)); - last_key = std::move(key); - } - } else { - // create short key indexes' - // for min_max key - _set_min_key(_full_encode_keys(key_columns, 0)); - _set_max_key(_full_encode_keys(key_columns, data.num_rows - 1)); - - key_columns.resize(_num_short_key_columns); - for (const auto pos : short_key_pos) { - RETURN_IF_ERROR(_short_key_index_builder->add_item(_encode_keys(key_columns, pos))); - } - } + RETURN_IF_ERROR(_generate_key_index(data, key_columns, seq_column, cid_to_column)); _olap_data_convertor->clear_source_content(); _num_rows_written += data.num_rows; } @@ -937,10 +930,117 @@ Status VerticalSegmentWriter::write_batch() { return Status::OK(); } +Status VerticalSegmentWriter::_generate_key_index( + RowsInBlock& data, std::vector& key_columns, + vectorized::IOlapColumnDataAccessor* seq_column, + std::map& cid_to_column) { + // find all row pos for short key indexes + std::vector short_key_pos; + // We build a short key index every `_opts.num_rows_per_block` rows. Specifically, we + // build a short key index using 1st rows for first block and `_short_key_row_pos - _row_count` + // for next blocks. + if (_short_key_row_pos == 0 && _num_rows_written == 0) { + short_key_pos.push_back(0); + } + while (_short_key_row_pos + _opts.num_rows_per_block < _num_rows_written + data.num_rows) { + _short_key_row_pos += _opts.num_rows_per_block; + short_key_pos.push_back(_short_key_row_pos - _num_rows_written); + } + if (_is_mow_with_cluster_key()) { + // 1. generate primary key index + RETURN_IF_ERROR(_generate_primary_key_index(_primary_key_coders, key_columns, seq_column, + data.num_rows, true)); + // 2. generate short key index (use cluster key) + std::vector short_key_columns; + for (const auto& cid : _tablet_schema->cluster_key_idxes()) { + short_key_columns.push_back(cid_to_column[cid]); + } + RETURN_IF_ERROR(_generate_short_key_index(short_key_columns, data.num_rows, short_key_pos)); + } else if (_is_mow()) { + RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column, + data.num_rows, false)); + } else { // other tables + RETURN_IF_ERROR(_generate_short_key_index(key_columns, data.num_rows, short_key_pos)); + } + return Status::OK(); +} + +Status VerticalSegmentWriter::_generate_primary_key_index( + const std::vector& primary_key_coders, + const std::vector& primary_key_columns, + vectorized::IOlapColumnDataAccessor* seq_column, size_t num_rows, bool need_sort) { + if (!need_sort) { // mow table without cluster key + std::string last_key; + for (size_t pos = 0; pos < num_rows; pos++) { + // use _key_coders + std::string key = _full_encode_keys(primary_key_columns, pos); + _maybe_invalid_row_cache(key); + if (_tablet_schema->has_sequence_col()) { + _encode_seq_column(seq_column, pos, &key); + } + DCHECK(key.compare(last_key) > 0) + << "found duplicate key or key is not sorted! current key: " << key + << ", last key" << last_key; + RETURN_IF_ERROR(_primary_key_index_builder->add_item(key)); + last_key = std::move(key); + } + } else { // mow table with cluster key + // 1. generate primary keys in memory + std::vector primary_keys; + for (uint32_t pos = 0; pos < num_rows; pos++) { + std::string key = _full_encode_keys(primary_key_coders, primary_key_columns, pos); + _maybe_invalid_row_cache(key); + if (_tablet_schema->has_sequence_col()) { + _encode_seq_column(seq_column, pos, &key); + } + _encode_rowid(pos, &key); + primary_keys.emplace_back(std::move(key)); + } + // 2. sort primary keys + std::sort(primary_keys.begin(), primary_keys.end()); + // 3. write primary keys index + std::string last_key; + for (const auto& key : primary_keys) { + DCHECK(key.compare(last_key) > 0) + << "found duplicate key or key is not sorted! current key: " << key + << ", last key" << last_key; + RETURN_IF_ERROR(_primary_key_index_builder->add_item(key)); + } + } + return Status::OK(); +} + +Status VerticalSegmentWriter::_generate_short_key_index( + std::vector& key_columns, size_t num_rows, + const std::vector& short_key_pos) { + // use _key_coders + _set_min_key(_full_encode_keys(key_columns, 0)); + _set_max_key(_full_encode_keys(key_columns, num_rows - 1)); + + key_columns.resize(_num_short_key_columns); + for (const auto pos : short_key_pos) { + RETURN_IF_ERROR(_short_key_index_builder->add_item(_encode_keys(key_columns, pos))); + } + return Status::OK(); +} + +void VerticalSegmentWriter::_encode_rowid(const uint32_t rowid, string* encoded_keys) { + encoded_keys->push_back(KEY_NORMAL_MARKER); + _rowid_coder->full_encode_ascending(&rowid, encoded_keys); +} + +std::string VerticalSegmentWriter::_full_encode_keys( + const std::vector& key_columns, size_t pos) { + assert(_key_index_size.size() == _num_sort_key_columns); + assert(key_columns.size() == _num_sort_key_columns && + _key_coders.size() == _num_sort_key_columns); + return _full_encode_keys(_key_coders, key_columns, pos); +} + std::string VerticalSegmentWriter::_full_encode_keys( + const std::vector& key_coders, const std::vector& key_columns, size_t pos) { - assert(_key_index_size.size() == _num_key_columns); - assert(key_columns.size() == _num_key_columns && _key_coders.size() == _num_key_columns); + assert(key_columns.size() == key_coders.size()); std::string encoded_keys; size_t cid = 0; @@ -952,7 +1052,8 @@ std::string VerticalSegmentWriter::_full_encode_keys( continue; } encoded_keys.push_back(KEY_NORMAL_MARKER); - _key_coders[cid]->full_encode_ascending(field, &encoded_keys); + DCHECK(key_coders[cid] != nullptr); + key_coders[cid]->full_encode_ascending(field, &encoded_keys); ++cid; } return encoded_keys; @@ -999,7 +1100,9 @@ std::string VerticalSegmentWriter::_encode_keys( uint64_t VerticalSegmentWriter::_estimated_remaining_size() { // footer_size(4) + checksum(4) + segment_magic(4) uint64_t size = 12; - if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { + if (_is_mow_with_cluster_key()) { + size += _primary_key_index_builder->size() + _short_key_index_builder->size(); + } else if (_is_mow()) { size += _primary_key_index_builder->size(); } else { size += _short_key_index_builder->size(); @@ -1019,7 +1122,12 @@ Status VerticalSegmentWriter::finalize_columns_index(uint64_t* index_size) { RETURN_IF_ERROR(_write_bloom_filter_index()); *index_size = _file_writer->bytes_appended() - index_start; - if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { + if (_is_mow_with_cluster_key()) { + RETURN_IF_ERROR(_write_short_key_index()); + *index_size = _file_writer->bytes_appended() - index_start; + RETURN_IF_ERROR(_write_primary_key_index()); + *index_size += _primary_key_index_builder->disk_size(); + } else if (_is_mow()) { RETURN_IF_ERROR(_write_primary_key_index()); // IndexedColumnWriter write data pages mixed with segment data, we should use // the stat from primary key index builder. @@ -1202,5 +1310,12 @@ int64_t VerticalSegmentWriter::get_inverted_index_total_size() { return 0; } +inline bool VerticalSegmentWriter::_is_mow() { + return _tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write; +} + +inline bool VerticalSegmentWriter::_is_mow_with_cluster_key() { + return _is_mow() && !_tablet_schema->cluster_key_idxes().empty(); +} } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h index 831747712b0dba..66525ea4c768d5 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h @@ -146,9 +146,14 @@ class VerticalSegmentWriter { // used for unique-key with merge on write and segment min_max key std::string _full_encode_keys( const std::vector& key_columns, size_t pos); + std::string _full_encode_keys( + const std::vector& key_coders, + const std::vector& key_columns, size_t pos); // used for unique-key with merge on write void _encode_seq_column(const vectorized::IOlapColumnDataAccessor* seq_column, size_t pos, string* encoded_keys); + // used for unique-key with merge on write tables with cluster keys + void _encode_rowid(const uint32_t rowid, string* encoded_keys); void _set_min_max_key(const Slice& key); void _set_min_key(const Slice& key); void _set_max_key(const Slice& key); @@ -159,6 +164,18 @@ class VerticalSegmentWriter { const std::vector& use_default_or_null_flag, bool has_default_or_nullable, const size_t& segment_start_pos, const vectorized::Block* block); + Status _generate_key_index( + RowsInBlock& data, std::vector& key_columns, + vectorized::IOlapColumnDataAccessor* seq_column, + std::map& cid_to_column); + Status _generate_primary_key_index( + const std::vector& primary_key_coders, + const std::vector& primary_key_columns, + vectorized::IOlapColumnDataAccessor* seq_column, size_t num_rows, bool need_sort); + Status _generate_short_key_index(std::vector& key_columns, + size_t num_rows, const std::vector& short_key_pos); + bool _is_mow(); + bool _is_mow_with_cluster_key(); private: uint32_t _segment_id; @@ -172,7 +189,9 @@ class VerticalSegmentWriter { std::unique_ptr _inverted_index_file_writer; SegmentFooterPB _footer; - size_t _num_key_columns; + // for mow tables with cluster key, the sort key is the cluster keys not unique keys + // for other tables, the sort key is the keys + size_t _num_sort_key_columns; size_t _num_short_key_columns; InvertedIndexFileInfo _inverted_index_file_info; std::unique_ptr _short_key_index_builder; @@ -183,7 +202,10 @@ class VerticalSegmentWriter { std::unique_ptr _olap_data_convertor; // used for building short key index or primary key index during vectorized write. std::vector _key_coders; + // for mow table with cluster keys, this is primary keys + std::vector _primary_key_coders; const KeyCoder* _seq_coder = nullptr; + const KeyCoder* _rowid_coder = nullptr; std::vector _key_index_size; size_t _short_key_row_pos = 0; diff --git a/regression-test/suites/unique_with_mow_c_p0/test_mow_with_null_sequence.groovy b/regression-test/suites/unique_with_mow_c_p0/test_mow_with_null_sequence.groovy index 3e6f7cce599e5d..44286fdc27d53c 100644 --- a/regression-test/suites/unique_with_mow_c_p0/test_mow_with_null_sequence.groovy +++ b/regression-test/suites/unique_with_mow_c_p0/test_mow_with_null_sequence.groovy @@ -88,5 +88,5 @@ suite("test_mow_with_null_sequence") { order_qt_sql "select * from $tableName" - sql """ DROP TABLE IF EXISTS $tableName """ + // sql """ DROP TABLE IF EXISTS $tableName """ } diff --git a/regression-test/suites/unique_with_mow_c_p0/test_pk_uk_case.groovy b/regression-test/suites/unique_with_mow_c_p0/test_pk_uk_case.groovy index f4bc7b2f205ba9..90a992aa3528ee 100644 --- a/regression-test/suites/unique_with_mow_c_p0/test_pk_uk_case.groovy +++ b/regression-test/suites/unique_with_mow_c_p0/test_pk_uk_case.groovy @@ -26,13 +26,13 @@ import java.util.Map; import java.util.UUID; import java.time.format.DateTimeFormatter; -suite("test_pk_uk_case_cluster_key") { +suite("test_pk_uk_case") { def tableNamePk = "primary_key_pk_uk" def tableNameUk = "unique_key_pk_uk" onFinish { - try_sql("DROP TABLE IF EXISTS ${tableNamePk}") - try_sql("DROP TABLE IF EXISTS ${tableNameUk}") + // try_sql("DROP TABLE IF EXISTS ${tableNamePk}") + // try_sql("DROP TABLE IF EXISTS ${tableNameUk}") } sql """ DROP TABLE IF EXISTS ${tableNamePk} """