
RxRuby and concurrency #82

Closed
gizmomogwai opened this issue Feb 25, 2016 · 11 comments

@gizmomogwai

There is already ticket #71, but I am stumbling over something else.

  1. When I create an Observable with e.g. RxRuby::Observable.timer(0, 1), each event of the stream is delivered on its own thread.
  2. From the documentation and source code I could not work out how to write my own scheduler that is usable with observe_on. I tried to spy on the calls by passing an object with a method_missing implementation, but when I then implemented schedule_recursive_with_state in my scheduler, it was not called.
  3. Furthermore, I could not find out how to route all of the events created by the timer to one specific thread.
@ghost

ghost commented Feb 25, 2016

Observable.timer() uses DefaultScheduler by default, which creates a new thread for each event. If you want the timer events to run on the same thread, you can pass the CurrentThreadScheduler to the method like this: Observable.timer(0, 1, RxRuby::CurrentThreadScheduler.instance).
Does this solve some of your problems?

@gizmomogwai
Author

Is there a scheduler that moves the events to a thread other than the main thread?
For example (taken from the Java API):

Observable.just("one", "two", "three", "four", "five")
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(/* an Observer */);

or https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/schedulers/ExecutorScheduler.java

I tried to implement my own Scheduler but failed. The code is along these lines:

class MyScheduler
  def initialize(looper)
    @looper = looper
  end

  def schedule_recursive_with_state(state, due_time, action)
    return @looper.enqueue(action)
  end
end

Looper is a thread that works on a Queue. enqueue puts the action on this queue (ignoring due_time and state for now), and the action should then be called from the Looper thread.

Could you give me some pointers on what I am missing? I set up the whole chain like this:

s = MyScheduler.new(looper)
RxRuby::Observable.timer(0, 1).observe_on(s).subscribe(->(x){looper.enqueue(x)}, ->(e){puts "error #{e}"}, ->(){puts "finished"})
sleep(10)

@ghost

ghost commented Feb 25, 2016

I see what you mean 😄

s = MyScheduler.new(looper)
RxRuby::Observable.timer(0, 1).observe_on(s).subscribe(->(x){looper.enqueue(x)}, ->(e){puts "error #{e}"}, ->(){puts "finished"})
sleep(10)

Could you tell me what output or error you got from this code?

@gizmomogwai
Author

Hi @minamimonji, thanks for following up. I prepared this stripped-down version; perhaps you could have a look. It tries to mimic Java's Executors.

require 'rx_ruby'

# simple thing that takes stuff from a queue and runs it
class Executor
  def initialize
    @queue = Queue.new
    @t = Thread.new do
      while true
        begin
          next_runnable = @queue.pop
          puts "calling invoke on #{next_runnable}"
          next_runnable.invoke
        rescue Exception => e
          puts e.backtrace
          puts e
        end
      end
    end
  end
  def enqueue(runnable)
    @queue.push(runnable)
  end
end

class T
  def method_missing(name, *args, &block)
    puts "called #{name} with #{args}"
  end
end

# that is my scheduler that is connected to an executor
class MyScheduler
  def initialize(executor)
    @executor = executor
  end

  def schedule_recursive_with_state(state, action)
    dt = Time.now.to_i
#############################
# usually i would expect to pass in state here as second parameter!
    res = RxRuby::ScheduledItem.new(self, T.new, dt, &action)
    @executor.enqueue(res)
  end
end
executor = Executor.new
scheduler = MyScheduler.new(executor)

#event every second
source = RxRuby::Observable.timer(0, 1)
#the events should be handled in my executor -> observe_on with a std. subscriber at the end
source.observe_on(scheduler).subscribe(->(x){puts "onEvent #{x}"}, ->(e){puts "error #{e}"}, ->(){puts "finished"})

sleep(10)

Here I have two problems:

  1. I do not know what to do with the call to MyScheduler.action that is intercepted by method_missing.
  2. I cannot find out how to invoke the action that is passed into the scheduler.

@gizmomogwai
Author

I tried some more:

require 'rx_ruby'

# simple thing that takes stuff from a queue and runs it
class Executor
  def initialize
    @queue = Queue.new
    @t = Thread.new do
      puts "ttt"
      puts "ExecutorThread #{Thread.current}"
      while true
        begin
          next_runnable = @queue.pop
          puts "calling invoke on #{next_runnable}"
          next_runnable.invoke
        rescue Exception => e
          puts e.backtrace
          puts e
        end
      end
    end
  end
  def enqueue(runnable)
    @queue.push(runnable)
  end
end


# that is my scheduler that is connected to an executor
class MyScheduler
  def initialize(executor)
    @executor = executor
  end

  def schedule_recursive_with_state(state, action)
    dt = Time.now.to_i
    res = RxRuby::ScheduledItem.new(self, T.new, dt, &action)
    @executor.enqueue(res)
  end
end
executor = Executor.new
scheduler = MyScheduler.new(executor)

#event every second
source = RxRuby::Observable.timer(0, 1)
#the events should be handled in my executor -> observe_on with a std. subscriber at the end
source.map do |x|
  puts "in map #{Thread.current}"
  x
end.observe_on(scheduler).subscribe(->(x){puts "onEvent #{x} in thread #{Thread.current}"}, ->(e){puts "error #{e}"}, ->(){puts "finished"})

sleep(10)

The debug output shows that more events are produced. The first event already arrives in the subscriber on the correct thread, but after that something goes wrong.

Could you please help me out here?

@ghost

ghost commented Feb 29, 2016

@gizmomogwai Thanks for the additional info.
I haven't had time to look into it this weekend. I will take a look at it in a couple of days.

@gizmomogwai
Author

@minamimonji thanks ... I appreciate your efforts (a lot!!!)

@ghost

ghost commented Mar 4, 2016

@gizmomogwai Could you run your code above with MyScheduler below?

class MyScheduler
  include RxRuby::Scheduler
  def initialize(executor)
    @executor = executor
  end

  def schedule_with_state(state, action)
    dt = Time.now.to_i
    res = RxRuby::ScheduledItem.new(self, state, dt, &action)
    @executor.enqueue(res)
    RxRuby::Subscription.empty
  end
end

The main differences are that it includes the existing RxRuby::Scheduler module, which handles the recursion, and that it implements schedule_with_state instead of schedule_recursive_with_state.
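To illustrate why including a module and implementing only one primitive can be enough, here is a minimal sketch that does not use rx_ruby at all. SketchScheduler and InlineScheduler are hypothetical names made up for this example, and the module is not RxRuby's actual source; it only shows the general pattern of a mixin deriving a recursive method from a single schedule_with_state primitive.

```ruby
# Hypothetical stand-in for a scheduler mixin. This is NOT RxRuby's source.
module SketchScheduler
  # Derived method: hands the action a callable it can use to reschedule
  # itself, and routes every step through the schedule_with_state primitive.
  def schedule_recursive_with_state(state, action)
    step = lambda do |scheduler, current_state|
      again = ->(next_state) { scheduler.schedule_recursive_with_state(next_state, action) }
      action.call(current_state, again)
    end
    schedule_with_state(state, step)
  end
end

# Hypothetical scheduler that only supplies the primitive.
class InlineScheduler
  include SketchScheduler

  # The one primitive: run the action immediately on the calling thread.
  def schedule_with_state(state, action)
    action.call(self, state)
  end
end

seen = []
counter = lambda do |n, again|
  seen << n
  again.call(n + 1) if n < 2
end
InlineScheduler.new.schedule_recursive_with_state(0, counter)
```

In this sketch a scheduler backed by an executor would only need to change schedule_with_state so that it enqueues the action instead of calling it directly.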

@gizmomogwai
Author

@minamimonji thanks a lot ... your suggestion solves my problem!
One last question: in this example one would have to return a real subscription to support unsubscribe.
What should be done in this case to stay in line with rx_ruby's philosophy?

@ghost

ghost commented Mar 6, 2016

@gizmomogwai You might want to clean up objects or threads when unsubscribe is called. In this case the thread in the Executor should be killed. You can return a subscription from MyScheduler like this:

def schedule_with_state(state, action)
  #...
  @executor.enqueue(res)
  RxRuby::Subscription.create{ @executor.terminate } # code in the block called when unsubscribe is called.
  # Note that you have to define Executor#terminate method to kill the thread `@t` in the `Executor`.
end
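A possible shape for Executor#terminate, as a minimal sketch using only the Ruby standard library. Instead of killing the thread with Thread#kill, this version pushes a sentinel onto the queue so that items already queued still run before the worker exits; that is a swapped-in technique, not something the thread above prescribes. The STOP constant and the alive? helper are assumptions added for illustration.

```ruby
# Sketch of the Executor from this thread with a terminate method added.
# STOP and alive? are hypothetical additions, not part of the original code.
class Executor
  STOP = Object.new # sentinel that tells the worker loop to exit

  def initialize
    @queue = Queue.new
    @t = Thread.new do
      loop do
        item = @queue.pop
        break if item.equal?(STOP)
        begin
          item.invoke
        rescue StandardError => e
          warn "#{e.class}: #{e.message}"
        end
      end
    end
  end

  def enqueue(runnable)
    @queue.push(runnable)
  end

  # Ask the worker to stop once the items queued so far have run.
  # Waits for the worker unless terminate is called from the worker itself.
  def terminate
    @queue.push(STOP)
    @t.join unless Thread.current == @t
  end

  def alive?
    @t.alive?
  end
end
```

Note that one Executor is shared by every action the scheduler enqueues, so terminating it from a single subscription stops delivery for all of them; whether that is acceptable depends on how the scheduler is used.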

@gizmomogwai
Author

Wow ... thanks a lot ... that really helps!
