Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for MARK message to mark a SubscriptionSet as Complete #5795

Merged
merged 26 commits into from
Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f18c87a
Add AwaitingMark state for SubscriptionSets
jbreams Aug 30, 2022
9b8768d
undo config override
jbreams Aug 30, 2022
7a537eb
do not break SubscriptionSet schema
jbreams Aug 30, 2022
39a5181
add test and fix up enum translation
jbreams Sep 29, 2022
66f0e3e
fix override file
jbreams Sep 29, 2022
244ec8b
use naive comparisons again
jbreams Sep 29, 2022
9ce02a6
use proper enum values in more places
jbreams Sep 29, 2022
1f765a8
remove workaround for client reset
jbreams Sep 30, 2022
f1a3549
Merge remote-tracking branch 'origin/master' into jbr/ensure_fully_sy…
jbreams Oct 7, 2022
eea8257
more tests
jbreams Oct 7, 2022
d6238d3
fix windows warnings
jbreams Oct 10, 2022
704a102
Merge remote-tracking branch 'origin/master' into jbr/ensure_fully_sy…
jbreams Oct 19, 2022
d95f001
fixes
jbreams Oct 20, 2022
3cc9a78
changelog
jbreams Oct 20, 2022
1197d03
fix ordering of state enum
jbreams Oct 20, 2022
38dbf38
Merge remote-tracking branch 'origin/master' into jbr/ensure_fully_sy…
jbreams Oct 21, 2022
ea5a07c
update changelog
jbreams Oct 21, 2022
e8ccb4c
bump server
jbreams Oct 21, 2022
9d91042
fix windows warning
jbreams Oct 24, 2022
93e872c
Merge remote-tracking branch 'origin/master' into jbr/ensure_fully_sy…
jbreams Oct 24, 2022
30523f6
remove unnecessary enum conversion logic
jbreams Oct 25, 2022
32ad0aa
do not load empty MutableSubscriptionSets from disk
jbreams Oct 25, 2022
95fa856
fix getting the active subscription when there is none
jbreams Oct 25, 2022
a0bcc33
Merge remote-tracking branch 'origin/master' into jbr/ensure_fully_sy…
jbreams Nov 1, 2022
550d81c
remove unneeded case and add more tests
jbreams Nov 1, 2022
684e596
fix changelog
jbreams Nov 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion evergreen/config_overrides.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
}
},
"sync": {
"brokerRegistryCacheTTL": "0s",
"allowSyncSessionTestCommands": true
}
}
3 changes: 2 additions & 1 deletion src/realm.h
Original file line number Diff line number Diff line change
Expand Up @@ -3565,10 +3565,11 @@ typedef struct realm_flx_sync_subscription_desc realm_flx_sync_subscription_desc
typedef enum realm_flx_sync_subscription_set_state {
RLM_SYNC_SUBSCRIPTION_UNCOMMITTED = 0,
RLM_SYNC_SUBSCRIPTION_PENDING,
RLM_SYNC_BOOTSTRAPPING,
RLM_SYNC_SUBSCRIPTION_BOOTSTRAPPING,
RLM_SYNC_SUBSCRIPTION_COMPLETE,
RLM_SYNC_SUBSCRIPTION_ERROR,
RLM_SYNC_SUBSCRIPTION_SUPERSEDED,
RLM_SYNC_SUBSCRIPTION_AWAITING_MARK,
} realm_flx_sync_subscription_set_state_e;
typedef void (*realm_sync_on_subscription_state_changed_t)(realm_userdata_t userdata,
realm_flx_sync_subscription_set_state_e state);
Expand Down
52 changes: 47 additions & 5 deletions src/realm/object-store/c_api/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,49 @@ static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::ClientReset)
RLM_SYNC_ERROR_ACTION_CLIENT_RESET);
static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::ClientResetNoRecovery) ==
RLM_SYNC_ERROR_ACTION_CLIENT_RESET_NO_RECOVERY);

SubscriptionSet::State sub_state_from_c_enum(realm_flx_sync_subscription_set_state_e value)
{
switch (static_cast<realm_flx_sync_subscription_set_state>(value)) {
case RLM_SYNC_SUBSCRIPTION_PENDING:
return SubscriptionSet::State::Pending;
case RLM_SYNC_SUBSCRIPTION_BOOTSTRAPPING:
return SubscriptionSet::State::Bootstrapping;
case RLM_SYNC_SUBSCRIPTION_AWAITING_MARK:
return SubscriptionSet::State::AwaitingMark;
case RLM_SYNC_SUBSCRIPTION_COMPLETE:
return SubscriptionSet::State::Complete;
case RLM_SYNC_SUBSCRIPTION_ERROR:
return SubscriptionSet::State::Error;
case RLM_SYNC_SUBSCRIPTION_SUPERSEDED:
return SubscriptionSet::State::Superseded;
case RLM_SYNC_SUBSCRIPTION_UNCOMMITTED:
return SubscriptionSet::State::Uncommitted;
}
REALM_UNREACHABLE();
}

realm_flx_sync_subscription_set_state_e sub_state_to_c_enum(SubscriptionSet::State state)
{
switch (state) {
case SubscriptionSet::State::Pending:
return RLM_SYNC_SUBSCRIPTION_PENDING;
case SubscriptionSet::State::Bootstrapping:
return RLM_SYNC_SUBSCRIPTION_BOOTSTRAPPING;
case SubscriptionSet::State::AwaitingMark:
return RLM_SYNC_SUBSCRIPTION_AWAITING_MARK;
case SubscriptionSet::State::Complete:
return RLM_SYNC_SUBSCRIPTION_COMPLETE;
case SubscriptionSet::State::Error:
return RLM_SYNC_SUBSCRIPTION_ERROR;
case SubscriptionSet::State::Uncommitted:
return RLM_SYNC_SUBSCRIPTION_UNCOMMITTED;
case SubscriptionSet::State::Superseded:
return RLM_SYNC_SUBSCRIPTION_SUPERSEDED;
}
REALM_UNREACHABLE();
}

} // namespace

static realm_sync_error_code_t to_capi(const std::error_code& error_code, std::string& message)
Expand Down Expand Up @@ -569,8 +612,8 @@ realm_sync_on_subscription_set_state_change_wait(const realm_flx_sync_subscripti
{
REALM_ASSERT(subscription_set != nullptr);
SubscriptionSet::State state =
subscription_set->get_state_change_notification(SubscriptionSet::State{notify_when}).get();
return realm_flx_sync_subscription_set_state_e(static_cast<int>(state));
subscription_set->get_state_change_notification(sub_state_from_c_enum(notify_when)).get();
return sub_state_to_c_enum(state);
}

RLM_API bool
Expand All @@ -581,13 +624,12 @@ realm_sync_on_subscription_set_state_change_async(const realm_flx_sync_subscript
{
REALM_ASSERT(subscription_set != nullptr && callback != nullptr);
return wrap_err([&]() {
auto future_state = subscription_set->get_state_change_notification(SubscriptionSet::State{notify_when});
auto future_state = subscription_set->get_state_change_notification(sub_state_from_c_enum(notify_when));
std::move(future_state)
.get_async([callback, userdata = SharedUserdata(userdata, FreeUserdata(userdata_free))](
const StatusWith<SubscriptionSet::State>& state) -> void {
if (state.is_ok())
callback(userdata.get(),
realm_flx_sync_subscription_set_state_e(static_cast<int>(state.get_value())));
callback(userdata.get(), sub_state_to_c_enum(state.get_value()));
else
callback(userdata.get(), realm_flx_sync_subscription_set_state_e::RLM_SYNC_SUBSCRIPTION_ERROR);
});
Expand Down
26 changes: 3 additions & 23 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,28 +401,7 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
std::move(fresh_mut_sub)
.commit()
.get_state_change_notification(sync::SubscriptionSet::State::Complete)
.then([=, weak_self = weak_from_this()](sync::SubscriptionSet::State) {
auto pf = util::make_promise_future<void>();
sync_session->wait_for_download_completion([=, promise = std::move(pf.promise)](
std::error_code ec) mutable {
auto strong_self = weak_self.lock();
if (!strong_self) {
return promise.set_error({ErrorCodes::RuntimeError,
"SyncSession was destroyed before download could be completed"});
}

if (ec) {
return promise.set_error(
{ErrorCodes::RuntimeError,
util::format("Error waiting for download completion for fresh realm (code: %1): %2",
ec.value(), ec.message())});
}

promise.emplace_value();
});
return std::move(pf.future);
})
.get_async([=, weak_self = weak_from_this()](Status s) {
.get_async([=, weak_self = weak_from_this()](StatusWith<sync::SubscriptionSet::State> s) {
// Keep the sync session alive while it's downloading, but then close
// it immediately
sync_session->close();
Expand All @@ -431,7 +410,8 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
strong_self->handle_fresh_realm_downloaded(db, none, server_requests_action);
}
else {
strong_self->handle_fresh_realm_downloaded(nullptr, s.reason(), server_requests_action);
strong_self->handle_fresh_realm_downloaded(nullptr, s.get_status().reason(),
server_requests_action);
}
}
});
Expand Down
40 changes: 33 additions & 7 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac
int64_t m_flx_active_version = 0;
int64_t m_flx_last_seen_version = 0;
int64_t m_flx_latest_version = 0;
int64_t m_flx_pending_mark_version = 0;
std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;

bool m_initiated = false;
Expand Down Expand Up @@ -785,6 +786,7 @@ void SessionImpl::process_pending_flx_bootstrap()
VersionInfo new_version;
SyncProgress progress;
int64_t query_version = -1;
size_t changesets_processed = 0;
while (bootstrap_store->has_pending()) {
auto pending_batch = bootstrap_store->peek_pending(batch_size_in_bytes);
if (!pending_batch.progress) {
Expand All @@ -803,7 +805,6 @@ void SessionImpl::process_pending_flx_bootstrap()
throw IntegrationException(ClientError::bad_changeset, "simulated failure");
}


history.integrate_server_changesets(
*pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
[&](const TransactionRef& tr, size_t count) {
Expand All @@ -812,9 +813,7 @@ void SessionImpl::process_pending_flx_bootstrap()
},
get_transact_reporter());
progress = *pending_batch.progress;

REALM_ASSERT(call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this removed? I am using it in RCORE-1225 to update an object brought into view during bootstrap.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think by accident. I've put it back.

batch_state, pending_batch.changesets.size()) == SyncClientHookAction::NoAction);
changesets_processed += pending_batch.changesets.size();

logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
"%3. %4 changesets remaining in bootstrap",
Expand All @@ -826,6 +825,10 @@ void SessionImpl::process_pending_flx_bootstrap()
REALM_ASSERT_3(query_version, !=, -1);
m_wrapper.on_sync_progress();
on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);

auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
DownloadBatchState::LastInBatch, changesets_processed);
REALM_ASSERT(action == SyncClientHookAction::NoAction);
}

void SessionImpl::on_new_flx_subscription_set(int64_t new_version)
Expand Down Expand Up @@ -950,8 +953,10 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<Sub
REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));

if (m_flx_subscription_store) {
std::tie(m_flx_active_version, m_flx_latest_version) =
m_flx_subscription_store->get_active_and_latest_versions();
auto versions_info = m_flx_subscription_store->get_active_and_latest_versions();
m_flx_active_version = versions_info.active;
m_flx_latest_version = versions_info.latest;
m_flx_pending_mark_version = versions_info.pending_mark;
}
}

Expand Down Expand Up @@ -1032,7 +1037,13 @@ void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchStat
return;
}
on_flx_sync_version_complete(new_version);
new_state = SubscriptionSet::State::Complete;
if (new_version == 0) {
new_state = SubscriptionSet::State::Complete;
}
else {
new_state = SubscriptionSet::State::AwaitingMark;
m_flx_pending_mark_version = new_version;
}
break;
case DownloadBatchState::MoreToCome:
if (m_flx_last_seen_version == new_version) {
Expand Down Expand Up @@ -1438,6 +1449,16 @@ void SessionWrapper::on_download_completion()
m_upload_completion_handlers.push_back(std::move(handler)); // Throws
m_sync_completion_handlers.pop_back();
}

if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
m_flx_pending_mark_version);
auto mutable_subs = m_flx_subscription_store->get_mutable_by_version(m_flx_pending_mark_version);
mutable_subs.update_state(SubscriptionSet::State::Complete);
std::move(mutable_subs).commit();
m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
}

util::LockGuard lock{m_client.m_mutex};
if (m_staged_download_mark > m_reached_download_mark) {
m_reached_download_mark = m_staged_download_mark;
Expand All @@ -1458,6 +1479,11 @@ void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
void SessionWrapper::on_resumed()
{
m_suspended = false;
if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a test case for this scenario?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We didn't, so I went and wrote one, and then found out this block wasn't necessary and have removed it (but left the test case).

m_sess->logger.debug("Requesting download notification for query version %1 after resume",
m_flx_pending_mark_version);
m_sess->request_download_completion_notification();
}
if (m_connection_state_change_listener) {
ClientImpl::Connection& conn = m_sess->get_connection();
if (conn.get_state() != ConnectionState::disconnected) {
Expand Down
1 change: 1 addition & 0 deletions src/realm/sync/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ enum class SyncClientHookEvent {
DownloadMessageReceived,
DownloadMessageIntegrated,
BootstrapMessageProcessed,
BootstrapProcessed,
};

enum class SyncClientHookAction {
Expand Down
2 changes: 1 addition & 1 deletion src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1778,7 +1778,7 @@ void Session::send_query_change_message()

m_last_sent_flx_query_version = latest_sub_set.version();

enlist_to_send(); // throws
request_download_completion_notification();
}

void Session::send_upload_message()
Expand Down
Loading