Fix error happening when online compaction kicks in on async commits #6962

Merged · 7 commits · Sep 13, 2023

Changes from 3 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@
* Running a query on @keys in a Dictionary would throw an exception ([#6831](https://github.com/realm/realm-core/issues/6831), since v13.15.1)
* Change JSON serialization format back to follow ISO 8601 - and add output of nanoseconds ([#6855](https://github.com/realm/realm-core/issues/6855), since v13.17.0)
* Testing the size of a collection of links against zero would sometimes fail (sometimes = "difficult to explain"). In particular: ([#6850](https://github.com/realm/realm-core/issues/6850), since v13.15.1)
* Fixed crash in slab allocator (`Assertion failed: ref + size <= next->first`), reported in many issues ([#6340](https://github.com/realm/realm-core/issues/6340), since v13.0.0)

### Breaking changes
* None.
1 change: 1 addition & 0 deletions src/realm/alloc_slab.hpp
@@ -721,6 +721,7 @@ class SlabAlloc : public Allocator {
friend class DB;
friend class Group;
friend class GroupWriter;
friend class GroupComitter;
};


17 changes: 14 additions & 3 deletions src/realm/db.cpp
@@ -909,7 +909,7 @@ void DB::open(const std::string& path, bool no_create_file, const DBOptions& opt
REALM_ASSERT(path.size());

m_db_path = path;
m_path_hash = StringData(path).hash() & 0xffff;
m_path_hash = (size_t(this) >> 4) & 0xffff;
@nicola-cab (Member), Sep 11, 2023:

What is this line doing? It is not clear...

Author (Contributor):

Just hashing the file name was not good enough. If two DBs were opening the same file, they would get the same hash. Here we just take some bytes out of the object pointer. That should be fine.
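
A minimal sketch of that technique, with hypothetical names rather than the real Realm API. The low bits of the pointer are shifted away because allocation alignment means they carry no entropy:

```cpp
#include <cstdint>
#include <cstdio>

// Hypothetical stand-in for DB; alignas(16) guarantees the low 4 bits of
// the address are always zero, which is why they are shifted away.
struct alignas(16) DB {
    std::uint16_t instance_tag() const
    {
        return static_cast<std::uint16_t>(
            (reinterpret_cast<std::uintptr_t>(this) >> 4) & 0xffff);
    }
    char pad[16]; // give the object some size
};

int main()
{
    DB a, b; // two instances "opening" the same path
    // Unlike a hash of the path, the tags differ per instance.
    std::printf("DB[%04x] vs DB[%04x]\n", a.instance_tag(), b.instance_tag());
}
```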

Contributor:

make that into a comment :-)

@nicola-cab (Member), Sep 11, 2023:

OK, maybe I am wrong, but it seems that this is an important part of fixing this issue, and it is really difficult to make sense of this line just by reading it, without any added context.
Can you please add a comment, or move this logic for hashing the DB object into a helper method or class?
Also, m_path_hash is no longer a good name for this, since you are basically hashing the pointer.

Contributor:

m_path_hash is only used for the prefix of the logger. It was changed for debugging purposes so that the log output can be differentiated between DB instances. In that way it is not critical to the fix, but it helped find out what was happening. The essence of the fix is separation of GroupWriter and GroupCommitter, so that async commits can just make a commit without doing any of the online compaction.
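
A compilable sketch of that separation, with heavily stubbed stand-ins (Group, ref_type and all bodies here are placeholders, not the real Realm definitions):

```cpp
#include <cstddef>

// Stubs so the sketch compiles on its own -- not the real Realm types.
using ref_type = std::size_t;
struct Group {};

// Base class: just enough state to flush mappings and publish a top ref.
class GroupComitter { // spelling matches the class introduced in this PR
public:
    explicit GroupComitter(Group& g)
        : m_group(g)
    {
    }
    void commit(ref_type /*new_top_ref*/)
    {
        // flush mappings, write the new top ref to the header, flush again
    }

protected:
    Group& m_group;
};

// Derived class: adds free-list handling and online compaction on top.
class GroupWriter : public GroupComitter {
public:
    using GroupComitter::GroupComitter;
    ref_type write_group()
    {
        // read free lists, possibly evacuate/compact, write arrays
        return 0;
    }
};

// Async-commit path: only the base class is constructed, so none of the
// online-compaction machinery can run.
void async_commit(Group& g, ref_type new_top_ref)
{
    GroupComitter{g}.commit(new_top_ref);
}

// Regular commit path: full write, then the same commit step.
void full_commit(Group& g)
{
    GroupWriter writer{g};
    writer.commit(writer.write_group());
}
```

This mirrors the inheritance introduced in group_writer.hpp below, where GroupWriter derives from GroupComitter.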

set_logger(options.logger);
if (m_replication) {
m_replication->set_logger(m_logger.get());
@@ -2311,6 +2311,10 @@ bool DB::do_try_begin_write()

void DB::do_begin_write()
{
if (m_logger) {
m_logger->log(util::Logger::Level::trace, "acquire writemutex");
}

SharedInfo* info = m_info;

// Get write lock - the write lock is held until do_end_write().
@@ -2369,6 +2373,9 @@ void DB::do_begin_write()
// should take this situation into account by comparing with '>' instead of '!='
info->next_served = my_ticket;
finish_begin_write();
if (m_logger) {
m_logger->log(util::Logger::Level::trace, "writemutex acquired");
}
}

void DB::finish_begin_write()
@@ -2396,6 +2403,9 @@ void DB::do_end_write() noexcept
m_write_transaction_open = false;
m_pick_next_writer.notify_all();
m_writemutex.unlock();
if (m_logger) {
m_logger->log(util::Logger::Level::trace, "writemutex released");
}
}


@@ -2553,8 +2563,9 @@ void DB::low_level_commit(uint_fast64_t new_version, Transaction& transaction, b
}
auto t2 = std::chrono::steady_clock::now();
if (m_logger) {
m_logger->log(util::Logger::Level::debug, "Commit of size %1 done in %2 us", commit_size,
std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count());
std::string to_disk_str = commit_to_disk ? util::format(" ref %1", new_top_ref) : " (no commit to disk)";
m_logger->log(util::Logger::Level::debug, "Commit of size %1 done in %2 us%3", commit_size,
std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count(), to_disk_str);
}
}

1 change: 1 addition & 0 deletions src/realm/group.hpp
@@ -834,6 +834,7 @@ class Group : public ArrayParent {

friend class Table;
friend class GroupWriter;
friend class GroupComitter;
friend class DB;
friend class _impl::GroupFriend;
friend class metrics::QueryInfo;
52 changes: 29 additions & 23 deletions src/realm/group_writer.cpp
@@ -70,7 +70,7 @@ class InMemoryWriter : public _impl::ArrayWriterBase {


// Class controlling a memory mapped window into a file
class GroupWriter::MapWindow {
class GroupComitter::MapWindow {
public:
MapWindow(size_t alignment, util::File& f, ref_type start_ref, size_t initial_size,
util::WriteMarker* write_marker = nullptr);
@@ -101,7 +101,7 @@ class GroupWriter::MapWindow {
};

// True if a requested block falls within a memory mapping.
bool GroupWriter::MapWindow::matches(ref_type start_ref, size_t size)
bool GroupComitter::MapWindow::matches(ref_type start_ref, size_t size)
{
if (start_ref < m_base_ref)
return false;
@@ -117,14 +117,14 @@ bool GroupWriter::MapWindow::matches(ref_type start_ref, size_t size)
//
// In cases where a 1MB window would stretch beyond the end of the file, we choose
// a smaller window. Anything mapped after the end of the file would be undefined anyway.
ref_type GroupWriter::MapWindow::aligned_to_mmap_block(ref_type start_ref)
ref_type GroupComitter::MapWindow::aligned_to_mmap_block(ref_type start_ref)
{
// align to 1MB boundary
size_t page_mask = m_alignment - 1;
return start_ref & ~page_mask;
}
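
A small worked example of the masking above, assuming the default 1 MB window alignment:

```cpp
#include <cassert>
#include <cstddef>

int main()
{
    const std::size_t alignment = 0x100000; // 1 MB, the default window size
    const std::size_t page_mask = alignment - 1;
    auto aligned = [&](std::size_t ref) { return ref & ~page_mask; };

    assert(aligned(0x123456) == 0x100000); // inside the second 1 MB window
    assert(aligned(0x0fffff) == 0x000000); // last byte of the first window
    assert(aligned(0x200000) == 0x200000); // boundaries map to themselves
}
```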

size_t GroupWriter::MapWindow::get_window_size(util::File& f, ref_type start_ref, size_t size)
size_t GroupComitter::MapWindow::get_window_size(util::File& f, ref_type start_ref, size_t size)
{
size_t window_size = start_ref + size - m_base_ref;
// always map at least to match alignment
@@ -146,7 +146,7 @@ size_t GroupWriter::MapWindow::get_window_size(util::File& f, ref_type start_ref
//
// extends_to_match() will extend an existing mapping to accommodate a new request if possible
// and return true. If the request falls in a different 1MB window, it'll return false.
bool GroupWriter::MapWindow::extends_to_match(util::File& f, ref_type start_ref, size_t size)
bool GroupComitter::MapWindow::extends_to_match(util::File& f, ref_type start_ref, size_t size)
{
size_t aligned_ref = aligned_to_mmap_block(start_ref);
if (aligned_ref != m_base_ref)
@@ -158,8 +158,8 @@ bool GroupWriter::MapWindow::extends_to_match(util::File& f, ref_type start_ref,
return true;
}

GroupWriter::MapWindow::MapWindow(size_t alignment, util::File& f, ref_type start_ref, size_t size,
util::WriteMarker* write_marker)
GroupComitter::MapWindow::MapWindow(size_t alignment, util::File& f, ref_type start_ref, size_t size,
util::WriteMarker* write_marker)
: m_alignment(alignment)
{
m_base_ref = aligned_to_mmap_block(start_ref);
@@ -173,47 +173,43 @@ GroupWriter::MapWindow::MapWindow(size_t alignment, util::File& f, ref_type star
#endif
}

GroupWriter::MapWindow::~MapWindow()
GroupComitter::MapWindow::~MapWindow()
{
m_map.sync();
m_map.unmap();
}

void GroupWriter::MapWindow::flush()
void GroupComitter::MapWindow::flush()
{
m_map.flush();
}

void GroupWriter::MapWindow::sync()
void GroupComitter::MapWindow::sync()
{
flush();
m_map.sync();
}

char* GroupWriter::MapWindow::translate(ref_type ref)
char* GroupComitter::MapWindow::translate(ref_type ref)
{
return m_map.get_addr() + (ref - m_base_ref);
}

void GroupWriter::MapWindow::encryption_read_barrier(void* start_addr, size_t size)
void GroupComitter::MapWindow::encryption_read_barrier(void* start_addr, size_t size)
{
realm::util::encryption_read_barrier_for_write(start_addr, size, m_map.get_encrypted_mapping());
}

void GroupWriter::MapWindow::encryption_write_barrier(void* start_addr, size_t size)
void GroupComitter::MapWindow::encryption_write_barrier(void* start_addr, size_t size)
{
realm::util::encryption_write_barrier(start_addr, size, m_map.get_encrypted_mapping());
}


GroupWriter::GroupWriter(Group& group, Durability dura, WriteMarker* write_marker)
GroupComitter::GroupComitter(Group& group, Durability dura, WriteMarker* write_marker)
: m_group(group)
, m_alloc(group.m_alloc)
, m_free_positions(m_alloc)
, m_free_lengths(m_alloc)
, m_free_versions(m_alloc)
, m_durability(dura)
, m_write_marker(write_marker)
, m_durability(dura)
{
m_map_windows.reserve(num_map_windows);
#if REALM_PLATFORM_APPLE && REALM_MOBILE
@@ -235,6 +231,16 @@ GroupWriter::GroupWriter(Group& group, Durability dura, WriteMarker* write_marke
m_window_alignment = wanted_size;
}
#endif
}

GroupComitter::~GroupComitter() = default;

GroupWriter::GroupWriter(Group& group, Durability dura, WriteMarker* write_marker)
: GroupComitter(group, dura, write_marker)
, m_free_positions(m_alloc)
, m_free_lengths(m_alloc)
, m_free_versions(m_alloc)
{
Array& top = m_group.m_top;
m_logical_size = size_t(top.get_as_ref_or_tagged(Group::s_file_size_ndx).get_as_int());

@@ -332,14 +338,14 @@ size_t GroupWriter::get_file_size() const noexcept
return sz;
}

void GroupWriter::flush_all_mappings()
void GroupComitter::flush_all_mappings()
{
for (const auto& window : m_map_windows) {
window->flush();
}
}

void GroupWriter::sync_all_mappings()
void GroupComitter::sync_all_mappings()
{
if (m_durability == Durability::Unsafe)
return;
@@ -352,7 +358,7 @@ void GroupWriter::sync_all_mappings()
// existing one (possibly extended to accommodate the new request). Maintain a
// cache of open windows which are sync'ed and closed following a least recently
// used policy. Entries in the cache are kept in MRU order.
GroupWriter::MapWindow* GroupWriter::get_window(ref_type start_ref, size_t size)
GroupComitter::MapWindow* GroupComitter::get_window(ref_type start_ref, size_t size)
{
auto match = std::find_if(m_map_windows.begin(), m_map_windows.end(), [&](const auto& window) {
return window->matches(start_ref, size) || window->extends_to_match(m_alloc.get_file(), start_ref, size);
@@ -1335,7 +1341,7 @@ void GroupWriter::write_array_at(T* translator, ref_type ref, const char* data,
}


void GroupWriter::commit(ref_type new_top_ref)
void GroupComitter::commit(ref_type new_top_ref)
{
using _impl::SimulatedFailure;
SimulatedFailure::trigger(SimulatedFailure::group_writer__commit); // Throws
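
The commit() body is truncated above; its documented contract (see the header diff below) is: flush changes, write the new top ref to the header, then flush again. A self-contained sketch of that ordering, with hypothetical stand-in types:

```cpp
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for the mapped file and its header.
struct FakeFile {
    std::vector<std::uint8_t> payload;
    std::uint64_t header_top_ref = 0;
    void sync() { /* fsync()/msync() in a real implementation */ }
};

// The ordering is what matters: the payload must be durable before the
// header points at it, and the header write must be flushed afterwards,
// so a crash at any point leaves either the old or the new top ref valid.
void commit_sketch(FakeFile& f, std::uint64_t new_top_ref)
{
    f.sync();                       // 1. flush changes to the physical medium
    f.header_top_ref = new_top_ref; // 2. write the new top ref to the header
    f.sync();                       // 3. flush again to publish the commit
}
```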
70 changes: 38 additions & 32 deletions src/realm/group_writer.hpp
@@ -57,9 +57,46 @@ class VersionInfo {
using TopRefMap = std::map<uint64_t, VersionInfo>;
using VersionVector = std::vector<uint64_t>;

class GroupComitter {
public:
using Durability = DBOptions::Durability;
GroupComitter(Group&, Durability dura = Durability::Full, util::WriteMarker* write_marker = nullptr);
~GroupComitter();
/// Flush changes to physical medium, then write the new top ref
/// to the file header, then flush again. Pass the top ref
/// returned by write_group().
void commit(ref_type new_top_ref);
// Sync all cached memory mappings to disk - includes flush if needed
void sync_all_mappings();
// Flush all cached memory mappings from private to shared cache.
void flush_all_mappings();

protected:
class MapWindow;
Group& m_group;
SlabAlloc& m_alloc;
// Currently cached memory mappings. We keep as many as 16 1MB windows
// open for writing. The allocator will favor sequential allocation
// from a modest number of windows, depending upon fragmentation, so
// 16 windows should be more than enough. If more than 16 windows are
// needed, the least recently used is sync'ed and closed to make room
// for a new one. The windows are kept in MRU (most recently used) order.
const static int num_map_windows = 16;
std::vector<std::unique_ptr<MapWindow>> m_map_windows;
size_t m_window_alignment;
util::WriteMarker* m_write_marker;
Durability m_durability;

// Get a suitable memory mapping for later access:
// potentially adding it to the cache, potentially closing
// the least recently used and sync'ing it to disk
MapWindow* get_window(ref_type start_ref, size_t size);
};
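
A minimal, self-contained sketch of the window-cache policy described in the comments above; names are hypothetical, and the real get_window() additionally extends existing mappings and deals with encryption:

```cpp
#include <algorithm>
#include <cstddef>
#include <memory>
#include <vector>

struct Window {
    std::size_t base = 0, size = 0;
    bool matches(std::size_t ref, std::size_t sz) const
    {
        return ref >= base && ref + sz <= base + size;
    }
    void sync() { /* flush + msync in the real MapWindow */ }
};

class WindowCache {
    static constexpr std::size_t num_map_windows = 16;
    static constexpr std::size_t alignment = 0x100000; // 1 MB
    std::vector<std::unique_ptr<Window>> m_windows;    // kept in MRU order

public:
    Window* get_window(std::size_t ref, std::size_t sz)
    {
        auto it = std::find_if(m_windows.begin(), m_windows.end(),
                               [&](const auto& w) { return w->matches(ref, sz); });
        if (it != m_windows.end()) {
            // Hit: move the window to the front to keep MRU order.
            std::rotate(m_windows.begin(), it, it + 1);
            return m_windows.front().get();
        }
        if (m_windows.size() == num_map_windows) {
            // Cache full: sync and close the least recently used window.
            m_windows.back()->sync();
            m_windows.pop_back();
        }
        // Miss: map a new window at the aligned base and make it MRU.
        auto w = std::make_unique<Window>();
        w->base = ref & ~(alignment - 1);
        w->size = alignment;
        m_windows.insert(m_windows.begin(), std::move(w));
        return m_windows.front().get();
    }
};
```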

/// This class is not supposed to be reused for multiple write sessions. In
/// particular, do not reuse it in case any of the functions throw.
class GroupWriter : public _impl::ArrayWriterBase {
class GroupWriter : public GroupComitter, public _impl::ArrayWriterBase {
public:
enum class EvacuationStage { idle, evacuating, waiting, blocked };
// For groups in transactional mode (Group::m_is_shared), this constructor
@@ -71,7 +108,6 @@ class GroupWriter : public _impl::ArrayWriterBase {
// (Group::m_is_shared), the constructor also adds version tracking
// information to the group, if it is not already present (6th and 7th entry
// in Group::m_top).
using Durability = DBOptions::Durability;
GroupWriter(Group&, Durability dura = Durability::Full, util::WriteMarker* write_marker = nullptr);
~GroupWriter();

@@ -83,10 +119,6 @@ class GroupWriter : public _impl::ArrayWriterBase {
/// commit() with the returned top ref.
ref_type write_group();

/// Flush changes to physical medium, then write the new top ref
/// to the file header, then flush again. Pass the top ref
/// returned by write_group().
void commit(ref_type new_top_ref);

size_t get_file_size() const noexcept;

@@ -146,13 +178,6 @@ class GroupWriter : public _impl::ArrayWriterBase {
}
}


// Flush all cached memory mappings
// Sync all cached memory mappings to disk - includes flush if needed
void sync_all_mappings();
// Flush all cached memory mappings from private to shared cache.
void flush_all_mappings();

private:
friend class InMemoryWriter;
struct FreeSpaceEntry {
@@ -171,24 +196,18 @@ class GroupWriter : public _impl::ArrayWriterBase {
static void move_free_in_file_to_size_map(const std::vector<GroupWriter::FreeSpaceEntry>& list,
std::multimap<size_t, size_t>& size_map);

class MapWindow;
Group& m_group;
SlabAlloc& m_alloc;
Array m_free_positions; // 4th slot in Group::m_top
Array m_free_lengths; // 5th slot in Group::m_top
Array m_free_versions; // 6th slot in Group::m_top
uint64_t m_current_version = 0;
uint64_t m_oldest_reachable_version;
TopRefMap m_top_ref_map;
bool m_any_new_unreachables;
size_t m_window_alignment;
size_t m_free_space_size = 0;
size_t m_locked_space_size = 0;
size_t m_evacuation_limit;
int64_t m_backoff;
size_t m_logical_size = 0;
Durability m_durability;
util::WriteMarker* m_write_marker = nullptr;

// m_free_in_file;
std::vector<FreeSpaceEntry> m_not_free_in_file;
@@ -199,19 +218,6 @@ class GroupWriter : public _impl::ArrayWriterBase {

void read_in_freelist();
size_t recreate_freelist(size_t reserve_pos);
// Currently cached memory mappings. We keep as many as 16 1MB windows
// open for writing. The allocator will favor sequential allocation
// from a modest number of windows, depending upon fragmentation, so
// 16 windows should be more than enough. If more than 16 windows are
// needed, the least recently used is sync'ed and closed to make room
// for a new one. The windows are kept in MRU (most recently used) order.
const static int num_map_windows = 16;
std::vector<std::unique_ptr<MapWindow>> m_map_windows;

// Get a suitable memory mapping for later access:
// potentially adding it to the cache, potentially closing
// the least recently used and sync'ing it to disk
MapWindow* get_window(ref_type start_ref, size_t size);

/// Allocate a chunk of free space of the specified size. The
/// specified size must be 8-byte aligned. Extend the file if