Skip to content

Commit

Permalink
fix: infinite loop on message send failure [WPB-5789] (#2330)
Browse files Browse the repository at this point in the history
* fix: infinite loop on message send failure

* review fixes
  • Loading branch information
Garzas authored Dec 27, 2023
1 parent 7b94e98 commit 7c64581
Showing 1 changed file with 39 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ internal class MessageSenderImpl internal constructor(
target: BroadcastMessageTarget
): Either<CoreFailure, Unit> =
withContext(scope.coroutineContext) {
attemptToBroadcastWithProteus(message, target).map { }
attemptToBroadcastWithProteus(message, target, remainingAttempts = 2).map { }
}

override suspend fun sendClientDiscoveryMessage(message: Message.Regular): Either<CoreFailure, String> = attemptToSend(
Expand All @@ -232,7 +232,7 @@ internal class MessageSenderImpl internal constructor(

is Conversation.ProtocolInfo.Proteus, is Conversation.ProtocolInfo.Mixed -> {
// TODO(messaging): make this thread safe (per user)
attemptToSendWithProteus(message, messageTarget)
attemptToSendWithProteus(message, messageTarget, remainingAttempts = 1)
}
}
}
Expand All @@ -246,7 +246,8 @@ internal class MessageSenderImpl internal constructor(

private suspend fun attemptToSendWithProteus(
message: Message.Sendable,
messageTarget: MessageTarget
messageTarget: MessageTarget,
remainingAttempts: Int
): Either<CoreFailure, String> {
val conversationId = message.conversationId
val target = when (messageTarget) {
Expand All @@ -272,7 +273,7 @@ internal class MessageSenderImpl internal constructor(
is MessageTarget.Conversation ->
MessageTarget.Conversation((messageTarget.usersToIgnore + usersWithoutSessions.users).toSet())
}
trySendingProteusEnvelope(envelope, message, updatedMessageTarget)
trySendingProteusEnvelope(envelope, message, updatedMessageTarget, remainingAttempts)
}
}
}
Expand All @@ -290,7 +291,8 @@ internal class MessageSenderImpl internal constructor(

private suspend fun attemptToBroadcastWithProteus(
message: BroadcastMessage,
target: BroadcastMessageTarget
target: BroadcastMessageTarget,
remainingAttempts: Int,
): Either<CoreFailure, String> {
return userRepository.getAllRecipients().flatMap { (teamRecipients, otherRecipients) ->
val (option, recipients) = getBroadcastParams(
Expand All @@ -306,7 +308,7 @@ internal class MessageSenderImpl internal constructor(
.flatMap { _ ->
messageEnvelopeCreator
.createOutgoingBroadcastEnvelope(recipients, message)
.flatMap { envelope -> tryBroadcastProteusEnvelope(envelope, message, option, target) }
.flatMap { envelope -> tryBroadcastProteusEnvelope(envelope, message, option, target, remainingAttempts) }
}
}
}
Expand Down Expand Up @@ -347,13 +349,20 @@ internal class MessageSenderImpl internal constructor(
private suspend fun trySendingProteusEnvelope(
envelope: MessageEnvelope,
message: Message.Sendable,
messageTarget: MessageTarget
messageTarget: MessageTarget,
remainingAttempts: Int
): Either<CoreFailure, String> =
messageRepository
.sendEnvelope(message.conversationId, envelope, messageTarget)
.fold({
handleProteusError(it, "Send", message.toLogString(), message.conversationId) {
attemptToSendWithProteus(message, messageTarget)
handleProteusError(
failure = it,
action = "Send",
messageLogString = message.toLogString(),
conversationId = message.conversationId,
remainingAttempts = remainingAttempts
) { remainingAttempts ->
attemptToSendWithProteus(message, messageTarget, remainingAttempts)
}
}, { messageSent ->
logger.i("Message Send Success: { \"message\" : \"${message.toLogString()}\" }")
Expand All @@ -370,15 +379,17 @@ internal class MessageSenderImpl internal constructor(
envelope: MessageEnvelope,
message: BroadcastMessage,
option: BroadcastMessageOption,
target: BroadcastMessageTarget
target: BroadcastMessageTarget,
remainingAttempts: Int
): Either<CoreFailure, String> =
messageRepository
.broadcastEnvelope(envelope, option)
.fold({
handleProteusError(it, "Broadcast", message.toLogString(), null) {
handleProteusError(it, "Broadcast", message.toLogString(), null, remainingAttempts = 1) {
attemptToBroadcastWithProteus(
message,
target
target,
remainingAttempts
)
}
}, {
Expand All @@ -391,8 +402,9 @@ internal class MessageSenderImpl internal constructor(
action: String, // Send or Broadcast
messageLogString: String,
conversationId: ConversationId?,
retry: suspend () -> Either<CoreFailure, String>
) =
remainingAttempts: Int,
retry: suspend (remainingAttempts: Int) -> Either<CoreFailure, String>
): Either<CoreFailure, String> =
when (failure) {
is ProteusSendMessageFailure -> {
logger.w(
Expand All @@ -401,8 +413,19 @@ internal class MessageSenderImpl internal constructor(
messageSendFailureHandler
.handleClientsHaveChangedFailure(failure, conversationId)
.flatMap {
logger.w("Retrying After Proteus $action Failure: { \"message\" : \"${messageLogString}\"}")
retry()
if (remainingAttempts > 0) {
logger.w(
"Retrying (remaining attempts: $remainingAttempts) after Proteus $action " +
"Failure: { \"message\" : \"${messageLogString}\"}"
)
retry(remainingAttempts - 1)
} else {
logger.e(
"No remaining attempts to retry after Proteus $action " +
"Failure: { \"message\" : \"${messageLogString}\"}"
)
Either.Left(failure)
}
}
.onFailure {
val logLine = "Fatal Proteus $action Failure: { \"message\" : \"${messageLogString}\"" +
Expand Down Expand Up @@ -478,5 +501,4 @@ internal class MessageSenderImpl internal constructor(
else {
messageRepository.persistRecipientsDeliveryFailure(message.conversationId, message.id, messageSent.failedToConfirmClients)
}

}

0 comments on commit 7c64581

Please sign in to comment.