Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid accessing realm instance from suspend functions #5592

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5592.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve/fix crashes on messages decryption
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import io.realm.Realm
import io.realm.RealmQuery
import io.realm.Sort
import io.realm.kotlin.createObject
import kotlinx.coroutines.runBlocking
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.crypto.MXCryptoError
import org.matrix.android.sdk.api.session.crypto.model.OlmDecryptionResult
Expand Down Expand Up @@ -127,7 +128,7 @@ private fun EventEntity.toTimelineEventEntity(roomMemberContentsByUser: HashMap<
return timelineEventEntity
}

internal suspend fun ThreadSummaryEntity.Companion.createOrUpdate(
internal fun ThreadSummaryEntity.Companion.createOrUpdate(
threadSummaryType: ThreadSummaryUpdateType,
realm: Realm,
roomId: String,
Expand All @@ -153,10 +154,18 @@ internal suspend fun ThreadSummaryEntity.Companion.createOrUpdate(
}

val rootThreadEventEntity = createEventEntity(roomId, rootThreadEvent, realm).also {
decryptIfNeeded(cryptoService, it, roomId)
try {
decryptIfNeeded(cryptoService, it, roomId)
} catch (e: InterruptedException) {
Timber.i("Decryption got interrupted")
}
}
val latestThreadEventEntity = createLatestEventEntity(roomId, rootThreadEvent, roomMemberContentsByUser, realm)?.also {
decryptIfNeeded(cryptoService, it, roomId)
try {
decryptIfNeeded(cryptoService, it, roomId)
} catch (e: InterruptedException) {
Timber.i("Decryption got interrupted")
}
}
val isUserParticipating = rootThreadEvent.unsignedData.relations.latestThread.isUserParticipating == true || rootThreadEvent.senderId == userId
roomMemberContentsByUser.addSenderState(realm, roomId, rootThreadEvent.senderId)
Expand Down Expand Up @@ -204,14 +213,15 @@ internal suspend fun ThreadSummaryEntity.Companion.createOrUpdate(
}
}

private suspend fun decryptIfNeeded(cryptoService: CryptoService?, eventEntity: EventEntity, roomId: String) {
private fun decryptIfNeeded(cryptoService: CryptoService?, eventEntity: EventEntity, roomId: String) {
cryptoService ?: return
val event = eventEntity.asDomain()
if (event.isEncrypted() && event.mxDecryptionResult == null && event.eventId != null) {
try {
Timber.i("###THREADS ThreadSummaryHelper request decryption for eventId:${event.eventId}")
// Event from sync does not have roomId, so add it to the event first
val result = cryptoService.decryptEvent(event.copy(roomId = roomId), "")
// note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching
val result = runBlocking { cryptoService.decryptEvent(event.copy(roomId = roomId), "") }
event.mxDecryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
senderKey = result.senderCurve25519Key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ internal class TimelineEventDecryptor @Inject constructor(
executor?.execute {
Realm.getInstance(realmConfiguration).use { realm ->
try {
runBlocking {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the aim is to ensure the realm instance is only executed on a single thread it might be safer to keep the runBlocking here next to the instance creation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to avoid making the function processDecryptRequest suspended. However, I can see the benefit you describe.

processDecryptRequest(request, realm)
}
processDecryptRequest(request, realm)
} catch (e: InterruptedException) {
Timber.i("Decryption got interrupted")
}
Expand All @@ -121,7 +119,7 @@ internal class TimelineEventDecryptor @Inject constructor(
}
}

private suspend fun processDecryptRequest(request: DecryptionRequest, realm: Realm) {
private fun processDecryptRequest(request: DecryptionRequest, realm: Realm) {
val event = request.event
val timelineId = request.timelineId

Expand All @@ -132,7 +130,8 @@ internal class TimelineEventDecryptor @Inject constructor(
return
}
try {
val result = cryptoService.decryptEvent(request.event, timelineId)
// note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching
val result = runBlocking { cryptoService.decryptEvent(request.event, timelineId) }
Timber.v("Successfully decrypted event ${event.eventId}")
realm.executeTransaction {
val eventId = event.eventId ?: return@executeTransaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
data class LEFT(val data: Map<String, RoomSync>) : HandlingStrategy()
}

suspend fun handle(realm: Realm,
roomsSyncResponse: RoomsSyncResponse,
isInitialSync: Boolean,
aggregator: SyncResponsePostTreatmentAggregator,
reporter: ProgressReporter? = null) {
fun handle(realm: Realm,
roomsSyncResponse: RoomsSyncResponse,
isInitialSync: Boolean,
aggregator: SyncResponsePostTreatmentAggregator,
reporter: ProgressReporter? = null) {
handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, aggregator, reporter)
handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, aggregator, reporter)
handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, aggregator, reporter)
Expand All @@ -120,11 +120,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
}
// PRIVATE METHODS *****************************************************************************

private suspend fun handleRoomSync(realm: Realm,
handlingStrategy: HandlingStrategy,
isInitialSync: Boolean,
aggregator: SyncResponsePostTreatmentAggregator,
reporter: ProgressReporter?) {
private fun handleRoomSync(realm: Realm,
handlingStrategy: HandlingStrategy,
isInitialSync: Boolean,
aggregator: SyncResponsePostTreatmentAggregator,
reporter: ProgressReporter?) {
val insertType = if (isInitialSync) {
EventInsertType.INITIAL_SYNC
} else {
Expand Down Expand Up @@ -157,11 +157,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
realm.insertOrUpdate(rooms)
}

private suspend fun insertJoinRoomsFromInitSync(realm: Realm,
handlingStrategy: HandlingStrategy.JOINED,
syncLocalTimeStampMillis: Long,
aggregator: SyncResponsePostTreatmentAggregator,
reporter: ProgressReporter?) {
private fun insertJoinRoomsFromInitSync(realm: Realm,
handlingStrategy: HandlingStrategy.JOINED,
syncLocalTimeStampMillis: Long,
aggregator: SyncResponsePostTreatmentAggregator,
reporter: ProgressReporter?) {
val bestChunkSize = computeBestChunkSize(
listSize = handlingStrategy.data.keys.size,
limit = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE
Expand Down Expand Up @@ -199,12 +199,12 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
}
}

private suspend fun handleJoinedRoom(realm: Realm,
roomId: String,
roomSync: RoomSync,
insertType: EventInsertType,
syncLocalTimestampMillis: Long,
aggregator: SyncResponsePostTreatmentAggregator): RoomEntity {
private fun handleJoinedRoom(realm: Realm,
roomId: String,
roomSync: RoomSync,
insertType: EventInsertType,
syncLocalTimestampMillis: Long,
aggregator: SyncResponsePostTreatmentAggregator): RoomEntity {
Timber.v("Handle join sync for room $roomId")
val isInitialSync = insertType == EventInsertType.INITIAL_SYNC

Expand Down Expand Up @@ -353,15 +353,15 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
return roomEntity
}

private suspend fun handleTimelineEvents(realm: Realm,
roomId: String,
roomEntity: RoomEntity,
eventList: List<Event>,
prevToken: String? = null,
isLimited: Boolean = true,
insertType: EventInsertType,
syncLocalTimestampMillis: Long,
aggregator: SyncResponsePostTreatmentAggregator): ChunkEntity {
private fun handleTimelineEvents(realm: Realm,
roomId: String,
roomEntity: RoomEntity,
eventList: List<Event>,
prevToken: String? = null,
isLimited: Boolean = true,
insertType: EventInsertType,
syncLocalTimestampMillis: Long,
aggregator: SyncResponsePostTreatmentAggregator): ChunkEntity {
val lastChunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomEntity.roomId)
if (isLimited && lastChunk != null) {
lastChunk.deleteOnCascade(deleteStateEvents = false, canDeleteRoot = true)
Expand Down Expand Up @@ -389,8 +389,10 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
liveEventService.get().dispatchLiveEventReceived(event, roomId, isInitialSync)

if (event.isEncrypted() && !isInitialSync) {
runBlocking {
try {
decryptIfNeeded(event, roomId)
} catch (e: InterruptedException) {
Timber.i("Decryption got interrupted")
}
}
var contentToInject: String? = null
Expand Down Expand Up @@ -421,7 +423,8 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
roomId = roomId,
eventEntity = eventEntity,
direction = PaginationDirection.FORWARDS,
roomMemberContentsByUser = roomMemberContentsByUser)
roomMemberContentsByUser = roomMemberContentsByUser
)
if (lightweightSettingsStorage.areThreadMessagesEnabled()) {
eventEntity.rootThreadEventId?.let {
// This is a thread event
Expand All @@ -437,7 +440,8 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
threadEventEntity = eventEntity,
roomMemberContentsByUser = roomMemberContentsByUser,
userId = userId,
roomEntity = roomEntity)
roomEntity = roomEntity
)
}
} ?: run {
// This is a normal event or a root thread one
Expand Down Expand Up @@ -475,7 +479,8 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
roomId = roomId,
realm = realm,
chunkEntity = chunkEntity,
currentUserId = userId)
currentUserId = userId
)
}

// posting new events to timeline if any is registered
Expand Down Expand Up @@ -505,10 +510,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
}
}

private suspend fun decryptIfNeeded(event: Event, roomId: String) {
private fun decryptIfNeeded(event: Event, roomId: String) {
try {
// Event from sync does not have roomId, so add it to the event first
val result = cryptoService.decryptEvent(event.copy(roomId = roomId), "")
// note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching
val result = runBlocking { cryptoService.decryptEvent(event.copy(roomId = roomId), "") }
event.mxDecryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
senderKey = result.senderCurve25519Key,
Expand Down