Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a race condition when a system is started on a different queue from its event serialising queue. #38

Merged
merged 3 commits into from
Mar 20, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion ReactiveFeedback/SignalProducer+System.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,23 @@ extension SignalProducer where Error == NoError {

return SignalProducer<Event, NoError>(Signal.merge(events))
.scan(initial, reduce)
.prefix(value: initial)
Copy link
Contributor

@sergdort sergdort Mar 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you saying that prefix operator starts source producer before you even subscribe to prefix?

Copy link
Contributor Author

@andersio andersio Mar 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. LHS is started only after RHS (a producer emitting one value) completes, and that's the core of the issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RHS (the prefixed initial) sends out a value, feedbacks receive such value and produce an event, and the said event may be dequeued by the feedback loop queue before LHS (the reducer) starts on the initialising queue.

.on(value: stateObserver.send(value:))
.prefix(value: initial)
.on(started: {
// NOTE: Due to the nature of `prefix` lazily starting the producer being prefixed, we cannot rely
// on `on(value:)` to ignite the feedbacks with the initial state.
//
// At the time `prefix(value:)` calls `on(value:)` for the initial value, the events-reducer
// producer has not yet been started yet. Consequentially, it would lead to dropped events
// when the system is instantiated on a queue different from the queue used for
// serializing events.
//
// Having said that, `prefix(value:)` is guaranteed to have started the prefixed producer as
// part of the synchronous producer starting process. So we can address the issue by applying
// `on(started:)` after `prefix(value:)` to ignite the system, while having `on(value:)`
// instead applied before `prefix(value:)` to keep the reducer-to-feedbacks path open.
stateObserver.send(value: initial)
})
}
}

Expand Down
35 changes: 35 additions & 0 deletions ReactiveFeedbackTests/SystemTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,39 @@ class SystemTests: XCTestCase {
expect(value) == "initial_a"
expect(startCount) == 2
}

func test_should_not_miss_delivery_to_reducer_when_started_asynchronously() {
let creationScheduler = QueueScheduler()
let systemScheduler = QueueScheduler()

let observedState: Atomic<[String]> = Atomic([])

let semaphore = DispatchSemaphore(value: 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pardon my ignorance, but why do we need the aid of semaphore for this test?

Copy link
Contributor Author

@andersio andersio Mar 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reproducing scenarios that require specific timing order often require manual synchronisation and sometimes repeated runs to lower chance of false positives, because the OS scheduler need not do you a favour when scheduling threads, and we can only create a controlled environment at best effort with these primitives.

Alternatively, this can be done by TestScheduler, but only if advance() supports draining tasks one-by-one.


creationScheduler.schedule {
SignalProducer<String, NoError>
.system(
initial: "initial",
scheduler: systemScheduler,
reduce: { (state: String, event: String) -> String in
return state + event
},
feedbacks: [
Feedback { scheduler, state in
return state
.take(first: 1)
.map(value: "_event")
.observe(on: scheduler)
.on(terminated: { semaphore.signal() })
}
]
)
.startWithValues { state in
observedState.modify { $0.append(state) }
}
}

semaphore.wait()
expect(observedState.value).toEventually(equal(["initial", "initial_event"]))
}
}