From 0bc9090a5a86ca595afab483fbb2f2a532a5fd65 Mon Sep 17 00:00:00 2001 From: Kumar Saurabh Arora Date: Tue, 2 Apr 2024 20:46:39 -0700 Subject: [PATCH] Support of skip_ids in merge_from_multiple function of OnDiskInvertedLists (#3327) Summary: **Context** 1. [Issue 2621](https://github.com/facebookresearch/faiss/issues/2621) discuss inconsistency between OnDiskInvertedList and InvertedList. OnDiskInvertedList is supposed to handle disk based multiple Index Shards. Thus, we should name it differently when merging invls from index shard. 2. [Issue 2876](https://github.com/facebookresearch/faiss/issues/2876) provides usecase of shifting ids when merging invls from different shards. **In this diff**, 1. To address #1 above, I renamed the merge_from function to merge_from_multiple without touching merge_from base class. why so? To continue to allow merge invl from one index to ondiskinvl from other index. 2. To address #2 above, I have added support of shift_ids in merge_from_multiple to shift ids from different shards. This can be used when each shard has same set of ids but different data. This is not recommended if id is already unique across shards. Reviewed By: mdouze Differential Revision: D55482518 --- contrib/ondisk.py | 4 +- faiss/invlists/OnDiskInvertedLists.cpp | 23 ++++++-- faiss/invlists/OnDiskInvertedLists.h | 3 +- tests/test_contrib.py | 73 +++++++++++++++++++++++--- tests/test_merge.cpp | 28 +++++++++- 5 files changed, 115 insertions(+), 16 deletions(-) diff --git a/contrib/ondisk.py b/contrib/ondisk.py index 26a95f44f5..81ec71941c 100644 --- a/contrib/ondisk.py +++ b/contrib/ondisk.py @@ -11,7 +11,7 @@ def merge_ondisk( - trained_index: faiss.Index, shard_fnames: List[str], ivfdata_fname: str + trained_index: faiss.Index, shard_fnames: List[str], ivfdata_fname: str, shift_ids=False ) -> None: """Add the contents of the indexes stored in shard_fnames into the index trained_index. The on-disk data is stored in ivfdata_fname""" @@ -51,7 +51,7 @@ def merge_ondisk( ivf_vector.push_back(ivf) LOG.info("merge %d inverted lists " % ivf_vector.size()) - ntotal = invlists.merge_from(ivf_vector.data(), ivf_vector.size()) + ntotal = invlists.merge_from_multiple(ivf_vector.data(), ivf_vector.size(), shift_ids) # now replace the inverted lists in the output index index.ntotal = index_ivf.ntotal = ntotal diff --git a/faiss/invlists/OnDiskInvertedLists.cpp b/faiss/invlists/OnDiskInvertedLists.cpp index 3017d164c6..dc17fe67f6 100644 --- a/faiss/invlists/OnDiskInvertedLists.cpp +++ b/faiss/invlists/OnDiskInvertedLists.cpp @@ -565,15 +565,16 @@ void OnDiskInvertedLists::free_slot(size_t offset, size_t capacity) { /***************************************** * Compact form *****************************************/ - -size_t OnDiskInvertedLists::merge_from( +size_t OnDiskInvertedLists::merge_from_multiple( const InvertedLists** ils, int n_il, + bool shift_ids, bool verbose) { FAISS_THROW_IF_NOT_MSG( totsize == 0, "works only on an empty InvertedLists"); std::vector sizes(nlist); + std::vector shift_id_offsets(n_il); for (int i = 0; i < n_il; i++) { const InvertedLists* il = ils[i]; FAISS_THROW_IF_NOT(il->nlist == nlist && il->code_size == code_size); @@ -581,6 +582,10 @@ size_t OnDiskInvertedLists::merge_from( for (size_t j = 0; j < nlist; j++) { sizes[j] += il->list_size(j); } + + size_t il_totsize = il->compute_ntotal(); + shift_id_offsets[i] = + (shift_ids && i > 0) ? shift_id_offsets[i - 1] + il_totsize : 0; } size_t cums = 0; @@ -605,11 +610,21 @@ size_t OnDiskInvertedLists::merge_from( const InvertedLists* il = ils[i]; size_t n_entry = il->list_size(j); l.size += n_entry; + ScopedIds scope_ids(il, j); + const idx_t* scope_ids_data = scope_ids.get(); + std::vector new_ids; + if (shift_ids) { + new_ids.resize(n_entry); + for (size_t k = 0; k < n_entry; k++) { + new_ids[k] = scope_ids[k] + shift_id_offsets[i]; + } + scope_ids_data = new_ids.data(); + } update_entries( j, l.size - n_entry, n_entry, - ScopedIds(il, j).get(), + scope_ids_data, ScopedCodes(il, j).get()); } assert(l.size == l.capacity); @@ -638,7 +653,7 @@ size_t OnDiskInvertedLists::merge_from( size_t OnDiskInvertedLists::merge_from_1( const InvertedLists* ils, bool verbose) { - return merge_from(&ils, 1, verbose); + return merge_from_multiple(&ils, 1, verbose); } void OnDiskInvertedLists::crop_invlists(size_t l0, size_t l1) { diff --git a/faiss/invlists/OnDiskInvertedLists.h b/faiss/invlists/OnDiskInvertedLists.h index 98cb653a7a..01c7f3481e 100644 --- a/faiss/invlists/OnDiskInvertedLists.h +++ b/faiss/invlists/OnDiskInvertedLists.h @@ -101,9 +101,10 @@ struct OnDiskInvertedLists : InvertedLists { // copy all inverted lists into *this, in compact form (without // allocating slots) - size_t merge_from( + size_t merge_from_multiple( const InvertedLists** ils, int n_il, + bool shift_ids = false, bool verbose = false); /// same as merge_from for a single invlist diff --git a/tests/test_contrib.py b/tests/test_contrib.py index 84b90a4e5f..0e7cbbfb03 100644 --- a/tests/test_contrib.py +++ b/tests/test_contrib.py @@ -9,6 +9,7 @@ import platform import os import random +import shutil import tempfile from faiss.contrib import datasets @@ -17,15 +18,13 @@ from faiss.contrib import ivf_tools from faiss.contrib import clustering from faiss.contrib import big_batch_search +from faiss.contrib.ondisk import merge_ondisk from common_faiss_tests import get_dataset_2 -try: - from faiss.contrib.exhaustive_search import \ - knn_ground_truth, knn, range_ground_truth, \ - range_search_max_results, exponential_query_iterator -except: - pass # Submodule import broken in python 2. - +from faiss.contrib.exhaustive_search import \ + knn_ground_truth, knn, range_ground_truth, \ + range_search_max_results, exponential_query_iterator +from contextlib import contextmanager @unittest.skipIf(platform.python_version_tuple()[0] < '3', 'Submodule import broken in python 2.') @@ -674,3 +673,63 @@ def test_code_set(self): np.testing.assert_equal( np.sort(np.unique(codes, axis=0), axis=None), np.sort(codes[inserted], axis=None)) + + +@unittest.skipIf(platform.system() == 'Windows', + 'OnDiskInvertedLists is unsupported on Windows.') +class TestMerge(unittest.TestCase): + @contextmanager + def temp_directory(self): + temp_dir = tempfile.mkdtemp() + try: + yield temp_dir + finally: + shutil.rmtree(temp_dir) + + def do_test_ondisk_merge(self, shift_ids=False): + with self.temp_directory() as tmpdir: + # only train and add index to disk without adding elements. + # this will create empty inverted lists. + ds = datasets.SyntheticDataset(32, 2000, 200, 20) + index = faiss.index_factory(ds.d, "IVF32,Flat") + index.train(ds.get_train()) + faiss.write_index(index, tmpdir + "/trained.index") + + # create 4 shards and add elements to them + ns = 4 # number of shards + + for bno in range(ns): + index = faiss.read_index(tmpdir + "/trained.index") + i0, i1 = int(bno * ds.nb / ns), int((bno + 1) * ds.nb / ns) + if shift_ids: + index.add_with_ids(ds.xb[i0:i1], np.arange(0, ds.nb / ns)) + else: + index.add_with_ids(ds.xb[i0:i1], np.arange(i0, i1)) + faiss.write_index(index, tmpdir + "/block_%d.index" % bno) + + # construct the output index and merge them on disk + index = faiss.read_index(tmpdir + "/trained.index") + block_fnames = [tmpdir + "/block_%d.index" % bno for bno in range(4)] + + merge_ondisk( + index, block_fnames, tmpdir + "/merged_index.ivfdata", shift_ids + ) + faiss.write_index(index, tmpdir + "/populated.index") + + # perform a search from index on disk + index = faiss.read_index(tmpdir + "/populated.index") + index.nprobe = 5 + D, I = index.search(ds.xq, 5) + + # ground-truth + gtI = ds.get_groundtruth(5) + + recall_at_1 = (I[:, :1] == gtI[:, :1]).sum() / float(ds.xq.shape[0]) + self.assertGreaterEqual(recall_at_1, 0.5) + + def test_ondisk_merge(self): + self.do_test_ondisk_merge() + + def test_ondisk_merge_with_shift_ids(self): + # verified that recall is same for test_ondisk_merge and + self.do_test_ondisk_merge(True) diff --git a/tests/test_merge.cpp b/tests/test_merge.cpp index 5a1d08cfba..edbe2a03a6 100644 --- a/tests/test_merge.cpp +++ b/tests/test_merge.cpp @@ -32,6 +32,7 @@ size_t nq = 100; int nindex = 4; int k = 10; int nlist = 40; +int shard_size = nb / nindex; struct CommonData { std::vector database; @@ -100,7 +101,7 @@ int compare_merged( auto il = new faiss::OnDiskInvertedLists( index0->nlist, index0->code_size, filename.c_str()); - il->merge_from(lists.data(), lists.size()); + il->merge_from_multiple(lists.data(), lists.size(), shift_ids); index0->replace_invlists(il, true); index0->ntotal = ntotal; @@ -110,11 +111,14 @@ int compare_merged( nq, cd.queries.data(), k, newD.data(), newI.data()); size_t ndiff = 0; + bool adjust_ids = shift_ids && !standard_merge; for (size_t i = 0; i < k * nq; i++) { - if (refI[i] != newI[i]) { + idx_t new_id = adjust_ids ? refI[i] % shard_size : refI[i]; + if (refI[i] != new_id) { ndiff++; } } + return ndiff; } @@ -220,3 +224,23 @@ TEST(MERGE, merge_flat_ondisk_2) { int ndiff = compare_merged(&index_shards, false, false); EXPECT_GE(0, ndiff); } + +// now use ondisk specific merge and use shift ids +TEST(MERGE, merge_flat_ondisk_3) { + faiss::IndexShards index_shards(d, false, false); + index_shards.own_indices = true; + + std::vector ids; + for (int i = 0; i < nb; ++i) { + int id = i % shard_size; + ids.push_back(id); + } + for (int i = 0; i < nindex; i++) { + index_shards.add_shard( + new faiss::IndexIVFFlat(&cd.quantizer, d, nlist)); + } + EXPECT_TRUE(index_shards.is_trained); + index_shards.add_with_ids(nb, cd.database.data(), ids.data()); + int ndiff = compare_merged(&index_shards, true, false); + EXPECT_GE(0, ndiff); +}