Skip to content

Commit

Permalink
Storages: refactor DMFile vector search
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
Lloyd-Pottiger committed Oct 22, 2024
1 parent 9eb0c80 commit f1bc1d3
Show file tree
Hide file tree
Showing 14 changed files with 420 additions and 585 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFileIndexWriter.h>
#include <Storages/DeltaMerge/File/DMFileVectorIndexWriter.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/DeltaMerge/WriteBatchesImpl.h>
Expand Down Expand Up @@ -540,7 +540,7 @@ bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment)

// No lock is needed, stable meta is immutable.
const auto build_info
= DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, segment->getStable()->getDMFiles());
= DMFileVectorIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, segment->getStable()->getDMFiles());
if (!build_info.indexes_to_build || build_info.indexes_to_build->empty() || build_info.dm_files.empty())
return false;

Expand Down Expand Up @@ -615,7 +615,7 @@ bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) co
// No lock is needed, stable meta is immutable.
auto segment_id = segment->segmentId();
auto build_info
= DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, segment->getStable()->getDMFiles());
= DMFileVectorIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, segment->getStable()->getDMFiles());
if (!build_info.indexes_to_build || build_info.indexes_to_build->empty())
return true;

Expand Down Expand Up @@ -734,7 +734,7 @@ void DeltaMergeStore::segmentEnsureStableIndex(DMContext & dm_context, const Loc
DMFile::info(index_build_info.dm_files));

// 2. Build the index.
DMFileIndexWriter iw(DMFileIndexWriter::Options{
DMFileVectorIndexWriter iw(DMFileVectorIndexWriter::Options{
.path_pool = path_pool,
.index_infos = index_build_info.indexes_to_build,
.dm_files = index_build_info.dm_files,
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,10 @@ class DMFile : private boost::noncopyable
#endif
DMFileMetaPtr meta;

friend class DMFileVectorIndexReader;
friend class DMFileV3IncrementWriter;
friend class DMFileWriter;
friend class DMFileIndexWriter;
friend class DMFileVectorIndexWriter;
friend class DMFileReader;
friend class MarkLoader;
friend class ColumnReadStream;
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,6 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn
std::move(header_layout),
std::move(rest_columns_reader),
std::move(vec_column.value()),
file_provider,
read_limiter,
scan_context,
vector_index_cache,
bitmap_filter.value(),
Expand Down
223 changes: 223 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashMetrics.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/File/DMFileVectorIndexReader.h>
#include <Storages/DeltaMerge/Index/VectorIndexCache.h>
#include <Storages/DeltaMerge/Index/VectorSearchPerf.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/S3/FileCache.h>
#include <Storages/S3/FileCachePerf.h>


namespace DB::ErrorCodes
{
extern const int S3_ERROR;
} // namespace DB::ErrorCodes

namespace DB::DM
{

String DMFileVectorIndexReader::PerfStat::toString() const
{
return fmt::format(
"index_size={} load={:.2f}s{}{}, search={:.2f}s, read_vec_column={:.2f}s",
index_size,
duration_load_index,
has_s3_download ? " (S3)" : "",
has_load_from_file ? " (LoadFile)" : "",
duration_search,
duration_read_vec_column);
}

std::vector<VectorIndexViewer::Key> DMFileVectorIndexReader::load()
{
if (loaded)
return {};

loadVectorIndex();
auto sorted_results = loadVectorSearchResult();

perf_stat.selected_nodes = sorted_results.size();
loaded = true;
return sorted_results;
}

void DMFileVectorIndexReader::loadVectorIndex()
{
const auto col_id = ann_query_info->column_id();
const auto index_id = ann_query_info->index_id() > 0 ? ann_query_info->index_id() : EmptyIndexID;

RUNTIME_CHECK(dmfile->useMetaV2()); // v3

// Check vector index exists on the column
auto vector_index = dmfile->getLocalIndex(col_id, index_id);
RUNTIME_CHECK(vector_index.has_value(), col_id, index_id);
perf_stat.index_size = vector_index->index_bytes();

// If local file is invalidated, cache is not valid anymore. So we
// need to ensure file exists on local fs first.
const auto index_file_path = index_id > 0 //
? dmfile->vectorIndexPath(index_id) //
: dmfile->colIndexPath(DMFile::getFileNameBase(col_id));
String local_index_file_path;
if (auto s3_file_name = S3::S3FilenameView::fromKeyWithPrefix(index_file_path); s3_file_name.isValid())
{
// Disaggregated mode
auto * file_cache = FileCache::instance();
RUNTIME_CHECK_MSG(file_cache, "Must enable S3 file cache to use vector index");

Stopwatch watch;

auto perf_begin = PerfContext::file_cache;

// If download file failed, retry a few times.
for (auto i = 3; i > 0; --i)
{
try
{
if (auto file_guard = file_cache->downloadFileForLocalRead(s3_file_name, vector_index->index_bytes());
file_guard)
{
local_index_file_path = file_guard->getLocalFileName();
break; // Successfully downloaded index into local cache
}

throw Exception(ErrorCodes::S3_ERROR, "Failed to download vector index file {}", index_file_path);
}
catch (...)
{
if (i <= 1)
throw;
}
}

if ( //
PerfContext::file_cache.fg_download_from_s3 > perf_begin.fg_download_from_s3 || //
PerfContext::file_cache.fg_wait_download_from_s3 > perf_begin.fg_wait_download_from_s3)
perf_stat.has_s3_download = true;

auto download_duration = watch.elapsedSeconds();
perf_stat.duration_load_index += download_duration;

GET_METRIC(tiflash_vector_index_duration, type_download).Observe(download_duration);
}
else
{
// Not disaggregated mode
local_index_file_path = index_file_path;
}

auto load_from_file = [&]() {
perf_stat.has_load_from_file = true;
return VectorIndexViewer::view(*vector_index, local_index_file_path);
};

Stopwatch watch;
if (vec_index_cache)
// Note: must use local_index_file_path as the cache key, because cache
// will check whether file is still valid and try to remove memory references
// when file is dropped.
vec_index = vec_index_cache->getOrSet(local_index_file_path, load_from_file);
else
vec_index = load_from_file();

perf_stat.duration_load_index += watch.elapsedSeconds();
RUNTIME_CHECK(vec_index != nullptr);

scan_context->total_vector_idx_load_time_ms += static_cast<UInt64>(perf_stat.duration_load_index * 1000);
if (perf_stat.has_s3_download)
// it could be possible that s3=true but load_from_file=false, it means we download a file
// and then reuse the memory cache. The majority time comes from s3 download
// so we still count it as s3 download.
scan_context->total_vector_idx_load_from_s3++;
else if (perf_stat.has_load_from_file)
scan_context->total_vector_idx_load_from_disk++;
else
scan_context->total_vector_idx_load_from_cache++;
}

DMFileVectorIndexReader::~DMFileVectorIndexReader()
{
scan_context->total_vector_idx_read_vec_time_ms += static_cast<UInt64>(perf_stat.duration_read_vec_column * 1000);
}

String DMFileVectorIndexReader::perfStat() const
{
return fmt::format(
"{} top_k_[query/visited/discarded/result]={}/{}/{}/{}",
perf_stat.toString(),
ann_query_info->top_k(),
perf_stat.visited_nodes,
perf_stat.discarded_nodes,
perf_stat.selected_nodes);
}

std::vector<VectorIndexViewer::Key> DMFileVectorIndexReader::loadVectorSearchResult()
{
Stopwatch watch;

auto perf_begin = PerfContext::vector_search;

RUNTIME_CHECK(valid_rows.size() >= dmfile->getRows(), valid_rows.size(), dmfile->getRows());
auto sorted_results = vec_index->search(ann_query_info, valid_rows);
std::sort(sorted_results.begin(), sorted_results.end());
// results must not contain duplicates. Usually there should be no duplicates.
sorted_results.erase(std::unique(sorted_results.begin(), sorted_results.end()), sorted_results.end());

perf_stat.discarded_nodes = PerfContext::vector_search.discarded_nodes - perf_begin.discarded_nodes;
perf_stat.visited_nodes = PerfContext::vector_search.visited_nodes - perf_begin.visited_nodes;

perf_stat.duration_search = watch.elapsedSeconds();
scan_context->total_vector_idx_search_time_ms += static_cast<UInt64>(perf_stat.duration_search * 1000);
scan_context->total_vector_idx_search_discarded_nodes += perf_stat.discarded_nodes;
scan_context->total_vector_idx_search_visited_nodes += perf_stat.visited_nodes;

return sorted_results;
}

void DMFileVectorIndexReader::read(
MutableColumnPtr & vec_column,
const std::vector<VectorIndexViewer::Key> & row_ids,
size_t start_offset,
size_t column_size)
{
Stopwatch watch;
RUNTIME_CHECK(loaded);

vec_column->reserve(column_size);
std::vector<Float32> value;
size_t current_rowid = start_offset;
for (auto rowid : row_ids)
{
vec_index->get(rowid, value);
if (rowid > current_rowid)
{
UInt32 nulls = rowid - current_rowid;
// Insert [] if column is Not Null, or NULL if column is Nullable
vec_column->insertManyDefaults(nulls);
}
vec_column->insertData(reinterpret_cast<const char *>(value.data()), value.size() * sizeof(Float32));
current_rowid = rowid + 1;
}
if (current_rowid < start_offset + column_size)
{
UInt32 nulls = column_size + start_offset - current_rowid;
vec_column->insertManyDefaults(nulls);
}
perf_stat.duration_read_vec_column += watch.elapsedSeconds();
}

} // namespace DB::DM
97 changes: 97 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/File/DMFile_fwd.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>

namespace DB::DM
{

class DMFileVectorIndexReader
{
private:
const DMFilePtr & dmfile;
const ANNQueryInfoPtr & ann_query_info;
const BitmapFilterView valid_rows;
const ScanContextPtr & scan_context;
// Global vector index cache
const VectorIndexCachePtr vec_index_cache;

// Performance statistics
struct PerfStat
{
double duration_search;
double duration_load_index;
double duration_read_vec_column;
size_t index_size;
size_t visited_nodes;
size_t discarded_nodes;
size_t selected_nodes;
bool has_s3_download;
bool has_load_from_file;

String toString() const;
};
PerfStat perf_stat;

// Set after load().
VectorIndexViewerPtr vec_index = nullptr;
bool loaded = false;

public:
DMFileVectorIndexReader(
const ANNQueryInfoPtr & ann_query_info_,
const DMFilePtr & dmfile_,
const BitmapFilterView & valid_rows_,
const ScanContextPtr & scan_context_,
const VectorIndexCachePtr & vec_index_cache_)
: dmfile(dmfile_)
, ann_query_info(ann_query_info_)
, valid_rows(valid_rows_)
, scan_context(scan_context_)
, vec_index_cache(vec_index_cache_)
, perf_stat()
{}

~DMFileVectorIndexReader();

// Read vector column data and set filter.
// The column will be as same as as the rows of the tiny file,
// but only the rows in sorted_results will be filled,
// others will be filled with default values.
// return the real number of rows read.
void read(
MutableColumnPtr & vec_column,
const std::vector<VectorIndexViewer::Key> & row_ids,
size_t start_offset,
size_t column_size);

// Load vector index and search results.
// Return the rowids of the selected rows.
std::vector<VectorIndexViewer::Key> load();

String perfStat() const;

private:
void loadVectorIndex();
std::vector<VectorIndexViewer::Key> loadVectorSearchResult();
};

using DMFileVectorIndexReaderPtr = std::shared_ptr<DMFileVectorIndexReader>;

} // namespace DB::DM
Loading

0 comments on commit f1bc1d3

Please sign in to comment.