Skip to content

Commit

Permalink
Allow deletion of ephemeral objects before commit (#7064)
Browse files Browse the repository at this point in the history
* Optimize further by dropping ttl on ephemeral objects
* Just clear asymmetric tables
  • Loading branch information
jedelbo authored Oct 20, 2023
1 parent 768cdcc commit 78b75be
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 43 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* None.
* Deleting an object in an asymmetric table would cause a crash. Likely to solve [#1537](https://github.com/realm/realm-kotlin/issues/1537), since v12.1.0.

### Breaking changes
* None.
Expand Down
2 changes: 1 addition & 1 deletion src/realm/cluster_tree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ void ClusterTree::verify() const
#endif
}

void ClusterTree::nullify_links(ObjKey obj_key, CascadeState& state)
void ClusterTree::nullify_incoming_links(ObjKey obj_key, CascadeState& state)
{
REALM_ASSERT(state.m_group);
m_root->nullify_incoming_links(obj_key, state);
Expand Down
2 changes: 1 addition & 1 deletion src/realm/cluster_tree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class ClusterTree {
{
m_root->destroy_deep();
}
void nullify_links(ObjKey, CascadeState&);
void nullify_incoming_links(ObjKey, CascadeState&);
bool is_empty() const noexcept
{
return size() == 0;
Expand Down
8 changes: 4 additions & 4 deletions src/realm/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2411,11 +2411,11 @@ Replication::version_type DB::do_commit(Transaction& transaction, bool commit_to
}
version_type new_version = current_version + 1;

if (!transaction.m_objects_to_delete.empty()) {
for (auto it : transaction.m_objects_to_delete) {
transaction.get_table(it.table_key)->remove_object(it.obj_key);
if (!transaction.m_tables_to_clear.empty()) {
for (auto table_key : transaction.m_tables_to_clear) {
transaction.get_table_unchecked(table_key)->clear();
}
transaction.m_objects_to_delete.clear();
transaction.m_tables_to_clear.clear();
}
if (Replication* repl = get_replication()) {
// If Replication::prepare_commit() fails, then the entire transaction
Expand Down
26 changes: 10 additions & 16 deletions src/realm/group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>
#include <set>
#include <chrono>

#include <realm/alloc_slab.hpp>
Expand Down Expand Up @@ -538,18 +538,6 @@ class Group : public ArrayParent {
private:
static constexpr StringData g_class_name_prefix = "class_";

struct ToDeleteRef {
ToDeleteRef(TableKey tk, ObjKey k)
: table_key(tk)
, obj_key(k)
, ttl(std::chrono::steady_clock::now())
{
}
TableKey table_key;
ObjKey obj_key;
std::chrono::steady_clock::time_point ttl;
};

// nullptr, if we're sharing an allocator provided during initialization
std::unique_ptr<SlabAlloc> m_local_alloc;
// in-use allocator. If local, then equal to m_local_alloc.
Expand Down Expand Up @@ -614,7 +602,7 @@ class Group : public ArrayParent {

util::UniqueFunction<void(const CascadeNotification&)> m_notify_handler;
util::UniqueFunction<void()> m_schema_change_handler;
std::vector<ToDeleteRef> m_objects_to_delete;
std::set<TableKey> m_tables_to_clear;

Group(SlabAlloc* alloc) noexcept;
void init_array_parents() noexcept;
Expand Down Expand Up @@ -809,6 +797,7 @@ class Group : public ArrayParent {
std::optional<size_t> read_lock_file_size = util::none,
std::optional<uint_fast64_t> read_lock_version = util::none);

Table* get_table_unchecked(TableKey);
size_t find_table_index(StringData name) const noexcept;
TableKey ndx2key(size_t ndx) const;
size_t key2ndx(TableKey key) const;
Expand Down Expand Up @@ -946,11 +935,16 @@ inline TableKey Group::find_table(StringData name) const noexcept
return (ndx != npos) ? ndx2key(ndx) : TableKey{};
}

inline Table* Group::get_table_unchecked(TableKey key)
{
auto ndx = key2ndx_checked(key);
return do_get_table(ndx); // Throws
}

inline TableRef Group::get_table(TableKey key)
{
check_attached();
auto ndx = key2ndx_checked(key);
Table* table = do_get_table(ndx); // Throws
Table* table = get_table_unchecked(key);
return TableRef(table, table ? table->m_alloc.get_instance_version() : 0);
}

Expand Down
16 changes: 10 additions & 6 deletions src/realm/sync/subscriptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,11 @@ MutableSubscriptionSet SubscriptionStore::get_mutable_by_version(int64_t version
{
auto tr = m_db->start_write();
auto sub_sets = tr->get_table(m_sub_set_table);
return MutableSubscriptionSet(weak_from_this(), std::move(tr),
sub_sets->get_object_with_primary_key(Mixed{version_id}));
auto obj = sub_sets->get_object_with_primary_key(Mixed{version_id});
if (!obj) {
throw KeyNotFound(util::format("Subscription set with version %1 not found", version_id));
}
return MutableSubscriptionSet(weak_from_this(), std::move(tr), obj);
}

SubscriptionSet SubscriptionStore::get_by_version(int64_t version_id) const
Expand All @@ -897,15 +900,16 @@ SubscriptionSet SubscriptionStore::get_by_version_impl(int64_t version_id,
{
auto tr = m_db->start_frozen(db_version.value_or(VersionID{}));
auto sub_sets = tr->get_table(m_sub_set_table);
try {
return SubscriptionSet(weak_from_this(), *tr, sub_sets->get_object_with_primary_key(Mixed{version_id}));
auto obj = sub_sets->get_object_with_primary_key(Mixed{version_id});
if (obj) {
return SubscriptionSet(weak_from_this(), *tr, obj);
}
catch (const KeyNotFound&) {
else {
std::lock_guard<std::mutex> lk(m_pending_notifications_mutex);
if (version_id < m_min_outstanding_version) {
return SubscriptionSet(weak_from_this(), version_id, SubscriptionSet::SupersededTag{});
}
throw;
throw KeyNotFound(util::format("Subscription set with version %1 not found", version_id));
}
}

Expand Down
30 changes: 17 additions & 13 deletions src/realm/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ void Table::remove_recursive(CascadeState& cascade_state)
cascade_state.send_notifications();

for (auto& l : cascade_state.m_to_be_nullified) {
Obj obj = group->get_table(l.origin_table)->try_get_object(l.origin_key);
Obj obj = group->get_table_unchecked(l.origin_table)->try_get_object(l.origin_key);
REALM_ASSERT_DEBUG(obj);
if (obj) {
std::move(obj).nullify_link(l.origin_col_key, l.old_target_link);
Expand All @@ -558,7 +558,7 @@ void Table::remove_recursive(CascadeState& cascade_state)

auto to_delete = std::move(cascade_state.m_to_be_deleted);
for (auto obj : to_delete) {
auto table = group->get_table(obj.first);
auto table = obj.first == m_key ? this : group->get_table_unchecked(obj.first);
// This might add to the list of objects that should be deleted
REALM_ASSERT(!obj.second.is_unresolved());
table->m_clusters.erase(obj.second, cascade_state);
Expand All @@ -572,8 +572,9 @@ void Table::nullify_links(CascadeState& cascade_state)
Group* group = get_parent_group();
REALM_ASSERT(group);
for (auto& to_delete : cascade_state.m_to_be_deleted) {
auto table = group->get_table(to_delete.first);
table->m_clusters.nullify_links(to_delete.second, cascade_state);
auto table = to_delete.first == m_key ? this : group->get_table_unchecked(to_delete.first);
if (!table->is_asymmetric())
table->m_clusters.nullify_incoming_links(to_delete.second, cascade_state);
}
}

Expand Down Expand Up @@ -2178,20 +2179,22 @@ void Table::batch_erase_rows(const KeyColumn& keys)
void Table::batch_erase_objects(std::vector<ObjKey>& keys)
{
Group* g = get_parent_group();
bool maybe_has_incoming_links = g && !is_asymmetric();

if (has_any_embedded_objects() || (g && g->has_cascade_notification_handler())) {
CascadeState state(CascadeState::Mode::Strong, g);
std::for_each(keys.begin(), keys.end(), [this, &state](ObjKey k) {
state.m_to_be_deleted.emplace_back(m_key, k);
});
nullify_links(state);
if (maybe_has_incoming_links)
nullify_links(state);
remove_recursive(state);
}
else {
CascadeState state(CascadeState::Mode::None, g);
for (auto k : keys) {
if (g) {
m_clusters.nullify_links(k, state);
if (maybe_has_incoming_links) {
m_clusters.nullify_incoming_links(k, state);
}
m_clusters.erase(k, state);
}
Expand Down Expand Up @@ -3053,7 +3056,7 @@ Obj Table::create_object_with_primary_key(const Mixed& primary_key, FieldValues&
}
}
if (is_asymmetric() && repl && repl->get_history_type() == Replication::HistoryType::hist_SyncClient) {
get_parent_group()->m_objects_to_delete.emplace_back(this->m_key, ret.get_key());
get_parent_group()->m_tables_to_clear.insert(this->m_key);
}
return ret;
}
Expand Down Expand Up @@ -3147,7 +3150,8 @@ Obj Table::get_object_with_primary_key(Mixed primary_key) const
DataType type = DataType(primary_key_col.get_type());
REALM_ASSERT((primary_key.is_null() && primary_key_col.get_attrs().test(col_attr_Nullable)) ||
primary_key.get_type() == type);
return m_clusters.get(m_index_accessors[primary_key_col.get_index().val]->find_first(primary_key));
ObjKey k = m_index_accessors[primary_key_col.get_index().val]->find_first(primary_key);
return k ? m_clusters.get(k) : Obj{};
}

Mixed Table::get_primary_key(ObjKey key) const
Expand Down Expand Up @@ -3388,13 +3392,13 @@ void Table::remove_object(ObjKey key)
if (has_any_embedded_objects() || (g && g->has_cascade_notification_handler())) {
CascadeState state(CascadeState::Mode::Strong, g);
state.m_to_be_deleted.emplace_back(m_key, key);
m_clusters.nullify_links(key, state);
m_clusters.nullify_incoming_links(key, state);
remove_recursive(state);
}
else {
CascadeState state(CascadeState::Mode::None, g);
if (g) {
m_clusters.nullify_links(key, state);
m_clusters.nullify_incoming_links(key, state);
}
m_clusters.erase(key, state);
}
Expand Down Expand Up @@ -3946,7 +3950,7 @@ bool Table::has_any_embedded_objects()
for_each_public_column([&](ColKey col_key) {
auto target_table_key = get_opposite_table_key(col_key);
if (target_table_key && is_link_type(col_key.get_type())) {
auto target_table = get_parent_group()->get_table(target_table_key);
auto target_table = get_parent_group()->get_table_unchecked(target_table_key);
if (target_table->is_embedded()) {
m_has_any_embedded_objects = true;
return IteratorControl::Stop; // early out
Expand Down Expand Up @@ -3984,7 +3988,7 @@ ColKey Table::find_or_add_backlink_column(ColKey origin_col_key, TableKey origin
set_opposite_column(backlink_col_key, origin_table, origin_col_key);

if (Replication* repl = get_repl())
repl->typed_link_change(get_parent_group()->get_table(origin_table).unchecked_ptr(), origin_col_key,
repl->typed_link_change(get_parent_group()->get_table_unchecked(origin_table), origin_col_key,
m_key); // Throws
}

Expand Down
3 changes: 2 additions & 1 deletion src/realm/table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ class Table {
// - turns the object into a tombstone if links exist
// - otherwise works just as remove_object()
ObjKey invalidate_object(ObjKey key);
// Remove several objects
void batch_erase_objects(std::vector<ObjKey>& keys);
Obj try_get_tombstone(ObjKey key) const
{
REALM_ASSERT(key.is_unresolved());
Expand Down Expand Up @@ -703,7 +705,6 @@ class Table {
TableRef m_own_ref;

void batch_erase_rows(const KeyColumn& keys);
void batch_erase_objects(std::vector<ObjKey>& keys);
size_t do_set_link(ColKey col_key, size_t row_ndx, size_t target_row_ndx);

void populate_search_index(ColKey col_key);
Expand Down
9 changes: 9 additions & 0 deletions test/object-store/object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2390,6 +2390,15 @@ TEST_CASE("Asymmetric Object") {
REQUIRE(realm->is_empty());
}

SECTION("Delete ephemeral object before comitting") {
realm->begin_transaction();
auto obj = realm->read_group().get_table("class_asymmetric")->create_object_with_primary_key(1);
obj.remove();
realm->commit_transaction();
REQUIRE(!obj.is_valid());
REQUIRE(realm->is_empty());
}

SECTION("Outgoing link not allowed") {
auto obj = create(AnyDict{{"_id", INT64_C(1)}}, "table");
auto table = realm->read_group().get_table("class_table");
Expand Down
2 changes: 2 additions & 0 deletions test/object-store/sync/flx_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2340,6 +2340,8 @@ TEST_CASE("flx: writes work without waiting for sync", "[sync][flx][baas]") {
realm->refresh();
Results results(realm, table);
CHECK(results.size() == 1);
Obj obj = results.get(0);
CHECK(obj.get_primary_key().get_object_id() == bar_obj_id);
CHECK(table->get_object_with_primary_key({bar_obj_id}).is_valid());
});
}
Expand Down

0 comments on commit 78b75be

Please sign in to comment.