diff --git a/include/tatami_chunked/OracleSlabCache.hpp b/include/tatami_chunked/OracleSlabCache.hpp index 06d4ad1..6154231 100644 --- a/include/tatami_chunked/OracleSlabCache.hpp +++ b/include/tatami_chunked/OracleSlabCache.hpp @@ -135,6 +135,10 @@ class OracleSlabCache { future_cache[a] = cIt->second; ++cIt; } + to_reassign.clear(); + + populate(to_populate); + to_populate.clear(); // We always fill future_cache to the brim so every entry of // all_slabs should be referenced by a pointer in future_cache. @@ -145,12 +149,7 @@ class OracleSlabCache { // worrying about leaking memory. The only exception is if we're at // the end of the predictions, in which case it doesn't matter. current_cache.clear(); - to_reassign.clear(); - - populate(to_populate); - current_cache.swap(future_cache); - to_populate.clear(); } // We know it must exist, so no need to check ccIt's validity. diff --git a/include/tatami_chunked/SubsettedOracleSlabCache.hpp b/include/tatami_chunked/SubsettedOracleSlabCache.hpp index ab160f0..e613acd 100644 --- a/include/tatami_chunked/SubsettedOracleSlabCache.hpp +++ b/include/tatami_chunked/SubsettedOracleSlabCache.hpp @@ -36,11 +36,6 @@ enum class SubsetSelection : char { FULL, BLOCK, INDEX }; */ template class SubsettedOracleSlabCache { - std::shared_ptr > oracle; - size_t max_predictions; - size_t max_slabs; - size_t counter = 0; - public: /** * @brief Details on the subset to extract. @@ -138,85 +133,53 @@ class SubsettedOracleSlabCache { } } } - - void swap(SubsetDetails& x) { - std::swap(selection, x.selection); - std::swap(block_start, x.block_start); - std::swap(block_end, x.block_end); - std::swap(block_length, x.block_length); - indices.swap(x.indices); - mapping.swap(x.mapping); - } - /** - * @endcond - */ - }; - - /** - * @brief A cached slab. - */ - struct CachedSlab { - /** - * Contents of the slab. - */ - Slab_ contents; - - /** - * Subset of rows/columns to extract from this slab. - */ - SubsetDetails subset; - - public: - /** - * @cond - */ - CachedSlab(Slab_ c) : contents(std::move(c)) {} /** * @endcond */ }; private: - std::list slab_cache, tmp_cache, free_cache; + std::shared_ptr > oracle; + size_t total; + size_t counter = 0; - typedef typename std::list::iterator cache_iterator; - std::unordered_map > slab_exists, past_exists; + Index_ last_slab_id = 0; + CachedSlab* last_slab = NULL; - std::vector > predictions_made, next_predictions_made; - size_t predictions_fulfilled = 0; + size_t max_slabs; + std::vector all_slabs; + std::unordered_map current_cache, future_cache; - std::vector slab_pointers, next_slab_pointers; - std::vector > unassigned_slabs; - std::vector > slabs_to_populate, next_slabs_to_populate; + std::vector all_subset_details; + std::vector free_subset_details; + std::unordered_map close_future_subset_cache, far_future_subset_cache; + size_t close_refresh_point = 0; + size_t far_refresh_point = 0; + Id_ far_slab_id; + Index_ far_slab_offset; - std::vector next_subset; + std::vector > to_reassign; + std::vector > to_populate; public: /** * @param ora Pointer to an `tatami::Oracle` to be used for predictions. - * @param per_iteration Maximum number of predictions to make per iteration. * @param num_slabs Maximum number of slabs to store. */ - SubsettedOracleSlabCache(std::shared_ptr > ora, [[maybe_unused]] size_t per_iteration, size_t num_slabs) : + SubsettedOracleSlabCache(std::shared_ptr > ora, size_t num_slabs) : oracle(std::move(ora)), - max_predictions(oracle->total()), + total(oracle->total()), max_slabs(num_slabs) { - slab_exists.reserve(max_slabs); - past_exists.reserve(max_slabs); - - predictions_made.reserve(max_predictions); - next_predictions_made.reserve(max_predictions); - - slab_pointers.reserve(max_slabs); - next_slab_pointers.reserve(max_slabs); - - unassigned_slabs.reserve(max_slabs); - - slabs_to_populate.reserve(max_slabs); - next_slabs_to_populate.reserve(max_slabs); - - next_subset.resize(max_slabs); + all_slabs.reserve(max_slabs); + current_cache.reserve(max_slabs); + future_cache.reserve(max_slabs); + far_future_subset_cache.reserve(max_slabs); + + all_subset_details.reserve(max_slabs * 2); + for (auto& as : all_subset_details) { + free_subset_details.push_back(&as); + } } /** @@ -228,12 +191,6 @@ class SubsettedOracleSlabCache { * @endcond */ -private: - std::pair fetch(size_t i) const { - const auto& current = predictions_made[i]; - return std::pair(slab_pointers[current.first], current.second); - } - public: /** * This method is intended to be called when `num_slabs = 0`, to provide callers with the oracle predictions for non-cached extraction of data. @@ -260,183 +217,152 @@ class SubsettedOracleSlabCache { * For example, if each chunk takes up 10 rows, attempting to access row 21 would require retrieval of slab 2 and an offset of 1. * @param create Function that accepts no arguments and returns a `Slab_` object with sufficient memory to hold a slab's contents when used in `populate()`. * This may also return a default-constructed `Slab_` object if the allocation is done dynamically per slab in `populate()`. - * @param populate Function that accepts two arguments, `slabs_in_need` and `slab_data`. - * (1) `slabs_in_need` is a `const std::vector >&` specifying the slabs to be populated. + * @param populate Function that accepts a `std::vector >&` specifying the slabs to be populated. * The first `Id_` element of each pair contains the slab identifier, i.e., the first element returned by the `identify` function. - * The second `Index_` element specifies the index in `slab_data` in which to store the contents of each slab. - * (2) `slab_data` is a `std::vector&` containing pointers to the cached slab contents to be populated. - * This function should iterate over the `slabs_in_need` and populate the corresponding entries in `slab_data`, - * possibly using information in `CachedSlab::subset` to extract only the desired subset of each slab. + * The second `Slab_*` element specifies the object which to store the contents of each slab. + * The thid `SubsetDetails_*` element contains information about the subset of each slab that is required. + * This function should iterate over the vector and populate the desired subset of each slab. + * Note that the vector is not guaranteed to be sorted. * * @return Pair containing (1) a pointer to a cached slab and (2) the index of the next predicted row/column inside the retrieved slab. */ template std::pair next(Ifunction_ identify, Cfunction_ create, Pfunction_ populate) { - if (predictions_made.size() > predictions_fulfilled) { - return fetch(predictions_fulfilled++); + Index_ index = this->next(); + auto slab_info = identify(index); + if (slab_info.first == last_slab_id && last_slab) { + return std::make_pair(last_slab, slab_info.second); } - - if (!next_predictions_made.empty()) { - predictions_made.swap(next_predictions_made); - next_predictions_made.clear(); - - slab_pointers.swap(next_slab_pointers); - next_slab_pointers.clear(); - - slabs_to_populate.swap(next_slabs_to_populate); - next_slabs_to_populate.clear(); - - // Creating slabs needed for the current prediction round, - // based on the predictions made in the last round. - while (!unassigned_slabs.empty()) { - cache_iterator it; - if (!tmp_cache.empty()) { - it = tmp_cache.begin(); - slab_cache.splice(slab_cache.end(), tmp_cache, it); - } else { - slab_cache.emplace_back(create()); - it = slab_cache.end(); - --it; + last_slab_id = slab_info.first; + + // Updating the cache if we hit the refresh point. + if (counter - 1 == close_refresh_point) { + if (close_future_subset_cache.empty()) { + // This section only runs once, at the start, to populate the close_future_slab. + requisition_subset_close(slab_info.first, slab_info.second); + size_t used_slabs = 1; + + while (++close_refresh_point < total) { + auto future_index = oracle->get(close_refresh_point); + auto future_slab_info = identify(future_index); + auto cfcIt = close_future_subset_cache.find(future_slab_info.first); + if (cfcIt != close_future_subset_cache.end()) { + cfcIt->add(future_slab_info.first); + } else if (used_slabs < max_slabs) { + requisition_subset_close(future_slab_info.first, future_slab_info.second); + ++used_slabs; + } else { + far_slab_id = future_slab_info.first; + far_slab_offset = future_slab_info.second; + break; + } } - auto& last = unassigned_slabs.back(); - slab_pointers[last.first] = &(*it); - *(last.second) = it; // Remember this is a pointer to an iterator, so this assignment changes the iterator in the map. - - unassigned_slabs.pop_back(); - } - - while (!tmp_cache.empty()) { - free_cache.splice(free_cache.end(), tmp_cache, tmp_cache.begin()); + far_refresh_point = close_refresh_point; + } else { + close_refresh_point = far_refresh_point; } - // Updating subsets for all to-be-populated slabs. - for (const auto& x : slabs_to_populate) { - next_subset[x.second].swap(slab_pointers[x.second]->subset); + // Populating the far future cache. + { + requisition_subset_far(far_slab_id, far_slab_offset); + size_t used_slabs = 1; + + while (++far_refresh_point < total) { + auto future_index = oracle->get(far_refresh_point); + auto future_slab_info = identify(future_index); + auto ffcIt = far_future_subset_cache.find(future_slab_info.first); + if (ffcIt != far_future_subset_cache.end()) { + ffcIt->add(future_slab_info.first); + } else if (used_slabs < max_slabs) { + requisition_subset_far(future_slab_info.first, future_slab_info.second); + ++used_slabs; + } else { + far_slab_id = future_slab_info.first; + far_slab_offset = future_slab_info.second; + break; + } + } } - } else { - // This is the first run, so we can freely allocate here. - size_t used = 0; - while (counter < max_predictions) { - Index_ current = next(); - - auto slab_id = identify(current); - auto curslab = slab_id.first; - auto curindex = slab_id.second; - - auto it = slab_exists.find(curslab); - if (it != slab_exists.end()) { - predictions_made.emplace_back((it->second).first, curindex); - (it->second).second->subset.add(curindex); - - } else if (used < max_slabs) { - slab_cache.push_back(CachedSlab(create())); - auto sIt = slab_cache.end(); - --sIt; - - slab_exists[curslab] = std::make_pair(used, sIt); - slabs_to_populate.emplace_back(curslab, used); - slab_pointers.push_back(&(*sIt)); - sIt->subset.set(curindex); - - predictions_made.emplace_back(used, curindex); - ++used; - + // Reusing slabs from current_cache; these should all have FULL selections already. + for (auto& cf : close_future_subset_cache) { + auto cIt = current_cache.find(cf.first); + if (cIt == current_cache.end()) { + to_reassign.emplace_back(cIt->first, cIt->second); } else { - --counter; - break; + future_cache[cIt->first] = cIt->second; + to_populate.emplace_back(cIt->first, cIt->second, cf.second); + current_cache.erase(cIt); } } - } - - // Now filling up the next round of predictions. Note that this will - // change the std::list in which each iterator belongs, along with - // the various *_exists maps. This is okay as the list iterators and - // maps are no longer needed for the _current_ prediction round. - // Only 'slab_pointers' and 'slabs_to_populate' are needed, and these - // are untouched by the fiddling for the next iteration round. - size_t used = 0; - - tmp_cache.swap(slab_cache); - past_exists.swap(slab_exists); - slab_exists.clear(); - - while (counter < max_predictions) { - Index_ current = next(); - - auto slab_id = identify(current); - auto curslab = slab_id.first; - auto curindex = slab_id.second; - - auto it = slab_exists.find(curslab); - if (it != slab_exists.end()) { - auto pos = (it->second).first; - next_predictions_made.emplace_back(pos, curindex); - next_subset[pos].add(curindex); - - } else if (used < max_slabs) { - auto past = past_exists.find(curslab); - if (past != past_exists.end()) { - auto sIt = (past->second).second; - slab_cache.splice(slab_cache.end(), tmp_cache, sIt); - next_slab_pointers.push_back(&(*sIt)); - slab_exists[curslab] = std::make_pair(used, sIt); - - // If we detect that a slab is used in the current and next - // prediction rounds, we need to set its subset to FULL - // because we don't know whether future iteration rounds - // might need even more indices from this slab. The only - // way to ensure that this slab is re-usable in subsequent - // rounds is to extract it in its entirety. - sIt->subset.selection = SubsetSelection::FULL; + // Creating new slabs for everything that's left. + auto cIt = current_cache.begin(); + for (auto a : to_reassign) { + Slab_* slab_ptr; + if (cIt == current_cache.end()) { + all_slabs.emplace_back(create()); + slab_ptr = &(all_slabs.back()); } else { - if (free_cache.empty()) { - // We might be able to recycle an existing slab from tmp_cache - // to populate 'curslab'... but we don't know if we can do so at - // this moment, as those slabs might be needed by later predictions. - // So we just defer the creation of a new slab until we've run - // through the set of predictions for this round. - auto ins = slab_exists.insert(std::make_pair(curslab, std::make_pair(used, slab_cache.end()))); - unassigned_slabs.emplace_back(used, &(ins.first->second.second)); - next_slab_pointers.push_back(NULL); - next_subset[used].set(curindex); - - } else { - auto sIt = free_cache.begin(); - slab_cache.splice(slab_cache.end(), free_cache, sIt); - next_slab_pointers.push_back(&(*sIt)); - slab_exists[curslab] = std::make_pair(used, sIt); - next_subset[used].set(curindex); - } - - next_slabs_to_populate.emplace_back(curslab, used); + slab_ptr = cIt->second; } - - next_predictions_made.emplace_back(used, curindex); - ++used; - - } else { - --counter; - break; + future_cache[a.first] = slab_ptr; + to_populate.emplace_back(a.first, slab_ptr, a.second); } - } - - // Only populating after the next round of predictions. This is necessary - // to ensure that subsequent prediction rounds don't need the current slabs; - // if they don't, we can just extract the specified subset, otherwise we - // have to extract the FULL slab to account for potential future use. - if (!slabs_to_populate.empty()) { - for (auto& x : slabs_to_populate) { - slab_pointers[x.second]->subset.finalize(); + to_reassign.clear(); + + populate(to_populate); + to_populate.clear(); + + // We always fill future_cache to the brim so every entry of + // all_slabs should be referenced by a pointer in future_cache. + // There shouldn't be any free cache entries remaining in + // current_cache i.e., at this point, cIt should equal + // current_cache.end(), as we transferred everything to + // future_cache. Thus it is safe to clear current_cache without + // worrying about leaking memory. The only exception is if we're at + // the end of the predictions, in which case it doesn't matter. + current_cache.clear(); + current_cache.swap(future_cache); + + // Putting the no-longer-used subset pointers back in the free pool + // before we swap the close and far futures. + for (auto& cfc : close_future_subset_cache) { + free_subset_cache.push_back(cfc.second); } - populate(slabs_to_populate, slab_pointers); + close_future_subset_cache.clear(); + close_future_subset_cache.swap(far_future_subset_cache); } - // Well, because we just used one. - predictions_fulfilled = 1; - return fetch(0); + // We know it must exist, so no need to check ccIt's validity. + auto ccIt = current_cache.find(slab_info.first); + last_slab = ccIt->second; + return std::make_pair(last_slab, slab_info.second); + } + +private: + void requisition_subset_close(Id_ slab_id, Index_ slab_offset) { + auto selected = free_subset_cache.back(); + selected->set(slab_offset); + close_future_slab[id] = selected; + free_subset_cache.pop_back(); + } + + void requisition_subset_far(Id_ slab_id, Index_ slab_offset) { + auto selected = free_subset_cache.back(); + selected->set(slab_offset); + far_future_slab[id] = selected; + free_subset_cache.pop_back(); + + // If a slab is still being used in the far future, it might continue + // to be used in an even further future, in which case we need to do a + // FULL extraction just to be safe. + auto cfcIt = close_future_subset_cache.find(id); + if (close_future_subset_cache.find(id) != close_future_subset_cache.end()) { + selected->selection = SubsetSelection::FULL; + cfcIt->second->selection = SubsetSelection::FULL; + } } };