Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x Behavioral changes of asynchronous subscriber when adding a trivial map operator #6859

Closed
qwwdfsad opened this issue Jan 22, 2020 · 4 comments

Comments

@qwwdfsad
Copy link

qwwdfsad commented Jan 22, 2020

I am not sure whether it is a bug, intended behaviour or (leaking?) implementation detail, so posting it in the form of code snippet first, sorry for the inconvenience.

Consider the following subscriber:

private class TrivialSubscriber : Subscriber<Int> {
    private val executor = Executors.newSingleThreadExecutor()
    private var subscription: Subscription? = null

    override fun onSubscribe(s: Subscription) {
        subscription = s
        subscription!!.request(1) // Initial request
    }

    override fun onNext(integer: Int) {
        // "async" consumption
        executor.submit {
            println("Received $integer")
            subscription!!.request(1) // Consumed -> request one more
        }
    }

    override fun onError(t: Throwable) {}
    override fun onComplete() {
        executor.shutdown()
    }
}

and the following usage:

Flowable.create<Int>({ emitter ->
        repeat(10) {
            emitter.onNext(it)
            println("Emitted: $it")
            Thread.sleep(2000)
        }
        emitter.onComplete()
    }, BackpressureStrategy.BUFFER)
    .map { value ->
        println("Mapped $value")
        value
    }
    .subscribeOn(Schedulers.computation())
    .blockingSubscribe(TrivialSubscriber())

With map operator, the whole emitter output is firstly buffered, and only then mapped and collected. Thus the output looks like this (0 output is omitted):

Emitted: 1
Emitted: 2
...
Emitted: 9
Mapped 1
Received 1
...
Mapped 9
Received 9

If one removes map (which seems to be a trivial nop-op), the output changes drastically:

Emitted: 1
Received 1
...
Emitted: 9
Received 9

changing both the order of events and memory usage pattern. Is it an intended behaviour? Could you please elaborate on why it is happening or point out the relevant part of the documentation?

Please note that this is a watered-down reproducer of Kotlin/kotlinx.coroutines#1766

@akarnokd
Copy link
Member

Could you please elaborate on why it is happening or point out the relevant part of the documentation?

http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#subscribeOn-io.reactivex.Scheduler-

"If there is a create(FlowableOnSubscribe, BackpressureStrategy) type source up in the chain, it is recommended to use subscribeOn(scheduler, false) instead to avoid same-pool deadlock because requests may pile up behind an eager/blocking emitter."

If one removes map (which seems to be a trivial nop-op), the output changes drastically

We detect FlowableCreate directly upstream and use subscribeOn(scheduler, false) to preemptively workaround certain use cases.

    public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return subscribeOn(scheduler, !(this instanceof FlowableCreate));
    }

Original issue: #4735

@qwwdfsad
Copy link
Author

Thanks, makes sense.
Just curious, could you please clarify, why you prefered this instanceof FlowableCreate check instead of traversing HasUpstreamPublisher and checking it with instanceof?

@akarnokd
Copy link
Member

The chain could be arbitrary long, may not even implement the interface or mess up an intermediate operator's thread confinement.

@qwwdfsad
Copy link
Author

Thanks for the help. Feel free to close the issue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants