Skip to content

Commit

Permalink
Update FLX subscription state after reporting sync progress and call …
Browse files Browse the repository at this point in the history
…sync_transact_notifiers for bootstraps (#5553)
  • Loading branch information
jbreams authored Jun 1, 2022
1 parent 02adb14 commit 91c5ac9
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
### Fixed
* Added better comparator for `realm_user_t` and `realm_flx_sync_subscription_t` when using `realm_equals`.(Issue [#5522])(https://github.com/realm/realm-core/issues/5522).
* Changed `realm_sync_session_handle_error_for_testing` in order to support all SDKs. (Issue [#5550])(https://github.com/realm/realm-core/issues/5550).
* FLX sync subscription state changes will now correctly be reported after sync progress is reported ([#5553](https://github.com/realm/realm-core/pull/5553), since v12.0.0)

### Breaking changes
* Removed scheduler argument to the C API `realm_*_add_notification_callback` functions, because it wasn't actually used. (PR [#5541](https://github.com/realm/realm-core/pull/5541)).
Expand Down
9 changes: 5 additions & 4 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,6 @@ void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
}



// ################ SessionImpl ################


Expand Down Expand Up @@ -800,9 +799,11 @@ void SessionImpl::process_pending_flx_bootstrap()

history.integrate_server_changesets(
*pending_batch.progress, &downloadable_bytes, pending_batch.changesets.data(),
pending_batch.changesets.size(), new_version, batch_state, logger, [&](const TransactionRef& tr) {
pending_batch.changesets.size(), new_version, batch_state, logger,
[&](const TransactionRef& tr) {
bootstrap_store->pop_front_pending(tr, pending_batch.changesets.size());
});
},
get_transact_reporter());
download_cursor = pending_batch.progress->download;

logger.info(
Expand All @@ -818,8 +819,8 @@ void SessionImpl::process_pending_flx_bootstrap()
}

REALM_ASSERT_3(query_version, !=, -1);
on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
m_wrapper.on_sync_progress();
on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
}


Expand Down
46 changes: 34 additions & 12 deletions test/object-store/sync/flx_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@
#include <iostream>
#include <stdexcept>

namespace realm {

class TestHelper {
public:
static bool can_advance(const SharedRealm& realm)
{
auto& coord = Realm::Internal::get_coordinator(*realm);
return coord.can_advance(*realm);
}
};

} // namespace realm

namespace realm::app {

namespace {
Expand Down Expand Up @@ -100,6 +113,13 @@ std::vector<ObjectId> fill_large_array_schema(FLXSyncTestHarness& harness)
});
return ret;
}

void wait_for_advance(const SharedRealm& realm)
{
timed_wait_for([&] {
return !TestHelper::can_advance(realm);
});
}
} // namespace

TEST_CASE("flx: connect to FLX-enabled app", "[sync][flx][app]") {
Expand All @@ -121,6 +141,7 @@ TEST_CASE("flx: connect to FLX-enabled app", "[sync][flx][app]") {
{"non_queryable_field", std::string{"non queryable 2"}}}));
});


harness.do_with_new_realm([&](SharedRealm realm) {
wait_for_download(*realm);
{
Expand All @@ -143,7 +164,7 @@ TEST_CASE("flx: connect to FLX-enabled app", "[sync][flx][app]") {

wait_for_download(*realm);
{
realm->refresh();
wait_for_advance(realm);
Results results(realm, table);
CHECK(results.size() == 1);
auto obj = results.get<Obj>(0);
Expand All @@ -161,7 +182,7 @@ TEST_CASE("flx: connect to FLX-enabled app", "[sync][flx][app]") {
}

{
realm->refresh();
wait_for_advance(realm);
Results results(realm, Query(table));
CHECK(results.size() == 2);
}
Expand All @@ -179,7 +200,7 @@ TEST_CASE("flx: connect to FLX-enabled app", "[sync][flx][app]") {
}

{
realm->refresh();
wait_for_advance(realm);
Results results(realm, Query(table));
CHECK(results.size() == 1);
auto obj = results.get<Obj>(0);
Expand All @@ -195,7 +216,7 @@ TEST_CASE("flx: connect to FLX-enabled app", "[sync][flx][app]") {
}

{
realm->refresh();
wait_for_advance(realm);
Results results(realm, table);
CHECK(results.size() == 0);
}
Expand Down Expand Up @@ -333,7 +354,7 @@ TEST_CASE("flx: uploading an object that is out-of-view results in compensating
std::move(error_future).get(), invalid_obj,
util::format("write to '%1' in table \"TopLevel\" not allowed", invalid_obj.to_string()));

realm->refresh();
wait_for_advance(realm);

auto top_level_table = realm->read_group().get_table("class_TopLevel");
REQUIRE(top_level_table->is_empty());
Expand Down Expand Up @@ -377,7 +398,7 @@ TEST_CASE("flx: uploading an object that is out-of-view results in compensating
std::move(error_future).get(), invalid_obj,
util::format("write to '%1' in table \"TopLevel\" not allowed", invalid_obj.to_string()));

realm->refresh();
wait_for_advance(realm);

obj = Object::get_for_primary_key(c, realm, "TopLevel", util::Any(invalid_obj));
embedded_obj = util::any_cast<Object&&>(obj.get_property_value<util::Any>(c, "embedded_obj"));
Expand All @@ -393,7 +414,7 @@ TEST_CASE("flx: uploading an object that is out-of-view results in compensating
wait_for_upload(*realm);
wait_for_download(*realm);

realm->refresh();
wait_for_advance(realm);
obj = Object::get_for_primary_key(c, realm, "TopLevel", util::Any(invalid_obj));
embedded_obj = util::any_cast<Object&&>(obj.get_property_value<util::Any>(c, "embedded_obj"));
REQUIRE(util::any_cast<std::string&&>(embedded_obj.get_property_value<util::Any>(c, "str_field")) ==
Expand Down Expand Up @@ -435,7 +456,8 @@ TEST_CASE("flx: uploading an object that is out-of-view results in compensating

validate_sync_error(std::move(error_future).get(), invalid_obj,
"object is outside of the current query view");
realm->refresh();

wait_for_advance(realm);

auto top_level_table = realm->read_group().get_table("class_TopLevel");
REQUIRE(top_level_table->size() == 1);
Expand Down Expand Up @@ -554,7 +576,7 @@ TEST_CASE("flx: interrupted bootstrap restarts/recovers on reconnect", "[sync][f
wait_for_upload(*realm);
wait_for_download(*realm);

realm->refresh();
wait_for_advance(realm);
REQUIRE(table->size() == obj_ids_at_end.size());
for (auto& id : obj_ids_at_end) {
REQUIRE(table->find_primary_key(Mixed{id}));
Expand Down Expand Up @@ -947,7 +969,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][app]
auto mutate_realm = [&] {
harness.load_initial_data([&](SharedRealm realm) {
auto table = realm->read_group().get_table("class_TopLevel");
realm->refresh();
wait_for_advance(realm);
Results res(realm, Query(table).greater(table->get_column_key("queryable_int_field"), int64_t(10)));
REQUIRE(res.size() == 2);
res.clear();
Expand Down Expand Up @@ -1020,7 +1042,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][app]
wait_for_upload(*realm);
wait_for_download(*realm);

realm->refresh();
wait_for_advance(realm);
auto expected_obj_ids = util::Span<ObjectId>(obj_ids_at_end).sub_span(0, 3);

REQUIRE(table->size() == expected_obj_ids.size());
Expand Down Expand Up @@ -1132,8 +1154,8 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][app]
.get();
wait_for_upload(*realm);
wait_for_download(*realm);
wait_for_advance(realm);

realm->refresh();
auto expected_obj_ids = util::Span<ObjectId>(obj_ids_at_end).sub_span(0, 3);

// After we've downloaded all the mutations there should only by 3 objects left.
Expand Down

0 comments on commit 91c5ac9

Please sign in to comment.