Skip to content

Commit

Permalink
RCOCOA-2285: Wire up the new progress notifications (#8492)
Browse files Browse the repository at this point in the history
* Wire up the new progress notifications

* Fix whitespace violations

* Address some comments

* wip

* fix build

* Bump baas version; add non-streaming test

* Add changelog

* fix lint violations

* Fix build errors

* Fix bad merge

* Remove @mainactor annotations

* Address PR comments

* Update forgotten test

* Replace some session.waits with waitFor...
  • Loading branch information
nirinchev authored May 2, 2024
1 parent 771f922 commit 452588e
Showing 13 changed files with 491 additions and 84 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -37,6 +37,9 @@ store. Xcode 15.1 is now the minimum supported version.
to "Embed & Sign" in the "Frameworks, Libraries, and Embedded Content"
section on the General tab of your target's settings.
([#8561](https://github.com/realm/realm-swift/pull/8561)).
* The `transferredBytes` and `transferrableBytes` fields on `Progress` have been deprecated
in favor of `progressEstimate` which is a value between 0.0 and 1.0 indicating the estimated
progress toward the upload/download transfer. ([#8476](https://github.com/realm/realm-swift/issues/8476))

### Fixed
* `-[RLMUser allSessions]` did not include sessions which were currently
@@ -65,6 +68,9 @@ store. Xcode 15.1 is now the minimum supported version.
* Schema initialization could hit an assertion failure if the sync client
applied a downloaded changeset while the Realm file was in the process of
being opened ([#7041](https://github.com/realm/realm-core/issues/7041), since v10.15.0).
* The reported download progress for flexible sync Realms was incorrect. It is now replaced by a
progress estimate, which is derived by the server based on historical data and other heuristics.
([#8476](https://github.com/realm/realm-swift/issues/8476))

<!-- ### Breaking Changes - ONLY INCLUDE FOR NEW MAJOR version -->

43 changes: 25 additions & 18 deletions Realm/ObjectServerTests/RealmServer.swift
Original file line number Diff line number Diff line change
@@ -188,9 +188,11 @@ struct AdminProfile: Codable {
struct Role: Codable {
enum CodingKeys: String, CodingKey {
case groupId = "group_id"
case roleName = "role_name"
}

let groupId: String
let roleName: String
let groupId: String?
}

let roles: [Role]
@@ -453,7 +455,9 @@ class Admin {
}
.flatMap { (accessToken: String) -> Result<AdminSession, Error> in
self.userProfile(accessToken: accessToken).map {
AdminSession(accessToken: accessToken, groupId: $0.roles[0].groupId)
AdminSession(accessToken: accessToken, groupId: $0.roles.first(where: { role in
role.roleName == "GROUP_OWNER"
})!.groupId!)
}
}
.get()
@@ -883,6 +887,21 @@ public class RealmServer: NSObject {
]
]
]

// We only need to create the userData rule for .pbs since for .flx we
// have a default rule that covers all collections
let userDataRule: [String: Json] = [
"database": "test_data",
"collection": "UserData",
"roles": [[
"name": "default",
"apply_when": [:],
"insert": true,
"delete": true,
"additional_fields": [:]
]]
]
_ = app.services[serviceId].rules.post(userDataRule)
case .flx(let fields):
serviceConfig = [
"flexible_sync": [
@@ -939,19 +958,6 @@ public class RealmServer: NSObject {
"""
], failOnError)

let rules = app.services[serviceId].rules
let userDataRule: [String: Json] = [
"database": "test_data",
"collection": "UserData",
"roles": [[
"name": "default",
"apply_when": [:],
"insert": true,
"delete": true,
"additional_fields": [:]
]]
]
_ = rules.post(userDataRule)
app.customUserData.patch(on: group, [
"mongo_service_id": serviceId,
"enabled": true,
@@ -978,9 +984,10 @@ public class RealmServer: NSObject {
], failOnError)

// Disable exponential backoff when the server isn't ready for us to connect
session.privateApps[appId].settings.patch(on: group, [
"sync": ["disable_client_error_backoff": true]
], failOnError)
// TODO: this is returning 403 with current server. Reenable once it's fixed - see https://mongodb.slack.com/archives/C0121N9LJ14/p1713885482349059
// session.privateApps[appId].settings.patch(on: group, [
// "sync": ["disable_client_error_backoff": true]
// ], failOnError)

try group.throwingWait(timeout: .now() + 5.0)

249 changes: 248 additions & 1 deletion Realm/ObjectServerTests/SwiftFlexibleSyncServerTests.swift
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ class SwiftFlexibleSyncTests: SwiftSyncTestCase {
}

override var objectTypes: [ObjectBase.Type] {
[SwiftPerson.self, SwiftTypesSyncObject.self]
[SwiftPerson.self, SwiftTypesSyncObject.self, SwiftHugeSyncObject.self]
}

override func createApp() throws -> String {
@@ -941,6 +941,253 @@ class SwiftFlexibleSyncTests: SwiftSyncTestCase {

proxy.stop()
}

// MARK: - Progress notifiers
func testAsyncOpenProgress() throws {
try populateRealm()

let asyncOpenEx = expectation(description: "async open")

let user = createUser()
var config = user.flexibleSyncConfiguration(initialSubscriptions: { subscriptions in
subscriptions.append(QuerySubscription<SwiftHugeSyncObject>())
})
config.objectTypes = objectTypes
var downloadRealm: Realm?
let task = Realm.asyncOpen(configuration: config) { result in
try! { XCTAssertNoThrow(try result.get()) }()
downloadRealm = try! result.get()
asyncOpenEx.fulfill()
}

let callCount = Locked(0)
let progress = Locked<SyncSession.Progress?>(nil)

task.addProgressNotification { p in
if let progress = progress.value {
if progress.progressEstimate < 1.0 {
XCTAssertGreaterThanOrEqual(p.progressEstimate, progress.progressEstimate)
}
}
progress.value = p
callCount.withLock({ $0 += 1 })
}

waitForExpectations(timeout: 10.0, handler: nil)

XCTAssertEqual(try XCTUnwrap(downloadRealm).objects(SwiftHugeSyncObject.self).count, 2)

let p1 = try XCTUnwrap(progress.value)
XCTAssertEqual(p1.progressEstimate, 1.0)
XCTAssertTrue(p1.isTransferComplete)
}

func testNonStreamingDownloadNotifier() throws {
try populateRealm()

let realm = try openRealm(wait: false)

let session = try XCTUnwrap(realm.syncSession)
let ex = expectation(description: "first download")
let callCount = Locked(0)
let progress = Locked<SyncSession.Progress?>(nil)

let test = Locked<Bool>(false)

let token = session.addProgressNotification(for: .download, mode: .forCurrentlyOutstandingWork) { p in
// Verify that progress increases.
if let progress = progress.value {
XCTAssertGreaterThanOrEqual(p.progressEstimate, progress.progressEstimate)
}
progress.value = p
callCount.withLock { $0 += 1 }
}
XCTAssertNotNil(token)

let subscriptions = realm.subscriptions
subscriptions.update({
subscriptions.append(QuerySubscription<SwiftHugeSyncObject>(name: "huge_objects"))
}, onComplete: { err in
XCTAssertNil(err)
ex.fulfill()
})

waitForExpectations(timeout: 60.0)

XCTAssertEqual(realm.objects(SwiftHugeSyncObject.self).count, SwiftSyncTestCase.bigObjectCount)

XCTAssertGreaterThanOrEqual(callCount.value, 1)
let p1 = try XCTUnwrap(progress.value)
XCTAssertEqual(p1.progressEstimate, 1.0)
XCTAssertTrue(p1.isTransferComplete)
let initialCallCount = callCount.value
progress.value = nil
test.value = true

// Run a second time to upload more data and verify that the callback continues to be called
try populateRealm()
waitForDownloads(for: realm)

XCTAssertEqual(realm.objects(SwiftHugeSyncObject.self).count, 2*SwiftSyncTestCase.bigObjectCount)

// We expect that the progress notifier is not called again since those objects were
// added after it has completed.
XCTAssertEqual(callCount.value, initialCallCount)
XCTAssertNil(progress.value)

token!.invalidate()
}

func testStreamingDownloadNotifier() throws {
try populateRealm()

let realm = try openRealm(wait: false)

let session = try XCTUnwrap(realm.syncSession)
let ex = expectation(description: "first download")
let callCount = Locked(0)
let progress = Locked<SyncSession.Progress?>(nil)
let token = session.addProgressNotification(for: .download, mode: .reportIndefinitely) { p in
// Verify that progress increases. If it has reached 1.0, it may decrease again
// since we're adding more data
if let progress = progress.value {
if progress.progressEstimate < 1.0 {
XCTAssertGreaterThanOrEqual(p.progressEstimate, progress.progressEstimate)
}
}
progress.value = p
callCount.withLock({ $0 += 1 })
}
XCTAssertNotNil(token)

let subscriptions = realm.subscriptions
subscriptions.update({
subscriptions.append(QuerySubscription<SwiftHugeSyncObject>(name: "huge_objects"))
}, onComplete: { err in
DispatchQueue.main.async {
XCTAssertNil(err)
ex.fulfill()
}
})

waitForExpectations(timeout: 60.0)

XCTAssertEqual(realm.objects(SwiftHugeSyncObject.self).count, SwiftSyncTestCase.bigObjectCount)

XCTAssertGreaterThanOrEqual(callCount.value, 1)
let p1 = try XCTUnwrap(progress.value)
XCTAssertEqual(p1.progressEstimate, 1.0)
XCTAssertTrue(p1.isTransferComplete)
let initialCallCount = callCount.value
progress.value = nil

// Run a second time to upload more data and verify that the callback continues to be called
try populateRealm()
waitForDownloads(for: realm)

XCTAssertEqual(realm.objects(SwiftHugeSyncObject.self).count, 2*SwiftSyncTestCase.bigObjectCount)

XCTAssertGreaterThan(callCount.value, initialCallCount)
let p2 = try XCTUnwrap(progress.value)
XCTAssertEqual(p2.progressEstimate, 1.0)
XCTAssertTrue(p2.isTransferComplete)

token!.invalidate()
}

func testStreamingUploadNotifier() throws {
let realm = try openRealm(wait: false)
let subscriptions = realm.subscriptions
subscriptions.update {
subscriptions.append(QuerySubscription<SwiftHugeSyncObject>(name: "huge_objects"))
}
let session = try XCTUnwrap(realm.syncSession)

let progress = Locked<SyncSession.Progress?>(nil)
let callCount = Locked(0)

let token = session.addProgressNotification(for: .upload, mode: .reportIndefinitely) { p in
if let progress = progress.value {
if progress.progressEstimate < 1 {
XCTAssertGreaterThanOrEqual(p.progressEstimate, progress.progressEstimate)
}
}
progress.value = p
callCount.withLock({ $0 += 1 })
}
XCTAssertNotNil(token)
waitForUploads(for: realm)

for _ in 0..<5 {
progress.value = nil
let currentCount = callCount.value
try realm.write {
for _ in 0..<SwiftSyncTestCase.bigObjectCount {
realm.add(SwiftHugeSyncObject.create())
}
}

waitForUploads(for: realm)
XCTAssertGreaterThan(callCount.value, currentCount)
}

token!.invalidate()

let p = try XCTUnwrap(progress.value)
XCTAssertEqual(p.progressEstimate, 1.0)
XCTAssertTrue(p.isTransferComplete)
}

func testStreamingNotifierInvalidate() throws {
let realm = try openRealm()
RLMRealmSubscribeToAll(ObjectiveCSupport.convert(object: realm))

let session = try XCTUnwrap(realm.syncSession)
let downloadCount = Locked(0)
let uploadCount = Locked(0)
let tokenDownload = session.addProgressNotification(for: .download, mode: .reportIndefinitely) { _ in
downloadCount.wrappedValue += 1
}
let tokenUpload = session.addProgressNotification(for: .upload, mode: .reportIndefinitely) { _ in
uploadCount.wrappedValue += 1
}

try populateRealm()
waitForDownloads(for: realm)
try realm.write {
realm.add(SwiftHugeSyncObject.create())
}
waitForUploads(for: realm)

tokenDownload!.invalidate()
tokenUpload!.invalidate()
RLMSyncSession.notificationsQueue().sync { }

XCTAssertGreaterThan(downloadCount.wrappedValue, 1)
XCTAssertGreaterThan(uploadCount.wrappedValue, 1)

// There's inherently a race condition here: notification callbacks can
// be called up to one more time after they're invalidated if the sync
// worker thread is in the middle of processing a change at the time
// that the invalidation is requested, and there's no way to wait for that.
// This whole test takes 250ms, so we don't need a very long sleep.
Thread.sleep(forTimeInterval: 0.2)

downloadCount.value = 0
uploadCount.value = 0

try populateRealm()
waitForDownloads(for: realm)
try realm.write {
realm.add(SwiftHugeSyncObject.create())
}
waitForUploads(for: realm)

// We check that the notification block is not called after we reset the
// counters on the notifiers and call invalidated().
XCTAssertEqual(downloadCount.value, 0)
XCTAssertEqual(uploadCount.value, 0)
}
}

#endif // os(macOS)
Loading

0 comments on commit 452588e

Please sign in to comment.