From ef047473b2a934fcd8fe23b40e9c2b6e65f0412b Mon Sep 17 00:00:00 2001 From: Anders Ha Date: Thu, 28 Feb 2019 22:20:21 +0000 Subject: [PATCH 1/3] Add a failing test case for the race condition. --- ReactiveFeedbackTests/SystemTests.swift | 35 +++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/ReactiveFeedbackTests/SystemTests.swift b/ReactiveFeedbackTests/SystemTests.swift index 269bf8a..d185608 100644 --- a/ReactiveFeedbackTests/SystemTests.swift +++ b/ReactiveFeedbackTests/SystemTests.swift @@ -181,4 +181,39 @@ class SystemTests: XCTestCase { expect(value) == "initial_a" expect(startCount) == 2 } + + func test_should_not_miss_delivery_to_reducer_when_started_asynchronously() { + let creationScheduler = QueueScheduler() + let systemScheduler = QueueScheduler() + + let observedState: Atomic<[String]> = Atomic([]) + + let semaphore = DispatchSemaphore(value: 0) + + creationScheduler.schedule { + SignalProducer + .system( + initial: "initial", + scheduler: systemScheduler, + reduce: { (state: String, event: String) -> String in + return state + event + }, + feedbacks: [ + Feedback { scheduler, state in + return state + .take(first: 1) + .map(value: "_event") + .observe(on: scheduler) + .on(terminated: { semaphore.signal() }) + } + ] + ) + .startWithValues { state in + observedState.modify { $0.append(state) } + } + } + + semaphore.wait() + expect(observedState.value).toEventually(equal(["initial", "initial_event"])) + } } From 75523c5e15f04ad3dc9026e2c32488e85ef86e49 Mon Sep 17 00:00:00 2001 From: Anders Ha Date: Thu, 28 Feb 2019 22:30:32 +0000 Subject: [PATCH 2/3] Fix the race condition. --- ReactiveFeedback/SignalProducer+System.swift | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/ReactiveFeedback/SignalProducer+System.swift b/ReactiveFeedback/SignalProducer+System.swift index 4a49299..f590027 100644 --- a/ReactiveFeedback/SignalProducer+System.swift +++ b/ReactiveFeedback/SignalProducer+System.swift @@ -29,8 +29,23 @@ extension SignalProducer where Error == NoError { return SignalProducer(Signal.merge(events)) .scan(initial, reduce) - .prefix(value: initial) .on(value: stateObserver.send(value:)) + .prefix(value: initial) + .on(started: { + // NOTE: Due to the nature of `prefix` lazily starting the producer being prefixed, we cannot rely + // on `on(value:)` to ignite the feedbacks with the initial state. + // + // At the time `prefix(value:)` calls `on(value:)` for the initial value, the events-reducer + // producer has not yet been started yet. Consequentially, it would lead to dropped events + // when the system is instantiated on a queue different from the queue used for + // serializing events. + // + // Having said that, `prefix(value:)` is guaranteed to have started the prefixed producer as + // part of the synchronous producer starting process. So we can address the issue by applying + // `on(started:)` after `prefix(value:)` to ignite the system, while having `on(value:)` + // instead applied before `prefix(value:)` to keep the reducer-to-feedbacks path open. + stateObserver.send(value: initial) + }) } } From 6f96dbdcbc93893a4a1e449268fd4f864bd88a95 Mon Sep 17 00:00:00 2001 From: Anders Ha Date: Tue, 19 Mar 2019 11:07:33 +0000 Subject: [PATCH 3/3] Combine the two occasions of `on` into one. --- ReactiveFeedback/SignalProducer+System.swift | 34 +++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/ReactiveFeedback/SignalProducer+System.swift b/ReactiveFeedback/SignalProducer+System.swift index f590027..c171ee5 100644 --- a/ReactiveFeedback/SignalProducer+System.swift +++ b/ReactiveFeedback/SignalProducer+System.swift @@ -29,23 +29,25 @@ extension SignalProducer where Error == NoError { return SignalProducer(Signal.merge(events)) .scan(initial, reduce) - .on(value: stateObserver.send(value:)) + .on( + started: { + // NOTE: Due to the nature of `prefix` lazily starting the producer being prefixed, we cannot rely + // on `on(value:)` to ignite the feedbacks with the initial state. + // + // At the time `prefix(value:)` calls `on(value:)` for the initial value, the events-reducer + // producer has not yet been started yet. Consequentially, it would lead to dropped events + // when the system is instantiated on a queue different from the queue used for + // serializing events. + // + // Having said that, `prefix(value:)` is guaranteed to have started the prefixed producer as + // part of the synchronous producer starting process. So we can address the issue by applying + // `on(started:)` after `prefix(value:)` to ignite the system, while having `on(value:)` + // instead applied before `prefix(value:)` to keep the reducer-to-feedbacks path open. + stateObserver.send(value: initial) + }, + value: stateObserver.send(value:) + ) .prefix(value: initial) - .on(started: { - // NOTE: Due to the nature of `prefix` lazily starting the producer being prefixed, we cannot rely - // on `on(value:)` to ignite the feedbacks with the initial state. - // - // At the time `prefix(value:)` calls `on(value:)` for the initial value, the events-reducer - // producer has not yet been started yet. Consequentially, it would lead to dropped events - // when the system is instantiated on a queue different from the queue used for - // serializing events. - // - // Having said that, `prefix(value:)` is guaranteed to have started the prefixed producer as - // part of the synchronous producer starting process. So we can address the issue by applying - // `on(started:)` after `prefix(value:)` to ignite the system, while having `on(value:)` - // instead applied before `prefix(value:)` to keep the reducer-to-feedbacks path open. - stateObserver.send(value: initial) - }) } }