Skip to content

Commit

Permalink
Clean up SyncJobStatus and add FhirSynchronizer test (#2184)
Browse files Browse the repository at this point in the history
* Clean up SyncJobStatus and add FhirSynchronizer test

* remove changes in MAVM

* refactor more
  • Loading branch information
omarismail94 authored Sep 26, 2023
1 parent a89fb9a commit 996d13a
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 46 deletions.
35 changes: 13 additions & 22 deletions engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import java.time.OffsetDateTime
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import timber.log.Timber

Expand Down Expand Up @@ -66,11 +65,18 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
),
)

val flow = MutableSharedFlow<SyncJobStatus>()
val synchronizer =
FhirSynchronizer(
applicationContext,
getFhirEngine(),
Uploader(dataSource),
DownloaderImpl(dataSource, getDownloadWorkManager()),
getConflictResolver(),
)

val job =
CoroutineScope(Dispatchers.IO).launch {
flow.collect {
synchronizer.syncState.collect {
// now send Progress to work manager so caller app can listen
setProgress(buildWorkData(it))

Expand All @@ -80,17 +86,7 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
}
}

Timber.v("Subscribed to flow for progress")
val result =
FhirSynchronizer(
applicationContext,
getFhirEngine(),
Uploader(dataSource),
DownloaderImpl(dataSource, getDownloadWorkManager()),
getConflictResolver(),
)
.apply { subscribe(flow) }
.synchronize()
val result = synchronizer.synchronize()
val output = buildWorkData(result)

// await/join is needed to collect states completely
Expand All @@ -105,15 +101,10 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
* [RetryConfiguration.maxRetries] set by user.
*/
val retries = inputData.getInt(MAX_RETRIES_ALLOWED, 0)
return when {
result is SyncJobStatus.Finished -> {
Result.success(output)
}
retries > runAttemptCount -> {
Result.retry()
}
return when (result) {
is SyncJobStatus.Finished -> Result.success(output)
else -> {
Result.failure(output)
if (retries > runAttemptCount) Result.retry() else Result.failure(output)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.google.android.fhir.sync.upload.UploadState
import com.google.android.fhir.sync.upload.Uploader
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.flow
import org.hl7.fhir.r4.model.ResourceType

Expand All @@ -52,32 +53,21 @@ internal class FhirSynchronizer(
private val downloader: Downloader,
private val conflictResolver: ConflictResolver,
) {
private var syncState: MutableSharedFlow<SyncJobStatus>? = null
private val datastoreUtil = DatastoreUtil(context)

private fun isSubscribed(): Boolean {
return syncState != null
}

fun subscribe(flow: MutableSharedFlow<SyncJobStatus>) {
if (isSubscribed()) {
throw IllegalStateException("Already subscribed to a flow")
}
private val _syncState = MutableSharedFlow<SyncJobStatus>()
val syncState: SharedFlow<SyncJobStatus> = _syncState

this.syncState = flow
}
private val datastoreUtil = DatastoreUtil(context)

private suspend fun setSyncState(state: SyncJobStatus) {
syncState?.emit(state)
}
private suspend fun setSyncState(state: SyncJobStatus) = _syncState.emit(state)

private suspend fun setSyncState(result: SyncResult): SyncJobStatus {
// todo: emit this properly instead of using datastore?
datastoreUtil.writeLastSyncTimestamp(result.timestamp)

val state =
when (result) {
is SyncResult.Success -> SyncJobStatus.Finished()
is SyncResult.Success -> SyncJobStatus.Finished
is SyncResult.Error -> SyncJobStatus.Failed(result.exceptions)
}

Expand All @@ -86,7 +76,7 @@ internal class FhirSynchronizer(
}

suspend fun synchronize(): SyncJobStatus {
setSyncState(SyncJobStatus.Started())
setSyncState(SyncJobStatus.Started)

return listOf(download(), upload())
.filterIsInstance<SyncResult.Error>()
Expand Down Expand Up @@ -123,7 +113,6 @@ internal class FhirSynchronizer(
return if (exceptions.isEmpty()) {
SyncResult.Success()
} else {
setSyncState(SyncJobStatus.Glitch(exceptions))
SyncResult.Error(exceptions)
}
}
Expand Down Expand Up @@ -151,7 +140,6 @@ internal class FhirSynchronizer(
return if (exceptions.isEmpty()) {
SyncResult.Success()
} else {
setSyncState(SyncJobStatus.Glitch(exceptions))
SyncResult.Error(exceptions)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ sealed class SyncJobStatus {
val timestamp: OffsetDateTime = OffsetDateTime.now()

/** Sync job has been started on the client but the syncing is not necessarily in progress. */
class Started : SyncJobStatus()
object Started : SyncJobStatus()

/** Syncing in progress with the server. */
data class InProgress(
Expand All @@ -31,11 +31,8 @@ sealed class SyncJobStatus {
val completed: Int = 0,
) : SyncJobStatus()

/** Glitched but sync job is being retried. */
data class Glitch(val exceptions: List<ResourceSyncException>) : SyncJobStatus()

/** Sync job finished successfully. */
class Finished : SyncJobStatus()
object Finished : SyncJobStatus()

/** Sync job failed. */
data class Failed(val exceptions: List<ResourceSyncException>) : SyncJobStatus()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.android.fhir.sync

import androidx.test.core.app.ApplicationProvider
import com.google.android.fhir.LocalChangeToken
import com.google.android.fhir.sync.download.DownloadState
import com.google.android.fhir.sync.download.Downloader
import com.google.android.fhir.sync.upload.UploadState
import com.google.android.fhir.sync.upload.Uploader
import com.google.android.fhir.testing.TestFhirEngineImpl
import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
import org.hl7.fhir.r4.model.Patient
import org.hl7.fhir.r4.model.ResourceType
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import org.mockito.Mock
import org.mockito.Mockito.`when`
import org.mockito.MockitoAnnotations
import org.mockito.kotlin.any
import org.robolectric.RobolectricTestRunner

@RunWith(RobolectricTestRunner::class)
class FhirSynchronizerTest {

@Mock private lateinit var uploader: Uploader

@Mock private lateinit var downloader: Downloader

@Mock private lateinit var conflictResolver: ConflictResolver

private lateinit var fhirSynchronizer: FhirSynchronizer

@Before
fun setUp() {
MockitoAnnotations.openMocks(this)
fhirSynchronizer =
FhirSynchronizer(
ApplicationProvider.getApplicationContext(),
TestFhirEngineImpl,
uploader,
downloader,
conflictResolver,
)
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `synchronize should return Success on successful download and upload`() =
runTest(UnconfinedTestDispatcher()) {
`when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 10, 10)))
`when`(uploader.upload(any()))
.thenReturn(
flowOf(
UploadState.Success(
LocalChangeToken(listOf()),
Patient(),
1,
1,
),
),
)

val emittedValues = mutableListOf<SyncJobStatus>()
backgroundScope.launch { fhirSynchronizer.syncState.collect { emittedValues.add(it) } }

val result = fhirSynchronizer.synchronize()

assertThat(emittedValues)
.containsExactly(
SyncJobStatus.Started,
SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10),
SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1),
SyncJobStatus.Finished,
)

assertThat(SyncJobStatus.Finished::class.java).isEqualTo(result::class.java)
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `synchronize should return Failed on failed download`() =
runTest(UnconfinedTestDispatcher()) {
val error = ResourceSyncException(ResourceType.Patient, Exception("Download error"))
`when`(downloader.download()).thenReturn(flowOf(DownloadState.Failure(error)))
`when`(uploader.upload(any()))
.thenReturn(
flowOf(
UploadState.Success(
LocalChangeToken(listOf()),
Patient(),
1,
1,
),
),
)

val emittedValues = mutableListOf<SyncJobStatus>()
backgroundScope.launch { fhirSynchronizer.syncState.collect { emittedValues.add(it) } }

val result = fhirSynchronizer.synchronize()

assertThat(emittedValues)
.containsExactly(
SyncJobStatus.Started,
SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1),
SyncJobStatus.Failed(exceptions = listOf(error)),
)
assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java)
assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions)
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `synchronize should return Failed on failed upload`() =
runTest(UnconfinedTestDispatcher()) {
`when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 10, 10)))
val error = ResourceSyncException(ResourceType.Patient, Exception("Upload error"))
`when`(uploader.upload(any()))
.thenReturn(
flowOf(UploadState.Failure(error)),
)

val emittedValues = mutableListOf<SyncJobStatus>()
backgroundScope.launch { fhirSynchronizer.syncState.collect { emittedValues.add(it) } }

val result = fhirSynchronizer.synchronize()

assertThat(emittedValues)
.containsExactly(
SyncJobStatus.Started,
SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10),
SyncJobStatus.Failed(exceptions = listOf(error)),
)
assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java)
assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions)
}
}

0 comments on commit 996d13a

Please sign in to comment.