Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up SyncJobStatus and add FhirSynchronizer test #2184

Merged
merged 7 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import java.time.OffsetDateTime
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import timber.log.Timber

Expand Down Expand Up @@ -66,11 +65,18 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
),
)

val flow = MutableSharedFlow<SyncJobStatus>()
val synchronizer =
FhirSynchronizer(
applicationContext,
getFhirEngine(),
Uploader(dataSource),
DownloaderImpl(dataSource, getDownloadWorkManager()),
getConflictResolver(),
)

val job =
CoroutineScope(Dispatchers.IO).launch {
flow.collect {
synchronizer.syncState.collect {
// now send Progress to work manager so caller app can listen
setProgress(buildWorkData(it))

Expand All @@ -80,17 +86,7 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
}
}

Timber.v("Subscribed to flow for progress")
val result =
FhirSynchronizer(
applicationContext,
getFhirEngine(),
Uploader(dataSource),
DownloaderImpl(dataSource, getDownloadWorkManager()),
getConflictResolver(),
)
.apply { subscribe(flow) }
.synchronize()
val result = synchronizer.synchronize()
val output = buildWorkData(result)

// await/join is needed to collect states completely
Expand All @@ -105,15 +101,10 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
* [RetryConfiguration.maxRetries] set by user.
*/
val retries = inputData.getInt(MAX_RETRIES_ALLOWED, 0)
return when {
result is SyncJobStatus.Finished -> {
Result.success(output)
}
retries > runAttemptCount -> {
Result.retry()
}
return when (result) {
is SyncJobStatus.Finished -> Result.success(output)
else -> {
Result.failure(output)
if (retries > runAttemptCount) Result.retry() else Result.failure(output)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.google.android.fhir.sync.upload.UploadState
import com.google.android.fhir.sync.upload.Uploader
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.flow
import org.hl7.fhir.r4.model.ResourceType

Expand All @@ -52,32 +53,21 @@ internal class FhirSynchronizer(
private val downloader: Downloader,
private val conflictResolver: ConflictResolver,
) {
private var syncState: MutableSharedFlow<SyncJobStatus>? = null
private val datastoreUtil = DatastoreUtil(context)

private fun isSubscribed(): Boolean {
return syncState != null
}

fun subscribe(flow: MutableSharedFlow<SyncJobStatus>) {
if (isSubscribed()) {
throw IllegalStateException("Already subscribed to a flow")
}
private val _syncState = MutableSharedFlow<SyncJobStatus>()
val syncState: SharedFlow<SyncJobStatus> = _syncState

this.syncState = flow
}
private val datastoreUtil = DatastoreUtil(context)

private suspend fun setSyncState(state: SyncJobStatus) {
syncState?.emit(state)
}
private suspend fun setSyncState(state: SyncJobStatus) = _syncState.emit(state)

private suspend fun setSyncState(result: SyncResult): SyncJobStatus {
// todo: emit this properly instead of using datastore?
datastoreUtil.writeLastSyncTimestamp(result.timestamp)

val state =
when (result) {
is SyncResult.Success -> SyncJobStatus.Finished()
is SyncResult.Success -> SyncJobStatus.Finished
is SyncResult.Error -> SyncJobStatus.Failed(result.exceptions)
}

Expand All @@ -86,7 +76,7 @@ internal class FhirSynchronizer(
}

suspend fun synchronize(): SyncJobStatus {
setSyncState(SyncJobStatus.Started())
setSyncState(SyncJobStatus.Started)

return listOf(download(), upload())
.filterIsInstance<SyncResult.Error>()
Expand Down Expand Up @@ -123,7 +113,6 @@ internal class FhirSynchronizer(
return if (exceptions.isEmpty()) {
SyncResult.Success()
} else {
setSyncState(SyncJobStatus.Glitch(exceptions))
SyncResult.Error(exceptions)
}
}
Expand Down Expand Up @@ -151,7 +140,6 @@ internal class FhirSynchronizer(
return if (exceptions.isEmpty()) {
SyncResult.Success()
} else {
setSyncState(SyncJobStatus.Glitch(exceptions))
SyncResult.Error(exceptions)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ sealed class SyncJobStatus {
val timestamp: OffsetDateTime = OffsetDateTime.now()

/** Sync job has been started on the client but the syncing is not necessarily in progress. */
class Started : SyncJobStatus()
object Started : SyncJobStatus()

/** Syncing in progress with the server. */
data class InProgress(
Expand All @@ -31,11 +31,8 @@ sealed class SyncJobStatus {
val completed: Int = 0,
) : SyncJobStatus()

/** Glitched but sync job is being retried. */
data class Glitch(val exceptions: List<ResourceSyncException>) : SyncJobStatus()

/** Sync job finished successfully. */
class Finished : SyncJobStatus()
object Finished : SyncJobStatus()

/** Sync job failed. */
data class Failed(val exceptions: List<ResourceSyncException>) : SyncJobStatus()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.android.fhir.sync

import androidx.test.core.app.ApplicationProvider
import com.google.android.fhir.LocalChangeToken
import com.google.android.fhir.sync.download.DownloadState
import com.google.android.fhir.sync.download.Downloader
import com.google.android.fhir.sync.upload.UploadState
import com.google.android.fhir.sync.upload.Uploader
import com.google.android.fhir.testing.TestFhirEngineImpl
import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
import org.hl7.fhir.r4.model.Patient
import org.hl7.fhir.r4.model.ResourceType
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import org.mockito.Mock
import org.mockito.Mockito.`when`
import org.mockito.MockitoAnnotations
import org.mockito.kotlin.any
import org.robolectric.RobolectricTestRunner

@RunWith(RobolectricTestRunner::class)
class FhirSynchronizerTest {

@Mock private lateinit var uploader: Uploader

@Mock private lateinit var downloader: Downloader

@Mock private lateinit var conflictResolver: ConflictResolver

private lateinit var fhirSynchronizer: FhirSynchronizer

@Before
fun setUp() {
MockitoAnnotations.openMocks(this)
fhirSynchronizer =
FhirSynchronizer(
ApplicationProvider.getApplicationContext(),
TestFhirEngineImpl,
uploader,
downloader,
conflictResolver,
)
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `synchronize should return Success on successful download and upload`() =
runTest(UnconfinedTestDispatcher()) {
`when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 10, 10)))
`when`(uploader.upload(any()))
.thenReturn(
flowOf(
UploadState.Success(
LocalChangeToken(listOf()),
Patient(),
1,
1,
),
),
)

val emittedValues = mutableListOf<SyncJobStatus>()
backgroundScope.launch { fhirSynchronizer.syncState.collect { emittedValues.add(it) } }

val result = fhirSynchronizer.synchronize()

assertThat(emittedValues)
.containsExactly(
SyncJobStatus.Started,
SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10),
SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1),
SyncJobStatus.Finished,
)

assertThat(SyncJobStatus.Finished::class.java).isEqualTo(result::class.java)
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `synchronize should return Failed on failed download`() =
runTest(UnconfinedTestDispatcher()) {
val error = ResourceSyncException(ResourceType.Patient, Exception("Download error"))
`when`(downloader.download()).thenReturn(flowOf(DownloadState.Failure(error)))
`when`(uploader.upload(any()))
.thenReturn(
flowOf(
UploadState.Success(
LocalChangeToken(listOf()),
Patient(),
1,
1,
),
),
)

val emittedValues = mutableListOf<SyncJobStatus>()
backgroundScope.launch { fhirSynchronizer.syncState.collect { emittedValues.add(it) } }

val result = fhirSynchronizer.synchronize()

assertThat(emittedValues)
.containsExactly(
SyncJobStatus.Started,
SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1),
SyncJobStatus.Failed(exceptions = listOf(error)),
)
assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java)
assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions)
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `synchronize should return Failed on failed upload`() =
runTest(UnconfinedTestDispatcher()) {
`when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 10, 10)))
val error = ResourceSyncException(ResourceType.Patient, Exception("Upload error"))
`when`(uploader.upload(any()))
.thenReturn(
flowOf(UploadState.Failure(error)),
)

val emittedValues = mutableListOf<SyncJobStatus>()
backgroundScope.launch { fhirSynchronizer.syncState.collect { emittedValues.add(it) } }

val result = fhirSynchronizer.synchronize()

assertThat(emittedValues)
.containsExactly(
SyncJobStatus.Started,
SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10),
SyncJobStatus.Failed(exceptions = listOf(error)),
)
assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java)
assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions)
}
}