[Pick](Variant) pick some fix (#37926)
eldenmoon authored Jul 17, 2024
1 parent 2d7acd0 commit 2994232
Showing 31 changed files with 1,027 additions and 206 deletions.
7 changes: 6 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
@@ -747,7 +747,12 @@ Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) {
CreateRowsetResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);

RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(true);
// The variant schema may have been updated, so we need to update the schema as well.
// The rowset meta updated by `rowset->merge_rowset_meta` in `BaseTablet::update_delete_bitmap`
// would be lost in `update_tmp_rowset` if skip_schema were set, so to keep the latest schema
// we must retain the schema in `update_tmp_rowset` for variant types.
bool skip_schema = rs_meta.tablet_schema()->num_variant_columns() == 0;
RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(skip_schema);
doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb));
Status st =
retry_rpc("update committed rowset", req, &resp, &MetaService_Stub::update_tmp_rowset);
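A minimal sketch of the decision above, using a hypothetical stand-in type rather than the real doris::TabletSchema: the schema can only be dropped from the update RPC when the tablet has no variant columns, because variant columns may have gained sub-columns during `rowset->merge_rowset_meta`.

#include <cstdint>

struct TabletSchemaStub {                        // hypothetical stand-in for TabletSchema
    int32_t variant_columns = 0;
    int32_t num_variant_columns() const { return variant_columns; }
};

// Keep the (possibly extended) schema whenever variant columns exist;
// only a variant-free schema is guaranteed unchanged and safe to skip.
bool can_skip_schema(const TabletSchemaStub& schema) {
    return schema.num_variant_columns() == 0;
}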
7 changes: 5 additions & 2 deletions be/src/cloud/cloud_tablet.cpp
@@ -427,8 +427,11 @@ Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_write
RowsetWriterContext context;
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAPPING;
context.tablet_schema = std::make_shared<TabletSchema>();
context.tablet_schema->copy_from(*(rowset.tablet_schema()));
// During a partial update, the extracted columns of a variant should not be included in the tablet schema.
// This is because the partial update for a variant needs to ignore the extracted columns.
// Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update,
// the complete variant is constructed by reading all the sub-columns of the variant.
context.tablet_schema = rowset.tablet_schema()->copy_without_variant_extracted_columns();
context.newest_write_timestamp = UnixSeconds();
context.tablet_id = table_id();
context.enable_segcompaction = false;
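A rough illustration of what `copy_without_variant_extracted_columns` does conceptually, with a hypothetical column descriptor instead of the real TabletColumn: the transient schema for a partial update keeps only the top-level variant column and drops its materialized sub-columns, which are rebuilt from all sub-columns at read time.

#include <algorithm>
#include <iterator>
#include <string>
#include <vector>

struct ColumnDesc {                     // hypothetical stand-in for TabletColumn
    std::string name;                   // e.g. "v" or the extracted path "v.a.b"
    bool is_variant_extracted = false;  // true for sub-columns extracted from a variant
};

// Copy the schema's columns, skipping sub-columns that were extracted from a variant.
std::vector<ColumnDesc> without_variant_extracted(const std::vector<ColumnDesc>& cols) {
    std::vector<ColumnDesc> out;
    std::copy_if(cols.begin(), cols.end(), std::back_inserter(out),
                 [](const ColumnDesc& c) { return !c.is_variant_extracted; });
    return out;
}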
1 change: 1 addition & 0 deletions be/src/common/config.cpp
@@ -1000,6 +1000,7 @@ DEFINE_Bool(enable_index_apply_preds_except_leafnode_of_andnode, "true");
DEFINE_mBool(variant_enable_flatten_nested, "false");
DEFINE_mDouble(variant_ratio_of_defaults_as_sparse_column, "1");
DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "1000");
DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");

// block file cache
DEFINE_Bool(enable_file_cache, "false");
2 changes: 2 additions & 0 deletions be/src/common/config.h
@@ -1182,6 +1182,8 @@ DECLARE_mDouble(variant_ratio_of_defaults_as_sparse_column);
// Threshold to estimate a column is sparsed
// Notice: TEST ONLY
DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column);
// When false, an invalid JSON string is stored as a plain string value instead of raising an exception
DECLARE_mBool(variant_throw_exeception_on_invalid_json);

DECLARE_mBool(enable_merge_on_write_correctness_check);
// USED FOR DEBUGING
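A hedged sketch of the intended behavior of `variant_throw_exeception_on_invalid_json` (the parser below is a crude placeholder, not Doris's JSON path): when the flag is false, a value that fails JSON parsing is kept as a plain string; when true, ingestion fails with an error.

#include <cctype>
#include <optional>
#include <stdexcept>
#include <string>

// Crude placeholder for a JSON parser: only accepts strings that look like objects or arrays.
std::optional<std::string> parse_json_or_null(const std::string& s) {
    for (char c : s) {
        if (std::isspace(static_cast<unsigned char>(c))) {
            continue;
        }
        if (c == '{' || c == '[') {
            return s;            // looks like a JSON object or array
        }
        return std::nullopt;     // first non-space char is not a JSON container
    }
    return std::nullopt;         // empty or all-whitespace input
}

std::string ingest_variant_cell(const std::string& raw, bool throw_on_invalid_json) {
    if (auto parsed = parse_json_or_null(raw)) {
        return *parsed;          // valid JSON: store the parsed document
    }
    if (throw_on_invalid_json) {
        throw std::runtime_error("invalid JSON for variant column: " + raw);
    }
    return raw;                  // fall back to treating the value as a plain string
}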
6 changes: 6 additions & 0 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -1601,6 +1601,9 @@ Status VariantRootColumnIterator::next_batch(size_t* n, vectorized::MutableColum
if (obj.is_null_root()) {
obj.create_root();
}
if (!obj.is_finalized()) {
obj.finalize();
}
auto root_column = obj.get_root();
RETURN_IF_ERROR(_inner_iter->next_batch(n, root_column, has_null));
obj.incr_num_rows(*n);
@@ -1634,6 +1637,9 @@ Status VariantRootColumnIterator::read_by_rowids(const rowid_t* rowids, const si
if (obj.is_null_root()) {
obj.create_root();
}
if (!obj.is_finalized()) {
obj.finalize();
}
auto root_column = obj.get_root();
RETURN_IF_ERROR(_inner_iter->read_by_rowids(rowids, count, root_column));
obj.incr_num_rows(count);
129 changes: 94 additions & 35 deletions be/src/vec/columns/column_object.cpp
@@ -21,6 +21,7 @@
#include "vec/columns/column_object.h"

#include <assert.h>
#include <fmt/core.h>
#include <fmt/format.h>
#include <glog/logging.h>
#include <parallel_hashmap/phmap.h>
@@ -34,6 +35,7 @@
#include <map>
#include <memory>
#include <optional>
#include <sstream>
#include <vector>

#include "common/compiler_util.h" // IWYU pragma: keep
@@ -677,8 +679,6 @@ void ColumnObject::check_consistency() const {
}
for (const auto& leaf : subcolumns) {
if (num_rows != leaf->data.size()) {
// LOG(FATAL) << "unmatched column:" << leaf->path.get_path()
// << ", expeted rows:" << num_rows << ", but meet:" << leaf->data.size();
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
"unmatched column: {}, expeted rows: {}, but meet: {}",
leaf->path.get_path(), num_rows, leaf->data.size());
@@ -791,26 +791,61 @@ void ColumnObject::insert_default() {
++num_rows;
}

Field ColumnObject::operator[](size_t n) const {
if (!is_finalized()) {
const_cast<ColumnObject*>(this)->finalize();
}
VariantMap map;
for (const auto& entry : subcolumns) {
if (WhichDataType(remove_nullable(entry->data.data_types.back())).is_json()) {
void ColumnObject::Subcolumn::get(size_t n, Field& res) const {
if (is_finalized()) {
if (least_common_type.get_base_type_id() == TypeIndex::JSONB) {
// JsonbField is a special case
Field f = JsonbField();
(*entry->data.data.back()).get(n, f);
map[entry->path.get_path()] = std::move(f);
continue;
res = JsonbField();
}
get_finalized_column().get(n, res);
return;
}

size_t ind = n;
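// Rows in the default prefix have no materialized data: return Null if no type
// has been seen yet, otherwise the default value of the least common type.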
if (ind < num_of_defaults_in_prefix) {
if (least_common_type.get_base_type_id() == TypeIndex::Nothing) {
res = Null();
return;
}
map[entry->path.get_path()] = (*entry->data.data.back())[n];
res = least_common_type.get()->get_default();
return;
}
return map;

ind -= num_of_defaults_in_prefix;
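// Locate the non-finalized part that contains row `ind`, read the raw value,
// and convert it to the least common type.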
for (size_t i = 0; i < data.size(); ++i) {
const auto& part = data[i];
const auto& part_type = data_types[i];
if (ind < part->size()) {
res = vectorized::remove_nullable(part_type)->get_default();
part->get(ind, res);
Field new_field;
convert_field_to_type(res, *least_common_type.get(), &new_field);
res = new_field;
return;
}

ind -= part->size();
}

throw doris::Exception(ErrorCode::OUT_OF_BOUND, "Index ({}) for getting field is out of range",
n);
}

Field ColumnObject::operator[](size_t n) const {
Field object;
get(n, object);
return object;
}

void ColumnObject::get(size_t n, Field& res) const {
res = (*this)[n];
assert(n < size());
res = VariantMap();
auto& object = res.get<VariantMap&>();

for (const auto& entry : subcolumns) {
auto it = object.try_emplace(entry->path.get_path()).first;
entry->data.get(n, it->second);
}
}

Status ColumnObject::try_insert_indices_from(const IColumn& src, const int* indices_begin,
@@ -1380,7 +1415,10 @@ void ColumnObject::strip_outer_array() {

ColumnPtr ColumnObject::filter(const Filter& filter, ssize_t count) const {
if (!is_finalized()) {
const_cast<ColumnObject*>(this)->finalize();
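// Filter a finalized clone instead of finalizing in place, so this const method
// leaves the source column untouched.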
auto finalized = clone_finalized();
auto& finalized_object = assert_cast<ColumnObject&>(*finalized);
return finalized_object.apply_for_subcolumns(
[&](const auto& subcolumn) { return subcolumn.filter(filter, count); });
}
auto new_column = ColumnObject::create(true, false);
for (auto& entry : subcolumns) {
@@ -1545,29 +1583,54 @@ void ColumnObject::insert_indices_from(const IColumn& src, const uint32_t* indic
}
}

void ColumnObject::update_hash_with_value(size_t n, SipHash& hash) const {
void ColumnObject::for_each_imutable_subcolumn(ImutableColumnCallback callback) const {
if (!is_finalized()) {
// finalize has no side effect and can be safely used in const functions
const_cast<ColumnObject*>(this)->finalize();
auto finalized = clone_finalized();
auto& finalized_object = assert_cast<ColumnObject&>(*finalized);
finalized_object.for_each_imutable_subcolumn(callback);
return;
}
for_each_imutable_subcolumn([&](const auto& subcolumn) {
if (n >= subcolumn.size()) {
LOG(FATAL) << n << " greater than column size " << subcolumn.size()
<< " sub_column_info:" << subcolumn.dump_structure()
<< " total lines of this column " << num_rows;
}
return subcolumn.update_hash_with_value(n, hash);
});
}

void ColumnObject::for_each_imutable_subcolumn(ImutableColumnCallback callback) const {
for (const auto& entry : subcolumns) {
for (auto& part : entry->data.data) {
callback(*part);
}
}
}

void ColumnObject::update_hash_with_value(size_t n, SipHash& hash) const {
for_each_imutable_subcolumn(
[&](const auto& subcolumn) { return subcolumn.update_hash_with_value(n, hash); });
}

void ColumnObject::update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict null_data) const {
for_each_imutable_subcolumn([&](const auto& subcolumn) {
return subcolumn.update_hashes_with_value(hashes, nullptr);
});
}

void ColumnObject::update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
const uint8_t* __restrict null_data) const {
for_each_imutable_subcolumn([&](const auto& subcolumn) {
return subcolumn.update_xxHash_with_value(start, end, hash, nullptr);
});
}

void ColumnObject::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType type,
uint32_t rows, uint32_t offset,
const uint8_t* __restrict null_data) const {
for_each_imutable_subcolumn([&](const auto& subcolumn) {
return subcolumn.update_crcs_with_value(hash, type, rows, offset, nullptr);
});
}

void ColumnObject::update_crc_with_value(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_data) const {
for_each_imutable_subcolumn([&](const auto& subcolumn) {
return subcolumn.update_crc_with_value(start, end, hash, nullptr);
});
}

std::string ColumnObject::debug_string() const {
std::stringstream res;
res << get_family_name() << "(num_row = " << num_rows;
@@ -1600,8 +1663,4 @@ Status ColumnObject::sanitize() const {
return Status::OK();
}

void ColumnObject::replace_column_data(const IColumn& col, size_t row, size_t self_row) {
LOG(FATAL) << "Method replace_column_data is not supported for " << get_name();
}

} // namespace doris::vectorized