Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refact](inverted index) refactor inverted index writer init #29072

Merged
merged 1 commit into from
Dec 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
319 changes: 137 additions & 182 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ const int32_t MERGE_FACTOR = 100000000;
const int32_t MAX_LEAF_COUNT = 1024;
const float MAXMBSortInHeap = 512.0 * 8;
const int DIMS = 1;
const std::string empty_value;

template <FieldType field_type>
class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
Expand Down Expand Up @@ -139,88 +138,123 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
return Status::OK();
}

Status init_fulltext_index() {
bool create = true;
std::unique_ptr<lucene::analysis::Analyzer> create_chinese_analyzer() {
auto chinese_analyzer = std::make_unique<lucene::analysis::LanguageBasedAnalyzer>();
chinese_analyzer->setLanguage(L"chinese");
chinese_analyzer->initDict(config::inverted_index_dict_path);

auto mode = get_parser_mode_string_from_properties(_index_meta->properties());
if (mode == INVERTED_INDEX_PARSER_FINE_GRANULARITY) {
chinese_analyzer->setMode(lucene::analysis::AnalyzerMode::All);
} else {
chinese_analyzer->setMode(lucene::analysis::AnalyzerMode::Default);
}

return chinese_analyzer;
}

Status create_char_string_reader(std::unique_ptr<lucene::util::Reader>& string_reader) {
CharFilterMap char_filter_map =
get_parser_char_filter_map_from_properties(_index_meta->properties());
if (!char_filter_map.empty()) {
string_reader = std::unique_ptr<lucene::util::Reader>(CharFilterFactory::create(
char_filter_map[INVERTED_INDEX_PARSER_CHAR_FILTER_TYPE],
new lucene::util::SStringReader<char>(),
char_filter_map[INVERTED_INDEX_PARSER_CHAR_FILTER_PATTERN],
char_filter_map[INVERTED_INDEX_PARSER_CHAR_FILTER_REPLACEMENT]));
} else {
string_reader = std::make_unique<lucene::util::SStringReader<char>>();
}
return Status::OK();
}

Status create_index_directory(std::unique_ptr<DorisCompoundDirectory>& dir) {
bool create = true;
auto index_path = InvertedIndexDescriptor::get_temporary_index_path(
_directory + "/" + _segment_file_name, _index_meta->index_id(),
_index_meta->get_index_suffix());

bool exists = false;
auto st = _fs->exists(index_path.c_str(), &exists);
if (!st.ok()) {
LOG(ERROR) << "index_path:"
<< " exists error:" << st;
LOG(ERROR) << "index_path: exists error:" << st;
return st;
}
if (exists) {
LOG(ERROR) << "try to init a directory:" << index_path << " already exists";
return Status::InternalError("init_fulltext_index directory already exists");
}

_char_string_reader = std::make_unique<lucene::util::SStringReader<char>>();
CharFilterMap char_filter_map =
get_parser_char_filter_map_from_properties(_index_meta->properties());
if (!char_filter_map.empty()) {
_char_string_reader.reset(CharFilterFactory::create(
char_filter_map[INVERTED_INDEX_PARSER_CHAR_FILTER_TYPE],
_char_string_reader.release(),
char_filter_map[INVERTED_INDEX_PARSER_CHAR_FILTER_PATTERN],
char_filter_map[INVERTED_INDEX_PARSER_CHAR_FILTER_REPLACEMENT]));
}
dir = std::unique_ptr<DorisCompoundDirectory>(
DorisCompoundDirectory::getDirectory(_fs, index_path.c_str(), create));
return Status::OK();
}

_doc = std::make_unique<lucene::document::Document>();
_dir.reset(DorisCompoundDirectory::getDirectory(_fs, index_path.c_str(), true));

if (_parser_type == InvertedIndexParserType::PARSER_STANDARD ||
_parser_type == InvertedIndexParserType::PARSER_UNICODE) {
_analyzer = std::make_unique<lucene::analysis::standard95::StandardAnalyzer>();
} else if (_parser_type == InvertedIndexParserType::PARSER_ENGLISH) {
_analyzer = std::make_unique<lucene::analysis::SimpleAnalyzer<char>>();
} else if (_parser_type == InvertedIndexParserType::PARSER_CHINESE) {
auto chinese_analyzer = _CLNEW lucene::analysis::LanguageBasedAnalyzer();
chinese_analyzer->setLanguage(L"chinese");
chinese_analyzer->initDict(config::inverted_index_dict_path);
auto mode = get_parser_mode_string_from_properties(_index_meta->properties());
if (mode == INVERTED_INDEX_PARSER_FINE_GRANULARITY) {
chinese_analyzer->setMode(lucene::analysis::AnalyzerMode::All);
} else {
chinese_analyzer->setMode(lucene::analysis::AnalyzerMode::Default);
}
_analyzer.reset(chinese_analyzer);
} else {
// ANALYSER_NOT_SET, ANALYSER_NONE use default SimpleAnalyzer
_analyzer = std::make_unique<lucene::analysis::SimpleAnalyzer<char>>();
Status create_index_writer(std::unique_ptr<lucene::index::IndexWriter>& index_writer) {
bool create_index = true;
bool close_dir_on_shutdown = true;
index_writer = std::make_unique<lucene::index::IndexWriter>(
_dir.get(), _analyzer.get(), create_index, close_dir_on_shutdown);
index_writer->setMaxBufferedDocs(MAX_BUFFER_DOCS);
index_writer->setRAMBufferSizeMB(config::inverted_index_ram_buffer_size);
index_writer->setMaxFieldLength(MAX_FIELD_LEN);
index_writer->setMergeFactor(MERGE_FACTOR);
index_writer->setUseCompoundFile(false);

return Status::OK();
}

Status create_field(lucene::document::Field** field) {
int field_config = int(lucene::document::Field::STORE_NO) |
int(lucene::document::Field::INDEX_NONORMS);
field_config |= (_parser_type == InvertedIndexParserType::PARSER_NONE)
? int(lucene::document::Field::INDEX_UNTOKENIZED)
: int(lucene::document::Field::INDEX_TOKENIZED);
*field = new lucene::document::Field(_field_name.c_str(), field_config);
(*field)->setOmitTermFreqAndPositions(
get_parser_phrase_support_string_from_properties(_index_meta->properties()) ==
INVERTED_INDEX_PARSER_PHRASE_SUPPORT_YES
? false
: true);
return Status::OK();
}

Status create_analyzer(std::unique_ptr<lucene::analysis::Analyzer>& analyzer) {
switch (_parser_type) {
case InvertedIndexParserType::PARSER_STANDARD:
case InvertedIndexParserType::PARSER_UNICODE:
analyzer = std::make_unique<lucene::analysis::standard95::StandardAnalyzer>();
break;
case InvertedIndexParserType::PARSER_ENGLISH:
analyzer = std::make_unique<lucene::analysis::SimpleAnalyzer<char>>();
break;
case InvertedIndexParserType::PARSER_CHINESE:
analyzer = create_chinese_analyzer();
break;
default:
analyzer = std::make_unique<lucene::analysis::SimpleAnalyzer<char>>();
break;
}
setup_analyzer_lowercase(analyzer);
return Status::OK();
}

void setup_analyzer_lowercase(std::unique_ptr<lucene::analysis::Analyzer>& analyzer) {
auto lowercase = get_parser_lowercase_from_properties(_index_meta->properties());
if (lowercase == "true") {
_analyzer->set_lowercase(true);
analyzer->set_lowercase(true);
} else if (lowercase == "false") {
_analyzer->set_lowercase(false);
analyzer->set_lowercase(false);
}
_index_writer = std::make_unique<lucene::index::IndexWriter>(_dir.get(), _analyzer.get(),
create, true);
_index_writer->setMaxBufferedDocs(MAX_BUFFER_DOCS);
_index_writer->setRAMBufferSizeMB(config::inverted_index_ram_buffer_size);
_index_writer->setMaxFieldLength(MAX_FIELD_LEN);
_index_writer->setMergeFactor(MERGE_FACTOR);
_index_writer->setUseCompoundFile(false);
_doc->clear();
}

int field_config = int(lucene::document::Field::STORE_NO) |
int(lucene::document::Field::INDEX_NONORMS);
if (_parser_type == InvertedIndexParserType::PARSER_NONE) {
field_config |= int(lucene::document::Field::INDEX_UNTOKENIZED);
} else {
field_config |= int(lucene::document::Field::INDEX_TOKENIZED);
}
_field = new lucene::document::Field(_field_name.c_str(), field_config);
if (get_parser_phrase_support_string_from_properties(_index_meta->properties()) ==
INVERTED_INDEX_PARSER_PHRASE_SUPPORT_YES) {
_field->setOmitTermFreqAndPositions(false);
} else {
_field->setOmitTermFreqAndPositions(true);
}
Status init_fulltext_index() {
RETURN_IF_ERROR(create_index_directory(_dir));
RETURN_IF_ERROR(create_char_string_reader(_char_string_reader));
RETURN_IF_ERROR(create_analyzer(_analyzer));
RETURN_IF_ERROR(create_index_writer(_index_writer));
RETURN_IF_ERROR(create_field(&_field));
_doc = std::make_unique<lucene::document::Document>();
_doc->add(*_field);
return Status::OK();
}
Expand Down Expand Up @@ -555,148 +589,69 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
roaring::Roaring _null_bitmap;
uint64_t _reverted_index_size;

std::unique_ptr<lucene::document::Document> _doc {};
lucene::document::Field* _field {};
std::unique_ptr<lucene::index::IndexWriter> _index_writer {};
std::unique_ptr<lucene::analysis::Analyzer> _analyzer {};
std::unique_ptr<lucene::util::Reader> _char_string_reader {};
std::shared_ptr<lucene::util::bkd::bkd_writer> _bkd_writer;
std::unique_ptr<lucene::document::Document> _doc = nullptr;
lucene::document::Field* _field = nullptr;
std::unique_ptr<lucene::index::IndexWriter> _index_writer = nullptr;
std::unique_ptr<lucene::analysis::Analyzer> _analyzer = nullptr;
std::unique_ptr<lucene::util::Reader> _char_string_reader = nullptr;
std::shared_ptr<lucene::util::bkd::bkd_writer> _bkd_writer = nullptr;
std::unique_ptr<DorisCompoundDirectory> _dir = nullptr;
std::string _segment_file_name;
std::string _directory;
io::FileSystemSPtr _fs;
const KeyCoder* _value_key_coder;
const TabletIndex* _index_meta;
InvertedIndexParserType _parser_type;
std::wstring _field_name;
std::unique_ptr<DorisCompoundDirectory> _dir;
};

Status InvertedIndexColumnWriter::create(const Field* field,
std::unique_ptr<InvertedIndexColumnWriter>* res,
const std::string& segment_file_name,
const std::string& dir, const TabletIndex* index_meta,
const io::FileSystemSPtr& fs) {
auto typeinfo = field->type_info();
const auto* typeinfo = field->type_info();
FieldType type = typeinfo->type();
std::string field_name = field->name();
if (type == FieldType::OLAP_FIELD_TYPE_ARRAY) {
const auto array_typeinfo = dynamic_cast<const ArrayTypeInfo*>(typeinfo);
typeinfo = array_typeinfo->item_type_info();
type = typeinfo->type();
const auto* array_typeinfo = dynamic_cast<const ArrayTypeInfo*>(typeinfo);
if (array_typeinfo != nullptr) {
typeinfo = array_typeinfo->item_type_info();
type = typeinfo->type();
} else {
return Status::NotSupported("unsupported array type for inverted index: " +
std::to_string(int(type)));
}
}

switch (type) {
case FieldType::OLAP_FIELD_TYPE_CHAR: {
*res = std::make_unique<InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_CHAR>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_VARCHAR: {
*res = std::make_unique<InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_VARCHAR>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_STRING: {
*res = std::make_unique<InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_STRING>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_DATETIME: {
*res = std::make_unique<InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_DATETIME>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_DATE: {
*res = std::make_unique<InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_DATE>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_DATETIMEV2: {
*res = std::make_unique<
InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_DATETIMEV2>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_DATEV2: {
*res = std::make_unique<InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_DATEV2>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_TINYINT: {
*res = std::make_unique<InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_TINYINT>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_SMALLINT: {
*res = std::make_unique<InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_SMALLINT>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT: {
*res = std::make_unique<
InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT>>(
field_name, segment_file_name, dir, fs, index_meta);
#define M(TYPE) \
case TYPE: \
*res = std::make_unique<InvertedIndexColumnWriterImpl<TYPE>>( \
field_name, segment_file_name, dir, fs, index_meta); \
break;
}
case FieldType::OLAP_FIELD_TYPE_INT: {
*res = std::make_unique<InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_INT>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_LARGEINT: {
*res = std::make_unique<InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_LARGEINT>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_DECIMAL: {
*res = std::make_unique<InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_DECIMAL>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_DECIMAL32: {
*res = std::make_unique<
InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_DECIMAL32>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_DECIMAL64: {
*res = std::make_unique<
InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_DECIMAL64>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_DECIMAL128I: {
*res = std::make_unique<
InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_DECIMAL128I>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_DECIMAL256: {
*res = std::make_unique<
InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_DECIMAL256>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_BOOL: {
*res = std::make_unique<InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_BOOL>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_DOUBLE: {
*res = std::make_unique<InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_DOUBLE>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_FLOAT: {
*res = std::make_unique<InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_FLOAT>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
case FieldType::OLAP_FIELD_TYPE_BIGINT: {
*res = std::make_unique<InvertedIndexColumnWriterImpl<FieldType::OLAP_FIELD_TYPE_BIGINT>>(
field_name, segment_file_name, dir, fs, index_meta);
break;
}
M(FieldType::OLAP_FIELD_TYPE_TINYINT)
M(FieldType::OLAP_FIELD_TYPE_SMALLINT)
M(FieldType::OLAP_FIELD_TYPE_INT)
M(FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT)
M(FieldType::OLAP_FIELD_TYPE_BIGINT)
M(FieldType::OLAP_FIELD_TYPE_LARGEINT)
M(FieldType::OLAP_FIELD_TYPE_CHAR)
M(FieldType::OLAP_FIELD_TYPE_VARCHAR)
M(FieldType::OLAP_FIELD_TYPE_STRING)
M(FieldType::OLAP_FIELD_TYPE_DATE)
M(FieldType::OLAP_FIELD_TYPE_DATETIME)
M(FieldType::OLAP_FIELD_TYPE_DECIMAL)
M(FieldType::OLAP_FIELD_TYPE_DATEV2)
M(FieldType::OLAP_FIELD_TYPE_DATETIMEV2)
M(FieldType::OLAP_FIELD_TYPE_DECIMAL32)
M(FieldType::OLAP_FIELD_TYPE_DECIMAL64)
M(FieldType::OLAP_FIELD_TYPE_DECIMAL128I)
M(FieldType::OLAP_FIELD_TYPE_DECIMAL256)
M(FieldType::OLAP_FIELD_TYPE_BOOL)
M(FieldType::OLAP_FIELD_TYPE_DOUBLE)
M(FieldType::OLAP_FIELD_TYPE_FLOAT)
#undef M
default:
return Status::NotSupported("unsupported type for inverted index: " +
std::to_string(int(type)));
Expand Down
Loading