Skip to content

Commit

Permalink
feat growingsegment jsonkey index
Browse files Browse the repository at this point in the history
Signed-off-by: Xianhui.Lin <[email protected]>

feat growingsegment jsonkey index

Signed-off-by: Xianhui.Lin <[email protected]>
  • Loading branch information
JsDove committed Jan 13, 2025
1 parent 02956d5 commit bc1cd95
Show file tree
Hide file tree
Showing 26 changed files with 460 additions and 67 deletions.
2 changes: 2 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,8 @@ queryNode:
growingMmapEnabled: false
fixedFileSizeForMmapAlloc: 1 # tmp file size for mmap chunk manager
maxDiskUsagePercentageForMmapAlloc: 50 # disk percentage used in mmap chunk manager
jsonIndexMemoryBudgetInTantivy: 64 # the memory budget for the JSON index in Tantivy
jsonIndexCommitInterval: 200 # the commit interval for the JSON index
lazyload:
enabled: false # Enable lazyload for loading data
waitTimeout: 30000 # max wait timeout duration in milliseconds before start to do lazyload search and retrieve
Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/common/FieldMeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ FieldMeta::enable_match() const {
return string_info_->enable_match;
}

// Returns true when this field's data type (type_) is classified as a JSON
// type by IsJsonDataType; no other field property is consulted.
bool
FieldMeta::enable_jsonIndex() const {
    const bool is_json_field = IsJsonDataType(type_);
    return is_json_field;
}

bool
FieldMeta::enable_analyzer() const {
if (!IsStringDataType(type_)) {
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/common/FieldMeta.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ class FieldMeta {
bool
enable_analyzer() const;

// Whether this field's data type is a JSON type (the definition in
// FieldMeta.cpp returns IsJsonDataType(type_)).
bool
enable_jsonIndex() const;

TokenizerParams
get_analyzer_params() const;

Expand Down
2 changes: 2 additions & 0 deletions internal/core/src/common/type_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ typedef struct CMmapConfig {
uint64_t fix_file_size;
bool growing_enable_mmap;
bool scalar_index_enable_mmap;
uint64_t json_index_memory_budget;
uint64_t json_index_commit_interval;
} CMmapConfig;

typedef struct CTraceConfig {
Expand Down
10 changes: 7 additions & 3 deletions internal/core/src/exec/expression/BinaryRangeExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,6 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonForIndex() {
using GetType = std::conditional_t<std::is_same_v<ValueType, std::string>,
std::string_view,
ValueType>;
Assert(segment_->type() == SegmentType::Sealed);
auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_
? active_count_ - current_data_chunk_pos_
: batch_size_;
Expand All @@ -477,8 +476,13 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonForIndex() {
ValueType val1 = GetValueFromProto<ValueType>(expr_->lower_val_);
ValueType val2 = GetValueFromProto<ValueType>(expr_->upper_val_);
if (cached_index_chunk_id_ != 0) {
const auto* sealed_seg =
dynamic_cast<const segcore::SegmentSealed*>(segment_);
const segcore::SegmentInternalInterface* sealed_seg = nullptr;
if (segment_->type() == SegmentType::Growing) {
sealed_seg =
dynamic_cast<const segcore::SegmentGrowingImpl*>(segment_);
} else if (segment_->type() == SegmentType::Sealed) {
sealed_seg = dynamic_cast<const segcore::SegmentSealed*>(segment_);
}
auto field_id = expr_->column_.field_id_;
auto* index = sealed_seg->GetJsonKeyIndex(field_id);
Assert(index != nullptr);
Expand Down
10 changes: 7 additions & 3 deletions internal/core/src/exec/expression/ExistsExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,18 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegment(OffsetVector* input) {

VectorPtr
PhyExistsFilterExpr::EvalJsonExistsForDataSegmentForIndex() {
Assert(segment_->type() == SegmentType::Sealed);
auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_
? active_count_ - current_data_chunk_pos_
: batch_size_;
auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);
if (cached_index_chunk_id_ != 0) {
const auto* sealed_seg =
dynamic_cast<const segcore::SegmentSealed*>(segment_);
const segcore::SegmentInternalInterface* sealed_seg = nullptr;
if (segment_->type() == SegmentType::Growing) {
sealed_seg =
dynamic_cast<const segcore::SegmentGrowingImpl*>(segment_);
} else if (segment_->type() == SegmentType::Sealed) {
sealed_seg = dynamic_cast<const segcore::SegmentSealed*>(segment_);
}
auto field_id = expr_->column_.field_id_;
auto* index = sealed_seg->GetJsonKeyIndex(field_id);
Assert(index != nullptr);
Expand Down
7 changes: 6 additions & 1 deletion internal/core/src/exec/expression/Expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
#include "expr/ITypeExpr.h"
#include "query/PlanProto.h"
#include "segcore/SegmentSealedImpl.h"

#include "segcore/SegmentInterface.h"
#include "segcore/SegmentGrowingImpl.h"
namespace milvus {
namespace exec {

Expand Down Expand Up @@ -1070,6 +1071,10 @@ class SegmentExpr : public Expr {
if (sealed_seg->GetJsonKeyIndex(field_id) != nullptr) {
return true;
}
} else if (segment_->type() == SegmentType ::Growing) {
if (segment_->GetJsonKeyIndex(field_id) != nullptr) {
return true;
}
}
return false;
}
Expand Down
60 changes: 42 additions & 18 deletions internal/core/src/exec/expression/JsonContainsExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,6 @@ PhyJsonContainsFilterExpr::ExecJsonContainsByKeyIndex() {
std::conditional_t<std::is_same_v<ExprValueType, std::string>,
std::string_view,
ExprValueType>;
Assert(segment_->type() == SegmentType::Sealed);
auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_
? active_count_ - current_data_chunk_pos_
: batch_size_;
Expand All @@ -357,8 +356,13 @@ PhyJsonContainsFilterExpr::ExecJsonContainsByKeyIndex() {
TargetBitmap(real_batch_size, true));
}
if (cached_index_chunk_id_ != 0) {
const auto* sealed_seg =
dynamic_cast<const segcore::SegmentSealed*>(segment_);
const segcore::SegmentInternalInterface* sealed_seg = nullptr;
if (segment_->type() == SegmentType::Growing) {
sealed_seg =
dynamic_cast<const segcore::SegmentGrowingImpl*>(segment_);
} else if (segment_->type() == SegmentType::Sealed) {
sealed_seg = dynamic_cast<const segcore::SegmentSealed*>(segment_);
}
auto field_id = expr_->column_.field_id_;
auto* index = sealed_seg->GetJsonKeyIndex(field_id);
Assert(index != nullptr);
Expand Down Expand Up @@ -498,7 +502,6 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArray(OffsetVector* input) {

VectorPtr
PhyJsonContainsFilterExpr::ExecJsonContainsArrayByKeyIndex() {
Assert(segment_->type() == SegmentType::Sealed);
auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_
? active_count_ - current_data_chunk_pos_
: batch_size_;
Expand All @@ -514,8 +517,13 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArrayByKeyIndex() {
TargetBitmap(real_batch_size, true));
}
if (cached_index_chunk_id_ != 0) {
const auto* sealed_seg =
dynamic_cast<const segcore::SegmentSealed*>(segment_);
const segcore::SegmentInternalInterface* sealed_seg = nullptr;
if (segment_->type() == SegmentType::Growing) {
sealed_seg =
dynamic_cast<const segcore::SegmentGrowingImpl*>(segment_);
} else if (segment_->type() == SegmentType::Sealed) {
sealed_seg = dynamic_cast<const segcore::SegmentSealed*>(segment_);
}
auto field_id = expr_->column_.field_id_;
auto* index = sealed_seg->GetJsonKeyIndex(field_id);
Assert(index != nullptr);
Expand Down Expand Up @@ -742,7 +750,6 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByKeyIndex() {
std::conditional_t<std::is_same_v<ExprValueType, std::string>,
std::string_view,
ExprValueType>;
Assert(segment_->type() == SegmentType::Sealed);
auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_
? active_count_ - current_data_chunk_pos_
: batch_size_;
Expand All @@ -758,8 +765,13 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByKeyIndex() {
TargetBitmap(real_batch_size, true));
}
if (cached_index_chunk_id_ != 0) {
const auto* sealed_seg =
dynamic_cast<const segcore::SegmentSealed*>(segment_);
const segcore::SegmentInternalInterface* sealed_seg = nullptr;
if (segment_->type() == SegmentType::Growing) {
sealed_seg =
dynamic_cast<const segcore::SegmentGrowingImpl*>(segment_);
} else if (segment_->type() == SegmentType::Sealed) {
sealed_seg = dynamic_cast<const segcore::SegmentSealed*>(segment_);
}
auto field_id = expr_->column_.field_id_;
auto* index = sealed_seg->GetJsonKeyIndex(field_id);
Assert(index != nullptr);
Expand Down Expand Up @@ -960,7 +972,6 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType(

VectorPtr
PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffTypeByKeyIndex() {
Assert(segment_->type() == SegmentType::Sealed);
auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_
? active_count_ - current_data_chunk_pos_
: batch_size_;
Expand All @@ -979,8 +990,13 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffTypeByKeyIndex() {
TargetBitmap(real_batch_size, true));
}
if (cached_index_chunk_id_ != 0) {
const auto* sealed_seg =
dynamic_cast<const segcore::SegmentSealed*>(segment_);
const segcore::SegmentInternalInterface* sealed_seg = nullptr;
if (segment_->type() == SegmentType::Growing) {
sealed_seg =
dynamic_cast<const segcore::SegmentGrowingImpl*>(segment_);
} else if (segment_->type() == SegmentType::Sealed) {
sealed_seg = dynamic_cast<const segcore::SegmentSealed*>(segment_);
}
auto field_id = expr_->column_.field_id_;
auto* index = sealed_seg->GetJsonKeyIndex(field_id);
Assert(index != nullptr);
Expand Down Expand Up @@ -1186,7 +1202,6 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(OffsetVector* input) {

VectorPtr
PhyJsonContainsFilterExpr::ExecJsonContainsAllArrayByKeyIndex() {
Assert(segment_->type() == SegmentType::Sealed);
auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_
? active_count_ - current_data_chunk_pos_
: batch_size_;
Expand All @@ -1202,8 +1217,13 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArrayByKeyIndex() {
TargetBitmap(real_batch_size, true));
}
if (cached_index_chunk_id_ != 0) {
const auto* sealed_seg =
dynamic_cast<const segcore::SegmentSealed*>(segment_);
const segcore::SegmentInternalInterface* sealed_seg = nullptr;
if (segment_->type() == SegmentType::Growing) {
sealed_seg =
dynamic_cast<const segcore::SegmentGrowingImpl*>(segment_);
} else if (segment_->type() == SegmentType::Sealed) {
sealed_seg = dynamic_cast<const segcore::SegmentSealed*>(segment_);
}
auto field_id = expr_->column_.field_id_;
auto* index = sealed_seg->GetJsonKeyIndex(field_id);
Assert(index != nullptr);
Expand Down Expand Up @@ -1397,7 +1417,6 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(OffsetVector* input) {

VectorPtr
PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffTypeByKeyIndex() {
Assert(segment_->type() == SegmentType::Sealed);
auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_
? active_count_ - current_data_chunk_pos_
: batch_size_;
Expand All @@ -1410,8 +1429,13 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffTypeByKeyIndex() {
TargetBitmap(real_batch_size, true));
}
if (cached_index_chunk_id_ != 0) {
const auto* sealed_seg =
dynamic_cast<const segcore::SegmentSealed*>(segment_);
const segcore::SegmentInternalInterface* sealed_seg = nullptr;
if (segment_->type() == SegmentType::Growing) {
sealed_seg =
dynamic_cast<const segcore::SegmentGrowingImpl*>(segment_);
} else if (segment_->type() == SegmentType::Sealed) {
sealed_seg = dynamic_cast<const segcore::SegmentSealed*>(segment_);
}
auto field_id = expr_->column_.field_id_;
auto* index = sealed_seg->GetJsonKeyIndex(field_id);
Assert(index != nullptr);
Expand Down
19 changes: 7 additions & 12 deletions internal/core/src/exec/expression/TermExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,22 +496,12 @@ PhyTermFilterExpr::ExecTermJsonVariableInField(OffsetVector* input) {
return res_vec;
}

// Allocates a temporary buffer of `size` bytes filled with 'x' and reads the
// first byte back through a volatile local (presumably so the compiler keeps
// the buffer writes; the name suggests the goal is to evict cached data —
// confirm with the benchmark that used it).
//
// size: number of bytes to allocate and touch. A size of 0 is a no-op; the
//       previous version indexed dummy[0] unconditionally, which is
//       undefined behavior on an empty vector.
static void
pollute_cache(size_t size) {
    if (size == 0) {
        return;
    }
    // The fill constructor writes every byte, replacing the manual loop.
    std::vector<char> dummy(size, 'x');
    volatile char sink = dummy[0];
    (void)sink;  // silence unused-variable warnings; the volatile read is the point
}

template <typename ValueType>
VectorPtr
PhyTermFilterExpr::ExecJsonInVariableByKeyIndex() {
using GetType = std::conditional_t<std::is_same_v<ValueType, std::string>,
std::string_view,
ValueType>;
Assert(segment_->type() == SegmentType::Sealed);
auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_
? active_count_ - current_data_chunk_pos_
: batch_size_;
Expand All @@ -530,8 +520,13 @@ PhyTermFilterExpr::ExecJsonInVariableByKeyIndex() {
}

if (cached_index_chunk_id_ != 0) {
const auto* sealed_seg =
dynamic_cast<const segcore::SegmentSealed*>(segment_);
const segcore::SegmentInternalInterface* sealed_seg = nullptr;
if (segment_->type() == SegmentType::Growing) {
sealed_seg =
dynamic_cast<const segcore::SegmentGrowingImpl*>(segment_);
} else if (segment_->type() == SegmentType::Sealed) {
sealed_seg = dynamic_cast<const segcore::SegmentSealed*>(segment_);
}
auto field_id = expr_->column_.field_id_;
auto* index = sealed_seg->GetJsonKeyIndex(field_id);
Assert(index != nullptr);
Expand Down
13 changes: 9 additions & 4 deletions internal/core/src/exec/expression/UnaryExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,6 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonForIndex() {
? active_count_ - current_data_chunk_pos_
: batch_size_;
auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);

#define UnaryRangeJSONIndexCompare(cmp) \
do { \
auto x = json.at<GetType>(offset, size); \
Expand Down Expand Up @@ -839,11 +838,17 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonForIndex() {
ExprValueType val = GetValueFromProto<ExprValueType>(expr_->val_);
auto op_type = expr_->op_type_;
if (cached_index_chunk_id_ != 0) {
const auto* sealed_seg =
dynamic_cast<const segcore::SegmentSealed*>(segment_);
const segcore::SegmentInternalInterface* sealed_seg = nullptr;
if (segment_->type() == SegmentType::Growing) {
sealed_seg =
dynamic_cast<const segcore::SegmentGrowingImpl*>(segment_);
} else if (segment_->type() == SegmentType::Sealed) {
sealed_seg = dynamic_cast<const segcore::SegmentSealed*>(segment_);
}
auto field_id = expr_->column_.field_id_;
auto* index = sealed_seg->GetJsonKeyIndex(field_id);
auto* index = segment_->GetJsonKeyIndex(field_id);
Assert(index != nullptr);
Assert(sealed_seg != nullptr);
auto filter_func = [sealed_seg, field_id, op_type, val](uint32_t row_id,
uint16_t offset,
uint16_t size) {
Expand Down
Loading

0 comments on commit bc1cd95

Please sign in to comment.