Skip to content

Commit

Permalink
Fix error happening when online compaction kicks in on async commits
Browse files Browse the repository at this point in the history
  • Loading branch information
jedelbo committed Sep 8, 2023
1 parent 9d6d486 commit 1ffa87c
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 56 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Running a query on @keys in a Dictionary would throw an exception ([#6831](https://github.com/realm/realm-core/issues/6831), since v13.15.1)
* Change JSON serialization format back to follow ISO 8601 - and add output of nanoseconds ([#6855](https://github.com/realm/realm-core/issues/6855), since 13.17.0)
* Testing the size of a collection of links against zero would sometimes fail (sometimes = "difficult to explain"). In particular: ([#6850](https://github.com/realm/realm-core/issues/6850), since v13.15.1)
* Fixed crash in slab allocator (Assertion failed: ref + size <= next->first). Addresses many issues, e.g. ([#6340](https://github.com/realm/realm-core/issues/6340), since 13.0.0)

### Breaking changes
* None.
Expand Down
1 change: 1 addition & 0 deletions src/realm/alloc_slab.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ class SlabAlloc : public Allocator {
friend class DB;
friend class Group;
friend class GroupWriter;
friend class GroupComitter;
};


Expand Down
1 change: 1 addition & 0 deletions src/realm/group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,7 @@ class Group : public ArrayParent {

friend class Table;
friend class GroupWriter;
friend class GroupComitter;
friend class DB;
friend class _impl::GroupFriend;
friend class metrics::QueryInfo;
Expand Down
52 changes: 29 additions & 23 deletions src/realm/group_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class InMemoryWriter : public _impl::ArrayWriterBase {


// Class controlling a memory mapped window into a file
class GroupWriter::MapWindow {
class GroupComitter::MapWindow {
public:
MapWindow(size_t alignment, util::File& f, ref_type start_ref, size_t initial_size,
util::WriteMarker* write_marker = nullptr);
Expand Down Expand Up @@ -101,7 +101,7 @@ class GroupWriter::MapWindow {
};

// True if a requested block falls within a memory mapping.
bool GroupWriter::MapWindow::matches(ref_type start_ref, size_t size)
bool GroupComitter::MapWindow::matches(ref_type start_ref, size_t size)
{
if (start_ref < m_base_ref)
return false;
Expand All @@ -117,14 +117,14 @@ bool GroupWriter::MapWindow::matches(ref_type start_ref, size_t size)
//
// In cases where a 1MB window would stretch beyond the end of the file, we choose
// a smaller window. Anything mapped after the end of file would be undefined anyways.
ref_type GroupWriter::MapWindow::aligned_to_mmap_block(ref_type start_ref)
ref_type GroupComitter::MapWindow::aligned_to_mmap_block(ref_type start_ref)
{
// align to 1MB boundary
size_t page_mask = m_alignment - 1;
return start_ref & ~page_mask;
}

size_t GroupWriter::MapWindow::get_window_size(util::File& f, ref_type start_ref, size_t size)
size_t GroupComitter::MapWindow::get_window_size(util::File& f, ref_type start_ref, size_t size)
{
size_t window_size = start_ref + size - m_base_ref;
// always map at least to match alignment
Expand All @@ -146,7 +146,7 @@ size_t GroupWriter::MapWindow::get_window_size(util::File& f, ref_type start_ref
//
// extends_to_match() will extend an existing mapping to accommodate a new request if possible
// and return true. If the request falls in a different 1MB window, it'll return false.
bool GroupWriter::MapWindow::extends_to_match(util::File& f, ref_type start_ref, size_t size)
bool GroupComitter::MapWindow::extends_to_match(util::File& f, ref_type start_ref, size_t size)
{
size_t aligned_ref = aligned_to_mmap_block(start_ref);
if (aligned_ref != m_base_ref)
Expand All @@ -158,8 +158,8 @@ bool GroupWriter::MapWindow::extends_to_match(util::File& f, ref_type start_ref,
return true;
}

GroupWriter::MapWindow::MapWindow(size_t alignment, util::File& f, ref_type start_ref, size_t size,
util::WriteMarker* write_marker)
GroupComitter::MapWindow::MapWindow(size_t alignment, util::File& f, ref_type start_ref, size_t size,
util::WriteMarker* write_marker)
: m_alignment(alignment)
{
m_base_ref = aligned_to_mmap_block(start_ref);
Expand All @@ -173,47 +173,43 @@ GroupWriter::MapWindow::MapWindow(size_t alignment, util::File& f, ref_type star
#endif
}

GroupWriter::MapWindow::~MapWindow()
GroupComitter::MapWindow::~MapWindow()
{
m_map.sync();
m_map.unmap();
}

void GroupWriter::MapWindow::flush()
void GroupComitter::MapWindow::flush()
{
m_map.flush();
}

void GroupWriter::MapWindow::sync()
void GroupComitter::MapWindow::sync()
{
flush();
m_map.sync();
}

char* GroupWriter::MapWindow::translate(ref_type ref)
char* GroupComitter::MapWindow::translate(ref_type ref)
{
return m_map.get_addr() + (ref - m_base_ref);
}

void GroupWriter::MapWindow::encryption_read_barrier(void* start_addr, size_t size)
void GroupComitter::MapWindow::encryption_read_barrier(void* start_addr, size_t size)
{
realm::util::encryption_read_barrier_for_write(start_addr, size, m_map.get_encrypted_mapping());
}

void GroupWriter::MapWindow::encryption_write_barrier(void* start_addr, size_t size)
void GroupComitter::MapWindow::encryption_write_barrier(void* start_addr, size_t size)
{
realm::util::encryption_write_barrier(start_addr, size, m_map.get_encrypted_mapping());
}


GroupWriter::GroupWriter(Group& group, Durability dura, WriteMarker* write_marker)
GroupComitter::GroupComitter(Group& group, Durability dura, WriteMarker* write_marker)
: m_group(group)
, m_alloc(group.m_alloc)
, m_free_positions(m_alloc)
, m_free_lengths(m_alloc)
, m_free_versions(m_alloc)
, m_durability(dura)
, m_write_marker(write_marker)
, m_durability(dura)
{
m_map_windows.reserve(num_map_windows);
#if REALM_PLATFORM_APPLE && REALM_MOBILE
Expand All @@ -235,6 +231,16 @@ GroupWriter::GroupWriter(Group& group, Durability dura, WriteMarker* write_marke
m_window_alignment = wanted_size;
}
#endif
}

GroupComitter::~GroupComitter() = default;

GroupWriter::GroupWriter(Group& group, Durability dura, WriteMarker* write_marker)
: GroupComitter(group, dura, write_marker)
, m_free_positions(m_alloc)
, m_free_lengths(m_alloc)
, m_free_versions(m_alloc)
{
Array& top = m_group.m_top;
m_logical_size = size_t(top.get_as_ref_or_tagged(Group::s_file_size_ndx).get_as_int());

Expand Down Expand Up @@ -332,14 +338,14 @@ size_t GroupWriter::get_file_size() const noexcept
return sz;
}

void GroupWriter::flush_all_mappings()
void GroupComitter::flush_all_mappings()
{
for (const auto& window : m_map_windows) {
window->flush();
}
}

void GroupWriter::sync_all_mappings()
void GroupComitter::sync_all_mappings()
{
if (m_durability == Durability::Unsafe)
return;
Expand All @@ -352,7 +358,7 @@ void GroupWriter::sync_all_mappings()
// existing one (possibly extended to accommodate the new request). Maintain a
// cache of open windows which are sync'ed and closed following a least recently
// used policy. Entries in the cache are kept in MRU order.
GroupWriter::MapWindow* GroupWriter::get_window(ref_type start_ref, size_t size)
GroupComitter::MapWindow* GroupComitter::get_window(ref_type start_ref, size_t size)
{
auto match = std::find_if(m_map_windows.begin(), m_map_windows.end(), [&](const auto& window) {
return window->matches(start_ref, size) || window->extends_to_match(m_alloc.get_file(), start_ref, size);
Expand Down Expand Up @@ -1335,7 +1341,7 @@ void GroupWriter::write_array_at(T* translator, ref_type ref, const char* data,
}


void GroupWriter::commit(ref_type new_top_ref)
void GroupComitter::commit(ref_type new_top_ref)
{
using _impl::SimulatedFailure;
SimulatedFailure::trigger(SimulatedFailure::group_writer__commit); // Throws
Expand Down
70 changes: 38 additions & 32 deletions src/realm/group_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,46 @@ class VersionInfo {
using TopRefMap = std::map<uint64_t, VersionInfo>;
using VersionVector = std::vector<uint64_t>;

class GroupComitter {
public:
using Durability = DBOptions::Durability;
GroupComitter(Group&, Durability dura = Durability::Full, util::WriteMarker* write_marker = nullptr);
~GroupComitter();
/// Flush changes to physical medium, then write the new top ref
/// to the file header, then flush again. Pass the top ref
/// returned by write_group().
void commit(ref_type new_top_ref);
// Flush all cached memory mappings
// Sync all cached memory mappings to disk - includes flush if needed
void sync_all_mappings();
// Flush all cached memory mappings from private to shared cache.
void flush_all_mappings();

protected:
class MapWindow;
Group& m_group;
SlabAlloc& m_alloc;
// Currently cached memory mappings. We keep as many as 16 1MB windows
// open for writing. The allocator will favor sequential allocation
// from a modest number of windows, depending upon fragmentation, so
// 16 windows should be more than enough. If more than 16 windows are
// needed, the least recently used is sync'ed and closed to make room
// for a new one. The windows are kept in MRU (most recently used) order.
const static int num_map_windows = 16;
std::vector<std::unique_ptr<MapWindow>> m_map_windows;
size_t m_window_alignment;
util::WriteMarker* m_write_marker;
Durability m_durability;

// Get a suitable memory mapping for later access:
// potentially adding it to the cache, potentially closing
// the least recently used and sync'ing it to disk
MapWindow* get_window(ref_type start_ref, size_t size);
};

/// This class is not supposed to be reused for multiple write sessions. In
/// particular, do not reuse it in case any of the functions throw.
class GroupWriter : public _impl::ArrayWriterBase {
class GroupWriter : public GroupComitter, public _impl::ArrayWriterBase {
public:
enum class EvacuationStage { idle, evacuating, waiting, blocked };
// For groups in transactional mode (Group::m_is_shared), this constructor
Expand All @@ -71,7 +108,6 @@ class GroupWriter : public _impl::ArrayWriterBase {
// (Group::m_is_shared), the constructor also adds version tracking
// information to the group, if it is not already present (6th and 7th entry
// in Group::m_top).
using Durability = DBOptions::Durability;
GroupWriter(Group&, Durability dura = Durability::Full, util::WriteMarker* write_marker = nullptr);
~GroupWriter();

Expand All @@ -83,10 +119,6 @@ class GroupWriter : public _impl::ArrayWriterBase {
/// commit() with the returned top ref.
ref_type write_group();

/// Flush changes to physical medium, then write the new top ref
/// to the file header, then flush again. Pass the top ref
/// returned by write_group().
void commit(ref_type new_top_ref);

size_t get_file_size() const noexcept;

Expand Down Expand Up @@ -146,13 +178,6 @@ class GroupWriter : public _impl::ArrayWriterBase {
}
}


// Flush all cached memory mappings
// Sync all cached memory mappings to disk - includes flush if needed
void sync_all_mappings();
// Flush all cached memory mappings from private to shared cache.
void flush_all_mappings();

private:
friend class InMemoryWriter;
struct FreeSpaceEntry {
Expand All @@ -171,24 +196,18 @@ class GroupWriter : public _impl::ArrayWriterBase {
static void move_free_in_file_to_size_map(const std::vector<GroupWriter::FreeSpaceEntry>& list,
std::multimap<size_t, size_t>& size_map);

class MapWindow;
Group& m_group;
SlabAlloc& m_alloc;
Array m_free_positions; // 4th slot in Group::m_top
Array m_free_lengths; // 5th slot in Group::m_top
Array m_free_versions; // 6th slot in Group::m_top
uint64_t m_current_version = 0;
uint64_t m_oldest_reachable_version;
TopRefMap m_top_ref_map;
bool m_any_new_unreachables;
size_t m_window_alignment;
size_t m_free_space_size = 0;
size_t m_locked_space_size = 0;
size_t m_evacuation_limit;
int64_t m_backoff;
size_t m_logical_size = 0;
Durability m_durability;
util::WriteMarker* m_write_marker = nullptr;

// m_free_in_file;
std::vector<FreeSpaceEntry> m_not_free_in_file;
Expand All @@ -199,19 +218,6 @@ class GroupWriter : public _impl::ArrayWriterBase {

void read_in_freelist();
size_t recreate_freelist(size_t reserve_pos);
// Currently cached memory mappings. We keep as many as 16 1MB windows
// open for writing. The allocator will favor sequential allocation
// from a modest number of windows, depending upon fragmentation, so
// 16 windows should be more than enough. If more than 16 windows are
// needed, the least recently used is sync'ed and closed to make room
// for a new one. The windows are kept in MRU (most recently used) order.
const static int num_map_windows = 16;
std::vector<std::unique_ptr<MapWindow>> m_map_windows;

// Get a suitable memory mapping for later access:
// potentially adding it to the cache, potentially closing
// the least recently used and sync'ing it to disk
MapWindow* get_window(ref_type start_ref, size_t size);

/// Allocate a chunk of free space of the specified size. The
/// specified size must be 8-byte aligned. Extend the file if
Expand Down
2 changes: 1 addition & 1 deletion src/realm/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ void Transaction::complete_async_commit()
if (db->m_logger) {
db->m_logger->log(util::Logger::Level::trace, "Comitting ref %1 to disk", read_lock.m_top_ref);
}
GroupWriter out(*this);
GroupComitter out(*this);
out.commit(read_lock.m_top_ref); // Throws
// we must release the write mutex before the callback, because the callback
// is allowed to re-request it.
Expand Down
Loading

0 comments on commit 1ffa87c

Please sign in to comment.