Skip to content

Commit

Permalink
Reduce the amount of work done while holding the lock in VersionManager
Browse files Browse the repository at this point in the history
  • Loading branch information
tgoyne committed Feb 2, 2023
1 parent ab3d2e8 commit ff3a947
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 42 deletions.
110 changes: 72 additions & 38 deletions src/realm/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,18 +511,29 @@ class DB::VersionManager {
: m_file(file)
, m_mutex(mutex)
{
std::lock_guard lock(m_mutex);
size_t size = static_cast<size_t>(m_file.get_size());
m_reader_map.map(m_file, File::access_ReadWrite, size, File::map_NoSync);
m_info = m_reader_map.get_addr();
m_local_max_entry = m_info->readers.capacity();
REALM_ASSERT(sizeof(SharedInfo) + m_info->readers.compute_required_space(m_local_max_entry) == size);
size_t size = 0, required_size = sizeof(SharedInfo);
while (size < required_size) {
// Map the file without the lock held. This could result in the
// mapping being too small and having to remap if the file is grown
// concurrently, but if this is the case we should always see a bigger
// size the next time.
auto new_size = static_cast<size_t>(m_file.get_size());
REALM_ASSERT(new_size > size);
size = new_size;
m_reader_map.remap(m_file, File::access_ReadWrite, size, File::map_NoSync);
m_info = m_reader_map.get_addr();

std::lock_guard lock(m_mutex);
m_local_max_entry = m_info->readers.capacity();
required_size = sizeof(SharedInfo) + m_info->readers.compute_required_space(m_local_max_entry);
REALM_ASSERT(required_size >= size);
}
}

void cleanup_versions(uint64_t& oldest_live_version, TopRefMap& top_refs, bool& any_new_unreachables)
{
std::lock_guard lock(m_mutex);
ensure_full_reader_mapping();
ensure_reader_mapping();
m_info->readers.purge_versions(oldest_live_version, top_refs, any_new_unreachables);
}

Expand All @@ -533,9 +544,25 @@ class DB::VersionManager {

VersionID get_version_id_of_latest_snapshot()
{
{
// First check the local cache. This is an unlocked read, so it may
// race with adding a new version. If this happens we'll either see
// a stale value (acceptable for a racing write on one thread and
// a read on another), or a new value which is guaranteed to not
// be an active index in the local cache.
std::lock_guard lock(m_local_mutex);
auto index = m_info->readers.newest.load();
if (index < m_local_readers.size()) {
auto& r = m_local_readers[index];
if (r.is_active()) {
return {r.version, index};
}
}
}

std::lock_guard lock(m_mutex);
ensure_full_reader_mapping();
uint_fast32_t index = m_info->readers.newest;
auto index = m_info->readers.newest.load();
ensure_reader_mapping(index);
return {m_info->readers.get(index).version, index};
}

Expand Down Expand Up @@ -569,42 +596,47 @@ class DB::VersionManager {
if (try_grab_local_read_lock(read_lock, type, version_id))
return read_lock;

const bool pick_specific = version_id.version != VersionID().version;

std::lock_guard lock(m_mutex);
auto newest = m_info->readers.newest.load();
REALM_ASSERT(newest != VersionList::nil);
read_lock.m_reader_idx = pick_specific ? version_id.index : newest;
ensure_full_reader_mapping();
bool picked_newest = read_lock.m_reader_idx == (unsigned)newest;
auto& r = m_info->readers.get(read_lock.m_reader_idx);
if (pick_specific && version_id.version != r.version)
throw BadVersion();
if (!picked_newest) {
if (type == ReadLockInfo::Frozen && r.count_frozen == 0 && r.count_live == 0)
throw BadVersion();
if (type != ReadLockInfo::Frozen && r.count_live == 0)
{
const bool pick_specific = version_id.version != VersionID().version;
std::lock_guard lock(m_mutex);
auto newest = m_info->readers.newest.load();
REALM_ASSERT(newest != VersionList::nil);
read_lock.m_reader_idx = pick_specific ? version_id.index : newest;
ensure_reader_mapping(read_lock.m_reader_idx);
bool picked_newest = read_lock.m_reader_idx == (unsigned)newest;
auto& r = m_info->readers.get(read_lock.m_reader_idx);
if (pick_specific && version_id.version != r.version)
throw BadVersion();
if (!picked_newest) {
if (type == ReadLockInfo::Frozen && r.count_frozen == 0 && r.count_live == 0)
throw BadVersion();
if (type != ReadLockInfo::Frozen && r.count_live == 0)
throw BadVersion();
}
populate_read_lock(read_lock, r, type);
}
populate_read_lock(read_lock, r, type);

std::lock_guard local_lock(m_local_mutex);
grow_local_cache(read_lock.m_reader_idx + 1);
auto& r2 = m_local_readers[read_lock.m_reader_idx];
if (!r2.is_active()) {
r2 = r;
r2.count_full = r2.count_live = r2.count_frozen = 0;
{
std::lock_guard local_lock(m_local_mutex);
grow_local_cache(read_lock.m_reader_idx + 1);
auto& r2 = m_local_readers[read_lock.m_reader_idx];
if (!r2.is_active()) {
r2.version = read_lock.m_version;
r2.filesize = read_lock.m_file_size;
r2.current_top = read_lock.m_top_ref;
r2.count_full = r2.count_live = r2.count_frozen = 0;
}
REALM_ASSERT(field_for_type(r2, type) == 0);
field_for_type(r2, type) = 1;
}
REALM_ASSERT(field_for_type(r2, type) == 0);
field_for_type(r2, type) = 1;

return read_lock;
}

void add_version(ref_type new_top_ref, size_t new_file_size, uint64_t new_version)
{
std::lock_guard lock(m_mutex);
ensure_full_reader_mapping();
ensure_reader_mapping();
if (m_info->readers.try_allocate_entry(new_top_ref, new_file_size, new_version)) {
return;
}
Expand All @@ -628,15 +660,17 @@ class DB::VersionManager {
}

private:
void ensure_full_reader_mapping()
void ensure_reader_mapping(unsigned int required = -1)
{
using _impl::SimulatedFailure;
SimulatedFailure::trigger(SimulatedFailure::shared_group__grow_reader_mapping); // Throws

auto index = m_info->readers.capacity() - 1;
if (index >= m_local_max_entry) {
if (required < m_local_max_entry)
return;

auto new_max_entry = m_info->readers.capacity();
if (new_max_entry > m_local_max_entry) {
// handle mapping expansion if required
auto new_max_entry = m_info->readers.capacity();
size_t info_size = sizeof(DB::SharedInfo) + m_info->readers.compute_required_space(new_max_entry);
m_reader_map.remap(m_file, util::File::access_ReadWrite, info_size); // Throws
m_local_max_entry = new_max_entry;
Expand Down
8 changes: 4 additions & 4 deletions src/realm/util/file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -787,10 +787,10 @@ class File::Map : private MapBase {

/// See File::remap().
///
/// Calling this function on a Map instance that is not currently
/// attached to a memory mapped file has undefined behavior. The
/// returned pointer is the same as what will subsequently be
/// returned by get_addr().
/// Calling this function on a Map instance that is not currently attached
/// to a memory mapped file is equivalent to calling map(). The returned
/// pointer is the same as what will subsequently be returned by
/// get_addr().
T* remap(const File&, AccessMode = access_ReadOnly, size_t size = sizeof(T), int map_flags = 0);

/// Try to extend the existing mapping to a given size
Expand Down

0 comments on commit ff3a947

Please sign in to comment.