diff --git a/internal/core/src/exec/expression/BinaryRangeExpr.cpp b/internal/core/src/exec/expression/BinaryRangeExpr.cpp index a51e58ed8fc3a..771e8db45a141 100644 --- a/internal/core/src/exec/expression/BinaryRangeExpr.cpp +++ b/internal/core/src/exec/expression/BinaryRangeExpr.cpp @@ -260,17 +260,17 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForData(OffsetVector* input) { TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); valid_res.set(); - auto execute_sub_batch = - [ lower_inclusive, - upper_inclusive ]( - const T* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - HighPrecisionType val1, - HighPrecisionType val2) { + auto execute_sub_batch = [lower_inclusive, + upper_inclusive]( + const T* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + HighPrecisionType val1, + HighPrecisionType val2) { if (lower_inclusive && upper_inclusive) { BinaryRangeElementFunc func; func(val1, val2, data, size, res, offsets); @@ -344,6 +344,10 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(OffsetVector* input) { using GetType = std::conditional_t, std::string_view, ValueType>; + FieldId field_id = expr_->column_.field_id_; + if (CanUseJsonKeyIndex(field_id)) { + return ExecRangeVisitorImplForJsonForIndex(); + } auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { @@ -361,17 +365,18 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(OffsetVector* input) { ValueType val2 = GetValueFromProto(expr_->upper_val_); auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); - auto execute_sub_batch = - [ lower_inclusive, upper_inclusive, - pointer ]( - const milvus::Json* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - ValueType val1, - ValueType val2) { + auto execute_sub_batch = [lower_inclusive, + upper_inclusive, + pointer]( + const milvus::Json* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + ValueType val1, + ValueType val2) { if (lower_inclusive && upper_inclusive) { BinaryRangeElementFuncForJson func; @@ -444,6 +449,70 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(OffsetVector* input) { return res_vec; } +template +VectorPtr +PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonForIndex() { + using GetType = std::conditional_t, + std::string_view, + ValueType>; + Assert(segment_->type() == SegmentType::Sealed && num_data_chunk_ == 1); + auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ + ? active_count_ - current_data_chunk_pos_ + : batch_size_; + auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); + bool lower_inclusive = expr_->lower_inclusive_; + bool upper_inclusive = expr_->upper_inclusive_; + ValueType val1 = GetValueFromProto(expr_->lower_val_); + ValueType val2 = GetValueFromProto(expr_->upper_val_); + if (cached_index_chunk_id_ != 0) { + const auto* sealed_seg = + dynamic_cast(segment_); + auto field_id = expr_->column_.field_id_; + auto* index = sealed_seg->GetJsonKeyIndex(field_id); + Assert(index != nullptr); + auto filter_func = + [sealed_seg, + &field_id, + val1, + val2, + lower_inclusive, + upper_inclusive](uint32_t row_id, uint16_t offset, uint16_t size) { + auto json_pair = sealed_seg->GetJsonData(field_id, row_id); + if (!json_pair.second) { + return false; + } + auto json = milvus::Json(json_pair.first.data(), + json_pair.first.size()); + auto val = json.at(offset, size); + if (val.error()) { + return false; + } + if (lower_inclusive && upper_inclusive) { + return val1 <= ValueType(val.value()) && + ValueType(val.value()) <= val2; + } else if (lower_inclusive && !upper_inclusive) { + return val1 <= ValueType(val.value()) && + ValueType(val.value()) < val2; + } else if (!lower_inclusive && upper_inclusive) { + return val1 < ValueType(val.value()) && + ValueType(val.value()) <= val2; + } else { + return val1 < ValueType(val.value()) && + ValueType(val.value()) < val2; + } + }; + cached_index_chunk_res_ = + index->FilterByPath(pointer, real_batch_size, filter_func).clone(); + cached_index_chunk_id_ = 0; + } + TargetBitmap result; + result.append( + cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size); + current_data_chunk_pos_ += real_batch_size; + return std::make_shared(std::move(result), + TargetBitmap(real_batch_size, true)); +} + template VectorPtr PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForArray(OffsetVector* input) { @@ -470,18 +539,18 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForArray(OffsetVector* input) { index = std::stoi(expr_->column_.nested_path_[0]); } - auto execute_sub_batch = - [ lower_inclusive, - upper_inclusive ]( - const milvus::ArrayView* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - ValueType val1, - ValueType val2, - int index) { + auto execute_sub_batch = [lower_inclusive, + upper_inclusive]( + const milvus::ArrayView* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + ValueType val1, + ValueType val2, + int index) { if (lower_inclusive && upper_inclusive) { BinaryRangeElementFuncForArray func; diff --git a/internal/core/src/exec/expression/BinaryRangeExpr.h b/internal/core/src/exec/expression/BinaryRangeExpr.h index 1babfc6fd044e..7696dae12c76f 100644 --- a/internal/core/src/exec/expression/BinaryRangeExpr.h +++ b/internal/core/src/exec/expression/BinaryRangeExpr.h @@ -270,6 +270,10 @@ class PhyBinaryRangeFilterExpr : public SegmentExpr { VectorPtr ExecRangeVisitorImplForJson(OffsetVector* input = nullptr); + template + VectorPtr + ExecRangeVisitorImplForJsonForIndex(); + template VectorPtr ExecRangeVisitorImplForArray(OffsetVector* input = nullptr); diff --git a/internal/core/src/exec/expression/ExistsExpr.cpp b/internal/core/src/exec/expression/ExistsExpr.cpp index a4163e46aa0f7..cc64dcb57c43b 100644 --- a/internal/core/src/exec/expression/ExistsExpr.cpp +++ b/internal/core/src/exec/expression/ExistsExpr.cpp @@ -42,6 +42,10 @@ PhyExistsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { VectorPtr PhyExistsFilterExpr::EvalJsonExistsForDataSegment(OffsetVector* input) { + FieldId field_id = expr_->column_.field_id_; + if (CanUseJsonKeyIndex(field_id)) { + return EvalJsonExistsForDataSegmentForIndex(); + } auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { @@ -63,18 +67,18 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegment(OffsetVector* input) { TargetBitmapView res, TargetBitmapView valid_res, const std::string& pointer) { - for (int i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; + for (int i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + res[i] = data[offset].exist(pointer); } - res[i] = data[offset].exist(pointer); - } - }; + }; int64_t processed_size; if (has_offset_input_) { @@ -96,5 +100,41 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegment(OffsetVector* input) { return res_vec; } +VectorPtr +PhyExistsFilterExpr::EvalJsonExistsForDataSegmentForIndex() { + Assert(segment_->type() == SegmentType::Sealed && num_data_chunk_ == 1); + auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ + ? active_count_ - current_data_chunk_pos_ + : batch_size_; + auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); + if (cached_index_chunk_id_ != 0) { + const auto* sealed_seg = + dynamic_cast(segment_); + auto field_id = expr_->column_.field_id_; + auto* index = sealed_seg->GetJsonKeyIndex(field_id); + Assert(index != nullptr); + auto filter_func = [sealed_seg, field_id, pointer](uint32_t row_id, + uint16_t offset, + uint16_t size) { + auto json_pair = sealed_seg->GetJsonData(field_id, row_id); + if (!json_pair.second) { + return false; + } + auto json = + milvus::Json(json_pair.first.data(), json_pair.first.size()); + return json.exist(pointer); + }; + cached_index_chunk_res_ = + index->FilterByPath(pointer, real_batch_size, filter_func).clone(); + cached_index_chunk_id_ = 0; + } + TargetBitmap result; + result.append( + cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size); + current_data_chunk_pos_ += real_batch_size; + return std::make_shared(std::move(result), + TargetBitmap(real_batch_size, true)); +} + } //namespace exec } // namespace milvus diff --git a/internal/core/src/exec/expression/ExistsExpr.h b/internal/core/src/exec/expression/ExistsExpr.h index dc00f883c7400..1b6dac2b1c272 100644 --- a/internal/core/src/exec/expression/ExistsExpr.h +++ b/internal/core/src/exec/expression/ExistsExpr.h @@ -59,6 +59,9 @@ class PhyExistsFilterExpr : public SegmentExpr { VectorPtr EvalJsonExistsForDataSegment(OffsetVector* input = nullptr); + VectorPtr + EvalJsonExistsForDataSegmentForIndex(); + private: std::shared_ptr expr_; }; diff --git a/internal/core/src/exec/expression/JsonContainsExpr.cpp b/internal/core/src/exec/expression/JsonContainsExpr.cpp index 3318a4822865f..46cd3fc220a91 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.cpp +++ b/internal/core/src/exec/expression/JsonContainsExpr.cpp @@ -196,27 +196,28 @@ PhyJsonContainsFilterExpr::ExecArrayContains(OffsetVector* input) { TargetBitmapView res, TargetBitmapView valid_res, const std::unordered_set& elements) { - auto executor = [&](size_t i) { - const auto& array = data[i]; - for (int j = 0; j < array.length(); ++j) { - if (elements.count(array.template get_data(j)) > 0) { - return true; + auto executor = [&](size_t i) { + const auto& array = data[i]; + for (int j = 0; j < array.length(); ++j) { + if (elements.count(array.template get_data(j)) > + 0) { + return true; + } + } + return false; + }; + for (int i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; } + res[i] = executor(offset); } - return false; }; - for (int i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - res[i] = executor(offset); - } - }; int64_t processed_size; if (has_offset_input_) { @@ -246,6 +247,12 @@ PhyJsonContainsFilterExpr::ExecJsonContains(OffsetVector* input) { std::conditional_t, std::string_view, ExprValueType>; + + FieldId field_id = expr_->column_.field_id_; + if (CanUseJsonKeyIndex(field_id)) { + return ExecJsonContainsByKeyIndex(); + } + auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { @@ -273,35 +280,35 @@ PhyJsonContainsFilterExpr::ExecJsonContains(OffsetVector* input) { TargetBitmapView valid_res, const std::string& pointer, const std::unordered_set& elements) { - auto executor = [&](size_t i) { - auto doc = data[i].doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { + auto executor = [&](size_t i) { + auto doc = data[i].doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { + return false; + } + for (auto&& it : array) { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (elements.count(val.value()) > 0) { + return true; + } + } return false; - } - for (auto&& it : array) { - auto val = it.template get(); - if (val.error()) { - continue; + }; + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; } - if (elements.count(val.value()) > 0) { - return true; + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; } + res[i] = executor(offset); } - return false; }; - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - res[i] = executor(offset); - } - }; int64_t processed_size; if (has_offset_input_) { @@ -328,8 +335,77 @@ PhyJsonContainsFilterExpr::ExecJsonContains(OffsetVector* input) { return res_vec; } +template +VectorPtr +PhyJsonContainsFilterExpr::ExecJsonContainsByKeyIndex() { + using GetType = + std::conditional_t, + std::string_view, + ExprValueType>; + Assert(segment_->type() == SegmentType::Sealed && num_data_chunk_ == 1); + auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ + ? active_count_ - current_data_chunk_pos_ + : batch_size_; + std::unordered_set elements; + auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); + for (auto const& element : expr_->vals_) { + elements.insert(GetValueFromProto(element)); + } + if (elements.empty()) { + MoveCursor(); + return std::make_shared( + TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); + } + if (cached_index_chunk_id_ != 0) { + const auto* sealed_seg = + dynamic_cast(segment_); + auto field_id = expr_->column_.field_id_; + auto* index = sealed_seg->GetJsonKeyIndex(field_id); + Assert(index != nullptr); + auto filter_func = [sealed_seg, &elements, &field_id](uint32_t row_id, + uint16_t offset, + uint16_t size) { + auto json_pair = sealed_seg->GetJsonData(field_id, row_id); + if (!json_pair.second) { + return false; + } + auto json = + milvus::Json(json_pair.first.data(), json_pair.first.size()); + auto array = json.array_at(offset, size); + + if (array.error()) { + return false; + } + for (auto&& it : array) { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (elements.count(val.value()) > 0) { + return true; + } + } + return false; + }; + cached_index_chunk_res_ = + index->FilterByPath(pointer, real_batch_size, filter_func).clone(); + cached_index_chunk_id_ = 0; + } + TargetBitmap result; + result.append( + cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size); + current_data_chunk_pos_ += real_batch_size; + return std::make_shared(std::move(result), + TargetBitmap(real_batch_size, true)); +} + VectorPtr PhyJsonContainsFilterExpr::ExecJsonContainsArray(OffsetVector* input) { + FieldId field_id = expr_->column_.field_id_; + if (CanUseJsonKeyIndex(field_id)) { + return ExecJsonContainsArrayByKeyIndex(); + } auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { @@ -357,44 +433,44 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArray(OffsetVector* input) { TargetBitmapView valid_res, const std::string& pointer, const std::vector& elements) { - auto executor = [&](size_t i) -> bool { - auto doc = data[i].doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { - return false; - } - for (auto&& it : array) { - auto val = it.get_array(); - if (val.error()) { - continue; + auto executor = [&](size_t i) -> bool { + auto doc = data[i].doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { + return false; } - std::vector< - simdjson::simdjson_result> - json_array; - json_array.reserve(val.count_elements()); - for (auto&& e : val) { - json_array.emplace_back(e); - } - for (auto const& element : elements) { - if (CompareTwoJsonArray(json_array, element)) { - return true; + for (auto&& it : array) { + auto val = it.get_array(); + if (val.error()) { + continue; } + std::vector< + simdjson::simdjson_result> + json_array; + json_array.reserve(val.count_elements()); + for (auto&& e : val) { + json_array.emplace_back(e); + } + for (auto const& element : elements) { + if (CompareTwoJsonArray(json_array, element)) { + return true; + } + } + } + return false; + }; + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; } + res[i] = executor(offset); } - return false; }; - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - res[i] = executor(offset); - } - }; int64_t processed_size; if (has_offset_input_) { @@ -421,6 +497,67 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArray(OffsetVector* input) { return res_vec; } +VectorPtr +PhyJsonContainsFilterExpr::ExecJsonContainsArrayByKeyIndex() { + Assert(segment_->type() == SegmentType::Sealed && num_data_chunk_ == 1); + auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ + ? active_count_ - current_data_chunk_pos_ + : batch_size_; + std::vector elements; + auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); + for (auto const& element : expr_->vals_) { + elements.emplace_back(GetValueFromProto(element)); + } + if (elements.empty()) { + MoveCursor(); + return std::make_shared( + TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); + } + if (cached_index_chunk_id_ != 0) { + const auto* sealed_seg = + dynamic_cast(segment_); + auto field_id = expr_->column_.field_id_; + auto* index = sealed_seg->GetJsonKeyIndex(field_id); + Assert(index != nullptr); + auto filter_func = [sealed_seg, &elements, &field_id](uint32_t row_id, + uint16_t offset, + uint16_t size) { + auto json_pair = sealed_seg->GetJsonData(field_id, row_id); + if (!json_pair.second) { + return false; + } + auto json = + milvus::Json(json_pair.first.data(), json_pair.first.size()); + auto array = json.array_at(offset, size); + if (array.error()) { + return false; + } + for (auto&& it : array) { + auto val = it.get_array(); + if (val.error()) { + continue; + } + for (auto const& element : elements) { + if (CompareTwoJsonArray(val, element)) { + return true; + } + } + } + return false; + }; + cached_index_chunk_res_ = + index->FilterByPath(pointer, real_batch_size, filter_func).clone(); + cached_index_chunk_id_ = 0; + } + TargetBitmap result; + result.append( + cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size); + current_data_chunk_pos_ += real_batch_size; + return std::make_shared(std::move(result), + TargetBitmap(real_batch_size, true)); +} + template VectorPtr PhyJsonContainsFilterExpr::ExecArrayContainsAll(OffsetVector* input) { @@ -456,29 +593,29 @@ PhyJsonContainsFilterExpr::ExecArrayContainsAll(OffsetVector* input) { TargetBitmapView res, TargetBitmapView valid_res, const std::unordered_set& elements) { - auto executor = [&](size_t i) { - std::unordered_set tmp_elements(elements); - // Note: array can only be iterated once - for (int j = 0; j < data[i].length(); ++j) { - tmp_elements.erase(data[i].template get_data(j)); - if (tmp_elements.size() == 0) { - return true; + auto executor = [&](size_t i) { + std::unordered_set tmp_elements(elements); + // Note: array can only be iterated once + for (int j = 0; j < data[i].length(); ++j) { + tmp_elements.erase(data[i].template get_data(j)); + if (tmp_elements.size() == 0) { + return true; + } } + return tmp_elements.size() == 0; + }; + for (int i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + res[i] = executor(offset); } - return tmp_elements.size() == 0; }; - for (int i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - res[i] = executor(offset); - } - }; int64_t processed_size; if (has_offset_input_) { @@ -508,6 +645,11 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAll(OffsetVector* input) { std::conditional_t, std::string_view, ExprValueType>; + + FieldId field_id = expr_->column_.field_id_; + if (CanUseJsonKeyIndex(field_id)) { + return ExecJsonContainsAllByKeyIndex(); + } auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { @@ -536,38 +678,38 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAll(OffsetVector* input) { TargetBitmapView valid_res, const std::string& pointer, const std::unordered_set& elements) { - auto executor = [&](const size_t i) -> bool { - auto doc = data[i].doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { - return false; - } - std::unordered_set tmp_elements(elements); - // Note: array can only be iterated once - for (auto&& it : array) { - auto val = it.template get(); - if (val.error()) { - continue; + auto executor = [&](const size_t i) -> bool { + auto doc = data[i].doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { + return false; } - tmp_elements.erase(val.value()); - if (tmp_elements.size() == 0) { - return true; + std::unordered_set tmp_elements(elements); + // Note: array can only be iterated once + for (auto&& it : array) { + auto val = it.template get(); + if (val.error()) { + continue; + } + tmp_elements.erase(val.value()); + if (tmp_elements.size() == 0) { + return true; + } + } + return tmp_elements.size() == 0; + }; + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; } + res[i] = executor(offset); } - return tmp_elements.size() == 0; }; - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - res[i] = executor(offset); - } - }; int64_t processed_size; if (has_offset_input_) { @@ -594,9 +736,79 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAll(OffsetVector* input) { return res_vec; } +template +VectorPtr +PhyJsonContainsFilterExpr::ExecJsonContainsAllByKeyIndex() { + using GetType = + std::conditional_t, + std::string_view, + ExprValueType>; + Assert(segment_->type() == SegmentType::Sealed && num_data_chunk_ == 1); + auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ + ? active_count_ - current_data_chunk_pos_ + : batch_size_; + std::unordered_set elements; + auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); + for (auto const& element : expr_->vals_) { + elements.insert(GetValueFromProto(element)); + } + if (elements.empty()) { + MoveCursor(); + return std::make_shared( + TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); + } + if (cached_index_chunk_id_ != 0) { + const auto* sealed_seg = + dynamic_cast(segment_); + auto field_id = expr_->column_.field_id_; + auto* index = sealed_seg->GetJsonKeyIndex(field_id); + Assert(index != nullptr); + auto filter_func = [sealed_seg, &elements, &field_id](uint32_t row_id, + uint16_t offset, + uint16_t size) { + auto json_pair = sealed_seg->GetJsonData(field_id, row_id); + if (!json_pair.second) { + return false; + } + auto json = + milvus::Json(json_pair.first.data(), json_pair.first.size()); + auto array = json.array_at(offset, size); + if (array.error()) { + return false; + } + std::unordered_set tmp_elements(elements); + for (auto&& it : array) { + auto val = it.template get(); + if (val.error()) { + continue; + } + tmp_elements.erase(val.value()); + if (tmp_elements.size() == 0) { + return true; + } + } + return tmp_elements.empty(); + }; + cached_index_chunk_res_ = + index->FilterByPath(pointer, real_batch_size, filter_func).clone(); + cached_index_chunk_id_ = 0; + } + TargetBitmap result; + result.append( + cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size); + current_data_chunk_pos_ += real_batch_size; + return std::make_shared(std::move(result), + TargetBitmap(real_batch_size, true)); +} + VectorPtr PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType( OffsetVector* input) { + FieldId field_id = expr_->column_.field_id_; + if (CanUseJsonKeyIndex(field_id)) { + return ExecJsonContainsAllWithDiffTypeByKeyIndex(); + } auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { @@ -629,14 +841,168 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType( const std::string& pointer, const std::vector& elements, const std::unordered_set elements_index) { - auto executor = [&](size_t i) -> bool { - const auto& json = data[i]; - auto doc = json.doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { + auto executor = [&](size_t i) -> bool { + const auto& json = data[i]; + auto doc = json.doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { + return false; + } + std::unordered_set tmp_elements_index(elements_index); + for (auto&& it : array) { + int i = -1; + for (auto& element : elements) { + i++; + switch (element.val_case()) { + case proto::plan::GenericValue::kBoolVal: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.bool_val()) { + tmp_elements_index.erase(i); + } + break; + } + case proto::plan::GenericValue::kInt64Val: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.int64_val()) { + tmp_elements_index.erase(i); + } + break; + } + case proto::plan::GenericValue::kFloatVal: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.float_val()) { + tmp_elements_index.erase(i); + } + break; + } + case proto::plan::GenericValue::kStringVal: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.string_val()) { + tmp_elements_index.erase(i); + } + break; + } + case proto::plan::GenericValue::kArrayVal: { + auto val = it.get_array(); + if (val.error()) { + continue; + } + if (CompareTwoJsonArray(val, + element.array_val())) { + tmp_elements_index.erase(i); + } + break; + } + default: + PanicInfo( + DataTypeInvalid, + fmt::format("unsupported data type {}", + element.val_case())); + } + if (tmp_elements_index.size() == 0) { + return true; + } + } + if (tmp_elements_index.size() == 0) { + return true; + } + } + return tmp_elements_index.size() == 0; + }; + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + res[i] = executor(offset); + } + }; + + int64_t processed_size; + if (has_offset_input_) { + processed_size = ProcessDataByOffsets(execute_sub_batch, + std::nullptr_t{}, + input, + res, + valid_res, + pointer, + elements, + elements_index); + } else { + processed_size = ProcessDataChunks(execute_sub_batch, + std::nullptr_t{}, + res, + valid_res, + pointer, + elements, + elements_index); + } + AssertInfo(processed_size == real_batch_size, + "internal error: expr processed rows {} not equal " + "expect batch size {}", + processed_size, + real_batch_size); + return res_vec; +} + +VectorPtr +PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffTypeByKeyIndex() { + Assert(segment_->type() == SegmentType::Sealed && num_data_chunk_ == 1); + auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ + ? active_count_ - current_data_chunk_pos_ + : batch_size_; + auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); + auto elements = expr_->vals_; + std::unordered_set elements_index; + int i = 0; + for (auto& element : elements) { + elements_index.insert(i); + i++; + } + if (elements.empty()) { + MoveCursor(); + return std::make_shared( + TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); + } + if (cached_index_chunk_id_ != 0) { + const auto* sealed_seg = + dynamic_cast(segment_); + auto field_id = expr_->column_.field_id_; + auto* index = sealed_seg->GetJsonKeyIndex(field_id); + Assert(index != nullptr); + auto filter_func = [sealed_seg, &elements, &elements_index, &field_id]( + uint32_t row_id, + uint16_t offset, + uint16_t size) { + return false; + auto json_pair = sealed_seg->GetJsonData(field_id, row_id); + if (!json_pair.second) { return false; } + auto json = + milvus::Json(json_pair.first.data(), json_pair.first.size()); std::unordered_set tmp_elements_index(elements_index); + auto array = json.array_at(offset, size); + if (array.error()) { + return false; + } for (auto&& it : array) { int i = -1; for (auto& element : elements) { @@ -707,48 +1073,24 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType( } return tmp_elements_index.size() == 0; }; - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - res[i] = executor(offset); - } - }; - - int64_t processed_size; - if (has_offset_input_) { - processed_size = ProcessDataByOffsets(execute_sub_batch, - std::nullptr_t{}, - input, - res, - valid_res, - pointer, - elements, - elements_index); - } else { - processed_size = ProcessDataChunks(execute_sub_batch, - std::nullptr_t{}, - res, - valid_res, - pointer, - elements, - elements_index); + cached_index_chunk_res_ = + index->FilterByPath(pointer, real_batch_size, filter_func).clone(); + cached_index_chunk_id_ = 0; } - AssertInfo(processed_size == real_batch_size, - "internal error: expr processed rows {} not equal " - "expect batch size {}", - processed_size, - real_batch_size); - return res_vec; + TargetBitmap result; + result.append( + cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size); + current_data_chunk_pos_ += real_batch_size; + return std::make_shared(std::move(result), + TargetBitmap(real_batch_size, true)); } VectorPtr PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(OffsetVector* input) { + FieldId field_id = expr_->column_.field_id_; + if (CanUseJsonKeyIndex(field_id)) { + return ExecJsonContainsAllArrayByKeyIndex(); + } auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { @@ -777,48 +1119,48 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(OffsetVector* input) { TargetBitmapView valid_res, const std::string& pointer, const std::vector& elements) { - auto executor = [&](const size_t i) { - auto doc = data[i].doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { - return false; - } - std::unordered_set exist_elements_index; - for (auto&& it : array) { - auto val = it.get_array(); - if (val.error()) { - continue; + auto executor = [&](const size_t i) { + auto doc = data[i].doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { + return false; } - std::vector< - simdjson::simdjson_result> - json_array; - json_array.reserve(val.count_elements()); - for (auto&& e : val) { - json_array.emplace_back(e); - } - for (int index = 0; index < elements.size(); ++index) { - if (CompareTwoJsonArray(json_array, elements[index])) { - exist_elements_index.insert(index); + std::unordered_set exist_elements_index; + for (auto&& it : array) { + auto val = it.get_array(); + if (val.error()) { + continue; + } + std::vector< + simdjson::simdjson_result> + json_array; + json_array.reserve(val.count_elements()); + for (auto&& e : val) { + json_array.emplace_back(e); + } + for (int index = 0; index < elements.size(); ++index) { + if (CompareTwoJsonArray(json_array, elements[index])) { + exist_elements_index.insert(index); + } + } + if (exist_elements_index.size() == elements.size()) { + return true; } } - if (exist_elements_index.size() == elements.size()) { - return true; + return exist_elements_index.size() == elements.size(); + }; + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; } + res[i] = executor(offset); } - return exist_elements_index.size() == elements.size(); }; - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - res[i] = executor(offset); - } - }; int64_t processed_size; if (has_offset_input_) { @@ -845,8 +1187,77 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(OffsetVector* input) { return res_vec; } +VectorPtr +PhyJsonContainsFilterExpr::ExecJsonContainsAllArrayByKeyIndex() { + Assert(segment_->type() == SegmentType::Sealed && num_data_chunk_ == 1); + auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ + ? active_count_ - current_data_chunk_pos_ + : batch_size_; + auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); + std::vector elements; + for (auto const& element : expr_->vals_) { + elements.emplace_back(GetValueFromProto(element)); + } + if (elements.empty()) { + MoveCursor(); + return std::make_shared( + TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); + } + if (cached_index_chunk_id_ != 0) { + const auto* sealed_seg = + dynamic_cast(segment_); + auto field_id = expr_->column_.field_id_; + auto* index = sealed_seg->GetJsonKeyIndex(field_id); + Assert(index != nullptr); + auto filter_func = [sealed_seg, &elements, &field_id](uint32_t row_id, + uint16_t offset, + uint16_t size) { + auto json_pair = sealed_seg->GetJsonData(field_id, row_id); + if (!json_pair.second) { + return false; + } + auto json = + milvus::Json(json_pair.first.data(), json_pair.first.size()); + auto array = json.array_at(offset, size); + if (array.error()) { + return false; + } + std::unordered_set exist_elements_index; + for (auto&& it : array) { + auto json_array = it.get_array(); + if (json_array.error()) { + continue; + } + for (int index = 0; index < elements.size(); ++index) { + if (CompareTwoJsonArray(json_array, elements[index])) { + exist_elements_index.insert(index); + } + } + if (exist_elements_index.size() == elements.size()) { + return true; + } + } + return exist_elements_index.size() == elements.size(); + }; + cached_index_chunk_res_ = + index->FilterByPath(pointer, real_batch_size, filter_func).clone(); + cached_index_chunk_id_ = 0; + } + TargetBitmap result; + result.append( + cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size); + current_data_chunk_pos_ += real_batch_size; + return std::make_shared(std::move(result), + TargetBitmap(real_batch_size, true)); +} + VectorPtr PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(OffsetVector* input) { + FieldId field_id = expr_->column_.field_id_; + if (CanUseJsonKeyIndex(field_id)) { + return ExecJsonContainsWithDiffTypeByKeyIndex(); + } auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { @@ -879,10 +1290,146 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(OffsetVector* input) { TargetBitmapView valid_res, const std::string& pointer, const std::vector& elements) { - auto executor = [&](const size_t i) { - auto& json = data[i]; - auto doc = json.doc(); - auto array = doc.at_pointer(pointer).get_array(); + auto executor = [&](const size_t i) { + auto& json = data[i]; + auto doc = json.doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { + return false; + } + // Note: array can only be iterated once + for (auto&& it : array) { + for (auto const& element : elements) { + switch (element.val_case()) { + case proto::plan::GenericValue::kBoolVal: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.bool_val()) { + return true; + } + break; + } + case proto::plan::GenericValue::kInt64Val: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.int64_val()) { + return true; + } + break; + } + case proto::plan::GenericValue::kFloatVal: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.float_val()) { + return true; + } + break; + } + case proto::plan::GenericValue::kStringVal: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.string_val()) { + return true; + } + break; + } + case proto::plan::GenericValue::kArrayVal: { + auto val = it.get_array(); + if (val.error()) { + continue; + } + if (CompareTwoJsonArray(val, + element.array_val())) { + return true; + } + break; + } + default: + PanicInfo( + DataTypeInvalid, + fmt::format("unsupported data type {}", + element.val_case())); + } + } + } + return false; + }; + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + res[i] = executor(offset); + } + }; + + int64_t processed_size; + if (has_offset_input_) { + processed_size = ProcessDataByOffsets(execute_sub_batch, + std::nullptr_t{}, + input, + res, + valid_res, + pointer, + elements); + } else { + processed_size = ProcessDataChunks(execute_sub_batch, + std::nullptr_t{}, + res, + valid_res, + pointer, + elements); + } + AssertInfo(processed_size == real_batch_size, + "internal error: expr processed rows {} not equal " + "expect batch size {}", + processed_size, + real_batch_size); + return res_vec; +} + +VectorPtr +PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffTypeByKeyIndex() { + Assert(segment_->type() == SegmentType::Sealed && num_data_chunk_ == 1); + auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ + ? active_count_ - current_data_chunk_pos_ + : batch_size_; + auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); + auto elements = expr_->vals_; + if (elements.empty()) { + MoveCursor(); + return std::make_shared( + TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); + } + if (cached_index_chunk_id_ != 0) { + const auto* sealed_seg = + dynamic_cast(segment_); + auto field_id = expr_->column_.field_id_; + auto* index = sealed_seg->GetJsonKeyIndex(field_id); + Assert(index != nullptr); + auto filter_func = [sealed_seg, &elements, &field_id](uint32_t row_id, + uint16_t offset, + uint16_t size) { + auto json_pair = sealed_seg->GetJsonData(field_id, row_id); + if (!json_pair.second) { + return false; + } + auto json = + milvus::Json(json_pair.first.data(), json_pair.first.size()); + auto array = json.array_at(offset, size); if (array.error()) { return false; } @@ -949,42 +1496,16 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(OffsetVector* input) { } return false; }; - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - res[i] = executor(offset); - } - }; - - int64_t processed_size; - if (has_offset_input_) { - processed_size = ProcessDataByOffsets(execute_sub_batch, - std::nullptr_t{}, - input, - res, - valid_res, - pointer, - elements); - } else { - processed_size = ProcessDataChunks(execute_sub_batch, - std::nullptr_t{}, - res, - valid_res, - pointer, - elements); + cached_index_chunk_res_ = + index->FilterByPath(pointer, real_batch_size, filter_func).clone(); + cached_index_chunk_id_ = 0; } - AssertInfo(processed_size == real_batch_size, - "internal error: expr processed rows {} not equal " - "expect batch size {}", - processed_size, - real_batch_size); - return res_vec; + TargetBitmap result; + result.append( + cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size); + current_data_chunk_pos_ += real_batch_size; + return std::make_shared(std::move(result), + TargetBitmap(real_batch_size, true)); } VectorPtr diff --git a/internal/core/src/exec/expression/JsonContainsExpr.h b/internal/core/src/exec/expression/JsonContainsExpr.h index a0c8848cba188..71c2bf780f0d6 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.h +++ b/internal/core/src/exec/expression/JsonContainsExpr.h @@ -56,6 +56,10 @@ class PhyJsonContainsFilterExpr : public SegmentExpr { VectorPtr ExecJsonContains(OffsetVector* input = nullptr); + template + VectorPtr + ExecJsonContainsByKeyIndex(); + template VectorPtr ExecArrayContains(OffsetVector* input = nullptr); @@ -64,6 +68,10 @@ class PhyJsonContainsFilterExpr : public SegmentExpr { VectorPtr ExecJsonContainsAll(OffsetVector* input = nullptr); + template + VectorPtr + ExecJsonContainsAllByKeyIndex(); + template VectorPtr ExecArrayContainsAll(OffsetVector* input = nullptr); @@ -71,15 +79,27 @@ class PhyJsonContainsFilterExpr : public SegmentExpr { VectorPtr ExecJsonContainsArray(OffsetVector* input = nullptr); + VectorPtr + ExecJsonContainsArrayByKeyIndex(); + VectorPtr ExecJsonContainsAllArray(OffsetVector* input = nullptr); + VectorPtr + ExecJsonContainsAllArrayByKeyIndex(); + VectorPtr ExecJsonContainsAllWithDiffType(OffsetVector* input = nullptr); + VectorPtr + ExecJsonContainsAllWithDiffTypeByKeyIndex(); + VectorPtr ExecJsonContainsWithDiffType(OffsetVector* input = nullptr); + VectorPtr + ExecJsonContainsWithDiffTypeByKeyIndex(); + VectorPtr EvalArrayContainsForIndexSegment(); diff --git a/internal/core/src/exec/expression/TermExpr.cpp b/internal/core/src/exec/expression/TermExpr.cpp index bb1a072667d57..de7b35f9e331c 100644 --- a/internal/core/src/exec/expression/TermExpr.cpp +++ b/internal/core/src/exec/expression/TermExpr.cpp @@ -558,7 +558,7 @@ PhyTermFilterExpr::ExecJsonInVariableByKeyIndex() { return term_set.find(ValueType(val.value())) != term_set.end(); }; cached_index_chunk_res_ = - index->FilterByPath(pointer, filter_func).clone(); + index->FilterByPath(pointer, real_batch_size, filter_func).clone(); cached_index_chunk_id_ = 0; } diff --git a/internal/core/src/exec/expression/UnaryExpr.cpp b/internal/core/src/exec/expression/UnaryExpr.cpp index 725af0854049a..4ece8ddd86f33 100644 --- a/internal/core/src/exec/expression/UnaryExpr.cpp +++ b/internal/core/src/exec/expression/UnaryExpr.cpp @@ -276,144 +276,145 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(OffsetVector* input) { if (expr_->column_.nested_path_.size() > 0) { index = std::stoi(expr_->column_.nested_path_[0]); } - auto execute_sub_batch = [op_type]( - const milvus::ArrayView* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - ValueType val, - int index) { - switch (op_type) { - case proto::plan::GreaterThan: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::GreaterEqual: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::LessThan: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::LessEqual: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::Equal: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::NotEqual: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::PrefixMatch: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::Match: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; + auto execute_sub_batch = + [op_type]( + const milvus::ArrayView* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + ValueType val, + int index) { + switch (op_type) { + case proto::plan::GreaterThan: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::GreaterEqual: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::LessThan: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::LessEqual: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::Equal: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::NotEqual: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::PrefixMatch: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::Match: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + default: + PanicInfo( + OpTypeInvalid, + fmt::format( + "unsupported operator type for unary expr: {}", + op_type)); } - default: - PanicInfo( - OpTypeInvalid, - fmt::format("unsupported operator type for unary expr: {}", - op_type)); - } - }; + }; int64_t processed_size; if (has_offset_input_) { processed_size = @@ -479,7 +480,7 @@ PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(bool reverse) { }; } else { auto size_per_chunk = segment_->size_per_chunk(); - retrieve = [ size_per_chunk, this ](int64_t offset) -> auto { + retrieve = [size_per_chunk, this](int64_t offset) -> auto { auto chunk_idx = offset / size_per_chunk; auto chunk_offset = offset % size_per_chunk; const auto& chunk = @@ -553,6 +554,12 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { std::conditional_t, std::string_view, ExprValueType>; + + FieldId field_id = expr_->column_.field_id_; + if (CanUseJsonKeyIndex(field_id)) { + return ExecRangeVisitorImplJsonForIndex(); + } + auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { @@ -598,15 +605,15 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { res[i] = (cmp); \ } while (false) - auto execute_sub_batch = - [ op_type, pointer ]( - const milvus::Json* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - ExprValueType val) { + auto execute_sub_batch = [op_type, pointer]( + const milvus::Json* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + ExprValueType val) { switch (op_type) { case proto::plan::GreaterThan: { for (size_t i = 0; i < size; ++i) { @@ -793,6 +800,144 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { return res_vec; } +template +VectorPtr +PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonForIndex() { + using GetType = + std::conditional_t, + std::string_view, + ExprValueType>; + Assert(segment_->type() == SegmentType::Sealed && num_data_chunk_ == 1); + auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ + ? active_count_ - current_data_chunk_pos_ + : batch_size_; + auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); + ExprValueType val = GetValueFromProto(expr_->val_); + auto op_type = expr_->op_type_; + if (cached_index_chunk_id_ != 0) { + const auto* sealed_seg = + dynamic_cast(segment_); + auto field_id = expr_->column_.field_id_; + auto* index = sealed_seg->GetJsonKeyIndex(field_id); + Assert(index != nullptr); + auto filter_func = [sealed_seg, field_id, op_type, val](uint32_t row_id, + uint16_t offset, + uint16_t size) { + auto json_pair = sealed_seg->GetJsonData(field_id, row_id); + if (!json_pair.second) { + return false; + } + auto json = + milvus::Json(json_pair.first.data(), json_pair.first.size()); + switch (op_type) { + case proto::plan::GreaterThan: + if constexpr (std::is_same_v) { + return false; + } else { + auto x = json.at(offset, size); + if (x.error()) { + return false; + } + return ExprValueType(x.value()) > val; + } + case proto::plan::GreaterEqual: + if constexpr (std::is_same_v) { + return false; + } else { + auto x = json.at(offset, size); + if (x.error()) { + return false; + } + return ExprValueType(x.value()) >= val; + } + case proto::plan::LessThan: + if constexpr (std::is_same_v) { + return false; + } else { + auto x = json.at(offset, size); + if (x.error()) { + return false; + } + return ExprValueType(x.value()) < val; + } + case proto::plan::LessEqual: + if constexpr (std::is_same_v) { + return false; + } else { + auto x = json.at(offset, size); + if (x.error()) { + return false; + } + return ExprValueType(x.value()) <= val; + } + case proto::plan::Equal: + if constexpr (std::is_same_v) { + auto array = json.array_at(offset, size); + if (array.error()) { + return false; + } + return CompareTwoJsonArray(array.value(), val); + } else { + auto x = json.at(offset, size); + if (x.error()) { + return false; + } + return ExprValueType(x.value()) == val; + } + case proto::plan::NotEqual: + if constexpr (std::is_same_v) { + auto array = json.array_at(offset, size); + if (array.error()) { + return false; + } + return !CompareTwoJsonArray(array.value(), val); + } else { + auto x = json.at(offset, size); + if (x.error()) { + return false; + } + return ExprValueType(x.value()) != val; + } + case proto::plan::PrefixMatch: + if constexpr (std::is_same_v) { + return false; + } else { + auto x = json.at(offset, size); + if (x.error()) { + return false; + } + return milvus::query::Match( + ExprValueType(x.value()), val, op_type); + } + case proto::plan::Match: + if constexpr (std::is_same_v) { + return false; + } else { + auto x = json.at(offset, size); + if (x.error()) { + return false; + } + PatternMatchTranslator translator; + auto regex_pattern = translator(val); + RegexMatcher matcher(regex_pattern); + return matcher(ExprValueType(x.value())); + } + default: + return false; + } + }; + cached_index_chunk_res_ = + index->FilterByPath(pointer, real_batch_size, filter_func).clone(); + cached_index_chunk_id_ = 0; + } + TargetBitmap result; + result.append( + cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size); + current_data_chunk_pos_ += real_batch_size; + return std::make_shared(std::move(result), + TargetBitmap(real_batch_size, true)); +} + template VectorPtr PhyUnaryRangeFilterExpr::ExecRangeVisitorImpl(OffsetVector* input) { @@ -978,13 +1123,13 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData(OffsetVector* input) { auto execute_sub_batch = [expr_type]( - const T* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - IndexInnerType val) { + const T* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + IndexInnerType val) { switch (expr_type) { case proto::plan::GreaterThan: { UnaryElementFunc func; diff --git a/internal/core/src/exec/expression/UnaryExpr.h b/internal/core/src/exec/expression/UnaryExpr.h index 159fe5abb4091..f4c368ab58f50 100644 --- a/internal/core/src/exec/expression/UnaryExpr.h +++ b/internal/core/src/exec/expression/UnaryExpr.h @@ -353,6 +353,10 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr { VectorPtr ExecRangeVisitorImplJson(OffsetVector* input = nullptr); + template + VectorPtr + ExecRangeVisitorImplJsonForIndex(); + template VectorPtr ExecRangeVisitorImplArray(OffsetVector* input = nullptr); diff --git a/internal/core/src/index/JsonKeyInvertedIndex.h b/internal/core/src/index/JsonKeyInvertedIndex.h index 90f8ba1f84db0..b686f63897f2e 100644 --- a/internal/core/src/index/JsonKeyInvertedIndex.h +++ b/internal/core/src/index/JsonKeyInvertedIndex.h @@ -39,8 +39,9 @@ class JsonKeyInvertedIndex : public InvertedIndexTantivy { const TargetBitmap FilterByPath(const std::string& path, + int32_t row, std::function filter) { - TargetBitmap bitset(Count()); + TargetBitmap bitset(row); auto array = wrapper_->term_query(path); LOG_DEBUG("json key filter size:{}", array.array_.len); diff --git a/internal/core/unittest/test_json_key_index.cpp b/internal/core/unittest/test_json_key_index.cpp index 137718eb2f794..c128c78909535 100644 --- a/internal/core/unittest/test_json_key_index.cpp +++ b/internal/core/unittest/test_json_key_index.cpp @@ -30,11 +30,23 @@ using namespace milvus::indexbuilder; using namespace milvus; using namespace milvus::index; +std::string +join(const std::vector& vec, const std::string& delimiter) { + std::ostringstream oss; + for (size_t i = 0; i < vec.size(); ++i) { + oss << vec[i]; + if (i != vec.size() - 1) { + oss << delimiter; + } + } + return oss.str(); +} + // 1000 keys static std::string GenerateJson(int N) { std::vector data(N); - std::default_random_engine er(67); + std::default_random_engine er(42); std::normal_distribution<> distr(0, 1); std::vector keys; for (int i = 0; i < N; i++) { @@ -43,33 +55,33 @@ GenerateJson(int N) { std::string json_string; std::vector values(N); for (int i = 0; i < N; i++) { - if (i % 7 == 0 || i % 7 == 4) { + if (i % 7 == 0) { values[i] = std::to_string(er()); - } else if (i % 7 == 1 || i % 7 == 5) { + } else if (i % 7 == 1) { values[i] = std::to_string(static_cast(er())); - } else if (i % 7 == 2 || i % 7 == 6) { + } else if (i % 7 == 2) { values[i] = er() / 2 == 0 ? "true" : "false"; } else if (i % 7 == 3) { values[i] = "\"xxxx" + std::to_string(i) + "\""; - // } else if (i % 7 == 4) { - // std::vector intvec(10); - // for (int j = 0; j < 10; j++) { - // intvec[j] = std::to_string(i + j); - // } - // values[i] = "[" + join(intvec, ",") + "]"; - // } else if (i % 7 == 5) { - // std::vector doublevec(10); - // for (int j = 0; j < 10; j++) { - // doublevec[j] = - // std::to_string(static_cast(i + j + er())); - // } - // values[i] = "[" + join(doublevec, ",") + "]"; - // } else if (i % 7 == 6) { - // std::vector stringvec(10); - // for (int j = 0; j < 10; j++) { - // stringvec[j] = "\"xxx" + std::to_string(j) + "\""; - // } - // values[i] = "[" + join(stringvec, ",") + "]"; + } else if (i % 7 == 4) { + std::vector intvec(10); + for (int j = 0; j < 10; j++) { + intvec[j] = std::to_string(i + j); + } + values[i] = "[" + join(intvec, ",") + "]"; + } else if (i % 7 == 5) { + std::vector doublevec(10); + for (int j = 0; j < 10; j++) { + doublevec[j] = + std::to_string(static_cast(i + j + er())); + } + values[i] = "[" + join(doublevec, ",") + "]"; + } else if (i % 7 == 6) { + std::vector stringvec(10); + for (int j = 0; j < 10; j++) { + stringvec[j] = "\"xxx" + std::to_string(j) + "\""; + } + values[i] = "[" + join(stringvec, ",") + "]"; } } json_string += "{"; @@ -85,6 +97,7 @@ static std::vector GenerateJsons(int size, int dim) { std::vector jsons; for (int i = 0; i < size; ++i) { + std::cout << GenerateJson(dim) << std::endl; jsons.push_back( milvus::Json(simdjson::padded_string(GenerateJson(dim)))); } @@ -99,7 +112,9 @@ class JsonKeyIndexTest : public testing::Test { int64_t segment_id, int64_t field_id, int64_t index_build_id, - int64_t index_version) { + int64_t index_version, + int64_t size, + int64_t dim) { proto::schema::FieldSchema field_schema; field_schema.set_data_type(proto::schema::DataType::JSON); @@ -108,7 +123,7 @@ class JsonKeyIndexTest : public testing::Test { auto index_meta = storage::IndexMeta{ segment_id, field_id, index_build_id, index_version}; - data_ = std::move(GenerateJsons(10000, 100)); + data_ = std::move(GenerateJsons(size, dim)); auto field_data = storage::CreateFieldData(DataType::JSON); field_data->FillFieldData(data_.data(), data_.size()); storage::InsertData insert_data(field_data); @@ -162,6 +177,8 @@ class JsonKeyIndexTest : public testing::Test { int64_t field_id = 101; int64_t index_build_id = 1000; int64_t index_version = 10000; + size_ = 10; + dim_ = 10; std::string root_path = "/tmp/test-jsonkey-index/"; storage::StorageConfig storage_config; @@ -174,7 +191,9 @@ class JsonKeyIndexTest : public testing::Test { segment_id, field_id, index_build_id, - index_version); + index_version, + size_, + dim_); } virtual ~JsonKeyIndexTest() override { @@ -184,39 +203,231 @@ class JsonKeyIndexTest : public testing::Test { public: void TestTermInFunc() { - std::set term_set = {"xxxxx"}; - auto filter_func = [&term_set, this](uint32_t row_id, - uint16_t offset, - uint16_t size) { - //std::cout << row_id << " " << offset << " " << size << std::endl; - - auto val = this->data_[row_id].template at_pos( - offset, size); - if (val.second != "") { - //std::cout << val.error() << std::endl; - return false; + { + std::vector> testcases{{"705894"}}; + for (auto testcase : testcases) { + auto check = [&](std::string value) { + std::unordered_set term_set(testcase.begin(), + testcase.end()); + return term_set.find(value) != term_set.end(); + }; + std::unordered_set term_set(testcase.begin(), + testcase.end()); + auto filter_func = [&term_set, this](uint32_t row_id, + uint16_t offset, + uint16_t size) { + auto val = + this->data_[row_id].template at_pos( + offset, size); + if (val.second != "") { + return false; + } + return term_set.find((std::string(val.first))) != + term_set.end(); + }; + auto bitset = + index_->FilterByPath("/keys0", size_, filter_func); + + ASSERT_EQ(bitset.size(), size_); + for (int i = 0; i < bitset.size(); ++i) { + auto ans = bitset[i]; + auto ref = check("705894"); + ASSERT_EQ(ans, ref); + } + } + } + { + std::vector testcases{"true"}; + for (auto& value : testcases) { + auto filter_func = [this, &value](uint32_t row_id, + uint16_t offset, + uint16_t size) { + auto val = + this->data_[row_id].template at_pos( + offset, size); + if (val.second != "") { + return false; + } + return std::string(val.first) == value; + }; + + auto bitset = + index_->FilterByPath("/keys2", size_, filter_func); + ASSERT_EQ(bitset.size(), size_); + for (int i = 0; i < bitset.size(); ++i) { + auto ans = bitset[i]; + auto ref = (value == "false"); + ASSERT_EQ(ans, ref); + } + } + } + } + void + TestUnaryRangeInFunc() { + std::vector testcases{"10", "705894", "805894"}; + std::vector ops{ + OpType::Equal, + OpType::NotEqual, + OpType::GreaterThan, + OpType::GreaterEqual, + OpType::LessThan, + OpType::LessEqual, + }; + for (const auto& testcase : testcases) { + auto check = [&](std::string value) { return value == testcase; }; + std::function f = check; + for (auto& op : ops) { + switch (op) { + case OpType::Equal: { + f = [&](std::string value) { + return value == testcase; + }; + break; + } + case OpType::NotEqual: { + f = [&](std::string value) { + return value != testcase; + }; + break; + } + case OpType::GreaterEqual: { + f = [&](std::string value) { + return value >= testcase; + }; + break; + } + case OpType::GreaterThan: { + f = [&](std::string value) { return value > testcase; }; + break; + } + case OpType::LessEqual: { + f = [&](std::string value) { + return value <= testcase; + }; + break; + } + case OpType::LessThan: { + f = [&](std::string value) { return value < testcase; }; + break; + } + default: { + PanicInfo(Unsupported, "unsupported range node"); + } + } + + auto filter_func = [&op, &testcase, this](uint32_t row_id, + uint16_t offset, + uint16_t size) { + auto val = + this->data_[row_id].template at_pos( + offset, size); + if (val.second != "") { + return false; + } + switch (op) { + case OpType::GreaterThan: + return std::string(val.first) > testcase; + case OpType::GreaterEqual: + return std::string(val.first) >= testcase; + case OpType::LessThan: + return std::string(val.first) < testcase; + case OpType::LessEqual: + return std::string(val.first) <= testcase; + case OpType::Equal: + return std::string(val.first) == testcase; + case OpType::NotEqual: + return std::string(val.first) != testcase; + default: + return false; + } + }; + auto bitset = + index_->FilterByPath("/keys0", size_, filter_func); + ASSERT_EQ(bitset.size(), size_); + for (int i = 0; i < bitset.size(); ++i) { + auto ans = bitset[i]; + auto ref = f("705894"); + ASSERT_EQ(ans, ref); + } } - return term_set.find((std::string(val.first))) != term_set.end(); + } + } + + void + TestBinaryRangeInFunc() { + struct Testcase { + bool lower_inclusive; + bool upper_inclusive; + std::string lower; + std::string upper; + }; + std::vector testcases{ + {true, false, "10", "20"}, + {true, true, "20", "30"}, + {false, true, "30", "40"}, + {false, false, "40", "50"}, }; - index_->FilterByPath("/keys0", filter_func); + for (const auto& testcase : testcases) { + auto check = [&](std::string value) { + if (testcase.lower_inclusive && testcase.upper_inclusive) { + return testcase.lower <= value && value <= testcase.upper; + } else if (testcase.lower_inclusive && + !testcase.upper_inclusive) { + return testcase.lower <= value && value < testcase.upper; + } else if (!testcase.lower_inclusive && + testcase.upper_inclusive) { + return testcase.lower < value && value <= testcase.upper; + } else { + return testcase.lower < value && value < testcase.upper; + } + }; + + auto filter_func = [&testcase, this](uint32_t row_id, + uint16_t offset, + uint16_t size) { + auto val = + this->data_[row_id].template at_pos( + offset, size); + if (val.second != "") { + return false; + } + if (testcase.lower_inclusive && testcase.upper_inclusive) { + return testcase.lower <= std::string(val.first) && + std::string(val.first) <= testcase.upper; + } else if (testcase.lower_inclusive && + !testcase.upper_inclusive) { + return testcase.lower <= std::string(val.first) && + std::string(val.first) < testcase.upper; + } else if (!testcase.lower_inclusive && + testcase.upper_inclusive) { + return testcase.lower < std::string(val.first) && + std::string(val.first) <= testcase.upper; + } else { + return testcase.lower < std::string(val.first) && + std::string(val.first) < testcase.upper; + } + }; + auto bitset = index_->FilterByPath("/keys7", size_, filter_func); + ASSERT_EQ(bitset.size(), size_); + for (int i = 0; i < bitset.size(); ++i) { + auto ans = bitset[i]; + auto ref = check("970724117"); + ASSERT_EQ(ans, ref); + } + } } public: std::shared_ptr index_; DataType type_; - size_t nb_; + size_t size_; + size_t dim_; std::vector data_; std::shared_ptr chunk_manager_; }; TEST_F(JsonKeyIndexTest, CountFuncTest) { - int all_cost = 0; - while (true) { - auto start = std::chrono::steady_clock::now(); - TestTermInFunc(); - all_cost += std::chrono::duration_cast( - std::chrono::steady_clock::now() - start) - .count(); - std::cout << "all_cost" << all_cost << std::endl; - } + TestTermInFunc(); + TestUnaryRangeInFunc(); + TestBinaryRangeInFunc(); } \ No newline at end of file diff --git a/internal/indexnode/task_stats.go b/internal/indexnode/task_stats.go index 92233de1e2a58..787fc32d6daa9 100644 --- a/internal/indexnode/task_stats.go +++ b/internal/indexnode/task_stats.go @@ -333,10 +333,16 @@ func (st *statsTask) Execute(ctx context.Context) error { return err } } else if st.req.GetSubJobType() == indexpb.StatsSubJob_JsonKeyIndexJob { - err = st.createJsonKeyIndex(ctx, st.req.GetStorageConfig(), st.req.GetCollectionID(), - st.req.GetPartitionID(), st.req.GetTargetSegmentID(), st.req.GetTaskVersion(), st.req.GetTaskID(), insertLogs) + err = st.createJsonKeyIndex(ctx, + st.req.GetStorageConfig(), + st.req.GetCollectionID(), + st.req.GetPartitionID(), + st.req.GetTargetSegmentID(), + st.req.GetTaskVersion(), + st.req.GetTaskID(), + insertLogs) if err != nil { - log.Warn("stats wrong, failed to create text index", zap.Error(err)) + log.Warn("stats wrong, failed to create json index", zap.Error(err)) return err } }