From 0a980d304a860c2363e3c615095a5d17af73d240 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 27 Sep 2024 01:53:35 +0800 Subject: [PATCH 1/5] Save --- dbms/src/Columns/ColumnArray.cpp | 88 ++++++++++++++++++- dbms/src/Columns/ColumnArray.h | 3 + dbms/src/Columns/IColumn.h | 1 + dbms/src/Core/Block.h | 8 ++ dbms/src/DataTypes/DataTypeArray.cpp | 65 ++++++++++++-- .../tests/gtest_data_type_get_common_type.cpp | 15 ++++ .../Flash/Coprocessor/CHBlockChunkCodec.cpp | 8 +- .../Flash/Coprocessor/CHBlockChunkCodecV1.cpp | 2 +- .../Coprocessor/ChunkDecodeAndSquash.cpp | 8 ++ .../src/Flash/Coprocessor/CoprocessorReader.h | 1 + .../tests/gtest_block_chunk_codec.cpp | 48 ++++++++-- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 5 ++ dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp | 1 + dbms/src/Interpreters/Aggregator.h | 1 + .../Operators/ExchangeReceiverSourceOp.cpp | 7 +- dbms/src/Operators/Operator.cpp | 1 + dbms/src/TestUtils/ColumnGenerator.cpp | 44 ++++++++++ dbms/src/TestUtils/ColumnGenerator.h | 6 +- 18 files changed, 293 insertions(+), 19 deletions(-) diff --git a/dbms/src/Columns/ColumnArray.cpp b/dbms/src/Columns/ColumnArray.cpp index 575f21c796a..aa4ad5d568f 100644 --- a/dbms/src/Columns/ColumnArray.cpp +++ b/dbms/src/Columns/ColumnArray.cpp @@ -32,6 +32,10 @@ #include +#include "Common/FmtUtils.h" +#include "Common/Logger.h" +#include "common/logger_useful.h" + namespace DB { namespace ErrorCodes @@ -52,6 +56,16 @@ ColumnArray::ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && if (!typeid_cast(offsets.get())) throw Exception("offsets_column must be a ColumnUInt64", ErrorCodes::ILLEGAL_COLUMN); + if (offsets->size() > 1) + { + for (size_t i = 1; i < offsets->size(); ++i) + { + auto prev_off = static_cast(offsets->assumeMutableRef()).getData()[i - 1]; + auto curr_off = static_cast(offsets->assumeMutableRef()).getData()[i]; + RUNTIME_CHECK(curr_off >= prev_off, i, prev_off, curr_off); + } + } + /** NOTE * Arrays with constant value are possible and 
used in implementation of higher order functions (see FunctionReplicate). * But in most cases, arrays with constant value are unexpected and code will work wrong. Use with caution. @@ -107,6 +121,7 @@ MutableColumnPtr ColumnArray::cloneResized(size_t to_size) const res->getOffsets()[i] = offset; } + res->check(); return res; } @@ -183,6 +198,8 @@ void ColumnArray::insertData(const char * pos, size_t length) throw Exception("Incorrect length argument for method ColumnArray::insertData", ErrorCodes::BAD_ARGUMENTS); getOffsets().push_back((getOffsets().empty() ? 0 : getOffsets().back()) + elems); + + check(); } @@ -196,6 +213,17 @@ StringRef ColumnArray::serializeValueIntoArena( size_t array_size = sizeAt(n); size_t offset = offsetAt(n); + if (array_size > 4294967295) + { + auto col_sz = size(); + FmtBuffer fmt_buf; + for (size_t o = 0; o < col_sz; ++o) + { + fmt_buf.fmtAppend("i={} offset={},", o, offsetAt(o)); + } + LOG_WARNING(Logger::get(), "!!!!! n={} array_size={} offset={} {}", n, array_size, offset, fmt_buf.toString()); + } + char * pos = arena.allocContinue(sizeof(array_size), begin); memcpy(pos, &array_size, sizeof(array_size)); @@ -216,6 +244,9 @@ const char * ColumnArray::deserializeAndInsertFromArena(const char * pos, const pos = getData().deserializeAndInsertFromArena(pos, collator); getOffsets().push_back((getOffsets().empty() ? 0 : getOffsets().back()) + array_size); + + + check(); return pos; } @@ -333,6 +364,8 @@ void ColumnArray::insert(const Field & x) for (size_t i = 0; i < size; ++i) getData().insert(array[i]); getOffsets().push_back((getOffsets().empty() ? 0 : getOffsets().back()) + size); + + check(); } @@ -344,12 +377,16 @@ void ColumnArray::insertFrom(const IColumn & src_, size_t n) getData().insertRangeFrom(src.getData(), offset, size); getOffsets().push_back((getOffsets().empty() ? 0 : getOffsets().back()) + size); + + check(); } void ColumnArray::insertDefault() { getOffsets().push_back(getOffsets().empty() ? 
0 : getOffsets().back()); + + check(); } void ColumnArray::insertManyDefaults(size_t length) @@ -359,6 +396,9 @@ void ColumnArray::insertManyDefaults(size_t length) if (!offsets.empty()) v = offsets.back(); offsets.resize_fill(offsets.size() + length, v); + + + check(); } void ColumnArray::popBack(size_t n) @@ -368,6 +408,8 @@ void ColumnArray::popBack(size_t n) if (nested_n) getData().popBack(nested_n); offsets.resize_assume_reserved(offsets.size() - n); + + check(); } @@ -449,6 +491,7 @@ bool ColumnArray::hasEqualOffsets(const ColumnArray & other) const ColumnPtr ColumnArray::convertToFullColumnIfConst() const { + check(); ColumnPtr new_data; if (ColumnPtr full_column = getData().convertToFullColumnIfConst()) @@ -456,6 +499,7 @@ ColumnPtr ColumnArray::convertToFullColumnIfConst() const else new_data = data; + check(); return ColumnArray::create(new_data, offsets); } @@ -524,6 +568,9 @@ void ColumnArray::insertRangeFrom(const IColumn & src, size_t start, size_t leng for (size_t i = 0; i < length; ++i) cur_offsets[old_size + i] = src_offsets[start + i] - nested_offset + prev_max_offset; } + + + check(); } @@ -576,6 +623,7 @@ ColumnPtr ColumnArray::filterNumber(const Filter & filt, ssize_t result_size_hin res_offsets, filt, result_size_hint); +res->check(); return res; } @@ -645,6 +693,7 @@ ColumnPtr ColumnArray::filterString(const Filter & filt, ssize_t result_size_hin } } +res->check(); return res; } @@ -690,6 +739,7 @@ ColumnPtr ColumnArray::filterGeneric(const Filter & filt, ssize_t result_size_hi } } + res->check(); return res; } @@ -714,9 +764,11 @@ ColumnPtr ColumnArray::filterNullable(const Filter & filt, ssize_t result_size_h filt, result_size_hint); - return ColumnArray::create( + auto a =ColumnArray::create( ColumnNullable::create(filtered_array_of_nested.getDataPtr(), std::move(res_null_map)), filtered_offsets); + a->check(); + return a; } ColumnPtr ColumnArray::filterTuple(const Filter & filt, ssize_t result_size_hint) const @@ -742,9 +794,11 @@ 
ColumnPtr ColumnArray::filterTuple(const Filter & filt, ssize_t result_size_hint for (size_t i = 0; i < tuple_size; ++i) tuple_columns[i] = static_cast(*temporary_arrays[i]).getDataPtr(); - return ColumnArray::create( + auto a= ColumnArray::create( ColumnTuple::create(tuple_columns), static_cast(*temporary_arrays.front()).getOffsetsPtr()); + a->check(); + return a; } @@ -782,6 +836,8 @@ ColumnPtr ColumnArray::permute(const Permutation & perm, size_t limit) const if (current_offset != 0) res->data = data->permute(nested_perm, current_offset); + res->check(); + return res; } @@ -897,6 +953,8 @@ ColumnPtr ColumnArray::replicateNumber(const Offsets & replicate_offsets) const prev_data_offset = src_offsets[i]; } + array_res.check(); + return res; } @@ -975,6 +1033,8 @@ ColumnPtr ColumnArray::replicateString(const Offsets & replicate_offsets) const prev_src_string_offset += sum_chars_size; } + array_res.check(); + return res; } @@ -1116,6 +1176,8 @@ void ColumnArray::insertFromDatumData(const char * data, size_t length) auto precise_data_size = n * sizeof(Float32); RUNTIME_CHECK(length >= sizeof(UInt32) + precise_data_size, n, length); insertData(data, precise_data_size); + + check(); } std::pair ColumnArray::getElementRef(size_t element_idx) const @@ -1123,4 +1185,26 @@ std::pair ColumnArray::getElementRef(size_t element_idx) cons return {static_cast(sizeAt(element_idx)), getDataAt(element_idx)}; } +void ColumnArray::check() const +{ + if (size() > 1) + { + for (size_t i = 1; i < size(); ++i) + { + auto prev_off = offsetAt(i - 1); + auto curr_off = offsetAt(i); + if (curr_off < prev_off) + { + FmtBuffer fmt_buf; + for (size_t o = 0; o < size(); ++o) + { + fmt_buf.fmtAppend("i={} offset={}, ", o, offsetAt(o)); + } + LOG_WARNING(Logger::get(), "!!!!! check failure!!! 
{}", fmt_buf.toString()); + } + RUNTIME_CHECK(curr_off >= prev_off, i, prev_off, curr_off); + } + } +} + } // namespace DB diff --git a/dbms/src/Columns/ColumnArray.h b/dbms/src/Columns/ColumnArray.h index f18068e6ea0..44aabbfc7a4 100644 --- a/dbms/src/Columns/ColumnArray.h +++ b/dbms/src/Columns/ColumnArray.h @@ -35,6 +35,7 @@ class ColumnArray final : public COWPtrHelper { private: friend class COWPtrHelper; + friend class DataTypeArray; /** Create an array column with specified values and offsets. */ ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && offsets_column); @@ -63,6 +64,8 @@ class ColumnArray final : public COWPtrHelper return Base::create(std::forward(args)...); } + void check() const override; + /** On the index i there is an offset to the beginning of the i + 1 -th element. */ using ColumnOffsets = ColumnVector; diff --git a/dbms/src/Columns/IColumn.h b/dbms/src/Columns/IColumn.h index 06a906fdb63..07ba3f908c9 100644 --- a/dbms/src/Columns/IColumn.h +++ b/dbms/src/Columns/IColumn.h @@ -488,6 +488,7 @@ class IColumn : public COWPtr /// Can be inside ColumnNullable. 
virtual bool canBeInsideNullable() const { return false; } + virtual void check() const {} virtual ~IColumn() = default; diff --git a/dbms/src/Core/Block.h b/dbms/src/Core/Block.h index 8b90c032256..b50a1681df1 100644 --- a/dbms/src/Core/Block.h +++ b/dbms/src/Core/Block.h @@ -57,6 +57,14 @@ class Block public: BlockInfo info; + void ffff() const + { + for (const auto & c : data) + { + c.column->check(); + } + } + Block() = default; Block(std::initializer_list il); explicit Block(const ColumnsWithTypeAndName & data_); diff --git a/dbms/src/DataTypes/DataTypeArray.cpp b/dbms/src/DataTypes/DataTypeArray.cpp index aad1538ca45..f9feba93a2d 100644 --- a/dbms/src/DataTypes/DataTypeArray.cpp +++ b/dbms/src/DataTypes/DataTypeArray.cpp @@ -23,6 +23,10 @@ #include #include +#include "Common/Exception.h" +#include "Common/FmtUtils.h" +#include "common/logger_useful.h" + namespace DB { @@ -220,6 +224,19 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams( SubstreamPath & path) const { ColumnArray & column_array = typeid_cast(column); + column_array.check(); + + { + const size_t last_offset = (column_array.getOffsets().empty() ? 0 : column_array.getOffsets().back()); + FmtBuffer fmt_buf; + fmt_buf.fmtAppend("column@{} data->size={} offsets=[", fmt::ptr(&column), column_array.data->size()); + for (size_t o = 0; o < column_array.size(); ++o) + { + fmt_buf.fmtAppend("i={} offset={}, ", o, column_array.offsetAt(o)); + } + fmt_buf.fmtAppend("] last_offset={}", last_offset); + LOG_WARNING(Logger::get(), "!!!!! check deserialize 1 !!! {}", fmt_buf.toString()); + } path.push_back(Substream::ArraySizes); if (auto * stream = getter(path)) @@ -236,10 +253,36 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams( ColumnArray::Offsets & offset_values = column_array.getOffsets(); IColumn & nested_column = column_array.getData(); + try + { + const size_t last_offset = (offset_values.empty() ? 
0 : offset_values.back()); + FmtBuffer fmt_buf; + fmt_buf.fmtAppend("column@{} data->size={} offsets=[", fmt::ptr(&column), column_array.data->size()); + for (size_t o = 0; o < column_array.size(); ++o) + { + fmt_buf.fmtAppend("i={} offset={}, ", o, column_array.offsetAt(o)); + } + fmt_buf.fmtAppend("] last_offset={}", last_offset); + LOG_WARNING(Logger::get(), "!!!!! check deserialize 2 !!! {}", fmt_buf.toString()); + + // column_array.check(); + } + catch (DB::Exception & e) + { + tryLogCurrentException( + Logger::get(), + fmt::format("position_independent_encoding={}", position_independent_encoding)); + throw; + } + /// Number of values corresponding with `offset_values` must be read. - size_t last_offset = (offset_values.empty() ? 0 : offset_values.back()); + const size_t last_offset = (offset_values.empty() ? 0 : offset_values.back()); if (last_offset < nested_column.size()) - throw Exception("Nested column is longer than last offset", ErrorCodes::LOGICAL_ERROR); + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Nested column is longer than last offset, last_offset={} nest_column_size={}", + last_offset, + nested_column.size()); size_t nested_limit = last_offset - nested_column.size(); nested->deserializeBinaryBulkWithMultipleStreams( nested_column, @@ -249,13 +292,25 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams( position_independent_encoding, path); + column_array.check(); + { + FmtBuffer fmt_buf; + fmt_buf.fmtAppend("column@{} data->size={} offsets=[", fmt::ptr(&column), column_array.data->size()); + for (size_t o = 0; o < column_array.size(); ++o) + { + fmt_buf.fmtAppend("i={} offset={}, ", o, column_array.offsetAt(o)); + } + fmt_buf.fmtAppend("] last_offset={}", last_offset); + LOG_WARNING(Logger::get(), "!!!!! check deserialize 3 !!! {}", fmt_buf.toString()); + } /// Check consistency between offsets and elements subcolumns. /// But if elements column is empty - it's ok for columns of Nested types that was added by ALTER. 
if (!nested_column.empty() && nested_column.size() != last_offset) throw Exception( - "Cannot read all array values: read just " + toString(nested_column.size()) + " of " - + toString(last_offset), - ErrorCodes::CANNOT_READ_ALL_DATA); + ErrorCodes::CANNOT_READ_ALL_DATA, + "Cannot read all array values: read just {} of {}", + nested_column.size(), + last_offset); } diff --git a/dbms/src/DataTypes/tests/gtest_data_type_get_common_type.cpp b/dbms/src/DataTypes/tests/gtest_data_type_get_common_type.cpp index 3ba432a9886..197c955f086 100644 --- a/dbms/src/DataTypes/tests/gtest_data_type_get_common_type.cpp +++ b/dbms/src/DataTypes/tests/gtest_data_type_get_common_type.cpp @@ -20,6 +20,8 @@ #include +#include "DataTypes/DataTypeNullable.h" + namespace DB { namespace tests @@ -330,6 +332,19 @@ try // not true for nullable ASSERT_FALSE(ntype->isDateOrDateTime()) << "type: " + type->getName(); } + + { + // array can be wrapped by Nullable + auto type = typeFromString("Array(Float32)"); + ASSERT_NE(type, nullptr); + auto ntype = DataTypeNullable(type); + ASSERT_TRUE(ntype.isNullable()); + } + + { + auto type = typeFromString("Nullable(Array(Float32))"); + ASSERT_TRUE(type->isNullable()); + } } CATCH diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp index 265428a521c..89a1c53a977 100644 --- a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp @@ -98,7 +98,13 @@ void WriteColumnData(const IDataType & type, const ColumnPtr & column, WriteBuff IDataType::OutputStreamGetter output_stream_getter = [&](const IDataType::SubstreamPath &) { return &ostr; }; - type.serializeBinaryBulkWithMultipleStreams(*full_column, output_stream_getter, offset, limit, false, {}); + type.serializeBinaryBulkWithMultipleStreams( + *full_column, + output_stream_getter, + offset, + limit, + /*position_independent_encoding*/ true, + {}); } void CHBlockChunkCodec::readData(const IDataType & 
type, IColumn & column, ReadBuffer & istr, size_t rows) diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.cpp b/dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.cpp index fb0ec7de918..b333fb3c722 100644 --- a/dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.cpp +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.cpp @@ -136,7 +136,7 @@ static inline void decodeColumnsByBlock(ReadBuffer & istr, Block & res, size_t r [&](const IDataType::SubstreamPath &) { return &istr; }, sz, 0, - {}, + /*position_independent_encoding=*/true, {}); } } diff --git a/dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.cpp b/dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.cpp index ab8e51e999f..75a3c681254 100644 --- a/dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.cpp +++ b/dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.cpp @@ -57,6 +57,7 @@ std::optional CHBlockChunkDecodeAndSquash::decodeAndSquashV1Impl(ReadBuff if (rows) { DecodeColumns(istr, block, rows, static_cast(rows_limit * 1.5)); + block.ffff(); accumulated_block.emplace(std::move(block)); } } @@ -65,10 +66,12 @@ std::optional CHBlockChunkDecodeAndSquash::decodeAndSquashV1Impl(ReadBuff size_t rows{}; DecodeHeader(istr, codec.header, rows); DecodeColumns(istr, *accumulated_block, rows, 0); + accumulated_block->ffff(); } if (accumulated_block && accumulated_block->rows() >= rows_limit) { + accumulated_block->ffff(); /// Return accumulated data and reset accumulated_block res.swap(accumulated_block); return res; @@ -93,7 +96,10 @@ std::optional CHBlockChunkDecodeAndSquash::decodeAndSquash(const String & /// so it should be larger than 1.0, just use 1.5 here, no special meaning Block block = codec.decodeImpl(istr, static_cast(rows_limit * 1.5)); if (block) + { + block.ffff(); accumulated_block.emplace(std::move(block)); + } } else { @@ -104,6 +110,7 @@ std::optional CHBlockChunkDecodeAndSquash::decodeAndSquash(const String & if (rows) { + accumulated_block->ffff(); auto mutable_columns = accumulated_block->mutateColumns(); 
for (size_t i = 0; i < columns; ++i) { @@ -112,6 +119,7 @@ std::optional CHBlockChunkDecodeAndSquash::decodeAndSquash(const String & CHBlockChunkCodec::readData(*column.type, *(mutable_columns[i]), istr, rows); } accumulated_block->setColumns(std::move(mutable_columns)); + accumulated_block->ffff(); } } diff --git a/dbms/src/Flash/Coprocessor/CoprocessorReader.h b/dbms/src/Flash/Coprocessor/CoprocessorReader.h index 7a246ba1e74..37f43f1d7b2 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorReader.h +++ b/dbms/src/Flash/Coprocessor/CoprocessorReader.h @@ -244,6 +244,7 @@ class CoprocessorReader /// CHBlockChunkCodec::decode already checked the schema. if (resp->encode_type() != tipb::EncodeType::TypeCHBlock) assertBlockSchema(header, block, "CoprocessorReader decode chunks"); + block.ffff(); block_queue.push(std::move(block)); } return detail; diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp index 62369f4c9e8..c4352aa80ce 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp @@ -14,8 +14,11 @@ #include #include +#include #include +#include #include +#include #include #include @@ -27,13 +30,25 @@ namespace DB::tests static Block prepareBlock(size_t rows) { Block block; - for (size_t i = 0; i < 5; ++i) - { - DataTypePtr int64_data_type = std::make_shared(); - auto int64_column = ColumnGenerator::instance().generate({rows, "Int64", RANDOM}).column; - block.insert( - ColumnWithTypeAndName{std::move(int64_column), int64_data_type, String("col") + std::to_string(i)}); - } + size_t col_idx = 0; + block.insert(ColumnGenerator::instance().generate({ + // + rows, + "Array(Float64)", + RANDOM, + fmt::format("col{}", col_idx), + 128, + // DataDistribution::RANDOM, + DataDistribution::FIXED, + 3, + })); + ++col_idx; + + // for (; col_idx < 5; ++col_idx) + // { + // DataTypePtr int64_data_type = 
std::make_shared(); + // block.insert(ColumnGenerator::instance().generate({rows, "Int64", RANDOM, fmt::format("col{}", col_idx)})); + // } return block; } @@ -69,7 +84,7 @@ void test_enocde_release_data(VecCol && batch_columns, const Block & header, con } } -TEST(CHBlockChunkCodec, ChunkCodecV1) +TEST(CHBlockChunkCodecTest, ChunkCodecV1) try { size_t block_num = 10; @@ -98,6 +113,7 @@ try ASSERT_EQ(codec.original_size, 0); } { + // test encode one block auto codec = CHBlockChunkCodecV1{ header, }; @@ -140,6 +156,7 @@ try ASSERT_TRUE(col.column); } } + // test encode moved blocks auto codec = CHBlockChunkCodecV1{ header, }; @@ -230,4 +247,19 @@ try } CATCH +TEST(CHBlockChunkCodecTest, ChunkDecodeAndSquash) +{ + auto header = prepareBlock(0); + Blocks blocks = {prepareBlock(11), prepareBlock(17), prepareBlock(23)}; + + CHBlockChunkCodecV1 codec(header); + CHBlockChunkDecodeAndSquash decoder(header, 13); + for (const auto & b : blocks) + { + LOG_INFO(Logger::get(), "ser/deser block {}", getColumnsContent(b.getColumnsWithTypeAndName())); + auto str = codec.encode(b, CompressionMethod::LZ4); + decoder.decodeAndSquashV1(str); + } +} + } // namespace DB::tests diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index ad7fbfbc373..80a5a870665 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -743,6 +743,7 @@ DecodeDetail ExchangeReceiverBase::decodeChunks( detail.rows += result->rows(); if likely (result->rows() > 0) { + result.value().ffff(); block_queue.push(std::move(result.value())); } } @@ -756,6 +757,7 @@ DecodeDetail ExchangeReceiverBase::decodeChunks( if (!result || !result->rows()) continue; detail.rows += result->rows(); + result->ffff(); block_queue.push(std::move(*result)); } return detail; @@ -843,6 +845,8 @@ ExchangeReceiverResult ExchangeReceiverBase::handleUnnormalChannel( std::unique_ptr & decoder_ptr) { std::optional last_block = decoder_ptr->flush(); + if 
(last_block) + last_block->ffff(); std::lock_guard lock(mu); if (this->state != DB::ExchangeReceiverState::NORMAL) { @@ -860,6 +864,7 @@ ExchangeReceiverResult ExchangeReceiverBase::handleUnnormalChannel( auto result = ExchangeReceiverResult::newOk(nullptr, 0, ""); result.decode_detail.packets = 0; result.decode_detail.rows = last_block->rows(); + last_block->ffff(); block_queue.push(std::move(last_block.value())); return result; } diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp b/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp index 5ef36b27766..a7ff288a762 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp @@ -160,6 +160,7 @@ OperatorStatus PipelineExec::fetchBlock(Block & block, size_t & start_transform_ } start_transform_op_index = 0; op_status = source_op->read(block); + block.ffff(); HANDLE_LAST_OP_STATUS(source_op, op_status); } diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index 60de0a195c3..6b6003dd7a6 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -1327,6 +1327,7 @@ class Aggregator void resetBlock(const Block & block_) { RUNTIME_CHECK_MSG(allBlockDataHandled(), "Previous block is not processed yet"); + block_.ffff(); block = block_; start_row = 0; end_row = 0; diff --git a/dbms/src/Operators/ExchangeReceiverSourceOp.cpp b/dbms/src/Operators/ExchangeReceiverSourceOp.cpp index 8e017fa497c..d27cef3f5b2 100644 --- a/dbms/src/Operators/ExchangeReceiverSourceOp.cpp +++ b/dbms/src/Operators/ExchangeReceiverSourceOp.cpp @@ -24,8 +24,11 @@ void ExchangeReceiverSourceOp::operateSuffixImpl() Block ExchangeReceiverSourceOp::popFromBlockQueue() { assert(!block_queue.empty()); - Block block = std::move(block_queue.front()); + Block block; + block.ffff(); + block = std::move(block_queue.front()); block_queue.pop(); + block.ffff(); return block; } @@ -34,6 +37,7 @@ OperatorStatus ExchangeReceiverSourceOp::readImpl(Block 
& block) if (!block_queue.empty()) { block = popFromBlockQueue(); + block.ffff(); return OperatorStatus::HAS_OUTPUT; } @@ -87,6 +91,7 @@ OperatorStatus ExchangeReceiverSourceOp::readImpl(Block & block) continue; block = popFromBlockQueue(); + block.ffff(); return OperatorStatus::HAS_OUTPUT; } return await_status; diff --git a/dbms/src/Operators/Operator.cpp b/dbms/src/Operators/Operator.cpp index 87c120a086b..d9a6834b295 100644 --- a/dbms/src/Operators/Operator.cpp +++ b/dbms/src/Operators/Operator.cpp @@ -100,6 +100,7 @@ OperatorStatus SourceOp::read(Block & block) profile_info.anchor(); assert(!block); auto op_status = readImpl(block); + block.ffff(); #ifndef NDEBUG if (op_status == OperatorStatus::HAS_OUTPUT && block) { diff --git a/dbms/src/TestUtils/ColumnGenerator.cpp b/dbms/src/TestUtils/ColumnGenerator.cpp index 69474f1711c..49667c1b70e 100644 --- a/dbms/src/TestUtils/ColumnGenerator.cpp +++ b/dbms/src/TestUtils/ColumnGenerator.cpp @@ -13,10 +13,15 @@ // limitations under the License. 
#include #include +#include #include #include #include +#include "Common/Exception.h" +#include "Core/Types.h" +#include "magic_enum.hpp" + namespace DB::tests { ColumnWithTypeAndName ColumnGenerator::generateNullMapColumn(const ColumnGeneratorOpts & opts) @@ -134,8 +139,24 @@ ColumnWithTypeAndName ColumnGenerator::generate(const ColumnGeneratorOpts & opts for (size_t i = 0; i < opts.size; ++i) genEnumValue(col, type); break; + case TypeIndex::Array: + { + auto nested_type = typeid_cast(type.get())->getNestedType(); + size_t elems_size = opts.array_elems_max_size; + for (size_t i = 0; i < opts.size; ++i) + { + if (opts.array_elems_distribution == DataDistribution::RANDOM) + elems_size = static_cast(rand_gen()) % opts.array_elems_max_size; + genVector(col, nested_type, elems_size); + } + break; + } default: throw std::invalid_argument("RandomColumnGenerator invalid type"); + throw DB::Exception( + ErrorCodes::LOGICAL_ERROR, + "RandomColumnGenerator invalid type, type_id={}", + magic_enum::enum_name(type_id)); } return {std::move(col), type, opts.name}; @@ -246,4 +267,27 @@ void ColumnGenerator::genDecimal(MutableColumnPtr & col, DataTypePtr & data_type fmt::format("RandomColumnGenerator parseDecimal({}, {}) prec {} scale {} fail", s, negative, prec, scale)); } } + +void ColumnGenerator::genVector(MutableColumnPtr & col, DataTypePtr & nested_type, size_t num_vals) +{ + switch (nested_type->getTypeId()) + { + case TypeIndex::Float32: + case TypeIndex::Float64: + { + Array arr; + for (size_t i = 0; i < num_vals; ++i) + // arr.push_back(static_cast(real_rand_gen(rand_gen))); + arr.push_back(static_cast(2.5)); + col->insert(arr); + break; + } + default: + throw DB::Exception( + ErrorCodes::LOGICAL_ERROR, + "RandomColumnGenerator invalid nested type in Array(...), type_id={}", + magic_enum::enum_name(nested_type->getTypeId())); + } +} + } // namespace DB::tests diff --git a/dbms/src/TestUtils/ColumnGenerator.h b/dbms/src/TestUtils/ColumnGenerator.h index 
1722dee83fe..92f02083c2e 100644 --- a/dbms/src/TestUtils/ColumnGenerator.h +++ b/dbms/src/TestUtils/ColumnGenerator.h @@ -25,6 +25,7 @@ namespace DB::tests enum DataDistribution { RANDOM, + FIXED, // TODO support zipf and more distribution. }; @@ -35,6 +36,8 @@ struct ColumnGeneratorOpts DataDistribution distribution; String name = ""; // NOLINT size_t string_max_size = 128; + DataDistribution array_elems_distribution = DataDistribution::RANDOM; + size_t array_elems_max_size = 10; }; class ColumnGenerator : public ext::Singleton @@ -61,5 +64,6 @@ class ColumnGenerator : public ext::Singleton static void genDuration(MutableColumnPtr & col); void genDecimal(MutableColumnPtr & col, DataTypePtr & data_type); void genEnumValue(MutableColumnPtr & col, DataTypePtr & enum_type); + void genVector(MutableColumnPtr & col, DataTypePtr & nested_type, size_t num_vals); }; -} // namespace DB::tests \ No newline at end of file +} // namespace DB::tests From 07fe18a397800217779e1c8b8fecd8dd3411a9e1 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 27 Sep 2024 09:25:33 +0800 Subject: [PATCH 2/5] Remove hack codes Signed-off-by: JaySon-Huang --- dbms/src/Columns/ColumnArray.cpp | 88 +------------------ dbms/src/Columns/ColumnArray.h | 3 - dbms/src/Columns/IColumn.h | 1 - dbms/src/Core/Block.h | 8 -- dbms/src/DataTypes/DataTypeArray.cpp | 51 +---------- .../tests/gtest_data_type_get_common_type.cpp | 5 +- .../Coprocessor/ChunkDecodeAndSquash.cpp | 8 -- .../src/Flash/Coprocessor/CoprocessorReader.h | 1 - .../tests/gtest_block_chunk_codec.cpp | 10 +-- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 5 -- dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp | 1 - dbms/src/Interpreters/Aggregator.h | 1 - .../Operators/ExchangeReceiverSourceOp.cpp | 4 - dbms/src/Operators/Operator.cpp | 1 - dbms/src/TestUtils/ColumnGenerator.cpp | 6 +- 15 files changed, 12 insertions(+), 181 deletions(-) diff --git a/dbms/src/Columns/ColumnArray.cpp b/dbms/src/Columns/ColumnArray.cpp index aa4ad5d568f..575f21c796a 
100644 --- a/dbms/src/Columns/ColumnArray.cpp +++ b/dbms/src/Columns/ColumnArray.cpp @@ -32,10 +32,6 @@ #include -#include "Common/FmtUtils.h" -#include "Common/Logger.h" -#include "common/logger_useful.h" - namespace DB { namespace ErrorCodes @@ -56,16 +52,6 @@ ColumnArray::ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && if (!typeid_cast(offsets.get())) throw Exception("offsets_column must be a ColumnUInt64", ErrorCodes::ILLEGAL_COLUMN); - if (offsets->size() > 1) - { - for (size_t i = 1; i < offsets->size(); ++i) - { - auto prev_off = static_cast(offsets->assumeMutableRef()).getData()[i - 1]; - auto curr_off = static_cast(offsets->assumeMutableRef()).getData()[i]; - RUNTIME_CHECK(curr_off >= prev_off, i, prev_off, curr_off); - } - } - /** NOTE * Arrays with constant value are possible and used in implementation of higher order functions (see FunctionReplicate). * But in most cases, arrays with constant value are unexpected and code will work wrong. Use with caution. @@ -121,7 +107,6 @@ MutableColumnPtr ColumnArray::cloneResized(size_t to_size) const res->getOffsets()[i] = offset; } - res->check(); return res; } @@ -198,8 +183,6 @@ void ColumnArray::insertData(const char * pos, size_t length) throw Exception("Incorrect length argument for method ColumnArray::insertData", ErrorCodes::BAD_ARGUMENTS); getOffsets().push_back((getOffsets().empty() ? 0 : getOffsets().back()) + elems); - - check(); } @@ -213,17 +196,6 @@ StringRef ColumnArray::serializeValueIntoArena( size_t array_size = sizeAt(n); size_t offset = offsetAt(n); - if (array_size > 4294967295) - { - auto col_sz = size(); - FmtBuffer fmt_buf; - for (size_t o = 0; o < col_sz; ++o) - { - fmt_buf.fmtAppend("i={} offset={},", o, offsetAt(o)); - } - LOG_WARNING(Logger::get(), "!!!!! 
n={} array_size={} offset={} {}", n, array_size, offset, fmt_buf.toString()); - } - char * pos = arena.allocContinue(sizeof(array_size), begin); memcpy(pos, &array_size, sizeof(array_size)); @@ -244,9 +216,6 @@ const char * ColumnArray::deserializeAndInsertFromArena(const char * pos, const pos = getData().deserializeAndInsertFromArena(pos, collator); getOffsets().push_back((getOffsets().empty() ? 0 : getOffsets().back()) + array_size); - - - check(); return pos; } @@ -364,8 +333,6 @@ void ColumnArray::insert(const Field & x) for (size_t i = 0; i < size; ++i) getData().insert(array[i]); getOffsets().push_back((getOffsets().empty() ? 0 : getOffsets().back()) + size); - - check(); } @@ -377,16 +344,12 @@ void ColumnArray::insertFrom(const IColumn & src_, size_t n) getData().insertRangeFrom(src.getData(), offset, size); getOffsets().push_back((getOffsets().empty() ? 0 : getOffsets().back()) + size); - - check(); } void ColumnArray::insertDefault() { getOffsets().push_back(getOffsets().empty() ? 
0 : getOffsets().back()); - - check(); } void ColumnArray::insertManyDefaults(size_t length) @@ -396,9 +359,6 @@ void ColumnArray::insertManyDefaults(size_t length) if (!offsets.empty()) v = offsets.back(); offsets.resize_fill(offsets.size() + length, v); - - - check(); } void ColumnArray::popBack(size_t n) @@ -408,8 +368,6 @@ void ColumnArray::popBack(size_t n) if (nested_n) getData().popBack(nested_n); offsets.resize_assume_reserved(offsets.size() - n); - - check(); } @@ -491,7 +449,6 @@ bool ColumnArray::hasEqualOffsets(const ColumnArray & other) const ColumnPtr ColumnArray::convertToFullColumnIfConst() const { - check(); ColumnPtr new_data; if (ColumnPtr full_column = getData().convertToFullColumnIfConst()) @@ -499,7 +456,6 @@ ColumnPtr ColumnArray::convertToFullColumnIfConst() const else new_data = data; - check(); return ColumnArray::create(new_data, offsets); } @@ -568,9 +524,6 @@ void ColumnArray::insertRangeFrom(const IColumn & src, size_t start, size_t leng for (size_t i = 0; i < length; ++i) cur_offsets[old_size + i] = src_offsets[start + i] - nested_offset + prev_max_offset; } - - - check(); } @@ -623,7 +576,6 @@ ColumnPtr ColumnArray::filterNumber(const Filter & filt, ssize_t result_size_hin res_offsets, filt, result_size_hint); -res->check(); return res; } @@ -693,7 +645,6 @@ ColumnPtr ColumnArray::filterString(const Filter & filt, ssize_t result_size_hin } } -res->check(); return res; } @@ -739,7 +690,6 @@ ColumnPtr ColumnArray::filterGeneric(const Filter & filt, ssize_t result_size_hi } } - res->check(); return res; } @@ -764,11 +714,9 @@ ColumnPtr ColumnArray::filterNullable(const Filter & filt, ssize_t result_size_h filt, result_size_hint); - auto a =ColumnArray::create( + return ColumnArray::create( ColumnNullable::create(filtered_array_of_nested.getDataPtr(), std::move(res_null_map)), filtered_offsets); - a->check(); - return a; } ColumnPtr ColumnArray::filterTuple(const Filter & filt, ssize_t result_size_hint) const @@ -794,11 +742,9 @@ 
ColumnPtr ColumnArray::filterTuple(const Filter & filt, ssize_t result_size_hint for (size_t i = 0; i < tuple_size; ++i) tuple_columns[i] = static_cast(*temporary_arrays[i]).getDataPtr(); - auto a= ColumnArray::create( + return ColumnArray::create( ColumnTuple::create(tuple_columns), static_cast(*temporary_arrays.front()).getOffsetsPtr()); - a->check(); - return a; } @@ -836,8 +782,6 @@ ColumnPtr ColumnArray::permute(const Permutation & perm, size_t limit) const if (current_offset != 0) res->data = data->permute(nested_perm, current_offset); - res->check(); - return res; } @@ -953,8 +897,6 @@ ColumnPtr ColumnArray::replicateNumber(const Offsets & replicate_offsets) const prev_data_offset = src_offsets[i]; } - array_res.check(); - return res; } @@ -1033,8 +975,6 @@ ColumnPtr ColumnArray::replicateString(const Offsets & replicate_offsets) const prev_src_string_offset += sum_chars_size; } - array_res.check(); - return res; } @@ -1176,8 +1116,6 @@ void ColumnArray::insertFromDatumData(const char * data, size_t length) auto precise_data_size = n * sizeof(Float32); RUNTIME_CHECK(length >= sizeof(UInt32) + precise_data_size, n, length); insertData(data, precise_data_size); - - check(); } std::pair ColumnArray::getElementRef(size_t element_idx) const @@ -1185,26 +1123,4 @@ std::pair ColumnArray::getElementRef(size_t element_idx) cons return {static_cast(sizeAt(element_idx)), getDataAt(element_idx)}; } -void ColumnArray::check() const -{ - if (size() > 1) - { - for (size_t i = 1; i < size(); ++i) - { - auto prev_off = offsetAt(i - 1); - auto curr_off = offsetAt(i); - if (curr_off < prev_off) - { - FmtBuffer fmt_buf; - for (size_t o = 0; o < size(); ++o) - { - fmt_buf.fmtAppend("i={} offset={}, ", o, offsetAt(o)); - } - LOG_WARNING(Logger::get(), "!!!!! check failure!!! 
{}", fmt_buf.toString()); - } - RUNTIME_CHECK(curr_off >= prev_off, i, prev_off, curr_off); - } - } -} - } // namespace DB diff --git a/dbms/src/Columns/ColumnArray.h b/dbms/src/Columns/ColumnArray.h index 44aabbfc7a4..f18068e6ea0 100644 --- a/dbms/src/Columns/ColumnArray.h +++ b/dbms/src/Columns/ColumnArray.h @@ -35,7 +35,6 @@ class ColumnArray final : public COWPtrHelper { private: friend class COWPtrHelper; - friend class DataTypeArray; /** Create an array column with specified values and offsets. */ ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && offsets_column); @@ -64,8 +63,6 @@ class ColumnArray final : public COWPtrHelper return Base::create(std::forward(args)...); } - void check() const override; - /** On the index i there is an offset to the beginning of the i + 1 -th element. */ using ColumnOffsets = ColumnVector; diff --git a/dbms/src/Columns/IColumn.h b/dbms/src/Columns/IColumn.h index 07ba3f908c9..06a906fdb63 100644 --- a/dbms/src/Columns/IColumn.h +++ b/dbms/src/Columns/IColumn.h @@ -488,7 +488,6 @@ class IColumn : public COWPtr /// Can be inside ColumnNullable. virtual bool canBeInsideNullable() const { return false; } - virtual void check() const {} virtual ~IColumn() = default; diff --git a/dbms/src/Core/Block.h b/dbms/src/Core/Block.h index b50a1681df1..8b90c032256 100644 --- a/dbms/src/Core/Block.h +++ b/dbms/src/Core/Block.h @@ -57,14 +57,6 @@ class Block public: BlockInfo info; - void ffff() const - { - for (const auto & c : data) - { - c.column->check(); - } - } - Block() = default; Block(std::initializer_list il); explicit Block(const ColumnsWithTypeAndName & data_); diff --git a/dbms/src/DataTypes/DataTypeArray.cpp b/dbms/src/DataTypes/DataTypeArray.cpp index f9feba93a2d..4f4b96ff01c 100644 --- a/dbms/src/DataTypes/DataTypeArray.cpp +++ b/dbms/src/DataTypes/DataTypeArray.cpp @@ -13,6 +13,7 @@ // limitations under the License. 
#include +#include #include #include #include @@ -23,10 +24,6 @@ #include #include -#include "Common/Exception.h" -#include "Common/FmtUtils.h" -#include "common/logger_useful.h" - namespace DB { @@ -224,19 +221,6 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams( SubstreamPath & path) const { ColumnArray & column_array = typeid_cast(column); - column_array.check(); - - { - const size_t last_offset = (column_array.getOffsets().empty() ? 0 : column_array.getOffsets().back()); - FmtBuffer fmt_buf; - fmt_buf.fmtAppend("column@{} data->size={} offsets=[", fmt::ptr(&column), column_array.data->size()); - for (size_t o = 0; o < column_array.size(); ++o) - { - fmt_buf.fmtAppend("i={} offset={}, ", o, column_array.offsetAt(o)); - } - fmt_buf.fmtAppend("] last_offset={}", last_offset); - LOG_WARNING(Logger::get(), "!!!!! check deserialize 1 !!! {}", fmt_buf.toString()); - } path.push_back(Substream::ArraySizes); if (auto * stream = getter(path)) @@ -253,28 +237,6 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams( ColumnArray::Offsets & offset_values = column_array.getOffsets(); IColumn & nested_column = column_array.getData(); - try - { - const size_t last_offset = (offset_values.empty() ? 0 : offset_values.back()); - FmtBuffer fmt_buf; - fmt_buf.fmtAppend("column@{} data->size={} offsets=[", fmt::ptr(&column), column_array.data->size()); - for (size_t o = 0; o < column_array.size(); ++o) - { - fmt_buf.fmtAppend("i={} offset={}, ", o, column_array.offsetAt(o)); - } - fmt_buf.fmtAppend("] last_offset={}", last_offset); - LOG_WARNING(Logger::get(), "!!!!! check deserialize 2 !!! {}", fmt_buf.toString()); - - // column_array.check(); - } - catch (DB::Exception & e) - { - tryLogCurrentException( - Logger::get(), - fmt::format("position_independent_encoding={}", position_independent_encoding)); - throw; - } - /// Number of values corresponding with `offset_values` must be read. const size_t last_offset = (offset_values.empty() ? 
0 : offset_values.back()); if (last_offset < nested_column.size()) @@ -292,17 +254,6 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams( position_independent_encoding, path); - column_array.check(); - { - FmtBuffer fmt_buf; - fmt_buf.fmtAppend("column@{} data->size={} offsets=[", fmt::ptr(&column), column_array.data->size()); - for (size_t o = 0; o < column_array.size(); ++o) - { - fmt_buf.fmtAppend("i={} offset={}, ", o, column_array.offsetAt(o)); - } - fmt_buf.fmtAppend("] last_offset={}", last_offset); - LOG_WARNING(Logger::get(), "!!!!! check deserialize 3 !!! {}", fmt_buf.toString()); - } /// Check consistency between offsets and elements subcolumns. /// But if elements column is empty - it's ok for columns of Nested types that was added by ALTER. if (!nested_column.empty() && nested_column.size() != last_offset) diff --git a/dbms/src/DataTypes/tests/gtest_data_type_get_common_type.cpp b/dbms/src/DataTypes/tests/gtest_data_type_get_common_type.cpp index 197c955f086..946819fd893 100644 --- a/dbms/src/DataTypes/tests/gtest_data_type_get_common_type.cpp +++ b/dbms/src/DataTypes/tests/gtest_data_type_get_common_type.cpp @@ -13,15 +13,12 @@ // limitations under the License. 
#include +#include #include #include #include #include -#include - -#include "DataTypes/DataTypeNullable.h" - namespace DB { namespace tests diff --git a/dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.cpp b/dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.cpp index 75a3c681254..ab8e51e999f 100644 --- a/dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.cpp +++ b/dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.cpp @@ -57,7 +57,6 @@ std::optional CHBlockChunkDecodeAndSquash::decodeAndSquashV1Impl(ReadBuff if (rows) { DecodeColumns(istr, block, rows, static_cast(rows_limit * 1.5)); - block.ffff(); accumulated_block.emplace(std::move(block)); } } @@ -66,12 +65,10 @@ std::optional CHBlockChunkDecodeAndSquash::decodeAndSquashV1Impl(ReadBuff size_t rows{}; DecodeHeader(istr, codec.header, rows); DecodeColumns(istr, *accumulated_block, rows, 0); - accumulated_block->ffff(); } if (accumulated_block && accumulated_block->rows() >= rows_limit) { - accumulated_block->ffff(); /// Return accumulated data and reset accumulated_block res.swap(accumulated_block); return res; @@ -96,10 +93,7 @@ std::optional CHBlockChunkDecodeAndSquash::decodeAndSquash(const String & /// so it should be larger than 1.0, just use 1.5 here, no special meaning Block block = codec.decodeImpl(istr, static_cast(rows_limit * 1.5)); if (block) - { - block.ffff(); accumulated_block.emplace(std::move(block)); - } } else { @@ -110,7 +104,6 @@ std::optional CHBlockChunkDecodeAndSquash::decodeAndSquash(const String & if (rows) { - accumulated_block->ffff(); auto mutable_columns = accumulated_block->mutateColumns(); for (size_t i = 0; i < columns; ++i) { @@ -119,7 +112,6 @@ std::optional CHBlockChunkDecodeAndSquash::decodeAndSquash(const String & CHBlockChunkCodec::readData(*column.type, *(mutable_columns[i]), istr, rows); } accumulated_block->setColumns(std::move(mutable_columns)); - accumulated_block->ffff(); } } diff --git a/dbms/src/Flash/Coprocessor/CoprocessorReader.h 
b/dbms/src/Flash/Coprocessor/CoprocessorReader.h index 37f43f1d7b2..7a246ba1e74 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorReader.h +++ b/dbms/src/Flash/Coprocessor/CoprocessorReader.h @@ -244,7 +244,6 @@ class CoprocessorReader /// CHBlockChunkCodec::decode already checked the schema. if (resp->encode_type() != tipb::EncodeType::TypeCHBlock) assertBlockSchema(header, block, "CoprocessorReader decode chunks"); - block.ffff(); block_queue.push(std::move(block)); } return detail; diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp index c4352aa80ce..ccbfa2c784f 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp @@ -44,11 +44,11 @@ static Block prepareBlock(size_t rows) })); ++col_idx; - // for (; col_idx < 5; ++col_idx) - // { - // DataTypePtr int64_data_type = std::make_shared(); - // block.insert(ColumnGenerator::instance().generate({rows, "Int64", RANDOM, fmt::format("col{}", col_idx)})); - // } + for (; col_idx < 5; ++col_idx) + { + DataTypePtr int64_data_type = std::make_shared(); + block.insert(ColumnGenerator::instance().generate({rows, "Int64", RANDOM, fmt::format("col{}", col_idx)})); + } return block; } diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 80a5a870665..ad7fbfbc373 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -743,7 +743,6 @@ DecodeDetail ExchangeReceiverBase::decodeChunks( detail.rows += result->rows(); if likely (result->rows() > 0) { - result.value().ffff(); block_queue.push(std::move(result.value())); } } @@ -757,7 +756,6 @@ DecodeDetail ExchangeReceiverBase::decodeChunks( if (!result || !result->rows()) continue; detail.rows += result->rows(); - result->ffff(); block_queue.push(std::move(*result)); } return detail; @@ -845,8 +843,6 @@ 
ExchangeReceiverResult ExchangeReceiverBase::handleUnnormalChannel( std::unique_ptr & decoder_ptr) { std::optional last_block = decoder_ptr->flush(); - if (last_block) - last_block->ffff(); std::lock_guard lock(mu); if (this->state != DB::ExchangeReceiverState::NORMAL) { @@ -864,7 +860,6 @@ ExchangeReceiverResult ExchangeReceiverBase::handleUnnormalChannel( auto result = ExchangeReceiverResult::newOk(nullptr, 0, ""); result.decode_detail.packets = 0; result.decode_detail.rows = last_block->rows(); - last_block->ffff(); block_queue.push(std::move(last_block.value())); return result; } diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp b/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp index a7ff288a762..5ef36b27766 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp @@ -160,7 +160,6 @@ OperatorStatus PipelineExec::fetchBlock(Block & block, size_t & start_transform_ } start_transform_op_index = 0; op_status = source_op->read(block); - block.ffff(); HANDLE_LAST_OP_STATUS(source_op, op_status); } diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index 6b6003dd7a6..60de0a195c3 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -1327,7 +1327,6 @@ class Aggregator void resetBlock(const Block & block_) { RUNTIME_CHECK_MSG(allBlockDataHandled(), "Previous block is not processed yet"); - block_.ffff(); block = block_; start_row = 0; end_row = 0; diff --git a/dbms/src/Operators/ExchangeReceiverSourceOp.cpp b/dbms/src/Operators/ExchangeReceiverSourceOp.cpp index d27cef3f5b2..9a200742e50 100644 --- a/dbms/src/Operators/ExchangeReceiverSourceOp.cpp +++ b/dbms/src/Operators/ExchangeReceiverSourceOp.cpp @@ -25,10 +25,8 @@ Block ExchangeReceiverSourceOp::popFromBlockQueue() { assert(!block_queue.empty()); Block block; - block.ffff(); block = std::move(block_queue.front()); block_queue.pop(); - block.ffff(); return block; } @@ -37,7 +35,6 @@ 
OperatorStatus ExchangeReceiverSourceOp::readImpl(Block & block) if (!block_queue.empty()) { block = popFromBlockQueue(); - block.ffff(); return OperatorStatus::HAS_OUTPUT; } @@ -91,7 +88,6 @@ OperatorStatus ExchangeReceiverSourceOp::readImpl(Block & block) continue; block = popFromBlockQueue(); - block.ffff(); return OperatorStatus::HAS_OUTPUT; } return await_status; diff --git a/dbms/src/Operators/Operator.cpp b/dbms/src/Operators/Operator.cpp index d9a6834b295..87c120a086b 100644 --- a/dbms/src/Operators/Operator.cpp +++ b/dbms/src/Operators/Operator.cpp @@ -100,7 +100,6 @@ OperatorStatus SourceOp::read(Block & block) profile_info.anchor(); assert(!block); auto op_status = readImpl(block); - block.ffff(); #ifndef NDEBUG if (op_status == OperatorStatus::HAS_OUTPUT && block) { diff --git a/dbms/src/TestUtils/ColumnGenerator.cpp b/dbms/src/TestUtils/ColumnGenerator.cpp index 49667c1b70e..c72b8fca6bf 100644 --- a/dbms/src/TestUtils/ColumnGenerator.cpp +++ b/dbms/src/TestUtils/ColumnGenerator.cpp @@ -12,15 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
#include +#include #include +#include #include #include #include #include -#include "Common/Exception.h" -#include "Core/Types.h" -#include "magic_enum.hpp" +#include namespace DB::tests { From 1ef124914a8e45fdb978225a3f9060ff913df882 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 27 Sep 2024 09:38:42 +0800 Subject: [PATCH 3/5] Refine test case --- .../tests/gtest_block_chunk_codec.cpp | 38 ++++++++++++++++--- dbms/src/TestUtils/ColumnGenerator.cpp | 17 ++++++--- 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp index ccbfa2c784f..1cfdfeb66e5 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp @@ -26,7 +26,7 @@ namespace DB::tests { -// Return a block with **rows** and 5 Int64 column. +// Return a block with **rows**, containing a random elems size array(f32) and 5 Int64 column. static Block prepareBlock(size_t rows) { Block block; @@ -34,11 +34,35 @@ static Block prepareBlock(size_t rows) block.insert(ColumnGenerator::instance().generate({ // rows, - "Array(Float64)", + "Array(Float32)", + RANDOM, + fmt::format("col{}", col_idx), + 128, + DataDistribution::RANDOM, + 3, + })); + ++col_idx; + + for (; col_idx < 5; ++col_idx) + { + DataTypePtr int64_data_type = std::make_shared(); + block.insert(ColumnGenerator::instance().generate({rows, "Int64", RANDOM, fmt::format("col{}", col_idx)})); + } + return block; +} + +// Return a block with **rows**, containing a fixed elems size array(f32) and 5 Int64 column. 
+static Block prepareBlockWithFixedVecF32(size_t rows) +{ + Block block; + size_t col_idx = 0; + block.insert(ColumnGenerator::instance().generate({ + // + rows, + "Array(Float32)", RANDOM, fmt::format("col{}", col_idx), 128, - // DataDistribution::RANDOM, DataDistribution::FIXED, 3, })); @@ -249,8 +273,12 @@ CATCH TEST(CHBlockChunkCodecTest, ChunkDecodeAndSquash) { - auto header = prepareBlock(0); - Blocks blocks = {prepareBlock(11), prepareBlock(17), prepareBlock(23)}; + auto header = prepareBlockWithFixedVecF32(0); + Blocks blocks = { + prepareBlockWithFixedVecF32(11), + prepareBlockWithFixedVecF32(17), + prepareBlockWithFixedVecF32(23), + }; CHBlockChunkCodecV1 codec(header); CHBlockChunkDecodeAndSquash decoder(header, 13); diff --git a/dbms/src/TestUtils/ColumnGenerator.cpp b/dbms/src/TestUtils/ColumnGenerator.cpp index c72b8fca6bf..a7b31b4ab6f 100644 --- a/dbms/src/TestUtils/ColumnGenerator.cpp +++ b/dbms/src/TestUtils/ColumnGenerator.cpp @@ -37,6 +37,7 @@ ColumnWithTypeAndName ColumnGenerator::generateNullMapColumn(const ColumnGenerat ColumnWithTypeAndName ColumnGenerator::generate(const ColumnGeneratorOpts & opts) { + RUNTIME_CHECK(opts.distribution == DataDistribution::RANDOM); DataTypePtr type; if (opts.type_name == "Decimal") type = createDecimalType(); @@ -152,7 +153,6 @@ ColumnWithTypeAndName ColumnGenerator::generate(const ColumnGeneratorOpts & opts break; } default: - throw std::invalid_argument("RandomColumnGenerator invalid type"); throw DB::Exception( ErrorCodes::LOGICAL_ERROR, "RandomColumnGenerator invalid type, type_id={}", @@ -263,8 +263,13 @@ void ColumnGenerator::genDecimal(MutableColumnPtr & col, DataTypePtr & data_type } else { - throw std::invalid_argument( - fmt::format("RandomColumnGenerator parseDecimal({}, {}) prec {} scale {} fail", s, negative, prec, scale)); + throw DB::Exception( + ErrorCodes::LOGICAL_ERROR, + "RandomColumnGenerator parseDecimal({}, {}) prec {} scale {} fail", + s, + negative, + prec, + scale); } } @@ -277,8 
+282,10 @@ void ColumnGenerator::genVector(MutableColumnPtr & col, DataTypePtr & nested_typ { Array arr; for (size_t i = 0; i < num_vals; ++i) - // arr.push_back(static_cast(real_rand_gen(rand_gen))); - arr.push_back(static_cast(2.5)); + { + arr.push_back(static_cast(real_rand_gen(rand_gen))); + // arr.push_back(static_cast(2.5)); + } col->insert(arr); break; } From 9fc7841b0b4de404b854a2eaff3f38fef42f6401 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 27 Sep 2024 15:53:19 +0800 Subject: [PATCH 4/5] Add more ut case --- dbms/src/DataTypes/DataTypeArray.cpp | 20 ++++- dbms/src/DataTypes/IDataType.h | 6 +- .../Flash/Coprocessor/CHBlockChunkCodec.cpp | 10 ++- .../tests/gtest_block_chunk_codec.cpp | 80 ++++++++++++++++++- dbms/src/TestUtils/ColumnGenerator.h | 4 + 5 files changed, 112 insertions(+), 8 deletions(-) diff --git a/dbms/src/DataTypes/DataTypeArray.cpp b/dbms/src/DataTypes/DataTypeArray.cpp index 4f4b96ff01c..ae36c61b3e6 100644 --- a/dbms/src/DataTypes/DataTypeArray.cpp +++ b/dbms/src/DataTypes/DataTypeArray.cpp @@ -112,13 +112,12 @@ void serializeArraySizesPositionIndependent(const IColumn & column, WriteBuffer { const ColumnArray & column_array = typeid_cast(column); const ColumnArray::Offsets & offset_values = column_array.getOffsets(); - size_t size = offset_values.size(); - if (!size) + size_t size = offset_values.size(); + if (size == 0) return; size_t end = limit && (offset + limit < size) ? offset + limit : size; - ColumnArray::Offset prev_offset = offset == 0 ? 0 : offset_values[offset - 1]; for (size_t i = offset; i < end; ++i) { @@ -174,6 +173,12 @@ void DataTypeArray::serializeBinaryBulkWithMultipleStreams( path.push_back(Substream::ArraySizes); if (auto * stream = getter(path)) { + // `position_independent_encoding == false` indicates that the `column_array.offsets` + // is serialized as is, which can provide better performance but only supports + // deserialization into an empty column. 
Conversely, when `position_independent_encoding == true`, + // the `column_array.offsets` is encoded into a format that supports deserializing + // and appending data into a column containing existing data. + // If you are unsure, set position_independent_encoding to true. if (position_independent_encoding) serializeArraySizesPositionIndependent(column, *stream, offset, limit); else @@ -225,11 +230,20 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams( path.push_back(Substream::ArraySizes); if (auto * stream = getter(path)) { + // `position_independent_encoding == false` indicates that the `column_array.offsets` + // is serialized as is, which can provide better performance but only supports + // deserialization into an empty column. Conversely, when `position_independent_encoding == true`, + // the `column_array.offsets` is encoded into a format that supports deserializing + // and appending data into a column containing existing data. + // If you are unsure, set position_independent_encoding to true. if (position_independent_encoding) deserializeArraySizesPositionIndependent(column, *stream, limit); else + { + DataTypeNumber() .deserializeBinaryBulk(column_array.getOffsetsColumn(), *stream, limit, 0); + } } path.back() = Substream::ArrayElements; diff --git a/dbms/src/DataTypes/IDataType.h b/dbms/src/DataTypes/IDataType.h index 94d4fe5d0ad..9641c9eee66 100644 --- a/dbms/src/DataTypes/IDataType.h +++ b/dbms/src/DataTypes/IDataType.h @@ -124,6 +124,8 @@ class IDataType : private boost::noncopyable * offset must be not greater than size of column. * offset + limit could be greater than size of column * - in that case, column is serialized till the end. + * `position_independent_encoding` - setting it to false gives better performance, but the serialized data must + * then be deserialized into an empty column, not appended to a column with existing data. 
*/ virtual void serializeBinaryBulkWithMultipleStreams( const IColumn & column, @@ -149,7 +151,9 @@ class IDataType : private boost::noncopyable } /** Read no more than limit values and append them into column. - * avg_value_size_hint - if not zero, may be used to avoid reallocations while reading column of String type. + * `avg_value_size_hint` - if not zero, may be used to avoid reallocations while reading column of String type. + * `position_independent_encoding` - setting it to false gives better performance, but then the data must be + * deserialized into an empty column, not appended to a column with existing data. */ virtual void deserializeBinaryBulkWithMultipleStreams( IColumn & column, diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp index 89a1c53a977..f2d6cf0d3f5 100644 --- a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp @@ -103,7 +103,7 @@ void WriteColumnData(const IDataType & type, const ColumnPtr & column, WriteBuff output_stream_getter, offset, limit, - /*position_independent_encoding*/ true, + /*position_independent_encoding=*/true, {}); } @@ -112,7 +112,13 @@ void CHBlockChunkCodec::readData(const IDataType & type, IColumn & column, ReadB IDataType::InputStreamGetter input_stream_getter = [&](const IDataType::SubstreamPath &) { return &istr; }; - type.deserializeBinaryBulkWithMultipleStreams(column, input_stream_getter, rows, 0, false, {}); + type.deserializeBinaryBulkWithMultipleStreams( + column, + input_stream_getter, + rows, + 0, + /*position_independent_encoding=*/true, + {}); } size_t ApproxBlockBytes(const Block & block) diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp index 1cfdfeb66e5..c43f876a6c0 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp @@ -12,6 +12,7 @@ // 
See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -22,6 +23,11 @@ #include #include +#include + +#include "Common/formatReadable.h" +#include "common/logger_useful.h" + namespace DB::tests { @@ -279,15 +285,85 @@ TEST(CHBlockChunkCodecTest, ChunkDecodeAndSquash) prepareBlockWithFixedVecF32(17), prepareBlockWithFixedVecF32(23), }; + size_t num_rows = 0; CHBlockChunkCodecV1 codec(header); CHBlockChunkDecodeAndSquash decoder(header, 13); + size_t num_rows_decoded = 0; + Blocks blocks_decoded; + auto check = [&](std::optional && block_opt) { + if (block_opt) + { + block_opt->checkNumberOfRows(); + num_rows_decoded += block_opt->rows(); + blocks_decoded.emplace_back(std::move(*block_opt)); + } + }; for (const auto & b : blocks) { - LOG_INFO(Logger::get(), "ser/deser block {}", getColumnsContent(b.getColumnsWithTypeAndName())); + num_rows += b.rows(); + LOG_DEBUG(Logger::get(), "ser/deser block {}", getColumnsContent(b.getColumnsWithTypeAndName())); auto str = codec.encode(b, CompressionMethod::LZ4); - decoder.decodeAndSquashV1(str); + check(decoder.decodeAndSquashV1(str)); } + check(decoder.flush()); + ASSERT_EQ(num_rows, num_rows_decoded); + + auto input_block = vstackBlocks(std::move(blocks)); + auto decoded_block = vstackBlocks(std::move(blocks_decoded)); + ASSERT_BLOCK_EQ(input_block, decoded_block); +} + + +TEST(CHBlockChunkCodecTest, ChunkDecodeAndSquashRandom) +{ + std::mt19937_64 rand_gen; + + auto header = prepareBlockWithFixedVecF32(0); + size_t num_blocks = std::uniform_int_distribution(1, 64)(rand_gen); + size_t num_rows = 0; + Blocks blocks; + for (size_t i = 0; i < num_blocks; ++i) + { + auto b = prepareBlockWithFixedVecF32(std::uniform_int_distribution<>(0, 8192)(rand_gen)); + num_rows += b.rows(); + blocks.emplace_back(std::move(b)); + } + + LOG_DEBUG(Logger::get(), "generate blocks, num_blocks={} num_rows={}", num_blocks, num_rows); + + CHBlockChunkCodecV1 
codec(header); + CHBlockChunkDecodeAndSquash decoder(header, 1024); + size_t num_rows_decoded = 0; + size_t num_bytes = 0; + Blocks blocks_decoded; + auto check = [&](std::optional && block_opt) { + if (block_opt) + { + block_opt->checkNumberOfRows(); + num_rows_decoded += block_opt->rows(); + blocks_decoded.emplace_back(std::move(*block_opt)); + } + }; + for (const auto & b : blocks) + { + // LOG_DEBUG(Logger::get(), "ser/deser block {}", getColumnsContent(b.getColumnsWithTypeAndName())); + auto str = codec.encode(b, CompressionMethod::LZ4); + num_bytes += str.size(); + check(decoder.decodeAndSquashV1(str)); + } + check(decoder.flush()); + ASSERT_EQ(num_rows, num_rows_decoded); + LOG_DEBUG( + Logger::get(), + "ser/deser done, num_blocks={} num_rows={} num_bytes={}", + num_blocks, + num_rows, + formatReadableSizeWithBinarySuffix(num_bytes)); + + auto input_block = vstackBlocks(std::move(blocks)); + auto decoded_block = vstackBlocks(std::move(blocks_decoded)); + ASSERT_BLOCK_EQ(input_block, decoded_block); } } // namespace DB::tests diff --git a/dbms/src/TestUtils/ColumnGenerator.h b/dbms/src/TestUtils/ColumnGenerator.h index 92f02083c2e..515f303bfe9 100644 --- a/dbms/src/TestUtils/ColumnGenerator.h +++ b/dbms/src/TestUtils/ColumnGenerator.h @@ -36,6 +36,10 @@ struct ColumnGeneratorOpts DataDistribution distribution; String name = ""; // NOLINT size_t string_max_size = 128; + // - `array_elems_distribution == RANDOM` generate array with random num of elems + // the range for num of elems is [0, array_elems_max_size) + // - `array_elems_distribution == FIXED` generate array with fixed num of elems + // the num of elems == array_elems_max_size DataDistribution array_elems_distribution = DataDistribution::RANDOM; size_t array_elems_max_size = 10; }; From 011c1e2f4f5a406bfbcf096d42d8e633f3a4a29b Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 27 Sep 2024 16:08:19 +0800 Subject: [PATCH 5/5] Add check and format files --- dbms/src/DataTypes/DataTypeArray.cpp | 5 
+- dbms/src/DataTypes/IDataType.h | 56 +++++++++---------- .../tests/gtest_block_chunk_codec.cpp | 10 ++-- .../Operators/ExchangeReceiverSourceOp.cpp | 3 +- 4 files changed, 39 insertions(+), 35 deletions(-) diff --git a/dbms/src/DataTypes/DataTypeArray.cpp b/dbms/src/DataTypes/DataTypeArray.cpp index ae36c61b3e6..9dd14f2b5de 100644 --- a/dbms/src/DataTypes/DataTypeArray.cpp +++ b/dbms/src/DataTypes/DataTypeArray.cpp @@ -240,7 +240,10 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams( deserializeArraySizesPositionIndependent(column, *stream, limit); else { - + RUNTIME_CHECK_MSG( + column_array.getOffsetsColumn().empty(), + "try to deserialize Array type to non-empty column without position idenpendent encoding, type_name={}", + getName()); DataTypeNumber() .deserializeBinaryBulk(column_array.getOffsetsColumn(), *stream, limit, 0); } diff --git a/dbms/src/DataTypes/IDataType.h b/dbms/src/DataTypes/IDataType.h index 9641c9eee66..a9fcab5e06f 100644 --- a/dbms/src/DataTypes/IDataType.h +++ b/dbms/src/DataTypes/IDataType.h @@ -54,7 +54,7 @@ class IDataType : private boost::noncopyable /// static constexpr bool is_parametric = false; /// Name of data type (examples: UInt64, Array(String)). - virtual String getName() const { return getFamilyName(); }; + virtual String getName() const { return getFamilyName(); } virtual TypeIndex getTypeId() const = 0; @@ -299,61 +299,61 @@ class IDataType : private boost::noncopyable /** Can appear in table definition. * Counterexamples: Interval, Nothing. */ - virtual bool cannotBeStoredInTables() const { return false; }; + virtual bool cannotBeStoredInTables() const { return false; } /** In text formats that render "pretty" tables, * is it better to align value right in table cell. * Examples: numbers, even nullable. 
*/ - virtual bool shouldAlignRightInPrettyFormats() const { return false; }; + virtual bool shouldAlignRightInPrettyFormats() const { return false; } /** Does formatted value in any text format can contain anything but valid UTF8 sequences. * Example: String (because it can contain arbitary bytes). * Counterexamples: numbers, Date, DateTime. * For Enum, it depends. */ - virtual bool textCanContainOnlyValidUTF8() const { return false; }; + virtual bool textCanContainOnlyValidUTF8() const { return false; } /** Is it possible to compare for less/greater, to calculate min/max? * Not necessarily totally comparable. For example, floats are comparable despite the fact that NaNs compares to nothing. * The same for nullable of comparable types: they are comparable (but not totally-comparable). */ - virtual bool isComparable() const { return false; }; + virtual bool isComparable() const { return false; } /** Does it make sense to use this type with COLLATE modifier in ORDER BY. * Example: String, but not FixedString. */ - virtual bool canBeComparedWithCollation() const { return false; }; + virtual bool canBeComparedWithCollation() const { return false; } /** If the type is totally comparable (Ints, Date, DateTime, not nullable, not floats) * and "simple" enough (not String, FixedString) to be used as version number * (to select rows with maximum version). */ - virtual bool canBeUsedAsVersion() const { return false; }; + virtual bool canBeUsedAsVersion() const { return false; } /** Values of data type can be summed (possibly with overflow, within the same data type). * Example: numbers, even nullable. Not Date/DateTime. Not Enum. * Enums can be passed to aggregate function 'sum', but the result is Int64, not Enum, so they are not summable. */ - virtual bool isSummable() const { return false; }; + virtual bool isSummable() const { return false; } /** Can be used in operations like bit and, bit shift, bit not, etc. 
*/ - virtual bool canBeUsedInBitOperations() const { return false; }; + virtual bool canBeUsedInBitOperations() const { return false; } /** Can be used in boolean context (WHERE, HAVING). * UInt8, maybe nullable. */ - virtual bool canBeUsedInBooleanContext() const { return false; }; + virtual bool canBeUsedInBooleanContext() const { return false; } /** Integers, floats, not Nullable. Not Enums. Not Date/DateTime. */ - virtual bool isNumber() const { return false; }; + virtual bool isNumber() const { return false; } /** Integers. Not Nullable. Not Enums. Not Date/DateTime. */ - virtual bool isInteger() const { return false; }; - virtual bool isUnsignedInteger() const { return false; }; + virtual bool isInteger() const { return false; } + virtual bool isUnsignedInteger() const { return false; } /** Floating point values. Not Nullable. Not Enums. Not Date/DateTime. */ @@ -361,27 +361,27 @@ class IDataType : private boost::noncopyable /** Date, DateTime, MyDate, MyDateTime. Not Nullable. */ - virtual bool isDateOrDateTime() const { return false; }; + virtual bool isDateOrDateTime() const { return false; } /** MyDate, MyDateTime. Not Nullable. */ - virtual bool isMyDateOrMyDateTime() const { return false; }; + virtual bool isMyDateOrMyDateTime() const { return false; } /** MyTime. Not Nullable. */ - virtual bool isMyTime() const { return false; }; + virtual bool isMyTime() const { return false; } /** Decimal. Not Nullable. */ - virtual bool isDecimal() const { return false; }; + virtual bool isDecimal() const { return false; } /** Numbers, Enums, Date, DateTime, MyDate, MyDateTime. Not nullable. */ - virtual bool isValueRepresentedByNumber() const { return false; }; + virtual bool isValueRepresentedByNumber() const { return false; } /** Integers, Enums, Date, DateTime, MyDate, MyDateTime. Not nullable. 
*/ - virtual bool isValueRepresentedByInteger() const { return false; }; + virtual bool isValueRepresentedByInteger() const { return false; } /** Values are unambiguously identified by contents of contiguous memory region, * that can be obtained by IColumn::getDataAt method. @@ -390,23 +390,23 @@ class IDataType : private boost::noncopyable * (because Array(String) values became ambiguous if you concatenate Strings). * Counterexamples: Nullable, Tuple. */ - virtual bool isValueUnambiguouslyRepresentedInContiguousMemoryRegion() const { return false; }; + virtual bool isValueUnambiguouslyRepresentedInContiguousMemoryRegion() const { return false; } virtual bool isValueUnambiguouslyRepresentedInFixedSizeContiguousMemoryRegion() const { return isValueUnambiguouslyRepresentedInContiguousMemoryRegion() && (isValueRepresentedByNumber() || isFixedString()); - }; + } - virtual bool isString() const { return false; }; - virtual bool isFixedString() const { return false; }; - virtual bool isStringOrFixedString() const { return isString() || isFixedString(); }; + virtual bool isString() const { return false; } + virtual bool isFixedString() const { return false; } + virtual bool isStringOrFixedString() const { return isString() || isFixedString(); } /** Example: numbers, Date, DateTime, FixedString, Enum... Nullable and Tuple of such types. * Counterexamples: String, Array. * It's Ok to return false for AggregateFunction despite the fact that some of them have fixed size state. */ - virtual bool haveMaximumSizeOfValue() const { return false; }; + virtual bool haveMaximumSizeOfValue() const { return false; } /** Size in amount of bytes in memory. Throws an exception if not haveMaximumSizeOfValue. */ @@ -418,9 +418,9 @@ class IDataType : private boost::noncopyable /** Integers (not floats), Enum, String, FixedString. 
*/ - virtual bool isCategorial() const { return false; }; + virtual bool isCategorial() const { return false; } - virtual bool isEnum() const { return false; }; + virtual bool isEnum() const { return false; } virtual bool isNullable() const { return false; } /** Is this type can represent only NULL value? (It also implies isNullable) @@ -429,7 +429,7 @@ class IDataType : private boost::noncopyable /** If this data type cannot be wrapped in Nullable data type. */ - virtual bool canBeInsideNullable() const { return false; }; + virtual bool canBeInsideNullable() const { return false; } /// Updates avg_value_size_hint for newly read column. Uses to optimize deserialization. Zero expected for first column. static void updateAvgValueSizeHint(const IColumn & column, double & avg_value_size_hint); diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp index c43f876a6c0..28875d6c689 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#include #include #include #include @@ -21,13 +22,10 @@ #include #include #include -#include +#include #include -#include "Common/formatReadable.h" -#include "common/logger_useful.h" - namespace DB::tests { @@ -278,6 +276,7 @@ try CATCH TEST(CHBlockChunkCodecTest, ChunkDecodeAndSquash) +try { auto header = prepareBlockWithFixedVecF32(0); Blocks blocks = { @@ -313,9 +312,11 @@ TEST(CHBlockChunkCodecTest, ChunkDecodeAndSquash) auto decoded_block = vstackBlocks(std::move(blocks_decoded)); ASSERT_BLOCK_EQ(input_block, decoded_block); } +CATCH TEST(CHBlockChunkCodecTest, ChunkDecodeAndSquashRandom) +try { std::mt19937_64 rand_gen; @@ -365,5 +366,6 @@ TEST(CHBlockChunkCodecTest, ChunkDecodeAndSquashRandom) auto decoded_block = vstackBlocks(std::move(blocks_decoded)); ASSERT_BLOCK_EQ(input_block, decoded_block); } +CATCH } // namespace DB::tests diff --git a/dbms/src/Operators/ExchangeReceiverSourceOp.cpp b/dbms/src/Operators/ExchangeReceiverSourceOp.cpp index 9a200742e50..8e017fa497c 100644 --- a/dbms/src/Operators/ExchangeReceiverSourceOp.cpp +++ b/dbms/src/Operators/ExchangeReceiverSourceOp.cpp @@ -24,8 +24,7 @@ void ExchangeReceiverSourceOp::operateSuffixImpl() Block ExchangeReceiverSourceOp::popFromBlockQueue() { assert(!block_queue.empty()); - Block block; - block = std::move(block_queue.front()); + Block block = std::move(block_queue.front()); block_queue.pop(); return block; }