Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor syncUpload Method to improve flow and error handling #2173

Merged
merged 18 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import com.google.android.fhir.search.has
import com.google.android.fhir.search.include
import com.google.android.fhir.search.revInclude
import com.google.android.fhir.sync.upload.LocalChangesFetchMode
import com.google.android.fhir.sync.upload.UploadSyncResult
import com.google.android.fhir.testing.assertJsonArrayEqualsIgnoringOrder
import com.google.android.fhir.testing.assertResourceEquals
import com.google.android.fhir.testing.readFromFile
Expand All @@ -49,7 +50,7 @@ import com.google.common.truth.Truth.assertThat
import java.math.BigDecimal
import java.time.Instant
import java.util.Date
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking
import org.hl7.fhir.r4.model.Address
import org.hl7.fhir.r4.model.CarePlan
Expand Down Expand Up @@ -528,19 +529,23 @@ class DatabaseImplTest {
database.insert(patient)
// Delete the patient created in setup as we only want to upload the patient in this test
database.deleteUpdates(listOf(TEST_PATIENT_1))
services.fhirEngine.syncUpload(LocalChangesFetchMode.AllChanges) {
it
.first { it.resourceId == "remote-patient-3" }
.let {
flowOf(
it.token to
Patient().apply {
id = it.resourceId
meta = remoteMeta
},
)
}
}
services.fhirEngine
.syncUpload(LocalChangesFetchMode.AllChanges) {
it
.first { it.resourceId == "remote-patient-3" }
.let {
UploadSyncResult.Success(
it.token,
listOf(
Patient().apply {
id = it.resourceId
meta = remoteMeta
},
),
)
}
}
.collect()
val selectedEntity = database.selectEntity(ResourceType.Patient, "remote-patient-3")
assertThat(selectedEntity.versionId).isEqualTo(remoteMeta.versionId)
assertThat(selectedEntity.lastUpdatedRemote).isEqualTo(remoteMeta.lastUpdated.toInstant())
Expand Down
18 changes: 13 additions & 5 deletions engine/src/main/java/com/google/android/fhir/FhirEngine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import com.google.android.fhir.db.ResourceNotFoundException
import com.google.android.fhir.search.Search
import com.google.android.fhir.sync.ConflictResolver
import com.google.android.fhir.sync.upload.LocalChangesFetchMode
import com.google.android.fhir.sync.upload.SyncUploadProgress
import com.google.android.fhir.sync.upload.UploadSyncResult
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.Flow
import org.hl7.fhir.r4.model.Resource
Expand Down Expand Up @@ -49,14 +51,20 @@ interface FhirEngine {
suspend fun <R : Resource> search(search: Search): List<SearchResult<R>>

/**
* Synchronizes the [upload] result in the database. [upload] operation may result in multiple
* calls to the server to upload the data. Result of each call will be emitted by [upload] and the
* api caller should [Flow.collect] it.
* Synchronizes the upload results in the database.
*
* The [upload] function may initiate multiple server calls. Each call's result can then be used
* to emit [UploadSyncResult]. The caller should collect these results using [Flow.collect].
*
* @param localChangesFetchMode Specifies the mode to fetch local changes.
* @param upload A suspend function that takes a list of [LocalChange] and returns an
* [UploadSyncResult].
* @return A [Flow] that emits the progress of the synchronization process.
*/
suspend fun syncUpload(
localChangesFetchMode: LocalChangesFetchMode,
upload: (suspend (List<LocalChange>) -> Flow<Pair<LocalChangeToken, Resource>>),
)
upload: (suspend (List<LocalChange>) -> UploadSyncResult),
): Flow<SyncUploadProgress>

/**
* Synchronizes the [download] result in the database. The database will be updated to reflect the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import android.content.Context
import com.google.android.fhir.DatastoreUtil
import com.google.android.fhir.FhirEngine
import com.google.android.fhir.LocalChange
import com.google.android.fhir.LocalChangeToken
import com.google.android.fhir.SearchResult
import com.google.android.fhir.db.Database
import com.google.android.fhir.logicalId
Expand All @@ -32,8 +31,11 @@ import com.google.android.fhir.sync.Resolved
import com.google.android.fhir.sync.upload.DefaultResourceConsolidator
import com.google.android.fhir.sync.upload.LocalChangeFetcherFactory
import com.google.android.fhir.sync.upload.LocalChangesFetchMode
import com.google.android.fhir.sync.upload.SyncUploadProgress
import com.google.android.fhir.sync.upload.UploadSyncResult
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.ResourceType

Expand Down Expand Up @@ -125,13 +127,37 @@ internal class FhirEngineImpl(private val database: Database, private val contex

override suspend fun syncUpload(
localChangesFetchMode: LocalChangesFetchMode,
upload: suspend (List<LocalChange>) -> Flow<Pair<LocalChangeToken, Resource>>,
) {
upload: (suspend (List<LocalChange>) -> UploadSyncResult),
): Flow<SyncUploadProgress> = flow {
val resourceConsolidator = DefaultResourceConsolidator(database)
val localChangeFetcher = LocalChangeFetcherFactory.byMode(localChangesFetchMode, database)

emit(
SyncUploadProgress(
remaining = localChangeFetcher.total,
initialTotal = localChangeFetcher.total,
),
)

while (localChangeFetcher.hasNext()) {
upload(localChangeFetcher.next()).collect {
resourceConsolidator.consolidate(it.first, it.second)
val localChanges = localChangeFetcher.next()
val uploadSyncResult = upload(localChanges)

resourceConsolidator.consolidate(uploadSyncResult)
when (uploadSyncResult) {
is UploadSyncResult.Success -> emit(localChangeFetcher.getProgress())
is UploadSyncResult.Failure -> {
with(localChangeFetcher.getProgress()) {
emit(
SyncUploadProgress(
remaining = remaining,
initialTotal = initialTotal,
uploadError = uploadSyncResult.syncError,
),
)
}
break
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import com.google.android.fhir.FhirEngine
import com.google.android.fhir.sync.download.DownloadState
import com.google.android.fhir.sync.download.Downloader
import com.google.android.fhir.sync.upload.LocalChangesFetchMode
import com.google.android.fhir.sync.upload.UploadState
import com.google.android.fhir.sync.upload.Uploader
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.MutableSharedFlow
Expand Down Expand Up @@ -120,23 +119,19 @@ internal class FhirSynchronizer(
private suspend fun upload(): SyncResult {
val exceptions = mutableListOf<ResourceSyncException>()
val localChangesFetchMode = LocalChangesFetchMode.AllChanges
fhirEngine.syncUpload(localChangesFetchMode) { list ->
flow {
uploader.upload(list).collect { result ->
when (result) {
is UploadState.Started ->
setSyncState(SyncJobStatus.InProgress(SyncOperation.UPLOAD, result.total))
is UploadState.Success ->
emit(result.localChangeToken to result.resource).also {
setSyncState(
SyncJobStatus.InProgress(SyncOperation.UPLOAD, result.total, result.completed),
)
}
is UploadState.Failure -> exceptions.add(result.syncError)
}
}
fhirEngine
.syncUpload(localChangesFetchMode) { uploader.upload(it) }
omarismail94 marked this conversation as resolved.
Show resolved Hide resolved
.collect { progress ->
progress.uploadError?.let { exceptions.add(it) }
?: setSyncState(
SyncJobStatus.InProgress(
SyncOperation.UPLOAD,
progress.initialTotal,
progress.initialTotal - progress.remaining,
),
)
}
}

return if (exceptions.isEmpty()) {
SyncResult.Success()
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.google.android.fhir.sync.upload

import com.google.android.fhir.LocalChange
import com.google.android.fhir.db.Database
import com.google.android.fhir.sync.ResourceSyncException
import kotlin.properties.Delegates

/**
Expand All @@ -40,15 +41,16 @@ internal interface LocalChangeFetcher {
suspend fun next(): List<LocalChange>

/**
* Returns [FetchProgress], which contains the remaining changes left to upload and the initial
* total to upload.
* Returns [SyncUploadProgress], which contains the remaining changes left to upload and the
* initial total to upload.
*/
suspend fun getProgress(): FetchProgress
suspend fun getProgress(): SyncUploadProgress
}

data class FetchProgress(
data class SyncUploadProgress(
val remaining: Int,
val initialTotal: Int,
val uploadError: ResourceSyncException? = null,
)

internal class AllChangesLocalChangeFetcher(
Expand All @@ -65,8 +67,8 @@ internal class AllChangesLocalChangeFetcher(

override suspend fun next(): List<LocalChange> = database.getAllLocalChanges()

override suspend fun getProgress(): FetchProgress =
FetchProgress(database.getLocalChangesCount(), total)
override suspend fun getProgress(): SyncUploadProgress =
SyncUploadProgress(database.getLocalChangesCount(), total)
}

/** Represents the mode in which local changes should be fetched. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.google.android.fhir.sync.upload

import com.google.android.fhir.LocalChangeToken
import com.google.android.fhir.db.Database
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.Resource
Expand All @@ -36,19 +35,29 @@ import timber.log.Timber
internal fun interface ResourceConsolidator {

/** Consolidates the local change token with the provided response from the FHIR server. */
suspend fun consolidate(localChangeToken: LocalChangeToken, response: Resource)
suspend fun consolidate(uploadSyncResult: UploadSyncResult)
}

/** Default implementation of [ResourceConsolidator] that uses the database to aid consolidation. */
internal class DefaultResourceConsolidator(private val database: Database) : ResourceConsolidator {

override suspend fun consolidate(localChangeToken: LocalChangeToken, response: Resource) {
database.deleteUpdates(localChangeToken)
when (response) {
is Bundle -> updateVersionIdAndLastUpdated(response)
else -> updateVersionIdAndLastUpdated(response)
override suspend fun consolidate(uploadSyncResult: UploadSyncResult) =
when (uploadSyncResult) {
is UploadSyncResult.Success -> {
database.deleteUpdates(uploadSyncResult.localChangeToken)
uploadSyncResult.resources.forEach {
when (it) {
is Bundle -> updateVersionIdAndLastUpdated(it)
else -> updateVersionIdAndLastUpdated(it)
}
}
}
is UploadSyncResult.Failure -> {
/* For now, do nothing (we do not delete the local changes from the database as they were
not uploaded successfully. In the future, add consolidation required if upload fails.
*/
}
}
}

private suspend fun updateVersionIdAndLastUpdated(bundle: Bundle) {
when (bundle.type) {
Expand Down
Loading