2.x: Surprising scheduler behaviour #6696

Closed
dariuszseweryn opened this issue Oct 31, 2019 · 12 comments

Comments

@dariuszseweryn

Brief description

Given a shared upstream and two flows that each apply .observeOn() before .filter(), the original emission order may change.

RxJava Version

2.2.13

Code sample

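import android.util.Log
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.PublishSubject

private val subject = PublishSubject.create<Int>().toSerialized()
private val singleScheduler = Schedulers.single()

// Each call builds an independent observeOn + filter chain on top of the shared subject
fun filterSubjectForIdOnScheduler(id: Int): Observable<Int> {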
    return subject
        .observeOn(singleScheduler)
        .filter {
            Log.e("filter", "id = $id value = $it")
            it == id
        }
}

fun main() {
    Observable.merge(
        filterSubjectForIdOnScheduler(1),
        filterSubjectForIdOnScheduler(2)
    )
        .subscribe { Log.e("subscribe", "value = $it") }

    subject.onNext(2)
    subject.onNext(1)
}

Actual (surprising) result

14491-14855 E/filter: id = 1 value = 2
14491-14855 E/filter: id = 1 value = 1
14491-14855 E/subscribe: value = 1
14491-14855 E/filter: id = 2 value = 2
14491-14855 E/subscribe: value = 2
14491-14855 E/filter: id = 2 value = 1

Expected result

14491-14855 E/filter: id = 1 value = 2
14491-14855 E/filter: id = 2 value = 2
14491-14855 E/subscribe: value = 2
14491-14855 E/filter: id = 1 value = 1
14491-14855 E/subscribe: value = 1
14491-14855 E/filter: id = 2 value = 1

Notes

  1. The code originally used Schedulers.from(Executors.newSingleThreadExecutor()). I suspected that executors might be optimised to batch individual runnables and run through them before moving on to the queue of observers, so I switched to Schedulers.single(), whose Javadoc states (emphasis mine):
 * Returns a default, shared, single-thread-backed {@link Scheduler} instance for work
 * requiring >>>strongly-sequential<<< execution on the same background thread.
  2. Instead of using .observeOn(), emitting via scheduleDirect yields the expected result:
singleScheduler.scheduleDirect { subject.onNext(2) }
singleScheduler.scheduleDirect { subject.onNext(1) }
// or
singleScheduler.scheduleDirect {
    subject.onNext(2)
    subject.onNext(1)
}
  3. It seems that the scheduler has two queues: one for emissions and one for observers. The emissions queue seems to be drained before the queue of observers, whereas sequential processing would seemingly require the opposite priority (notify all observers about the first emission before proceeding to the next one).

Question

Is the actual result an expected one?
If so, could you explain why? Is it possible to achieve the expected result without reordering the operators?

@akarnokd
Member

observeOn keeps the single thread occupied as long as it sees work; in this case, the values 2 and 1 arrive in quick succession. Try delay(0), which schedules events individually.
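The eager draining can be reproduced without RxJava. The sketch below is a simplified, single-threaded model, not RxJava's actual observeOn implementation: FifoScheduler, EagerDrainConsumer and simulateObserveOn are hypothetical names, and it assumes both values reach the two observeOn queues before the scheduler thread starts its first task (the "quick succession" case).

```kotlin
import java.util.ArrayDeque

// Hypothetical stand-in for a single-threaded scheduler: tasks run strictly in FIFO order,
// but only when runAll() is called, so both emissions are enqueued before any task runs.
class FifoScheduler {
    private val tasks = ArrayDeque<() -> Unit>()

    fun schedule(task: () -> Unit) {
        tasks.addLast(task)
    }

    fun runAll() {
        while (tasks.isNotEmpty()) tasks.removeFirst().invoke()
    }
}

// Simplified model of observeOn: each subscriber has its own queue, and a drain task
// is scheduled only when that queue goes from idle to busy.
class EagerDrainConsumer(
    private val scheduler: FifoScheduler,
    private val onItem: (Int) -> Unit
) {
    private val queue = ArrayDeque<Int>()
    private var drainScheduled = false

    fun onNext(value: Int) {
        queue.addLast(value)
        if (!drainScheduled) {
            drainScheduled = true
            scheduler.schedule { drain() }
        }
    }

    // One drain run delivers everything queued so far before giving the thread back.
    private fun drain() {
        while (queue.isNotEmpty()) onItem(queue.removeFirst())
        drainScheduled = false
    }
}

// Replays the issue's scenario: two chains (ids 1 and 2); the subject emits 2, then 1.
fun simulateObserveOn(): List<String> {
    val log = mutableListOf<String>()
    val scheduler = FifoScheduler()
    val consumers = listOf(1, 2).map { id ->
        EagerDrainConsumer(scheduler) { value ->
            log += "filter: id = $id value = $value"
            if (value == id) log += "subscribe: value = $value"
        }
    }
    // The subject delivers each value to its subscribers in subscription order
    for (value in listOf(2, 1)) consumers.forEach { it.onNext(value) }
    scheduler.runAll()
    return log
}
```

Calling simulateObserveOn() returns the filter/subscribe lines in the same order as the "Actual" log: the first chain's single drain run consumes both 2 and 1 before the second chain's drain task gets the thread.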

@dariuszseweryn
Author

Try delay(0) which schedules events individually.

Do you mean .delay(0, TimeUnit.SECONDS)? I tried adding it both before and after .observeOn(), but the behaviour stays the same.

@akarnokd
Member

Replace observeOn with delay(0, TimeUnit.SECONDS).

@dariuszseweryn
Author

Using .delay(0, TimeUnit.SECONDS, singleScheduler) instead of .observeOn(singleScheduler) changed the behaviour to one that matches the expected result.

This looks awkward to me: as a user, I would expect to observe on a specified scheduler in sequential order. The current behaviour seems to violate the principle of least surprise. What do you think? Or does the current implementation have a performance advantage that I am unaware of?
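Why replacing observeOn with delay(0, TimeUnit.SECONDS, singleScheduler) restores the expected order can be shown with the same kind of hypothetical model: instead of one drain task per burst, every item gets its own task, so the FIFO scheduler interleaves the two chains item by item. FifoScheduler, PerItemConsumer and simulatePerItemScheduling are illustrative names; the sketch assumes the real single scheduler runs zero-delay tasks in submission order.

```kotlin
import java.util.ArrayDeque

// Hypothetical FIFO task queue standing in for Schedulers.single(); tasks run when runAll() is called.
class FifoScheduler {
    private val tasks = ArrayDeque<() -> Unit>()

    fun schedule(task: () -> Unit) {
        tasks.addLast(task)
    }

    fun runAll() {
        while (tasks.isNotEmpty()) tasks.removeFirst().invoke()
    }
}

// Simplified model of delay(0, ..., scheduler): every item is scheduled as its own task.
class PerItemConsumer(
    private val scheduler: FifoScheduler,
    private val onItem: (Int) -> Unit
) {
    fun onNext(value: Int) = scheduler.schedule { onItem(value) }
}

fun simulatePerItemScheduling(): List<String> {
    val log = mutableListOf<String>()
    val scheduler = FifoScheduler()
    val consumers = listOf(1, 2).map { id ->
        PerItemConsumer(scheduler) { value ->
            log += "filter: id = $id value = $value"
            if (value == id) log += "subscribe: value = $value"
        }
    }
    // Tasks are queued as (id 1, 2), (id 2, 2), (id 1, 1), (id 2, 1) and run in that order
    for (value in listOf(2, 1)) consumers.forEach { it.onNext(value) }
    scheduler.runAll()
    return log
}
```

The resulting order matches the "Expected result" log. The price is one scheduled task per item per chain, which is the overhead observeOn's batching avoids.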

@akarnokd
Member

observeOn is designed to do as much work in a drain run as possible.

Reactive concurrency is complicated and scheduling is an orthogonal concept (when vs. where), hence a perfectly sequential scheduler used with an operator can result in non-intuitive event signaling pattern downstream. Also your type of lockstepping and coordination is rare.
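A "drain run" here refers to what is commonly called the queue-drain (work-in-progress counter) pattern. The following is a minimal sketch of it, assuming a plain java.util.concurrent.Executor in place of a Scheduler.Worker; DrainingObserver and drainDemo are hypothetical names, and the sketch omits the error, completion and disposal handling a real operator needs.

```kotlin
import java.util.Collections
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Producers may call onNext from any thread; at most one drain run is active at a time.
class DrainingObserver<T : Any>(
    private val executor: Executor,
    private val downstream: (T) -> Unit
) {
    private val queue = ConcurrentLinkedQueue<T>()
    private val wip = AtomicInteger()

    fun onNext(value: T) {
        queue.offer(value)
        // Only the call that moves wip from 0 to 1 schedules a drain run
        if (wip.getAndIncrement() == 0) executor.execute { drain() }
    }

    private fun drain() {
        var missed = 1
        while (true) {
            // Emit everything currently queued without giving the thread back
            while (true) {
                val v = queue.poll() ?: break
                downstream(v)
            }
            // Account for the signals handled so far; loop again if more arrived meanwhile
            missed = wip.addAndGet(-missed)
            if (missed == 0) break
        }
    }
}

// Pushes 1..100 through the observer on a single-threaded executor and collects the output.
fun drainDemo(): List<Int> {
    val executor = Executors.newSingleThreadExecutor()
    val received = Collections.synchronizedList(mutableListOf<Int>())
    val observer = DrainingObserver<Int>(executor) { received.add(it) }
    (1..100).forEach { observer.onNext(it) }
    executor.shutdown()
    executor.awaitTermination(5, TimeUnit.SECONDS)
    return received.toList()
}
```

Because a drain run keeps looping while the counter shows new work, a burst of items is delivered to one subscriber in a single task, which is exactly what holds back the second chain in this issue.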

@dariuszseweryn
Author

Is this well described somewhere so I could study the topic a bit more?

Reactive concurrency is complicated and scheduling is an orthogonal concept (when vs. where), hence a perfectly sequential scheduler used with an operator can result in non-intuitive event signaling pattern downstream.

Should this be mentioned somewhere? It looks like an important gotcha.

Also your type of lockstepping and coordination is rare.

I use this approach to relieve the calling thread as quickly as possible, since it is bound to native code, and holding it up can break the underlying system, which I do not control (modelled here as the subject).

@akarnokd
Member

See my blog and the operator writing guide.

Should this be mentioned somewhere?

The observeOn Javadoc can be updated to mention its eagerness.

dariuszseweryn added a commit to dariuszseweryn/RxAndroidBle that referenced this issue Oct 31, 2019
With several observers on a specific single-threaded scheduler and several subsequent emissions from another thread, the scheduler distributes events sequentially between observers. The non-intuitive part is that it first distributes all events to the first observer, then all events to the second, and so on. This may result in observers being notified about events in the wrong order. See ReactiveX/RxJava#6696
@dariuszseweryn
Author

dariuszseweryn commented Nov 4, 2019

A follow-up question: do you know why schedulers are designed to work this way by default, instead of being essentially equivalent to .delay(0, timeUnit, scheduler)?

@akarnokd
Member

akarnokd commented Nov 4, 2019

Scheduler is an abstraction over crossing an asynchronous boundary by providing methods to run work somewhere. It doesn't know what or how much work it means. observeOn is designed to be low overhead (both time and allocation) when moving data between threads. delay has to schedule each item individually which adds a lot of allocation.
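The overhead difference can be illustrated by counting scheduled tasks for a burst of n items. In this hypothetical model (CountingScheduler, tasksForEagerDrain and tasksForPerItem are illustrative, not RxJava API), an eager drain needs a single task for the whole burst, while per-item scheduling, as delay does, needs one task per item; on a real scheduler each task is a separate object to allocate and hand over.

```kotlin
import java.util.ArrayDeque

// Hypothetical task queue standing in for a single-threaded Scheduler; it counts what it receives.
class CountingScheduler {
    private val tasks = ArrayDeque<() -> Unit>()
    var scheduled = 0
        private set

    fun schedule(task: () -> Unit) {
        scheduled++
        tasks.addLast(task)
    }

    fun runAll() {
        while (tasks.isNotEmpty()) tasks.removeFirst().invoke()
    }
}

// observeOn-style: enqueue items and schedule a drain only if none is pending.
fun tasksForEagerDrain(n: Int): Int {
    val scheduler = CountingScheduler()
    val queue = ArrayDeque<Int>()
    var drainScheduled = false
    var delivered = 0
    repeat(n) { item ->
        queue.addLast(item)
        if (!drainScheduled) {
            drainScheduled = true
            scheduler.schedule {
                while (queue.isNotEmpty()) {
                    queue.removeFirst()
                    delivered++
                }
                drainScheduled = false
            }
        }
    }
    scheduler.runAll()
    check(delivered == n)
    return scheduler.scheduled
}

// delay-style: every item becomes its own scheduled task.
fun tasksForPerItem(n: Int): Int {
    val scheduler = CountingScheduler()
    var delivered = 0
    repeat(n) { scheduler.schedule { delivered++ } }
    scheduler.runAll()
    check(delivered == n)
    return scheduler.scheduled
}
```

For a burst of 1000 items the first function reports 1 scheduled task and the second reports 1000, assuming the whole burst is queued before the scheduler runs.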

@dariuszseweryn
Author

I think this topic is cleared up and the appropriate Javadoc change has landed, so I am closing the issue. Thank you very much for the help.

I do have some other questions about the interface between the reactive and non-reactive worlds.

  1. The difference in how Observables are subscribed and disposed with and without .subscribeOn(): with it, the Observable may or may not be subscribed, depending on a race condition; without it, it is always subscribed. Curiously, TestScheduler seems to behave as if no .subscribeOn() were present, unlike the other schedulers.
  2. UndeliverableException: this seems impossible to mitigate in a multi-threaded environment, as there may always be a race condition in which the Observer has already been unsubscribed.

I would like to discuss these matters or find out how to deal with them. Should I create separate issues, or...?

@akarnokd
Copy link
Member

akarnokd commented Nov 19, 2019

  1. subscribeOn always signals onSubscribe, but the subscription towards the upstream should properly depend on when TestScheduler::triggerActions() is invoked.
  2. I don't think there is anything to discuss about this. Errors must not get lost upon disposing and Rx can't decide which to ignore. https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling

@dariuszseweryn
Author

  1. Apparently there are some optimisations that make observable.subscribeOn().subscribe().dispose() either subscribe or not subscribe to the observable, depending on a race condition. I see some weird behaviour in my tests, but I assume I got something wrong. I will dig deeper and keep you posted if I find anything odd.
  2. I know; I was hoping it could somehow be handled by the code that uses RxJava 2. The current design does not allow RxJava 2 to be used as an implementation detail: it cannot really be encapsulated because of the static handlers.
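The race around subscribeOn().subscribe().dispose() can be pictured with a hypothetical model: subscribeOn hands the actual upstream subscription to a worker as a task, and disposing cancels that task if it has not started yet. The sketch below uses a plain single-threaded ExecutorService rather than RxJava, and upstreamSubscribedAfterImmediateDispose is an illustrative name. It makes the outcome deterministic by keeping the worker busy, so the dispose always wins; with an idle worker, the task might run before the cancel and the upstream would be subscribed.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical model of subscribeOn: the upstream subscription is a task on a single-threaded
// executor, and dispose() cancels that task if the worker has not picked it up yet.
fun upstreamSubscribedAfterImmediateDispose(): Boolean {
    val executor = Executors.newSingleThreadExecutor()
    val release = CountDownLatch(1)
    val subscribedUpstream = AtomicBoolean(false)
    try {
        // Occupy the worker thread so the subscription task is still queued when we dispose
        executor.execute { release.await() }
        // subscribeOn(): schedule the actual upstream subscription
        val pending = executor.submit(Runnable { subscribedUpstream.set(true) })
        // dispose() before the worker got to it: the queued task is cancelled and never runs
        pending.cancel(false)
        release.countDown()
    } finally {
        executor.shutdown()
        executor.awaitTermination(1, TimeUnit.SECONDS)
    }
    return subscribedUpstream.get()
}
```

In this model the function returns false; without subscribeOn, the subscription would happen synchronously inside subscribe() and could not be skipped by a later dispose().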

dariuszseweryn added a commit to dariuszseweryn/RxAndroidBle that referenced this issue Nov 22, 2019