Ensure interrupted FLX bootstraps get restarted after re-connecting #5466
Conversation
if (REALM_UNLIKELY(!known_error_code)) {
    logger.error("Unknown error code"); // Throws
    return ClientError::bad_error_code;
}
ProtocolError error_code_2 = ProtocolError(error_code);
if (REALM_UNLIKELY(!is_session_level_error(error_code_2))) {
ProtocolError error_code = ProtocolError(error_code_int);
I really don't like when we have variables that end in _2. This variable renaming is just to make the variable names match their purpose.
@@ -678,7 +678,11 @@ SubscriptionStore::get_next_pending_version(int64_t last_query_version, DB::vers
descriptor_ordering.append_sort(SortDescriptor{{{sub_sets->get_primary_key_column()}}, {true}});
auto res = sub_sets->where()
    .greater(sub_sets->get_primary_key_column(), last_query_version)
    .group()
This is the crux of the whole bug. When you're first firing up the sync client, last_query_version will be zero, which should match all of the non-complete/non-error subscription sets, but the old query didn't. That meant the sync client would never follow up the IDENT with a QUERY message and never make progress.
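To make the intent concrete, here is a minimal sketch of the kind of predicate get_next_pending_version needs so that a session starting from last_query_version == 0 still picks up an interrupted set. The "state" column name and storing the enum as an int are assumptions for illustration, not the real schema:

// Sketch only -- the "state" column name and the enum-to-int storage are assumptions.
ColKey state_col = sub_sets->get_column_key("state");
Query still_pending = sub_sets->where()
                          .greater(sub_sets->get_primary_key_column(), last_query_version)
                          .group() // still Pending or Bootstrapping, i.e. not Complete/Error
                          .equal(state_col, static_cast<int64_t>(SubscriptionSet::State::Pending))
                          .Or()
                          .equal(state_col, static_cast<int64_t>(SubscriptionSet::State::Bootstrapping))
                          .end_group();
// The caller then sorts the matches by version (the primary key) and takes the lowest one.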
auto realm = Realm::get_shared_realm(interrupted_realm_config);
auto table = realm->read_group().get_table("class_TopLevel");
realm->get_latest_subscription_set().get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
Without the changes in subscriptions.cpp, this test would hang forever here.
@@ -451,4 +451,42 @@ TEST(Sync_SubscriptionStoreInternalSchemaMigration)
CHECK(!versions.get_version_for(tr, "non_existent_table"));
}

TEST(Sync_SubscriptionStoreNextPendingVersion)
This is just a unit test for the changes in subscriptions.cpp.
auto sub_store = sync::SubscriptionStore::create(realm, [](int64_t) {});
REQUIRE(sub_store->get_active_and_latest_versions() == std::pair<int64_t, int64_t>{0, 1});
auto latest_subs = sub_store->get_latest();
REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::Bootstrapping);
This section is checking whether we've actually set this test up correctly and that there is a SubscriptionSet that has started bootstrapping but hasn't completed yet.
src/realm/sync/client.hpp
Outdated
@@ -312,6 +312,11 @@ class Session {
///
/// This feature exists exclusively for testing purposes at this time.
bool simulate_integration_error = false;

// Will be called after a download message is received and validated bytes
bytes -> by
Darned autocorrect
@@ -165,6 +165,10 @@ struct SyncConfig {
std::function<void(std::shared_ptr<Realm> before_frozen, std::shared_ptr<Realm> after, bool did_recover)>
    notify_after_client_reset;

// Will be called after a download message is received and validated by the client but before it's been
// transformed or applied. To be used in testing only.
std::function<void(std::weak_ptr<SyncSession>)> on_download_message_received_hook;
Do you really need the SyncSession here? It seems to be used only in tests. Can you use realm->sync_session() instead?
So there's a chicken/egg problem here: you need to define this hook before you have a valid realm, so you would have to capture the realm by reference, but the hook may get called after the realm is destroyed in some cases. This caused a problem with ASAN. So I had this pass the sync session as a weak_ptr, which was the easiest memory-correct way to access the sync session.
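A rough sketch of the resulting usage pattern, assuming config is a Realm::Config whose sync_config has been set up for FLX sync (the hook name and the SyncSession parameter come from the snippet above; everything else is illustrative):

// The hook is installed on the SyncConfig before the Realm exists, so it must not
// capture the Realm; it receives the SyncSession as a weak_ptr instead.
config.sync_config->on_download_message_received_hook =
    [](std::weak_ptr<SyncSession> weak_session) {
        auto session = weak_session.lock();
        if (!session) {
            return; // the session (and its Realm) may already be gone when the hook fires
        }
        // ... inspect or interrupt the session here ...
    };
auto realm = Realm::get_shared_realm(config); // only now does the Realm exist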
// and are bigger than 1MB respectively.
//
// So this generates 5 objects each with 1000+ instructions that are each 1MB+ big. This should result in
// 3 download messages total with one changeset each for the bootstrap download messages.
I guess it's 5 download messages
So there are two download messages for subscription set zero (the schema instructions) and then 5 total for the entire sync session. The trace output from after the session resumes is below:
Connection[3]: Session[3]: Sending: IDENT(client_file_ident=4, client_file_ident_salt=8478468785466499577, scan_server_version=5, scan_client_version=5, latest_server_version=5, latest_server_version_salt=2271452601256551051, query_version: 0 query_size: 2, query: "{}")
Connection[3]: Session[3]: Sending: MARK(request_ident=1)
Connection[3]: Session[3]: Received: MARK(request_ident=1)
Connection[3]: Session[3]: Sending: QUERY(query_version=1, query_size=30, query="{"TopLevel":"(TRUEPREDICATE)"}"
Connection[3]: Session[3]: Sending: UPLOAD(progress_client_version=15, progress_server_version=5, locked_server_version=5, num_changesets=0)
Connection[3]: Download message compression: is_body_compressed = true, compressed_body_size=17203, uncompressed_body_size=2146234
Connection[3]: Received: DOWNLOAD CHANGESET(server_version=5, client_version=5, origin_timestamp=232116755366, origin_file_ident=1, original_changeset_size=2146199, changeset_size=2146199)
Connection[3]: Changeset (parsed):
InternStrings 0="TopLevel", 1="list_of_strings", 2="queryable_int_field"
EraseObject path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6736}]
CreateObject path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6736}]
Update path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6736}].queryable_int_field, value=Int(0), default=0
EraseObject path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6737}]
CreateObject path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6737}]
Update path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6737}].queryable_int_field, value=Int(5), default=0
Connection[3]: Session[3]: Received: DOWNLOAD(download_server_version=5, download_client_version=5, latest_server_version=5, latest_server_version_salt=2271452601256551051, upload_client_version=11, upload_server_version=5, downloadable_bytes=0, last_in_batch=false, query_version=1, num_changesets=1, ...)
Connection[3]: Session[3]: sync::create_object_with_primary_key(group, get_table("class_TopLevel"), 627a6011c0fcbd05b4fa6736);
Connection[3]: Session[3]: sync::create_object_with_primary_key(group, get_table("class_TopLevel"), 627a6011c0fcbd05b4fa6737);
Connection[3]: Session[3]: 1 remote changeset integrated, producing client version 16
Connection[3]: Download message compression: is_body_compressed = true, compressed_body_size=17205, uncompressed_body_size=2146234
Connection[3]: Received: DOWNLOAD CHANGESET(server_version=5, client_version=5, origin_timestamp=232116755366, origin_file_ident=1, original_changeset_size=2146199, changeset_size=2146199)
Connection[3]: Changeset (parsed):
InternStrings 0="TopLevel", 1="list_of_strings", 2="queryable_int_field"
EraseObject path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6738}]
CreateObject path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6738}]
Update path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6738}].queryable_int_field, value=Int(10), default=0
EraseObject path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6739}]
CreateObject path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6739}]
Update path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6739}].queryable_int_field, value=Int(15), default=0
Connection[3]: Session[3]: Received: DOWNLOAD(download_server_version=5, download_client_version=5, latest_server_version=5, latest_server_version_salt=2271452601256551051, upload_client_version=11, upload_server_version=5, downloadable_bytes=0, last_in_batch=false, query_version=1, num_changesets=1, ...)
Connection[3]: Session[3]: sync::create_object_with_primary_key(group, get_table("class_TopLevel"), 627a6011c0fcbd05b4fa6738);
Connection[3]: Session[3]: sync::create_object_with_primary_key(group, get_table("class_TopLevel"), 627a6011c0fcbd05b4fa6739);
Connection[3]: Session[3]: 1 remote changeset integrated, producing client version 18
Connection[3]: Download message compression: is_body_compressed = true, compressed_body_size=8713, uncompressed_body_size=1073160
Connection[3]: Received: DOWNLOAD CHANGESET(server_version=5, client_version=5, origin_timestamp=232116755366, origin_file_ident=1, original_changeset_size=1073125, changeset_size=1073125)
Connection[3]: Changeset (parsed):
InternStrings 0="TopLevel", 1="list_of_strings", 2="queryable_int_field"
EraseObject path=TopLevel[ObjectId{627a6011c0fcbd05b4fa673a}]
CreateObject path=TopLevel[ObjectId{627a6011c0fcbd05b4fa673a}]
Update path=TopLevel[ObjectId{627a6011c0fcbd05b4fa673a}].queryable_int_field, value=Int(20), default=0
Connection[3]: Session[3]: Received: DOWNLOAD(download_server_version=5, download_client_version=5, latest_server_version=5, latest_server_version_salt=2271452601256551051, upload_client_version=11, upload_server_version=5, downloadable_bytes=0, last_in_batch=true, query_version=1, num_changesets=1, ...)
Connection[3]: Session[3]: sync::create_object_with_primary_key(group, get_table("class_TopLevel"), 627a6011c0fcbd05b4fa673a);
Connection[3]: Session[3]: 1 remote changeset integrated, producing client version 19
test/object-store/sync/flx_sync.cpp
Outdated
promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise))](
    std::weak_ptr<SyncSession> weak_session) mutable {
auto session = weak_session.lock();
// We interrupt on the 5rd download message, which should be 1/3rd of the way through the
This comment seems wrong. Aren't there 5 download messages in total? If that's the case, then bootstrapping should be completed, but that contradicts the check at line 336. I guess the whole idea is to re-open the same realm afterwards. Is that the case?
The first two download messages here are for exchanging the schema instructions. The 3rd/4th download messages put the SubscriptionSet into the Bootstrapping state. The 5th is the one that needs to be restarted. I've also updated this comment to try to make it clearer.
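For illustration, a sketch of a hook along those lines; the explicit counter, the hard-coded 5, and the call to close() are assumptions about the test's shape rather than the exact code:

config.sync_config->on_download_message_received_hook =
    [download_msg_count = 0](std::weak_ptr<SyncSession> weak_session) mutable {
        auto session = weak_session.lock();
        if (!session) {
            return;
        }
        // Messages 1-2 bootstrap the schema for subscription set zero and messages 3-4
        // move the query-version-1 set into Bootstrapping; dropping the session on the
        // 5th leaves an interrupted bootstrap behind for the next session to restart.
        if (++download_msg_count == 5) {
            session->close();
        }
    };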
Here's the trace output of the interrupted session:
Connection[2]: Session[2]: Received: IDENT(client_file_ident=4, client_file_ident_salt=2564681099856153049)
Connection[2]: Session[2]: Sending: IDENT(client_file_ident=4, client_file_ident_salt=2564681099856153049, scan_server_version=0, scan_client_version=0, latest_server_version=0, latest_server_version_salt=0, query_version: 0 query_size: 2, query: "{}")
Connection[2]: Session[2]: Sending: MARK(request_ident=1)
Connection[2]: Download message compression: is_body_compressed = false, compressed_body_size=118, uncompressed_body_size=118
Connection[2]: Received: DOWNLOAD CHANGESET(server_version=1, client_version=0, origin_timestamp=232117527249, origin_file_ident=1, original_changeset_size=93, changeset_size=93)
Connection[2]: Changeset: 3F 00 08 54 6F 70 4C 65 76 65 6C 3F 01 03 5F 69 64 3F 02 0F 6C 69 73 74 5F 6F 66 5F 73 74 72 69 6E 67 73 3F 03 09 70 61 72 74 69 74 69 6F 6E 3F 04 13 71 75 65 72 79 61 62 6C 65 5F 69 6E 74 5F 66 69 65 6C 64 00 00 00 01 0A 00 06 00 02 03 00 01 06 00 03 03 01 00 06 00 04 01 01 00
Connection[2]: Changeset (parsed):
InternStrings 0="TopLevel", 1="_id", 2="list_of_strings", 3="partition", 4="queryable_int_field"
AddTable path="TopLevel", pk_field="_id", pk_type=ObjectId, pk_nullable=0
AddColumn table="TopLevel", field="list_of_strings", type=String, nullable=0, collection_type=List
AddColumn table="TopLevel", field="partition", type=String, nullable=1, collection_type=Single
AddColumn table="TopLevel", field="queryable_int_field", type=Int, nullable=1, collection_type=Single
Connection[2]: Session[2]: Received: DOWNLOAD(download_server_version=1, download_client_version=0, latest_server_version=1, latest_server_version_salt=0, upload_client_version=0, upload_server_version=0, downloadable_bytes=0, last_in_batch=false, query_version=0, num_changesets=1, ...)
Connection[2]: Session[2]: Scanning incoming changeset [1/1] (4 instructions)
Connection[2]: Session[2]: Scanning local changeset [1/1] (3 instructions)
Connection[2]: Session[2]: Indexing incoming changeset [1/1] (4 instructions)
Connection[2]: Session[2]: Finished changeset indexing (incoming: 1 changeset(s) / 4 instructions, local: 1 changeset(s) / 3 instructions, conflict group(s): 1)
Connection[2]: Session[2]: Transforming local changeset [1/1] through 1 incoming changeset(s) with 1 conflict group(s)
Connection[2]: Session[2]: Finished transforming 1 local changesets through 1 incoming changesets (3 vs 4 instructions, in 1 conflict groups)
Connection[2]: Session[2]: 1 remote changeset integrated, producing client version 8
Connection[2]: Download message compression: is_body_compressed = false, compressed_body_size=23, uncompressed_body_size=23
Connection[2]: Received: DOWNLOAD CHANGESET(server_version=1, client_version=0, origin_timestamp=232117527249, origin_file_ident=1, original_changeset_size=0, changeset_size=0)
Connection[2]: Changeset:
Connection[2]: Changeset (parsed):
InternStrings
Connection[2]: Session[2]: Received: DOWNLOAD(download_server_version=1, download_client_version=0, latest_server_version=1, latest_server_version_salt=0, upload_client_version=0, upload_server_version=0, downloadable_bytes=0, last_in_batch=true, query_version=0, num_changesets=1, ...)
Connection[2]: Session[2]: Scanning incoming changeset [1/1] (0 instructions)
Connection[2]: Session[2]: Scanning local changeset [1/1] (3 instructions)
Connection[2]: Session[2]: Indexing incoming changeset [1/1] (0 instructions)
Connection[2]: Session[2]: Finished changeset indexing (incoming: 1 changeset(s) / 0 instructions, local: 1 changeset(s) / 3 instructions, conflict group(s): 1)
Connection[2]: Session[2]: Transforming local changeset [1/1] through 1 incoming changeset(s) with 1 conflict group(s)
Connection[2]: Session[2]: Finished transforming 1 local changesets through 1 incoming changesets (3 vs 0 instructions, in 1 conflict groups)
Connection[2]: Session[2]: 1 remote changeset integrated, producing client version 9
Connection[2]: Session[2]: Received: MARK(request_ident=1)
Connection[2]: Session[2]: Limiting UPLOAD message up to version 5 to send QUERY version 1
Connection[2]: Session[2]: Sending: UPLOAD(progress_client_version=5, progress_server_version=0, locked_server_version=1, num_changesets=1)
Connection[2]: Session[2]: Fetching changeset for upload (client_version=5, server_version=0, changeset_size=78, origin_timestamp=232117526996, origin_file_ident=0)
Connection[2]: Session[2]: Changeset: 3F 00 08 54 6F 70 4C 65 76 65 6C 3F 01 03 5F 69 64 00 00 00 01 0A 00 3F 02 13 71 75 65 72 79 61 62 6C 65 5F 69 6E 74 5F 66 69 65 6C 64 3F 03 00 06 00 02 01 01 00 3F 04 0F 6C 69 73 74 5F 6F 66 5F 73 74 72 69 6E 67 73 06 00 04 03 00 01
Connection[2]: Session[2]: Changeset (parsed):
InternStrings 0="TopLevel", 1="_id", 2="queryable_int_field", 3="", 4="list_of_strings"
AddTable path="TopLevel", pk_field="_id", pk_type=ObjectId, pk_nullable=0
AddColumn table="TopLevel", field="queryable_int_field", type=Int, nullable=1, collection_type=Single
AddColumn table="TopLevel", field="list_of_strings", type=String, nullable=0, collection_type=List
Connection[2]: Session[2]: Sending: QUERY(query_version=1, query_size=30, query="{"TopLevel":"(TRUEPREDICATE)"}"
Connection[2]: Session[2]: Sending: UPLOAD(progress_client_version=9, progress_server_version=1, locked_server_version=1, num_changesets=0)
Connection[2]: Download message compression: is_body_compressed = false, compressed_body_size=0, uncompressed_body_size=0
Connection[2]: Session[2]: Received: DOWNLOAD(download_server_version=5, download_client_version=5, latest_server_version=5, latest_server_version_salt=2530630698783247753, upload_client_version=5, upload_server_version=0, downloadable_bytes=0, last_in_batch=true, query_version=0, num_changesets=0, ...)
Connection[2]: Session[2]: Sending: UPLOAD(progress_client_version=11, progress_server_version=5, locked_server_version=5, num_changesets=0)
Connection[2]: Download message compression: is_body_compressed = true, compressed_body_size=17202, uncompressed_body_size=2146234
Connection[2]: Received: DOWNLOAD CHANGESET(server_version=5, client_version=5, origin_timestamp=232117527373, origin_file_ident=1, original_changeset_size=2146199, changeset_size=2146199)
Connection[2]: Changeset (parsed):
InternStrings 0="TopLevel", 1="list_of_strings", 2="queryable_int_field"
EraseObject path=TopLevel[ObjectId{627a63154f9f80f1d627d81e}]
CreateObject path=TopLevel[ObjectId{627a63154f9f80f1d627d81e}]
Update path=TopLevel[ObjectId{627a63154f9f80f1d627d81e}].queryable_int_field, value=Int(0), default=0
EraseObject path=TopLevel[ObjectId{627a63154f9f80f1d627d81f}]
CreateObject path=TopLevel[ObjectId{627a63154f9f80f1d627d81f}]
Update path=TopLevel[ObjectId{627a63154f9f80f1d627d81f}].queryable_int_field, value=Int(5), default=0
Connection[2]: Session[2]: Received: DOWNLOAD(download_server_version=5, download_client_version=5, latest_server_version=5, latest_server_version_salt=2530630698783247753, upload_client_version=5, upload_server_version=0, downloadable_bytes=0, last_in_batch=false, query_version=1, num_changesets=1, ...)
Connection[2]: Session[2]: sync::create_object_with_primary_key(group, get_table("class_TopLevel"), 627a63154f9f80f1d627d81e);
Connection[2]: Session[2]: sync::create_object_with_primary_key(group, get_table("class_TopLevel"), 627a63154f9f80f1d627d81f);
Connection[2]: Session[2]: 1 remote changeset integrated, producing client version 13
Connection[2]: Download message compression: is_body_compressed = true, compressed_body_size=17204, uncompressed_body_size=2146234
Connection[2]: Received: DOWNLOAD CHANGESET(server_version=5, client_version=5, origin_timestamp=232117527373, origin_file_ident=1, original_changeset_size=2146199, changeset_size=2146199)
Connection[2]: Changeset (parsed):
InternStrings 0="TopLevel", 1="list_of_strings", 2="queryable_int_field"
EraseObject path=TopLevel[ObjectId{627a63154f9f80f1d627d820}]
CreateObject path=TopLevel[ObjectId{627a63154f9f80f1d627d820}]
Update path=TopLevel[ObjectId{627a63154f9f80f1d627d820}].queryable_int_field, value=Int(10), default=0
EraseObject path=TopLevel[ObjectId{627a63154f9f80f1d627d821}]
CreateObject path=TopLevel[ObjectId{627a63154f9f80f1d627d821}]
Update path=TopLevel[ObjectId{627a63154f9f80f1d627d821}].queryable_int_field, value=Int(15), default=0
Connection[2]: Session[2]: Received: DOWNLOAD(download_server_version=5, download_client_version=5, latest_server_version=5, latest_server_version_salt=2530630698783247753, upload_client_version=5, upload_server_version=0, downloadable_bytes=0, last_in_batch=false, query_version=1, num_changesets=1, ...)
Connection[2]: Session[2]: sync::create_object_with_primary_key(group, get_table("class_TopLevel"), 627a63154f9f80f1d627d820);
Connection[2]: Session[2]: sync::create_object_with_primary_key(group, get_table("class_TopLevel"), 627a63154f9f80f1d627d821);
Connection[2]: Session[2]: 1 remote changeset integrated, producing client version 15
Connection[2]: Session[2]: Initiating deactivation
Connection[2]: Session[2]: Sending: UNBIND
Connection[2]: Session[2]: Deactivation completed
LGTM
0c5c492 to 699d7b1
What, How & Why?
While working on the compensating writes changes, I found a case where an interrupted bootstrap could cause the sync client to hang forever on re-connect. I've tried to annotate this PR to explain what's going on, but feel free to reach out if anything is confusing.
☑️ ToDos