Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align Autorageshake with web implementation #5639

Merged
merged 7 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5596.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Align auto-reporting of decryption errors implementation with web client.
1 change: 1 addition & 0 deletions changelog.d/5639.sdk
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Include original event in live decryption listeners.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ interface LiveEventListener {

fun onPaginatedEvent(roomId: String, event: Event)

fun onEventDecrypted(eventId: String, roomId: String, clearEvent: JsonDict)
fun onEventDecrypted(event: Event, clearEvent: JsonDict)

fun onEventDecryptionError(eventId: String, roomId: String, throwable: Throwable)
fun onEventDecryptionError(event: Event, throwable: Throwable)

fun onLiveToDeviceEvent(event: Event)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ internal class StreamEventsManager @Inject constructor() {
coroutineScope.launch {
listeners.forEach {
tryOrNull {
it.onEventDecrypted(event.eventId ?: "", event.roomId ?: "", result.clearEvent)
it.onEventDecrypted(event, result.clearEvent)
}
}
}
Expand All @@ -82,7 +82,7 @@ internal class StreamEventsManager @Inject constructor() {
coroutineScope.launch {
listeners.forEach {
tryOrNull {
it.onEventDecryptionError(event.eventId ?: "", event.roomId ?: "", error)
it.onEventDecryptionError(event, error)
}
}
}
Expand Down
35 changes: 20 additions & 15 deletions vector/src/main/java/im/vector/app/AutoRageShaker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package im.vector.app

import android.content.SharedPreferences
import androidx.lifecycle.asFlow
import im.vector.app.core.di.ActiveSessionHolder
import im.vector.app.features.rageshake.BugReporter
import im.vector.app.features.rageshake.ReportType
import im.vector.app.features.session.coroutineScope
import im.vector.app.features.settings.VectorPreferences
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
Expand All @@ -34,6 +36,7 @@ import kotlinx.coroutines.launch
import org.matrix.android.sdk.api.session.Session
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.toContent
import org.matrix.android.sdk.api.session.initsync.SyncStatusService
import timber.log.Timber
import javax.inject.Inject
import javax.inject.Singleton
Expand Down Expand Up @@ -62,10 +65,11 @@ class AutoRageShaker @Inject constructor(

private val e2eDetectedFlow = MutableSharedFlow<E2EMessageDetected>(replay = 0)
private val matchingRSRequestFlow = MutableSharedFlow<Event>(replay = 0)

private var hasSynced = false
private var preferenceEnabled = false
fun initialize() {
observeActiveSession()
enable(vectorPreferences.labsAutoReportUISI())
preferenceEnabled = vectorPreferences.labsAutoReportUISI()
// It's a singleton...
vectorPreferences.subscribeToChanges(this)

Expand All @@ -74,7 +78,7 @@ class AutoRageShaker @Inject constructor(
e2eDetectedFlow
.onEach {
sendRageShake(it)
delay(2_000)
delay(60_000)
}
.catch { cause ->
Timber.w(cause, "Failed to RS")
Expand All @@ -84,7 +88,7 @@ class AutoRageShaker @Inject constructor(
matchingRSRequestFlow
.onEach {
sendMatchingRageShake(it)
delay(2_000)
delay(60_000)
}
.catch { cause ->
Timber.w(cause, "Failed to send matching rageshake")
Expand All @@ -93,14 +97,7 @@ class AutoRageShaker @Inject constructor(
}

override fun onSharedPreferenceChanged(sharedPreferences: SharedPreferences?, key: String?) {
enable(vectorPreferences.labsAutoReportUISI())
}

var _enabled = false
fun enable(enabled: Boolean) {
if (enabled == _enabled) return
_enabled = enabled
detector.enabled = enabled
preferenceEnabled = vectorPreferences.labsAutoReportUISI()
}

private fun observeActiveSession() {
Expand All @@ -115,7 +112,6 @@ class AutoRageShaker @Inject constructor(
}

fun decryptionErrorDetected(target: E2EMessageDetected) {
if (target.source == UISIEventSource.INITIAL_SYNC) return
if (activeSessionHolder.getSafeActiveSession()?.sessionId != currentActiveSessionId) return
val shouldSendRS = synchronized(alreadyReportedUisi) {
val reportInfo = ReportInfo(target.roomId, target.sessionId)
Expand Down Expand Up @@ -148,7 +144,6 @@ class AutoRageShaker @Inject constructor(
append("\"room_id\": \"${target.roomId}\",")
append("\"sender_key\": \"${target.senderKey}\",")
append("\"device_id\": \"${target.senderDeviceId}\",")
append("\"source\": \"${target.source}\",")
append("\"user_id\": \"${target.senderUserId}\",")
append("\"session_id\": \"${target.sessionId}\"")
append("}")
Expand Down Expand Up @@ -245,6 +240,9 @@ class AutoRageShaker @Inject constructor(
override val reciprocateToDeviceEventType: String
get() = AUTO_RS_REQUEST

override val enabled: Boolean
get() = [email protected] && [email protected]

override fun uisiDetected(source: E2EMessageDetected) {
decryptionErrorDetected(source)
}
Expand All @@ -261,7 +259,14 @@ class AutoRageShaker @Inject constructor(
return
}
this.currentActiveSessionId = sessionId
this.detector.enabled = _enabled

hasSynced = session.hasAlreadySynced()
session.getSyncStatusLive()
.asFlow()
.onEach {
hasSynced = it !is SyncStatusService.Status.Progressing
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not exaclty what we want no? Looks a bit strange. Also we are subscribing to the flow for the all session lifecycle no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"For the lifetime of the session, confirm we are not in an initial sync", Being outside an initial sync I thought would be reasonably equivalent to after an initial sync.

Web uses the sync token, so equivalent to hasAlreadySynced. We could keep a reference to the active sessions and call hasAlreadySynced synchronously in enabled?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ganfra or @bmarty what would be the best way to check that an initial sync has already been done?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is acceptable.
Maybe we should rename SyncStatusService.Status.Progressing to SyncStatusService.Status.InitialSyncProgressing for clarity.

It's historical, previously we did not track the incremental sync status.

}
.launchIn(session.coroutineScope)
activeSessionIds.add(sessionId)
session.addListener(this)
session.addEventStreamListener(detector)
Expand Down
109 changes: 38 additions & 71 deletions vector/src/main/java/im/vector/app/UISIDetector.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package im.vector.app

import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.session.LiveEventListener
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.toModel
Expand All @@ -26,23 +27,17 @@ import java.util.Timer
import java.util.TimerTask
import java.util.concurrent.Executors

enum class UISIEventSource {
INITIAL_SYNC,
INCREMENTAL_SYNC,
PAGINATION
}

data class E2EMessageDetected(
val eventId: String,
val roomId: String,
val senderUserId: String,
val senderDeviceId: String,
val senderKey: String,
val sessionId: String,
val source: UISIEventSource) {
val sessionId: String
) {

companion object {
fun fromEvent(event: Event, roomId: String, source: UISIEventSource): E2EMessageDetected {
fun fromEvent(event: Event, roomId: String): E2EMessageDetected {
val encryptedContent = event.content.toModel<EncryptedEventContent>()

return E2EMessageDetected(
Expand All @@ -51,8 +46,7 @@ data class E2EMessageDetected(
senderUserId = event.senderId ?: "",
senderDeviceId = encryptedContent?.deviceId ?: "",
senderKey = encryptedContent?.senderKey ?: "",
sessionId = encryptedContent?.sessionId ?: "",
source = source
sessionId = encryptedContent?.sessionId ?: ""
)
}
}
Expand All @@ -61,37 +55,24 @@ data class E2EMessageDetected(
class UISIDetector : LiveEventListener {

interface UISIDetectorCallback {
val enabled: Boolean
val reciprocateToDeviceEventType: String
fun uisiDetected(source: E2EMessageDetected)
fun uisiReciprocateRequest(source: Event)
}

var callback: UISIDetectorCallback? = null

private val trackedEvents = mutableListOf<Pair<E2EMessageDetected, TimerTask>>()
private val trackedEvents = mutableMapOf<String, TimerTask>()
private val executor = Executors.newSingleThreadExecutor()
private val timer = Timer()
private val timeoutMillis = 30_000L
var enabled = false

override fun onLiveEvent(roomId: String, event: Event) {
if (!enabled) return
if (!event.isEncrypted()) return
executor.execute {
handleEventReceived(E2EMessageDetected.fromEvent(event, roomId, UISIEventSource.INCREMENTAL_SYNC))
}
}
private val enabled: Boolean get() = callback?.enabled.orFalse()

override fun onPaginatedEvent(roomId: String, event: Event) {
if (!enabled) return
if (!event.isEncrypted()) return
executor.execute {
handleEventReceived(E2EMessageDetected.fromEvent(event, roomId, UISIEventSource.PAGINATION))
}
}

override fun onEventDecrypted(eventId: String, roomId: String, clearEvent: JsonDict) {
if (!enabled) return
override fun onEventDecrypted(event: Event, clearEvent: JsonDict) {
val eventId = event.eventId
val roomId = event.roomId
if (!enabled || eventId == null || roomId == null) return
executor.execute {
unTrack(eventId, roomId)
}
Expand All @@ -104,57 +85,43 @@ class UISIDetector : LiveEventListener {
}
}

override fun onEventDecryptionError(eventId: String, roomId: String, throwable: Throwable) {
if (!enabled) return
executor.execute {
unTrack(eventId, roomId)?.let {
triggerUISI(it)
}
// if (throwable is MXCryptoError.OlmError) {
// if (throwable.olmException.message == "UNKNOWN_MESSAGE_INDEX") {
// unTrack(eventId, roomId)?.let {
// triggerUISI(it)
// }
// }
// }
}
}
override fun onEventDecryptionError(event: Event, throwable: Throwable) {
val eventId = event.eventId
val roomId = event.roomId
if (!enabled || eventId == null || roomId == null) return

private fun handleEventReceived(detectorEvent: E2EMessageDetected) {
if (!enabled) return
if (trackedEvents.any { it.first == detectorEvent }) {
Timber.w("## UISIDetector: Event ${detectorEvent.eventId} is already tracked")
} else {
// track it and start timer
val timeoutTask = object : TimerTask() {
override fun run() {
executor.execute {
unTrack(detectorEvent.eventId, detectorEvent.roomId)
Timber.v("## UISIDetector: Timeout on ${detectorEvent.eventId} ")
triggerUISI(detectorEvent)
}
val trackerId: String = trackerId(eventId, roomId)
if (trackedEvents.containsKey(trackerId)) {
Timber.w("## UISIDetector: Event $eventId is already tracked")
return
}
// track it and start timer
val timeoutTask = object : TimerTask() {
override fun run() {
executor.execute {
unTrack(eventId, roomId)
Timber.v("## UISIDetector: Timeout on $eventId")
triggerUISI(E2EMessageDetected.fromEvent(event, roomId))
}
}
trackedEvents.add(detectorEvent to timeoutTask)
timer.schedule(timeoutTask, timeoutMillis)
}
trackedEvents[trackerId] = timeoutTask
timer.schedule(timeoutTask, timeoutMillis)
}

override fun onLiveEvent(roomId: String, event: Event) { }

override fun onPaginatedEvent(roomId: String, event: Event) { }

private fun trackerId(eventId: String, roomId: String): String = "$roomId-$eventId"

private fun triggerUISI(source: E2EMessageDetected) {
if (!enabled) return
Timber.i("## UISIDetector: Unable To Decrypt $source")
callback?.uisiDetected(source)
}

private fun unTrack(eventId: String, roomId: String): E2EMessageDetected? {
val index = trackedEvents.indexOfFirst { it.first.eventId == eventId && it.first.roomId == roomId }
return if (index != -1) {
trackedEvents.removeAt(index).let {
it.second.cancel()
it.first
}
} else {
null
}
private fun unTrack(eventId: String, roomId: String) {
trackedEvents.remove(trackerId(eventId, roomId))?.cancel()
}
}