Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor row format with slice formant and single/multi slice row format #1381

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c15f63c
Refactor row format with slice formant and single/multi slice row format
tobegit3hub Mar 5, 2022
8da7721
Use slice index instead of schema index for ir builder
tobegit3hub Mar 5, 2022
c210917
Fix check for row format object
tobegit3hub Mar 5, 2022
1bb5889
Fix unit test casesg
tobegit3hub Mar 5, 2022
0d4d8aa
Add deconstructor and check pointer
tobegit3hub Mar 5, 2022
c4cd525
Enable single slice row format if enable unsafe row opt
tobegit3hub Mar 5, 2022
25fb442
Support encode single slice row format when enable unsaferow opt in o…
tobegit3hub Mar 5, 2022
42b3be6
Fix use slice idx instead of schema index
tobegit3hub Mar 5, 2022
dae74fe
Correct the column index from ColInfo instead of passed parameter
tobegit3hub Mar 5, 2022
4687156
Get row format corrected col index
tobegit3hub Mar 5, 2022
33f0dfc
Refine for comments
tobegit3hub Mar 7, 2022
abad45d
Add unit test for window without select
tobegit3hub Mar 7, 2022
3a3b92c
Update batch config
tobegit3hub Mar 8, 2022
34c10ea
Merge branch 'main' of github.com:4paradigm/OpenMLDB into feat/refact…
tobegit3hub Mar 8, 2022
337decb
Merge branch 'main' of github.com:4paradigm/OpenMLDB into feat/refact…
tobegit3hub Mar 9, 2022
6b12abf
Support new codec and unsafe row codec for window with append slice
tobegit3hub Mar 9, 2022
28f369c
Add more unit test for unsafe row opt
tobegit3hub Mar 9, 2022
c4d358a
Release empty row for window computer
tobegit3hub Mar 10, 2022
6130aa2
Support convert timestamp for unsaferow and openmldb row format
tobegit3hub Mar 10, 2022
2289ff7
Support set long in joined row with unsafe row
tobegit3hub Mar 10, 2022
ed86c0b
Format the scala code
tobegit3hub Mar 10, 2022
7158c3f
Fix get incorrect row from joined row
tobegit3hub Mar 10, 2022
4705165
Check schema context for cpp unit tests
tobegit3hub Mar 10, 2022
28d8d85
Fix cpplint
tobegit3hub Mar 10, 2022
7445958
Merge branch 'main' of github.com:4paradigm/OpenMLDB into feat/refact…
tobegit3hub Mar 10, 2022
e8d596c
Update ut for SliceFormat and check row format is nullptr
tobegit3hub Mar 11, 2022
dd2b295
Merge branch 'main' into feat/refactor_row_format_and_support_single_…
tobegit3hub Mar 11, 2022
c55e128
Merge branch 'main' of github.com:4paradigm/OpenMLDB into feat/refact…
tobegit3hub Mar 11, 2022
f0ceb3c
Fix syntax error after resolve conflict
tobegit3hub Mar 11, 2022
1aa51d4
Reset unsaferow gflag for cpp testsg
tobegit3hub Mar 12, 2022
34b5027
Ignore toydb engine test for cicd
tobegit3hub Mar 12, 2022
e5f4627
Merge branch 'main' of github.com:4paradigm/OpenMLDB into feat/refact…
tobegit3hub Mar 14, 2022
ec24d5e
Merge branch 'main' of github.com:4paradigm/OpenMLDB into feat/refact…
tobegit3hub Mar 14, 2022
013b450
Revert to run hybridse toydb engine test
tobegit3hub Mar 14, 2022
334fe34
Ignore failed test for toydb
tobegit3hub Mar 14, 2022
d0c88fd
Init function def for const project node
tobegit3hub Mar 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 87 additions & 3 deletions hybridse/include/codec/fe_row_codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,10 @@ struct StringColInfo : public ColInfo {
str_start_offset(str_start_offset) {}
};

class RowFormat {
class SliceFormat {
public:
explicit RowFormat(const hybridse::codec::Schema* schema);
virtual ~RowFormat() {}
explicit SliceFormat(const hybridse::codec::Schema* schema);
virtual ~SliceFormat() {}

bool GetStringColumnInfo(size_t idx, StringColInfo* res) const;

Expand All @@ -217,6 +217,90 @@ class RowFormat {
uint32_t str_field_start_offset_;
};

/**
 * Abstract description of how the columns of one or more schemas are laid
 * out inside an encoded row.
 *
 * A column is addressed by the pair (`schema_idx`, `idx`): the index of the
 * schema it belongs to and its position inside that schema. Implementations
 * decide how schemas map onto row slices.
 */
class RowFormat {
 public:
    virtual ~RowFormat() {}

    /**
     * Fill `res` with the string column info of column `idx` of schema
     * `schema_idx`. Returns false when the info can not be resolved.
     */
    virtual bool GetStringColumnInfo(size_t schema_idx, size_t idx, StringColInfo* res) const = 0;

    /**
     * Get the column info of column `idx` of schema `schema_idx`.
     */
    virtual const ColInfo* GetColumnInfo(size_t schema_idx, size_t idx) const = 0;

    /**
     * Get the id of the row slice that stores the columns of schema
     * `schema_idx`.
     */
    virtual size_t GetSliceId(size_t schema_idx) const = 0;
};

class MultiSlicesRowFormat : public RowFormat {
public:
explicit MultiSlicesRowFormat(const Schema* schema) {
slice_formats_.emplace_back(SliceFormat(schema));
}

~MultiSlicesRowFormat() {
slice_formats_.clear();
}

explicit MultiSlicesRowFormat(const std::vector<const Schema*>& schemas) {
for (auto schema : schemas) {
slice_formats_.emplace_back(SliceFormat(schema));
}
}

bool GetStringColumnInfo(size_t schema_idx, size_t idx, StringColInfo* res) const override {
return slice_formats_[schema_idx].GetStringColumnInfo(idx, res);
}

const ColInfo* GetColumnInfo(size_t schema_idx, size_t idx) const override {
return slice_formats_[schema_idx].GetColumnInfo(idx);
}

size_t GetSliceId(size_t schema_idx) const override {
return schema_idx;
}

private:
std::vector<SliceFormat> slice_formats_;
};

class SingleSliceRowFormat : public RowFormat {
public:
explicit SingleSliceRowFormat(const Schema* schema) {
slice_format_ = new SliceFormat(schema);
offsets_.emplace_back(0);
}

~SingleSliceRowFormat() {
offsets_.clear();
if (slice_format_) {
delete slice_format_;
}
}

explicit SingleSliceRowFormat(const std::vector<const Schema*>& schemas) {
int offset = 0;
for (auto schema : schemas) {
offsets_.emplace_back(offset);
offset += schema->size();
// Merge schema and make sure it is appended
merged_schema_.MergeFrom(*schema);
}

slice_format_ = new SliceFormat(&merged_schema_);
}

bool GetStringColumnInfo(size_t schema_idx, size_t idx, StringColInfo* res) const override {
return slice_format_->GetStringColumnInfo(offsets_[schema_idx] + idx, res);
}

const ColInfo* GetColumnInfo(size_t schema_idx, size_t idx) const override {
return slice_format_->GetColumnInfo(offsets_[schema_idx] + idx);
}

size_t GetSliceId(size_t schema_idx) const override {
return 0;
}

private:
std::vector<size_t> offsets_;
SliceFormat* slice_format_;
Schema merged_schema_;
};

} // namespace codec
} // namespace hybridse
#endif // HYBRIDSE_INCLUDE_CODEC_FE_ROW_CODEC_H_
6 changes: 3 additions & 3 deletions hybridse/include/vm/schemas_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ class SchemasContext {
const PhysicalOpNode* GetRoot() const;

/**
* Get detailed format for `idx`th schema source.
* Get detailed format.
*/
const codec::RowFormat* GetRowFormat(size_t idx) const;
const codec::RowFormat* GetRowFormat() const;

/**
* Get `idx`th schema source.
Expand Down Expand Up @@ -267,7 +267,7 @@ class SchemasContext {
std::vector<SchemaSource*> schema_sources_;

// detailed schema format info
std::vector<codec::RowFormat> row_formats_;
codec::RowFormat* row_format_ = nullptr;

// owned schema object
codec::Schema owned_concat_output_schema_;
Expand Down
8 changes: 4 additions & 4 deletions hybridse/src/codec/fe_row_codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,7 @@ int32_t RowView::GetString(uint32_t idx, const char** val, uint32_t* length) {
length);
}

RowFormat::RowFormat(const hybridse::codec::Schema* schema)
SliceFormat::SliceFormat(const hybridse::codec::Schema* schema)
: schema_(schema), infos_(), next_str_pos_(), str_field_start_offset_(0) {
if (nullptr == schema) {
return;
Expand Down Expand Up @@ -959,11 +959,11 @@ RowFormat::RowFormat(const hybridse::codec::Schema* schema)
str_field_start_offset_ = offset;
}

const ColInfo* RowFormat::GetColumnInfo(size_t idx) const {
// Look up the column info at position `idx`; nullptr when `idx` is out of range.
const ColInfo* SliceFormat::GetColumnInfo(size_t idx) const {
    if (idx >= infos_.size()) {
        return nullptr;
    }
    return &infos_[idx];
}

bool RowFormat::GetStringColumnInfo(size_t idx, StringColInfo* res) const {
bool SliceFormat::GetStringColumnInfo(size_t idx, StringColInfo* res) const {
if (nullptr == res) {
LOG(WARNING) << "input args have null";
return false;
Expand All @@ -982,7 +982,7 @@ bool RowFormat::GetStringColumnInfo(size_t idx, StringColInfo* res) const {
next_offset = nit->second;
} else {
if (FLAGS_enable_spark_unsaferow_format) {
// Do not need to get next offset for UnsafeRowOpt
// No need to get next offset for UnsafeRowOpt and ignore the warning
} else {
LOG(WARNING) << "fail to get string field next offset";
return false;
Expand Down
18 changes: 11 additions & 7 deletions hybridse/src/codec/fe_row_codec_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ TEST_F(CodecTest, ManyCol) {
}
}

TEST_F(CodecTest, RowFormatTest) {
TEST_F(CodecTest, SliceFormatTest) {
std::vector<int> num_vec = {10, 20, 50, 100, 1000};
for (auto col_num : num_vec) {
::hybridse::type::TableDef def;
Expand All @@ -421,7 +421,7 @@ TEST_F(CodecTest, RowFormatTest) {
}
}

RowFormat decoder(&def.columns());
SliceFormat decoder(&def.columns());
for (int i = 0; i < col_num; i++) {
if (i % 3 == 0) {
const codec::ColInfo* info = decoder.GetColumnInfo(i);
Expand All @@ -443,7 +443,7 @@ TEST_F(CodecTest, RowFormatTest) {
}
}

TEST_F(CodecTest, RowFormatOffsetTest) {
TEST_F(CodecTest, SliceFormatOffsetTest) {
type::TableDef table;
table.set_name("t1");
{
Expand Down Expand Up @@ -484,7 +484,7 @@ TEST_F(CodecTest, RowFormatOffsetTest) {
column->set_name("col7");
}

RowFormat decoder(&table.columns());
SliceFormat decoder(&table.columns());
{
const codec::ColInfo* info = decoder.GetColumnInfo(0);
ASSERT_EQ(::hybridse::type::kInt32, info->type);
Expand Down Expand Up @@ -542,7 +542,7 @@ TEST_F(CodecTest, RowFormatOffsetTest) {
ASSERT_EQ(33u, str_info.str_start_offset);
}
}
TEST_F(CodecTest, RowFormatOffsetLongHeaderTest) {
TEST_F(CodecTest, SliceFormatOffsetLongHeaderTest) {
type::TableDef table;
table.set_name("t1");
{
Expand Down Expand Up @@ -593,7 +593,7 @@ TEST_F(CodecTest, RowFormatOffsetLongHeaderTest) {
column->set_name("col9");
}

RowFormat decoder(&table.columns());
SliceFormat decoder(&table.columns());
{
const codec::ColInfo* info = decoder.GetColumnInfo(0);
ASSERT_EQ(::hybridse::type::kInt32, info->type);
Expand Down Expand Up @@ -665,6 +665,8 @@ TEST_F(CodecTest, SparkUnsaferowBitMapSizeTest) {
ASSERT_EQ(BitMapSize(9), 8);
ASSERT_EQ(BitMapSize(20), 8);
ASSERT_EQ(BitMapSize(65), 16);

FLAGS_enable_spark_unsaferow_format = false;
}
TEST_F(CodecTest, SparkUnsaferowRowFormatTest) {
FLAGS_enable_spark_unsaferow_format = true;
Expand All @@ -684,7 +686,7 @@ TEST_F(CodecTest, SparkUnsaferowRowFormatTest) {
}
}

RowFormat decoder(&def.columns());
SliceFormat decoder(&def.columns());
for (int i = 0; i < col_num; i++) {
if (i % 3 == 0) {
const codec::ColInfo* info = decoder.GetColumnInfo(i);
Expand All @@ -704,6 +706,8 @@ TEST_F(CodecTest, SparkUnsaferowRowFormatTest) {
}
}
}

FLAGS_enable_spark_unsaferow_format = false;
}

} // namespace codec
Expand Down
1 change: 0 additions & 1 deletion hybridse/src/codec/type_codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ int32_t AppendString(int8_t* buf_ptr, uint32_t buf_size, uint32_t col_idx,
int8_t* val, uint32_t size, int8_t is_null,
uint32_t str_start_offset, uint32_t str_field_offset,
uint32_t str_addr_space, uint32_t str_body_offset) {

if (is_null) {
AppendNullBit(buf_ptr, col_idx, true);
size_t str_addr_length = GetAddrLength(buf_size);
Expand Down
27 changes: 20 additions & 7 deletions hybridse/src/codegen/aggregate_ir_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ bool AggregateIRBuilder::CollectAggColumn(const hybridse::node::ExprNode* expr,
return false;
}
const codec::ColInfo& col_info =
*schema_context_->GetRowFormat(schema_idx)
->GetColumnInfo(col_idx);
*schema_context_->GetRowFormat()
->GetColumnInfo(schema_idx, col_idx);
auto col_type = col_info.type;
uint32_t offset = col_info.offset;

Expand Down Expand Up @@ -677,15 +677,22 @@ base::Status AggregateIRBuilder::BuildMulti(const std::string& base_funcname,
// compute current row's slices
for (auto& pair : agg_col_infos_) {
size_t schema_idx = pair.second.schema_idx;
auto iter = used_slices.find(schema_idx);

size_t slice_idx = schema_idx;
// TODO(tobe): Check row format before getting
if (schema_context_->GetRowFormat() != nullptr) {
slice_idx = schema_context_->GetRowFormat()->GetSliceId(schema_idx);
}

auto iter = used_slices.find(slice_idx);
if (iter == used_slices.end()) {
::llvm::Value* idx_value =
llvm::ConstantInt::get(int64_ty, schema_idx, true);
llvm::ConstantInt::get(int64_ty, slice_idx, true);
::llvm::Value* buf_ptr =
builder.CreateCall(get_slice_func, {iter_ptr, idx_value});
::llvm::Value* buf_size =
builder.CreateCall(get_slice_size_func, {iter_ptr, idx_value});
used_slices[schema_idx] = {buf_ptr, buf_size};
used_slices[slice_idx] = {buf_ptr, buf_size};
}
}

Expand All @@ -696,11 +703,17 @@ base::Status AggregateIRBuilder::BuildMulti(const std::string& base_funcname,
std::string col_key = info.GetColKey();
if (cur_row_fields_dict.find(col_key) == cur_row_fields_dict.end()) {
size_t schema_idx = info.schema_idx;
auto& slice_info = used_slices[schema_idx];
size_t slice_idx = schema_idx;
// TODO(tobe): Check row format before getting
if (schema_context_->GetRowFormat() != nullptr) {
slice_idx = schema_context_->GetRowFormat()->GetSliceId(schema_idx);
}

auto& slice_info = used_slices[slice_idx];

ScopeVar dummy_scope_var;
BufNativeIRBuilder buf_builder(
schema_idx, schema_context_->GetRowFormat(schema_idx),
schema_idx, schema_context_->GetRowFormat(),
body_block, &dummy_scope_var);
NativeValue field_value;
CHECK_TRUE(buf_builder.BuildGetField(info.col_idx, slice_info.first, slice_info.second, &field_value),
Expand Down
Loading