-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix a race condition when a system is started on a different queue from its event serialising queue. #38
Conversation
6fcc41f
to
b29d979
Compare
b29d979
to
75523c5
Compare
I really would like to understand this fix, since we extensively use ReactiveFeedback in our codebase, but I can't even understand the first paragraph. 😢 Do you think you could create a small diagram (like a timeline), so I can have a mental image of the events? |
(beautiful diagrams, thanks! ❤️ ) (I am assuming S0 and S1, are related to State0 and State1?) I am looking at scenario B, does it mean that the prefix |
@RuiAAPeres The reducer is constructed with the The reducer wouldn’t process any state directly, only events produced by feedbacks in response to the latest state. Then by processing an event, it computes a new state that gets fed back into the feedbacks. |
@@ -29,8 +29,23 @@ extension SignalProducer where Error == NoError { | |||
|
|||
return SignalProducer<Event, NoError>(Signal.merge(events)) | |||
.scan(initial, reduce) | |||
.prefix(value: initial) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you saying that prefix
operator starts source producer before you even subscribe to prefix
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. LHS is started only after RHS (a producer emitting one value) completes, and that's the core of the issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RHS (the prefixed initial) sends out a value, feedbacks receive such value and produce an event, and the said event may be dequeued by the feedback loop queue before LHS (the reducer) starts on the initialising queue.
|
||
let observedState: Atomic<[String]> = Atomic([]) | ||
|
||
let semaphore = DispatchSemaphore(value: 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pardon my ignorance, but why do we need the aid of semaphore for this test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reproducing scenarios that require specific timing order often require manual synchronisation and sometimes repeated runs to lower chance of false positives, because the OS scheduler need not do you a favour when scheduling threads, and we can only create a controlled environment at best effort with these primitives.
Alternatively, this can be done by TestScheduler
, but only if advance()
supports draining tasks one-by-one.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to not forget to fix Property+System.swift
... #38 (comment)
Also, we probably need to
state <~ SignalProducer.system(...).skip(first: 2)
instead sinceinitial
will be emitted twice.
@Inamy That doesn’t sound right to me. It shouldn’t emit the initial value twice afterwards. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@andersio #38 (comment)
My bad. It doesn't seem required, so I will approve now 👍
Resolve #37.
Why?
Due to the nature of
prefix
lazily starting the producer being prefixed, we cannot rely onon(value:)
to ignite the feedbacks with the initial state. At the timeprefix(value:)
callson(value:)
for the initial value, the events-reducer producer has not yet been started yet. Consequentially, it would lead to dropped events when the system is instantiated on a queue different from the queue used for serializing events.Explanation
The current operator application order is
prefix(value:)
followed byon(value:)
.Recall that
prefix(value:)
is essentiallya concat b
. Sinceb
would not start beforea
completes, theevents -> reducer
part is not started untila
has finished sending out the prefix value.When
prefix(value:)
sends out the initial value,on(value:)
sends the initial value to the state signal, which in turn updates all feedback signals. But since the events-reducer producer hasn’t started yet (which is theflatMap(.concat)
semantic), all events generated by feedbacks could potentially be delivered to the void, when the queue instantiating the system runs behind the queue serialising all feedback events.How to fix it?
Having said that,
prefix(value:)
is guaranteed to have started the prefixed producer as part of the synchronous producer starting process. So we can address the issue by applyingon(started:)
afterprefix(value:)
to ignite the system, while havingon(value:)
applied instead beforeprefix(value:)
to still keep the reducer-to-feedbacks path open.