Skip to content

Commit

Permalink
Implement CHA-RL4b7
Browse files Browse the repository at this point in the history
(The first such point in the spec referenced in 25e5052, which
accidentally contains two spec points with this identifier.)

Here we implement the spec’s concept of “transient disconnect timeout”.
I’ll cancel these timeouts (where the spec says to do so) in a separate
commit.
  • Loading branch information
lawrence-forooghian committed Oct 17, 2024
1 parent 49b986c commit 0a22572
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 3 deletions.
59 changes: 56 additions & 3 deletions Sources/AblyChat/RoomLifecycleManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ internal actor RoomLifecycleManager<Contributor: RoomLifecycleContributor> {
await self.init(
status: nil,
pendingDiscontinuityEvents: nil,
idsOfContributorsWithTransientDisconnectTimeout: nil,
contributors: contributors,
logger: logger,
clock: clock
Expand All @@ -77,13 +78,15 @@ internal actor RoomLifecycleManager<Contributor: RoomLifecycleContributor> {
internal init(
testsOnly_status status: Status? = nil,
testsOnly_pendingDiscontinuityEvents pendingDiscontinuityEvents: [Contributor.ID: [ARTErrorInfo]]? = nil,
testsOnly_idsOfContributorsWithTransientDisconnectTimeout idsOfContributorsWithTransientDisconnectTimeout: Set<Contributor.ID>? = nil,
contributors: [Contributor],
logger: InternalLogger,
clock: SimpleClock
) async {
await self.init(
status: status,
pendingDiscontinuityEvents: pendingDiscontinuityEvents,
idsOfContributorsWithTransientDisconnectTimeout: idsOfContributorsWithTransientDisconnectTimeout,
contributors: contributors,
logger: logger,
clock: clock
Expand All @@ -94,13 +97,18 @@ internal actor RoomLifecycleManager<Contributor: RoomLifecycleContributor> {
private init(
status: Status?,
pendingDiscontinuityEvents: [Contributor.ID: [ARTErrorInfo]]?,
idsOfContributorsWithTransientDisconnectTimeout: Set<Contributor.ID>?,
contributors: [Contributor],
logger: InternalLogger,
clock: SimpleClock
) async {
self.status = status ?? .initialized
self.contributors = contributors
contributorAnnotations = .init(contributors: contributors, pendingDiscontinuityEvents: pendingDiscontinuityEvents ?? [:])
contributorAnnotations = .init(
contributors: contributors,
pendingDiscontinuityEvents: pendingDiscontinuityEvents ?? [:],
idsOfContributorsWithTransientDisconnectTimeout: idsOfContributorsWithTransientDisconnectTimeout ?? []
)
self.logger = logger
self.clock = clock

Expand Down Expand Up @@ -198,17 +206,36 @@ internal actor RoomLifecycleManager<Contributor: RoomLifecycleContributor> {

/// Stores manager state relating to a given contributor.
private struct ContributorAnnotation {
class TransientDisconnectTimeout: Identifiable {
/// A unique identifier for this timeout. This allows test cases to assert that one timeout has not been replaced by another.
var id = UUID()
/// The task that sleeps until the timeout period passes and then performs the timeout’s side effects. This will be `nil` if you have created a transient disconnect timeout using the `testsOnly_idsOfContributorsWithTransientDisconnectTimeout` manager initializer parameter.
var task: Task<Void, Error>?
}

// TODO: Not clear whether there can be multiple or just one (asked in https://github.com/ably/specification/pull/200/files#r1781927850)
var pendingDiscontinuityEvents: [ARTErrorInfo] = []
var transientDisconnectTimeout: TransientDisconnectTimeout?

var hasTransientDisconnectTimeout: Bool {
transientDisconnectTimeout != nil
}
}

/// Provides a `Dictionary`-like interface for storing manager state about individual contributors.
private struct ContributorAnnotations {
private var storage: [Contributor.ID: ContributorAnnotation]

init(contributors: [Contributor], pendingDiscontinuityEvents: [Contributor.ID: [ARTErrorInfo]]) {
init(
contributors: [Contributor],
pendingDiscontinuityEvents: [Contributor.ID: [ARTErrorInfo]],
idsOfContributorsWithTransientDisconnectTimeout: Set<Contributor.ID>
) {
storage = contributors.reduce(into: [:]) { result, contributor in
result[contributor.id] = .init(pendingDiscontinuityEvents: pendingDiscontinuityEvents[contributor.id] ?? [])
result[contributor.id] = .init(
pendingDiscontinuityEvents: pendingDiscontinuityEvents[contributor.id] ?? [],
transientDisconnectTimeout: idsOfContributorsWithTransientDisconnectTimeout.contains(contributor.id) ? .init() : nil
)
}
}

Expand Down Expand Up @@ -276,6 +303,7 @@ internal actor RoomLifecycleManager<Contributor: RoomLifecycleContributor> {
/// - the manager has recorded all pending discontinuity events provoked by the state change (you can retrieve these using ``testsOnly_pendingDiscontinuityEventsForContributor(at:)``)
/// - the manager has performed all status changes provoked by the state change
/// - the manager has performed all contributor actions provoked by the state change, namely calls to ``RoomLifecycleContributorChannel.detach()`` or ``RoomLifecycleContributor.emitDiscontinuity(_:)``
/// - the manager has recorded all transient disconnect timeouts provoked by the state change (you can retrieve this information using ``testsOnly_hasTransientDisconnectTimeout(for:) or ``testsOnly_idOfTransientDisconnectTimeout(for:)``)
internal func testsOnly_subscribeToHandledContributorStateChanges() -> Subscription<ARTChannelStateChange> {
let subscription = Subscription<ARTChannelStateChange>(bufferingPolicy: .unbounded)
stateChangeHandledSubscriptions.append(subscription)
Expand All @@ -285,6 +313,14 @@ internal actor RoomLifecycleManager<Contributor: RoomLifecycleContributor> {
internal func testsOnly_pendingDiscontinuityEvents(for contributor: Contributor) -> [ARTErrorInfo] {
contributorAnnotations[contributor].pendingDiscontinuityEvents
}

internal func testsOnly_hasTransientDisconnectTimeout(for contributor: Contributor) -> Bool {
contributorAnnotations[contributor].hasTransientDisconnectTimeout
}

internal func testsOnly_idOfTransientDisconnectTimeout(for contributor: Contributor) -> UUID? {
contributorAnnotations[contributor].transientDisconnectTimeout?.id
}
#endif

/// Implements CHA-RL4b’s contributor state change handling.
Expand Down Expand Up @@ -364,6 +400,23 @@ internal actor RoomLifecycleManager<Contributor: RoomLifecycleContributor> {

changeStatus(to: .suspended(error: reason))
}
case .attaching:
if !hasOperationInProgress, !contributorAnnotations[contributor].hasTransientDisconnectTimeout {
// CHA-RL4b7
let transientDisconnectTimeout = ContributorAnnotation.TransientDisconnectTimeout()
contributorAnnotations[contributor].transientDisconnectTimeout = transientDisconnectTimeout
logger.log(message: "Starting transient disconnect timeout \(transientDisconnectTimeout.id) for \(contributor)", level: .debug)
transientDisconnectTimeout.task = Task {
do {
try await clock.sleep(timeInterval: 5)
} catch {
logger.log(message: "Transient disconnect timeout \(transientDisconnectTimeout.id) for \(contributor) was interrupted, error \(error)", level: .debug)
}
logger.log(message: "Transient disconnect timeout \(transientDisconnectTimeout.id) for \(contributor) completed", level: .debug)
contributorAnnotations[contributor].transientDisconnectTimeout = nil
changeStatus(to: .attachingDueToContributorStateChange(error: stateChange.reason))
}
}
default:
break
}
Expand Down
27 changes: 27 additions & 0 deletions Tests/AblyChatTests/Mocks/MockSimpleClock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,36 @@ import Foundation

/// A mock implementation of ``SimpleClock`` which records its arguments but does not actually sleep.
actor MockSimpleClock: SimpleClock {
private let sleepBehavior: SleepBehavior

enum SleepBehavior {
case success
case fromFunction(@Sendable () async -> Void)
}

init(sleepBehavior: SleepBehavior? = nil) {
self.sleepBehavior = sleepBehavior ?? .success
_sleepCallArgumentsAsyncSequence = AsyncStream<TimeInterval>.makeStream()
}

private(set) var sleepCallArguments: [TimeInterval] = []

/// Emits an element each time ``sleep(timeInterval:)`` is called.
var sleepCallArgumentsAsyncSequence: AsyncStream<TimeInterval> {
_sleepCallArgumentsAsyncSequence.stream
}

private let _sleepCallArgumentsAsyncSequence: (stream: AsyncStream<TimeInterval>, continuation: AsyncStream<TimeInterval>.Continuation)

func sleep(timeInterval: TimeInterval) async throws {
sleepCallArguments.append(timeInterval)
_sleepCallArgumentsAsyncSequence.continuation.yield(timeInterval)

switch sleepBehavior {
case .success:
break
case let .fromFunction(function):
await function()
}
}
}
105 changes: 105 additions & 0 deletions Tests/AblyChatTests/RoomLifecycleManagerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,39 @@ struct RoomLifecycleManagerTests {
}
}

/// A mock implementation of a `SimpleClock`’s `sleep(timeInterval:)` operation. Its ``complete(result:)`` method allows you to signal to the mock that the sleep should complete.
final class SignallableSleepOperation: Sendable {
private let continuation: AsyncStream<Void>.Continuation

/// When this behavior is set as a ``MockSimpleClock``’s `sleepBehavior`, calling ``complete(result:)`` will cause the corresponding `sleep(timeInterval:)` to complete with the result passed to that method.
let behavior: MockSimpleClock.SleepBehavior

init() {
let (stream, continuation) = AsyncStream.makeStream(of: Void.self)
self.continuation = continuation

behavior = .fromFunction {
await (stream.first { _ in true })!
}
}

/// Causes the async function embedded in ``behavior`` to return.
func complete() {
continuation.yield(())
}
}

private func createManager(
forTestingWhatHappensWhenCurrentlyIn status: RoomLifecycleManager<MockRoomLifecycleContributor>.Status? = nil,
forTestingWhatHappensWhenHasPendingDiscontinuityEvents pendingDiscontinuityEvents: [MockRoomLifecycleContributor.ID: [ARTErrorInfo]]? = nil,
forTestingWhatHappensWhenHasTransientDisconnectTimeoutForTheseContributorIDs idsOfContributorsWithTransientDisconnectTimeout: Set<MockRoomLifecycleContributor.ID>? = nil,
contributors: [MockRoomLifecycleContributor] = [],
clock: SimpleClock = MockSimpleClock()
) async -> RoomLifecycleManager<MockRoomLifecycleContributor> {
await .init(
testsOnly_status: status,
testsOnly_pendingDiscontinuityEvents: pendingDiscontinuityEvents,
testsOnly_idsOfContributorsWithTransientDisconnectTimeout: idsOfContributorsWithTransientDisconnectTimeout,
contributors: contributors,
logger: TestLogger(),
clock: clock
Expand Down Expand Up @@ -976,6 +1000,87 @@ struct RoomLifecycleManagerTests {
}
}

// @spec CHA-RL4b6
func contributorAttachingEvent_withNoOperationInProgress_withTransientDisconnectTimeout() async throws {
// Given: A RoomLifecycleManager, with no operation in progress, with a transient disconnect timeout for the contributor mentioned in "When:"
let contributor = createContributor()
let manager = await createManager(
forTestingWhatHappensWhenCurrentlyIn: .initialized, // arbitrary no-operation-in-progress
forTestingWhatHappensWhenHasTransientDisconnectTimeoutForTheseContributorIDs: [contributor.id],
contributors: [contributor]
)

let idOfExistingTransientDisconnectTimeout = try #require(await manager.testsOnly_idOfTransientDisconnectTimeout(for: contributor))

// When: A contributor emits an ATTACHING event
let contributorStateChange = ARTChannelStateChange(
current: .attaching,
previous: .detached, // arbitrary
event: .attaching,
reason: nil // arbitrary
)

await waitForManager(manager, toHandleContributorStateChange: contributorStateChange) {
await contributor.channel.emitStateChange(contributorStateChange)
}

// Then: It does not set a new transient disconnect timeout (this is my interpretation of CHA-RL4b6’s “no action is needed”, i.e. that the spec point intends to just be the contrapositive of CHA-RL4b7)
#expect(await manager.testsOnly_idOfTransientDisconnectTimeout(for: contributor) == idOfExistingTransientDisconnectTimeout)
}

// @specPartial CHA-RL4b7 - This is marked as specPartial because at time of writing the spec point CHA-RL4b7 has been accidentally duplicated to specify two separate behaviours. This test is for the first of those two. TODO: change this one to @spec once spec fixed (see discussion in https://github.com/ably/specification/pull/200#discussion_r1763770348)
@Test(
arguments: [
nil,
ARTErrorInfo.create(withCode: 123, message: ""), // arbitrary non-nil
]
)
func contributorAttachingEvent_withNoOperationInProgress_withNoTransientDisconnectTimeout(contributorStateChangeReason: ARTErrorInfo?) async throws {
// Given: A RoomLifecycleManager, with no operation in progress, with no transient disconnect timeout for the contributor mentioned in "When:"
let contributor = createContributor()
let sleepOperation = SignallableSleepOperation()
let clock = MockSimpleClock(sleepBehavior: sleepOperation.behavior)
let manager = await createManager(
forTestingWhatHappensWhenCurrentlyIn: .initialized, // arbitrary no-operation-in-progress
contributors: [contributor],
clock: clock
)

// When: (1) A contributor emits an ATTACHING event
let contributorStateChange = ARTChannelStateChange(
current: .attaching,
previous: .detached, // arbitrary
event: .attaching,
reason: contributorStateChangeReason
)

async let maybeClockSleepArgument = clock.sleepCallArgumentsAsyncSequence.first { _ in true }

await waitForManager(manager, toHandleContributorStateChange: contributorStateChange) {
await contributor.channel.emitStateChange(contributorStateChange)
}

// Then: The manager records a 5 second transient disconnect timeout for this contributor
#expect(try #require(await maybeClockSleepArgument) == 5)
#expect(await manager.testsOnly_hasTransientDisconnectTimeout(for: contributor))

// and When: This transient disconnect timeout completes

let roomStatusSubscription = await manager.onChange(bufferingPolicy: .unbounded)
async let maybeRoomAttachingStatusChange = roomStatusSubscription.attachingElements().first { _ in true }

sleepOperation.complete()

// Then:
// 1. The room status transitions to ATTACHING, using the `reason` from the contributor ATTACHING change in (1)
// 2. The manager no longer has a transient disconnect timeout for this contributor

let roomAttachingStatusChange = try #require(await maybeRoomAttachingStatusChange)
#expect(roomAttachingStatusChange.error == contributorStateChangeReason)

#expect(await !manager.testsOnly_hasTransientDisconnectTimeout(for: contributor))
}

// @specOneOf(1/2) CHA-RL4b8
@Test
func contributorAttachedEvent_withNoOperationInProgress_roomNotAttached_allContributorsAttached() async throws {
Expand Down

0 comments on commit 0a22572

Please sign in to comment.