From da6aa54ff2a993f147d87d31a0217cf3757d66ed Mon Sep 17 00:00:00 2001 From: omarismail Date: Mon, 18 Sep 2023 18:09:56 +0100 Subject: [PATCH 01/11] Clean up SyncJobStatus and add FhirSynchronizer test --- .../fhir/demo/MainActivityViewModel.kt | 22 ++- .../android/fhir/sync/FhirSynchronizer.kt | 6 +- .../google/android/fhir/sync/SyncJobStatus.kt | 11 +- .../android/fhir/sync/FhirSynchronizerTest.kt | 181 ++++++++++++++++++ 4 files changed, 201 insertions(+), 19 deletions(-) create mode 100644 engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt diff --git a/demo/src/main/java/com/google/android/fhir/demo/MainActivityViewModel.kt b/demo/src/main/java/com/google/android/fhir/demo/MainActivityViewModel.kt index b09e223330..168398bc53 100644 --- a/demo/src/main/java/com/google/android/fhir/demo/MainActivityViewModel.kt +++ b/demo/src/main/java/com/google/android/fhir/demo/MainActivityViewModel.kt @@ -31,6 +31,7 @@ import com.google.android.fhir.sync.SyncJobStatus import java.time.format.DateTimeFormatter import java.util.concurrent.TimeUnit import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.Job import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharingStarted @@ -48,6 +49,8 @@ class MainActivityViewModel(application: Application) : AndroidViewModel(applica val pollState: Flow get() = _pollState + private var syncJob: Job? = null + init { viewModelScope.launch { Sync.periodicSync( @@ -55,8 +58,8 @@ class MainActivityViewModel(application: Application) : AndroidViewModel(applica periodicSyncConfiguration = PeriodicSyncConfiguration( syncConstraints = Constraints.Builder().build(), - repeat = RepeatInterval(interval = 15, timeUnit = TimeUnit.MINUTES) - ) + repeat = RepeatInterval(interval = 15, timeUnit = TimeUnit.MINUTES), + ), ) .shareIn(this, SharingStarted.Eagerly, 10) .collect { _pollState.emit(it) } @@ -64,18 +67,21 @@ class MainActivityViewModel(application: Application) : AndroidViewModel(applica } fun triggerOneTimeSync() { - viewModelScope.launch { - Sync.oneTimeSync(getApplication()) - .shareIn(this, SharingStarted.Eagerly, 10) - .collect { _pollState.emit(it) } - } + syncJob?.cancel() + + syncJob = + viewModelScope.launch { + Sync.oneTimeSync(getApplication()) + .shareIn(this, SharingStarted.Eagerly, 10) + .collect { _pollState.emit(it) } + } } /** Emits last sync time. */ fun updateLastSyncTimestamp() { val formatter = DateTimeFormatter.ofPattern( - if (DateFormat.is24HourFormat(getApplication())) formatString24 else formatString12 + if (DateFormat.is24HourFormat(getApplication())) formatString24 else formatString12, ) _lastSyncTimestampLiveData.value = Sync.getLastSyncTimestamp(getApplication())?.toLocalDateTime()?.format(formatter) ?: "" diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index f2f6de79b0..de31c208bf 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -77,7 +77,7 @@ internal class FhirSynchronizer( val state = when (result) { - is SyncResult.Success -> SyncJobStatus.Finished() + is SyncResult.Success -> SyncJobStatus.Finished is SyncResult.Error -> SyncJobStatus.Failed(result.exceptions) } @@ -86,7 +86,7 @@ internal class FhirSynchronizer( } suspend fun synchronize(): SyncJobStatus { - setSyncState(SyncJobStatus.Started()) + setSyncState(SyncJobStatus.Started) return listOf(download(), upload()) .filterIsInstance() @@ -123,7 +123,6 @@ internal class FhirSynchronizer( return if (exceptions.isEmpty()) { SyncResult.Success() } else { - setSyncState(SyncJobStatus.Glitch(exceptions)) SyncResult.Error(exceptions) } } @@ -151,7 +150,6 @@ internal class FhirSynchronizer( return if (exceptions.isEmpty()) { SyncResult.Success() } else { - setSyncState(SyncJobStatus.Glitch(exceptions)) SyncResult.Error(exceptions) } } diff --git a/engine/src/main/java/com/google/android/fhir/sync/SyncJobStatus.kt b/engine/src/main/java/com/google/android/fhir/sync/SyncJobStatus.kt index 647dae7ec0..8c063a28df 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/SyncJobStatus.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/SyncJobStatus.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022 Google LLC + * Copyright 2022-2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,20 +22,17 @@ sealed class SyncJobStatus { val timestamp: OffsetDateTime = OffsetDateTime.now() /** Sync job has been started on the client but the syncing is not necessarily in progress. */ - class Started : SyncJobStatus() + object Started : SyncJobStatus() /** Syncing in progress with the server. */ data class InProgress( val syncOperation: SyncOperation, val total: Int = 0, - val completed: Int = 0 + val completed: Int = 0, ) : SyncJobStatus() - /** Glitched but sync job is being retried. */ - data class Glitch(val exceptions: List) : SyncJobStatus() - /** Sync job finished successfully. */ - class Finished : SyncJobStatus() + object Finished : SyncJobStatus() /** Sync job failed. */ data class Failed(val exceptions: List) : SyncJobStatus() diff --git a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt new file mode 100644 index 0000000000..af2cf87f11 --- /dev/null +++ b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt @@ -0,0 +1,181 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.android.fhir.sync + +import androidx.test.core.app.ApplicationProvider +import com.google.android.fhir.LocalChangeToken +import com.google.android.fhir.sync.download.DownloadState +import com.google.android.fhir.sync.download.Downloader +import com.google.android.fhir.sync.upload.UploadState +import com.google.android.fhir.sync.upload.Uploader +import com.google.android.fhir.testing.TestFhirEngineImpl +import com.google.common.truth.Truth.assertThat +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.UnconfinedTestDispatcher +import kotlinx.coroutines.test.runTest +import org.hl7.fhir.r4.model.Patient +import org.hl7.fhir.r4.model.ResourceType +import org.junit.Assert.assertThrows +import org.junit.Before +import org.junit.Test +import org.junit.runner.RunWith +import org.mockito.Mock +import org.mockito.Mockito.`when` +import org.mockito.MockitoAnnotations +import org.mockito.kotlin.any +import org.robolectric.RobolectricTestRunner + +@RunWith(RobolectricTestRunner::class) +class FhirSynchronizerTest { + + @Mock private lateinit var uploader: Uploader + + @Mock private lateinit var downloader: Downloader + + @Mock private lateinit var conflictResolver: ConflictResolver + + private lateinit var fhirSynchronizer: FhirSynchronizer + + @Before + fun setUp() { + MockitoAnnotations.openMocks(this) + fhirSynchronizer = + FhirSynchronizer( + ApplicationProvider.getApplicationContext(), + TestFhirEngineImpl, + uploader, + downloader, + conflictResolver, + ) + } + + @Test + fun `subscribe should throw exception on second subscription`() { + val flow1 = MutableSharedFlow() + val flow2 = MutableSharedFlow() + fhirSynchronizer.subscribe(flow1) + + val exception = + assertThrows(IllegalStateException::class.java) { fhirSynchronizer.subscribe(flow2) } + + assertThat(exception.localizedMessage).isEqualTo("Already subscribed to a flow") + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `synchronize should return Success on successful download and upload`() = + runTest(UnconfinedTestDispatcher()) { + `when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 10, 10))) + `when`(uploader.upload(any())) + .thenReturn( + flowOf( + UploadState.Success( + LocalChangeToken(listOf()), + Patient(), + 1, + 1, + ), + ), + ) + + val testFlow = MutableSharedFlow() + fhirSynchronizer.subscribe(testFlow) + + val emittedValues = mutableListOf() + backgroundScope.launch { testFlow.collect { emittedValues.add(it) } } + + val result = fhirSynchronizer.synchronize() + + assertThat(emittedValues) + .containsExactly( + SyncJobStatus.Started, + SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10), + SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1), + SyncJobStatus.Finished, + ) + + assertThat(SyncJobStatus.Finished::class.java).isEqualTo(result::class.java) + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `synchronize should return Failed on failed download`() = + runTest(UnconfinedTestDispatcher()) { + val error = ResourceSyncException(ResourceType.Patient, Exception("Download error")) + `when`(downloader.download()).thenReturn(flowOf(DownloadState.Failure(error))) + `when`(uploader.upload(any())) + .thenReturn( + flowOf( + UploadState.Success( + LocalChangeToken(listOf()), + Patient(), + 1, + 1, + ), + ), + ) + + val testFlow = MutableSharedFlow() + fhirSynchronizer.subscribe(testFlow) + + val emittedValues = mutableListOf() + backgroundScope.launch { testFlow.collect { emittedValues.add(it) } } + + val result = fhirSynchronizer.synchronize() + + assertThat(emittedValues) + .containsExactly( + SyncJobStatus.Started, + SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1), + SyncJobStatus.Failed(exceptions = listOf(error)), + ) + assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java) + assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions) + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `synchronize should return Failed on failed upload`() = + runTest(UnconfinedTestDispatcher()) { + `when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 10, 10))) + val error = ResourceSyncException(ResourceType.Patient, Exception("Upload error")) + `when`(uploader.upload(any())) + .thenReturn( + flowOf(UploadState.Failure(error)), + ) + + val testFlow = MutableSharedFlow() + fhirSynchronizer.subscribe(testFlow) + + val emittedValues = mutableListOf() + backgroundScope.launch { testFlow.collect { emittedValues.add(it) } } + + val result = fhirSynchronizer.synchronize() + + assertThat(emittedValues) + .containsExactly( + SyncJobStatus.Started, + SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10), + SyncJobStatus.Failed(exceptions = listOf(error)), + ) + assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java) + assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions) + } +} From 3b71a2302cd58c914fdc8b71443a69644741a6b7 Mon Sep 17 00:00:00 2001 From: omarismail Date: Fri, 22 Sep 2023 14:57:51 +0100 Subject: [PATCH 02/11] remove changes in MAVM --- .../fhir/demo/MainActivityViewModel.kt | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/demo/src/main/java/com/google/android/fhir/demo/MainActivityViewModel.kt b/demo/src/main/java/com/google/android/fhir/demo/MainActivityViewModel.kt index 168398bc53..b09e223330 100644 --- a/demo/src/main/java/com/google/android/fhir/demo/MainActivityViewModel.kt +++ b/demo/src/main/java/com/google/android/fhir/demo/MainActivityViewModel.kt @@ -31,7 +31,6 @@ import com.google.android.fhir.sync.SyncJobStatus import java.time.format.DateTimeFormatter import java.util.concurrent.TimeUnit import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.Job import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharingStarted @@ -49,8 +48,6 @@ class MainActivityViewModel(application: Application) : AndroidViewModel(applica val pollState: Flow get() = _pollState - private var syncJob: Job? = null - init { viewModelScope.launch { Sync.periodicSync( @@ -58,8 +55,8 @@ class MainActivityViewModel(application: Application) : AndroidViewModel(applica periodicSyncConfiguration = PeriodicSyncConfiguration( syncConstraints = Constraints.Builder().build(), - repeat = RepeatInterval(interval = 15, timeUnit = TimeUnit.MINUTES), - ), + repeat = RepeatInterval(interval = 15, timeUnit = TimeUnit.MINUTES) + ) ) .shareIn(this, SharingStarted.Eagerly, 10) .collect { _pollState.emit(it) } @@ -67,21 +64,18 @@ class MainActivityViewModel(application: Application) : AndroidViewModel(applica } fun triggerOneTimeSync() { - syncJob?.cancel() - - syncJob = - viewModelScope.launch { - Sync.oneTimeSync(getApplication()) - .shareIn(this, SharingStarted.Eagerly, 10) - .collect { _pollState.emit(it) } - } + viewModelScope.launch { + Sync.oneTimeSync(getApplication()) + .shareIn(this, SharingStarted.Eagerly, 10) + .collect { _pollState.emit(it) } + } } /** Emits last sync time. */ fun updateLastSyncTimestamp() { val formatter = DateTimeFormatter.ofPattern( - if (DateFormat.is24HourFormat(getApplication())) formatString24 else formatString12, + if (DateFormat.is24HourFormat(getApplication())) formatString24 else formatString12 ) _lastSyncTimestampLiveData.value = Sync.getLastSyncTimestamp(getApplication())?.toLocalDateTime()?.format(formatter) ?: "" From 1d30254035eb1ee22e612a08099adce1c377604f Mon Sep 17 00:00:00 2001 From: omarismail Date: Fri, 15 Sep 2023 18:03:08 +0100 Subject: [PATCH 03/11] use FetchProgress to track upload progress --- .../android/fhir/db/impl/DatabaseImplTest.kt | 33 ++++--- .../com/google/android/fhir/FhirEngine.kt | 6 +- .../android/fhir/impl/FhirEngineImpl.kt | 16 +-- .../android/fhir/sync/FhirSynchronizer.kt | 33 ++++--- .../fhir/sync/upload/ResourceConsolidator.kt | 21 ++-- .../android/fhir/sync/upload/Uploader.kt | 98 +++++++++---------- .../google/android/fhir/testing/Utilities.kt | 7 +- .../android/fhir/impl/FhirEngineImplTest.kt | 14 ++- .../{UploaderImplTest.kt => UploaderTest.kt} | 39 ++------ 9 files changed, 134 insertions(+), 133 deletions(-) rename engine/src/test/java/com/google/android/fhir/sync/upload/{UploaderImplTest.kt => UploaderTest.kt} (77%) diff --git a/engine/src/androidTest/java/com/google/android/fhir/db/impl/DatabaseImplTest.kt b/engine/src/androidTest/java/com/google/android/fhir/db/impl/DatabaseImplTest.kt index 2e6055fd82..f2acdf6a57 100644 --- a/engine/src/androidTest/java/com/google/android/fhir/db/impl/DatabaseImplTest.kt +++ b/engine/src/androidTest/java/com/google/android/fhir/db/impl/DatabaseImplTest.kt @@ -40,6 +40,7 @@ import com.google.android.fhir.search.has import com.google.android.fhir.search.include import com.google.android.fhir.search.revInclude import com.google.android.fhir.sync.upload.LocalChangesFetchMode +import com.google.android.fhir.sync.upload.UploadSyncResult import com.google.android.fhir.testing.assertJsonArrayEqualsIgnoringOrder import com.google.android.fhir.testing.assertResourceEquals import com.google.android.fhir.testing.readFromFile @@ -49,7 +50,7 @@ import com.google.common.truth.Truth.assertThat import java.math.BigDecimal import java.time.Instant import java.util.Date -import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.runBlocking import org.hl7.fhir.r4.model.Address import org.hl7.fhir.r4.model.CarePlan @@ -513,19 +514,23 @@ class DatabaseImplTest { database.insert(patient) // Delete the patient created in setup as we only want to upload the patient in this test database.deleteUpdates(listOf(TEST_PATIENT_1)) - services.fhirEngine.syncUpload(LocalChangesFetchMode.AllChanges) { - it - .first { it.resourceId == "remote-patient-3" } - .let { - flowOf( - it.token to - Patient().apply { - id = it.resourceId - meta = remoteMeta - }, - ) - } - } + services.fhirEngine + .syncUpload(LocalChangesFetchMode.AllChanges) { + it + .first { it.resourceId == "remote-patient-3" } + .let { + UploadSyncResult.Success( + it.token, + listOf( + Patient().apply { + id = it.resourceId + meta = remoteMeta + }, + ), + ) + } + } + .collect() val selectedEntity = database.selectEntity(ResourceType.Patient, "remote-patient-3") assertThat(selectedEntity.versionId).isEqualTo(remoteMeta.versionId) assertThat(selectedEntity.lastUpdatedRemote).isEqualTo(remoteMeta.lastUpdated.toInstant()) diff --git a/engine/src/main/java/com/google/android/fhir/FhirEngine.kt b/engine/src/main/java/com/google/android/fhir/FhirEngine.kt index 0b9a31b4fb..8a7315ebd2 100644 --- a/engine/src/main/java/com/google/android/fhir/FhirEngine.kt +++ b/engine/src/main/java/com/google/android/fhir/FhirEngine.kt @@ -19,7 +19,9 @@ package com.google.android.fhir import com.google.android.fhir.db.ResourceNotFoundException import com.google.android.fhir.search.Search import com.google.android.fhir.sync.ConflictResolver +import com.google.android.fhir.sync.upload.FetchProgress import com.google.android.fhir.sync.upload.LocalChangesFetchMode +import com.google.android.fhir.sync.upload.UploadSyncResult import java.time.OffsetDateTime import kotlinx.coroutines.flow.Flow import org.hl7.fhir.r4.model.Resource @@ -55,8 +57,8 @@ interface FhirEngine { */ suspend fun syncUpload( localChangesFetchMode: LocalChangesFetchMode, - upload: (suspend (List) -> Flow>), - ) + upload: (suspend (List) -> UploadSyncResult), + ): Flow /** * Synchronizes the [download] result in the database. The database will be updated to reflect the diff --git a/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt b/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt index adf2611487..069a7d7403 100644 --- a/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt +++ b/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt @@ -20,7 +20,6 @@ import android.content.Context import com.google.android.fhir.DatastoreUtil import com.google.android.fhir.FhirEngine import com.google.android.fhir.LocalChange -import com.google.android.fhir.LocalChangeToken import com.google.android.fhir.SearchResult import com.google.android.fhir.db.Database import com.google.android.fhir.logicalId @@ -30,10 +29,13 @@ import com.google.android.fhir.search.execute import com.google.android.fhir.sync.ConflictResolver import com.google.android.fhir.sync.Resolved import com.google.android.fhir.sync.upload.DefaultResourceConsolidator +import com.google.android.fhir.sync.upload.FetchProgress import com.google.android.fhir.sync.upload.LocalChangeFetcherFactory import com.google.android.fhir.sync.upload.LocalChangesFetchMode +import com.google.android.fhir.sync.upload.UploadSyncResult import java.time.OffsetDateTime import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow import org.hl7.fhir.r4.model.Resource import org.hl7.fhir.r4.model.ResourceType @@ -125,14 +127,16 @@ internal class FhirEngineImpl(private val database: Database, private val contex override suspend fun syncUpload( localChangesFetchMode: LocalChangesFetchMode, - upload: suspend (List) -> Flow>, - ) { + upload: (suspend (List) -> UploadSyncResult), + ): Flow = flow { val resourceConsolidator = DefaultResourceConsolidator(database) val localChangeFetcher = LocalChangeFetcherFactory.byMode(localChangesFetchMode, database) + + emit(FetchProgress(localChangeFetcher.total, localChangeFetcher.total)) while (localChangeFetcher.hasNext()) { - upload(localChangeFetcher.next()).collect { - resourceConsolidator.consolidate(it.first, it.second) - } + val uploadSyncResult = upload(localChangeFetcher.next()) + resourceConsolidator.consolidate(uploadSyncResult) + emit(localChangeFetcher.getProgress()) } } } diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index de31c208bf..b1abfdbe89 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -22,7 +22,7 @@ import com.google.android.fhir.FhirEngine import com.google.android.fhir.sync.download.DownloadState import com.google.android.fhir.sync.download.Downloader import com.google.android.fhir.sync.upload.LocalChangesFetchMode -import com.google.android.fhir.sync.upload.UploadState +import com.google.android.fhir.sync.upload.UploadSyncResult import com.google.android.fhir.sync.upload.Uploader import java.time.OffsetDateTime import kotlinx.coroutines.flow.MutableSharedFlow @@ -130,23 +130,26 @@ internal class FhirSynchronizer( private suspend fun upload(): SyncResult { val exceptions = mutableListOf() val localChangesFetchMode = LocalChangesFetchMode.AllChanges - fhirEngine.syncUpload(localChangesFetchMode) { list -> - flow { - uploader.upload(list).collect { result -> - when (result) { - is UploadState.Started -> - setSyncState(SyncJobStatus.InProgress(SyncOperation.UPLOAD, result.total)) - is UploadState.Success -> - emit(result.localChangeToken to result.resource).also { - setSyncState( - SyncJobStatus.InProgress(SyncOperation.UPLOAD, result.total, result.completed), - ) - } - is UploadState.Failure -> exceptions.add(result.syncError) + fhirEngine + .syncUpload(localChangesFetchMode) { + when (val result = uploader.upload(it)) { + is UploadSyncResult.Success -> result + is UploadSyncResult.Failure -> { + exceptions.add(result.syncError) + result } } } - } + .collect { + setSyncState( + SyncJobStatus.InProgress( + SyncOperation.UPLOAD, + it.initialTotal, + it.initialTotal - it.remaining, + ), + ) + } + return if (exceptions.isEmpty()) { SyncResult.Success() } else { diff --git a/engine/src/main/java/com/google/android/fhir/sync/upload/ResourceConsolidator.kt b/engine/src/main/java/com/google/android/fhir/sync/upload/ResourceConsolidator.kt index d8f19fc5a2..eac5d48105 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/upload/ResourceConsolidator.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/upload/ResourceConsolidator.kt @@ -16,7 +16,6 @@ package com.google.android.fhir.sync.upload -import com.google.android.fhir.LocalChangeToken import com.google.android.fhir.db.Database import org.hl7.fhir.r4.model.Bundle import org.hl7.fhir.r4.model.Resource @@ -36,19 +35,25 @@ import timber.log.Timber internal fun interface ResourceConsolidator { /** Consolidates the local change token with the provided response from the FHIR server. */ - suspend fun consolidate(localChangeToken: LocalChangeToken, response: Resource) + suspend fun consolidate(uploadSyncResult: UploadSyncResult) } /** Default implementation of [ResourceConsolidator] that uses the database to aid consolidation. */ internal class DefaultResourceConsolidator(private val database: Database) : ResourceConsolidator { - override suspend fun consolidate(localChangeToken: LocalChangeToken, response: Resource) { - database.deleteUpdates(localChangeToken) - when (response) { - is Bundle -> updateVersionIdAndLastUpdated(response) - else -> updateVersionIdAndLastUpdated(response) + override suspend fun consolidate(uploadSyncResult: UploadSyncResult) = + when (uploadSyncResult) { + is UploadSyncResult.Success -> { + database.deleteUpdates(uploadSyncResult.localChangeToken) + uploadSyncResult.resources.forEach { + when (it) { + is Bundle -> updateVersionIdAndLastUpdated(it) + else -> updateVersionIdAndLastUpdated(it) + } + } + } + is UploadSyncResult.Failure -> {} } - } private suspend fun updateVersionIdAndLastUpdated(bundle: Bundle) { when (bundle.type) { diff --git a/engine/src/main/java/com/google/android/fhir/sync/upload/Uploader.kt b/engine/src/main/java/com/google/android/fhir/sync/upload/Uploader.kt index 0e5653371b..51efc538ef 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/upload/Uploader.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/upload/Uploader.kt @@ -22,8 +22,7 @@ import com.google.android.fhir.sync.DataSource import com.google.android.fhir.sync.ResourceSyncException import com.google.android.fhir.sync.upload.patch.PerResourcePatchGenerator import com.google.android.fhir.sync.upload.request.TransactionBundleGenerator -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.flow +import com.google.android.fhir.sync.upload.request.UploadRequest import org.hl7.fhir.exceptions.FHIRException import org.hl7.fhir.r4.model.Bundle import org.hl7.fhir.r4.model.OperationOutcome @@ -39,70 +38,67 @@ import timber.log.Timber * 4. processing the responses from the server and consolidate any changes (i.e. updates resource * IDs). */ -internal class Uploader( - private val dataSource: DataSource, -) { +internal class Uploader(private val dataSource: DataSource) { private val patchGenerator = PerResourcePatchGenerator private val requestGenerator = TransactionBundleGenerator.getDefault() - suspend fun upload(localChanges: List): Flow = flow { + suspend fun upload(localChanges: List): UploadSyncResult { val patches = patchGenerator.generate(localChanges) val requests = requestGenerator.generateUploadRequests(patches) val token = LocalChangeToken(localChanges.flatMap { it.token.ids }) - val total = requests.size - emit(UploadState.Started(total)) - requests.forEachIndexed { index, uploadRequest -> - try { - val response = dataSource.upload(uploadRequest) - emit( - getUploadResult(uploadRequest.resource.resourceType, response, token, total, index + 1), - ) - } catch (e: Exception) { - Timber.e(e) - emit(UploadState.Failure(ResourceSyncException(ResourceType.Bundle, e))) + + val successfulResponses = mutableListOf() + + for (uploadRequest in requests) { + when (val result = attemptUpload(uploadRequest)) { + is UploadAttempt.Success -> successfulResponses.add(result.resource) + is UploadAttempt.Failure -> return UploadSyncResult.Failure(result.exception, token) } } + + return UploadSyncResult.Success(token, successfulResponses) } - private fun getUploadResult( - requestResourceType: ResourceType, - response: Resource, - localChangeToken: LocalChangeToken, - total: Int, - completed: Int, - ) = - when { - response is Bundle && response.type == Bundle.BundleType.TRANSACTIONRESPONSE -> { - UploadState.Success(localChangeToken, response, total, completed) - } - response is OperationOutcome && response.issue.isNotEmpty() -> { - UploadState.Failure( - ResourceSyncException( - requestResourceType, - FHIRException(response.issueFirstRep.diagnostics), - ), - ) - } - else -> { - UploadState.Failure( - ResourceSyncException( - requestResourceType, - FHIRException("Unknown response for ${response.resourceType}"), - ), - ) + private suspend fun attemptUpload(uploadRequest: UploadRequest): UploadAttempt { + return try { + val response = dataSource.upload(uploadRequest) + when { + response is Bundle && response.type == Bundle.BundleType.TRANSACTIONRESPONSE -> + UploadAttempt.Success(response) + response is OperationOutcome && response.issue.isNotEmpty() -> + UploadAttempt.Failure( + ResourceSyncException( + uploadRequest.resource.resourceType, + FHIRException(response.issueFirstRep.diagnostics), + ), + ) + else -> + UploadAttempt.Failure( + ResourceSyncException( + uploadRequest.resource.resourceType, + FHIRException("Unknown response for ${uploadRequest.resource.resourceType}"), + ), + ) } + } catch (e: Exception) { + Timber.e(e) + UploadAttempt.Failure(ResourceSyncException(ResourceType.Bundle, e)) } -} + } + + private sealed class UploadAttempt { + data class Success(val resource: Resource) : UploadAttempt() -internal sealed class UploadState { - data class Started(val total: Int) : UploadState() + data class Failure(val exception: ResourceSyncException) : UploadAttempt() + } +} +sealed class UploadSyncResult { data class Success( val localChangeToken: LocalChangeToken, - val resource: Resource, - val total: Int, - val completed: Int, - ) : UploadState() + val resources: List, + ) : UploadSyncResult() - data class Failure(val syncError: ResourceSyncException) : UploadState() + data class Failure(val syncError: ResourceSyncException, val localChangeToken: LocalChangeToken) : + UploadSyncResult() } diff --git a/engine/src/main/java/com/google/android/fhir/testing/Utilities.kt b/engine/src/main/java/com/google/android/fhir/testing/Utilities.kt index b27519565e..b4946f6bdc 100644 --- a/engine/src/main/java/com/google/android/fhir/testing/Utilities.kt +++ b/engine/src/main/java/com/google/android/fhir/testing/Utilities.kt @@ -31,7 +31,9 @@ import com.google.android.fhir.sync.DownloadWorkManager import com.google.android.fhir.sync.download.BundleDownloadRequest import com.google.android.fhir.sync.download.DownloadRequest import com.google.android.fhir.sync.download.UrlDownloadRequest +import com.google.android.fhir.sync.upload.FetchProgress import com.google.android.fhir.sync.upload.LocalChangesFetchMode +import com.google.android.fhir.sync.upload.UploadSyncResult import com.google.android.fhir.sync.upload.request.BundleUploadRequest import com.google.android.fhir.sync.upload.request.UploadRequest import com.google.common.truth.Truth.assertThat @@ -43,6 +45,7 @@ import java.util.LinkedList import kotlin.streams.toList import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow import org.hl7.fhir.r4.model.Bundle import org.hl7.fhir.r4.model.Meta import org.hl7.fhir.r4.model.Patient @@ -149,8 +152,8 @@ object TestFhirEngineImpl : FhirEngine { override suspend fun syncUpload( localChangesFetchMode: LocalChangesFetchMode, - upload: suspend (List) -> Flow>, - ) = upload(getLocalChanges(ResourceType.Patient, "123")).collect() + upload: suspend (List) -> UploadSyncResult, + ): Flow = flow { upload(getLocalChanges(ResourceType.Patient, "123")) } override suspend fun syncDownload( conflictResolver: ConflictResolver, diff --git a/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt b/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt index fbf400007e..7fc08e47e3 100644 --- a/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt +++ b/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt @@ -30,13 +30,14 @@ import com.google.android.fhir.search.search import com.google.android.fhir.sync.AcceptLocalConflictResolver import com.google.android.fhir.sync.AcceptRemoteConflictResolver import com.google.android.fhir.sync.upload.LocalChangesFetchMode +import com.google.android.fhir.sync.upload.UploadSyncResult import com.google.android.fhir.testing.assertResourceEquals import com.google.android.fhir.testing.assertResourceNotEquals import com.google.android.fhir.testing.readFromFile import com.google.common.truth.Truth.assertThat import java.util.Date import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.runBlocking import org.hl7.fhir.exceptions.FHIRException @@ -314,12 +315,15 @@ class FhirEngineImplTest { @Test fun syncUpload_uploadLocalChange() = runBlocking { val localChanges = mutableListOf() - fhirEngine.syncUpload(LocalChangesFetchMode.AllChanges) { - flow { + fhirEngine + .syncUpload(LocalChangesFetchMode.AllChanges) { localChanges.addAll(it) - emit(LocalChangeToken(it.flatMap { it.token.ids }) to TEST_PATIENT_1) + UploadSyncResult.Success( + LocalChangeToken(it.flatMap { it.token.ids }), + listOf(), + ) } - } + .collect() assertThat(localChanges).hasSize(1) with(localChanges[0]) { diff --git a/engine/src/test/java/com/google/android/fhir/sync/upload/UploaderImplTest.kt b/engine/src/test/java/com/google/android/fhir/sync/upload/UploaderTest.kt similarity index 77% rename from engine/src/test/java/com/google/android/fhir/sync/upload/UploaderImplTest.kt rename to engine/src/test/java/com/google/android/fhir/sync/upload/UploaderTest.kt index f991679c6a..ed82ce5b26 100644 --- a/engine/src/test/java/com/google/android/fhir/sync/upload/UploaderImplTest.kt +++ b/engine/src/test/java/com/google/android/fhir/sync/upload/UploaderTest.kt @@ -25,8 +25,8 @@ import com.google.android.fhir.toLocalChange import com.google.common.truth.Truth.assertThat import java.net.ConnectException import java.time.Instant -import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest import org.hl7.fhir.r4.model.Bundle import org.hl7.fhir.r4.model.HumanName import org.hl7.fhir.r4.model.OperationOutcome @@ -37,31 +37,18 @@ import org.junit.runner.RunWith import org.robolectric.RobolectricTestRunner @RunWith(RobolectricTestRunner::class) -class UploaderImplTest { +class UploaderTest { @Test - fun `upload should start`() = runBlocking { - val result = Uploader(BundleDataSource { Bundle() }).upload(localChanges).toList() - - assertThat(result.first()).isInstanceOf(UploadState.Started::class.java) - } - - @Test - fun `upload should succeed if response is transaction response`() = runBlocking { + fun `upload should succeed if response is transaction response`() = runTest { val result = Uploader( BundleDataSource { Bundle().apply { type = Bundle.BundleType.TRANSACTIONRESPONSE } }, ) .upload(localChanges) - .toList() - - assertThat(result).hasSize(2) - assertThat(result.first()).isInstanceOf(UploadState.Started::class.java) - assertThat(result.last()).isInstanceOf(UploadState.Success::class.java) - val success = result.last() as UploadState.Success - assertThat(success.total).isEqualTo(1) - assertThat(success.completed).isEqualTo(1) + assertThat(result).isInstanceOf(UploadSyncResult.Success::class.java) + with(result as UploadSyncResult.Success) { assertThat(resources).hasSize(1) } } @Test @@ -81,10 +68,8 @@ class UploaderImplTest { }, ) .upload(localChanges) - .toList() - assertThat(result).hasSize(2) - assertThat(result.last()).isInstanceOf(UploadState.Failure::class.java) + assertThat(result).isInstanceOf(UploadSyncResult.Failure::class.java) } @Test @@ -94,10 +79,8 @@ class UploaderImplTest { BundleDataSource { OperationOutcome() }, ) .upload(localChanges) - .toList() - assertThat(result).hasSize(2) - assertThat(result.last()).isInstanceOf(UploadState.Failure::class.java) + assertThat(result).isInstanceOf(UploadSyncResult.Failure::class.java) } @Test @@ -108,10 +91,8 @@ class UploaderImplTest { BundleDataSource { Bundle().apply { type = Bundle.BundleType.SEARCHSET } }, ) .upload(localChanges) - .toList() - assertThat(result).hasSize(2) - assertThat(result.last()).isInstanceOf(UploadState.Failure::class.java) + assertThat(result).isInstanceOf(UploadSyncResult.Failure::class.java) } @Test @@ -121,10 +102,8 @@ class UploaderImplTest { BundleDataSource { throw ConnectException("Failed to connect to server.") }, ) .upload(localChanges) - .toList() - assertThat(result).hasSize(2) - assertThat(result.last()).isInstanceOf(UploadState.Failure::class.java) + assertThat(result).isInstanceOf(UploadSyncResult.Failure::class.java) } companion object { From b0f9ad921aae578716d992d534547afabe0d836e Mon Sep 17 00:00:00 2001 From: omarismail Date: Mon, 18 Sep 2023 16:43:47 +0100 Subject: [PATCH 04/11] add tests --- .../android/fhir/impl/FhirEngineImplTest.kt | 18 ++-- .../android/fhir/sync/FhirSynchronizerTest.kt | 83 ++++++++----------- 2 files changed, 48 insertions(+), 53 deletions(-) diff --git a/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt b/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt index 7fc08e47e3..dc415e7ae8 100644 --- a/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt +++ b/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt @@ -29,6 +29,7 @@ import com.google.android.fhir.search.LOCAL_LAST_UPDATED_PARAM import com.google.android.fhir.search.search import com.google.android.fhir.sync.AcceptLocalConflictResolver import com.google.android.fhir.sync.AcceptRemoteConflictResolver +import com.google.android.fhir.sync.upload.FetchProgress import com.google.android.fhir.sync.upload.LocalChangesFetchMode import com.google.android.fhir.sync.upload.UploadSyncResult import com.google.android.fhir.testing.assertResourceEquals @@ -37,7 +38,6 @@ import com.google.android.fhir.testing.readFromFile import com.google.common.truth.Truth.assertThat import java.util.Date import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.runBlocking import org.hl7.fhir.exceptions.FHIRException @@ -315,6 +315,8 @@ class FhirEngineImplTest { @Test fun syncUpload_uploadLocalChange() = runBlocking { val localChanges = mutableListOf() + val emittedProgress = mutableListOf() + fhirEngine .syncUpload(LocalChangesFetchMode.AllChanges) { localChanges.addAll(it) @@ -323,15 +325,19 @@ class FhirEngineImplTest { listOf(), ) } - .collect() + .collect { emittedProgress.add(it) } assertThat(localChanges).hasSize(1) with(localChanges[0]) { - assertThat(this.resourceType).isEqualTo(ResourceType.Patient.toString()) - assertThat(this.resourceId).isEqualTo(TEST_PATIENT_1.id) - assertThat(this.type).isEqualTo(Type.INSERT) - assertThat(this.payload).isEqualTo(services.parser.encodeResourceToString(TEST_PATIENT_1)) + assertThat(resourceType).isEqualTo(ResourceType.Patient.toString()) + assertThat(resourceId).isEqualTo(TEST_PATIENT_1.id) + assertThat(type).isEqualTo(Type.INSERT) + assertThat(payload).isEqualTo(services.parser.encodeResourceToString(TEST_PATIENT_1)) } + + assertThat(emittedProgress).hasSize(2) + assertThat(emittedProgress.first()).isEqualTo(FetchProgress(1, 1)) + assertThat(emittedProgress.last()).isEqualTo(FetchProgress(0, 1)) } @Test diff --git a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt index af2cf87f11..fea3a07de4 100644 --- a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt +++ b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt @@ -20,7 +20,7 @@ import androidx.test.core.app.ApplicationProvider import com.google.android.fhir.LocalChangeToken import com.google.android.fhir.sync.download.DownloadState import com.google.android.fhir.sync.download.Downloader -import com.google.android.fhir.sync.upload.UploadState +import com.google.android.fhir.sync.upload.UploadSyncResult import com.google.android.fhir.sync.upload.Uploader import com.google.android.fhir.testing.TestFhirEngineImpl import com.google.common.truth.Truth.assertThat @@ -30,7 +30,6 @@ import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.launch import kotlinx.coroutines.test.UnconfinedTestDispatcher import kotlinx.coroutines.test.runTest -import org.hl7.fhir.r4.model.Patient import org.hl7.fhir.r4.model.ResourceType import org.junit.Assert.assertThrows import org.junit.Before @@ -42,6 +41,7 @@ import org.mockito.MockitoAnnotations import org.mockito.kotlin.any import org.robolectric.RobolectricTestRunner +@OptIn(ExperimentalCoroutinesApi::class) @RunWith(RobolectricTestRunner::class) class FhirSynchronizerTest { @@ -78,59 +78,50 @@ class FhirSynchronizerTest { assertThat(exception.localizedMessage).isEqualTo("Already subscribed to a flow") } - @OptIn(ExperimentalCoroutinesApi::class) @Test fun `synchronize should return Success on successful download and upload`() = runTest(UnconfinedTestDispatcher()) { `when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 10, 10))) - `when`(uploader.upload(any())) - .thenReturn( - flowOf( - UploadState.Success( - LocalChangeToken(listOf()), - Patient(), - 1, - 1, - ), - ), - ) + `when`(uploader.upload(any())) + .thenReturn( + UploadSyncResult.Success( + LocalChangeToken(listOf()), + listOf(), + ), + ) - val testFlow = MutableSharedFlow() - fhirSynchronizer.subscribe(testFlow) + val testFlow = MutableSharedFlow() + fhirSynchronizer.subscribe(testFlow) - val emittedValues = mutableListOf() - backgroundScope.launch { testFlow.collect { emittedValues.add(it) } } + val emittedValues = mutableListOf() + backgroundScope.launch { testFlow.collect { emittedValues.add(it) } } - val result = fhirSynchronizer.synchronize() + val result = fhirSynchronizer.synchronize() assertThat(emittedValues) - .containsExactly( - SyncJobStatus.Started, - SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10), - SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1), - SyncJobStatus.Finished, - ) + .containsExactly( + SyncJobStatus.Started, + SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10), + SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1), + SyncJobStatus.Finished, + ) - assertThat(SyncJobStatus.Finished::class.java).isEqualTo(result::class.java) - } + assertThat(SyncJobStatus.Finished::class.java).isEqualTo(result::class.java) + } - @OptIn(ExperimentalCoroutinesApi::class) @Test fun `synchronize should return Failed on failed download`() = runTest(UnconfinedTestDispatcher()) { val error = ResourceSyncException(ResourceType.Patient, Exception("Download error")) - `when`(downloader.download()).thenReturn(flowOf(DownloadState.Failure(error))) - `when`(uploader.upload(any())) - .thenReturn( - flowOf( - UploadState.Success( - LocalChangeToken(listOf()), - Patient(), - 1, - 1, - ), - ), - ) + `when`(downloader.download()).thenReturn(flowOf(DownloadState.Failure(error))) + `when`(uploader.upload(any())) + .thenReturn( + UploadSyncResult.Success( + LocalChangeToken(listOf()), + listOf(), + ), + ) + val testFlow = MutableSharedFlow() fhirSynchronizer.subscribe(testFlow) @@ -146,20 +137,18 @@ class FhirSynchronizerTest { SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1), SyncJobStatus.Failed(exceptions = listOf(error)), ) - assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java) - assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions) - } - @OptIn(ExperimentalCoroutinesApi::class) + assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java) + assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions) + } + @Test fun `synchronize should return Failed on failed upload`() = runTest(UnconfinedTestDispatcher()) { `when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 10, 10))) val error = ResourceSyncException(ResourceType.Patient, Exception("Upload error")) `when`(uploader.upload(any())) - .thenReturn( - flowOf(UploadState.Failure(error)), - ) + .thenReturn(UploadSyncResult.Failure(error, LocalChangeToken(listOf()))) val testFlow = MutableSharedFlow() fhirSynchronizer.subscribe(testFlow) From cb3b8029a1b87417ed0e1c60b030f75636050ff7 Mon Sep 17 00:00:00 2001 From: omarismail Date: Tue, 26 Sep 2023 09:28:30 +0100 Subject: [PATCH 05/11] spotless --- .../android/fhir/sync/FhirSynchronizerTest.kt | 63 +++++++++---------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt index fea3a07de4..85a55730a8 100644 --- a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt +++ b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt @@ -82,46 +82,45 @@ class FhirSynchronizerTest { fun `synchronize should return Success on successful download and upload`() = runTest(UnconfinedTestDispatcher()) { `when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 10, 10))) - `when`(uploader.upload(any())) - .thenReturn( - UploadSyncResult.Success( - LocalChangeToken(listOf()), - listOf(), - ), - ) + `when`(uploader.upload(any())) + .thenReturn( + UploadSyncResult.Success( + LocalChangeToken(listOf()), + listOf(), + ), + ) - val testFlow = MutableSharedFlow() - fhirSynchronizer.subscribe(testFlow) + val testFlow = MutableSharedFlow() + fhirSynchronizer.subscribe(testFlow) - val emittedValues = mutableListOf() - backgroundScope.launch { testFlow.collect { emittedValues.add(it) } } + val emittedValues = mutableListOf() + backgroundScope.launch { testFlow.collect { emittedValues.add(it) } } - val result = fhirSynchronizer.synchronize() + val result = fhirSynchronizer.synchronize() assertThat(emittedValues) - .containsExactly( - SyncJobStatus.Started, - SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10), - SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1), - SyncJobStatus.Finished, - ) + .containsExactly( + SyncJobStatus.Started, + SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10), + SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1), + SyncJobStatus.Finished, + ) - assertThat(SyncJobStatus.Finished::class.java).isEqualTo(result::class.java) - } + assertThat(SyncJobStatus.Finished::class.java).isEqualTo(result::class.java) + } @Test fun `synchronize should return Failed on failed download`() = runTest(UnconfinedTestDispatcher()) { val error = ResourceSyncException(ResourceType.Patient, Exception("Download error")) - `when`(downloader.download()).thenReturn(flowOf(DownloadState.Failure(error))) - `when`(uploader.upload(any())) - .thenReturn( - UploadSyncResult.Success( - LocalChangeToken(listOf()), - listOf(), - ), - ) - + `when`(downloader.download()).thenReturn(flowOf(DownloadState.Failure(error))) + `when`(uploader.upload(any())) + .thenReturn( + UploadSyncResult.Success( + LocalChangeToken(listOf()), + listOf(), + ), + ) val testFlow = MutableSharedFlow() fhirSynchronizer.subscribe(testFlow) @@ -138,9 +137,9 @@ class FhirSynchronizerTest { SyncJobStatus.Failed(exceptions = listOf(error)), ) - assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java) - assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions) - } + assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java) + assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions) + } @Test fun `synchronize should return Failed on failed upload`() = From 30043df661593d1d58cef5037641b132e6e78bd7 Mon Sep 17 00:00:00 2001 From: omarismail Date: Tue, 26 Sep 2023 10:05:12 +0100 Subject: [PATCH 06/11] fix tests --- .../android/fhir/sync/upload/ResourceConsolidator.kt | 6 +++++- .../java/com/google/android/fhir/testing/Utilities.kt | 8 +++++++- .../com/google/android/fhir/sync/FhirSynchronizerTest.kt | 4 ++++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/engine/src/main/java/com/google/android/fhir/sync/upload/ResourceConsolidator.kt b/engine/src/main/java/com/google/android/fhir/sync/upload/ResourceConsolidator.kt index eac5d48105..f5f3c8c7f7 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/upload/ResourceConsolidator.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/upload/ResourceConsolidator.kt @@ -52,7 +52,11 @@ internal class DefaultResourceConsolidator(private val database: Database) : Res } } } - is UploadSyncResult.Failure -> {} + is UploadSyncResult.Failure -> { + /* For now, do nothing (we do not delete the local changes from the database as they were + not uploaded successfully. In the future, add consolidation required if upload fails. + */ + } } private suspend fun updateVersionIdAndLastUpdated(bundle: Bundle) { diff --git a/engine/src/main/java/com/google/android/fhir/testing/Utilities.kt b/engine/src/main/java/com/google/android/fhir/testing/Utilities.kt index b4946f6bdc..2b7c538c43 100644 --- a/engine/src/main/java/com/google/android/fhir/testing/Utilities.kt +++ b/engine/src/main/java/com/google/android/fhir/testing/Utilities.kt @@ -153,7 +153,13 @@ object TestFhirEngineImpl : FhirEngine { override suspend fun syncUpload( localChangesFetchMode: LocalChangesFetchMode, upload: suspend (List) -> UploadSyncResult, - ): Flow = flow { upload(getLocalChanges(ResourceType.Patient, "123")) } + ): Flow = flow { + emit(FetchProgress(1, 1)) + when (upload(getLocalChanges(ResourceType.Patient, "123"))) { + is UploadSyncResult.Success -> emit(FetchProgress(0, 1)) + is UploadSyncResult.Failure -> emit(FetchProgress(1, 1)) + } + } override suspend fun syncDownload( conflictResolver: ConflictResolver, diff --git a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt index 85a55730a8..7b0f020c6f 100644 --- a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt +++ b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt @@ -102,6 +102,7 @@ class FhirSynchronizerTest { .containsExactly( SyncJobStatus.Started, SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10), + SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 0), SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1), SyncJobStatus.Finished, ) @@ -133,6 +134,7 @@ class FhirSynchronizerTest { assertThat(emittedValues) .containsExactly( SyncJobStatus.Started, + SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 0), SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1), SyncJobStatus.Failed(exceptions = listOf(error)), ) @@ -161,6 +163,8 @@ class FhirSynchronizerTest { .containsExactly( SyncJobStatus.Started, SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10), + SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 0), + SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 0), SyncJobStatus.Failed(exceptions = listOf(error)), ) assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java) From c9a2be5f784de8e1e9aaed179a376b07bd747817 Mon Sep 17 00:00:00 2001 From: omarismail Date: Tue, 26 Sep 2023 11:23:35 +0100 Subject: [PATCH 07/11] take care of continue sync --- .../android/fhir/impl/FhirEngineImpl.kt | 18 +++++++++++++--- .../android/fhir/impl/FhirEngineImplTest.kt | 21 ++++++++++++++++++- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt b/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt index 069a7d7403..fa1021d0c1 100644 --- a/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt +++ b/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt @@ -133,10 +133,22 @@ internal class FhirEngineImpl(private val database: Database, private val contex val localChangeFetcher = LocalChangeFetcherFactory.byMode(localChangesFetchMode, database) emit(FetchProgress(localChangeFetcher.total, localChangeFetcher.total)) - while (localChangeFetcher.hasNext()) { - val uploadSyncResult = upload(localChangeFetcher.next()) + + var continueSync = true + while (continueSync && localChangeFetcher.hasNext()) { + val localChanges = localChangeFetcher.next() + val uploadSyncResult = upload(localChanges) + resourceConsolidator.consolidate(uploadSyncResult) - emit(localChangeFetcher.getProgress()) + + continueSync = + when (uploadSyncResult) { + is UploadSyncResult.Success -> { + emit(localChangeFetcher.getProgress()) + true + } + is UploadSyncResult.Failure -> false + } } } } diff --git a/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt b/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt index dc415e7ae8..0524780b8a 100644 --- a/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt +++ b/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt @@ -29,6 +29,7 @@ import com.google.android.fhir.search.LOCAL_LAST_UPDATED_PARAM import com.google.android.fhir.search.search import com.google.android.fhir.sync.AcceptLocalConflictResolver import com.google.android.fhir.sync.AcceptRemoteConflictResolver +import com.google.android.fhir.sync.ResourceSyncException import com.google.android.fhir.sync.upload.FetchProgress import com.google.android.fhir.sync.upload.LocalChangesFetchMode import com.google.android.fhir.sync.upload.UploadSyncResult @@ -40,6 +41,7 @@ import java.util.Date import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest import org.hl7.fhir.exceptions.FHIRException import org.hl7.fhir.r4.model.Address import org.hl7.fhir.r4.model.CanonicalType @@ -313,7 +315,7 @@ class FhirEngineImplTest { } @Test - fun syncUpload_uploadLocalChange() = runBlocking { + fun syncUpload_uploadLocalChange_success() = runTest { val localChanges = mutableListOf() val emittedProgress = mutableListOf() @@ -340,6 +342,23 @@ class FhirEngineImplTest { assertThat(emittedProgress.last()).isEqualTo(FetchProgress(0, 1)) } + @Test + fun syncUpload_uploadLocalChange_failure() = runBlocking { + val emittedProgress = mutableListOf() + + fhirEngine + .syncUpload(LocalChangesFetchMode.AllChanges) { + UploadSyncResult.Failure( + ResourceSyncException(ResourceType.Patient, FHIRException("Did not work")), + LocalChangeToken(it.flatMap { it.token.ids }), + ) + } + .collect { emittedProgress.add(it) } + + assertThat(emittedProgress).hasSize(1) + assertThat(emittedProgress.first()).isEqualTo(FetchProgress(1, 1)) + } + @Test fun syncDownload_downloadResources() = runBlocking { fhirEngine.syncDownload(AcceptLocalConflictResolver) { flowOf((listOf((TEST_PATIENT_2)))) } From ee04361697bd43236ca10384468ad3f22388dcf1 Mon Sep 17 00:00:00 2001 From: omarismail Date: Wed, 4 Oct 2023 14:33:44 +0100 Subject: [PATCH 08/11] simplify simplify simplify --- .../com/google/android/fhir/FhirEngine.kt | 4 +-- .../android/fhir/impl/FhirEngineImpl.kt | 34 ++++++++++++------- .../android/fhir/sync/FhirSynchronizer.kt | 28 ++++++--------- .../fhir/sync/upload/LocalChangeFetcher.kt | 14 ++++---- .../android/fhir/sync/upload/Uploader.kt | 22 ++++++------ .../google/android/fhir/testing/Utilities.kt | 12 +++---- .../android/fhir/impl/FhirEngineImplTest.kt | 19 ++++++----- .../android/fhir/sync/FhirSynchronizerTest.kt | 1 - .../AllChangesLocalChangeFetcherTest.kt | 6 ++-- 9 files changed, 72 insertions(+), 68 deletions(-) diff --git a/engine/src/main/java/com/google/android/fhir/FhirEngine.kt b/engine/src/main/java/com/google/android/fhir/FhirEngine.kt index 8a7315ebd2..4d476d165e 100644 --- a/engine/src/main/java/com/google/android/fhir/FhirEngine.kt +++ b/engine/src/main/java/com/google/android/fhir/FhirEngine.kt @@ -19,8 +19,8 @@ package com.google.android.fhir import com.google.android.fhir.db.ResourceNotFoundException import com.google.android.fhir.search.Search import com.google.android.fhir.sync.ConflictResolver -import com.google.android.fhir.sync.upload.FetchProgress import com.google.android.fhir.sync.upload.LocalChangesFetchMode +import com.google.android.fhir.sync.upload.SyncUploadProgress import com.google.android.fhir.sync.upload.UploadSyncResult import java.time.OffsetDateTime import kotlinx.coroutines.flow.Flow @@ -58,7 +58,7 @@ interface FhirEngine { suspend fun syncUpload( localChangesFetchMode: LocalChangesFetchMode, upload: (suspend (List) -> UploadSyncResult), - ): Flow + ): Flow /** * Synchronizes the [download] result in the database. The database will be updated to reflect the diff --git a/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt b/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt index fa1021d0c1..c1519cd4a8 100644 --- a/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt +++ b/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt @@ -29,9 +29,9 @@ import com.google.android.fhir.search.execute import com.google.android.fhir.sync.ConflictResolver import com.google.android.fhir.sync.Resolved import com.google.android.fhir.sync.upload.DefaultResourceConsolidator -import com.google.android.fhir.sync.upload.FetchProgress import com.google.android.fhir.sync.upload.LocalChangeFetcherFactory import com.google.android.fhir.sync.upload.LocalChangesFetchMode +import com.google.android.fhir.sync.upload.SyncUploadProgress import com.google.android.fhir.sync.upload.UploadSyncResult import java.time.OffsetDateTime import kotlinx.coroutines.flow.Flow @@ -128,27 +128,37 @@ internal class FhirEngineImpl(private val database: Database, private val contex override suspend fun syncUpload( localChangesFetchMode: LocalChangesFetchMode, upload: (suspend (List) -> UploadSyncResult), - ): Flow = flow { + ): Flow = flow { val resourceConsolidator = DefaultResourceConsolidator(database) val localChangeFetcher = LocalChangeFetcherFactory.byMode(localChangesFetchMode, database) - emit(FetchProgress(localChangeFetcher.total, localChangeFetcher.total)) + emit( + SyncUploadProgress( + remaining = localChangeFetcher.total, + initialTotal = localChangeFetcher.total, + ), + ) - var continueSync = true - while (continueSync && localChangeFetcher.hasNext()) { + while (localChangeFetcher.hasNext()) { val localChanges = localChangeFetcher.next() val uploadSyncResult = upload(localChanges) resourceConsolidator.consolidate(uploadSyncResult) - - continueSync = - when (uploadSyncResult) { - is UploadSyncResult.Success -> { - emit(localChangeFetcher.getProgress()) - true + when (uploadSyncResult) { + is UploadSyncResult.Success -> emit(localChangeFetcher.getProgress()) + is UploadSyncResult.Failure -> { + with(localChangeFetcher.getProgress()) { + emit( + SyncUploadProgress( + remaining = remaining, + initialTotal = initialTotal, + uploadError = uploadSyncResult.syncError, + ), + ) } - is UploadSyncResult.Failure -> false + break } + } } } } diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index 4925fad973..8a84b2332e 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -22,7 +22,6 @@ import com.google.android.fhir.FhirEngine import com.google.android.fhir.sync.download.DownloadState import com.google.android.fhir.sync.download.Downloader import com.google.android.fhir.sync.upload.LocalChangesFetchMode -import com.google.android.fhir.sync.upload.UploadSyncResult import com.google.android.fhir.sync.upload.Uploader import java.time.OffsetDateTime import kotlinx.coroutines.flow.MutableSharedFlow @@ -121,23 +120,16 @@ internal class FhirSynchronizer( val exceptions = mutableListOf() val localChangesFetchMode = LocalChangesFetchMode.AllChanges fhirEngine - .syncUpload(localChangesFetchMode) { - when (val result = uploader.upload(it)) { - is UploadSyncResult.Success -> result - is UploadSyncResult.Failure -> { - exceptions.add(result.syncError) - result - } - } - } - .collect { - setSyncState( - SyncJobStatus.InProgress( - SyncOperation.UPLOAD, - it.initialTotal, - it.initialTotal - it.remaining, - ), - ) + .syncUpload(localChangesFetchMode) { uploader.upload(it) } + .collect { progress -> + progress.uploadError?.let { exceptions.add(it) } + ?: setSyncState( + SyncJobStatus.InProgress( + SyncOperation.UPLOAD, + progress.initialTotal, + progress.initialTotal - progress.remaining, + ), + ) } return if (exceptions.isEmpty()) { diff --git a/engine/src/main/java/com/google/android/fhir/sync/upload/LocalChangeFetcher.kt b/engine/src/main/java/com/google/android/fhir/sync/upload/LocalChangeFetcher.kt index 34f8eee4d4..8b337963e7 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/upload/LocalChangeFetcher.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/upload/LocalChangeFetcher.kt @@ -18,6 +18,7 @@ package com.google.android.fhir.sync.upload import com.google.android.fhir.LocalChange import com.google.android.fhir.db.Database +import com.google.android.fhir.sync.ResourceSyncException import kotlin.properties.Delegates /** @@ -40,15 +41,16 @@ internal interface LocalChangeFetcher { suspend fun next(): List /** - * Returns [FetchProgress], which contains the remaining changes left to upload and the initial - * total to upload. + * Returns [SyncUploadProgress], which contains the remaining changes left to upload and the + * initial total to upload. */ - suspend fun getProgress(): FetchProgress + suspend fun getProgress(): SyncUploadProgress } -data class FetchProgress( +data class SyncUploadProgress( val remaining: Int, val initialTotal: Int, + val uploadError: ResourceSyncException? = null, ) internal class AllChangesLocalChangeFetcher( @@ -65,8 +67,8 @@ internal class AllChangesLocalChangeFetcher( override suspend fun next(): List = database.getAllLocalChanges() - override suspend fun getProgress(): FetchProgress = - FetchProgress(database.getLocalChangesCount(), total) + override suspend fun getProgress(): SyncUploadProgress = + SyncUploadProgress(database.getLocalChangesCount(), total) } /** Represents the mode in which local changes should be fetched. */ diff --git a/engine/src/main/java/com/google/android/fhir/sync/upload/Uploader.kt b/engine/src/main/java/com/google/android/fhir/sync/upload/Uploader.kt index 51efc538ef..b5a8ee66d8 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/upload/Uploader.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/upload/Uploader.kt @@ -50,30 +50,30 @@ internal class Uploader(private val dataSource: DataSource) { val successfulResponses = mutableListOf() for (uploadRequest in requests) { - when (val result = attemptUpload(uploadRequest)) { - is UploadAttempt.Success -> successfulResponses.add(result.resource) - is UploadAttempt.Failure -> return UploadSyncResult.Failure(result.exception, token) + when (val result = handleUploadRequest(uploadRequest)) { + is UploadRequestResult.Success -> successfulResponses.add(result.resource) + is UploadRequestResult.Failure -> return UploadSyncResult.Failure(result.exception, token) } } return UploadSyncResult.Success(token, successfulResponses) } - private suspend fun attemptUpload(uploadRequest: UploadRequest): UploadAttempt { + private suspend fun handleUploadRequest(uploadRequest: UploadRequest): UploadRequestResult { return try { val response = dataSource.upload(uploadRequest) when { response is Bundle && response.type == Bundle.BundleType.TRANSACTIONRESPONSE -> - UploadAttempt.Success(response) + UploadRequestResult.Success(response) response is OperationOutcome && response.issue.isNotEmpty() -> - UploadAttempt.Failure( + UploadRequestResult.Failure( ResourceSyncException( uploadRequest.resource.resourceType, FHIRException(response.issueFirstRep.diagnostics), ), ) else -> - UploadAttempt.Failure( + UploadRequestResult.Failure( ResourceSyncException( uploadRequest.resource.resourceType, FHIRException("Unknown response for ${uploadRequest.resource.resourceType}"), @@ -82,14 +82,14 @@ internal class Uploader(private val dataSource: DataSource) { } } catch (e: Exception) { Timber.e(e) - UploadAttempt.Failure(ResourceSyncException(ResourceType.Bundle, e)) + UploadRequestResult.Failure(ResourceSyncException(ResourceType.Bundle, e)) } } - private sealed class UploadAttempt { - data class Success(val resource: Resource) : UploadAttempt() + private sealed class UploadRequestResult { + data class Success(val resource: Resource) : UploadRequestResult() - data class Failure(val exception: ResourceSyncException) : UploadAttempt() + data class Failure(val exception: ResourceSyncException) : UploadRequestResult() } } diff --git a/engine/src/main/java/com/google/android/fhir/testing/Utilities.kt b/engine/src/main/java/com/google/android/fhir/testing/Utilities.kt index 2b7c538c43..d24acda000 100644 --- a/engine/src/main/java/com/google/android/fhir/testing/Utilities.kt +++ b/engine/src/main/java/com/google/android/fhir/testing/Utilities.kt @@ -31,8 +31,8 @@ import com.google.android.fhir.sync.DownloadWorkManager import com.google.android.fhir.sync.download.BundleDownloadRequest import com.google.android.fhir.sync.download.DownloadRequest import com.google.android.fhir.sync.download.UrlDownloadRequest -import com.google.android.fhir.sync.upload.FetchProgress import com.google.android.fhir.sync.upload.LocalChangesFetchMode +import com.google.android.fhir.sync.upload.SyncUploadProgress import com.google.android.fhir.sync.upload.UploadSyncResult import com.google.android.fhir.sync.upload.request.BundleUploadRequest import com.google.android.fhir.sync.upload.request.UploadRequest @@ -153,11 +153,11 @@ object TestFhirEngineImpl : FhirEngine { override suspend fun syncUpload( localChangesFetchMode: LocalChangesFetchMode, upload: suspend (List) -> UploadSyncResult, - ): Flow = flow { - emit(FetchProgress(1, 1)) - when (upload(getLocalChanges(ResourceType.Patient, "123"))) { - is UploadSyncResult.Success -> emit(FetchProgress(0, 1)) - is UploadSyncResult.Failure -> emit(FetchProgress(1, 1)) + ): Flow = flow { + emit(SyncUploadProgress(1, 1)) + when (val result = upload(getLocalChanges(ResourceType.Patient, "123"))) { + is UploadSyncResult.Success -> emit(SyncUploadProgress(0, 1)) + is UploadSyncResult.Failure -> emit(SyncUploadProgress(1, 1, result.syncError)) } } diff --git a/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt b/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt index 0524780b8a..2229cb0f9d 100644 --- a/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt +++ b/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt @@ -30,8 +30,8 @@ import com.google.android.fhir.search.search import com.google.android.fhir.sync.AcceptLocalConflictResolver import com.google.android.fhir.sync.AcceptRemoteConflictResolver import com.google.android.fhir.sync.ResourceSyncException -import com.google.android.fhir.sync.upload.FetchProgress import com.google.android.fhir.sync.upload.LocalChangesFetchMode +import com.google.android.fhir.sync.upload.SyncUploadProgress import com.google.android.fhir.sync.upload.UploadSyncResult import com.google.android.fhir.testing.assertResourceEquals import com.google.android.fhir.testing.assertResourceNotEquals @@ -317,7 +317,7 @@ class FhirEngineImplTest { @Test fun syncUpload_uploadLocalChange_success() = runTest { val localChanges = mutableListOf() - val emittedProgress = mutableListOf() + val emittedProgress = mutableListOf() fhirEngine .syncUpload(LocalChangesFetchMode.AllChanges) { @@ -338,25 +338,26 @@ class FhirEngineImplTest { } assertThat(emittedProgress).hasSize(2) - assertThat(emittedProgress.first()).isEqualTo(FetchProgress(1, 1)) - assertThat(emittedProgress.last()).isEqualTo(FetchProgress(0, 1)) + assertThat(emittedProgress.first()).isEqualTo(SyncUploadProgress(1, 1)) + assertThat(emittedProgress.last()).isEqualTo(SyncUploadProgress(0, 1)) } @Test fun syncUpload_uploadLocalChange_failure() = runBlocking { - val emittedProgress = mutableListOf() - + val emittedProgress = mutableListOf() + val uploadError = ResourceSyncException(ResourceType.Patient, FHIRException("Did not work")) fhirEngine .syncUpload(LocalChangesFetchMode.AllChanges) { UploadSyncResult.Failure( - ResourceSyncException(ResourceType.Patient, FHIRException("Did not work")), + uploadError, LocalChangeToken(it.flatMap { it.token.ids }), ) } .collect { emittedProgress.add(it) } - assertThat(emittedProgress).hasSize(1) - assertThat(emittedProgress.first()).isEqualTo(FetchProgress(1, 1)) + assertThat(emittedProgress).hasSize(2) + assertThat(emittedProgress.first()).isEqualTo(SyncUploadProgress(1, 1)) + assertThat(emittedProgress.last()).isEqualTo(SyncUploadProgress(1, 1, uploadError)) } @Test diff --git a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt index 5f701477f9..ebfb432d46 100644 --- a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt +++ b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt @@ -141,7 +141,6 @@ class FhirSynchronizerTest { SyncJobStatus.Started, SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10), SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 0), - SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 0), SyncJobStatus.Failed(exceptions = listOf(error)), ) assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java) diff --git a/engine/src/test/java/com/google/android/fhir/sync/upload/AllChangesLocalChangeFetcherTest.kt b/engine/src/test/java/com/google/android/fhir/sync/upload/AllChangesLocalChangeFetcherTest.kt index b14637f446..89a334cf20 100644 --- a/engine/src/test/java/com/google/android/fhir/sync/upload/AllChangesLocalChangeFetcherTest.kt +++ b/engine/src/test/java/com/google/android/fhir/sync/upload/AllChangesLocalChangeFetcherTest.kt @@ -64,18 +64,18 @@ class AllChangesLocalChangeFetcherTest { @Test fun `getProgress when all local changes are removed`() = runTest { database.deleteUpdates(listOf(TEST_PATIENT_1, TEST_PATIENT_2)) - assertThat(fetcher.getProgress()).isEqualTo(FetchProgress(0, 2)) + assertThat(fetcher.getProgress()).isEqualTo(SyncUploadProgress(0, 2)) } @Test fun `getProgress when half the local changes are removed`() = runTest { database.deleteUpdates(listOf(TEST_PATIENT_1)) - assertThat(fetcher.getProgress()).isEqualTo(FetchProgress(1, 2)) + assertThat(fetcher.getProgress()).isEqualTo(SyncUploadProgress(1, 2)) } @Test fun `getProgress when none of the local changes are removed`() = runTest { - assertThat(fetcher.getProgress()).isEqualTo(FetchProgress(2, 2)) + assertThat(fetcher.getProgress()).isEqualTo(SyncUploadProgress(2, 2)) } companion object { From bb53d72cd72d6cde7ac7a326fb4701aeac172ba0 Mon Sep 17 00:00:00 2001 From: omarismail Date: Wed, 4 Oct 2023 15:18:39 +0100 Subject: [PATCH 09/11] update kdoc --- .../main/java/com/google/android/fhir/FhirEngine.kt | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/engine/src/main/java/com/google/android/fhir/FhirEngine.kt b/engine/src/main/java/com/google/android/fhir/FhirEngine.kt index 4d476d165e..cc0a971721 100644 --- a/engine/src/main/java/com/google/android/fhir/FhirEngine.kt +++ b/engine/src/main/java/com/google/android/fhir/FhirEngine.kt @@ -51,9 +51,15 @@ interface FhirEngine { suspend fun search(search: Search): List> /** - * Synchronizes the [upload] result in the database. [upload] operation may result in multiple - * calls to the server to upload the data. Result of each call will be emitted by [upload] and the - * api caller should [Flow.collect] it. + * Synchronizes the upload results in the database. + * + * The [upload] function may initiate multiple server calls. Each call's result can then be used + * to emit [UploadSyncResult]. The caller should collect these results using [Flow.collect]. + * + * @param localChangesFetchMode Specifies the mode to fetch local changes. + * @param upload A suspend function that takes a list of [LocalChange] and returns an + * [UploadSyncResult]. + * @return A [Flow] that emits the progress of the synchronization process. */ suspend fun syncUpload( localChangesFetchMode: LocalChangesFetchMode, From fcf55ac7a0d448c565734fae42e35243564e2275 Mon Sep 17 00:00:00 2001 From: omarismail Date: Thu, 5 Oct 2023 13:27:10 +0100 Subject: [PATCH 10/11] simplify further --- .../main/java/com/google/android/fhir/sync/FhirSynchronizer.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index 8a84b2332e..d196183d0f 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -120,7 +120,7 @@ internal class FhirSynchronizer( val exceptions = mutableListOf() val localChangesFetchMode = LocalChangesFetchMode.AllChanges fhirEngine - .syncUpload(localChangesFetchMode) { uploader.upload(it) } + .syncUpload(localChangesFetchMode, uploader::upload) .collect { progress -> progress.uploadError?.let { exceptions.add(it) } ?: setSyncState( From 225b0c5b83656f468b8b4cbc55eb20a24a2820ba Mon Sep 17 00:00:00 2001 From: omarismail Date: Thu, 5 Oct 2023 13:29:13 +0100 Subject: [PATCH 11/11] spotless --- .../android/fhir/sync/FhirSynchronizer.kt | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index d196183d0f..acac045a14 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -119,18 +119,16 @@ internal class FhirSynchronizer( private suspend fun upload(): SyncResult { val exceptions = mutableListOf() val localChangesFetchMode = LocalChangesFetchMode.AllChanges - fhirEngine - .syncUpload(localChangesFetchMode, uploader::upload) - .collect { progress -> - progress.uploadError?.let { exceptions.add(it) } - ?: setSyncState( - SyncJobStatus.InProgress( - SyncOperation.UPLOAD, - progress.initialTotal, - progress.initialTotal - progress.remaining, - ), - ) - } + fhirEngine.syncUpload(localChangesFetchMode, uploader::upload).collect { progress -> + progress.uploadError?.let { exceptions.add(it) } + ?: setSyncState( + SyncJobStatus.InProgress( + SyncOperation.UPLOAD, + progress.initialTotal, + progress.initialTotal - progress.remaining, + ), + ) + } return if (exceptions.isEmpty()) { SyncResult.Success()