Skip to content

Commit

Permalink
update data type and serde ut
Browse files Browse the repository at this point in the history
  • Loading branch information
amorynan committed Nov 28, 2024
1 parent ffce1e2 commit 363fc27
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 38 deletions.
184 changes: 150 additions & 34 deletions be/test/vec/data_types/common_data_type_serder_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <arrow/record_batch.h>
#include <gen_cpp/data.pb.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
Expand All @@ -24,6 +25,10 @@
#include <iostream>

#include "olap/schema.h"
#include "runtime/descriptors.cpp"
#include "runtime/descriptors.h"
#include "util/arrow/block_convertor.h"
#include "util/arrow/row_batch.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_map.h"
Expand All @@ -35,6 +40,7 @@
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_map.h"
#include "vec/utils/arrow_column_to_doris_column.h"

// this test covers the data type serialize and deserialize functions
// such as
Expand Down Expand Up @@ -82,7 +88,7 @@ class CommonDataTypeSerdeTest : public ::testing::Test {
std::set<int> idxes, const std::string& column_data_file,
std::function<void(MutableColumns& load_cols, DataTypeSerDeSPtrs serders)>
assert_callback,
bool is_hive_format = false) {
bool is_hive_format = false, DataTypes dataTypes = {}) {
ASSERT_EQ(serders.size(), columns.size());
// Step 1: Insert data from `column_data_file` into the column and check result with `check_data_file`
// Load column data and expected data from CSV files
Expand All @@ -92,11 +98,11 @@ class CommonDataTypeSerdeTest : public ::testing::Test {
if (S_ISREG(buff.st_mode)) {
// file
if (is_hive_format) {
load_data_and_assert_from_csv<true>(serders, columns, column_data_file,
col_spliter, idxes);
load_data_and_assert_from_csv<true, true>(serders, columns, column_data_file,
col_spliter, idxes);
} else {
load_data_and_assert_from_csv<false>(serders, columns, column_data_file,
col_spliter, idxes);
load_data_and_assert_from_csv<false, true>(serders, columns, column_data_file,
col_spliter, idxes);
}
} else if (S_ISDIR(buff.st_mode)) {
// dir
Expand All @@ -105,11 +111,11 @@ class CommonDataTypeSerdeTest : public ::testing::Test {
std::string file_path = entry.path().string();
std::cout << "load data from file: " << file_path << std::endl;
if (is_hive_format) {
load_data_and_assert_from_csv<true>(serders, columns, file_path,
col_spliter, idxes);
load_data_and_assert_from_csv<true, true>(serders, columns, file_path,
col_spliter, idxes);
} else {
load_data_and_assert_from_csv<false>(serders, columns, file_path,
col_spliter, idxes);
load_data_and_assert_from_csv<false, true>(serders, columns, file_path,
col_spliter, idxes);
}
}
}
Expand All @@ -120,7 +126,7 @@ class CommonDataTypeSerdeTest : public ::testing::Test {
}

// Helper function to load data from CSV: each line is split by `spliter`, and the fields whose index is in `idxes` are loaded into the columns
template <bool is_hive_format>
template <bool is_hive_format, bool generate_res_file>
static void load_data_and_assert_from_csv(const DataTypeSerDeSPtrs serders,
MutableColumns& columns, const std::string& file_path,
const char spliter = ';',
Expand All @@ -137,17 +143,24 @@ class CommonDataTypeSerdeTest : public ::testing::Test {

std::string line;
DataTypeSerDe::FormatOptions options;
std::vector<std::vector<std::string>> res;
MutableColumns assert_str_cols(columns.size());
for (size_t i = 0; i < columns.size(); ++i) {
assert_str_cols[i] = ColumnString::create();
}

while (std::getline(file, line)) {
std::stringstream lineStream(line);
// std::cout << "whole : " << lineStream.str() << std::endl;
std::string value;
int l_idx = 0;
int c_idx = 0;
std::vector<string> row;
while (std::getline(lineStream, value, spliter)) {
if (idxes.contains(l_idx)) {
// load csv data
Slice string_slice(value.data(), value.size());
std::cout << string_slice << std::endl;
std::cout << "origin : " << string_slice << std::endl;
Status st;
// deserialize data
if constexpr (is_hive_format) {
Expand All @@ -158,14 +171,16 @@ class CommonDataTypeSerdeTest : public ::testing::Test {
string_slice, options);
}
if (!st.ok()) {
// deserialize if happen error now we do not insert any value for input column
// so we push a default value to column for row alignment
columns[c_idx]->insert_default();
std::cout << "error in deserialize but continue: " << st.to_string()
<< std::endl;
}
// serialize data
auto ser_col = ColumnString::create();
ser_col->reserve(1);
VectorBufferWriter bw(*ser_col.get());
size_t row_num = columns[c_idx]->size() - 1;
assert_str_cols[c_idx]->reserve(columns[c_idx]->size());
VectorBufferWriter bw(assert_cast<ColumnString&>(*assert_str_cols[c_idx]));
if constexpr (is_hive_format) {
st = serders[c_idx]->serialize_one_cell_to_hive_text(*columns[c_idx],
row_num, bw, options);
Expand All @@ -176,13 +191,42 @@ class CommonDataTypeSerdeTest : public ::testing::Test {
EXPECT_TRUE(st.ok()) << st.to_string();
}
bw.commit();
// assert data : origin data and serialized data should be equal
EXPECT_EQ(ser_col->get_data_at(0).to_string(), string_slice.to_string());
// assert data : origin data and serialized data should be equal or generated
// file to check data
size_t assert_size = assert_str_cols[c_idx]->size();
if constexpr (!generate_res_file) {
EXPECT_EQ(assert_str_cols[c_idx]->get_data_at(assert_size - 1).to_string(),
string_slice.to_string())
<< "column: " << columns[c_idx]->get_name() << " row: " << row_num
<< " is_hive_format: " << is_hive_format;
}
++c_idx;
}
res.push_back(row);
++l_idx;
}
}

if (generate_res_file) {
// generate res
auto pos = file_path.find_last_of(".");
string hive_format = is_hive_format ? "_hive" : "";
std::string res_file = file_path.substr(0, pos) + hive_format + "_serde_res.csv";
std::ofstream res_f(res_file);
if (!res_f.is_open()) {
throw std::ios_base::failure("Failed to open file." + res_file);
}
for (size_t r = 0; r < assert_str_cols[0]->size(); ++r) {
for (size_t c = 0; c < assert_str_cols.size(); ++c) {
std::cout << assert_str_cols[c]->get_data_at(r).to_string() << spliter
<< std::endl;
res_f << assert_str_cols[c]->get_data_at(r).to_string() << spliter;
}
res_f << std::endl;
}
res_f.close();
std::cout << "generate res file: " << res_file << std::endl;
}
}

// standard hive text ser-deserialize assert function
Expand Down Expand Up @@ -215,36 +259,108 @@ class CommonDataTypeSerdeTest : public ::testing::Test {
}
}

// Round-trip test for block_to_jsonb / jsonb_to_block: every row of the loaded columns is
// written as one JSONB object (one cell per column via write_one_cell_to_jsonb), stored in a
// ColumnString, read back with read_one_cell_from_jsonb, and compared cell by cell.
// NOTE(review): this text is taken from a rendered diff view, so some lines below may belong
// to the removed side of the change -- confirm against the committed file.
static void assert_jsonb_format(MutableColumns& load_cols, DataTypeSerDeSPtrs serders) {
Arena pool;
auto jsonb_column = ColumnString::create(); // jsonb column
jsonb_column->reserve(load_cols[0]->size());
MutableColumns assert_cols;
for (size_t i = 0; i < load_cols.size(); ++i) {
auto& col = load_cols[i];
std::cout << " now we are testing column : " << col->get_name() << std::endl;
// serialize to jsonb
// NOTE(review): assert_cols is filled from load_cols[i]->assume_mutable(); if that refers
// to the same column object instead of a fresh empty one, the read-back below appends to
// the source column and the final comparison checks a column against itself -- confirm.
assert_cols.push_back(load_cols[i]->assume_mutable());
}
// one JSONB object per row; the column index i is passed as the fourth argument of
// write_one_cell_to_jsonb
for (size_t r = 0; r < load_cols[0]->size(); ++r) {
JsonbWriterT<JsonbOutStream> jw;
// this Arena shadows the function-level `pool` declared above
Arena pool;
jw.writeStartObject();
// serialize to jsonb
for (size_t i = 0; i < load_cols.size(); ++i) {
auto& col = load_cols[i];
serders[i]->write_one_cell_to_jsonb(*col, jw, &pool, i, r);
}
jw.writeEndObject();
jsonb_column->insert_data(jw.getOutput()->getBuffer(), jw.getOutput()->getSize());
}
// deserialize jsonb column to assert column
EXPECT_EQ(jsonb_column->size(), load_cols[0]->size());
for (size_t r = 0; r < jsonb_column->size(); ++r) {
StringRef jsonb_data = jsonb_column->get_data_at(r);
auto pdoc = JsonbDocument::createDocument(jsonb_data.data, jsonb_data.size);
JsonbDocument& doc = *pdoc;
// cIdx walks the serders in step with the document entries
// (assumes entries come back in the order they were written -- TODO confirm)
size_t cIdx = 0;
for (auto it = doc->begin(); it != doc->end(); ++it) {
serders[cIdx]->read_one_cell_from_jsonb(*assert_cols[cIdx], it->value());
++cIdx;
}
}
// check column value
for (size_t i = 0; i < load_cols.size(); ++i) {
auto& col = load_cols[i];
auto& assert_col = assert_cols[i];
for (size_t j = 0; j < col->size(); ++j) {
// NOTE(review): `jw` is declared inside the row loop above and is not in scope here;
// this line looks like a removed-side line of the diff -- confirm.
serders[i]->write_one_cell_to_jsonb(*col, jw, &pool, i, j);
auto cell = col->operator[](j);
auto assert_cell = assert_col->operator[](j);
EXPECT_EQ(cell, assert_cell) << "column: " << col->get_name() << " row: " << j;
}
// deserialize from jsonb
// NOTE(review): expect_column and this inner jsonb_column are not used before the loop
// body ends; presumably leftovers of the previous implementation -- confirm.
auto expect_column = col->clone_empty();
auto jsonb_column = ColumnString::create();
}
}

// Asserts the MySQL text format: every cell of every loaded column is written into a
// MysqlRowBuffer with write_column_to_mysql and the returned Status is expected to be ok.
// The written bytes are not compared against anything; the check is only that serialization
// neither fails fatally nor returns an error.
// NOTE(review): this text is taken from a rendered diff view, so some lines below may belong
// to the removed side of the change -- confirm against the committed file.
static void assert_mysql_format(MutableColumns& load_cols, DataTypeSerDeSPtrs serders) {
MysqlRowBuffer<false> row_buffer;
for (size_t i = 0; i < load_cols.size(); ++i) {
auto& col = load_cols[i];
for (size_t j = 0; j < col->size(); ++j) {
// NOTE(review): jsonb_column, jw and expect_column are not declared anywhere in this
// function; the JSONB statements below look like removed-side lines of the diff -- confirm.
jsonb_column->insert_data(jw.getOutput()->getBuffer(), jw.getOutput()->getSize());
StringRef jsonb_data = jsonb_column->get_data_at(0);
auto pdoc = JsonbDocument::createDocument(jsonb_data.data, jsonb_data.size);
JsonbDocument& doc = *pdoc;
for (auto it = doc->begin(); it != doc->end(); ++it) {
serders[i]->read_one_cell_from_jsonb(*expect_column, it->value());
}
// serialize row j of column i into the shared row buffer and check the status
Status st;
EXPECT_NO_FATAL_FAILURE(
st = serders[i]->write_column_to_mysql(*col, row_buffer, j, false, {}));
EXPECT_TRUE(st.ok()) << st.to_string();
}
// check column value
}
}

// Asserts the Arrow round trip: the loaded columns are wrapped into a Block (using `types`
// for the column types and names), converted to an arrow::RecordBatch, converted back column
// by column with arrow_column_to_doris_column, and compared cell by cell with the source block.
// `serders` is not referenced in the body shown here.
// NOTE(review): this text is taken from a rendered diff view, so some lines below may belong
// to the removed side of the change -- confirm against the committed file.
static void assert_arrow_format(MutableColumns& load_cols, DataTypeSerDeSPtrs serders,
DataTypes types) {
// make a block to write to arrow
auto block = std::make_shared<Block>();
for (size_t i = 0; i < load_cols.size(); ++i) {
auto& col = load_cols[i];
// the column is moved out of load_cols, so the caller's entries are moved-from afterwards
block->insert(ColumnWithTypeAndName(std::move(col), types[i], types[i]->get_name()));
}
// print block
std::cout << "block: " << block->dump_structure() << std::endl;
std::shared_ptr<arrow::Schema> block_arrow_schema;
EXPECT_EQ(get_arrow_schema_from_block(*block, &block_arrow_schema, "UTC"), Status::OK());
// convert block to arrow
std::shared_ptr<arrow::RecordBatch> result;
cctz::time_zone _timezone_obj; //default UTC
Status stt = convert_to_arrow_batch(*block, block_arrow_schema,
arrow::default_memory_pool(), &result, _timezone_obj);
EXPECT_EQ(Status::OK(), stt) << "convert block to arrow failed" << stt.to_string();

// deserialize arrow to block
auto assert_block = block->clone_empty();
auto rows = block->rows();
for (size_t i = 0; i < load_cols.size(); ++i) {
auto array = result->column(i);
auto& column_with_type_and_name = assert_block.get_by_position(i);
// read `rows` values starting at offset 0 of the arrow array into the empty clone column
auto ret = arrow_column_to_doris_column(
array.get(), 0, column_with_type_and_name.column,
column_with_type_and_name.type, rows, _timezone_obj);
// do check data
EXPECT_EQ(Status::OK(), ret) << "convert arrow to block failed" << ret.to_string();
auto& col = block->get_by_position(i).column;
auto& assert_col = column_with_type_and_name.column;
for (size_t j = 0; j < col->size(); ++j) {
auto cell = col->operator[](j);
// NOTE(review): expect_column is not declared in this function; the next two lines look
// like removed-side lines of the diff -- confirm.
auto expect_cell = expect_column->operator[](j);
EXPECT_EQ(cell, expect_cell) << "column: " << col->get_name() << " row: " << j;
auto assert_cell = assert_col->operator[](j);
EXPECT_EQ(cell, assert_cell) << "column: " << col->get_name() << " row: " << j;
}
}
}

// assert rapidjson format
// rapidjson write_one_cell_to_json and read_one_cell_from_json are currently only used in
// column_object, so this case can simply be covered by the jsonb format test
};

} // namespace doris::vectorized
39 changes: 35 additions & 4 deletions be/test/vec/data_types/data_type_ip_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ TEST_F(DataTypeIPTest, SerdeHiveTextAndJsonFormatTest) {
ip_cols.push_back(column_ipv4->get_ptr());
ip_cols.push_back(column_ipv6->get_ptr());
DataTypeSerDeSPtrs serde = {dt_ipv4->get_serde(), dt_ipv6->get_serde()};
CommonDataTypeSerdeTest::load_data_and_assert_from_csv<true>(serde, ip_cols, data_files[1], ';',
{1, 2});
CommonDataTypeSerdeTest::load_data_and_assert_from_csv<false>(serde, ip_cols, data_files[1],
';', {1, 2});
CommonDataTypeSerdeTest::load_data_and_assert_from_csv<true, true>(serde, ip_cols,
data_files[1], ';', {1, 2});
CommonDataTypeSerdeTest::load_data_and_assert_from_csv<false, true>(serde, ip_cols,
data_files[1], ';', {1, 2});
}

TEST_F(DataTypeIPTest, SerdePbTest) {
Expand Down Expand Up @@ -200,4 +200,35 @@ TEST_F(DataTypeIPTest, SerdeJsonbTest) {
CommonDataTypeSerdeTest::assert_jsonb_format);
}

// MySQL serde test for the IP types: builds one IPv4 and one IPv6 column, hands them to
// check_data together with data_files[0] (';' separator, index set {1, 2}) and passes
// assert_mysql_format as the assertion callback.
TEST_F(DataTypeIPTest, SerdeMysqlTest) {
    auto column_ipv4 = dt_ipv4->create_column();
    auto column_ipv6 = dt_ipv6->create_column();

    // insert from data csv and assert insert result
    MutableColumns ip_cols;
    ip_cols.push_back(column_ipv4->get_ptr());
    ip_cols.push_back(column_ipv6->get_ptr());
    // serdes are listed in the same order as the columns above
    DataTypeSerDeSPtrs serde = {dt_ipv4->get_serde(), dt_ipv6->get_serde()};
    CommonDataTypeSerdeTest::check_data(ip_cols, serde, ';', {1, 2}, data_files[0],
                                        CommonDataTypeSerdeTest::assert_mysql_format);
}

// Arrow serde test for the IP types: loads one IPv4 and one IPv6 column from data_files[1]
// (hive text format, ';' separator, index set {1, 2}) and then runs assert_arrow_format on the
// loaded columns with their data types.
TEST_F(DataTypeIPTest, SerdeArrowTest) {
    auto column_ipv4 = dt_ipv4->create_column();
    auto column_ipv6 = dt_ipv6->create_column();

    // insert from data csv and assert insert result
    MutableColumns ip_cols;
    ip_cols.push_back(column_ipv4->get_ptr());
    ip_cols.push_back(column_ipv6->get_ptr());
    // serdes and data types are listed in the same order as the columns above
    DataTypeSerDeSPtrs serde = {dt_ipv4->get_serde(), dt_ipv6->get_serde()};
    CommonDataTypeSerdeTest::load_data_and_assert_from_csv<true, true>(serde, ip_cols,
                                                                       data_files[1], ';', {1, 2});
    CommonDataTypeSerdeTest::assert_arrow_format(ip_cols, serde, {dt_ipv4, dt_ipv6});
}

} // namespace doris::vectorized

0 comments on commit 363fc27

Please sign in to comment.