From 7c645819d40ac5514db969adc15f698df4769754 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20=C5=BBerko?= Date: Wed, 27 Dec 2023 23:18:46 +0100 Subject: [PATCH] fix: infinite loop on message send failure [WPB-5789] (#2330) * fix: infinite loop on message send failure * review fixes --- .../logic/feature/message/MessageSender.kt | 56 +++++++++++++------ 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/MessageSender.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/MessageSender.kt index 9f1d8526861..a78061bf16f 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/MessageSender.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/message/MessageSender.kt @@ -211,7 +211,7 @@ internal class MessageSenderImpl internal constructor( target: BroadcastMessageTarget ): Either = withContext(scope.coroutineContext) { - attemptToBroadcastWithProteus(message, target).map { } + attemptToBroadcastWithProteus(message, target, remainingAttempts = 2).map { } } override suspend fun sendClientDiscoveryMessage(message: Message.Regular): Either = attemptToSend( @@ -232,7 +232,7 @@ internal class MessageSenderImpl internal constructor( is Conversation.ProtocolInfo.Proteus, is Conversation.ProtocolInfo.Mixed -> { // TODO(messaging): make this thread safe (per user) - attemptToSendWithProteus(message, messageTarget) + attemptToSendWithProteus(message, messageTarget, remainingAttempts = 1) } } } @@ -246,7 +246,8 @@ internal class MessageSenderImpl internal constructor( private suspend fun attemptToSendWithProteus( message: Message.Sendable, - messageTarget: MessageTarget + messageTarget: MessageTarget, + remainingAttempts: Int ): Either { val conversationId = message.conversationId val target = when (messageTarget) { @@ -272,7 +273,7 @@ internal class MessageSenderImpl internal constructor( is MessageTarget.Conversation -> MessageTarget.Conversation((messageTarget.usersToIgnore + usersWithoutSessions.users).toSet()) } - trySendingProteusEnvelope(envelope, message, updatedMessageTarget) + trySendingProteusEnvelope(envelope, message, updatedMessageTarget, remainingAttempts) } } } @@ -290,7 +291,8 @@ internal class MessageSenderImpl internal constructor( private suspend fun attemptToBroadcastWithProteus( message: BroadcastMessage, - target: BroadcastMessageTarget + target: BroadcastMessageTarget, + remainingAttempts: Int, ): Either { return userRepository.getAllRecipients().flatMap { (teamRecipients, otherRecipients) -> val (option, recipients) = getBroadcastParams( @@ -306,7 +308,7 @@ internal class MessageSenderImpl internal constructor( .flatMap { _ -> messageEnvelopeCreator .createOutgoingBroadcastEnvelope(recipients, message) - .flatMap { envelope -> tryBroadcastProteusEnvelope(envelope, message, option, target) } + .flatMap { envelope -> tryBroadcastProteusEnvelope(envelope, message, option, target, remainingAttempts) } } } } @@ -347,13 +349,20 @@ internal class MessageSenderImpl internal constructor( private suspend fun trySendingProteusEnvelope( envelope: MessageEnvelope, message: Message.Sendable, - messageTarget: MessageTarget + messageTarget: MessageTarget, + remainingAttempts: Int ): Either = messageRepository .sendEnvelope(message.conversationId, envelope, messageTarget) .fold({ - handleProteusError(it, "Send", message.toLogString(), message.conversationId) { - attemptToSendWithProteus(message, messageTarget) + handleProteusError( + failure = it, + action = "Send", + messageLogString = message.toLogString(), + conversationId = message.conversationId, + remainingAttempts = remainingAttempts + ) { remainingAttempts -> + attemptToSendWithProteus(message, messageTarget, remainingAttempts) } }, { messageSent -> logger.i("Message Send Success: { \"message\" : \"${message.toLogString()}\" }") @@ -370,15 +379,17 @@ internal class MessageSenderImpl internal constructor( envelope: MessageEnvelope, message: BroadcastMessage, option: BroadcastMessageOption, - target: BroadcastMessageTarget + target: BroadcastMessageTarget, + remainingAttempts: Int ): Either = messageRepository .broadcastEnvelope(envelope, option) .fold({ - handleProteusError(it, "Broadcast", message.toLogString(), null) { + handleProteusError(it, "Broadcast", message.toLogString(), null, remainingAttempts = 1) { attemptToBroadcastWithProteus( message, - target + target, + remainingAttempts ) } }, { @@ -391,8 +402,9 @@ internal class MessageSenderImpl internal constructor( action: String, // Send or Broadcast messageLogString: String, conversationId: ConversationId?, - retry: suspend () -> Either - ) = + remainingAttempts: Int, + retry: suspend (remainingAttempts: Int) -> Either + ): Either = when (failure) { is ProteusSendMessageFailure -> { logger.w( @@ -401,8 +413,19 @@ internal class MessageSenderImpl internal constructor( messageSendFailureHandler .handleClientsHaveChangedFailure(failure, conversationId) .flatMap { - logger.w("Retrying After Proteus $action Failure: { \"message\" : \"${messageLogString}\"}") - retry() + if (remainingAttempts > 0) { + logger.w( + "Retrying (remaining attempts: $remainingAttempts) after Proteus $action " + + "Failure: { \"message\" : \"${messageLogString}\"}" + ) + retry(remainingAttempts - 1) + } else { + logger.e( + "No remaining attempts to retry after Proteus $action " + + "Failure: { \"message\" : \"${messageLogString}\"}" + ) + Either.Left(failure) + } } .onFailure { val logLine = "Fatal Proteus $action Failure: { \"message\" : \"${messageLogString}\"" + @@ -478,5 +501,4 @@ internal class MessageSenderImpl internal constructor( else { messageRepository.persistRecipientsDeliveryFailure(message.conversationId, message.id, messageSent.failedToConfirmClients) } - }