Skip to content

Commit

Permalink
Simplify transaction rollback notifications
Browse files Browse the repository at this point in the history
We don't actually need TransactReverser for anything other than KVO these days,
and the KVO parser can trivially do the required reversing itself. Sending
schema change notifications from a rolled back write transaction was actually
just incorrect to begin with, but was harmless other than being suboptimal.
  • Loading branch information
tgoyne committed Apr 27, 2023
1 parent c94b18e commit 8713b66
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 367 deletions.
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

### Enhancements
* <New feature description> (PR [#????](https://github.com/realm/realm-core/pull/????))
* None.
* Improve performance of rolling back write transactions after making changes. If no KVO observers are used this is now constant time rather than taking time proportional to the number of changes to be rolled back. Rollbacks with KVO observers are 10-20% faster. ([PR #6513](https://github.com/realm/realm-core/pull/6513))

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
Expand All @@ -18,6 +18,8 @@

### Internals
* Clear out SubscriptionStore and cancel pending notifications upon rollback to PBS after client migration to FLX. ([#6389](https://github.com/realm/realm-core/issues/6389))
* Simplify the non-sync replication log by emitting the same instruction type for all three types of collections rather than different instructions per collection type. This has no functional effect but eliminates some duplicated code. ([PR #6513](https://github.com/realm/realm-core/pull/6513))
* Remove TransactionChangeInfo::track_all, which was only ever used by the global notifier. ([PR #6513](https://github.com/realm/realm-core/pull/6513))

----------------------------------------------

Expand Down Expand Up @@ -53,8 +55,6 @@

### Internals
* Allow to run multiprocess encryption tests outside building folder.
* Simplify the non-sync replication log by emitting the same instruction type for all three types of collections rather than different instructions per collection type. This has no functional effect but eliminates some duplicated code. ([PR #6513](https://github.com/realm/realm-core/pull/6513))
* Remove TransactionChangeInfo::track_all, which was only ever used by the global notifier. ([PR #6513](https://github.com/realm/realm-core/pull/6513))

----------------------------------------------

Expand Down
8 changes: 4 additions & 4 deletions src/realm/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1451,7 +1451,7 @@ void Group::refresh_dirty_accessors()
}


void Group::advance_transact(ref_type new_top_ref, util::NoCopyInputStream& in, bool writable)
void Group::advance_transact(ref_type new_top_ref, util::NoCopyInputStream* in, bool writable)
{
REALM_ASSERT(is_attached());
// Exception safety: If this function throws, the group accessor and all of
Expand Down Expand Up @@ -1488,10 +1488,10 @@ void Group::advance_transact(ref_type new_top_ref, util::NoCopyInputStream& in,
// This is no longer needed in Core, but we need to compute "schema_changed",
// for the benefit of ObjectStore.
bool schema_changed = false;
if (has_schema_change_notification_handler()) {
_impl::TransactLogParser parser; // Throws
if (in && has_schema_change_notification_handler()) {
TransactAdvancer advancer(*this, schema_changed);
parser.parse(in, advancer); // Throws
_impl::TransactLogParser parser; // Throws
parser.parse(*in, advancer); // Throws
}

m_top.detach(); // Soft detach
Expand Down
2 changes: 1 addition & 1 deletion src/realm/group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ class Group : public ArrayParent {
class TransactAdvancer;
/// Memory mappings must have been updated to reflect any growth in filesize before
/// calling advance_transact()
void advance_transact(ref_type new_top_ref, util::NoCopyInputStream&, bool writable);
void advance_transact(ref_type new_top_ref, util::NoCopyInputStream*, bool writable);
void refresh_dirty_accessors();
void flush_accessors_for_commit();

Expand Down
225 changes: 0 additions & 225 deletions src/realm/impl/transact_log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
#include <realm/util/buffer.hpp>
#include <realm/util/input_stream.hpp>

#include <stdexcept>
#include <tuple>

namespace realm {

struct GlobalKey;
Expand Down Expand Up @@ -960,228 +957,6 @@ inline bool TransactLogParser::read_char(char& c)
return true;
}


class TransactReverser {
public:
bool select_table(TableKey key)
{
sync_table();
m_encoder.select_table(key);
m_pending_ts_instr = get_inst();
return true;
}

bool insert_group_level_table(TableKey table_key)
{
sync_table();
m_encoder.erase_class(table_key);
append_instruction();
return true;
}

bool erase_class(TableKey table_key)
{
sync_table();
m_encoder.insert_group_level_table(table_key);
append_instruction();
return true;
}

bool rename_class(TableKey)
{
sync_table();
return true;
}

bool create_object(ObjKey key)
{
m_encoder.remove_object(key); // Throws
append_instruction();
return true;
}

bool remove_object(ObjKey key)
{
m_encoder.create_object(key); // Throws
append_instruction();
return true;
}

bool modify_object(ColKey col_key, ObjKey key)
{
m_encoder.modify_object(col_key, key);
append_instruction();
return true;
}

bool list_set(size_t ndx)
{
m_encoder.list_set(ndx);
append_instruction();
return true;
}

bool list_insert(size_t ndx)
{
m_encoder.list_erase(ndx);
append_instruction();
return true;
}

bool set_link_type(ColKey key)
{
m_encoder.set_link_type(key);
return true;
}

bool insert_column(ColKey col_key)
{
m_encoder.erase_column(col_key);
append_instruction();
return true;
}

bool erase_column(ColKey col_key)
{
m_encoder.insert_column(col_key);
append_instruction();
return true;
}

bool rename_column(ColKey col_key)
{
m_encoder.rename_column(col_key);
return true;
}

bool select_collection(ColKey col_key, ObjKey key)
{
sync_list();
m_encoder.select_collection(col_key, key);
m_pending_ls_instr = get_inst();
return true;
}

bool list_move(size_t from_link_ndx, size_t to_link_ndx)
{
m_encoder.list_move(from_link_ndx, to_link_ndx);
append_instruction();
return true;
}

bool list_erase(size_t list_ndx)
{
m_encoder.list_insert(list_ndx);
append_instruction();
return true;
}

bool list_clear(size_t old_list_size)
{
// Append in reverse order because the reversed log is itself applied
// in reverse, and this way it generates all back-insertions rather than
// all front-insertions
for (size_t i = old_list_size; i > 0; --i) {
m_encoder.list_insert(i - 1);
append_instruction();
}
return true;
}

bool typed_link_change(ColKey col, TableKey dest)
{
m_encoder.typed_link_change(col, dest);
append_instruction();
return true;
}

private:
_impl::TransactLogBufferStream m_buffer;
_impl::TransactLogEncoder m_encoder{m_buffer};
struct Instr {
size_t begin;
size_t end;
};
std::vector<Instr> m_instructions;
size_t current_instr_start = 0;
Instr m_pending_ts_instr{0, 0};
Instr m_pending_ls_instr{0, 0};

Instr get_inst()
{
Instr instr;
instr.begin = current_instr_start;
current_instr_start = transact_log_size();
instr.end = current_instr_start;
return instr;
}

size_t transact_log_size() const
{
REALM_ASSERT_3(m_encoder.write_position(), >=, m_buffer.get_data());
return m_encoder.write_position() - m_buffer.get_data();
}

void append_instruction()
{
m_instructions.push_back(get_inst());
}

void append_instruction(Instr instr)
{
m_instructions.push_back(instr);
}

void sync_select(Instr& pending_instr)
{
if (pending_instr.begin != pending_instr.end) {
append_instruction(pending_instr);
pending_instr = {0, 0};
}
}

void sync_list()
{
sync_select(m_pending_ls_instr);
}

void sync_table()
{
sync_list();
sync_select(m_pending_ts_instr);
}

friend class ReversedNoCopyInputStream;
};


class ReversedNoCopyInputStream : public util::NoCopyInputStream {
public:
ReversedNoCopyInputStream(TransactReverser& reverser)
: m_instr_order(reverser.m_instructions)
{
// push any pending select_table into the buffer
reverser.sync_table();

m_buffer = reverser.m_buffer.get_data();
m_current = m_instr_order.size();
}

util::Span<const char> next_block() override
{
if (m_current != 0) {
m_current--;
return {m_buffer + m_instr_order[m_current].begin, m_buffer + m_instr_order[m_current].end};
}
return {m_buffer, m_buffer};
}

private:
const char* m_buffer;
std::vector<TransactReverser::Instr>& m_instr_order;
size_t m_current;
};

} // namespace _impl
} // namespace realm

Expand Down
35 changes: 27 additions & 8 deletions src/realm/object-store/impl/transact_log_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace {

class KVOAdapter : public _impl::TransactionChangeInfo {
public:
KVOAdapter(std::vector<BindingContext::ObserverState>& observers, BindingContext* context);
KVOAdapter(std::vector<BindingContext::ObserverState>& observers, BindingContext* context, bool is_rollback);

void before(Transaction& sg);
void after(Transaction& sg);
Expand All @@ -51,12 +51,15 @@ class KVOAdapter : public _impl::TransactionChangeInfo {
};
std::vector<ListInfo> m_lists;
VersionID m_version;
bool m_rollback;
};

KVOAdapter::KVOAdapter(std::vector<BindingContext::ObserverState>& observers, BindingContext* context)
KVOAdapter::KVOAdapter(std::vector<BindingContext::ObserverState>& observers, BindingContext* context,
bool is_rollback)
: _impl::TransactionChangeInfo{}
, m_context(context)
, m_observers(observers)
, m_rollback(is_rollback)
{
if (m_observers.empty())
return;
Expand Down Expand Up @@ -101,7 +104,7 @@ void KVOAdapter::before(Transaction& sg)

auto const& table = it->second;
auto key = observer.obj_key;
if (table.deletions_contains(key)) {
if (m_rollback ? table.insertions_contains(key) : table.deletions_contains(key)) {
m_invalidated.push_back(observer.info);
continue;
}
Expand Down Expand Up @@ -134,7 +137,7 @@ void KVOAdapter::before(Transaction& sg)

builder.modifications.remove(builder.insertions);

// KVO can't express moves (becaue NSArray doesn't have them), so
// KVO can't express moves (because NSArray doesn't have them), so
// transform them into a series of sets on each affected index when possible
if (!builder.moves.empty() && builder.insertions.count() == builder.moves.size() &&
builder.deletions.count() == builder.moves.size()) {
Expand Down Expand Up @@ -185,6 +188,22 @@ void KVOAdapter::before(Transaction& sg)
changes.kind = BindingContext::ColumnInfo::Kind::Remove;
changes.indices = builder.deletions;
}

// If we're rolling back a write transaction, insertions are actually
// deletions and vice-versa. More complicated scenarios which would
// require logic beyond this fortunately just aren't supported by KVO.
if (m_rollback) {
switch (changes.kind) {
case BindingContext::ColumnInfo::Kind::Insert:
changes.kind = BindingContext::ColumnInfo::Kind::Remove;
break;
case BindingContext::ColumnInfo::Kind::Remove:
changes.kind = BindingContext::ColumnInfo::Kind::Insert;
break;
default:
break;
}
}
}
m_context->will_change(m_observers, m_invalidated);
}
Expand Down Expand Up @@ -467,9 +486,9 @@ class KVOTransactLogObserver : public TransactLogObserver {

public:
KVOTransactLogObserver(std::vector<BindingContext::ObserverState>& observers, BindingContext* context,
_impl::NotifierPackage& notifiers, Transaction& sg)
_impl::NotifierPackage& notifiers, Transaction& sg, bool is_rollback = false)
: TransactLogObserver(m_adapter)
, m_adapter(observers, context)
, m_adapter(observers, context, is_rollback)
, m_notifiers(notifiers)
, m_sg(sg)
{
Expand Down Expand Up @@ -589,8 +608,8 @@ void cancel(Transaction& tr, BindingContext* context)
}

_impl::NotifierPackage notifiers;
KVOTransactLogObserver o(observers, context, notifiers, tr);
tr.rollback_and_continue_as_read(&o);
KVOTransactLogObserver o(observers, context, notifiers, tr, true);
tr.rollback_and_continue_as_read(o);
}

void advance(Transaction& tr, TransactionChangeInfo& info, VersionID version)
Expand Down
Loading

0 comments on commit 8713b66

Please sign in to comment.