Skip to content

Commit

Permalink
[Improvement](execute) optimize for ColumnNullable's serialize_vec/de…
Browse files Browse the repository at this point in the history
…serialize_vec (#28788)

optimize for ColumnNullable's serialize_vec/deserialize_vec
  • Loading branch information
BiteTheDDDDt authored Jan 11, 2024
1 parent 6d828b6 commit ef64bf7
Show file tree
Hide file tree
Showing 44 changed files with 337 additions and 396 deletions.
47 changes: 30 additions & 17 deletions be/src/exprs/bloom_filter_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,22 @@ struct CommonFindOp {

void find_batch(const BloomFilterAdaptor& bloom_filter, const vectorized::ColumnPtr& column,
uint8_t* results) const {
const T* __restrict data = nullptr;
const uint8_t* __restrict nullmap = nullptr;
if (column->is_nullable()) {
const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get());
const auto& nullmap =
assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column())
.get_data();
if (nullable->has_null()) {
nullmap =
assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column())
.get_data()
.data();
}
data = (T*)nullable->get_nested_column().get_raw_data().data;
} else {
data = (T*)column->get_raw_data().data;
}

const T* data = (T*)nullable->get_nested_column().get_raw_data().data;
if (nullmap) {
for (size_t i = 0; i < column->size(); i++) {
if (!nullmap[i]) {
results[i] = bloom_filter.test_element(data[i]);
Expand All @@ -327,7 +336,6 @@ struct CommonFindOp {
}
}
} else {
const T* data = (T*)column->get_raw_data().data;
for (size_t i = 0; i < column->size(); i++) {
results[i] = bloom_filter.test_element(data[i]);
}
Expand All @@ -340,8 +348,8 @@ struct CommonFindOp {
};

struct StringFindOp : CommonFindOp<StringRef> {
void insert_batch(BloomFilterAdaptor& bloom_filter, const vectorized::ColumnPtr& column,
size_t start) {
static void insert_batch(BloomFilterAdaptor& bloom_filter, const vectorized::ColumnPtr& column,
size_t start) {
if (column->is_nullable()) {
const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get());
const auto& col =
Expand All @@ -363,21 +371,26 @@ struct StringFindOp : CommonFindOp<StringRef> {
}
}

void find_batch(const BloomFilterAdaptor& bloom_filter, const vectorized::ColumnPtr& column,
uint8_t* results) {
static void find_batch(const BloomFilterAdaptor& bloom_filter,
const vectorized::ColumnPtr& column, uint8_t* results) {
if (column->is_nullable()) {
const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get());
const auto& col =
assert_cast<const vectorized::ColumnString&>(nullable->get_nested_column());
const auto& nullmap =
assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column())
.get_data();

for (size_t i = 0; i < column->size(); i++) {
if (!nullmap[i]) {
if (nullable->has_null()) {
for (size_t i = 0; i < column->size(); i++) {
if (!nullmap[i]) {
results[i] = bloom_filter.test_element(col.get_data_at(i));
} else {
results[i] = false;
}
}
} else {
for (size_t i = 0; i < column->size(); i++) {
results[i] = bloom_filter.test_element(col.get_data_at(i));
} else {
results[i] = false;
}
}
} else {
Expand All @@ -392,9 +405,9 @@ struct StringFindOp : CommonFindOp<StringRef> {
// We do not need to check whether data is empty, because null will not appear
// when the filter is used by the storage engine
struct FixedStringFindOp : public StringFindOp {
/// Storage-engine lookup for fixed-length string keys.
///
/// Forwards every argument unchanged to find_batch_olap<StringRef, true> and
/// returns whatever that call returns. Declared static, like the sibling
/// insert_batch/find_batch methods of StringFindOp.
static uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, const char* data,
                                       const uint8* nullmap, uint16_t* offsets, int number,
                                       const bool is_parse_column) {
    return find_batch_olap<StringRef, true>(bloom_filter, data, nullmap, offsets, number,
                                            is_parse_column);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void ExchangeSinkBuffer<Parent>::close() {

template <typename Parent>
bool ExchangeSinkBuffer<Parent>::can_write() const {
size_t max_package_size = 64 * _instance_to_package_queue.size();
size_t max_package_size = QUEUE_CAPACITY_FACTOR * _instance_to_package_queue.size();
size_t total_package_size = 0;
for (auto& [_, q] : _instance_to_package_queue) {
total_package_size += q.size();
Expand Down
30 changes: 21 additions & 9 deletions be/src/vec/aggregate_functions/aggregate_function_null.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,17 +210,29 @@ class AggregateFunctionNullUnaryInline final
}
}

void add_batch(size_t batch_size, AggregateDataPtr* places, size_t place_offset,
void add_batch(size_t batch_size, AggregateDataPtr* __restrict places, size_t place_offset,
const IColumn** columns, Arena* arena, bool agg_many) const override {
const ColumnNullable* column = assert_cast<const ColumnNullable*>(columns[0]);
// The overhead introduced is negligible here, just an extra memory read from NullMap
const auto* __restrict null_map_data = column->get_null_map_data().data();
const auto* column = assert_cast<const ColumnNullable*>(columns[0]);
const IColumn* nested_column = &column->get_nested_column();
for (int i = 0; i < batch_size; ++i) {
if (!null_map_data[i]) {
AggregateDataPtr __restrict place = places[i] + place_offset;
this->set_flag(place);
this->nested_function->add(this->nested_place(place), &nested_column, i, arena);
if (column->has_null()) {
const auto* __restrict null_map_data = column->get_null_map_data().data();
for (int i = 0; i < batch_size; ++i) {
if (!null_map_data[i]) {
AggregateDataPtr __restrict place = places[i] + place_offset;
this->set_flag(place);
this->nested_function->add(this->nested_place(place), &nested_column, i, arena);
}
}
} else {
if constexpr (result_is_nullable) {
for (int i = 0; i < batch_size; ++i) {
AggregateDataPtr __restrict place = places[i] + place_offset;
place[0] |= 1;
this->nested_function->add(this->nested_place(place), &nested_column, i, arena);
}
} else {
this->nested_function->add_batch(batch_size, places, place_offset, &nested_column,
arena, agg_many);
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions be/src/vec/columns/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -603,9 +603,6 @@ class IColumn : public COW<IColumn> {
// true if column has null element [0,size)
// This base implementation ignores `size` and always returns false.
virtual bool has_null(size_t size) const { return false; }

/// It's a special kind of column, that contain single value, but is not a ColumnConst.
virtual bool is_dummy() const { return false; }

// Returns true exactly when use_count() == 1.
virtual bool is_exclusive() const { return use_count() == 1; }

/// Clear data of column, just like vector clear
Expand Down Expand Up @@ -733,7 +730,7 @@ using ColumnPtr = IColumn::Ptr;
using MutableColumnPtr = IColumn::MutablePtr;
using Columns = std::vector<ColumnPtr>;
using MutableColumns = std::vector<MutableColumnPtr>;

using ColumnPtrs = std::vector<ColumnPtr>;
using ColumnRawPtrs = std::vector<const IColumn*>;

template <typename... Args>
Expand Down
15 changes: 11 additions & 4 deletions be/src/vec/columns/column_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,10 +395,17 @@ void ColumnArray::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTyp
}

void ColumnArray::insert(const Field& x) {
const Array& array = doris::vectorized::get<const Array&>(x);
size_t size = array.size();
for (size_t i = 0; i < size; ++i) get_data().insert(array[i]);
get_offsets().push_back(get_offsets().back() + size);
if (x.is_null()) {
get_data().insert(Null());
get_offsets().push_back(get_offsets().back() + 1);
} else {
const auto& array = doris::vectorized::get<const Array&>(x);
size_t size = array.size();
for (size_t i = 0; i < size; ++i) {
get_data().insert(array[i]);
}
get_offsets().push_back(get_offsets().back() + size);
}
}

void ColumnArray::insert_from(const IColumn& src_, size_t n) {
Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/columns/column_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {
public:
ColumnPtr convert_to_full_column() const;

/// Expands this constant column with convert_to_full_column(), then calls
/// convert_to_full_column_if_const() on the expanded column and returns that result.
ColumnPtr convert_to_full_column_if_const() const override {
    return convert_to_full_column()->convert_to_full_column_if_const();
}

ColumnPtr remove_low_cardinality() const;

Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/columns/column_decimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ template <typename T>
// Appends the raw sizeof(T) bytes of row i to the end of keys[i] and grows keys[i].size
// accordingly, for the first num_rows rows. max_row_byte_size is not used here.
void ColumnDecimal<T>::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
                                     size_t max_row_byte_size) const {
    for (size_t i = 0; i < num_rows; ++i) {
        // keys[i].data is exposed as const; the key buffer is written through it on purpose.
        char* dst = const_cast<char*>(keys[i].data + keys[i].size);
        memcpy_fixed<T>(dst, reinterpret_cast<const char*>(&data[i]));
        keys[i].size += sizeof(T);
    }
}
Expand All @@ -89,7 +89,7 @@ void ColumnDecimal<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys,
const uint8_t* null_map) const {
for (size_t i = 0; i < num_rows; ++i) {
if (null_map[i] == 0) {
memcpy(const_cast<char*>(keys[i].data + keys[i].size), &data[i], sizeof(T));
memcpy_fixed<T>(const_cast<char*>(keys[i].data + keys[i].size), (char*)&data[i]);
keys[i].size += sizeof(T);
}
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/columns/column_dummy.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ class IColumnDummy : public IColumn {

// Adds `delta` to the member `s`.
void addSize(size_t delta) { s += delta; }

bool is_dummy() const override { return true; }

// Not supported on dummy columns: the body only emits a LOG(FATAL) message and
// never touches `rhs`, `row` or `self_row`.
void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 0) override {
LOG(FATAL) << "should not call the method in column dummy";
}
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/columns/column_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,4 +520,10 @@ size_t ColumnMap::allocated_bytes() const {
get_offsets().allocated_bytes();
}

// Builds a new ColumnMap via ColumnMap::create from the keys, values and offsets columns,
// each first passed through its own convert_to_full_column_if_const().
ColumnPtr ColumnMap::convert_to_full_column_if_const() const {
return ColumnMap::create(keys_column->convert_to_full_column_if_const(),
values_column->convert_to_full_column_if_const(),
offsets_column->convert_to_full_column_if_const());
}

} // namespace doris::vectorized
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {
offsets_column->clear();
}

ColumnPtr convert_to_full_column_if_const() const override;

MutableColumnPtr clone_resized(size_t size) const override;

Field operator[](size_t n) const override;
Expand Down
29 changes: 20 additions & 9 deletions be/src/vec/columns/column_nullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,22 @@ size_t ColumnNullable::get_max_row_byte_size() const {

/// Appends the serialized form of the first num_rows rows to keys[i].
///
/// Each row gets exactly one flag byte (1 = null, 0 = not null) followed by whatever the
/// nested column appends.
///  - has_null(): flags come from the null map and the nested column is serialized with
///    serialize_vec_with_null_map.
///  - otherwise: every flag is 0 and the nested column's plain serialize_vec is used, so
///    the null map is never read.
void ColumnNullable::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
                                   size_t max_row_byte_size) const {
    if (has_null()) {
        const auto& arr = get_null_map_data();
        for (size_t i = 0; i < num_rows; ++i) {
            auto* val = const_cast<char*>(keys[i].data + keys[i].size);
            *val = (arr[i] ? 1 : 0);
            keys[i].size++;
        }
        get_nested_column().serialize_vec_with_null_map(keys, num_rows, arr.data());
    } else {
        for (size_t i = 0; i < num_rows; ++i) {
            auto* val = const_cast<char*>(keys[i].data + keys[i].size);
            *val = 0;
            keys[i].size++;
        }
        get_nested_column().serialize_vec(keys, num_rows, max_row_byte_size);
    }
}

void ColumnNullable::deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) {
Expand All @@ -282,7 +289,11 @@ void ColumnNullable::deserialize_vec(std::vector<StringRef>& keys, const size_t
keys[i].data += sizeof(val);
keys[i].size -= sizeof(val);
}
get_nested_column().deserialize_vec_with_null_map(keys, num_rows, arr.data());
if (_has_null) {
get_nested_column().deserialize_vec_with_null_map(keys, num_rows, arr.data());
} else {
get_nested_column().deserialize_vec(keys, num_rows);
}
}

void ColumnNullable::insert_range_from(const IColumn& src, size_t start, size_t length) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/columns/column_nullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable> {
// Fixed per-row size: the null map's fixed value size plus the nested column's.
size_t size_of_value_if_fixed() const override {
    const size_t flag_bytes = null_map->size_of_value_if_fixed();
    const size_t payload_bytes = nested_column->size_of_value_if_fixed();
    return flag_bytes + payload_bytes;
}
// True only when the column holds exactly one row and that row is null.
// The size check comes first so is_null_at(0) is evaluated only for a one-row column.
bool only_null() const override { return size() == 1 && is_null_at(0); }

// used in schema change
// Assigns `other` to nested_column; nested_column is accessed through a cast to ColumnPtr&.
// NOTE(review): the cast suggests nested_column is declared with a different pointer
// type — confirm against the member declaration.
void change_nested_column(ColumnPtr& other) { ((ColumnPtr&)nested_column) = other; }
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/columns/column_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ void ColumnObject::Subcolumn::finalize() {
throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
st.to_string() + ", real_code:{}", st.code());
}
part = ptr;
part = ptr->convert_to_full_column_if_const();
}
result_column->insert_range_from(*part, 0, part_size);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/columns/column_string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ void ColumnString::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
uint32_t string_size(size_at(i));

auto* ptr = const_cast<char*>(keys[i].data + keys[i].size);
memcpy(ptr, &string_size, sizeof(string_size));
memcpy_fixed<uint32_t>(ptr, (char*)&string_size);
memcpy(ptr + sizeof(string_size), &chars[offset], string_size);
keys[i].size += sizeof(string_size) + string_size;
}
Expand All @@ -327,7 +327,7 @@ void ColumnString::serialize_vec_with_null_map(std::vector<StringRef>& keys, siz
uint32_t string_size(size_at(i));

auto* ptr = const_cast<char*>(keys[i].data + keys[i].size);
memcpy(ptr, &string_size, sizeof(string_size));
memcpy_fixed<uint32_t>(ptr, (char*)&string_size);
memcpy(ptr + sizeof(string_size), &chars[offset], string_size);
keys[i].size += sizeof(string_size) + string_size;
}
Expand Down
11 changes: 7 additions & 4 deletions be/src/vec/common/hash_table/hash_map_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -377,12 +377,14 @@ struct MethodKeysFixed : public MethodBase<TData> {
assert_cast<const ColumnUInt8&>(*nullmap_columns[j]).get_data().data();
for (size_t i = 0; i < row_numbers; ++i) {
// make sure null cell is filled by 0x0
memcpy_fixed<Fixed>((char*)(&result[i]) + offset,
nullmap[i] ? (char*)&zero : data + i * sizeof(Fixed));
memcpy_fixed<Fixed, true>(
(char*)(&result[i]) + offset,
nullmap[i] ? (char*)&zero : data + i * sizeof(Fixed));
}
} else {
for (size_t i = 0; i < row_numbers; ++i) {
memcpy_fixed<Fixed>((char*)(&result[i]) + offset, data + i * sizeof(Fixed));
memcpy_fixed<Fixed, true>((char*)(&result[i]) + offset,
data + i * sizeof(Fixed));
}
}
};
Expand Down Expand Up @@ -474,7 +476,8 @@ struct MethodKeysFixed : public MethodBase<TData> {
auto foo = [&]<typename Fixed>(Fixed zero) {
CHECK_EQ(sizeof(Fixed), size);
for (size_t j = 0; j < num_rows; j++) {
memcpy_fixed<Fixed>(data + j * sizeof(Fixed), (char*)(&input_keys[j]) + pos);
memcpy_fixed<Fixed, true>(data + j * sizeof(Fixed),
(char*)(&input_keys[j]) + pos);
}
};

Expand Down
9 changes: 7 additions & 2 deletions be/src/vec/common/memcpy_small.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ inline void memcpy_small_allow_read_write_overflow15(void* __restrict dst,

#endif

// Copies exactly sizeof(T) bytes from rhs to lhs.
//
// aligned == false (default): no alignment is assumed for either pointer, so the copy
// always goes through memcpy with a compile-time constant size. Optimizing compilers
// typically lower this to plain load/store instructions, and it stays well-defined for
// any address.
//
// aligned == true: the caller guarantees that both lhs and rhs are suitably aligned
// for T; the value is then copied with a direct typed load and store.
template <typename T, bool aligned = false>
void memcpy_fixed(char* lhs, const char* rhs) {
    if constexpr (aligned) {
        *reinterpret_cast<T*>(lhs) = *reinterpret_cast<const T*>(rhs);
    } else {
        memcpy(lhs, rhs, sizeof(T));
    }
}
Loading

0 comments on commit ef64bf7

Please sign in to comment.