From da6aa54ff2a993f147d87d31a0217cf3757d66ed Mon Sep 17 00:00:00 2001 From: omarismail Date: Mon, 18 Sep 2023 18:09:56 +0100 Subject: [PATCH 1/3] Clean up SyncJobStatus and add FhirSynchronizer test --- .../fhir/demo/MainActivityViewModel.kt | 22 ++- .../android/fhir/sync/FhirSynchronizer.kt | 6 +- .../google/android/fhir/sync/SyncJobStatus.kt | 11 +- .../android/fhir/sync/FhirSynchronizerTest.kt | 181 ++++++++++++++++++ 4 files changed, 201 insertions(+), 19 deletions(-) create mode 100644 engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt diff --git a/demo/src/main/java/com/google/android/fhir/demo/MainActivityViewModel.kt b/demo/src/main/java/com/google/android/fhir/demo/MainActivityViewModel.kt index b09e223330..168398bc53 100644 --- a/demo/src/main/java/com/google/android/fhir/demo/MainActivityViewModel.kt +++ b/demo/src/main/java/com/google/android/fhir/demo/MainActivityViewModel.kt @@ -31,6 +31,7 @@ import com.google.android.fhir.sync.SyncJobStatus import java.time.format.DateTimeFormatter import java.util.concurrent.TimeUnit import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.Job import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharingStarted @@ -48,6 +49,8 @@ class MainActivityViewModel(application: Application) : AndroidViewModel(applica val pollState: Flow get() = _pollState + private var syncJob: Job? = null + init { viewModelScope.launch { Sync.periodicSync( @@ -55,8 +58,8 @@ class MainActivityViewModel(application: Application) : AndroidViewModel(applica periodicSyncConfiguration = PeriodicSyncConfiguration( syncConstraints = Constraints.Builder().build(), - repeat = RepeatInterval(interval = 15, timeUnit = TimeUnit.MINUTES) - ) + repeat = RepeatInterval(interval = 15, timeUnit = TimeUnit.MINUTES), + ), ) .shareIn(this, SharingStarted.Eagerly, 10) .collect { _pollState.emit(it) } @@ -64,18 +67,21 @@ class MainActivityViewModel(application: Application) : AndroidViewModel(applica } fun triggerOneTimeSync() { - viewModelScope.launch { - Sync.oneTimeSync(getApplication()) - .shareIn(this, SharingStarted.Eagerly, 10) - .collect { _pollState.emit(it) } - } + syncJob?.cancel() + + syncJob = + viewModelScope.launch { + Sync.oneTimeSync(getApplication()) + .shareIn(this, SharingStarted.Eagerly, 10) + .collect { _pollState.emit(it) } + } } /** Emits last sync time. */ fun updateLastSyncTimestamp() { val formatter = DateTimeFormatter.ofPattern( - if (DateFormat.is24HourFormat(getApplication())) formatString24 else formatString12 + if (DateFormat.is24HourFormat(getApplication())) formatString24 else formatString12, ) _lastSyncTimestampLiveData.value = Sync.getLastSyncTimestamp(getApplication())?.toLocalDateTime()?.format(formatter) ?: "" diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index f2f6de79b0..de31c208bf 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -77,7 +77,7 @@ internal class FhirSynchronizer( val state = when (result) { - is SyncResult.Success -> SyncJobStatus.Finished() + is SyncResult.Success -> SyncJobStatus.Finished is SyncResult.Error -> SyncJobStatus.Failed(result.exceptions) } @@ -86,7 +86,7 @@ internal class FhirSynchronizer( } suspend fun synchronize(): SyncJobStatus { - setSyncState(SyncJobStatus.Started()) + setSyncState(SyncJobStatus.Started) return listOf(download(), upload()) .filterIsInstance() @@ -123,7 +123,6 @@ internal class FhirSynchronizer( return if (exceptions.isEmpty()) { SyncResult.Success() } else { - setSyncState(SyncJobStatus.Glitch(exceptions)) SyncResult.Error(exceptions) } } @@ -151,7 +150,6 @@ internal class FhirSynchronizer( return if (exceptions.isEmpty()) { SyncResult.Success() } else { - setSyncState(SyncJobStatus.Glitch(exceptions)) SyncResult.Error(exceptions) } } diff --git a/engine/src/main/java/com/google/android/fhir/sync/SyncJobStatus.kt b/engine/src/main/java/com/google/android/fhir/sync/SyncJobStatus.kt index 647dae7ec0..8c063a28df 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/SyncJobStatus.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/SyncJobStatus.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022 Google LLC + * Copyright 2022-2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,20 +22,17 @@ sealed class SyncJobStatus { val timestamp: OffsetDateTime = OffsetDateTime.now() /** Sync job has been started on the client but the syncing is not necessarily in progress. */ - class Started : SyncJobStatus() + object Started : SyncJobStatus() /** Syncing in progress with the server. */ data class InProgress( val syncOperation: SyncOperation, val total: Int = 0, - val completed: Int = 0 + val completed: Int = 0, ) : SyncJobStatus() - /** Glitched but sync job is being retried. */ - data class Glitch(val exceptions: List) : SyncJobStatus() - /** Sync job finished successfully. */ - class Finished : SyncJobStatus() + object Finished : SyncJobStatus() /** Sync job failed. */ data class Failed(val exceptions: List) : SyncJobStatus() diff --git a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt new file mode 100644 index 0000000000..af2cf87f11 --- /dev/null +++ b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt @@ -0,0 +1,181 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.android.fhir.sync + +import androidx.test.core.app.ApplicationProvider +import com.google.android.fhir.LocalChangeToken +import com.google.android.fhir.sync.download.DownloadState +import com.google.android.fhir.sync.download.Downloader +import com.google.android.fhir.sync.upload.UploadState +import com.google.android.fhir.sync.upload.Uploader +import com.google.android.fhir.testing.TestFhirEngineImpl +import com.google.common.truth.Truth.assertThat +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.UnconfinedTestDispatcher +import kotlinx.coroutines.test.runTest +import org.hl7.fhir.r4.model.Patient +import org.hl7.fhir.r4.model.ResourceType +import org.junit.Assert.assertThrows +import org.junit.Before +import org.junit.Test +import org.junit.runner.RunWith +import org.mockito.Mock +import org.mockito.Mockito.`when` +import org.mockito.MockitoAnnotations +import org.mockito.kotlin.any +import org.robolectric.RobolectricTestRunner + +@RunWith(RobolectricTestRunner::class) +class FhirSynchronizerTest { + + @Mock private lateinit var uploader: Uploader + + @Mock private lateinit var downloader: Downloader + + @Mock private lateinit var conflictResolver: ConflictResolver + + private lateinit var fhirSynchronizer: FhirSynchronizer + + @Before + fun setUp() { + MockitoAnnotations.openMocks(this) + fhirSynchronizer = + FhirSynchronizer( + ApplicationProvider.getApplicationContext(), + TestFhirEngineImpl, + uploader, + downloader, + conflictResolver, + ) + } + + @Test + fun `subscribe should throw exception on second subscription`() { + val flow1 = MutableSharedFlow() + val flow2 = MutableSharedFlow() + fhirSynchronizer.subscribe(flow1) + + val exception = + assertThrows(IllegalStateException::class.java) { fhirSynchronizer.subscribe(flow2) } + + assertThat(exception.localizedMessage).isEqualTo("Already subscribed to a flow") + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `synchronize should return Success on successful download and upload`() = + runTest(UnconfinedTestDispatcher()) { + `when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 10, 10))) + `when`(uploader.upload(any())) + .thenReturn( + flowOf( + UploadState.Success( + LocalChangeToken(listOf()), + Patient(), + 1, + 1, + ), + ), + ) + + val testFlow = MutableSharedFlow() + fhirSynchronizer.subscribe(testFlow) + + val emittedValues = mutableListOf() + backgroundScope.launch { testFlow.collect { emittedValues.add(it) } } + + val result = fhirSynchronizer.synchronize() + + assertThat(emittedValues) + .containsExactly( + SyncJobStatus.Started, + SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10), + SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1), + SyncJobStatus.Finished, + ) + + assertThat(SyncJobStatus.Finished::class.java).isEqualTo(result::class.java) + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `synchronize should return Failed on failed download`() = + runTest(UnconfinedTestDispatcher()) { + val error = ResourceSyncException(ResourceType.Patient, Exception("Download error")) + `when`(downloader.download()).thenReturn(flowOf(DownloadState.Failure(error))) + `when`(uploader.upload(any())) + .thenReturn( + flowOf( + UploadState.Success( + LocalChangeToken(listOf()), + Patient(), + 1, + 1, + ), + ), + ) + + val testFlow = MutableSharedFlow() + fhirSynchronizer.subscribe(testFlow) + + val emittedValues = mutableListOf() + backgroundScope.launch { testFlow.collect { emittedValues.add(it) } } + + val result = fhirSynchronizer.synchronize() + + assertThat(emittedValues) + .containsExactly( + SyncJobStatus.Started, + SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1), + SyncJobStatus.Failed(exceptions = listOf(error)), + ) + assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java) + assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions) + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `synchronize should return Failed on failed upload`() = + runTest(UnconfinedTestDispatcher()) { + `when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 10, 10))) + val error = ResourceSyncException(ResourceType.Patient, Exception("Upload error")) + `when`(uploader.upload(any())) + .thenReturn( + flowOf(UploadState.Failure(error)), + ) + + val testFlow = MutableSharedFlow() + fhirSynchronizer.subscribe(testFlow) + + val emittedValues = mutableListOf() + backgroundScope.launch { testFlow.collect { emittedValues.add(it) } } + + val result = fhirSynchronizer.synchronize() + + assertThat(emittedValues) + .containsExactly( + SyncJobStatus.Started, + SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10), + SyncJobStatus.Failed(exceptions = listOf(error)), + ) + assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java) + assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions) + } +} From 3b71a2302cd58c914fdc8b71443a69644741a6b7 Mon Sep 17 00:00:00 2001 From: omarismail Date: Fri, 22 Sep 2023 14:57:51 +0100 Subject: [PATCH 2/3] remove changes in MAVM --- .../fhir/demo/MainActivityViewModel.kt | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/demo/src/main/java/com/google/android/fhir/demo/MainActivityViewModel.kt b/demo/src/main/java/com/google/android/fhir/demo/MainActivityViewModel.kt index 168398bc53..b09e223330 100644 --- a/demo/src/main/java/com/google/android/fhir/demo/MainActivityViewModel.kt +++ b/demo/src/main/java/com/google/android/fhir/demo/MainActivityViewModel.kt @@ -31,7 +31,6 @@ import com.google.android.fhir.sync.SyncJobStatus import java.time.format.DateTimeFormatter import java.util.concurrent.TimeUnit import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.Job import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharingStarted @@ -49,8 +48,6 @@ class MainActivityViewModel(application: Application) : AndroidViewModel(applica val pollState: Flow get() = _pollState - private var syncJob: Job? = null - init { viewModelScope.launch { Sync.periodicSync( @@ -58,8 +55,8 @@ class MainActivityViewModel(application: Application) : AndroidViewModel(applica periodicSyncConfiguration = PeriodicSyncConfiguration( syncConstraints = Constraints.Builder().build(), - repeat = RepeatInterval(interval = 15, timeUnit = TimeUnit.MINUTES), - ), + repeat = RepeatInterval(interval = 15, timeUnit = TimeUnit.MINUTES) + ) ) .shareIn(this, SharingStarted.Eagerly, 10) .collect { _pollState.emit(it) } @@ -67,21 +64,18 @@ class MainActivityViewModel(application: Application) : AndroidViewModel(applica } fun triggerOneTimeSync() { - syncJob?.cancel() - - syncJob = - viewModelScope.launch { - Sync.oneTimeSync(getApplication()) - .shareIn(this, SharingStarted.Eagerly, 10) - .collect { _pollState.emit(it) } - } + viewModelScope.launch { + Sync.oneTimeSync(getApplication()) + .shareIn(this, SharingStarted.Eagerly, 10) + .collect { _pollState.emit(it) } + } } /** Emits last sync time. */ fun updateLastSyncTimestamp() { val formatter = DateTimeFormatter.ofPattern( - if (DateFormat.is24HourFormat(getApplication())) formatString24 else formatString12, + if (DateFormat.is24HourFormat(getApplication())) formatString24 else formatString12 ) _lastSyncTimestampLiveData.value = Sync.getLastSyncTimestamp(getApplication())?.toLocalDateTime()?.format(formatter) ?: "" From a71b07b742cd2e6af6e07b7f2ee72ca345e70912 Mon Sep 17 00:00:00 2001 From: omarismail Date: Tue, 26 Sep 2023 16:19:34 +0100 Subject: [PATCH 3/3] refactor more --- .../android/fhir/sync/FhirSyncWorker.kt | 35 +++++++------------ .../android/fhir/sync/FhirSynchronizer.kt | 20 +++-------- .../android/fhir/sync/FhirSynchronizerTest.kt | 29 ++------------- 3 files changed, 21 insertions(+), 63 deletions(-) diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt index e189daf48a..fe5a78206b 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt @@ -33,7 +33,6 @@ import java.time.OffsetDateTime import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.cancel -import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.launch import timber.log.Timber @@ -66,11 +65,18 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter ), ) - val flow = MutableSharedFlow() + val synchronizer = + FhirSynchronizer( + applicationContext, + getFhirEngine(), + Uploader(dataSource), + DownloaderImpl(dataSource, getDownloadWorkManager()), + getConflictResolver(), + ) val job = CoroutineScope(Dispatchers.IO).launch { - flow.collect { + synchronizer.syncState.collect { // now send Progress to work manager so caller app can listen setProgress(buildWorkData(it)) @@ -80,17 +86,7 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter } } - Timber.v("Subscribed to flow for progress") - val result = - FhirSynchronizer( - applicationContext, - getFhirEngine(), - Uploader(dataSource), - DownloaderImpl(dataSource, getDownloadWorkManager()), - getConflictResolver(), - ) - .apply { subscribe(flow) } - .synchronize() + val result = synchronizer.synchronize() val output = buildWorkData(result) // await/join is needed to collect states completely @@ -105,15 +101,10 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter * [RetryConfiguration.maxRetries] set by user. */ val retries = inputData.getInt(MAX_RETRIES_ALLOWED, 0) - return when { - result is SyncJobStatus.Finished -> { - Result.success(output) - } - retries > runAttemptCount -> { - Result.retry() - } + return when (result) { + is SyncJobStatus.Finished -> Result.success(output) else -> { - Result.failure(output) + if (retries > runAttemptCount) Result.retry() else Result.failure(output) } } } diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index de31c208bf..c878634010 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -26,6 +26,7 @@ import com.google.android.fhir.sync.upload.UploadState import com.google.android.fhir.sync.upload.Uploader import java.time.OffsetDateTime import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.flow import org.hl7.fhir.r4.model.ResourceType @@ -52,24 +53,13 @@ internal class FhirSynchronizer( private val downloader: Downloader, private val conflictResolver: ConflictResolver, ) { - private var syncState: MutableSharedFlow? = null - private val datastoreUtil = DatastoreUtil(context) - - private fun isSubscribed(): Boolean { - return syncState != null - } - fun subscribe(flow: MutableSharedFlow) { - if (isSubscribed()) { - throw IllegalStateException("Already subscribed to a flow") - } + private val _syncState = MutableSharedFlow() + val syncState: SharedFlow = _syncState - this.syncState = flow - } + private val datastoreUtil = DatastoreUtil(context) - private suspend fun setSyncState(state: SyncJobStatus) { - syncState?.emit(state) - } + private suspend fun setSyncState(state: SyncJobStatus) = _syncState.emit(state) private suspend fun setSyncState(result: SyncResult): SyncJobStatus { // todo: emit this properly instead of using datastore? diff --git a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt index af2cf87f11..a280cec4a3 100644 --- a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt +++ b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt @@ -25,14 +25,12 @@ import com.google.android.fhir.sync.upload.Uploader import com.google.android.fhir.testing.TestFhirEngineImpl import com.google.common.truth.Truth.assertThat import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.launch import kotlinx.coroutines.test.UnconfinedTestDispatcher import kotlinx.coroutines.test.runTest import org.hl7.fhir.r4.model.Patient import org.hl7.fhir.r4.model.ResourceType -import org.junit.Assert.assertThrows import org.junit.Before import org.junit.Test import org.junit.runner.RunWith @@ -66,18 +64,6 @@ class FhirSynchronizerTest { ) } - @Test - fun `subscribe should throw exception on second subscription`() { - val flow1 = MutableSharedFlow() - val flow2 = MutableSharedFlow() - fhirSynchronizer.subscribe(flow1) - - val exception = - assertThrows(IllegalStateException::class.java) { fhirSynchronizer.subscribe(flow2) } - - assertThat(exception.localizedMessage).isEqualTo("Already subscribed to a flow") - } - @OptIn(ExperimentalCoroutinesApi::class) @Test fun `synchronize should return Success on successful download and upload`() = @@ -95,11 +81,8 @@ class FhirSynchronizerTest { ), ) - val testFlow = MutableSharedFlow() - fhirSynchronizer.subscribe(testFlow) - val emittedValues = mutableListOf() - backgroundScope.launch { testFlow.collect { emittedValues.add(it) } } + backgroundScope.launch { fhirSynchronizer.syncState.collect { emittedValues.add(it) } } val result = fhirSynchronizer.synchronize() @@ -132,11 +115,8 @@ class FhirSynchronizerTest { ), ) - val testFlow = MutableSharedFlow() - fhirSynchronizer.subscribe(testFlow) - val emittedValues = mutableListOf() - backgroundScope.launch { testFlow.collect { emittedValues.add(it) } } + backgroundScope.launch { fhirSynchronizer.syncState.collect { emittedValues.add(it) } } val result = fhirSynchronizer.synchronize() @@ -161,11 +141,8 @@ class FhirSynchronizerTest { flowOf(UploadState.Failure(error)), ) - val testFlow = MutableSharedFlow() - fhirSynchronizer.subscribe(testFlow) - val emittedValues = mutableListOf() - backgroundScope.launch { testFlow.collect { emittedValues.add(it) } } + backgroundScope.launch { fhirSynchronizer.syncState.collect { emittedValues.add(it) } } val result = fhirSynchronizer.synchronize()