Skip to content

Commit

Permalink
Fix reconnection timeout handler not working in the token provider ph…
Browse files Browse the repository at this point in the history
…ase (#3513)
  • Loading branch information
nuno-vieira authored Dec 3, 2024
1 parent 0402640 commit 7941389
Show file tree
Hide file tree
Showing 16 changed files with 239 additions and 101 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## StreamChat
### 🐞 Fixed
- Fix a rare infinite loop triggering a crash when handling database changes [#3508](https://github.com/GetStream/stream-chat-swift/pull/3508)
- Fix reconnection timeout handler not working in the token provider phase [#3513](https://github.com/GetStream/stream-chat-swift/pull/3513)

## StreamChatUI
### 🐞 Fixed
Expand Down
11 changes: 7 additions & 4 deletions Sources/StreamChat/ChatClient+Environment.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ extension ChatClient {
)
}

var reconnectionHandlerBuilder: (_ chatClientConfig: ChatClientConfig) -> StreamTimer? = {
guard let reconnectionTimeout = $0.reconnectionTimeout else { return nil }
return ScheduledStreamTimer(interval: reconnectionTimeout, fireOnStart: false, repeats: false)
}

var extensionLifecycleBuilder = NotificationExtensionLifecycle.init

var requestEncoderBuilder: (_ baseURL: URL, _ apiKey: APIKey) -> RequestEncoder = DefaultRequestEncoder.init
Expand Down Expand Up @@ -97,8 +102,7 @@ extension ChatClient {
_ extensionLifecycle: NotificationExtensionLifecycle,
_ backgroundTaskScheduler: BackgroundTaskScheduler?,
_ internetConnection: InternetConnection,
_ keepConnectionAliveInBackground: Bool,
_ reconnectionTimeoutHandler: StreamTimer?
_ keepConnectionAliveInBackground: Bool
) -> ConnectionRecoveryHandler = {
DefaultConnectionRecoveryHandler(
webSocketClient: $0,
Expand All @@ -109,8 +113,7 @@ extension ChatClient {
internetConnection: $5,
reconnectionStrategy: DefaultRetryStrategy(),
reconnectionTimerType: DefaultTimer.self,
keepConnectionAliveInBackground: $6,
reconnectionTimeoutHandler: $7
keepConnectionAliveInBackground: $6
)
}

Expand Down
48 changes: 45 additions & 3 deletions Sources/StreamChat/ChatClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public class ChatClient {
/// Used as a bridge to communicate between the host app and the notification extension. Holds the state for the app lifecycle.
let extensionLifecycle: NotificationExtensionLifecycle

/// The component responsible to timeout the user connection if it takes more time than the `ChatClientConfig.reconnectionTimeout`.
var reconnectionTimeoutHandler: StreamTimer?

/// The environment object containing all dependencies of this `Client` instance.
private let environment: Environment

Expand Down Expand Up @@ -219,12 +222,18 @@ public class ChatClient {
setupOfflineRequestQueue()
setupConnectionRecoveryHandler(with: environment)
validateIntegrity()

reconnectionTimeoutHandler = environment.reconnectionHandlerBuilder(config)
reconnectionTimeoutHandler?.onChange = { [weak self] in
self?.timeout()
}
}

deinit {
Self._activeLocalStorageURLs.mutate { $0.subtract(databaseContainer.persistentStoreDescriptions.compactMap(\.url)) }
completeConnectionIdWaiters(connectionId: nil)
completeTokenWaiters(token: nil)
reconnectionTimeoutHandler?.stop()
}

func setupTokenRefresher() {
Expand Down Expand Up @@ -254,8 +263,7 @@ public class ChatClient {
extensionLifecycle,
environment.backgroundTaskSchedulerBuilder(),
environment.internetConnection(eventNotificationCenter, environment.internetMonitor),
config.staysConnectedInBackground,
config.reconnectionTimeout.map { ScheduledStreamTimer(interval: $0, fireOnStart: false, repeats: false) }
config.staysConnectedInBackground
)
}

Expand Down Expand Up @@ -300,7 +308,9 @@ public class ChatClient {
tokenProvider: @escaping TokenProvider,
completion: ((Error?) -> Void)? = nil
) {
reconnectionTimeoutHandler?.start()
connectionRecoveryHandler?.start()
connectionRepository.initialize()

authenticationRepository.connectUser(
userInfo: userInfo,
Expand Down Expand Up @@ -393,7 +403,9 @@ public class ChatClient {
userInfo: UserInfo,
completion: ((Error?) -> Void)? = nil
) {
connectionRepository.initialize()
connectionRecoveryHandler?.start()
reconnectionTimeoutHandler?.start()
authenticationRepository.connectGuestUser(userInfo: userInfo, completion: { completion?($0) })
}

Expand All @@ -417,6 +429,8 @@ public class ChatClient {
/// Connects an anonymous user
/// - Parameter completion: The completion that will be called once the **first** user session for the given token is setup.
public func connectAnonymousUser(completion: ((Error?) -> Void)? = nil) {
connectionRepository.initialize()
reconnectionTimeoutHandler?.start()
connectionRecoveryHandler?.start()
authenticationRepository.connectAnonymousUser(
completion: { completion?($0) }
Expand Down Expand Up @@ -458,7 +472,7 @@ public class ChatClient {
completion()
}
authenticationRepository.clearTokenProvider()
authenticationRepository.cancelTimers()
authenticationRepository.reset()
}

/// Disconnects the chat client from the chat servers. No further updates from the servers
Expand Down Expand Up @@ -617,6 +631,15 @@ public class ChatClient {
completion?($0)
}
}

private func timeout() {
completeConnectionIdWaiters(connectionId: nil)
authenticationRepository.completeTokenCompletions(error: ClientError.ReconnectionTimeout())
completeTokenWaiters(token: nil)
authenticationRepository.reset()
let webSocketConnectionState = webSocketClient?.connectionState ?? .initialized
connectionRepository.disconnect(source: .timeout(from: webSocketConnectionState)) {}
}
}

extension ChatClient: AuthenticationRepositoryDelegate {
Expand Down Expand Up @@ -646,6 +669,17 @@ extension ChatClient: ConnectionStateDelegate {
)
connectionRecoveryHandler?.webSocketClient(client, didUpdateConnectionState: state)
try? backgroundWorker(of: MessageSender.self).didUpdateConnectionState(state)

switch state {
case .connecting:
if reconnectionTimeoutHandler?.isRunning == false {
reconnectionTimeoutHandler?.start()
}
case .connected:
reconnectionTimeoutHandler?.stop()
default:
break
}
}
}

Expand Down Expand Up @@ -692,6 +726,14 @@ extension ClientError {
}
}

public final class ReconnectionTimeout: ClientError {
override public var localizedDescription: String {
"""
The reconnection process has timed out after surpassing the value from `ChatClientConfig.reconnectionTimeout`.
"""
}
}

public final class MissingToken: ClientError {}
final class WaiterTimeout: ClientError {}

Expand Down
29 changes: 18 additions & 11 deletions Sources/StreamChat/Repositories/AuthenticationRepository.swift
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,12 @@ class AuthenticationRepository {
isGettingToken = false
}

func cancelTimers() {
func reset() {
connectionProviderTimer?.cancel()
tokenProviderTimer?.cancel()
tokenQueue.async(flags: .barrier) {
self._tokenExpirationRetryStrategy.resetConsecutiveFailures()
}
}

func logOutUser() {
Expand Down Expand Up @@ -280,6 +283,19 @@ class AuthenticationRepository {
updateToken(token: token, notifyTokenWaiters: true)
}

func completeTokenCompletions(error: Error?) {
let completionBlocks: [(Error?) -> Void]? = tokenQueue.sync(flags: .barrier) {
self._isGettingToken = false
let completions = self._tokenRequestCompletions
return completions
}
completionBlocks?.forEach { $0(error) }
tokenQueue.async(flags: .barrier) {
self._tokenRequestCompletions = []
self._consecutiveRefreshFailures = 0
}
}

private func updateToken(token: Token?, notifyTokenWaiters: Bool) {
let waiters: [String: (Result<Token, Error>) -> Void] = tokenQueue.sync(flags: .barrier) {
_currentToken = token
Expand Down Expand Up @@ -331,21 +347,12 @@ class AuthenticationRepository {
isGettingToken = true

let onCompletion: (Error?) -> Void = { [weak self] error in
guard let self = self else { return }
if let error = error {
log.error("Error when getting token: \(error)", subsystems: .authentication)
} else {
log.debug("Successfully retrieved token", subsystems: .authentication)
}

let completionBlocks: [(Error?) -> Void]? = self.tokenQueue.sync(flags: .barrier) {
self._isGettingToken = false
let completions = self._tokenRequestCompletions
self._tokenRequestCompletions = []
self._consecutiveRefreshFailures = 0
return completions
}
completionBlocks?.forEach { $0(error) }
self?.completeTokenCompletions(error: error)
}

guard consecutiveRefreshFailures < Constants.maximumTokenRefreshAttempts else {
Expand Down
12 changes: 4 additions & 8 deletions Sources/StreamChat/Repositories/ConnectionRepository.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ class ConnectionRepository {
self.timerType = timerType
}

func initialize() {
webSocketClient?.initialize()
}

/// Connects the chat client the controller represents to the chat servers.
///
/// When the connection is established, `ChatClient` starts receiving chat updates, and `currentUser` variable is available.
Expand Down Expand Up @@ -95,14 +99,6 @@ class ConnectionRepository {
return
}

if connectionId == nil {
if source == .userInitiated {
log.warning("The client is already disconnected. Skipping the `disconnect` call.")
}
completion()
return
}

// Disconnect the web socket
webSocketClient?.disconnect(source: source) { [weak self] in
// Reset `connectionId`. This would happen asynchronously by the callback from WebSocketClient anyway, but it's
Expand Down
2 changes: 1 addition & 1 deletion Sources/StreamChat/WebSocketClient/ConnectionStatus.swift
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ enum WebSocketConnectionState: Equatable {
}
}

/// The initial state meaning that there was no atempt to connect yet.
/// The initial state meaning that the web socket engine is not yet connected or connecting.
case initialized

/// The web socket is not connected. Contains the source/reason why the disconnection has happened.
Expand Down
23 changes: 11 additions & 12 deletions Sources/StreamChat/WebSocketClient/WebSocketClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ class WebSocketClient {
self.eventNotificationCenter = eventNotificationCenter
}

func initialize() {
connectionState = .initialized
}

/// Connects the web connect.
///
/// Calling this method has no effect is the web socket is already connected, or is in the connecting phase.
Expand Down Expand Up @@ -137,23 +141,18 @@ class WebSocketClient {
source: WebSocketConnectionState.DisconnectionSource = .userInitiated,
completion: @escaping () -> Void
) {
connectionState = .disconnecting(source: source)
engineQueue.async { [engine, eventsBatcher] in
engine?.disconnect()

eventsBatcher.processImmediately(completion: completion)
switch connectionState {
case .initialized, .disconnected, .disconnecting:
connectionState = .disconnected(source: source)
case .connecting, .waitingForConnectionId, .connected:
connectionState = .disconnecting(source: source)
}
}

func timeout() {
let previousState = connectionState
connectionState = .disconnected(source: .timeout(from: previousState))

engineQueue.async { [engine, eventsBatcher] in
engine?.disconnect()

eventsBatcher.processImmediately {}
eventsBatcher.processImmediately(completion: completion)
}
log.error("Connection timed out. `\(connectionState)", subsystems: .webSocket)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ final class DefaultConnectionRecoveryHandler: ConnectionRecoveryHandler {
private var reconnectionStrategy: RetryStrategy
private var reconnectionTimer: TimerControl?
private let keepConnectionAliveInBackground: Bool
private var reconnectionTimeoutHandler: StreamTimer?

// MARK: - Init

Expand All @@ -49,8 +48,7 @@ final class DefaultConnectionRecoveryHandler: ConnectionRecoveryHandler {
internetConnection: InternetConnection,
reconnectionStrategy: RetryStrategy,
reconnectionTimerType: Timer.Type,
keepConnectionAliveInBackground: Bool,
reconnectionTimeoutHandler: StreamTimer?
keepConnectionAliveInBackground: Bool
) {
self.webSocketClient = webSocketClient
self.eventNotificationCenter = eventNotificationCenter
Expand All @@ -61,7 +59,6 @@ final class DefaultConnectionRecoveryHandler: ConnectionRecoveryHandler {
self.reconnectionStrategy = reconnectionStrategy
self.reconnectionTimerType = reconnectionTimerType
self.keepConnectionAliveInBackground = keepConnectionAliveInBackground
self.reconnectionTimeoutHandler = reconnectionTimeoutHandler
}

func start() {
Expand All @@ -71,7 +68,6 @@ final class DefaultConnectionRecoveryHandler: ConnectionRecoveryHandler {
func stop() {
unsubscribeFromNotifications()
cancelReconnectionTimer()
reconnectionTimeoutHandler?.stop()
}

deinit {
Expand All @@ -94,11 +90,6 @@ private extension DefaultConnectionRecoveryHandler {
name: .internetConnectionAvailabilityDidChange,
object: nil
)

reconnectionTimeoutHandler?.onChange = { [weak self] in
self?.webSocketClient.timeout()
self?.cancelReconnectionTimer()
}
}

func unsubscribeFromNotifications() {
Expand Down Expand Up @@ -177,17 +168,13 @@ extension DefaultConnectionRecoveryHandler {
switch state {
case .connecting:
cancelReconnectionTimer()
if reconnectionTimeoutHandler?.isRunning == false {
reconnectionTimeoutHandler?.start()
}

case .connected:
extensionLifecycle.setAppState(isReceivingEvents: true)
reconnectionStrategy.resetConsecutiveFailures()
syncRepository.syncLocalState {
log.info("Local state sync completed", subsystems: .offlineSupport)
}
reconnectionTimeoutHandler?.stop()

case .disconnected:
extensionLifecycle.setAppState(isReceivingEvents: false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import Foundation
/// Mock implementation of `ChatClientUpdater`
final class ConnectionRepository_Mock: ConnectionRepository, Spy {
enum Signature {
static let initialize = "initialize()"
static let connect = "connect(completion:)"
static let disconnect = "disconnect(source:completion:)"
static let forceConnectionInactiveMode = "forceConnectionStatusForInactiveModeIfNeeded()"
Expand Down Expand Up @@ -58,6 +59,10 @@ final class ConnectionRepository_Mock: ConnectionRepository, Spy {

// MARK: - Overrides

override func initialize() {
record()
}

override func connect(completion: ((Error?) -> Void)? = nil) {
record()
if let result = connectResult {
Expand Down
Loading

0 comments on commit 7941389

Please sign in to comment.