From 5e52332b38b4a9a18c03ae14033148d6ec196a6d Mon Sep 17 00:00:00 2001 From: Omar Ismail <44980219+omarismail94@users.noreply.github.com> Date: Tue, 26 Sep 2023 17:55:34 +0100 Subject: [PATCH] Clean up SyncJobStatus and add FhirSynchronizer test (#2184) * Clean up SyncJobStatus and add FhirSynchronizer test * remove changes in MAVM * refactor more --- .../android/fhir/sync/FhirSyncWorker.kt | 35 ++-- .../android/fhir/sync/FhirSynchronizer.kt | 26 +-- .../google/android/fhir/sync/SyncJobStatus.kt | 7 +- .../android/fhir/sync/FhirSynchronizerTest.kt | 158 ++++++++++++++++++ 4 files changed, 180 insertions(+), 46 deletions(-) create mode 100644 engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt index e189daf48a..fe5a78206b 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt @@ -33,7 +33,6 @@ import java.time.OffsetDateTime import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.cancel -import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.launch import timber.log.Timber @@ -66,11 +65,18 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter ), ) - val flow = MutableSharedFlow() + val synchronizer = + FhirSynchronizer( + applicationContext, + getFhirEngine(), + Uploader(dataSource), + DownloaderImpl(dataSource, getDownloadWorkManager()), + getConflictResolver(), + ) val job = CoroutineScope(Dispatchers.IO).launch { - flow.collect { + synchronizer.syncState.collect { // now send Progress to work manager so caller app can listen setProgress(buildWorkData(it)) @@ -80,17 +86,7 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter } } - Timber.v("Subscribed to flow for progress") - val result = - FhirSynchronizer( - applicationContext, - getFhirEngine(), - Uploader(dataSource), - DownloaderImpl(dataSource, getDownloadWorkManager()), - getConflictResolver(), - ) - .apply { subscribe(flow) } - .synchronize() + val result = synchronizer.synchronize() val output = buildWorkData(result) // await/join is needed to collect states completely @@ -105,15 +101,10 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter * [RetryConfiguration.maxRetries] set by user. */ val retries = inputData.getInt(MAX_RETRIES_ALLOWED, 0) - return when { - result is SyncJobStatus.Finished -> { - Result.success(output) - } - retries > runAttemptCount -> { - Result.retry() - } + return when (result) { + is SyncJobStatus.Finished -> Result.success(output) else -> { - Result.failure(output) + if (retries > runAttemptCount) Result.retry() else Result.failure(output) } } } diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index f2f6de79b0..c878634010 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -26,6 +26,7 @@ import com.google.android.fhir.sync.upload.UploadState import com.google.android.fhir.sync.upload.Uploader import java.time.OffsetDateTime import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.flow import org.hl7.fhir.r4.model.ResourceType @@ -52,24 +53,13 @@ internal class FhirSynchronizer( private val downloader: Downloader, private val conflictResolver: ConflictResolver, ) { - private var syncState: MutableSharedFlow? = null - private val datastoreUtil = DatastoreUtil(context) - - private fun isSubscribed(): Boolean { - return syncState != null - } - fun subscribe(flow: MutableSharedFlow) { - if (isSubscribed()) { - throw IllegalStateException("Already subscribed to a flow") - } + private val _syncState = MutableSharedFlow() + val syncState: SharedFlow = _syncState - this.syncState = flow - } + private val datastoreUtil = DatastoreUtil(context) - private suspend fun setSyncState(state: SyncJobStatus) { - syncState?.emit(state) - } + private suspend fun setSyncState(state: SyncJobStatus) = _syncState.emit(state) private suspend fun setSyncState(result: SyncResult): SyncJobStatus { // todo: emit this properly instead of using datastore? @@ -77,7 +67,7 @@ internal class FhirSynchronizer( val state = when (result) { - is SyncResult.Success -> SyncJobStatus.Finished() + is SyncResult.Success -> SyncJobStatus.Finished is SyncResult.Error -> SyncJobStatus.Failed(result.exceptions) } @@ -86,7 +76,7 @@ internal class FhirSynchronizer( } suspend fun synchronize(): SyncJobStatus { - setSyncState(SyncJobStatus.Started()) + setSyncState(SyncJobStatus.Started) return listOf(download(), upload()) .filterIsInstance() @@ -123,7 +113,6 @@ internal class FhirSynchronizer( return if (exceptions.isEmpty()) { SyncResult.Success() } else { - setSyncState(SyncJobStatus.Glitch(exceptions)) SyncResult.Error(exceptions) } } @@ -151,7 +140,6 @@ internal class FhirSynchronizer( return if (exceptions.isEmpty()) { SyncResult.Success() } else { - setSyncState(SyncJobStatus.Glitch(exceptions)) SyncResult.Error(exceptions) } } diff --git a/engine/src/main/java/com/google/android/fhir/sync/SyncJobStatus.kt b/engine/src/main/java/com/google/android/fhir/sync/SyncJobStatus.kt index f567483a68..8c063a28df 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/SyncJobStatus.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/SyncJobStatus.kt @@ -22,7 +22,7 @@ sealed class SyncJobStatus { val timestamp: OffsetDateTime = OffsetDateTime.now() /** Sync job has been started on the client but the syncing is not necessarily in progress. */ - class Started : SyncJobStatus() + object Started : SyncJobStatus() /** Syncing in progress with the server. */ data class InProgress( @@ -31,11 +31,8 @@ sealed class SyncJobStatus { val completed: Int = 0, ) : SyncJobStatus() - /** Glitched but sync job is being retried. */ - data class Glitch(val exceptions: List) : SyncJobStatus() - /** Sync job finished successfully. */ - class Finished : SyncJobStatus() + object Finished : SyncJobStatus() /** Sync job failed. */ data class Failed(val exceptions: List) : SyncJobStatus() diff --git a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt new file mode 100644 index 0000000000..a280cec4a3 --- /dev/null +++ b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt @@ -0,0 +1,158 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.android.fhir.sync + +import androidx.test.core.app.ApplicationProvider +import com.google.android.fhir.LocalChangeToken +import com.google.android.fhir.sync.download.DownloadState +import com.google.android.fhir.sync.download.Downloader +import com.google.android.fhir.sync.upload.UploadState +import com.google.android.fhir.sync.upload.Uploader +import com.google.android.fhir.testing.TestFhirEngineImpl +import com.google.common.truth.Truth.assertThat +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.UnconfinedTestDispatcher +import kotlinx.coroutines.test.runTest +import org.hl7.fhir.r4.model.Patient +import org.hl7.fhir.r4.model.ResourceType +import org.junit.Before +import org.junit.Test +import org.junit.runner.RunWith +import org.mockito.Mock +import org.mockito.Mockito.`when` +import org.mockito.MockitoAnnotations +import org.mockito.kotlin.any +import org.robolectric.RobolectricTestRunner + +@RunWith(RobolectricTestRunner::class) +class FhirSynchronizerTest { + + @Mock private lateinit var uploader: Uploader + + @Mock private lateinit var downloader: Downloader + + @Mock private lateinit var conflictResolver: ConflictResolver + + private lateinit var fhirSynchronizer: FhirSynchronizer + + @Before + fun setUp() { + MockitoAnnotations.openMocks(this) + fhirSynchronizer = + FhirSynchronizer( + ApplicationProvider.getApplicationContext(), + TestFhirEngineImpl, + uploader, + downloader, + conflictResolver, + ) + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `synchronize should return Success on successful download and upload`() = + runTest(UnconfinedTestDispatcher()) { + `when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 10, 10))) + `when`(uploader.upload(any())) + .thenReturn( + flowOf( + UploadState.Success( + LocalChangeToken(listOf()), + Patient(), + 1, + 1, + ), + ), + ) + + val emittedValues = mutableListOf() + backgroundScope.launch { fhirSynchronizer.syncState.collect { emittedValues.add(it) } } + + val result = fhirSynchronizer.synchronize() + + assertThat(emittedValues) + .containsExactly( + SyncJobStatus.Started, + SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10), + SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1), + SyncJobStatus.Finished, + ) + + assertThat(SyncJobStatus.Finished::class.java).isEqualTo(result::class.java) + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `synchronize should return Failed on failed download`() = + runTest(UnconfinedTestDispatcher()) { + val error = ResourceSyncException(ResourceType.Patient, Exception("Download error")) + `when`(downloader.download()).thenReturn(flowOf(DownloadState.Failure(error))) + `when`(uploader.upload(any())) + .thenReturn( + flowOf( + UploadState.Success( + LocalChangeToken(listOf()), + Patient(), + 1, + 1, + ), + ), + ) + + val emittedValues = mutableListOf() + backgroundScope.launch { fhirSynchronizer.syncState.collect { emittedValues.add(it) } } + + val result = fhirSynchronizer.synchronize() + + assertThat(emittedValues) + .containsExactly( + SyncJobStatus.Started, + SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1), + SyncJobStatus.Failed(exceptions = listOf(error)), + ) + assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java) + assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions) + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `synchronize should return Failed on failed upload`() = + runTest(UnconfinedTestDispatcher()) { + `when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 10, 10))) + val error = ResourceSyncException(ResourceType.Patient, Exception("Upload error")) + `when`(uploader.upload(any())) + .thenReturn( + flowOf(UploadState.Failure(error)), + ) + + val emittedValues = mutableListOf() + backgroundScope.launch { fhirSynchronizer.syncState.collect { emittedValues.add(it) } } + + val result = fhirSynchronizer.synchronize() + + assertThat(emittedValues) + .containsExactly( + SyncJobStatus.Started, + SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10), + SyncJobStatus.Failed(exceptions = listOf(error)), + ) + assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java) + assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions) + } +}