Skip to content

Commit

Permalink
fixed issues with reporting sync state events from different threads
Browse files Browse the repository at this point in the history
  • Loading branch information
artkoenig committed Jul 1, 2022
1 parent 242cc28 commit 03da067
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 32 deletions.
1 change: 1 addition & 0 deletions changelog.d/6341.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed issues with reporting sync state events from different threads
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ interface SyncService {
fun getSyncStateLive(): LiveData<SyncState>

/**
* Get the [SyncRequestState] as a LiveData.
* Get the [SyncRequestState] as a SharedFlow.
*/
fun getSyncRequestStateLive(): LiveData<SyncRequestState>
fun getSyncRequestStateFlow(): SharedFlow<SyncRequestState>

/**
* This method returns a flow of SyncResponse. New value will be pushed through the sync thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package org.matrix.android.sdk.internal.session.sync

import androidx.lifecycle.LiveData
import org.matrix.android.sdk.api.session.sync.SyncRequestState
import org.matrix.android.sdk.api.session.sync.SyncService
import org.matrix.android.sdk.internal.di.SessionId
import org.matrix.android.sdk.internal.di.WorkManagerProvider
Expand Down Expand Up @@ -75,9 +73,7 @@ internal class DefaultSyncService @Inject constructor(

override fun getSyncState() = getSyncThread().currentState()

override fun getSyncRequestStateLive(): LiveData<SyncRequestState> {
return syncRequestStateTracker.syncRequestState
}
override fun getSyncRequestStateFlow() = syncRequestStateTracker.syncRequestState

override fun hasAlreadySynced(): Boolean {
return syncTokenStore.getLastToken() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,26 @@

package org.matrix.android.sdk.internal.session.sync

import androidx.lifecycle.MutableLiveData
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import org.matrix.android.sdk.api.session.sync.InitialSyncStep
import org.matrix.android.sdk.api.session.sync.SyncRequestState
import org.matrix.android.sdk.internal.session.SessionScope
import javax.inject.Inject

@SessionScope
internal class SyncRequestStateTracker @Inject constructor() :
ProgressReporter {
internal class SyncRequestStateTracker @Inject constructor(
private val coroutineScope: CoroutineScope
) : ProgressReporter {

val syncRequestState = MutableLiveData<SyncRequestState>()
val syncRequestState = MutableSharedFlow<SyncRequestState>()

private var rootTask: TaskInfo? = null

// Only to be used for incremental sync
fun setSyncRequestState(newSyncRequestState: SyncRequestState.IncrementalSyncRequestState) {
syncRequestState.postValue(newSyncRequestState)
emitSyncState(newSyncRequestState)
}

/**
Expand All @@ -42,7 +45,9 @@ internal class SyncRequestStateTracker @Inject constructor() :
initialSyncStep: InitialSyncStep,
totalProgress: Int
) {
endAll()
if (rootTask != null) {
endAll()
}
rootTask = TaskInfo(initialSyncStep, totalProgress, null, 1F)
reportProgress(0F)
}
Expand Down Expand Up @@ -71,7 +76,7 @@ internal class SyncRequestStateTracker @Inject constructor() :
// Update the progress of the leaf and all its parents
leaf.setProgress(progress)
// Then update the live data using leaf wording and root progress
syncRequestState.postValue(SyncRequestState.InitialSyncProgressing(leaf.initialSyncStep, root.currentProgress.toInt()))
emitSyncState(SyncRequestState.InitialSyncProgressing(leaf.initialSyncStep, root.currentProgress.toInt()))
}
}
}
Expand All @@ -86,13 +91,19 @@ internal class SyncRequestStateTracker @Inject constructor() :
// And close it
endedTask.parent.child = null
} else {
syncRequestState.postValue(SyncRequestState.Idle)
emitSyncState(SyncRequestState.Idle)
}
}
}

fun endAll() {
rootTask = null
syncRequestState.postValue(SyncRequestState.Idle)
emitSyncState(SyncRequestState.Idle)
}

private fun emitSyncState(state: SyncRequestState) {
coroutineScope.launch {
syncRequestState.emit(state)
}
}
}
4 changes: 1 addition & 3 deletions vector/src/main/java/im/vector/app/AppStateHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package im.vector.app

import androidx.lifecycle.DefaultLifecycleObserver
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.asFlow
import arrow.core.Option
import im.vector.app.core.di.ActiveSessionHolder
import im.vector.app.core.utils.BehaviorDataSource
Expand Down Expand Up @@ -147,8 +146,7 @@ class AppStateHandler @Inject constructor(
}

private fun observeSyncStatus(session: Session) {
session.syncService().getSyncRequestStateLive()
.asFlow()
session.syncService().getSyncRequestStateFlow()
.filterIsInstance<SyncRequestState.IncrementalSyncDone>()
.map { session.spaceService().getRootSpaceSummaries().size }
.distinctUntilChanged()
Expand Down
4 changes: 1 addition & 3 deletions vector/src/main/java/im/vector/app/AutoRageShaker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package im.vector.app

import android.content.SharedPreferences
import androidx.lifecycle.asFlow
import im.vector.app.core.di.ActiveSessionHolder
import im.vector.app.features.rageshake.BugReporter
import im.vector.app.features.rageshake.ReportType
Expand Down Expand Up @@ -261,8 +260,7 @@ class AutoRageShaker @Inject constructor(
this.currentActiveSessionId = sessionId

hasSynced = session.syncService().hasAlreadySynced()
session.syncService().getSyncRequestStateLive()
.asFlow()
session.syncService().getSyncRequestStateFlow()
.onEach {
hasSynced = it !is SyncRequestState.InitialSyncProgressing
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package im.vector.app.features.analytics.accountdata

import androidx.lifecycle.asFlow
import com.airbnb.mvrx.MavericksViewModelFactory
import dagger.assisted.Assisted
import dagger.assisted.AssistedFactory
Expand Down Expand Up @@ -66,7 +65,7 @@ class AnalyticsAccountDataViewModel @AssistedInject constructor(

private fun observeInitSync() {
combine(
session.syncService().getSyncRequestStateLive().asFlow(),
session.syncService().getSyncRequestStateFlow(),
analytics.getUserConsent(),
analytics.getAnalyticsId()
) { status, userConsent, analyticsId ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package im.vector.app.features.home

import androidx.lifecycle.asFlow
import com.airbnb.mvrx.Mavericks
import com.airbnb.mvrx.MavericksViewModelFactory
import com.airbnb.mvrx.ViewModelContext
Expand Down Expand Up @@ -218,8 +217,7 @@ class HomeActivityViewModel @AssistedInject constructor(
private fun observeInitialSync() {
val session = activeSessionHolder.getSafeActiveSession() ?: return

session.syncService().getSyncRequestStateLive()
.asFlow()
session.syncService().getSyncRequestStateFlow()
.onEach { status ->
when (status) {
is SyncRequestState.Idle -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ class HomeDetailViewModel @AssistedInject constructor(
copy(syncState = syncState)
}

session.syncService().getSyncRequestStateLive()
.asFlow()
session.syncService().getSyncRequestStateFlow()
.filterIsInstance<SyncRequestState.IncrementalSyncRequestState>()
.setOnEach {
copy(incrementalSyncRequestState = it)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package im.vector.app.features.home.room.detail

import android.net.Uri
import androidx.annotation.IdRes
import androidx.lifecycle.asFlow
import com.airbnb.mvrx.Async
import com.airbnb.mvrx.Fail
import com.airbnb.mvrx.Loading
Expand Down Expand Up @@ -1130,8 +1129,7 @@ class TimelineViewModel @AssistedInject constructor(
copy(syncState = syncState)
}

session.syncService().getSyncRequestStateLive()
.asFlow()
session.syncService().getSyncRequestStateFlow()
.filterIsInstance<SyncRequestState.IncrementalSyncRequestState>()
.setOnEach {
copy(incrementalSyncRequestState = it)
Expand Down

0 comments on commit 03da067

Please sign in to comment.