Skip to content

Commit

Permalink
jsoncontainexpr unaryexpr binaryexpr json index optimization
Browse files Browse the repository at this point in the history
Signed-off-by: Xianhui.Lin <[email protected]>

improve jsonContainSExpr

Signed-off-by: Xianhui.Lin <[email protected]>

add jsonindex test

Signed-off-by: Xianhui.Lin <[email protected]>
  • Loading branch information
JsDove committed Dec 26, 2024
1 parent fd42099 commit b9b1cbd
Show file tree
Hide file tree
Showing 12 changed files with 1,517 additions and 493 deletions.
137 changes: 103 additions & 34 deletions internal/core/src/exec/expression/BinaryRangeExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,17 +260,17 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForData(OffsetVector* input) {
TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size);
valid_res.set();

auto execute_sub_batch =
[ lower_inclusive,
upper_inclusive ]<FilterType filter_type = FilterType::sequential>(
const T* data,
const bool* valid_data,
const int32_t* offsets,
const int size,
TargetBitmapView res,
TargetBitmapView valid_res,
HighPrecisionType val1,
HighPrecisionType val2) {
auto execute_sub_batch = [lower_inclusive,
upper_inclusive]<FilterType filter_type =
FilterType::sequential>(
const T* data,
const bool* valid_data,
const int32_t* offsets,
const int size,
TargetBitmapView res,
TargetBitmapView valid_res,
HighPrecisionType val1,
HighPrecisionType val2) {
if (lower_inclusive && upper_inclusive) {
BinaryRangeElementFunc<T, true, true, filter_type> func;
func(val1, val2, data, size, res, offsets);
Expand Down Expand Up @@ -344,6 +344,10 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(OffsetVector* input) {
using GetType = std::conditional_t<std::is_same_v<ValueType, std::string>,
std::string_view,
ValueType>;
FieldId field_id = expr_->column_.field_id_;
if (CanUseJsonKeyIndex(field_id)) {
return ExecRangeVisitorImplForJsonForIndex<ValueType>();
}
auto real_batch_size =
has_offset_input_ ? input->size() : GetNextBatchSize();
if (real_batch_size == 0) {
Expand All @@ -361,17 +365,18 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(OffsetVector* input) {
ValueType val2 = GetValueFromProto<ValueType>(expr_->upper_val_);
auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);

auto execute_sub_batch =
[ lower_inclusive, upper_inclusive,
pointer ]<FilterType filter_type = FilterType::sequential>(
const milvus::Json* data,
const bool* valid_data,
const int32_t* offsets,
const int size,
TargetBitmapView res,
TargetBitmapView valid_res,
ValueType val1,
ValueType val2) {
auto execute_sub_batch = [lower_inclusive,
upper_inclusive,
pointer]<FilterType filter_type =
FilterType::sequential>(
const milvus::Json* data,
const bool* valid_data,
const int32_t* offsets,
const int size,
TargetBitmapView res,
TargetBitmapView valid_res,
ValueType val1,
ValueType val2) {
if (lower_inclusive && upper_inclusive) {
BinaryRangeElementFuncForJson<ValueType, true, true, filter_type>
func;
Expand Down Expand Up @@ -444,6 +449,70 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(OffsetVector* input) {
return res_vec;
}

/// Evaluates the binary range predicate on a JSON field through the JSON key
/// index instead of scanning the raw JSON column.
///
/// The index is queried only once: while cached_index_chunk_id_ != 0 the
/// filter is run over the index and the resulting bitmap is stored in
/// cached_index_chunk_res_. Every call then returns the slice of that cached
/// bitmap that belongs to the current batch and advances
/// current_data_chunk_pos_ by the batch size.
///
/// Only valid for sealed segments with a single data chunk (asserted).
///
/// @tparam ValueType type of the lower/upper bounds read from the expression.
/// @return a ColumnVector holding the result bits of the current batch, paired
///         with an all-true bitmap of the same length.
template <typename ValueType>
VectorPtr
PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonForIndex() {
    using GetType = std::conditional_t<std::is_same_v<ValueType, std::string>,
                                       std::string_view,
                                       ValueType>;
    Assert(segment_->type() == SegmentType::Sealed && num_data_chunk_ == 1);
    // Clamp the batch so it never runs past the last active row.
    const auto real_batch_size =
        current_data_chunk_pos_ + batch_size_ > active_count_
            ? active_count_ - current_data_chunk_pos_
            : batch_size_;
    if (cached_index_chunk_id_ != 0) {
        // Everything below is only needed to build the cache, so it is
        // computed here rather than on every batch.
        const auto pointer =
            milvus::Json::pointer(expr_->column_.nested_path_);
        const bool lower_inclusive = expr_->lower_inclusive_;
        const bool upper_inclusive = expr_->upper_inclusive_;
        ValueType val1 = GetValueFromProto<ValueType>(expr_->lower_val_);
        ValueType val2 = GetValueFromProto<ValueType>(expr_->upper_val_);

        const auto* sealed_seg =
            dynamic_cast<const segcore::SegmentSealed*>(segment_);
        Assert(sealed_seg != nullptr);
        const auto field_id = expr_->column_.field_id_;
        auto* index = sealed_seg->GetJsonKeyIndex(field_id);
        Assert(index != nullptr);

        // field_id is captured by value so the callable stays valid even if
        // the index keeps a copy of it beyond this scope.
        auto filter_func = [sealed_seg,
                            field_id,
                            val1,
                            val2,
                            lower_inclusive,
                            upper_inclusive](
                               uint32_t row_id, uint16_t offset, uint16_t size) {
            auto json_pair = sealed_seg->GetJsonData(field_id, row_id);
            if (!json_pair.second) {
                return false;
            }
            auto json = milvus::Json(json_pair.first.data(),
                                     json_pair.first.size());
            auto val = json.at<GetType>(offset, size);
            if (val.error()) {
                return false;
            }
            // Materialise the value once; for strings this avoids building
            // two temporary std::string objects per row.
            const ValueType value(val.value());
            const bool above_lower =
                lower_inclusive ? (val1 <= value) : (val1 < value);
            if (!above_lower) {
                return false;
            }
            return upper_inclusive ? (value <= val2) : (value < val2);
        };
        // The cached bitmap is sliced at current_data_chunk_pos_ on every
        // later call, so it has to cover all active rows, not just the rows
        // of the first batch.
        // NOTE(review): assumes the second argument of FilterByPath is the
        // number of rows of the returned bitmap -- confirm against
        // JsonKeyInvertedIndex::FilterByPath.
        cached_index_chunk_res_ =
            index->FilterByPath(pointer, active_count_, filter_func).clone();
        cached_index_chunk_id_ = 0;
    }
    TargetBitmap result;
    result.append(
        cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
    current_data_chunk_pos_ += real_batch_size;
    return std::make_shared<ColumnVector>(std::move(result),
                                          TargetBitmap(real_batch_size, true));
}

template <typename ValueType>
VectorPtr
PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForArray(OffsetVector* input) {
Expand All @@ -470,18 +539,18 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForArray(OffsetVector* input) {
index = std::stoi(expr_->column_.nested_path_[0]);
}

auto execute_sub_batch =
[ lower_inclusive,
upper_inclusive ]<FilterType filter_type = FilterType::sequential>(
const milvus::ArrayView* data,
const bool* valid_data,
const int32_t* offsets,
const int size,
TargetBitmapView res,
TargetBitmapView valid_res,
ValueType val1,
ValueType val2,
int index) {
auto execute_sub_batch = [lower_inclusive,
upper_inclusive]<FilterType filter_type =
FilterType::sequential>(
const milvus::ArrayView* data,
const bool* valid_data,
const int32_t* offsets,
const int size,
TargetBitmapView res,
TargetBitmapView valid_res,
ValueType val1,
ValueType val2,
int index) {
if (lower_inclusive && upper_inclusive) {
BinaryRangeElementFuncForArray<ValueType, true, true, filter_type>
func;
Expand Down
4 changes: 4 additions & 0 deletions internal/core/src/exec/expression/BinaryRangeExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ class PhyBinaryRangeFilterExpr : public SegmentExpr {
VectorPtr
ExecRangeVisitorImplForJson(OffsetVector* input = nullptr);

// Index-backed counterpart of ExecRangeVisitorImplForJson: that function
// dispatches here when CanUseJsonKeyIndex(field_id) is true. Takes no offset
// input. Defined in BinaryRangeExpr.cpp.
template <typename ValueType>
VectorPtr
ExecRangeVisitorImplForJsonForIndex();

template <typename ValueType>
VectorPtr
ExecRangeVisitorImplForArray(OffsetVector* input = nullptr);
Expand Down
62 changes: 51 additions & 11 deletions internal/core/src/exec/expression/ExistsExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ PhyExistsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {

VectorPtr
PhyExistsFilterExpr::EvalJsonExistsForDataSegment(OffsetVector* input) {
FieldId field_id = expr_->column_.field_id_;
if (CanUseJsonKeyIndex(field_id)) {
return EvalJsonExistsForDataSegmentForIndex();
}
auto real_batch_size =
has_offset_input_ ? input->size() : GetNextBatchSize();
if (real_batch_size == 0) {
Expand All @@ -63,18 +67,18 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegment(OffsetVector* input) {
TargetBitmapView res,
TargetBitmapView valid_res,
const std::string& pointer) {
for (int i = 0; i < size; ++i) {
auto offset = i;
if constexpr (filter_type == FilterType::random) {
offset = (offsets) ? offsets[i] : i;
}
if (valid_data != nullptr && !valid_data[offset]) {
res[i] = valid_res[i] = false;
continue;
for (int i = 0; i < size; ++i) {
auto offset = i;
if constexpr (filter_type == FilterType::random) {
offset = (offsets) ? offsets[i] : i;
}
if (valid_data != nullptr && !valid_data[offset]) {
res[i] = valid_res[i] = false;
continue;
}
res[i] = data[offset].exist(pointer);
}
res[i] = data[offset].exist(pointer);
}
};
};

int64_t processed_size;
if (has_offset_input_) {
Expand All @@ -96,5 +100,41 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegment(OffsetVector* input) {
return res_vec;
}

/// Evaluates the JSON "exists" predicate through the JSON key index instead
/// of scanning the raw JSON column.
///
/// The index is queried only once: while cached_index_chunk_id_ != 0 the
/// filter is run over the index and the resulting bitmap is stored in
/// cached_index_chunk_res_. Every call then returns the slice of that cached
/// bitmap that belongs to the current batch and advances
/// current_data_chunk_pos_ by the batch size.
///
/// Only valid for sealed segments with a single data chunk (asserted).
///
/// @return a ColumnVector holding the result bits of the current batch, paired
///         with an all-true bitmap of the same length.
VectorPtr
PhyExistsFilterExpr::EvalJsonExistsForDataSegmentForIndex() {
    Assert(segment_->type() == SegmentType::Sealed && num_data_chunk_ == 1);
    // Clamp the batch so it never runs past the last active row.
    const auto real_batch_size =
        current_data_chunk_pos_ + batch_size_ > active_count_
            ? active_count_ - current_data_chunk_pos_
            : batch_size_;
    if (cached_index_chunk_id_ != 0) {
        // The JSON pointer is only needed to build the cache, so it is
        // computed here rather than on every batch.
        const auto pointer =
            milvus::Json::pointer(expr_->column_.nested_path_);
        const auto* sealed_seg =
            dynamic_cast<const segcore::SegmentSealed*>(segment_);
        Assert(sealed_seg != nullptr);
        const auto field_id = expr_->column_.field_id_;
        auto* index = sealed_seg->GetJsonKeyIndex(field_id);
        Assert(index != nullptr);

        // The offset/size arguments supplied by the index are not used: the
        // row's JSON document is re-checked with exist(pointer).
        auto filter_func = [sealed_seg, field_id, pointer](
                               uint32_t row_id,
                               uint16_t /*offset*/,
                               uint16_t /*size*/) {
            auto json_pair = sealed_seg->GetJsonData(field_id, row_id);
            if (!json_pair.second) {
                return false;
            }
            auto json =
                milvus::Json(json_pair.first.data(), json_pair.first.size());
            return json.exist(pointer);
        };
        // The cached bitmap is sliced at current_data_chunk_pos_ on every
        // later call, so it has to cover all active rows, not just the rows
        // of the first batch.
        // NOTE(review): assumes the second argument of FilterByPath is the
        // number of rows of the returned bitmap -- confirm against
        // JsonKeyInvertedIndex::FilterByPath.
        cached_index_chunk_res_ =
            index->FilterByPath(pointer, active_count_, filter_func).clone();
        cached_index_chunk_id_ = 0;
    }
    TargetBitmap result;
    result.append(
        cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
    current_data_chunk_pos_ += real_batch_size;
    return std::make_shared<ColumnVector>(std::move(result),
                                          TargetBitmap(real_batch_size, true));
}

} //namespace exec
} // namespace milvus
3 changes: 3 additions & 0 deletions internal/core/src/exec/expression/ExistsExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class PhyExistsFilterExpr : public SegmentExpr {
VectorPtr
EvalJsonExistsForDataSegment(OffsetVector* input = nullptr);

// Index-backed counterpart of EvalJsonExistsForDataSegment: that function
// dispatches here when CanUseJsonKeyIndex(field_id) is true. Takes no offset
// input. Defined in ExistsExpr.cpp.
VectorPtr
EvalJsonExistsForDataSegmentForIndex();

private:
std::shared_ptr<const milvus::expr::ExistsExpr> expr_;
};
Expand Down
Loading

0 comments on commit b9b1cbd

Please sign in to comment.