observeOn doesn't behave as expected (Clojure) #625

Closed
ghost opened this issue Dec 15, 2013 · 9 comments

Comments

@ghost

ghost commented Dec 15, 2013

(ns rx.lang.clojure.examples.rx-examples
  (:require [rx.lang.clojure.interop :as rx])
  (:import rx.Observable
           rx.subscriptions.Subscriptions
           rx.schedulers.Schedulers
           java.util.concurrent.TimeUnit
))

(println (Thread/currentThread)) ; (*)
(-> (Observable/interval 100 TimeUnit/MILLISECONDS #_(Schedulers/currentThread)) 
    (.take 5)
    (.observeOn (Schedulers/currentThread))
    (.subscribe (rx/action [v]
                             ; this file is created, but the content do not match (*)
                            (spit "/tmp/tid.txt" (Thread/currentThread)) 
                           (println v))) ; this output never reaches the repl
    )

So, Observable/interval spawns a new thread and the subscribed callback is invoked in that thread by default. I need to use observeOn to ensure the cb
is invoked in the current thread so that the println output makes it to the repl.

/tmp/tid.txt does get created, so the cb is called but the println output
never makes it to the repl.
If I schedule Observable/interval on the current thread (commented out), the
output is visible (though the repl hangs afterwards for some reason), but
observeOn itself doesn't seem to do the job it's meant to.
The contents of /tmp/tid.txt don't match the current thread reported
by the println either.

Is this a bug or am I doing something wrong?

@benjchristensen
Member

(though the repl hangs afterwards for some reason)

I believe this is related to a bug where work on CurrentThreadScheduler cannot be unsubscribed from: it is all single-threaded and thus not async, so it never returns the Subscription and take(5) is meaningless to it.

We should be able to unsubscribe from a CurrentThread scheduled Observable though, and @headinthebox and I were looking at this last week to determine how to do so. We looked at the Rx.Net implementation and making it work is quite inelegant (thread-locals and other such fun). It's on my plate to tackle. In the meantime, for anything that is infinite, such as interval, you'll want to use something with real concurrency such as NewThread or Schedulers.threadPoolForComputation().
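To make the "never returns the Subscription" point concrete, here is a rough sketch in plain Java, with no RxJava involved. Everything in it (`SyncSubscribeSketch`, `subscribeSync`, the `AtomicBoolean` used as a cancel handle, and the bounded count standing in for an infinite interval) is a made-up illustration, not how RxJava is implemented.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntConsumer;

// Hypothetical illustration only; not RxJava code.
final class SyncSubscribeSketch {

    // Emits `count` values synchronously on the caller's thread and hands back
    // its cancel flag only after emission has finished.
    static AtomicBoolean subscribeSync(int count, IntConsumer onNext) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        for (int i = 0; i < count && !cancelled.get(); i++) {
            onNext.accept(i);
        }
        return cancelled;
    }

    // The consumer wants only `wanted` values, but it cannot cancel during
    // emission because it does not hold the handle yet.
    static int deliveredWhenWanting(int wanted, int available) {
        AtomicReference<AtomicBoolean> handle = new AtomicReference<>();
        int[] delivered = {0};
        AtomicBoolean returned = subscribeSync(available, v -> {
            delivered[0]++;
            if (delivered[0] >= wanted && handle.get() != null) {
                handle.get().set(true); // not reached: handle is still null here
            }
        });
        handle.set(returned); // too late, emission is already over
        return delivered[0];
    }

    public static void main(String[] args) {
        // The consumer asked for 5 but receives all 20.
        System.out.println(deliveredWhenWanting(5, 20)); // prints 20
    }
}
```

With a source that never ends, the call in `main` would simply never return, which is the hang described above.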

The contents of /tmp/tid.txt don't match the current thread reported

The file contains this: Thread[RxComputationThreadPool-6,5,main] and that is correct since interval uses the thread-pool by default.

The (Schedulers/currentThread) Scheduler tells it to run on the "current" thread, whatever that is, using a trampoline. It is the same as Immediate, which also uses the "current" thread, except that it does trampolining.
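The difference between "immediate" and "trampolining" only shows up when a task schedules another task. A minimal sketch of the idea in plain Java follows; `TrampolineSketch` and its methods are hypothetical names for illustration, not RxJava's ImmediateScheduler or CurrentThreadScheduler.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; single-threaded, not RxJava code.
final class TrampolineSketch {
    private final ArrayDeque<Runnable> queue = new ArrayDeque<>();
    private boolean draining = false;

    // Immediate: run the task right now, even when called from inside another task.
    static void immediate(Runnable task) {
        task.run();
    }

    // Trampoline: enqueue the task. Only the outermost call drains the queue,
    // so a nested task runs after the task that scheduled it has returned.
    void trampoline(Runnable task) {
        queue.add(task);
        if (draining) {
            return; // an outer call is already draining the queue
        }
        draining = true;
        try {
            Runnable next;
            while ((next = queue.poll()) != null) {
                next.run();
            }
        } finally {
            draining = false;
        }
    }

    static List<String> immediateOrder() {
        List<String> log = new ArrayList<>();
        immediate(() -> {
            log.add("outer start");
            immediate(() -> log.add("inner"));
            log.add("outer end");
        });
        return log;
    }

    static List<String> trampolineOrder() {
        List<String> log = new ArrayList<>();
        TrampolineSketch t = new TrampolineSketch();
        t.trampoline(() -> {
            log.add("outer start");
            t.trampoline(() -> log.add("inner"));
            log.add("outer end");
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(immediateOrder());  // [outer start, inner, outer end]
        System.out.println(trampolineOrder()); // [outer start, outer end, inner]
    }
}
```

Both variants stay on the calling thread; only the ordering of nested work differs.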

If you change to use (Schedulers/immediate) for the interval you'll get what you're expecting (and it will block on the current thread, sleeping for each interval since there is no concurrency):

(-> (Observable/interval 100 TimeUnit/MILLISECONDS (Schedulers/immediate)) 
    (.take 5)
    (.observeOn (Schedulers/currentThread))
    (.subscribe (rx/action [v]
                             ; this file is created, but the content do not match (*)
                            (spit "/tmp/tid.txt" (Thread/currentThread)) 
                           (println v))) ; this output never reaches the repl
    )

; outputs to file: Thread[nREPL-worker-2,5,main]

It now stays on the main thread and observeOn in effect does nothing since it is told to use Schedulers/currentThread so it stays on the main thread.

Generally observeOn is meant for moving the work from one thread to another, such as from a background worker thread to a UI thread for rendering. Using observeOn with ImmediateScheduler or CurrentThreadScheduler doesn't really make sense as it ends up being a pass-thru.
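As a loose analogy in plain Java (this is not how observeOn is implemented, and `ObserveOnSketch` is a made-up name), think of observeOn as handing each value to an `Executor`. An executor that runs tasks on the caller's thread leaves the callback where the value was produced; only an executor backed by a different thread moves it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration only; not RxJava code.
final class ObserveOnSketch {

    // Produces one value on a thread named "producer", hands delivery to
    // `target`, and returns the name of the thread that ran the callback.
    static String callbackThread(Executor target) {
        CompletableFuture<String> seen = new CompletableFuture<>();
        Thread producer = new Thread(
                () -> target.execute(() -> seen.complete(Thread.currentThread().getName())),
                "producer");
        producer.start();
        return seen.join(); // wait for the callback to report its thread
    }

    // Same-thread executor: the analogue of immediate/currentThread.
    static String passThrough() {
        Executor direct = Runnable::run;
        return callbackThread(direct);
    }

    // Executor with its own thread: the analogue of a scheduler that really moves work.
    static String moved() {
        ExecutorService consumer =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer"));
        try {
            return callbackThread(consumer);
        } finally {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(passThrough()); // producer
        System.out.println(moved());       // consumer
    }
}
```

Note that in neither case does the callback land on the thread that called `main`, which matches what was observed with /tmp/tid.txt.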

@ghost
Author

ghost commented Jan 2, 2014

Thanks for the detailed answer.

The fact that Interval runs in the current thread by default is a little surprising, but I can live
with that (and I'm guessing it's dictated by compat with .Net if nothing else).

The example you provided does get the output to the repl (it hangs, but that's expected now)
But I don't need the interval to run in the current thread, just the callback and I still haven't
found the secret handshake for that.

As I understand it observeOn determines where the cb is run. Since I wasn't getting
any output, I guessed that the cb ended up running on the wrong thread and made it explicit
where it should run. IIRC the default is that cb's are run in the same thread as the generated event?

Given your feedback, I expected this to work:

(-> (Observable/interval 100 TimeUnit/MILLISECONDS (Schedulers/newThread))
    (.take 5)
    (.observeOn (Schedulers/immediate)) ; at least one of these should work, depending on 
    #_(.observeOn (Schedulers/currentThread)) ; the specific nature of
    #_(no-op) ; my misunderstanding
    (.subscribe (rx/action [v]
                             ; this file is created, but the content do not match (*)
                            (spit "/tmp/tid.txt" (Thread/currentThread)) 
                           (println v))) ; this output never reaches the repl
    )

Which I read as: the interval runs in a new thread (Forever, for now), while the observeOn call
ensures the cb is invoked in the current thread. That should reward me with output to the repl.

...It doesn't block, but no output is generated.

@benjchristensen
Member

The fact that Interval runs in the current thread by default is a little surprising

The interval operator by default runs on a separate thread. You would have to pass Schedulers/immediate or Schedulers/currentThread to make it run on the current thread.

But I don't need the interval to run in the current thread, just the callback and I still haven't
found the secret handshake for that.

ObserveOn has no way to automatically make something jump back to the "main" thread the REPL is on. That would require the main thread offering an event-loop or something to hook into and a Scheduler implementation that knows how to schedule work onto it similar to the Android and Swing modules which know how to schedule work into their UI event-loops.
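To illustrate what "offering an event-loop" would mean, here is a minimal sketch in plain Java. `MainLoopSketch`, `post`, `stop` and `run` are hypothetical names; nothing like this exists in the REPL or in RxJava as far as this thread establishes. The owning thread has to sit in `run()` and drain a queue, and only then can other threads get callbacks executed on it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only; not RxJava or nREPL code.
final class MainLoopSketch {
    private static final Runnable STOP = () -> { };
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    // Callable from any thread: queue work for the loop's owner.
    void post(Runnable task) {
        tasks.add(task);
    }

    void stop() {
        tasks.add(STOP);
    }

    // Must be called by the thread that should run the callbacks.
    // Blocks, running posted tasks in order, until stop() is seen.
    void run() {
        try {
            for (Runnable t = tasks.take(); t != STOP; t = tasks.take()) {
                t.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // give up draining if interrupted
        }
    }

    // A background thread posts three callbacks; returns the names of the
    // threads they actually ran on.
    static List<String> demo() {
        MainLoopSketch loop = new MainLoopSketch();
        List<String> ranOn = new ArrayList<>();
        Thread background = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                loop.post(() -> ranOn.add(Thread.currentThread().getName()));
            }
            loop.stop();
        }, "background");
        background.start();
        loop.run(); // the calling thread acts as the event loop
        return ranOn;
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("callbacks ran on: " + demo());
    }
}
```

The cost is visible in `demo`: the owning thread is tied up in `run()` for as long as callbacks may arrive, which is what a UI toolkit's event loop does and what a thread evaluating a REPL form does not.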

while the observeOn call ensures the cb is invoked in the current thread.

The observeOn operator schedules work on whatever Scheduler it is given. Think of it as moving the work from one thread to another (or one Scheduler to another).

Thus, it doesn't determine anything, it just puts the work where you tell it.

So in your code, Observable.interval and take run on the new thread you asked for with (Schedulers/newThread). You then tell it to observeOn the immediate Scheduler, which is a no-op, so it keeps using the (Schedulers/newThread) you provided at the beginning and subscribe still runs on the new thread.


Here are two examples, one non-blocking and one blocking:

(-> (Observable/interval 100 TimeUnit/MILLISECONDS)
    (.take 5)
    (.subscribe (rx/action [v]
               (println "non-blocking" v (java.lang.Thread/currentThread)))))


(-> (Observable/interval 100 TimeUnit/MILLISECONDS)
  (.take 5)
  (.toBlockingObservable)
  (.forEach (rx/action [v] (println "blocking" v (java.lang.Thread/currentThread)))))

The REPL shows this:

(-> (Observable/interval 100 TimeUnit/MILLISECONDS)
  (.take 5)
  (.toBlockingObservable)
  (.forEach (rx/action [v] (println "blocking" v (java.lang.Thread/currentThread)))))
nil
(-> (Observable/interval 100 TimeUnit/MILLISECONDS)
    (.take 5)
    (.subscribe (rx/action [v]
               (println "non-blocking" v (java.lang.Thread/currentThread)))))
#<SafeObservableSubscription rx.operators.SafeObservableSubscription@5034037e>

The console shows this:

blocking 0 #<Thread Thread[RxComputationThreadPool-4,5,main]>
blocking 1 #<Thread Thread[RxComputationThreadPool-4,5,main]>
blocking 2 #<Thread Thread[RxComputationThreadPool-4,5,main]>
blocking 3 #<Thread Thread[RxComputationThreadPool-4,5,main]>
blocking 4 #<Thread Thread[RxComputationThreadPool-4,5,main]>
non-blocking 0 #<Thread Thread[RxComputationThreadPool-2,5,main]>
non-blocking 1 #<Thread Thread[RxComputationThreadPool-2,5,main]>
non-blocking 2 #<Thread Thread[RxComputationThreadPool-2,5,main]>
non-blocking 3 #<Thread Thread[RxComputationThreadPool-2,5,main]>
non-blocking 4 #<Thread Thread[RxComputationThreadPool-2,5,main]>

If I want it to emit to the REPL I would need to have it all run on the REPL thread, then I get this:

(-> (Observable/interval 100 TimeUnit/MILLISECONDS (Schedulers/currentThread))
    (.take 5)
    (.subscribe (rx/action [v]
               (println "non-blocking" v (java.lang.Thread/currentThread)))))
non-blocking 0 #<Thread Thread[nREPL-worker-2,5,main]>
non-blocking 1 #<Thread Thread[nREPL-worker-2,5,main]>
non-blocking 2 #<Thread Thread[nREPL-worker-2,5,main]>
non-blocking 3 #<Thread Thread[nREPL-worker-2,5,main]>
non-blocking 4 #<Thread Thread[nREPL-worker-2,5,main]>

This then causes the bug with CurrentThreadScheduler that it never unsubscribes and hangs the REPL.

I don't know enough about how the Clojure REPL works (I'm a beginner with Clojure) to know if there is a way to hook a Scheduler into it. If so that would be a great contribution to the rxjava-clojure module.

Now to see how observeOn behaves ...

(-> (Observable/interval 100 TimeUnit/MILLISECONDS) ; run on default which is Schedulers.threadPoolForComputation()
    (.take 5)
    ; print out the value and what thread it is on before doing `observeOn`
    (.doOnNext (rx/action [v] (println "interval emitted" v "on thread" (java.lang.Thread/currentThread))))
    (.observeOn (Schedulers/newThread)) ; move it to a new thread from here onwards
    (.subscribe (rx/action [v]
               ; receive the output and show what thread it is on
               (println "  -> output" v "on thread" (java.lang.Thread/currentThread)))))

The REPL shows:

(-> (Observable/interval 100 TimeUnit/MILLISECONDS) ; run on default which is Schedulers.threadPoolForComputation()
    (.take 5)
    ; print out the value and what thread it is on before doing `observeOn`
    (.doOnNext (rx/action [v] (println "interval emitted" v "on thread" (java.lang.Thread/currentThread))))
    (.observeOn (Schedulers/newThread)) ; move it to a new thread from here onwards
    (.subscribe (rx/action [v]
               ; receive the output and show what thread it is on
               (println "  -> output" v "on thread" (java.lang.Thread/currentThread)))))
#<SafeObservableSubscription rx.operators.SafeObservableSubscription@6df85c2c>

The console shows:

interval emitted 0 on thread #<Thread Thread[RxComputationThreadPool-6,5,main]>
  -> output 0 on thread #<Thread Thread[RxNewThreadScheduler-4,5,main]>
interval emitted 1 on thread #<Thread Thread[RxComputationThreadPool-6,5,main]>
  -> output 1 on thread #<Thread Thread[RxNewThreadScheduler-4,5,main]>
interval emitted 2 on thread #<Thread Thread[RxComputationThreadPool-6,5,main]>
  -> output 2 on thread #<Thread Thread[RxNewThreadScheduler-4,5,main]>
interval emitted 3 on thread #<Thread Thread[RxComputationThreadPool-6,5,main]>
  -> output 3 on thread #<Thread Thread[RxNewThreadScheduler-4,5,main]>
interval emitted 4 on thread #<Thread Thread[RxComputationThreadPool-6,5,main]>
  -> output 4 on thread #<Thread Thread[RxNewThreadScheduler-4,5,main]>

If I want to emit something to the REPL without using println I can return a List containing the output like this:

(-> (Observable/interval 100 TimeUnit/MILLISECONDS) ; run on default which is Schedulers.threadPoolForComputation()
    (.take 5)
    (.observeOn (Schedulers/newThread)) ; move it to a new thread from here onwards
    (.map (rx/fn [v]
               ; receive the output and show what thread it is on
               (str "  -> output " v " on thread " (java.lang.Thread/currentThread))))
    (.toList)
    (.toBlockingObservable)
    (.single))

This now returns the ArrayList in the REPL:

(-> (Observable/interval 100 TimeUnit/MILLISECONDS) ; run on default which is Schedulers.threadPoolForComputation()
    (.take 5)
    (.observeOn (Schedulers/newThread)) ; move it to a new thread from here onwards
    (.map (rx/fn [v]
               ; receive the output and show what thread it is on
               (str "  -> output " v " on thread " (java.lang.Thread/currentThread))))
    (.toList)
    (.toBlockingObservable)
    (.single))
#<ArrayList [  -> output 0 on thread Thread[RxNewThreadScheduler-8,5,main],   -> output 1 on thread Thread[RxNewThreadScheduler-8,5,main],   -> output 2 on thread Thread[RxNewThreadScheduler-8,5,main],   -> output 3 on thread Thread[RxNewThreadScheduler-8,5,main],   -> output 4 on thread Thread[RxNewThreadScheduler-8,5,main]]>

Hope this is helpful.

@benjchristensen
Member

/cc @daveray in case you can provide better insight (particularly regarding REPL/console output)

@ghost
Author

ghost commented Jan 3, 2014

That's a fantastic answer.

Noting that doOnNext is a recent addition, I needed to pull and recompile.

I realize now that part of the issue is that I use emacs/cider, so println output from another thread ends
up in the nrepl buffer rather than the repl buffer where I expected it.

Yes, it doesn't make sense to ask for work to be scheduled on the currentThread without something
there to process scheduled work, my bad.

Still puzzled by how in your last example the result gets returned to the current thread.
does each call after observeOn block the current thread in turn? is .single magical?

In any case that was very helpful and realizing the behavior I described is not a bug,
I'll close this now.

Thanks again.

@ghost ghost closed this as completed Jan 3, 2014
@benjchristensen
Member

Still puzzled by how in your last example the result gets returned to the current thread.
does each call after observeOn block the current thread in turn? is .single magical?

It's the call to (.toBlockingObservable) that makes it block. Anytime you want to go from non-blocking to blocking you can use toBlockingObservable which then exposes a variety of blocking operators. They are useful when doing unit tests, playing around, REPLs etc or when bridging from async to sync in a mixed codebase.

@benjchristensen
Member

See here for more info about BlockingObservable: https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators

@ghost
Author

ghost commented Jan 3, 2014

Right, but everything you've shown me so far suggests that the blocking observable will still
live in another thread. I can only hope that I'm not the only person who finds all this quite subtle.
I'll read up on it rather than taking up more of your time. I truly admire your dedication.

Happy new year.

@benjchristensen
Member

Yes, it happens on the background thread where it is scheduled. The toBlockingObservable doesn't change that. It just blocks and returns type T instead of Observable<T> or Subscription.

Outside of a REPL it might make more sense where the assignment is explicit.

// this returns a non-blocking Observable and is lazy, so not running yet
Observable<Long> i = Observable.interval(100, TimeUnit.MILLISECONDS).take(5);

// this executes the above and returns a Subscription and does so asynchronously (non-blocking)
Subscription s = i.subscribe({ v -> println(v)});

// or I can block and fetch the last value
long lastValue = i.toBlockingObservable().last();

toBlockingObservable does not change where the work is done. It blocks the current thread until the value is available; the value is then assigned and the current thread carries on with it in hand.

This is what the REPL is doing in my example: the list of values is returned to the REPL thread itself, and the REPL thread blocks until that value is available.

The return type explains what is going on.
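The same "work happens elsewhere, the caller just waits for the result" shape can be sketched with nothing but the JDK. This is an analogy using `CompletableFuture`, not the BlockingObservable implementation, and `BlockingBridgeSketch` is a made-up name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration only; not RxJava code.
final class BlockingBridgeSketch {

    // Returns { thread that computed the value, thread that received it }.
    static String[] demo() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            // The value is computed on the "worker" thread...
            CompletableFuture<String> future =
                    CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), worker);
            // ...while the caller blocks here until it is ready.
            String computedOn = future.join();
            return new String[] { computedOn, Thread.currentThread().getName() };
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println("computed on " + result[0] + ", received on " + result[1]);
    }
}
```

The return type tells the same story as above: `join()` hands back a plain `String` rather than a future, so the caller must have waited for it.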
