From e9322ee69a1e43be1ce3dd181bccced3723c26dc Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Mon, 12 Sep 2022 10:02:15 +0200 Subject: [PATCH 01/20] Reenable sync benchmark --- CHANGELOG.md | 2 +- evergreen/config.yml | 10 ++++++ src/realm/object-store/sync/sync_session.cpp | 2 ++ src/realm/sync/client.cpp | 23 ++++++++++++ src/realm/sync/client.hpp | 17 +++++---- src/realm/sync/config.hpp | 5 +++ src/realm/sync/noinst/client_impl_base.cpp | 2 ++ src/realm/sync/noinst/client_impl_base.hpp | 2 ++ test/CMakeLists.txt | 10 +----- test/benchmark-sync/CMakeLists.txt | 5 +++ .../access_token | 0 .../bench_transform.cpp | 0 .../load_test.cpp | 0 .../load_test_clients_listen_start.sh | 0 test/sync_fixtures.hpp | 36 +++++++++++++++++++ 15 files changed, 98 insertions(+), 16 deletions(-) create mode 100644 test/benchmark-sync/CMakeLists.txt rename test/{bench-sync => benchmark-sync}/access_token (100%) rename test/{bench-sync => benchmark-sync}/bench_transform.cpp (100%) rename test/{bench-sync => benchmark-sync}/load_test.cpp (100%) rename test/{bench-sync => benchmark-sync}/load_test_clients_listen_start.sh (100%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b6dfc8cc4b..09c63e3db19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,7 +20,7 @@ ----------- ### Internals -* None. +* Reenable sync benchmark. ---------------------------------------------- diff --git a/evergreen/config.yml b/evergreen/config.yml index 55fdfdb29b3..1f32426fc32 100644 --- a/evergreen/config.yml +++ b/evergreen/config.yml @@ -149,6 +149,8 @@ functions: exit 1 fi + export UNITTEST_THREADS=1 + BENCHMARK=$(./evergreen/abspath.sh ./build/test/benchmark-${benchmark_name}/${cmake_build_type|Debug}/realm-benchmark-${benchmark_name}) echo "Going to run benchmark $BENCHMARK" @@ -520,6 +522,14 @@ tasks: vars: benchmark_name: crud +- name: benchmark-sync + exec_timeout_secs: 1800 + tags: [ "benchmark" ] + commands: + - func: "run benchmark" + vars: + benchmark_name: sync + - name: sync-tests tags: [ "test_suite", "for_pull_requests" ] exec_timeout_secs: 1800 diff --git a/src/realm/object-store/sync/sync_session.cpp b/src/realm/object-store/sync/sync_session.cpp index 279aee69b9b..723b67295ea 100644 --- a/src/realm/object-store/sync/sync_session.cpp +++ b/src/realm/object-store/sync/sync_session.cpp @@ -753,6 +753,8 @@ void SyncSession::create_sync_session() return hook(anchor, progress, query_version, batch_state); }; } + session_config.on_before_download_integration = sync_config.on_before_download_integration; + session_config.on_after_download_integration = sync_config.on_after_download_integration; { std::string sync_route = m_sync_manager->sync_route(); diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 68da75ca16b..f912e7cfa47 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -247,6 +247,9 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac std::function m_on_download_message_received_hook; std::function m_on_bootstrap_message_processed_hook; + std::function m_on_before_download_integration; + std::function m_on_after_download_integration; + std::shared_ptr m_flx_subscription_store; int64_t m_flx_active_version = 0; int64_t m_flx_last_seen_version = 0; @@ -868,6 +871,24 @@ void SessionImpl::receive_download_message_hook(const SyncProgress& progress, in m_wrapper.m_on_download_message_received_hook(progress, query_version, batch_state); } +void SessionImpl::before_download_integration_hook(size_t num_changesets) +{ + if 
(REALM_LIKELY(!m_wrapper.m_on_before_download_integration)) { + return; + } + + m_wrapper.m_on_before_download_integration(num_changesets); +} + +void SessionImpl::after_download_integration_hook(size_t num_changesets) +{ + if (REALM_LIKELY(!m_wrapper.m_on_after_download_integration)) { + return; + } + + m_wrapper.m_on_after_download_integration(num_changesets); +} + // ################ SessionWrapper ################ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr flx_sub_store, @@ -891,6 +912,8 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr on_download_message_received_hook; - // Will be called after each bootstrap message is added to the pending bootstrap store, - // but before processing a finalized bootstrap. For testing only. + /// Will be called after each bootstrap message is added to the pending bootstrap store, + /// but before processing a finalized bootstrap. For testing only. std::function on_bootstrap_message_processed_hook; + + /// Called before each download message is integrated. For testing only. + std::function on_before_download_integration; + /// Called after each download message is integrated. For testing only. + std::function on_after_download_integration; }; /// \brief Start a new session for the specified client-side Realm. diff --git a/src/realm/sync/config.hpp b/src/realm/sync/config.hpp index a541ee02f7c..e5ed6d3b308 100644 --- a/src/realm/sync/config.hpp +++ b/src/realm/sync/config.hpp @@ -181,6 +181,11 @@ struct SyncConfig { std::function, const sync::SyncProgress&, int64_t, sync::DownloadBatchState)> on_bootstrap_message_processed_hook; + // Called before each download message is integrated on the sync worker thread. For testing only. + std::function on_before_download_integration; + // Called after each download message is integrated on the sync worker thread. For testing only. 
+ std::function on_after_download_integration; + bool simulate_integration_error = false; explicit SyncConfig(std::shared_ptr user, bson::Bson partition); diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index 5c564be67b7..a29c4f82048 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -1394,8 +1394,10 @@ void Session::integrate_changesets(ClientReplication& repl, const SyncProgress& history.set_sync_progress(progress, &downloadable_bytes, version_info); // Throws return; } + before_download_integration_hook(received_changesets.size()); history.integrate_server_changesets(progress, &downloadable_bytes, received_changesets, version_info, download_batch_state, logger, {}, get_transact_reporter()); // Throws + after_download_integration_hook(received_changesets.size()); if (received_changesets.size() == 1) { logger.debug("1 remote changeset integrated, producing client version %1", version_info.sync_version.version); // Throws diff --git a/src/realm/sync/noinst/client_impl_base.hpp b/src/realm/sync/noinst/client_impl_base.hpp index e0006e1792d..c4fe7aa36d4 100644 --- a/src/realm/sync/noinst/client_impl_base.hpp +++ b/src/realm/sync/noinst/client_impl_base.hpp @@ -1079,6 +1079,8 @@ class ClientImpl::Session { void check_for_upload_completion(); void check_for_download_completion(); void receive_download_message_hook(const SyncProgress&, int64_t, DownloadBatchState); + void before_download_integration_hook(size_t); + void after_download_integration_hook(size_t); friend class Connection; }; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 948d4e9ea6f..8337cba1132 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -10,6 +10,7 @@ endif() add_subdirectory(benchmark-common-tasks) add_subdirectory(benchmark-crud) add_subdirectory(benchmark-larger) +add_subdirectory(benchmark-sync) # FIXME: Add other benchmarks set(CORE_TEST_SOURCES @@ -215,11 +216,6 @@ if(REALM_ENABLE_SYNC) client/statistics.hpp ) - set(BENCH_TRANSFORM_SOURCES - bench-sync/bench_transform.cpp - test_all.cpp - ) - file(GLOB TEST_RESOURCES RELATIVE ${CMAKE_CURRENT_BINARY_DIR} *.json *.pem ../certificate-authority/certs/* @@ -244,8 +240,4 @@ if(REALM_ENABLE_SYNC) # This enables symbols in backtraces target_link_libraries(SyncTests "-rdynamic") endif() - - add_executable(BenchTransform EXCLUDE_FROM_ALL ${BENCH_TRANSFORM_SOURCES}) - set_target_properties(BenchTransform PROPERTIES OUTPUT_NAME "bench-transform") - target_link_libraries(BenchTransform TestUtil Sync) endif() diff --git a/test/benchmark-sync/CMakeLists.txt b/test/benchmark-sync/CMakeLists.txt new file mode 100644 index 00000000000..089222aea27 --- /dev/null +++ b/test/benchmark-sync/CMakeLists.txt @@ -0,0 +1,5 @@ +if(REALM_ENABLE_SYNC) + add_executable(realm-benchmark-sync bench_transform.cpp ../test_all.cpp) + add_dependencies(benchmarks realm-benchmark-sync) + target_link_libraries(realm-benchmark-sync TestUtil Sync SyncServer) +endif() \ No newline at end of file diff --git a/test/bench-sync/access_token b/test/benchmark-sync/access_token similarity index 100% rename from test/bench-sync/access_token rename to test/benchmark-sync/access_token diff --git a/test/bench-sync/bench_transform.cpp b/test/benchmark-sync/bench_transform.cpp similarity index 100% rename from test/bench-sync/bench_transform.cpp rename to test/benchmark-sync/bench_transform.cpp diff --git a/test/bench-sync/load_test.cpp b/test/benchmark-sync/load_test.cpp similarity index 
100% rename from test/bench-sync/load_test.cpp rename to test/benchmark-sync/load_test.cpp diff --git a/test/bench-sync/load_test_clients_listen_start.sh b/test/benchmark-sync/load_test_clients_listen_start.sh similarity index 100% rename from test/bench-sync/load_test_clients_listen_start.sh rename to test/benchmark-sync/load_test_clients_listen_start.sh diff --git a/test/sync_fixtures.hpp b/test/sync_fixtures.hpp index ff148edf7cb..a8eab11e9cc 100644 --- a/test/sync_fixtures.hpp +++ b/test/sync_fixtures.hpp @@ -617,6 +617,42 @@ class MultiClientServerFixture { }); } + // Use either the methods below or `start()`. + void start_server(int index) + { + REALM_ASSERT(index >= 0 && index < m_num_servers); + m_server_threads[index].start([this, index] { + run_server(index); + }); + } + + void start_client(int index) + { + REALM_ASSERT(index >= 0 && index < m_num_clients); + m_client_threads[index].start([this, index] { + run_client(index); + }); + } + + void stop_server(int index) + { + REALM_ASSERT(index >= 0 && index < m_num_servers); + m_servers[index]->stop(); + unit_test::TestContext& test_context = m_test_context; + if (m_server_threads[index].joinable()) + CHECK(!m_server_threads[index].join()); + CHECK_LESS_EQUAL(m_servers[index]->errors_seen(), m_allow_server_errors[index]); + } + + void stop_client(int index) + { + REALM_ASSERT(index >= 0 && index < m_num_clients); + m_clients[index]->stop(); + unit_test::TestContext& test_context = m_test_context; + if (m_client_threads[index].joinable()) + CHECK(!m_client_threads[index].join()); + } + void stop() { for (int i = 0; i < m_num_clients; ++i) From 9e7e4f66a901a7d95457c53348e66e68cc6dbb01 Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Mon, 12 Sep 2022 10:03:09 +0200 Subject: [PATCH 02/20] Update sync benchmark --- test/benchmark-sync/bench_transform.cpp | 467 +++++++++--------------- 1 file changed, 167 insertions(+), 300 deletions(-) diff --git a/test/benchmark-sync/bench_transform.cpp b/test/benchmark-sync/bench_transform.cpp index eee55255f0a..54708762a45 100644 --- a/test/benchmark-sync/bench_transform.cpp +++ b/test/benchmark-sync/bench_transform.cpp @@ -1,171 +1,187 @@ #include "../util/benchmark_results.hpp" -#include "../util/dump_changesets.hpp" #include "../util/timer.hpp" #include "../util/test_path.hpp" #include "../util/unit_test.hpp" #include "../test_all.hpp" - -#if REALM_ENABLE_ENCRYPTION -#include "../util/crypt_key.hpp" -#endif // REALM_ENABLE_ENCRYPTION - -#include -#include - -#include "../peer.hpp" +#include "../sync_fixtures.hpp" using namespace realm; -using namespace realm::sync; -using namespace realm::test_util; using namespace realm::test_util::unit_test; - -static constexpr auto s_bench_test_dump_dir = "BENCHTEST_DUMP_TRANSFORM"; +using namespace realm::fixtures; namespace bench { +static std::unique_ptr results; + +#define TEST_CLIENT_DB(name) \ + SHARED_GROUP_TEST_PATH(name##_path); \ + auto name = DB::create(make_client_replication(), name##_path); + // Two peers have 1000 transactions each with a handful of instructions in // each (25% transactions contain MoveLastOver). One peer receives and merges // all transactions from the other (but does not apply them to their // database). 
template -void transform_transactions(TestContext& test_context, BenchmarkResults& results) +void transform_transactions(TestContext& test_context) { std::string ident = test_context.test_details.test_name; - std::string ident_preface = test_context.test_details.test_name + "_Setup"; const size_t num_iterations = 3; for (size_t i = 0; i < num_iterations; ++i) { - // We dump the changesets generated by the performance tests when a directory is specified. - // This generates a performance testing corpus for the Golang implementation. - auto changeset_dump_dir_gen = get_changeset_dump_dir_generator(test_context, s_bench_test_dump_dir); - - auto server = Peer::create_server(test_context, changeset_dump_dir_gen.get()); - auto origin = Peer::create_client(test_context, 2, changeset_dump_dir_gen.get()); - auto client = Peer::create_client(test_context, 3, changeset_dump_dir_gen.get()); + TEST_CLIENT_DB(db_1); + TEST_CLIENT_DB(db_2); // Produce some mostly realistic transactions on both sides. - auto make_transactions = [](Peer& peer) { + auto make_transactions = [](DBRef& db) { ColKey col_ndx; { - peer.start_transaction(); - TableRef t = sync::create_table(*peer.group, "class_t"); - col_ndx = t->add_column(type_Int, "i"); - peer.commit(); + WriteTransaction wt(db); + TableRef t = wt.add_table("class_t"); + col_ndx = t->add_column(type_String, "s"); + wt.commit(); } for (size_t j = 0; j < num_transactions - 1; ++j) { - peer.start_transaction(); - TableRef t = peer.table("class_t"); - t->create_object().set(col_ndx, 123); + WriteTransaction wt(db); + TableRef t = wt.get_table("class_t"); + t->create_object().set(col_ndx, std::string(500, char('a' + j % 26))); // Let 25% of commits contain a MoveLastOver if (j % 4 == 0) { t->remove_object(t->begin()); } - peer.commit(); + wt.commit(); } }; - // Timer t_preface{Timer::type_RealTime}; - make_transactions(*origin); - make_transactions(*client); - // results.submit(ident_preface.c_str(), t_preface.get_elapsed_time()); + make_transactions(db_1); + make_transactions(db_2); - // Upload everything to the server (fast, no conflicts) - size_t outstanding = server->count_outstanding_changesets_from(*origin); - for (size_t j = 0; j < outstanding; ++j) { - server->integrate_next_changeset_from(*origin); - } + TEST_DIR(dir); - outstanding = client->count_outstanding_changesets_from(*server); - REALM_ASSERT(outstanding != 0); + MultiClientServerFixture::Config config; + config.server_public_key_path = ""; + MultiClientServerFixture fixture(2, 1, dir, test_context, config); Timer t{Timer::type_RealTime}; - // FIXME: client to server is artificially slow, because we do not yet - // have a batched UPLOAD ability. - // - // for (size_t j = 0; j < outstanding; ++j) { - // server->integrate_next_changeset_from(*client); - // } - client->integrate_next_changesets_from(*server, outstanding); - results.submit(ident.c_str(), t.get_elapsed_time()); + + Session::Config session_config; + session_config.on_before_download_integration = [&](size_t num_changesets) { + CHECK(num_changesets > 0); + + t.reset(); + }; + session_config.on_after_download_integration = [&](size_t num_changesets) { + CHECK(num_changesets > 0); + + results->submit(ident.c_str(), t.get_elapsed_time()); + }; + + Session session_1 = fixture.make_session(0, db_1, std::move(session_config)); + fixture.bind_session(session_1, 0, "/test"); + Session session_2 = fixture.make_session(1, db_2); + fixture.bind_session(session_2, 0, "/test"); + + // Start server and upload changes of second client. 
+ fixture.start_server(0); + fixture.start_client(1); + session_2.wait_for_upload_complete_or_client_stopped(); + session_2.wait_for_download_complete_or_client_stopped(); + fixture.stop_client(1); + + // Upload changes of first client and wait to integrate changes from second client. + fixture.start_client(0); + session_1.wait_for_upload_complete_or_client_stopped(); + session_1.wait_for_download_complete_or_client_stopped(); } - // results.finish(ident_preface, ident_preface); - results.finish(ident, ident); + results->finish(ident, ident, "runtime_secs"); } // Two peers have 1 transaction each with 1000 instructions (8.3% of // instructions are MoveLastOver). One peer receives and merges the large // transaction from the other (but does not apply it to their database). template -void transform_instructions(TestContext& test_context, BenchmarkResults& results) +void transform_instructions(TestContext& test_context) { std::string ident = test_context.test_details.test_name; - std::string ident_preface = test_context.test_details.test_name + "_Setup"; for (size_t i = 0; i < 3; ++i) { - // We dump the changesets generated by the performance tests when a directory is specified. - // This generates a performance testing corpus for the Golang implementation. - auto changeset_dump_dir_gen = get_changeset_dump_dir_generator(test_context, s_bench_test_dump_dir); - - auto server = Peer::create_server(test_context, changeset_dump_dir_gen.get()); - auto client = Peer::create_client(test_context, 2, changeset_dump_dir_gen.get()); + TEST_CLIENT_DB(db_1); + TEST_CLIENT_DB(db_2); // Produce some mostly realistic transactions on both sides. - auto make_instructions = [](Peer& peer) { - peer.start_transaction(); - TableRef t = peer.group->add_table("class_t"); + auto make_instructions = [](DBRef& db) { + WriteTransaction wt(db); + TableRef t = wt.add_table("class_t"); ColKey col_ndx = t->add_column(type_Int, "i"); for (size_t j = 0; j < num_iterations; ++j) { - Obj obj = t->create_object(); - obj.set(col_ndx, 123); + t->create_object().set(col_ndx, 123); // Let 25% of commits contain a MoveLastOver if (j % 4 == 0) { t->begin()->remove(); } } - - return peer.commit(); + wt.commit(); }; - // Timer t_preface{Timer::type_RealTime}; - make_instructions(*server); - make_instructions(*client); - // results.submit(ident_preface.c_str(), t_preface.get_elapsed_time()); + make_instructions(db_1); + make_instructions(db_2); - size_t outstanding = server->count_outstanding_changesets_from(*client); - REALM_ASSERT(outstanding != 0); + TEST_DIR(dir); + + MultiClientServerFixture::Config config; + config.server_public_key_path = ""; + MultiClientServerFixture fixture(2, 1, dir, test_context, config); Timer t{Timer::type_RealTime}; - for (size_t j = 0; j < outstanding; ++j) { - server->integrate_next_changeset_from(*client); - } - results.submit(ident.c_str(), t.get_elapsed_time()); + + Session::Config session_config; + session_config.on_before_download_integration = [&](size_t num_changesets) { + CHECK(num_changesets > 0); + + t.reset(); + }; + session_config.on_after_download_integration = [&](size_t num_changesets) { + CHECK(num_changesets > 0); + + results->submit(ident.c_str(), t.get_elapsed_time()); + }; + Session session_1 = fixture.make_session(0, db_1, std::move(session_config)); + fixture.bind_session(session_1, 0, "/test"); + Session session_2 = fixture.make_session(1, db_2); + fixture.bind_session(session_2, 0, "/test"); + + // Start server and upload changes of second client. 
+ fixture.start_server(0); + fixture.start_client(1); + session_2.wait_for_upload_complete_or_client_stopped(); + session_2.wait_for_download_complete_or_client_stopped(); + fixture.stop_client(1); + + // Upload changes of first client and wait to integrate changes from second client. + fixture.start_client(0); + session_1.wait_for_upload_complete_or_client_stopped(); + session_1.wait_for_download_complete_or_client_stopped(); } - // results.finish(ident_preface, ident_preface); - results.finish(ident, ident); + results->finish(ident, ident, "runtime_secs"); } template -void connected_objects(TestContext& test_context, BenchmarkResults& results) +void connected_objects(TestContext& test_context) { std::string ident = test_context.test_details.test_name; - std::string ident_preface = test_context.test_details.test_name + "_Setup"; for (size_t i = 0; i < 3; ++i) { - // We dump the changesets generated by the performance tests when a directory is specified. - // This generates a performance testing corpus for the Golang implementation. - auto changeset_dump_dir_gen = get_changeset_dump_dir_generator(test_context, s_bench_test_dump_dir); + TEST_CLIENT_DB(db_1); + TEST_CLIENT_DB(db_2); - auto server = Peer::create_server(test_context, changeset_dump_dir_gen.get()); - auto client = Peer::create_client(test_context, 2, changeset_dump_dir_gen.get()); - - auto make_instructions = [](Peer& peer) { - peer.start_transaction(); - TableRef t = sync::create_table_with_primary_key(*peer.group, "class_t", type_String, "pk"); - auto col_key = t->add_column(*t, "l"); + // Produce some mostly realistic transactions on both sides. + auto make_instructions = [](DBRef& db) { + WriteTransaction wt(db); + TableRef t = wt.get_group().add_table_with_primary_key("class_t", type_String, "pk"); + ColKey col_key = t->add_column(*t, "l"); // Everything links to this object! auto first_key = t->create_object_with_primary_key("Hello").get_key(); @@ -176,279 +192,130 @@ void connected_objects(TestContext& test_context, BenchmarkResults& results) std::string pk = ss.str(); t->create_object_with_primary_key(pk).set(col_key, first_key); } - - return peer.commit(); + wt.commit(); }; - // Timer t_preface{Time::type_RealTime}; - make_instructions(*server); - make_instructions(*client); - // results.submit(ident_preface.c_str(), t_preface.get_elapsed_time()); + make_instructions(db_1); + make_instructions(db_2); + + TEST_DIR(dir); - size_t outstanding = server->count_outstanding_changesets_from(*client); - REALM_ASSERT(outstanding != 0); + MultiClientServerFixture::Config config; + config.server_public_key_path = ""; + MultiClientServerFixture fixture(2, 1, dir, test_context, config); Timer t{Timer::type_RealTime}; - for (size_t j = 0; j < outstanding; ++j) { - server->integrate_next_changeset_from(*client); - } - results.submit(ident.c_str(), t.get_elapsed_time()); + + Session::Config session_config; + session_config.on_before_download_integration = [&](size_t num_changesets) { + CHECK(num_changesets > 0); + + t.reset(); + }; + session_config.on_after_download_integration = [&](size_t num_changesets) { + CHECK(num_changesets > 0); + + results->submit(ident.c_str(), t.get_elapsed_time()); + }; + Session session_1 = fixture.make_session(0, db_1, std::move(session_config)); + fixture.bind_session(session_1, 0, "/test"); + Session session_2 = fixture.make_session(1, db_2); + fixture.bind_session(session_2, 0, "/test"); + + // Start server and upload changes of second client. 
+ fixture.start_server(0); + fixture.start_client(1); + session_2.wait_for_upload_complete_or_client_stopped(); + session_2.wait_for_download_complete_or_client_stopped(); + fixture.stop_client(1); + + // Upload changes of first client and wait to integrate changes from second client. + fixture.start_client(0); + session_1.wait_for_upload_complete_or_client_stopped(); + session_1.wait_for_download_complete_or_client_stopped(); } - results.finish(ident, ident); + results->finish(ident, ident, "runtime_secs"); } } // namespace bench const int max_lead_text_width = 40; -#define RUN_ALL_BENCHMARKS 0 - TEST(BenchMerge1000x1000Instructions) { - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_1000x1000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<1000>(test_context, results); + bench::transform_instructions<1000>(test_context); } TEST(BenchMerge2000x2000Instructions) { - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_2000x2000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<2000>(test_context, results); -} - -TEST_IF(BenchMerge3000x3000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_3000x3000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<3000>(test_context, results); + bench::transform_instructions<2000>(test_context); } TEST(BenchMerge4000x4000Instructions) { - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_4000x4000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<4000>(test_context, results); -} - -TEST_IF(BenchMerge5000x5000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_5000x5000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<5000>(test_context, results); + bench::transform_instructions<4000>(test_context); } TEST(BenchMerge8000x8000Instructions) { - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_8000x8000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<8000>(test_context, results); -} - -TEST_IF(BenchMerge10000x10000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_10000x10000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<10000>(test_context, results); -} - -TEST_IF(BenchMerge11000x11000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_11000x11000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<11000>(test_context, results); -} - -TEST_IF(BenchMerge12000x12000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_12000x12000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<12000>(test_context, results); -} - -TEST_IF(BenchMerge13000x13000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = 
test_util::get_test_path_prefix() + "instructions_13000x13000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<13000>(test_context, results); -} - -TEST_IF(BenchMerge14000x14000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_14000x14000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<14000>(test_context, results); -} - -TEST_IF(BenchMerge15000x15000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_15000x15000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<15000>(test_context, results); + bench::transform_instructions<8000>(test_context); } TEST(BenchMerge16000x16000Instructions) { - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_16000x16000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<16000>(test_context, results); -} - -TEST_IF(BenchMerge17000x17000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_17000x17000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<17000>(test_context, results); -} - -TEST_IF(BenchMerge18000x18000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_18000x18000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<18000>(test_context, results); -} - -TEST_IF(BenchMerge19000x19000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_19000x19000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<19000>(test_context, results); + bench::transform_instructions<16000>(test_context); } -TEST_IF(BenchMerge20000x20000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_20000x20000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<20000>(test_context, results); -} - - TEST(BenchMerge100x100Transactions) { - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_100x100"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<100>(test_context, results); + bench::transform_transactions<100>(test_context); } TEST(BenchMerge500x500Transactions) { - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_500x500"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<500>(test_context, results); + bench::transform_transactions<500>(test_context); } TEST(BenchMerge1000x1000Transactions) { - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_1000x1000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<1000>(test_context, results); + bench::transform_transactions<1000>(test_context); } TEST(BenchMerge2000x2000Transactions) { - std::string results_file_stem = test_util::get_test_path_prefix() 
+ "transactions_2000x2000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<2000>(test_context, results); -} - -TEST_IF(BenchMerge3000x3000Transactions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_3000x3000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<3000>(test_context, results); + bench::transform_transactions<2000>(test_context); } - TEST(BenchMerge4000x4000Transactions) { - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_4000x4000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<4000>(test_context, results); + bench::transform_transactions<4000>(test_context); } -TEST_IF(BenchMerge5000x5000Transactions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_5000x5000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<5000>(test_context, results); -} - -TEST_IF(BenchMerge6000x6000Transactions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_6000x6000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<6000>(test_context, results); -} - -TEST_IF(BenchMerge7000x7000Transactions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_7000x7000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<7000>(test_context, results); -} - - TEST(BenchMerge8000x8000Transactions) { - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_8000x8000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<8000>(test_context, results); -} - -TEST_IF(BenchMerge9000x9000Transactions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_9000x9000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<9000>(test_context, results); + bench::transform_transactions<8000>(test_context); } TEST(BenchMerge16000x16000Transactions) { - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_16000x16000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<16000>(test_context, results); + bench::transform_transactions<16000>(test_context); } TEST(BenchMergeManyConnectedObjects) { - std::string results_file_stem = test_util::get_test_path_prefix() + "connected_objects"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::connected_objects<8000>(test_context, results); + bench::connected_objects<1000>(test_context); } #if !REALM_IOS -int main(int argc, char** argv) +int main() { - return test_all(argc, argv, nullptr); + std::string results_file_stem = realm::test_util::get_test_path_prefix() + "results"; + bench::results = + std::make_unique(max_lead_text_width, "benchmark-sync", results_file_stem.c_str()); + auto exit_status = test_all(nullptr, true); + // Save to file when deallocated. 
+    bench::results.reset();
+    return exit_status;
 }
 #endif // REALM_IOS
+

From 313ed46b7dcfb50e1e7001c642ddefd161990cdb Mon Sep 17 00:00:00 2001
From: Daniel Tabacaru
Date: Mon, 12 Sep 2022 10:09:28 +0200
Subject: [PATCH 03/20] Add method to check if other threads are waiting for the write mutex

---
 src/realm/db.cpp | 11 +++++++++++
 src/realm/db.hpp |  4 ++++
 2 files changed, 15 insertions(+)

diff --git a/src/realm/db.cpp b/src/realm/db.cpp
index 6fba3c7b70c..db32504586f 100644
--- a/src/realm/db.cpp
+++ b/src/realm/db.cpp
@@ -1518,6 +1518,17 @@ void DB::close_internal(std::unique_lock lock, bool allow_ope
     }
 }
 
+bool DB::waiting_for_write_lock() const
+{
+    SharedInfo* info = m_file_map.get_addr();
+
+    uint32_t next_ticket = info->next_ticket.load(std::memory_order_relaxed);
+    uint32_t next_served = info->next_served.load(std::memory_order_relaxed);
+    // When holding the write mutex, next_ticket = next_served + 1, hence, if the difference between 'next_ticket' and
+    // 'next_served' is greater than 1, there is at least one thread waiting to acquire the write mutex.
+    return next_ticket > next_served + 1;
+}
+
 class DB::AsyncCommitHelper {
 public:
     AsyncCommitHelper(DB* db)
diff --git a/src/realm/db.hpp b/src/realm/db.hpp
index 6640bfac647..0d043e012b0 100644
--- a/src/realm/db.hpp
+++ b/src/realm/db.hpp
@@ -408,6 +408,10 @@ class DB : public std::enable_shared_from_this {
     void claim_sync_agent();
     void release_sync_agent();
 
+    /// Returns true if there are threads waiting to acquire the write mutex, false otherwise.
+    /// To be used only when already holding the mutex.
+    bool waiting_for_write_lock() const;
+
 protected:
     explicit DB(const DBOptions& options); // Is this ever used?

From 5429e856396a046f7d7b05e0250f156967920072 Mon Sep 17 00:00:00 2001
From: Daniel Tabacaru
Date: Mon, 12 Sep 2022 11:00:20 +0200
Subject: [PATCH 04/20] Allow users to commit changes while sync client is integrating remote changes

---
 src/realm/sync/client.cpp                     |   7 +-
 src/realm/sync/noinst/client_history_impl.cpp | 234 ++++++++++--------
 src/realm/sync/noinst/client_history_impl.hpp |  10 +-
 src/realm/sync/noinst/client_impl_base.cpp    |   3 +-
 .../sync/noinst/server/server_history.cpp     |  32 ++-
 .../sync/tools/apply_to_state_command.cpp     |   8 +-
 src/realm/sync/transform.cpp                  |  47 +++-
 src/realm/sync/transform.hpp                  |  18 +-
 test/peer.hpp                                 |  16 +-
 9 files changed, 218 insertions(+), 157 deletions(-)

diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp
index f912e7cfa47..96ce974cc96 100644
--- a/src/realm/sync/client.cpp
+++ b/src/realm/sync/client.cpp
@@ -813,10 +813,11 @@ void SessionImpl::process_pending_flx_bootstrap()
         history.integrate_server_changesets(
             *pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state,
             logger,
-            [&](const TransactionRef& tr) {
-                bootstrap_store->pop_front_pending(tr, pending_batch.changesets.size());
+            [&](const TransactionRef& tr, size_t count) {
+                REALM_ASSERT_3(count, <=, pending_batch.changesets.size());
+                bootstrap_store->pop_front_pending(tr, count);
             },
-            get_transact_reporter());
+            get_transact_reporter(), true);
         progress = *pending_batch.progress;
 
         logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
diff --git a/src/realm/sync/noinst/client_history_impl.cpp b/src/realm/sync/noinst/client_history_impl.cpp
index 0b29bf4ca2e..83ad1da573d 100644
--- a/src/realm/sync/noinst/client_history_impl.cpp
+++ b/src/realm/sync/noinst/client_history_impl.cpp
@@ -376,133 +376,161 @@ void
ClientHistory::find_uploadable_changesets(UploadCursor& upload_progress, ve } -void ClientHistory::integrate_server_changesets(const SyncProgress& progress, - const std::uint_fast64_t* downloadable_bytes, - util::Span incoming_changesets, - VersionInfo& version_info, DownloadBatchState batch_state, - util::Logger& logger, - util::UniqueFunction run_in_write_tr, - SyncTransactReporter* transact_reporter) +void ClientHistory::integrate_server_changesets( + const SyncProgress& progress, const std::uint_fast64_t* downloadable_bytes, + util::Span incoming_changesets, VersionInfo& version_info, DownloadBatchState batch_state, + util::Logger& logger, util::UniqueFunction run_in_write_tr, + SyncTransactReporter* transact_reporter, bool is_flx_sync_connection) { REALM_ASSERT(incoming_changesets.size() != 0); - - std::uint_fast64_t downloaded_bytes_in_message = 0; + REALM_ASSERT(is_flx_sync_connection || batch_state == DownloadBatchState::LastInBatch); std::vector changesets; changesets.resize(incoming_changesets.size()); // Throws + // Parse incoming changesets without holding the write lock. try { for (std::size_t i = 0; i < incoming_changesets.size(); ++i) { const RemoteChangeset& changeset = incoming_changesets[i]; - downloaded_bytes_in_message += changeset.original_changeset_size; parse_remote_changeset(changeset, changesets[i]); // Throws changesets[i].transform_sequence = i; } } - catch (const TransformError& e) { + catch (const BadChangesetError& e) { throw IntegrationException(ClientError::bad_changeset, util::format("Failed to parse received changeset: %1", e.what())); } - TransactionRef transact = m_db->start_write(); // Throws - VersionID old_version = transact->get_version_of_current_transaction(); - version_type local_version = old_version.version; - auto sync_file_id = transact->get_sync_file_id(); - REALM_ASSERT(sync_file_id != 0); + VersionID new_version{0, 0}; + constexpr std::size_t commit_byte_size_limit = 0x19000; // 100 KB + + // Ideally, this loop runs only once, but it can run up to `incoming_changesets.size()` times, + // depending on how many times the sync client needs to yield the write lock to allow + // the user to commit their changes. + // In each iteration, at least one changeset is transformed and committed. + for (size_t changeset_ndx = 0; changeset_ndx < incoming_changesets.size();) { + TransactionRef transact = m_db->start_write(); // Throws + VersionID old_version = transact->get_version_of_current_transaction(); + version_type local_version = old_version.version; + auto sync_file_id = transact->get_sync_file_id(); + REALM_ASSERT(sync_file_id != 0); + + ensure_updated(local_version); // Throws + prepare_for_write(); // Throws + + std::uint64_t downloaded_bytes_in_transaction = 0; + size_t changesets_transformed_count = 0; + + try { + for (std::size_t i = changeset_ndx; i < incoming_changesets.size(); ++i) { + const RemoteChangeset& changeset = incoming_changesets[i]; + REALM_ASSERT(changeset.last_integrated_local_version <= local_version); + REALM_ASSERT(changeset.origin_file_ident > 0 && changeset.origin_file_ident != sync_file_id); + + // It is possible that the synchronization history has been trimmed + // to a point where a prefix of the merge window is no longer + // available, but this can only happen if that prefix consisted + // entirely of upload skippable entries. 
Since such entries (those
+                // that are empty or of remote origin) will be skipped by the
+                // transformer anyway, we can simply clamp the beginning of the
+                // merge window to the beginning of the synchronization history,
+                // when this situation occurs.
+                //
+                // See trim_sync_history() for further details.
+                if (changesets[i].last_integrated_remote_version < m_sync_history_base_version)
+                    changesets[i].last_integrated_remote_version = m_sync_history_base_version;
+            }
 
-        if (m_replication.apply_server_changes()) {
-            Transformer& transformer = get_transformer(); // Throws
-            transformer.transform_remote_changesets(*this, sync_file_id, local_version, changesets,
-                                                    &logger); // Throws
-
-            // Changesets are applied to the Realm with replication temporarily
-            // disabled. The main reason for disabling replication and manually adding
-            // the transformed changesets to the history, is that the replication system
-            // (due to technical debt) is unable in some cases to produce a correct
-            // changeset while applying another one (i.e., it cannot carbon copy).
-            TempShortCircuitReplication tscr{m_replication};
-            InstructionApplier applier{*transact};
-            for (std::size_t i = 0; i < incoming_changesets.size(); ++i) {
-                encode_changeset(changesets[i], transformed_changeset);
-                applier.apply(changesets[i], &logger); // Throws
+            if (m_replication.apply_server_changes()) {
+                Transformer& transformer = get_transformer(); // Throws
+                auto changeset_applier = [&](Changeset* transformed_changeset) -> bool {
+                    InstructionApplier applier{*transact};
+                    {
+                        TempShortCircuitReplication tscr{m_replication};
+                        applier.apply(*transformed_changeset, &logger); // Throws
+                    }
+
+                    return !(m_db->waiting_for_write_lock() && transact->get_commit_size() >= commit_byte_size_limit);
+                };
+                changesets_transformed_count = transformer.transform_remote_changesets(
+                    *this, sync_file_id, local_version,
+                    util::Span{changesets.data() + changeset_ndx, changesets.size() - changeset_ndx},
+                    std::move(changeset_applier), &logger); // Throws
+            }
+            else {
+                // Skip over all changesets if they don't need to be transformed and applied.
+                changesets_transformed_count = incoming_changesets.size();
+            }
+
+            // Compute downloaded bytes only after we know how many remote changesets are going to be committed in this
+            // transaction.
+ for (std::size_t i = changeset_ndx; i < changeset_ndx + changesets_transformed_count; ++i) { + downloaded_bytes_in_transaction += incoming_changesets[i].original_changeset_size; } } - } - catch (const BadChangesetError& e) { - throw IntegrationException(ClientError::bad_changeset, - util::format("Failed to apply received changeset: %1", e.what())); - } - catch (const TransformError& e) { - throw IntegrationException(ClientError::bad_changeset, - util::format("Failed to transform received changeset: %1", e.what())); - } - - // downloaded_bytes always contains the total number of downloaded bytes - // from the Realm. downloaded_bytes must be persisted in the Realm, since - // the downloaded changesets are trimmed after use, and since it would be - // expensive to traverse the entire history. - Array& root = m_arrays->root; - auto downloaded_bytes = - std::uint_fast64_t(root.get_as_ref_or_tagged(s_progress_downloaded_bytes_iip).get_as_int()); - downloaded_bytes += downloaded_bytes_in_message; - root.set(s_progress_downloaded_bytes_iip, RefOrTagged::make_tagged(downloaded_bytes)); // Throws - - // During the bootstrap phase in flexible sync, the server sends multiple download messages with the same - // synthetic server version that represents synthetic changesets generated from state on the server. - if (batch_state == DownloadBatchState::LastInBatch) { - update_sync_progress(progress, downloadable_bytes, transact); // Throws - } - if (run_in_write_tr) { - run_in_write_tr(transact); - } - - // The reason we can use the `origin_timestamp`, and the `origin_file_ident` - // from the last incoming changeset, and ignore all the other changesets, is - // that these values are actually irrelevant for changesets of remote origin - // stored in the client-side history (for now), except that - // `origin_file_ident` is required to be nonzero, to mark it as having been - // received from the server. - const Changeset& last_changeset = changesets.back(); - HistoryEntry entry; - entry.origin_timestamp = last_changeset.origin_timestamp; - entry.origin_file_ident = last_changeset.origin_file_ident; - entry.remote_version = last_changeset.version; - entry.changeset = BinaryData(transformed_changeset.data(), transformed_changeset.size()); - add_sync_history_entry(entry); // Throws + catch (const BadChangesetError& e) { + throw IntegrationException(ClientError::bad_changeset, + util::format("Failed to apply received changeset: %1", e.what())); + } + catch (const TransformError& e) { + throw IntegrationException(ClientError::bad_changeset, + util::format("Failed to transform received changeset: %1", e.what())); + } - // Tell prepare_commit()/add_changeset() not to write a history entry for - // this transaction as we already did it. - REALM_ASSERT(!m_applying_server_changeset); - m_applying_server_changeset = true; - auto new_version = transact->commit_and_continue_as_read(); // Throws + changeset_ndx += changesets_transformed_count; + + // downloaded_bytes always contains the total number of downloaded bytes + // from the Realm. downloaded_bytes must be persisted in the Realm, since + // the downloaded changesets are trimmed after use, and since it would be + // expensive to traverse the entire history. 
+ Array& root = m_arrays->root; + auto downloaded_bytes = + std::uint64_t(root.get_as_ref_or_tagged(s_progress_downloaded_bytes_iip).get_as_int()); + downloaded_bytes += downloaded_bytes_in_transaction; + root.set(s_progress_downloaded_bytes_iip, RefOrTagged::make_tagged(downloaded_bytes)); // Throws + + const RemoteChangeset& last_changeset = incoming_changesets[changeset_ndx - 1]; + + // During the bootstrap phase in flexible sync, the server sends multiple download messages with the same + // synthetic server version that represents synthetic changesets generated from state on the server. + if (is_flx_sync_connection && batch_state == DownloadBatchState::LastInBatch && + changeset_ndx == incoming_changesets.size()) { + update_sync_progress(progress, downloadable_bytes, transact); // Throws + } + // Always update progress in PBS. + else if (!is_flx_sync_connection) { + auto partial_progress = progress; + partial_progress.download.server_version = last_changeset.remote_version; + partial_progress.download.last_integrated_client_version = last_changeset.last_integrated_local_version; + update_sync_progress(partial_progress, downloadable_bytes, transact); // Throws + } + if (run_in_write_tr) { + run_in_write_tr(transact, changesets_transformed_count); + } - if (transact_reporter) { - transact_reporter->report_sync_transact(old_version, new_version); // Throws + // The reason we can use the `origin_timestamp`, and the `origin_file_ident` + // from the last transformed changeset, and ignore all the other changesets, is + // that these values are actually irrelevant for changesets of remote origin + // stored in the client-side history (for now), except that + // `origin_file_ident` is required to be nonzero, to mark it as having been + // received from the server. + HistoryEntry entry; + entry.origin_timestamp = last_changeset.origin_timestamp; + entry.origin_file_ident = last_changeset.origin_file_ident; + entry.remote_version = last_changeset.remote_version; + add_sync_history_entry(entry); // Throws + + // Tell prepare_commit()/add_changeset() not to write a history entry for + // this transaction as we already did it. 
+ REALM_ASSERT(!m_applying_server_changeset); + m_applying_server_changeset = true; + new_version = transact->commit_and_continue_as_read(); // Throws + + if (transact_reporter) { + transact_reporter->report_sync_transact(old_version, new_version); // Throws + } } + REALM_ASSERT(new_version.version > 0); version_info.realm_version = new_version.version; version_info.sync_version = {new_version.version, 0}; } @@ -908,7 +936,7 @@ bool ClientHistory::no_pending_local_changes(version_type version) const void ClientHistory::do_trim_sync_history(std::size_t n) { - REALM_ASSERT(sync_history_size() == sync_history_size()); + REALM_ASSERT(m_arrays->changesets.size() == sync_history_size()); REALM_ASSERT(m_arrays->reciprocal_transforms.size() == sync_history_size()); REALM_ASSERT(m_arrays->remote_versions.size() == sync_history_size()); REALM_ASSERT(m_arrays->origin_file_idents.size() == sync_history_size()); diff --git a/src/realm/sync/noinst/client_history_impl.hpp b/src/realm/sync/noinst/client_history_impl.hpp index bc149ab22aa..af79dbf5620 100644 --- a/src/realm/sync/noinst/client_history_impl.hpp +++ b/src/realm/sync/noinst/client_history_impl.hpp @@ -249,11 +249,11 @@ class ClientHistory final : public _impl::History, public TransformHistory { /// \param transact_reporter An optional callback which will be called with the /// version immediately processing the sync transaction and that of the sync /// transaction. - void integrate_server_changesets(const SyncProgress& progress, const std::uint_fast64_t* downloadable_bytes, - util::Span changesets, VersionInfo& new_version, - DownloadBatchState download_type, util::Logger&, - util::UniqueFunction run_in_write_tr = nullptr, - SyncTransactReporter* transact_reporter = nullptr); + void integrate_server_changesets( + const SyncProgress& progress, const std::uint_fast64_t* downloadable_bytes, + util::Span changesets, VersionInfo& new_version, DownloadBatchState download_type, + util::Logger&, util::UniqueFunction run_in_write_tr = nullptr, + SyncTransactReporter* transact_reporter = nullptr, bool is_flx_sync_connection = false); static void get_upload_download_bytes(DB*, std::uint_fast64_t&, std::uint_fast64_t&, std::uint_fast64_t&, std::uint_fast64_t&, std::uint_fast64_t&); diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index a29c4f82048..f1c0a65c02d 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -1396,7 +1396,8 @@ void Session::integrate_changesets(ClientReplication& repl, const SyncProgress& } before_download_integration_hook(received_changesets.size()); history.integrate_server_changesets(progress, &downloadable_bytes, received_changesets, version_info, - download_batch_state, logger, {}, get_transact_reporter()); // Throws + download_batch_state, logger, {}, get_transact_reporter(), + m_is_flx_sync_session); // Throws after_download_integration_hook(received_changesets.size()); if (received_changesets.size() == 1) { logger.debug("1 remote changeset integrated, producing client version %1", diff --git a/src/realm/sync/noinst/server/server_history.cpp b/src/realm/sync/noinst/server/server_history.cpp index b7c90675a3a..5a7a22df3aa 100644 --- a/src/realm/sync/noinst/server/server_history.cpp +++ b/src/realm/sync/noinst/server/server_history.cpp @@ -1180,8 +1180,6 @@ bool ServerHistory::integrate_remote_changesets(file_ident_type remote_file_iden if (num_changesets > 0) { recip_hist.ensure_instantiated(); // Throws - version_type 
lowest_last_integrated_local_version = changesets[0].last_integrated_local_version; - // Parse the changesets std::vector parsed_transformed_changesets; parsed_transformed_changesets.resize(num_changesets); @@ -1190,19 +1188,23 @@ bool ServerHistory::integrate_remote_changesets(file_ident_type remote_file_iden // Transform the changesets version_type current_server_version = get_server_version(); - bool may_have_causally_unrelated_changes = (current_server_version > lowest_last_integrated_local_version); - if (may_have_causally_unrelated_changes) { - // Merge with causally unrelated changesets, and resolve the - // conflicts if there are any. - TransformHistoryImpl transform_hist{remote_file_ident, *this, recip_hist}; - Transformer& transformer = m_context.get_transformer(); // Throws - transformer.transform_remote_changesets(transform_hist, m_local_file_ident, current_server_version, - parsed_transformed_changesets, &logger); // Throws - } - - // Apply the transformed changesets to the Realm state Group& group = *m_group; Transaction& transaction = dynamic_cast(group); + auto apply = [&](Changeset* c) -> bool { + TempShortCircuitReplication tdr{*this}; // Short-circuit while integrating changes + InstructionApplier applier{transaction}; + applier.apply(*c, &logger); + reset(); // Reset the instruction encoder + return true; + }; + // Merge with causally unrelated changesets, and resolve the + // conflicts if there are any. + TransformHistoryImpl transform_hist{remote_file_ident, *this, recip_hist}; + Transformer& transformer = m_context.get_transformer(); // Throws + transformer.transform_remote_changesets(transform_hist, m_local_file_ident, current_server_version, + parsed_transformed_changesets, std::move(apply), + &logger); // Throws + for (std::size_t i = 0; i < num_changesets; ++i) { REALM_ASSERT(get_instruction_encoder().buffer().size() == 0); const Changeset& changeset = parsed_transformed_changesets[i]; @@ -1214,14 +1216,10 @@ bool ServerHistory::integrate_remote_changesets(file_ident_type remote_file_iden ChangesetEncoder::Buffer changeset_buffer; - TempShortCircuitReplication tdr{*this}; // Short-circuit while integrating changes - InstructionApplier applier{transaction}; - applier.apply(parsed_transformed_changesets[i], &logger); // Throws encode_changeset(parsed_transformed_changesets[i], changeset_buffer); // Throws entry.changeset = BinaryData{changeset_buffer.data(), changeset_buffer.size()}; add_sync_history_entry(entry); // Throws - reset(); // Reset the instruction encoder } } diff --git a/src/realm/sync/tools/apply_to_state_command.cpp b/src/realm/sync/tools/apply_to_state_command.cpp index f23a2119061..928bb3eba79 100644 --- a/src/realm/sync/tools/apply_to_state_command.cpp +++ b/src/realm/sync/tools/apply_to_state_command.cpp @@ -291,10 +291,10 @@ int main(int argc, const char** argv) mpark::visit(realm::util::overload{ [&](const DownloadMessage& download_message) { realm::sync::VersionInfo version_info; - history.integrate_server_changesets(download_message.progress, - &download_message.downloadable_bytes, - download_message.changesets, version_info, - download_message.batch_state, *logger, nullptr); + history.integrate_server_changesets( + download_message.progress, &download_message.downloadable_bytes, + download_message.changesets, version_info, download_message.batch_state, *logger, + nullptr, nullptr, bool(flx_sync_arg)); }, [&](const UploadMessage& upload_message) { for (const auto& changeset : upload_message.changesets) { diff --git 
a/src/realm/sync/transform.cpp b/src/realm/sync/transform.cpp index d9b3e168266..6ee49b2dbcf 100644 --- a/src/realm/sync/transform.cpp +++ b/src/realm/sync/transform.cpp @@ -1359,12 +1359,12 @@ struct Merge parsed_changesets, util::Logger* logger) +size_t TransformerImpl::transform_remote_changesets(TransformHistory& history, file_ident_type local_file_ident, + version_type current_local_version, + util::Span parsed_changesets, + util::UniqueFunction changeset_applier, + util::Logger* logger) { REALM_ASSERT(local_file_ident != 0); std::vector our_changesets; + size_t applied_changesets_count = 0; + try { // p points to the beginning of a range of changesets that share the same // "base", i.e. are based on the same local version. @@ -2554,12 +2558,33 @@ void TransformerImpl::transform_remote_changesets(TransformHistory& history, fil begin_version = version; } + bool must_apply_all = false; + if (!our_changesets.empty()) { merge_changesets(local_file_ident, &*p, same_base_range_end - p, our_changesets.data(), our_changesets.size(), logger); // Throws + // We need to apply all transformed changesets if at least one reciprocal changeset was modified + // during OT. + must_apply_all = std::find_if(our_changesets.begin(), our_changesets.end(), [](const Changeset* c) { + return c->is_dirty(); + }) != our_changesets.end(); + } + + while (p != same_base_range_end) { + bool continue_applying = changeset_applier(p); + ++applied_changesets_count; + // It is safe to stop applying the changesets if: + // 1. There are no reciprocal changesets + // 2. No reciprocal changeset was modified + if (!must_apply_all && !continue_applying) { + break; + } + ++p; + } + if (p != same_base_range_end) { + break; } - p = same_base_range_end; our_changesets.clear(); // deliberately not releasing memory } } @@ -2578,6 +2603,8 @@ void TransformerImpl::transform_remote_changesets(TransformHistory& history, fil // NOTE: Any exception thrown during flushing *MUST* lead to rollback of // the current transaction. flush_reciprocal_transform_cache(history); // Throws + + return applied_changesets_count; } @@ -2643,12 +2670,8 @@ void parse_remote_changeset(const Transformer::RemoteChangeset& remote_changeset REALM_ASSERT(remote_changeset.remote_version != 0); ChunkedBinaryInputStream remote_in{remote_changeset.data}; - try { - parse_changeset(remote_in, parsed_changeset); // Throws - } - catch (sync::BadChangesetError& e) { - throw TransformError(e.what()); - } + parse_changeset(remote_in, parsed_changeset); // Throws + parsed_changeset.version = remote_changeset.remote_version; parsed_changeset.last_integrated_remote_version = remote_changeset.last_integrated_local_version; parsed_changeset.origin_timestamp = remote_changeset.origin_timestamp; diff --git a/src/realm/sync/transform.hpp b/src/realm/sync/transform.hpp index 9749bad3d74..4a01dd58163 100644 --- a/src/realm/sync/transform.hpp +++ b/src/realm/sync/transform.hpp @@ -136,7 +136,7 @@ class Transformer { /// changesets and all causally unrelated changesets in the local history. A /// changeset in the local history is causally unrelated if, and only if it /// occurs after the local changeset that produced - /// `remote_changeset.last_integrated_local_version` and is not a produced + /// `remote_changeset.last_integrated_local_version` and it is not produced /// by integration of a changeset received from P. 
This assumes that /// `remote_changeset.last_integrated_local_version` is set to the local /// version produced by the last local changeset, that was integrated by P @@ -164,14 +164,20 @@ class Transformer { /// changeset is of local origin. The specified identifier must never be /// zero. /// + /// \param changeset_applier Called to apply each transformed changeset. + /// Returns true if it can continue applying the changesets, false otherwise. + /// + /// \return The number of changesets that have been transformed and applied. + /// /// \throw TransformError Thrown if operational transformation fails due to /// a problem with the specified changeset. /// /// FIXME: Consider using std::error_code instead of throwing /// TransformError. - virtual void transform_remote_changesets(TransformHistory&, file_ident_type local_file_ident, - version_type current_local_version, util::Span changesets, - util::Logger* = nullptr) = 0; + virtual size_t transform_remote_changesets(TransformHistory&, file_ident_type local_file_ident, + version_type current_local_version, util::Span changesets, + util::UniqueFunction changeset_applier, + util::Logger* = nullptr) = 0; virtual ~Transformer() noexcept {} }; @@ -194,8 +200,8 @@ class TransformerImpl : public sync::Transformer { TransformerImpl(); - void transform_remote_changesets(TransformHistory&, file_ident_type, version_type, util::Span, - util::Logger*) override; + size_t transform_remote_changesets(TransformHistory&, file_ident_type, version_type, util::Span, + util::UniqueFunction, util::Logger*) override; struct Side; struct MajorSide; diff --git a/test/peer.hpp b/test/peer.hpp index 2ca4e3d8c9f..cd19b656e0a 100644 --- a/test/peer.hpp +++ b/test/peer.hpp @@ -486,16 +486,20 @@ inline auto ShortCircuitHistory::integrate_remote_changesets(file_ident_type rem } TransformHistoryImpl transform_hist{*this, remote_file_ident}; - m_transformer->transform_remote_changesets(transform_hist, m_local_file_ident, local_version, changesets, logger); + auto apply = [&](Changeset* c) -> bool { + sync::InstructionApplier applier{*transact}; + applier.apply(*c, logger); - sync::ChangesetEncoder::Buffer assembled_transformed_changeset; + return true; + }; + m_transformer->transform_remote_changesets(transform_hist, m_local_file_ident, local_version, changesets, + std::move(apply), logger); - for (size_t i = 0; i < num_changesets; ++i) { - sync::InstructionApplier applier{*transact}; - applier.apply(changesets[i], logger); + transact->verify(); - transact->verify(); + sync::ChangesetEncoder::Buffer assembled_transformed_changeset; + for (size_t i = 0; i < num_changesets; ++i) { sync::encode_changeset(changesets[i], assembled_transformed_changeset); } From 7c62329bf9caa5f4b16f5b419b467159b46f1739 Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Mon, 12 Sep 2022 11:01:34 +0200 Subject: [PATCH 05/20] Fix typos/grammar in comments --- doc/algebra_of_changesets.md | 2 +- src/realm/sync/noinst/changeset_index.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/algebra_of_changesets.md b/doc/algebra_of_changesets.md index fb57a476514..5f87bf61048 100644 --- a/doc/algebra_of_changesets.md +++ b/doc/algebra_of_changesets.md @@ -22,7 +22,7 @@ stepwise for a concatenated changeset: S(α, A + B) = S(S(α, A), B) (1) **Definition:** Two changesets `A` and `B`, having the same base state, `α`, are -*equivalent*, written as `A ~ B`, if, and onlæy if they produce the same final 
state, that is, if, and only if `S(α, A) = S(α, B)`. This does not mean that `A` and `B` are equal. diff --git a/src/realm/sync/noinst/changeset_index.hpp b/src/realm/sync/noinst/changeset_index.hpp index 7c9df2f1968..30753782226 100644 --- a/src/realm/sync/noinst/changeset_index.hpp +++ b/src/realm/sync/noinst/changeset_index.hpp @@ -13,7 +13,7 @@ namespace _impl { /// The ChangesetIndex is responsible for keeping track of exactly which /// instructions touch which objects. It does this by recording ranges of -/// instructions in changesets, such that the merge algorithm can make do with +/// instructions in changesets, such that the merge algorithm can do with /// just merging the "relevant" instructions. Due to the semantics of link /// nullification, instruction ranges for objects that have ever been /// "connected" by a link instruction must be joined together. In other words, From 54b35d15f5d744387c0927afe50e4be83c7d8da6 Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Mon, 12 Sep 2022 11:03:18 +0200 Subject: [PATCH 06/20] Fix hack in async write unit test --- test/object-store/realm.cpp | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/test/object-store/realm.cpp b/test/object-store/realm.cpp index ac70a115fd2..8f111954553 100644 --- a/test/object-store/realm.cpp +++ b/test/object-store/realm.cpp @@ -1177,12 +1177,10 @@ TEST_CASE("SharedRealm: async writes") { auto write = db->start_write(); sema.add_stone(); - // We want to wait until the main thread is waiting for the - // lock, which we can't do deterministically. If this sleep - // is too short the test will still pass and it'll just fail - // to test the intended code path. - std::chrono::milliseconds wait_time{500}; - std::this_thread::sleep_for(wait_time); + // Wait until the main thread is waiting for the lock. 
+ while (!db->waiting_for_write_lock()) { + millisleep(1); + } write->close(); }); From 6ef9f0e29917b7db3935d43ef806d0bd7de7a3ad Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Mon, 12 Sep 2022 11:04:22 +0200 Subject: [PATCH 07/20] Add sync unit tests --- test/test_sync.cpp | 193 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 193 insertions(+) diff --git a/test/test_sync.cpp b/test/test_sync.cpp index e6e9ebbabd0..05479fd23eb 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -4304,6 +4305,198 @@ TEST(Sync_MergeLargeChangesets) CHECK_EQUAL(table->size(), 2 * number_of_rows); } + +TEST(Sync_MergeMultipleChangesets) +{ + constexpr int number_of_changesets = 100; + constexpr int number_of_instructions = 10; + + TEST_CLIENT_DB(db_1); + TEST_CLIENT_DB(db_2); + + { + WriteTransaction wt(db_1); + TableRef table = wt.add_table("class_table name"); + table->add_column(type_Int, "integer column"); + wt.commit(); + } + + { + WriteTransaction wt(db_2); + TableRef table = wt.add_table("class_table name"); + table->add_column(type_Int, "integer column"); + wt.commit(); + } + + { + for (int i = 0; i < number_of_changesets; ++i) { + WriteTransaction wt(db_1); + TableRef table = wt.get_table("class_table name"); + for (int j = 0; j < number_of_instructions; ++j) { + auto obj = table->create_object(); + obj.set("integer column", 2 * j); + } + wt.commit(); + } + } + + { + for (int i = 0; i < number_of_changesets; ++i) { + WriteTransaction wt(db_2); + TableRef table = wt.get_table("class_table name"); + for (int j = 0; j < number_of_instructions; ++j) { + auto obj = table->create_object(); + obj.set("integer column", 2 * j + 1); + } + wt.commit(); + } + } + + { + TEST_DIR(dir); + MultiClientServerFixture fixture(2, 1, dir, test_context); + + Session session_1 = fixture.make_session(0, db_1); + fixture.bind_session(session_1, 0, "/test"); + Session session_2 = fixture.make_session(1, db_2); + fixture.bind_session(session_2, 0, "/test"); + + // Start server and upload changes of first client. + fixture.start_server(0); + fixture.start_client(0); + session_1.wait_for_upload_complete_or_client_stopped(); + session_1.wait_for_download_complete_or_client_stopped(); + // Stop first client. + fixture.stop_client(0); + + // Start the second client and upload their changes. + // Wait to integrate changes from the first client. + fixture.start_client(1); + session_2.wait_for_upload_complete_or_client_stopped(); + session_2.wait_for_download_complete_or_client_stopped(); + } + + ReadTransaction read_1(db_1); + ReadTransaction read_2(db_2); + const Group& group1 = read_1; + const Group& group2 = read_2; + ConstTableRef table1 = group1.get_table("class_table name"); + ConstTableRef table2 = group2.get_table("class_table name"); + CHECK_EQUAL(table1->size(), number_of_changesets * number_of_instructions); + CHECK_EQUAL(table2->size(), 2 * number_of_changesets * number_of_instructions); +} + +TEST(Sync_UserInterruptsIntegrationOfRemoteChanges) +{ + constexpr int number_of_changesets = 10; + constexpr int number_of_instructions = 100; + + TEST_CLIENT_DB(db_1); + TEST_CLIENT_DB(db_2); + + { + WriteTransaction wt(db_1); + TableRef table = wt.add_table("class_table name"); + table->add_column(type_String, "string column"); + wt.commit(); + } + + { + WriteTransaction wt(db_2); + // Use different table name to avoid schema conflicts when merging changes from this client to the other + // client. 
+ TableRef table = wt.add_table("class_table name2"); + table->add_column(type_String, "string column"); + wt.commit(); + } + + { + for (int i = 0; i < number_of_changesets; ++i) { + WriteTransaction wt(db_1); + TableRef table = wt.get_table("class_table name"); + for (int j = 0; j < number_of_instructions; ++j) { + auto obj = table->create_object(); + obj.set("string column", std::string(1024, 'a' + (j % 26))); + } + wt.commit(); + } + } + + { + for (int i = 0; i < number_of_changesets; ++i) { + WriteTransaction wt(db_2); + TableRef table = wt.get_table("class_table name2"); + for (int j = 0; j < number_of_instructions; ++j) { + auto obj = table->create_object(); + obj.set("string column", std::string(1000, char('a' + j % 26))); + } + wt.commit(); + } + } + + { + TEST_DIR(dir); + MultiClientServerFixture fixture(2, 1, dir, test_context); + + std::future future; + version_type user_commit_version = UINT_FAST64_MAX; + + Session::Config config; + config.on_before_download_integration = [&](size_t num_changesets) { + CHECK(num_changesets > 0); + + future = std::async([&] { + auto write = db_1->start_write(); + // Keep the transaction open until the sync client is waiting to acquire the write lock. + while (!db_1->waiting_for_write_lock()) { + millisleep(1); + } + write->close(); + + // Sync client holds the write lock now, so commit a local change. + WriteTransaction wt(db_1); + TableRef table = wt.get_table("class_table name"); + auto obj = table->create_object(); + obj.set("string column", "foobar"); + user_commit_version = wt.commit(); + }); + }; + Session session_1 = fixture.make_session(0, db_1, std::move(config)); + fixture.bind_session(session_1, 0, "/test"); + Session session_2 = fixture.make_session(1, db_2); + fixture.bind_session(session_2, 0, "/test"); + + // Start server and upload changes of second client. + fixture.start_server(0); + fixture.start_client(1); + session_2.wait_for_upload_complete_or_client_stopped(); + session_2.wait_for_download_complete_or_client_stopped(); + // Stop second client. + fixture.stop_client(1); + + // Start the first client and upload their changes. + // Wait to integrate changes from the second client. + fixture.start_client(0); + session_1.wait_for_upload_complete_or_client_stopped(); + session_1.wait_for_download_complete_or_client_stopped(); + + future.wait(); + + // Check that local change was committed before all remote changes were integrated. 
+ ReadTransaction rt(db_1); + CHECK(user_commit_version < rt.get_version()); + } + + ReadTransaction read_1(db_1); + ReadTransaction read_2(db_2); + const Group& group_1 = read_1; + const Group& group_2 = read_1; + ConstTableRef table_1 = group_1.get_table("class_table name"); + CHECK_EQUAL(table_1->size(), number_of_changesets * number_of_instructions + 1); + ConstTableRef table_2 = group_2.get_table("class_table name2"); + CHECK_EQUAL(table_2->size(), number_of_changesets * number_of_instructions); +} + #endif // REALM_PLATFORM_WIN32 From 793517e4c1454df46523d87932dfc4e6c59f9943 Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Mon, 12 Sep 2022 13:48:04 +0200 Subject: [PATCH 08/20] Adapt benchmark code to use proper arguments in function call --- test/benchmark-sync/bench_transform.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/benchmark-sync/bench_transform.cpp b/test/benchmark-sync/bench_transform.cpp index 54708762a45..6565c2b2a3f 100644 --- a/test/benchmark-sync/bench_transform.cpp +++ b/test/benchmark-sync/bench_transform.cpp @@ -312,7 +312,7 @@ int main() std::string results_file_stem = realm::test_util::get_test_path_prefix() + "results"; bench::results = std::make_unique(max_lead_text_width, "benchmark-sync", results_file_stem.c_str()); - auto exit_status = test_all(nullptr, true); + auto exit_status = test_all(); // Save to file when deallocated. bench::results.reset(); return exit_status; From 433d591644ae9b2d50895a3708d34df06df1e9f0 Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Mon, 12 Sep 2022 14:49:09 +0200 Subject: [PATCH 09/20] Clang-format + minor changes --- test/benchmark-sync/bench_transform.cpp | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/test/benchmark-sync/bench_transform.cpp b/test/benchmark-sync/bench_transform.cpp index 6565c2b2a3f..07d34a1b843 100644 --- a/test/benchmark-sync/bench_transform.cpp +++ b/test/benchmark-sync/bench_transform.cpp @@ -66,13 +66,11 @@ void transform_transactions(TestContext& test_context) Session::Config session_config; session_config.on_before_download_integration = [&](size_t num_changesets) { - CHECK(num_changesets > 0); - + CHECK(num_changesets > 0); t.reset(); }; session_config.on_after_download_integration = [&](size_t num_changesets) { - CHECK(num_changesets > 0); - + CHECK(num_changesets > 0); results->submit(ident.c_str(), t.get_elapsed_time()); }; @@ -138,13 +136,11 @@ void transform_instructions(TestContext& test_context) Session::Config session_config; session_config.on_before_download_integration = [&](size_t num_changesets) { - CHECK(num_changesets > 0); - + CHECK(num_changesets > 0); t.reset(); }; session_config.on_after_download_integration = [&](size_t num_changesets) { - CHECK(num_changesets > 0); - + CHECK(num_changesets > 0); results->submit(ident.c_str(), t.get_elapsed_time()); }; Session session_1 = fixture.make_session(0, db_1, std::move(session_config)); @@ -208,12 +204,10 @@ void connected_objects(TestContext& test_context) Session::Config session_config; session_config.on_before_download_integration = [&](size_t num_changesets) { CHECK(num_changesets > 0); - t.reset(); }; session_config.on_after_download_integration = [&](size_t num_changesets) { CHECK(num_changesets > 0); - results->submit(ident.c_str(), t.get_elapsed_time()); }; Session session_1 = fixture.make_session(0, db_1, std::move(session_config)); @@ -318,4 +312,3 @@ int main() return exit_status; } #endif // REALM_IOS - From aa741071da86ef91cb9dd69ba4498281d143290c Mon Sep 17 
00:00:00 2001 From: Daniel Tabacaru Date: Fri, 16 Sep 2022 18:40:35 +0200 Subject: [PATCH 10/20] Code review changes --- src/realm/db.cpp | 6 +- src/realm/db.hpp | 6 +- src/realm/object-store/sync/sync_session.cpp | 17 +++-- src/realm/sync/client.cpp | 52 ++++++++------- src/realm/sync/client.hpp | 10 ++- src/realm/sync/config.hpp | 12 ++-- src/realm/sync/noinst/client_history_impl.cpp | 59 +++++++++-------- src/realm/sync/noinst/client_history_impl.hpp | 13 ++-- src/realm/sync/noinst/client_impl_base.cpp | 17 +++-- src/realm/sync/noinst/client_impl_base.hpp | 7 +- .../sync/noinst/server/server_history.cpp | 2 +- src/realm/sync/protocol.hpp | 1 + .../sync/tools/apply_to_state_command.cpp | 8 +-- src/realm/sync/transform.cpp | 38 ++++------- src/realm/sync/transform.hpp | 18 +++-- test/benchmark-sync/bench_transform.cpp | 66 ++++++++++++------- test/object-store/realm.cpp | 2 +- test/object-store/sync/flx_sync.cpp | 5 +- test/peer.hpp | 2 +- test/test_sync.cpp | 43 ++++++------ 20 files changed, 210 insertions(+), 174 deletions(-) diff --git a/src/realm/db.cpp b/src/realm/db.cpp index db32504586f..cb01f3b7fed 100644 --- a/src/realm/db.cpp +++ b/src/realm/db.cpp @@ -1518,14 +1518,14 @@ void DB::close_internal(std::unique_lock lock, bool allow_ope } } -bool DB::waiting_for_write_lock() const +bool DB::other_writers_waiting_for_lock() const { SharedInfo* info = m_file_map.get_addr(); uint32_t next_ticket = info->next_ticket.load(std::memory_order_relaxed); uint32_t next_served = info->next_served.load(std::memory_order_relaxed); - // When holding the write mutex, next_ticket = next_served + 1, hence, if the diference between 'next_ticket' and - // 'next_served' is greater than 1, there is at least one thread waiting to acquire the write mutex. + // When holding the write lock, next_ticket = next_served + 1, hence, if the difference between 'next_ticket' and + // 'next_served' is greater than 1, there is at least one thread waiting to acquire the write lock. return next_ticket > next_served + 1; } diff --git a/src/realm/db.hpp b/src/realm/db.hpp index 0d043e012b0..5f426b14321 100644 --- a/src/realm/db.hpp +++ b/src/realm/db.hpp @@ -408,9 +408,9 @@ class DB : public std::enable_shared_from_this { void claim_sync_agent(); void release_sync_agent(); - /// Returs true if there are threads waiting to acquire the write mutex, false otherwise. - /// To be used only when already holding the mutex. - bool waiting_for_write_lock() const; + /// Returns true if there are threads waiting to acquire the write lock, false otherwise. + /// To be used only when already holding the lock. + bool other_writers_waiting_for_lock() const; protected: explicit DB(const DBOptions& options); // Is this ever used? 
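The ticket arithmetic behind other_writers_waiting_for_lock() can be shown in isolation with the following minimal sketch; it assumes plain std::atomic counters in place of SharedInfo's fields and is illustrative only, not Realm API.

#include <atomic>
#include <cstdint>

struct TicketState {
    std::atomic<uint32_t> next_ticket{0}; // incremented by every writer that queues for the lock
    std::atomic<uint32_t> next_served{0}; // incremented each time a queued writer is granted the lock
};

// While a writer holds the lock, next_ticket == next_served + 1, so a difference
// greater than 1 means at least one more writer is queued behind the current one.
inline bool other_writers_waiting(const TicketState& state)
{
    uint32_t ticket = state.next_ticket.load(std::memory_order_relaxed);
    uint32_t served = state.next_served.load(std::memory_order_relaxed);
    return ticket > served + 1;
}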
diff --git a/src/realm/object-store/sync/sync_session.cpp b/src/realm/object-store/sync/sync_session.cpp index 723b67295ea..40c9fab7979 100644 --- a/src/realm/object-store/sync/sync_session.cpp +++ b/src/realm/object-store/sync/sync_session.cpp @@ -740,9 +740,10 @@ void SyncSession::create_sync_session() session_config.simulate_integration_error = sync_config.simulate_integration_error; if (sync_config.on_download_message_received_hook) { session_config.on_download_message_received_hook = - [hook = sync_config.on_download_message_received_hook, anchor = weak_from_this()]( - const sync::SyncProgress& progress, int64_t query_version, sync::DownloadBatchState batch_state) { - hook(anchor, progress, query_version, batch_state); + [hook = sync_config.on_download_message_received_hook, + anchor = weak_from_this()](const sync::SyncProgress& progress, int64_t query_version, + sync::DownloadBatchState batch_state, size_t num_changesets) { + hook(anchor, progress, query_version, batch_state, num_changesets); }; } if (sync_config.on_bootstrap_message_processed_hook) { @@ -753,8 +754,14 @@ void SyncSession::create_sync_session() return hook(anchor, progress, query_version, batch_state); }; } - session_config.on_before_download_integration = sync_config.on_before_download_integration; - session_config.on_after_download_integration = sync_config.on_after_download_integration; + if (sync_config.on_download_message_integrated_hook) { + session_config.on_download_message_integrated_hook = + [hook = sync_config.on_download_message_integrated_hook, + anchor = weak_from_this()](const sync::SyncProgress& progress, int64_t query_version, + sync::DownloadBatchState batch_state, size_t num_changesets) { + hook(anchor, progress, query_version, batch_state, num_changesets); + }; + } { std::string sync_route = m_sync_manager->sync_route(); diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 96ce974cc96..3de6f2ea914 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -244,11 +244,9 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac util::UniqueFunction m_progress_handler; util::UniqueFunction m_connection_state_change_listener; - std::function m_on_download_message_received_hook; + std::function m_on_download_message_received_hook; std::function m_on_bootstrap_message_processed_hook; - - std::function m_on_before_download_integration; - std::function m_on_after_download_integration; + std::function on_download_message_integrated_hook; std::shared_ptr m_flx_subscription_store; int64_t m_flx_active_version = 0; @@ -734,12 +732,7 @@ void SessionImpl::on_resumed() bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state, int64_t query_version, const ReceivedChangesets& received_changesets) { - if (!m_is_flx_sync_session) { - return false; - } - - // If this is a steady state DOWNLOAD, no need for special handling. 
- if (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version) { + if (is_steady_state_download_message(batch_state, query_version)) { return false; } @@ -817,9 +810,11 @@ void SessionImpl::process_pending_flx_bootstrap() REALM_ASSERT_3(count, <=, pending_batch.changesets.size()); bootstrap_store->pop_front_pending(tr, count); }, - get_transact_reporter(), true); + get_transact_reporter()); progress = *pending_batch.progress; + download_message_integrated_hook(progress, query_version, batch_state, pending_batch.changesets.size()); + logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version " "%3. %4 changesets remaining in bootstrap", pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version, @@ -864,30 +859,40 @@ void SessionImpl::non_sync_flx_completion(int64_t version) } void SessionImpl::receive_download_message_hook(const SyncProgress& progress, int64_t query_version, - DownloadBatchState batch_state) + DownloadBatchState batch_state, size_t num_changesets) { if (REALM_LIKELY(!m_wrapper.m_on_download_message_received_hook)) { return; } - m_wrapper.m_on_download_message_received_hook(progress, query_version, batch_state); + m_wrapper.m_on_download_message_received_hook(progress, query_version, batch_state, num_changesets); } -void SessionImpl::before_download_integration_hook(size_t num_changesets) +void SessionImpl::download_message_integrated_hook(const SyncProgress& progress, int64_t query_version, + DownloadBatchState batch_state, size_t num_changesets) { - if (REALM_LIKELY(!m_wrapper.m_on_before_download_integration)) { + if (REALM_LIKELY(!m_wrapper.on_download_message_integrated_hook)) { return; } - m_wrapper.m_on_before_download_integration(num_changesets); + m_wrapper.on_download_message_integrated_hook(progress, query_version, batch_state, num_changesets); } -void SessionImpl::after_download_integration_hook(size_t num_changesets) +bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version) { - if (REALM_LIKELY(!m_wrapper.m_on_after_download_integration)) { - return; + if (batch_state == DownloadBatchState::SteadyState) { + return true; } - m_wrapper.m_on_after_download_integration(num_changesets); + if (!m_is_flx_sync_session) { + return true; + } + + // If this is a steady state DOWNLOAD, no need for special handling. + if (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version) { + return true; + } + + return false; } // ################ SessionWrapper ################ @@ -913,8 +918,7 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr= m_flx_last_seen_version); REALM_ASSERT(new_version >= m_flx_active_version); + REALM_ASSERT(batch_state != DownloadBatchState::SteadyState); SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy switch (batch_state) { + case DownloadBatchState::SteadyState: + // Cannot be called with this value. This is to make compiler happy. 
+ return; case DownloadBatchState::LastInBatch: if (m_flx_active_version == new_version) { return; diff --git a/src/realm/sync/client.hpp b/src/realm/sync/client.hpp index 2d2ec1dcf0a..873f4908a2b 100644 --- a/src/realm/sync/client.hpp +++ b/src/realm/sync/client.hpp @@ -316,17 +316,15 @@ class Session { /// Will be called after a download message is received and validated by /// the client but befefore it's been transformed or applied. To be used in /// testing only. - std::function + std::function on_download_message_received_hook; /// Will be called after each bootstrap message is added to the pending bootstrap store, /// but before processing a finalized bootstrap. For testing only. std::function on_bootstrap_message_processed_hook; - - /// Called before each download message is integrated. For testing only. - std::function on_before_download_integration; - /// Called after each download message is integrated. For testing only. - std::function on_after_download_integration; + /// Will be called after a download message is integrated. For testing only. + std::function + on_download_message_integrated_hook; }; /// \brief Start a new session for the specified client-side Realm. diff --git a/src/realm/sync/config.hpp b/src/realm/sync/config.hpp index e5ed6d3b308..5c2082e5c4a 100644 --- a/src/realm/sync/config.hpp +++ b/src/realm/sync/config.hpp @@ -174,17 +174,17 @@ struct SyncConfig { // Will be called after a download message is received and validated by the client but befefore it's been // transformed or applied. To be used in testing only. - std::function, const sync::SyncProgress&, int64_t, sync::DownloadBatchState)> + std::function, const sync::SyncProgress&, int64_t, sync::DownloadBatchState, + size_t)> on_download_message_received_hook; // Will be called after each bootstrap message is added to the pending bootstrap store, but before // processing a finalized bootstrap. For testing only. std::function, const sync::SyncProgress&, int64_t, sync::DownloadBatchState)> on_bootstrap_message_processed_hook; - - // Called before each download message is integrated on the sync worker thread. For testing only. - std::function on_before_download_integration; - // Called after each download message is integrated on the sync worker thread. For testing only. - std::function on_after_download_integration; + // Will be called after a download message is integrated. For testing only. 
+ std::function, const sync::SyncProgress&, int64_t, sync::DownloadBatchState, + size_t)> + on_download_message_integrated_hook; bool simulate_integration_error = false; diff --git a/src/realm/sync/noinst/client_history_impl.cpp b/src/realm/sync/noinst/client_history_impl.cpp index 83ad1da573d..d81586222b5 100644 --- a/src/realm/sync/noinst/client_history_impl.cpp +++ b/src/realm/sync/noinst/client_history_impl.cpp @@ -324,7 +324,7 @@ void ClientHistory::find_uploadable_changesets(UploadCursor& upload_progress, ve const auto sync_history_size = arrays.changesets.size(); const auto sync_history_base_version = rt->get_version() - sync_history_size; - std::size_t accum_byte_size_soft_limit = 0x20000; // 128 KB + std::size_t accum_byte_size_soft_limit = 131072; // 128 KB std::size_t accum_byte_size_hard_limit = 16777216; // server-imposed limit std::size_t accum_byte_size = 0; @@ -380,10 +380,9 @@ void ClientHistory::integrate_server_changesets( const SyncProgress& progress, const std::uint_fast64_t* downloadable_bytes, util::Span incoming_changesets, VersionInfo& version_info, DownloadBatchState batch_state, util::Logger& logger, util::UniqueFunction run_in_write_tr, - SyncTransactReporter* transact_reporter, bool is_flx_sync_connection) + SyncTransactReporter* transact_reporter) { REALM_ASSERT(incoming_changesets.size() != 0); - REALM_ASSERT(is_flx_sync_connection || batch_state == DownloadBatchState::LastInBatch); std::vector changesets; changesets.resize(incoming_changesets.size()); // Throws @@ -401,13 +400,14 @@ void ClientHistory::integrate_server_changesets( } VersionID new_version{0, 0}; - constexpr std::size_t commit_byte_size_limit = 0x19000; // 100 KB - - // Ideally, this loop runs only once, but it can run up to `incoming_changesets.size()` times, - // depending on how many times the sync client needs to yield the write lock to allow - // the user to commit their changes. - // In each iteration, at least one changeset is transformed and committed. - for (size_t changeset_ndx = 0; changeset_ndx < incoming_changesets.size();) { + constexpr std::size_t commit_byte_size_limit = 102400; // 100 KB + auto num_changesets = incoming_changesets.size(); + util::Span changesets_to_integrate(changesets); + + // Ideally, this loop runs only once, but it can run up to `incoming_changesets.size()` times, depending on how + // many times the sync client needs to yield the write lock to allow the user to commit their changes. In each + // iteration, at least one changeset is transformed and committed. + while (!changesets_to_integrate.empty()) { TransactionRef transact = m_db->start_write(); // Throws VersionID old_version = transact->get_version_of_current_transaction(); version_type local_version = old_version.version; @@ -421,9 +421,8 @@ void ClientHistory::integrate_server_changesets( size_t changesets_transformed_count = 0; try { - for (std::size_t i = changeset_ndx; i < incoming_changesets.size(); ++i) { - const RemoteChangeset& changeset = incoming_changesets[i]; - REALM_ASSERT(changeset.last_integrated_local_version <= local_version); + for (auto& changeset : changesets_to_integrate) { + REALM_ASSERT(changeset.last_integrated_remote_version <= local_version); REALM_ASSERT(changeset.origin_file_ident > 0 && changeset.origin_file_ident != sync_file_id); // It is possible that the synchronization history has been trimmed @@ -436,34 +435,35 @@ void ClientHistory::integrate_server_changesets( // when this situation occurs. // // See trim_sync_history() for further details. 
- if (changesets[i].last_integrated_remote_version < m_sync_history_base_version) - changesets[i].last_integrated_remote_version = m_sync_history_base_version; + if (changeset.last_integrated_remote_version < m_sync_history_base_version) + changeset.last_integrated_remote_version = m_sync_history_base_version; } if (m_replication.apply_server_changes()) { Transformer& transformer = get_transformer(); // Throws - auto changeset_applier = [&](Changeset* transformed_changeset) -> bool { + auto changeset_applier = [&](const Changeset* transformed_changeset) -> bool { InstructionApplier applier{*transact}; { TempShortCircuitReplication tscr{m_replication}; applier.apply(*transformed_changeset, &logger); // Throws } - return !(m_db->waiting_for_write_lock() && transact->get_commit_size() >= commit_byte_size_limit); + return !(m_db->other_writers_waiting_for_lock() && + transact->get_commit_size() >= commit_byte_size_limit); }; - changesets_transformed_count = transformer.transform_remote_changesets( - *this, sync_file_id, local_version, - util::Span{changesets.data() + changeset_ndx, changesets.size() - changeset_ndx}, - std::move(changeset_applier), &logger); // Throws + auto it = transformer.transform_remote_changesets(*this, sync_file_id, local_version, + changesets_to_integrate, + std::move(changeset_applier), &logger); // Throws + changesets_transformed_count = std::distance(changesets_to_integrate.begin(), it); } else { // Skip over all changesets if they don't need to be transformed and applied. - changesets_transformed_count = incoming_changesets.size(); + changesets_transformed_count = changesets_to_integrate.size(); } // Compute downloaded bytes only after we know how many remote changesets are going to be commited in this // transaction. - for (std::size_t i = changeset_ndx; i < changeset_ndx + changesets_transformed_count; ++i) { + for (std::size_t i = 0; i < changesets_transformed_count; ++i) { downloaded_bytes_in_transaction += incoming_changesets[i].original_changeset_size; } } @@ -476,7 +476,7 @@ void ClientHistory::integrate_server_changesets( util::format("Failed to transform received changeset: %1", e.what())); } - changeset_ndx += changesets_transformed_count; + logger.debug("Integrated %1 changesets out of %2", changesets_transformed_count, num_changesets); // downloaded_bytes always contains the total number of downloaded bytes // from the Realm. downloaded_bytes must be persisted in the Realm, since @@ -488,16 +488,17 @@ void ClientHistory::integrate_server_changesets( downloaded_bytes += downloaded_bytes_in_transaction; root.set(s_progress_downloaded_bytes_iip, RefOrTagged::make_tagged(downloaded_bytes)); // Throws - const RemoteChangeset& last_changeset = incoming_changesets[changeset_ndx - 1]; + const RemoteChangeset& last_changeset = incoming_changesets[changesets_transformed_count - 1]; + changesets_to_integrate = changesets_to_integrate.sub_span(changesets_transformed_count); + incoming_changesets = incoming_changesets.sub_span(changesets_transformed_count); // During the bootstrap phase in flexible sync, the server sends multiple download messages with the same // synthetic server version that represents synthetic changesets generated from state on the server. 
- if (is_flx_sync_connection && batch_state == DownloadBatchState::LastInBatch && - changeset_ndx == incoming_changesets.size()) { + if (batch_state == DownloadBatchState::LastInBatch && changesets_to_integrate.empty()) { update_sync_progress(progress, downloadable_bytes, transact); // Throws } - // Always update progress in PBS. - else if (!is_flx_sync_connection) { + // Always update progress for download messages from steady state. + else if (batch_state == DownloadBatchState::SteadyState) { auto partial_progress = progress; partial_progress.download.server_version = last_changeset.remote_version; partial_progress.download.last_integrated_client_version = last_changeset.last_integrated_local_version; diff --git a/src/realm/sync/noinst/client_history_impl.hpp b/src/realm/sync/noinst/client_history_impl.hpp index af79dbf5620..5fb50315f31 100644 --- a/src/realm/sync/noinst/client_history_impl.hpp +++ b/src/realm/sync/noinst/client_history_impl.hpp @@ -102,7 +102,7 @@ class ClientHistory final : public _impl::History, public TransformHistory { virtual void report_sync_transact(VersionID old_version, VersionID new_version) = 0; protected: - ~SyncTransactReporter() {} + ~SyncTransactReporter() = default; }; @@ -249,11 +249,12 @@ class ClientHistory final : public _impl::History, public TransformHistory { /// \param transact_reporter An optional callback which will be called with the /// version immediately processing the sync transaction and that of the sync /// transaction. - void integrate_server_changesets( - const SyncProgress& progress, const std::uint_fast64_t* downloadable_bytes, - util::Span changesets, VersionInfo& new_version, DownloadBatchState download_type, - util::Logger&, util::UniqueFunction run_in_write_tr = nullptr, - SyncTransactReporter* transact_reporter = nullptr, bool is_flx_sync_connection = false); + void + integrate_server_changesets(const SyncProgress& progress, const std::uint_fast64_t* downloadable_bytes, + util::Span changesets, VersionInfo& new_version, + DownloadBatchState download_type, util::Logger&, + util::UniqueFunction run_in_write_tr = nullptr, + SyncTransactReporter* transact_reporter = nullptr); static void get_upload_download_bytes(DB*, std::uint_fast64_t&, std::uint_fast64_t&, std::uint_fast64_t&, std::uint_fast64_t&, std::uint_fast64_t&); diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index f1c0a65c02d..493e085f604 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -1387,18 +1387,15 @@ void Session::integrate_changesets(ClientReplication& repl, const SyncProgress& { auto& history = repl.get_history(); if (received_changesets.empty()) { - if (download_batch_state != DownloadBatchState::LastInBatch) { + if (download_batch_state == DownloadBatchState::MoreToCome) { throw IntegrationException(ClientError::bad_progress, "received empty download message that was not the last in batch"); } history.set_sync_progress(progress, &downloadable_bytes, version_info); // Throws return; } - before_download_integration_hook(received_changesets.size()); history.integrate_server_changesets(progress, &downloadable_bytes, received_changesets, version_info, - download_batch_state, logger, {}, get_transact_reporter(), - m_is_flx_sync_session); // Throws - after_download_integration_hook(received_changesets.size()); + download_batch_state, logger, {}, get_transact_reporter()); // Throws if (received_changesets.size() == 1) { logger.debug("1 remote changeset 
integrated, producing client version %1", version_info.sync_version.version); // Throws @@ -2084,6 +2081,10 @@ void Session::receive_download_message(const SyncProgress& progress, std::uint_f DownloadBatchState batch_state, int64_t query_version, const ReceivedChangesets& received_changesets) { + if (is_steady_state_download_message(batch_state, query_version)) { + batch_state = DownloadBatchState::SteadyState; + } + logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, " "latest_server_version=%3, latest_server_version_salt=%4, " "upload_client_version=%5, upload_server_version=%6, downloadable_bytes=%7, " @@ -2091,7 +2092,7 @@ void Session::receive_download_message(const SyncProgress& progress, std::uint_f progress.download.server_version, progress.download.last_integrated_client_version, progress.latest_server_version.version, progress.latest_server_version.salt, progress.upload.client_version, progress.upload.last_integrated_server_version, downloadable_bytes, - batch_state == DownloadBatchState::LastInBatch, query_version, received_changesets.size()); // Throws + batch_state != DownloadBatchState::MoreToCome, query_version, received_changesets.size()); // Throws // Ignore the message if the deactivation process has been initiated, // because in that case, the associated Realm must not be accessed any @@ -2151,7 +2152,7 @@ void Session::receive_download_message(const SyncProgress& progress, std::uint_f } } - receive_download_message_hook(progress, query_version, batch_state); + receive_download_message_hook(progress, query_version, batch_state, received_changesets.size()); if (process_flx_bootstrap_message(progress, batch_state, query_version, received_changesets)) { clear_resumption_delay_state(); @@ -2160,6 +2161,8 @@ void Session::receive_download_message(const SyncProgress& progress, std::uint_f initiate_integrate_changesets(downloadable_bytes, batch_state, progress, received_changesets); // Throws + download_message_integrated_hook(progress, query_version, batch_state, received_changesets.size()); + // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect // after a retryable session error. 
clear_resumption_delay_state(); diff --git a/src/realm/sync/noinst/client_impl_base.hpp b/src/realm/sync/noinst/client_impl_base.hpp index c4fe7aa36d4..0ea5acee96f 100644 --- a/src/realm/sync/noinst/client_impl_base.hpp +++ b/src/realm/sync/noinst/client_impl_base.hpp @@ -1078,9 +1078,10 @@ class ClientImpl::Session { bool check_received_sync_progress(const SyncProgress&, int&) noexcept; void check_for_upload_completion(); void check_for_download_completion(); - void receive_download_message_hook(const SyncProgress&, int64_t, DownloadBatchState); - void before_download_integration_hook(size_t); - void after_download_integration_hook(size_t); + void receive_download_message_hook(const SyncProgress&, int64_t, DownloadBatchState, size_t); + void download_message_integrated_hook(const SyncProgress&, int64_t, DownloadBatchState, size_t); + + bool is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version); friend class Connection; }; diff --git a/src/realm/sync/noinst/server/server_history.cpp b/src/realm/sync/noinst/server/server_history.cpp index 5a7a22df3aa..32196179d5a 100644 --- a/src/realm/sync/noinst/server/server_history.cpp +++ b/src/realm/sync/noinst/server/server_history.cpp @@ -1190,7 +1190,7 @@ bool ServerHistory::integrate_remote_changesets(file_ident_type remote_file_iden version_type current_server_version = get_server_version(); Group& group = *m_group; Transaction& transaction = dynamic_cast(group); - auto apply = [&](Changeset* c) -> bool { + auto apply = [&](const Changeset* c) -> bool { TempShortCircuitReplication tdr{*this}; // Short-circuit while integrating changes InstructionApplier applier{transaction}; applier.apply(*c, &logger); diff --git a/src/realm/sync/protocol.hpp b/src/realm/sync/protocol.hpp index e28d9f21e3c..4b8f772604a 100644 --- a/src/realm/sync/protocol.hpp +++ b/src/realm/sync/protocol.hpp @@ -134,6 +134,7 @@ struct DownloadCursor { enum class DownloadBatchState { MoreToCome, LastInBatch, + SteadyState, }; /// Checks that `dc.last_integrated_client_version` is zero if diff --git a/src/realm/sync/tools/apply_to_state_command.cpp b/src/realm/sync/tools/apply_to_state_command.cpp index 928bb3eba79..f23a2119061 100644 --- a/src/realm/sync/tools/apply_to_state_command.cpp +++ b/src/realm/sync/tools/apply_to_state_command.cpp @@ -291,10 +291,10 @@ int main(int argc, const char** argv) mpark::visit(realm::util::overload{ [&](const DownloadMessage& download_message) { realm::sync::VersionInfo version_info; - history.integrate_server_changesets( - download_message.progress, &download_message.downloadable_bytes, - download_message.changesets, version_info, download_message.batch_state, *logger, - nullptr, nullptr, bool(flx_sync_arg)); + history.integrate_server_changesets(download_message.progress, + &download_message.downloadable_bytes, + download_message.changesets, version_info, + download_message.batch_state, *logger, nullptr); }, [&](const UploadMessage& upload_message) { for (const auto& changeset : upload_message.changesets) { diff --git a/src/realm/sync/transform.cpp b/src/realm/sync/transform.cpp index 6ee49b2dbcf..4fd31f4910d 100644 --- a/src/realm/sync/transform.cpp +++ b/src/realm/sync/transform.cpp @@ -2519,23 +2519,21 @@ void TransformerImpl::merge_changesets(file_ident_type local_file_ident, Changes #endif // LCOV_EXCL_STOP REALM_DEBUG } -size_t TransformerImpl::transform_remote_changesets(TransformHistory& history, file_ident_type local_file_ident, - version_type current_local_version, - util::Span parsed_changesets, - 
util::UniqueFunction changeset_applier, - util::Logger* logger) +TransformerImpl::iterator TransformerImpl::transform_remote_changesets( + TransformHistory& history, file_ident_type local_file_ident, version_type current_local_version, + util::Span parsed_changesets, util::UniqueFunction changeset_applier, + util::Logger* logger) { REALM_ASSERT(local_file_ident != 0); std::vector our_changesets; - size_t applied_changesets_count = 0; + // p points to the beginning of a range of changesets that share the same + // "base", i.e. are based on the same local version. + auto p = parsed_changesets.begin(); + auto parsed_changesets_end = parsed_changesets.end(); try { - // p points to the beginning of a range of changesets that share the same - // "base", i.e. are based on the same local version. - auto p = parsed_changesets.begin(); - auto parsed_changesets_end = parsed_changesets.end(); while (p != parsed_changesets_end) { // Find the range of incoming changesets that share the same // last_integrated_local_version, which means we can merge them in one go. @@ -2565,24 +2563,16 @@ size_t TransformerImpl::transform_remote_changesets(TransformHistory& history, f our_changesets.size(), logger); // Throws // We need to apply all transformed changesets if at least one reciprocal changeset was modified // during OT. - must_apply_all = std::find_if(our_changesets.begin(), our_changesets.end(), [](const Changeset* c) { - return c->is_dirty(); - }) != our_changesets.end(); + must_apply_all = std::any_of(our_changesets.begin(), our_changesets.end(), [](const Changeset* c) { + return c->is_dirty(); + }); } - while (p != same_base_range_end) { - bool continue_applying = changeset_applier(p); - ++applied_changesets_count; + for (auto continue_applying = true; p != same_base_range_end && continue_applying; ++p) { // It is safe to stop applying the changesets if: // 1. There are no reciprocal changesets // 2. No reciprocal changeset was modified - if (!must_apply_all && !continue_applying) { - break; - } - ++p; - } - if (p != same_base_range_end) { - break; + continue_applying = changeset_applier(p) || must_apply_all; } our_changesets.clear(); // deliberately not releasing memory @@ -2604,7 +2594,7 @@ size_t TransformerImpl::transform_remote_changesets(TransformHistory& history, f // the current transaction. flush_reciprocal_transform_cache(history); // Throws - return applied_changesets_count; + return p; } diff --git a/src/realm/sync/transform.hpp b/src/realm/sync/transform.hpp index 4a01dd58163..bcceb6774f3 100644 --- a/src/realm/sync/transform.hpp +++ b/src/realm/sync/transform.hpp @@ -127,6 +127,8 @@ class Transformer { public: class RemoteChangeset; + using iterator = util::Span::iterator; + /// Produce operationally transformed versions of the specified changesets, /// which are assumed to be received from a particular remote peer, P, /// represented by the specified transform history. Note that P is not @@ -167,17 +169,18 @@ class Transformer { /// \param changeset_applier Called to to apply each transformed changeset. /// Returns true if it can continue applying the changests, false otherwise. /// - /// \return The number of changesets that have been transformed and applied. + /// \return Iterator to the next changeset that needs to be transformed and + /// applied, or `end()` if there is no changeset left. /// /// \throw TransformError Thrown if operational transformation fails due to /// a problem with the specified changeset. 
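// The stopping rule wired through `changeset_applier` can be illustrated with a small
// standalone sketch (plain ints stand in for changesets; the names below are illustrative,
// not Realm API): the applier may return false to request an early stop, but when the
// merge dirtied any reciprocal changeset, the whole same-base range is still applied.
#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

struct ReciprocalChangeset {
    bool dirty = false;
};

int main()
{
    // One reciprocal changeset was modified during merge, so must_apply_all becomes true.
    std::vector<ReciprocalChangeset> our_changesets = {{false}, {true}};
    std::vector<int> same_base_range = {10, 20, 30, 40}; // transformed incoming changesets

    bool must_apply_all = std::any_of(our_changesets.begin(), our_changesets.end(),
                                      [](const ReciprocalChangeset& c) { return c.dirty; });

    auto changeset_applier = [](int changeset) -> bool {
        std::printf("applied %d\n", changeset);
        return changeset < 20; // the caller asks to stop after the second changeset
    };

    auto p = same_base_range.begin();
    for (bool continue_applying = true; p != same_base_range.end() && continue_applying; ++p)
        continue_applying = changeset_applier(*p) || must_apply_all;

    // With must_apply_all == true all four entries are applied; with false, only the first two.
    std::printf("applied %zu of %zu changesets\n",
                static_cast<std::size_t>(p - same_base_range.begin()), same_base_range.size());
}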
/// /// FIXME: Consider using std::error_code instead of throwing /// TransformError. - virtual size_t transform_remote_changesets(TransformHistory&, file_ident_type local_file_ident, - version_type current_local_version, util::Span changesets, - util::UniqueFunction changeset_applier, - util::Logger* = nullptr) = 0; + virtual iterator transform_remote_changesets(TransformHistory&, file_ident_type local_file_ident, + version_type current_local_version, util::Span changesets, + util::UniqueFunction changeset_applier, + util::Logger* = nullptr) = 0; virtual ~Transformer() noexcept {} }; @@ -197,11 +200,12 @@ class TransformerImpl : public sync::Transformer { using Instruction = sync::Instruction; using TransformHistory = sync::TransformHistory; using version_type = sync::version_type; + using iterator = sync::Transformer::iterator; TransformerImpl(); - size_t transform_remote_changesets(TransformHistory&, file_ident_type, version_type, util::Span, - util::UniqueFunction, util::Logger*) override; + iterator transform_remote_changesets(TransformHistory&, file_ident_type, version_type, util::Span, + util::UniqueFunction, util::Logger*) override; struct Side; struct MajorSide; diff --git a/test/benchmark-sync/bench_transform.cpp b/test/benchmark-sync/bench_transform.cpp index 07d34a1b843..ad4f533b7e6 100644 --- a/test/benchmark-sync/bench_transform.cpp +++ b/test/benchmark-sync/bench_transform.cpp @@ -65,14 +65,20 @@ void transform_transactions(TestContext& test_context) Timer t{Timer::type_RealTime}; Session::Config session_config; - session_config.on_before_download_integration = [&](size_t num_changesets) { - CHECK(num_changesets > 0); - t.reset(); - }; - session_config.on_after_download_integration = [&](size_t num_changesets) { - CHECK(num_changesets > 0); - results->submit(ident.c_str(), t.get_elapsed_time()); - }; + session_config.on_download_message_received_hook = + [&](const sync::SyncProgress&, int64_t, sync::DownloadBatchState batch_state, size_t num_changesets) { + CHECK(batch_state == sync::DownloadBatchState::SteadyState); + if (num_changesets == 0) + return; + t.reset(); + }; + session_config.on_download_message_integrated_hook = + [&](const sync::SyncProgress&, int64_t, sync::DownloadBatchState batch_state, size_t num_changesets) { + CHECK(batch_state == sync::DownloadBatchState::SteadyState); + if (num_changesets == 0) + return; + results->submit(ident.c_str(), t.get_elapsed_time()); + }; Session session_1 = fixture.make_session(0, db_1, std::move(session_config)); fixture.bind_session(session_1, 0, "/test"); @@ -135,14 +141,20 @@ void transform_instructions(TestContext& test_context) Timer t{Timer::type_RealTime}; Session::Config session_config; - session_config.on_before_download_integration = [&](size_t num_changesets) { - CHECK(num_changesets > 0); - t.reset(); - }; - session_config.on_after_download_integration = [&](size_t num_changesets) { - CHECK(num_changesets > 0); - results->submit(ident.c_str(), t.get_elapsed_time()); - }; + session_config.on_download_message_received_hook = + [&](const sync::SyncProgress&, int64_t, sync::DownloadBatchState batch_state, size_t num_changesets) { + CHECK(batch_state == sync::DownloadBatchState::SteadyState); + if (num_changesets == 0) + return; + t.reset(); + }; + session_config.on_download_message_integrated_hook = + [&](const sync::SyncProgress&, int64_t, sync::DownloadBatchState batch_state, size_t num_changesets) { + CHECK(batch_state == sync::DownloadBatchState::SteadyState); + if (num_changesets == 0) + return; + 
results->submit(ident.c_str(), t.get_elapsed_time()); + }; Session session_1 = fixture.make_session(0, db_1, std::move(session_config)); fixture.bind_session(session_1, 0, "/test"); Session session_2 = fixture.make_session(1, db_2); @@ -202,14 +214,20 @@ void connected_objects(TestContext& test_context) Timer t{Timer::type_RealTime}; Session::Config session_config; - session_config.on_before_download_integration = [&](size_t num_changesets) { - CHECK(num_changesets > 0); - t.reset(); - }; - session_config.on_after_download_integration = [&](size_t num_changesets) { - CHECK(num_changesets > 0); - results->submit(ident.c_str(), t.get_elapsed_time()); - }; + session_config.on_download_message_received_hook = + [&](const sync::SyncProgress&, int64_t, sync::DownloadBatchState batch_state, size_t num_changesets) { + CHECK(batch_state == sync::DownloadBatchState::SteadyState); + if (num_changesets == 0) + return; + t.reset(); + }; + session_config.on_download_message_integrated_hook = + [&](const sync::SyncProgress&, int64_t, sync::DownloadBatchState batch_state, size_t num_changesets) { + CHECK(batch_state == sync::DownloadBatchState::SteadyState); + if (num_changesets == 0) + return; + results->submit(ident.c_str(), t.get_elapsed_time()); + }; Session session_1 = fixture.make_session(0, db_1, std::move(session_config)); fixture.bind_session(session_1, 0, "/test"); Session session_2 = fixture.make_session(1, db_2); diff --git a/test/object-store/realm.cpp b/test/object-store/realm.cpp index 8f111954553..8497ba7d4fb 100644 --- a/test/object-store/realm.cpp +++ b/test/object-store/realm.cpp @@ -1178,7 +1178,7 @@ TEST_CASE("SharedRealm: async writes") { sema.add_stone(); // Wait until the main thread is waiting for the lock. - while (!db->waiting_for_write_lock()) { + while (!db->other_writers_waiting_for_lock()) { millisleep(1); } write->close(); diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp index 4fa4432ce23..fba43459619 100644 --- a/test/object-store/sync/flx_sync.cpp +++ b/test/object-store/sync/flx_sync.cpp @@ -1037,7 +1037,8 @@ TEST_CASE("flx: interrupted bootstrap restarts/recovers on reconnect", "[sync][f config.sync_config->on_download_message_received_hook = [promise = std::move(shared_promise)]( std::weak_ptr weak_session, const sync::SyncProgress&, int64_t query_version, - sync::DownloadBatchState batch_state) mutable { + sync::DownloadBatchState batch_state, + size_t) mutable { auto session = weak_session.lock(); if (!session) { return; @@ -1693,7 +1694,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][app] interrupted_realm_config.sync_config->on_download_message_received_hook = [&, promise = std::move(shared_saw_valid_state_promise)](std::weak_ptr weak_session, const sync::SyncProgress&, int64_t query_version, - sync::DownloadBatchState batch_state) { + sync::DownloadBatchState batch_state, size_t) { auto session = weak_session.lock(); if (!session) { return; diff --git a/test/peer.hpp b/test/peer.hpp index cd19b656e0a..19149bbca80 100644 --- a/test/peer.hpp +++ b/test/peer.hpp @@ -486,7 +486,7 @@ inline auto ShortCircuitHistory::integrate_remote_changesets(file_ident_type rem } TransformHistoryImpl transform_hist{*this, remote_file_ident}; - auto apply = [&](Changeset* c) -> bool { + auto apply = [&](const Changeset* c) -> bool { sync::InstructionApplier applier{*transact}; applier.apply(*c, logger); diff --git a/test/test_sync.cpp b/test/test_sync.cpp index 05479fd23eb..710b736f82d 100644 --- 
a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -4442,25 +4442,28 @@ TEST(Sync_UserInterruptsIntegrationOfRemoteChanges) version_type user_commit_version = UINT_FAST64_MAX; Session::Config config; - config.on_before_download_integration = [&](size_t num_changesets) { - CHECK(num_changesets > 0); - - future = std::async([&] { - auto write = db_1->start_write(); - // Keep the transaction open until the sync client is waiting to acquire the write lock. - while (!db_1->waiting_for_write_lock()) { - millisleep(1); - } - write->close(); - - // Sync client holds the write lock now, so commit a local change. - WriteTransaction wt(db_1); - TableRef table = wt.get_table("class_table name"); - auto obj = table->create_object(); - obj.set("string column", "foobar"); - user_commit_version = wt.commit(); - }); - }; + config.on_download_message_integrated_hook = + [&](const sync::SyncProgress&, int64_t, sync::DownloadBatchState batch_state, size_t num_changesets) { + CHECK(batch_state == sync::DownloadBatchState::SteadyState); + if (num_changesets == 0) + return; + + future = std::async([&] { + auto write = db_1->start_write(); + // Keep the transaction open until the sync client is waiting to acquire the write lock. + while (!db_1->other_writers_waiting_for_lock()) { + millisleep(1); + } + write->close(); + + // Sync client holds the write lock now, so commit a local change. + WriteTransaction wt(db_1); + TableRef table = wt.get_table("class_table name"); + auto obj = table->create_object(); + obj.set("string column", "foobar"); + user_commit_version = wt.commit(); + }); + }; Session session_1 = fixture.make_session(0, db_1, std::move(config)); fixture.bind_session(session_1, 0, "/test"); Session session_2 = fixture.make_session(1, db_2); @@ -6803,7 +6806,7 @@ TEST(Sync_NonIncreasingServerVersions) VersionInfo version_info; util::StderrLogger logger; history.integrate_server_changesets(progress, &downloadable_bytes, server_changesets_encoded, version_info, - DownloadBatchState::LastInBatch, logger); + DownloadBatchState::SteadyState, logger); } TEST(Sync_InvalidChangesetFromServer) From d5843dcdddd7ee78c8a7a2c2fdee209bda1b88db Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Sat, 17 Sep 2022 11:25:30 +0200 Subject: [PATCH 11/20] Fix bugs introduced after code review changes --- src/realm/sync/transform.cpp | 3 +++ test/test_sync.cpp | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/realm/sync/transform.cpp b/src/realm/sync/transform.cpp index 4fd31f4910d..dabf2180384 100644 --- a/src/realm/sync/transform.cpp +++ b/src/realm/sync/transform.cpp @@ -2574,6 +2574,9 @@ TransformerImpl::iterator TransformerImpl::transform_remote_changesets( // 2. 
No reciprocal changeset was modified continue_applying = changeset_applier(p) || must_apply_all; } + if (p != same_base_range_end) { + break; + } our_changesets.clear(); // deliberately not releasing memory } diff --git a/test/test_sync.cpp b/test/test_sync.cpp index 710b736f82d..e7deb4d0a1e 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -4442,7 +4442,7 @@ TEST(Sync_UserInterruptsIntegrationOfRemoteChanges) version_type user_commit_version = UINT_FAST64_MAX; Session::Config config; - config.on_download_message_integrated_hook = + config.on_download_message_received_hook = [&](const sync::SyncProgress&, int64_t, sync::DownloadBatchState batch_state, size_t num_changesets) { CHECK(batch_state == sync::DownloadBatchState::SteadyState); if (num_changesets == 0) From c44c806e114e612613c30974df620da7a154fcce Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Wed, 21 Sep 2022 09:33:18 +0200 Subject: [PATCH 12/20] Use thread instead of future --- test/test_sync.cpp | 58 +++++++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/test/test_sync.cpp b/test/test_sync.cpp index e7deb4d0a1e..bc00e8e6cb3 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -11,7 +11,6 @@ #include #include #include -#include #include #include @@ -4438,32 +4437,33 @@ TEST(Sync_UserInterruptsIntegrationOfRemoteChanges) TEST_DIR(dir); MultiClientServerFixture fixture(2, 1, dir, test_context); - std::future future; + std::thread th; version_type user_commit_version = UINT_FAST64_MAX; Session::Config config; - config.on_download_message_received_hook = - [&](const sync::SyncProgress&, int64_t, sync::DownloadBatchState batch_state, size_t num_changesets) { - CHECK(batch_state == sync::DownloadBatchState::SteadyState); - if (num_changesets == 0) - return; - - future = std::async([&] { - auto write = db_1->start_write(); - // Keep the transaction open until the sync client is waiting to acquire the write lock. - while (!db_1->other_writers_waiting_for_lock()) { - millisleep(1); - } - write->close(); - - // Sync client holds the write lock now, so commit a local change. - WriteTransaction wt(db_1); - TableRef table = wt.get_table("class_table name"); - auto obj = table->create_object(); - obj.set("string column", "foobar"); - user_commit_version = wt.commit(); - }); + config.on_download_message_received_hook = [&](const sync::SyncProgress&, int64_t, + sync::DownloadBatchState batch_state, size_t num_changesets) { + CHECK(batch_state == sync::DownloadBatchState::SteadyState); + if (num_changesets == 0) + return; + + auto func = [&] { + auto write = db_1->start_write(); + // Keep the transaction open until the sync client is waiting to acquire the write lock. + while (!db_1->other_writers_waiting_for_lock()) { + millisleep(1); + } + write->close(); + + // Sync client holds the write lock now, so commit a local change. 
+ WriteTransaction wt(db_1); + TableRef table = wt.get_table("class_table name"); + auto obj = table->create_object(); + obj.set("string column", "foobar"); + user_commit_version = wt.commit(); }; + th = std::thread{std::move(func)}; + }; Session session_1 = fixture.make_session(0, db_1, std::move(config)); fixture.bind_session(session_1, 0, "/test"); Session session_2 = fixture.make_session(1, db_2); @@ -4483,20 +4483,24 @@ TEST(Sync_UserInterruptsIntegrationOfRemoteChanges) session_1.wait_for_upload_complete_or_client_stopped(); session_1.wait_for_download_complete_or_client_stopped(); - future.wait(); + th.join(); // Check that local change was commited before all remote changes were integrated. ReadTransaction rt(db_1); - CHECK(user_commit_version < rt.get_version()); + CHECK_LESS(user_commit_version, rt.get_version()); } ReadTransaction read_1(db_1); ReadTransaction read_2(db_2); const Group& group_1 = read_1; - const Group& group_2 = read_1; + const Group& group_2 = read_2; ConstTableRef table_1 = group_1.get_table("class_table name"); CHECK_EQUAL(table_1->size(), number_of_changesets * number_of_instructions + 1); - ConstTableRef table_2 = group_2.get_table("class_table name2"); + ConstTableRef table_2 = group_1.get_table("class_table name2"); + CHECK_EQUAL(table_2->size(), number_of_changesets * number_of_instructions); + table_1 = group_2.get_table("class_table name"); + CHECK(!table_1); + table_2 = group_2.get_table("class_table name2"); CHECK_EQUAL(table_2->size(), number_of_changesets * number_of_instructions); } From 3d15242216d69c2cff94db9abc531d5811ee5adb Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Wed, 21 Sep 2022 09:56:38 +0200 Subject: [PATCH 13/20] Move _impl::ForEventLoopDispatcher out of realm::util namespace so realm::util::_impl does not exist --- src/realm/object-store/util/event_loop_dispatcher.hpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/realm/object-store/util/event_loop_dispatcher.hpp b/src/realm/object-store/util/event_loop_dispatcher.hpp index 0c59e0f2f19..a9a429b1bd3 100644 --- a/src/realm/object-store/util/event_loop_dispatcher.hpp +++ b/src/realm/object-store/util/event_loop_dispatcher.hpp @@ -63,6 +63,8 @@ class EventLoopDispatcher { } }; +} // namespace util + namespace _impl::ForEventLoopDispatcher { template struct ExtractSignatureImpl { @@ -106,6 +108,8 @@ template using ExtractSignature = typename ExtractSignatureImpl::signature; } // namespace _impl::ForEventLoopDispatcher +namespace util { + // Deduction guide for function pointers. template EventLoopDispatcher(void (*)(Args...)) -> EventLoopDispatcher; From 168ef37eec90f763cb5e942ae6784ccfb6db1009 Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Wed, 21 Sep 2022 10:04:28 +0200 Subject: [PATCH 14/20] Refactor transforming and applying server changesets into separate method --- src/realm/sync/changeset.hpp | 4 + src/realm/sync/noinst/client_history_impl.cpp | 133 ++++++++++-------- src/realm/sync/noinst/client_history_impl.hpp | 3 + src/realm/sync/transform.cpp | 17 ++- src/realm/sync/transform.hpp | 16 +-- 5 files changed, 95 insertions(+), 78 deletions(-) diff --git a/src/realm/sync/changeset.hpp b/src/realm/sync/changeset.hpp index 0482b88d1cc..e37a29a7f0b 100644 --- a/src/realm/sync/changeset.hpp +++ b/src/realm/sync/changeset.hpp @@ -178,6 +178,10 @@ struct Changeset { /// be part of refactoring the ChangesetIndex size_t transform_sequence = 0; + /// If the changeset was compacted during download, the size of the original + /// changeset. 
Only applies to changesets sent by the server. + std::size_t original_changeset_size = 0; + /// Compare for exact equality, including that interned strings have the /// same integer values, and there is the same number of interned strings, /// same topology of tombstones, etc. diff --git a/src/realm/sync/noinst/client_history_impl.cpp b/src/realm/sync/noinst/client_history_impl.cpp index d81586222b5..436c0bf5186 100644 --- a/src/realm/sync/noinst/client_history_impl.cpp +++ b/src/realm/sync/noinst/client_history_impl.cpp @@ -400,12 +400,11 @@ void ClientHistory::integrate_server_changesets( } VersionID new_version{0, 0}; - constexpr std::size_t commit_byte_size_limit = 102400; // 100 KB auto num_changesets = incoming_changesets.size(); util::Span changesets_to_integrate(changesets); - // Ideally, this loop runs only once, but it can run up to `incoming_changesets.size()` times, depending on how - // many times the sync client needs to yield the write lock to allow the user to commit their changes. In each + // Ideally, this loop runs only once, but it can run up to `incoming_changesets.size()` times, depending on the + // number of times the sync client yields the write lock to allow the user to commit their changes. In each // iteration, at least one changeset is transformed and committed. while (!changesets_to_integrate.empty()) { TransactionRef transact = m_db->start_write(); // Throws @@ -418,65 +417,8 @@ void ClientHistory::integrate_server_changesets( prepare_for_write(); // Throws std::uint64_t downloaded_bytes_in_transaction = 0; - size_t changesets_transformed_count = 0; - - try { - for (auto& changeset : changesets_to_integrate) { - REALM_ASSERT(changeset.last_integrated_remote_version <= local_version); - REALM_ASSERT(changeset.origin_file_ident > 0 && changeset.origin_file_ident != sync_file_id); - - // It is possible that the synchronization history has been trimmed - // to a point where a prefix of the merge window is no longer - // available, but this can only happen if that prefix consisted - // entirely of upload skippable entries. Since such entries (those - // that are empty or of remote origin) will be skipped by the - // transformer anyway, we can simply clamp the beginning of the - // merge window to the beginning of the synchronization history, - // when this situation occurs. - // - // See trim_sync_history() for further details. - if (changeset.last_integrated_remote_version < m_sync_history_base_version) - changeset.last_integrated_remote_version = m_sync_history_base_version; - } - - if (m_replication.apply_server_changes()) { - Transformer& transformer = get_transformer(); // Throws - auto changeset_applier = [&](const Changeset* transformed_changeset) -> bool { - InstructionApplier applier{*transact}; - { - TempShortCircuitReplication tscr{m_replication}; - applier.apply(*transformed_changeset, &logger); // Throws - } - - return !(m_db->other_writers_waiting_for_lock() && - transact->get_commit_size() >= commit_byte_size_limit); - }; - auto it = transformer.transform_remote_changesets(*this, sync_file_id, local_version, - changesets_to_integrate, - std::move(changeset_applier), &logger); // Throws - changesets_transformed_count = std::distance(changesets_to_integrate.begin(), it); - } - else { - // Skip over all changesets if they don't need to be transformed and applied. 
- changesets_transformed_count = changesets_to_integrate.size(); - } - - // Compute downloaded bytes only after we know how many remote changesets are going to be commited in this - // transaction. - for (std::size_t i = 0; i < changesets_transformed_count; ++i) { - downloaded_bytes_in_transaction += incoming_changesets[i].original_changeset_size; - } - } - catch (const BadChangesetError& e) { - throw IntegrationException(ClientError::bad_changeset, - util::format("Failed to apply received changeset: %1", e.what())); - } - catch (const TransformError& e) { - throw IntegrationException(ClientError::bad_changeset, - util::format("Failed to transform received changeset: %1", e.what())); - } - - logger.debug("Integrated %1 changesets out of %2", changesets_transformed_count, num_changesets); + auto changesets_transformed_count = transform_and_apply_server_changesets( + changesets_to_integrate, transact, logger, downloaded_bytes_in_transaction); // downloaded_bytes always contains the total number of downloaded bytes // from the Realm. downloaded_bytes must be persisted in the Realm, since @@ -529,6 +471,8 @@ void ClientHistory::integrate_server_changesets( if (transact_reporter) { transact_reporter->report_sync_transact(old_version, new_version); // Throws } + + logger.debug("Integrated %1 changesets out of %2", changesets_transformed_count, num_changesets); } REALM_ASSERT(new_version.version > 0); @@ -537,6 +481,71 @@ void ClientHistory::integrate_server_changesets( } +size_t ClientHistory::transform_and_apply_server_changesets(util::Span changesets_to_integrate, + TransactionRef transact, util::Logger& logger, + std::uint64_t& downloaded_bytes) +{ + REALM_ASSERT(transact->get_transact_stage() == DB::transact_Writing); + + if (!m_replication.apply_server_changes()) { + std::for_each(changesets_to_integrate.begin(), changesets_to_integrate.end(), [&](const Changeset c) { + downloaded_bytes += c.original_changeset_size; + }); + // Skip over all changesets if they don't need to be transformed and applied. + return changesets_to_integrate.size(); + } + + version_type local_version = transact->get_version_of_current_transaction().version; + auto sync_file_id = transact->get_sync_file_id(); + + try { + for (auto& changeset : changesets_to_integrate) { + REALM_ASSERT(changeset.last_integrated_remote_version <= local_version); + REALM_ASSERT(changeset.origin_file_ident > 0 && changeset.origin_file_ident != sync_file_id); + + // It is possible that the synchronization history has been trimmed + // to a point where a prefix of the merge window is no longer + // available, but this can only happen if that prefix consisted + // entirely of upload skippable entries. Since such entries (those + // that are empty or of remote origin) will be skipped by the + // transformer anyway, we can simply clamp the beginning of the + // merge window to the beginning of the synchronization history, + // when this situation occurs. + // + // See trim_sync_history() for further details. 
+ if (changeset.last_integrated_remote_version < m_sync_history_base_version) + changeset.last_integrated_remote_version = m_sync_history_base_version; + } + + Transformer& transformer = get_transformer(); // Throws + constexpr std::size_t commit_byte_size_limit = 102400; // 100 KB + + auto changeset_applier = [&](const Changeset* transformed_changeset) -> bool { + InstructionApplier applier{*transact}; + { + TempShortCircuitReplication tscr{m_replication}; + applier.apply(*transformed_changeset, &logger); // Throws + } + downloaded_bytes += transformed_changeset->original_changeset_size; + + return !(m_db->other_writers_waiting_for_lock() && transact->get_commit_size() >= commit_byte_size_limit); + }; + auto changesets_transformed_count = + transformer.transform_remote_changesets(*this, sync_file_id, local_version, changesets_to_integrate, + std::move(changeset_applier), &logger); // Throws + return changesets_transformed_count; + } + catch (const BadChangesetError& e) { + throw IntegrationException(ClientError::bad_changeset, + util::format("Failed to apply received changeset: %1", e.what())); + } + catch (const TransformError& e) { + throw IntegrationException(ClientError::bad_changeset, + util::format("Failed to transform received changeset: %1", e.what())); + } +} + + void ClientHistory::get_upload_download_bytes(DB* db, std::uint_fast64_t& downloaded_bytes, std::uint_fast64_t& downloadable_bytes, std::uint_fast64_t& uploaded_bytes, diff --git a/src/realm/sync/noinst/client_history_impl.hpp b/src/realm/sync/noinst/client_history_impl.hpp index 5fb50315f31..671332a34a7 100644 --- a/src/realm/sync/noinst/client_history_impl.hpp +++ b/src/realm/sync/noinst/client_history_impl.hpp @@ -416,6 +416,9 @@ class ClientHistory final : public _impl::History, public TransformHistory { std::uint_fast64_t sum_of_history_entry_sizes(version_type begin_version, version_type end_version) const noexcept; + size_t transform_and_apply_server_changesets(util::Span changesets_to_integrate, TransactionRef, + util::Logger&, std::uint64_t& downloaded_bytes); + void prepare_for_write(); Replication::version_type add_changeset(BinaryData changeset, BinaryData sync_changeset); void add_sync_history_entry(const HistoryEntry&); diff --git a/src/realm/sync/transform.cpp b/src/realm/sync/transform.cpp index dabf2180384..5a17f1c665c 100644 --- a/src/realm/sync/transform.cpp +++ b/src/realm/sync/transform.cpp @@ -2519,10 +2519,11 @@ void TransformerImpl::merge_changesets(file_ident_type local_file_ident, Changes #endif // LCOV_EXCL_STOP REALM_DEBUG } -TransformerImpl::iterator TransformerImpl::transform_remote_changesets( - TransformHistory& history, file_ident_type local_file_ident, version_type current_local_version, - util::Span parsed_changesets, util::UniqueFunction changeset_applier, - util::Logger* logger) +size_t TransformerImpl::transform_remote_changesets(TransformHistory& history, file_ident_type local_file_ident, + version_type current_local_version, + util::Span parsed_changesets, + util::UniqueFunction changeset_applier, + util::Logger* logger) { REALM_ASSERT(local_file_ident != 0); @@ -2568,13 +2569,14 @@ TransformerImpl::iterator TransformerImpl::transform_remote_changesets( }); } - for (auto continue_applying = true; p != same_base_range_end && continue_applying; ++p) { + auto continue_applying = true; + for (; p != same_base_range_end && continue_applying; ++p) { // It is safe to stop applying the changesets if: // 1. There are no reciprocal changesets // 2. 
No reciprocal changeset was modified continue_applying = changeset_applier(p) || must_apply_all; } - if (p != same_base_range_end) { + if (!continue_applying) { break; } @@ -2597,7 +2599,7 @@ TransformerImpl::iterator TransformerImpl::transform_remote_changesets( // the current transaction. flush_reciprocal_transform_cache(history); // Throws - return p; + return p - parsed_changesets.begin(); } @@ -2669,6 +2671,7 @@ void parse_remote_changeset(const Transformer::RemoteChangeset& remote_changeset parsed_changeset.last_integrated_remote_version = remote_changeset.last_integrated_local_version; parsed_changeset.origin_timestamp = remote_changeset.origin_timestamp; parsed_changeset.origin_file_ident = remote_changeset.origin_file_ident; + parsed_changeset.original_changeset_size = remote_changeset.original_changeset_size; } } // namespace sync diff --git a/src/realm/sync/transform.hpp b/src/realm/sync/transform.hpp index bcceb6774f3..96789b2794a 100644 --- a/src/realm/sync/transform.hpp +++ b/src/realm/sync/transform.hpp @@ -169,18 +169,17 @@ class Transformer { /// \param changeset_applier Called to to apply each transformed changeset. /// Returns true if it can continue applying the changests, false otherwise. /// - /// \return Iterator to the next changeset that needs to be transformed and - /// applied, or `end()` if there is no changeset left. + /// \return The number of changesets that have been transformed and applied. /// /// \throw TransformError Thrown if operational transformation fails due to /// a problem with the specified changeset. /// /// FIXME: Consider using std::error_code instead of throwing /// TransformError. - virtual iterator transform_remote_changesets(TransformHistory&, file_ident_type local_file_ident, - version_type current_local_version, util::Span changesets, - util::UniqueFunction changeset_applier, - util::Logger* = nullptr) = 0; + virtual size_t transform_remote_changesets(TransformHistory&, file_ident_type local_file_ident, + version_type current_local_version, util::Span changesets, + util::UniqueFunction changeset_applier, + util::Logger* = nullptr) = 0; virtual ~Transformer() noexcept {} }; @@ -200,12 +199,11 @@ class TransformerImpl : public sync::Transformer { using Instruction = sync::Instruction; using TransformHistory = sync::TransformHistory; using version_type = sync::version_type; - using iterator = sync::Transformer::iterator; TransformerImpl(); - iterator transform_remote_changesets(TransformHistory&, file_ident_type, version_type, util::Span, - util::UniqueFunction, util::Logger*) override; + size_t transform_remote_changesets(TransformHistory&, file_ident_type, version_type, util::Span, + util::UniqueFunction, util::Logger*) override; struct Side; struct MajorSide; From 36ef66835bc43dc7fde21f58fc39ae7e0e07f803 Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Wed, 21 Sep 2022 12:03:21 +0200 Subject: [PATCH 15/20] Fix test after introducing DownloadBatchState::SteadyState --- test/object-store/sync/flx_sync.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp index 7f11ae7ab36..97ef7b48918 100644 --- a/test/object-store/sync/flx_sync.cpp +++ b/test/object-store/sync/flx_sync.cpp @@ -1700,7 +1700,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][app] return; } - if (query_version != 1 || batch_state != sync::DownloadBatchState::LastInBatch) { + if (query_version != 1 || batch_state == sync::DownloadBatchState::MoreToCome) 
{ return; } From 97c54171dc2a04e204b2e7b6281c17a872f74b23 Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Wed, 21 Sep 2022 13:52:59 +0200 Subject: [PATCH 16/20] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 23262ede912..5b46e9945f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * Cut the runtime of aggregate operations on large dictionaries in half ([PR #5864](https://github.com/realm/realm-core/pull/5864)). * Improve performance of aggregate operations on collections of objects by 2x to 10x ([PR #5864](https://github.com/realm/realm-core/pull/5864)). * Adding support in the C API for `realm_results_find` and supporting all native types for `realm_results_get()` ([PR 5875](https://github.com/realm/realm-core/pull/5875)). +* Prioritize integration of local changes over remote changes - shorten the time users may have to wait when committing local changes. Stop storing downloaded changesets in history. ([PR #5844](https://github.com/realm/realm-core/pull/5844)). ### Fixed * ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) From b3584c8dbd90323b0cffe352ef18bcb9f5019e1c Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Wed, 21 Sep 2022 21:02:06 +0200 Subject: [PATCH 17/20] Add integration test for pbs --- test/object-store/sync/app.cpp | 104 +++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/test/object-store/sync/app.cpp b/test/object-store/sync/app.cpp index 9f432f90e36..25268915b57 100644 --- a/test/object-store/sync/app.cpp +++ b/test/object-store/sync/app.cpp @@ -69,6 +69,18 @@ using util::Optional; using namespace std::string_view_literals; +namespace realm { + +class TestHelper { +public: + static DBRef& get_db(SharedRealm const& shared_realm) + { + return Realm::Internal::get_db(*shared_realm); + } +}; + +} // namespace realm + namespace { std::shared_ptr log_in(std::shared_ptr app, AppCredentials credentials = AppCredentials::anonymous()) { @@ -2829,6 +2841,98 @@ TEST_CASE("app: jwt login and metadata tests", "[sync][app]") { } } +TEST_CASE("app: user interrupts integration of remote changes", "[sync][app]") { + std::string base_url = get_base_url(); + const auto partition = random_string(100); + REQUIRE(!base_url.empty()); + + Schema schema1{ + {"TopLevel", + { + {"_id", PropertyType::ObjectId | PropertyType::Nullable, true}, + {"list_of_strings", PropertyType::Array | PropertyType::String}, + }}, + }; + + auto server_app_config = minimal_app_config(base_url, "interrupt_integration", schema1); + TestAppSession test_session(create_app(server_app_config)); + auto app = test_session.app(); + + // Upload changes from first client. + { + SyncTestFile config(app, partition, schema1); + auto realm = Realm::get_shared_realm(config); + + CppContext c(realm); + for (int i = 0; i < 10; ++i) { + realm->begin_transaction(); + auto obj = Object::create(c, realm, "TopLevel", + std::any(AnyDict{{"_id", ObjectId::gen()}, {"list_of_strings", AnyVector{}}})); + List str_list(obj, realm->schema().find("TopLevel")->property_for_name("list_of_strings")); + for (int j = 0; j < 100; ++j) { + str_list.add(c, std::any(std::string(1024, 'a' + (j % 26)))); + } + realm->commit_transaction(); + } + + wait_for_upload(*realm); + } + + // Start second client and download the changes from first client. + + // Use different table name to avoid schema conflicts when merging changes from the other client. 
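As an illustrative sketch only (not part of these patches), the test-only hook exercised below can also be used on its own, e.g. to count how many changesets a client integrates in steady state. The hook name, parameter list, and the std::weak_ptr<SyncSession> argument follow the surrounding test code; the atomic counter and the lambda body are assumptions of this sketch.

    // Sketch: count changesets reported by the test-only download hook.
    // Assumes the SyncConfig hook signature used by the test below.
    std::atomic<size_t> total_changesets{0};
    config.sync_config->on_download_message_received_hook =
        [&](std::weak_ptr<SyncSession>, const sync::SyncProgress&, int64_t /*query_version*/,
            sync::DownloadBatchState batch_state, size_t num_changesets) {
            if (batch_state == sync::DownloadBatchState::SteadyState)
                total_changesets += num_changesets;
        };

The test below goes further: it uses the same hook to start a writer thread that commits a local change while the sync client is integrating downloaded changesets, relying on DB::other_writers_waiting_for_lock() so that integration yields the write lock to the user commit.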
+ Schema schema2{ + {"TopLevel2", + { + {"_id", PropertyType::ObjectId | PropertyType::Nullable, true}, + {"list_of_strings", PropertyType::Array | PropertyType::String}, + }}, + }; + SyncTestFile config(app, partition, schema2); + + SharedRealm realm; + std::thread th; + sync::version_type user_commit_version = UINT_FAST64_MAX; + + config.sync_config->on_download_message_received_hook = [&](std::weak_ptr, const sync::SyncProgress&, + int64_t, sync::DownloadBatchState batch_state, + size_t num_changesets) { + CHECK(batch_state == sync::DownloadBatchState::SteadyState); + if (num_changesets == 0) + return; + + // Interrupt integration of server changesets by commiting a local change. + auto func = [&] { + auto db = TestHelper::get_db(realm); + auto write = db->start_write(); + // Keep the transaction open until the sync client is waiting to acquire the write lock. + while (!db->other_writers_waiting_for_lock()) { + millisleep(1); + } + write->close(); + + // Sync client holds the write lock now, so commit a local change. + WriteTransaction wt(db); + TableRef table = wt.get_table("class_TopLevel2"); + table->create_object_with_primary_key(ObjectId::gen()); + user_commit_version = wt.commit(); + }; + th = std::thread{std::move(func)}; + }; + + realm = Realm::get_shared_realm(config); + wait_for_download(*realm); + + th.join(); + + CHECK(realm->read_group().get_table("class_TopLevel")->size() == 10); // objects downloaded + CHECK(realm->read_group().get_table("class_TopLevel2")->size() == 1); // local user write + + auto db = TestHelper::get_db(realm); + ReadTransaction rt(db); + CHECK(user_commit_version < rt.get_version()); +} + namespace cf = realm::collection_fixtures; TEMPLATE_TEST_CASE("app: collections of links integration", "[sync][app][collections]", cf::ListOfObjects, cf::ListOfMixedLinks, cf::SetOfObjects, cf::SetOfMixedLinks, cf::DictionaryOfObjects, From 475b6697bfce4ad1c3faa81f0971949eb2c632ff Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Fri, 23 Sep 2022 07:37:01 +0200 Subject: [PATCH 18/20] It is a violation to call on_flx_sync_progress with SteadyState --- src/realm/sync/client.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 3de6f2ea914..2d9bb8a33b3 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -1001,8 +1001,8 @@ void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchStat switch (batch_state) { case DownloadBatchState::SteadyState: - // Cannot be called with this value. This is to make compiler happy. - return; + // Cannot be called with this value. 
+ REALM_UNREACHABLE(); case DownloadBatchState::LastInBatch: if (m_flx_active_version == new_version) { return; From cea8594e20e15c1b4f279d234b5cb5077cab5bba Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Mon, 26 Sep 2022 09:05:01 +0200 Subject: [PATCH 19/20] Remove flaky tests --- test/object-store/sync/app.cpp | 91 ------------------------- test/test_sync.cpp | 118 --------------------------------- 2 files changed, 209 deletions(-) diff --git a/test/object-store/sync/app.cpp b/test/object-store/sync/app.cpp index 25268915b57..82c359469e4 100644 --- a/test/object-store/sync/app.cpp +++ b/test/object-store/sync/app.cpp @@ -2841,97 +2841,6 @@ TEST_CASE("app: jwt login and metadata tests", "[sync][app]") { } } -TEST_CASE("app: user interrupts integration of remote changes", "[sync][app]") { - std::string base_url = get_base_url(); - const auto partition = random_string(100); - REQUIRE(!base_url.empty()); - - Schema schema1{ - {"TopLevel", - { - {"_id", PropertyType::ObjectId | PropertyType::Nullable, true}, - {"list_of_strings", PropertyType::Array | PropertyType::String}, - }}, - }; - - auto server_app_config = minimal_app_config(base_url, "interrupt_integration", schema1); - TestAppSession test_session(create_app(server_app_config)); - auto app = test_session.app(); - - // Upload changes from first client. - { - SyncTestFile config(app, partition, schema1); - auto realm = Realm::get_shared_realm(config); - - CppContext c(realm); - for (int i = 0; i < 10; ++i) { - realm->begin_transaction(); - auto obj = Object::create(c, realm, "TopLevel", - std::any(AnyDict{{"_id", ObjectId::gen()}, {"list_of_strings", AnyVector{}}})); - List str_list(obj, realm->schema().find("TopLevel")->property_for_name("list_of_strings")); - for (int j = 0; j < 100; ++j) { - str_list.add(c, std::any(std::string(1024, 'a' + (j % 26)))); - } - realm->commit_transaction(); - } - - wait_for_upload(*realm); - } - - // Start second client and download the changes from first client. - - // Use different table name to avoid schema conflicts when merging changes from the other client. - Schema schema2{ - {"TopLevel2", - { - {"_id", PropertyType::ObjectId | PropertyType::Nullable, true}, - {"list_of_strings", PropertyType::Array | PropertyType::String}, - }}, - }; - SyncTestFile config(app, partition, schema2); - - SharedRealm realm; - std::thread th; - sync::version_type user_commit_version = UINT_FAST64_MAX; - - config.sync_config->on_download_message_received_hook = [&](std::weak_ptr, const sync::SyncProgress&, - int64_t, sync::DownloadBatchState batch_state, - size_t num_changesets) { - CHECK(batch_state == sync::DownloadBatchState::SteadyState); - if (num_changesets == 0) - return; - - // Interrupt integration of server changesets by commiting a local change. - auto func = [&] { - auto db = TestHelper::get_db(realm); - auto write = db->start_write(); - // Keep the transaction open until the sync client is waiting to acquire the write lock. - while (!db->other_writers_waiting_for_lock()) { - millisleep(1); - } - write->close(); - - // Sync client holds the write lock now, so commit a local change. 
- WriteTransaction wt(db); - TableRef table = wt.get_table("class_TopLevel2"); - table->create_object_with_primary_key(ObjectId::gen()); - user_commit_version = wt.commit(); - }; - th = std::thread{std::move(func)}; - }; - - realm = Realm::get_shared_realm(config); - wait_for_download(*realm); - - th.join(); - - CHECK(realm->read_group().get_table("class_TopLevel")->size() == 10); // objects downloaded - CHECK(realm->read_group().get_table("class_TopLevel2")->size() == 1); // local user write - - auto db = TestHelper::get_db(realm); - ReadTransaction rt(db); - CHECK(user_commit_version < rt.get_version()); -} namespace cf = realm::collection_fixtures; TEMPLATE_TEST_CASE("app: collections of links integration", "[sync][app][collections]", cf::ListOfObjects, diff --git a/test/test_sync.cpp b/test/test_sync.cpp index bc00e8e6cb3..a3853de2a3b 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -4385,124 +4385,6 @@ TEST(Sync_MergeMultipleChangesets) CHECK_EQUAL(table2->size(), 2 * number_of_changesets * number_of_instructions); } -TEST(Sync_UserInterruptsIntegrationOfRemoteChanges) -{ - constexpr int number_of_changesets = 10; - constexpr int number_of_instructions = 100; - - TEST_CLIENT_DB(db_1); - TEST_CLIENT_DB(db_2); - - { - WriteTransaction wt(db_1); - TableRef table = wt.add_table("class_table name"); - table->add_column(type_String, "string column"); - wt.commit(); - } - - { - WriteTransaction wt(db_2); - // Use different table name to avoid schema conflicts when merging changes from this client to the other - // client. - TableRef table = wt.add_table("class_table name2"); - table->add_column(type_String, "string column"); - wt.commit(); - } - - { - for (int i = 0; i < number_of_changesets; ++i) { - WriteTransaction wt(db_1); - TableRef table = wt.get_table("class_table name"); - for (int j = 0; j < number_of_instructions; ++j) { - auto obj = table->create_object(); - obj.set("string column", std::string(1024, 'a' + (j % 26))); - } - wt.commit(); - } - } - - { - for (int i = 0; i < number_of_changesets; ++i) { - WriteTransaction wt(db_2); - TableRef table = wt.get_table("class_table name2"); - for (int j = 0; j < number_of_instructions; ++j) { - auto obj = table->create_object(); - obj.set("string column", std::string(1000, char('a' + j % 26))); - } - wt.commit(); - } - } - - { - TEST_DIR(dir); - MultiClientServerFixture fixture(2, 1, dir, test_context); - - std::thread th; - version_type user_commit_version = UINT_FAST64_MAX; - - Session::Config config; - config.on_download_message_received_hook = [&](const sync::SyncProgress&, int64_t, - sync::DownloadBatchState batch_state, size_t num_changesets) { - CHECK(batch_state == sync::DownloadBatchState::SteadyState); - if (num_changesets == 0) - return; - - auto func = [&] { - auto write = db_1->start_write(); - // Keep the transaction open until the sync client is waiting to acquire the write lock. - while (!db_1->other_writers_waiting_for_lock()) { - millisleep(1); - } - write->close(); - - // Sync client holds the write lock now, so commit a local change. 
- WriteTransaction wt(db_1); - TableRef table = wt.get_table("class_table name"); - auto obj = table->create_object(); - obj.set("string column", "foobar"); - user_commit_version = wt.commit(); - }; - th = std::thread{std::move(func)}; - }; - Session session_1 = fixture.make_session(0, db_1, std::move(config)); - fixture.bind_session(session_1, 0, "/test"); - Session session_2 = fixture.make_session(1, db_2); - fixture.bind_session(session_2, 0, "/test"); - - // Start server and upload changes of second client. - fixture.start_server(0); - fixture.start_client(1); - session_2.wait_for_upload_complete_or_client_stopped(); - session_2.wait_for_download_complete_or_client_stopped(); - // Stop second client. - fixture.stop_client(1); - - // Start the first client and upload their changes. - // Wait to integrate changes from the second client. - fixture.start_client(0); - session_1.wait_for_upload_complete_or_client_stopped(); - session_1.wait_for_download_complete_or_client_stopped(); - - th.join(); - - // Check that local change was commited before all remote changes were integrated. - ReadTransaction rt(db_1); - CHECK_LESS(user_commit_version, rt.get_version()); - } - - ReadTransaction read_1(db_1); - ReadTransaction read_2(db_2); - const Group& group_1 = read_1; - const Group& group_2 = read_2; - ConstTableRef table_1 = group_1.get_table("class_table name"); - CHECK_EQUAL(table_1->size(), number_of_changesets * number_of_instructions + 1); - ConstTableRef table_2 = group_1.get_table("class_table name2"); - CHECK_EQUAL(table_2->size(), number_of_changesets * number_of_instructions); - table_1 = group_2.get_table("class_table name"); - CHECK(!table_1); - table_2 = group_2.get_table("class_table name2"); - CHECK_EQUAL(table_2->size(), number_of_changesets * number_of_instructions); -} #endif // REALM_PLATFORM_WIN32 From 6091910273524c290e890bfa702d428f1ef063be Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Mon, 26 Sep 2022 09:29:50 +0200 Subject: [PATCH 20/20] Remove unused test code --- test/object-store/sync/app.cpp | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/test/object-store/sync/app.cpp b/test/object-store/sync/app.cpp index 9a29b8310a1..07628a2537e 100644 --- a/test/object-store/sync/app.cpp +++ b/test/object-store/sync/app.cpp @@ -69,18 +69,6 @@ using util::Optional; using namespace std::string_view_literals; -namespace realm { - -class TestHelper { -public: - static DBRef& get_db(SharedRealm const& shared_realm) - { - return Realm::Internal::get_db(*shared_realm); - } -}; - -} // namespace realm - namespace { std::shared_ptr log_in(std::shared_ptr app, AppCredentials credentials = AppCredentials::anonymous()) { @@ -2841,7 +2829,6 @@ TEST_CASE("app: jwt login and metadata tests", "[sync][app]") { } } - namespace cf = realm::collection_fixtures; TEMPLATE_TEST_CASE("app: collections of links integration", "[sync][app][collections]", cf::ListOfObjects, cf::ListOfMixedLinks, cf::SetOfObjects, cf::SetOfMixedLinks, cf::DictionaryOfObjects,