Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix case of lost messages: Do not delete events from the last forward chunk #5878

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5878.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a case where paginating forwards would lose messages
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ import org.matrix.android.sdk.internal.extensions.assertIsManaged
import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection
import timber.log.Timber

internal fun ChunkEntity.moveEventsFrom(chunkToMerge: ChunkEntity, direction: PaginationDirection) {
assertIsManaged()
val localRealm = this.realm
val eventsToMerge = if (direction == PaginationDirection.FORWARDS) {
chunkToMerge.timelineEvents.sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.ASCENDING)
} else {
chunkToMerge.timelineEvents.sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.DESCENDING)
}
eventsToMerge.forEach {
if (addTimelineEventFromMove(localRealm, it, direction)) {
chunkToMerge.timelineEvents.remove(it)
}
}
}

internal fun ChunkEntity.merge(roomId: String, chunkToMerge: ChunkEntity, direction: PaginationDirection) {
assertIsManaged()
val localRealm = this.realm
Expand Down Expand Up @@ -165,6 +180,17 @@ private fun ChunkEntity.addTimelineEventFromMerge(realm: Realm, timelineEventEnt
timelineEvents.add(copied)
}

private fun ChunkEntity.addTimelineEventFromMove(realm: Realm, event: TimelineEventEntity, direction: PaginationDirection): Boolean {
val eventId = event.eventId
if (timelineEvents.find(eventId) != null) {
return false
}
event.displayIndex = nextDisplayIndex(direction)
handleThreadSummary(realm, eventId, event)
timelineEvents.add(event)
return true
}

/**
* Upon copy of the timeline events we should update the latestMessage TimelineEventEntity with the new one.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ internal class TokenChunkEventPersistor @Inject constructor(
}
}
val optimizedThreadSummaryMap = hashMapOf<String, EventEntity>()
var hasNewEvents = false
var existingChunkToLink: ChunkEntity? = null
run processTimelineEvents@{
eventList.forEach { event ->
if (event.eventId == null || event.senderId == null) {
Expand All @@ -194,18 +196,36 @@ internal class TokenChunkEventPersistor @Inject constructor(
// If it exists, we want to stop here, just link the prevChunk
val existingChunk = existingTimelineEvent?.chunk?.firstOrNull()
if (existingChunk != null) {
// If we haven't found a single new event yet, we don't want to link in the pagination direction, as that might cause a
// timeline loop if the other chunk is in the other direction.
if (!hasNewEvents) {
Timber.i("Skip adding event $eventId, already exists")
// Only skip this event, but still process other events.
// Remember this chunk, since in case we don't find any new events, we still want to link this in pagination direction
// e.g. in order to link a chunk to the /sync chunk
if (existingChunkToLink == null) {
existingChunkToLink = existingChunk
}
return@forEach
}
when (direction) {
PaginationDirection.BACKWARDS -> {
if (currentChunk.nextChunk == existingChunk) {
Timber.w("Avoid double link, shouldn't happen in an ideal world")
Timber.i("Avoid double link")
// Do not stop processing here: even though this event already exists in an already linked chunk,
// we still may have new events to add
return@forEach
} else {
currentChunk.prevChunk = existingChunk
existingChunk.nextChunk = currentChunk
}
}
PaginationDirection.FORWARDS -> {
if (currentChunk.prevChunk == existingChunk) {
Timber.w("Avoid double link, shouldn't happen in an ideal world")
Timber.i("Avoid double link")
// Do not stop processing here: even though this event already exists in an already linked chunk,
// we still may have new events to add
return@forEach
} else {
currentChunk.nextChunk = existingChunk
existingChunk.prevChunk = currentChunk
Expand All @@ -215,6 +235,10 @@ internal class TokenChunkEventPersistor @Inject constructor(
// Stop processing here
return@processTimelineEvents
}

// existingChunk == null => this is a new event we haven't seen before
hasNewEvents = true

val ageLocalTs = event.unsignedData?.age?.let { now - it }
var eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, EventInsertType.PAGINATION)
if (event.type == EventType.STATE_ROOM_MEMBER && event.stateKey != null) {
Expand Down Expand Up @@ -243,6 +267,27 @@ internal class TokenChunkEventPersistor @Inject constructor(
}
}
}
val existingChunk = existingChunkToLink
if (!hasNewEvents && existingChunk != null) {
when (direction) {
PaginationDirection.BACKWARDS -> {
if (currentChunk.nextChunk == existingChunk) {
Timber.w("Avoid double link, shouldn't happen in an ideal world")
} else {
currentChunk.prevChunk = existingChunk
existingChunk.nextChunk = currentChunk
}
}
PaginationDirection.FORWARDS -> {
if (currentChunk.prevChunk == existingChunk) {
Timber.w("Avoid double link, shouldn't happen in an ideal world")
} else {
currentChunk.nextChunk = existingChunk
existingChunk.prevChunk = currentChunk
}
}
}
}
if (currentChunk.isValid) {
RoomEntity.where(realm, roomId).findFirst()?.addIfNecessary(currentChunk)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.matrix.android.sdk.internal.crypto.DefaultCryptoService
import org.matrix.android.sdk.internal.database.helper.addIfNecessary
import org.matrix.android.sdk.internal.database.helper.addTimelineEvent
import org.matrix.android.sdk.internal.database.helper.createOrUpdate
import org.matrix.android.sdk.internal.database.helper.moveEventsFrom
import org.matrix.android.sdk.internal.database.helper.updateThreadSummaryIfNeeded
import org.matrix.android.sdk.internal.database.mapper.asDomain
import org.matrix.android.sdk.internal.database.mapper.toEntity
Expand Down Expand Up @@ -368,6 +369,15 @@ internal class RoomSyncHandler @Inject constructor(
aggregator: SyncResponsePostTreatmentAggregator): ChunkEntity {
val lastChunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomEntity.roomId)
if (isLimited && lastChunk != null) {
Timber.i("Deleting last forward chunk (${lastChunk.identifier()})")
// Add events that oldPrev may have dropped since they were already in lastChunk
val oldPrev = lastChunk.prevChunk
if (oldPrev != null && oldPrev.nextToken != lastChunk.prevToken) {
// If the tokens mismatch, this means we have chained them due to duplicated events.
// In this case, we need to make sure to re-add possibly dropped events (which would have
// been duplicates otherwise)
oldPrev.moveEventsFrom(lastChunk, PaginationDirection.FORWARDS)
}
lastChunk.deleteOnCascade(deleteStateEvents = false, canDeleteRoot = true)
}
val chunkEntity = if (!isLimited && lastChunk != null) {
Expand Down