Concurrency issues #71

Open
stugol opened this issue Jul 15, 2015 · 12 comments

Comments

@stugol

stugol commented Jul 15, 2015

I'm trying to use Rx in my Ruby project, but I'm having difficulty with concurrency. I have two threads, each pushing to its own observable. I am piping these two observables into a single Subject, and subscribing to that. However, the subscription is now being called in parallel, rather than having the messages queue up. I tried using a Scheduler, but they all appear to be broken:

   /var/lib/gems/2.1.0/gems/Rx.rb-0.0.1/lib/rx/core/scheduled_observer.rb:50:in `ensure_active': undefined method `schedule_recursive_with_state' for RX::CurrentThreadScheduler:Class (NoMethodError)

Incidentally, the concurrency issue doesn't seem to occur if I call Merge instead of piping them into a Subject.

How can I get help for this issue, please?

Cheers,
Stuart.

@ghost

ghost commented Dec 28, 2015

@stugol Can you present tests or snippets which reproduce the problem so that we could take a look?

@stugol
Author

stugol commented Dec 28, 2015

After six months? Are you serious?

Maybe if you had responded within a sensible period of time, I would still have the code. As it stands however, I probably gave up on ever getting it resolved, and changed the code to work a different way.

I guess I could try to reproduce the problem from scratch. If I do this, are you going to respond quickly, or will it be another colossal waste of time?

@ghost

ghost commented Dec 28, 2015

Well, I was only one of the watchers at that time. There had been no active communication in the project for a long time, so I decided to try to move it forward.

If you rewrite the code, I would appreciate it! If you don't, it's ok.

@mattpodwysocki
Contributor

@stugol that's my fault, not @minamimonji's, so please don't take it out on him. I am the maintainer of the project, and I was lax in my work here because I was focused on RxJS and other libraries as my primary projects; I was hoping to get help to make this one a success.

@stugol
Author

stugol commented Dec 30, 2015

Sure, no problem. I'll see what I can whip up.

@stugol
Author

stugol commented Jan 22, 2016

Well, I've not had much time recently to attempt this, but I had a go this evening. I've not (yet) managed to reproduce the bug; primarily because all the other bugs keep getting in the way...

/var/lib/gems/2.1.0/gems/Rx.rb-0.0.1/lib/rx/subjects/subject.rb:52:in `block in on_error': private method `check_disposed' called for #<RX::Subject:0x00000001b9e190> (NoMethodError)
        from /var/lib/gems/2.1.0/gems/Rx.rb-0.0.1/lib/rx/subjects/subject.rb:51:in `synchronize'
        from /var/lib/gems/2.1.0/gems/Rx.rb-0.0.1/lib/rx/subjects/subject.rb:51:in `on_error'
        from /var/lib/gems/2.1.0/gems/Rx.rb-0.0.1/lib/rx/core/auto_detach_observer.rb:22:in `on_error_core'
        from /var/lib/gems/2.1.0/gems/Rx.rb-0.0.1/lib/rx/core/observer.rb:106:in `call'
        from /var/lib/gems/2.1.0/gems/Rx.rb-0.0.1/lib/rx/core/observer.rb:106:in `fail'
        from /var/lib/gems/2.1.0/gems/Rx.rb-0.0.1/lib/rx/core/observable.rb:76:in `rescue in schedule_subscribe'
        from /var/lib/gems/2.1.0/gems/Rx.rb-0.0.1/lib/rx/core/observable.rb:73:in `schedule_subscribe'
        from /var/lib/gems/2.1.0/gems/Rx.rb-0.0.1/lib/rx/concurrency/scheduled_item.rb:29:in `call'
        from /var/lib/gems/2.1.0/gems/Rx.rb-0.0.1/lib/rx/concurrency/scheduled_item.rb:29:in `invoke'
        from /var/lib/gems/2.1.0/gems/Rx.rb-0.0.1/lib/rx/concurrency/current_thread_scheduler.rb:67:in `run_trampoline'
        from /var/lib/gems/2.1.0/gems/Rx.rb-0.0.1/lib/rx/concurrency/current_thread_scheduler.rb:40:in `schedule_relative_with_state'
        from /var/lib/gems/2.1.0/gems/Rx.rb-0.0.1/lib/rx/concurrency/local_scheduler.rb:20:in `schedule_with_state'
        from /var/lib/gems/2.1.0/gems/Rx.rb-0.0.1/lib/rx/core/observable.rb:38:in `_subscribe'
        from /var/lib/gems/2.1.0/gems/Rx.rb-0.0.1/lib/rx/core/observable.rb:18:in `subscribe'

It would also help if [a] there was any documentation; and [b] the examples actually worked.

@stugol
Author

stugol commented Jan 22, 2016

Alright, it would seem I was getting conflicts with previous versions of the library. It runs now. Although not correctly.

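require "rx_ruby"
# Each subscription starts its own thread that emits 1 forever.
def produce_async_values
  RxRuby::Observable.create do |observer|
    Thread.new { loop { observer.on_next 1 } }
  end
end
subject = RxRuby::Subject.new
# Pipe two independent producers into the same subject.
produce_async_values.subscribe subject
produce_async_values.subscribe subject
# The slow observer makes overlapping on_next calls visible in the output.
subject.subscribe_on_next { |n| puts n; sleep 0.2 }
sleep 2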

This code should produce a stream of 1s, each on its own line. However, the output shows the two producers' writes interleaving, which is a race condition that shouldn't happen!

Actual output:

1
1
11

1
1
1
1
1
1
11

1
1
1
1
1
1
1
1

@stugol
Author

stugol commented Jan 22, 2016

If I replace the two subscribe calls with produce_async_values.merge(produce_async_values).subscribe subject, the code works correctly.

However, these two techniques should produce identical output. If I pipe multiple streams into a subject, it should yield their values one at a time.
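As a point of comparison, the "one at a time" behaviour described above can be sketched without Rx at all, using Ruby's built-in thread-safe Queue and a single consumer thread. This is only an illustration of the expected ordering guarantee, not how RxRuby implements merge or Subject; the method name drain_two_producers and the :done sentinel are made up for this example.

```ruby
require "thread" # Queue lives here on older Rubies; a no-op on recent ones

# Hypothetical helper: two producer threads push into one Queue, and a
# single consumer thread is the only place where values are handled.
def drain_two_producers(values_per_producer)
  queue = Queue.new
  received = []

  # Because only this thread pops from the queue, values are handled
  # strictly one at a time, no matter how many producers there are.
  consumer = Thread.new do
    while (value = queue.pop) != :done
      received << value
    end
  end

  producers = 2.times.map do |id|
    Thread.new { values_per_producer.times { |n| queue << [id, n] } }
  end

  producers.each(&:join)
  queue << :done # sentinel: tell the consumer that nothing more is coming
  consumer.join
  received
end
```

Calling drain_two_producers(5) returns ten [producer_id, n] pairs. The two producers may interleave with each other, but each producer's own values arrive in the order they were pushed.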

@ghost

ghost commented Jan 22, 2016

@stugol thank you!
We'll look into what is happening in the code.

@ghost

ghost commented Jan 27, 2016

The on_next in merge is wrapped in synchronize, which prevents race conditions. However, the on_next that runs for a normal subscription is not wrapped in synchronize, and that is what causes the race conditions.
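The difference can be sketched with a small stand-in class. This is a minimal illustration under stated assumptions, not RxRuby's actual Subject: the class name SerializedSubject and the helper max_overlap_with_two_producers are hypothetical, and only Ruby's standard Monitor and Mutex are used. Wrapping on_next in synchronize means a second producer thread has to wait until the first delivery has finished.

```ruby
require "monitor"

# Hypothetical stand-in for a subject; NOT RxRuby's implementation.
class SerializedSubject
  def initialize
    @gate = Monitor.new # reentrant lock, so an observer may call back in
    @observers = []
  end

  # Register a block to be called for every value.
  def subscribe_on_next(&block)
    @gate.synchronize { @observers << block }
    self
  end

  # The lock is held for the whole delivery, so concurrent callers of
  # on_next are serialized instead of running the observers in parallel.
  def on_next(value)
    @gate.synchronize do
      @observers.each { |observer| observer.call(value) }
    end
  end
end

# Two producer threads push into one subject while the observer records
# the highest number of callbacks that were ever running at the same time.
def max_overlap_with_two_producers
  subject = SerializedSubject.new
  counter_lock = Mutex.new
  active = 0
  max_active = 0

  subject.subscribe_on_next do |_value|
    counter_lock.synchronize do
      active += 1
      max_active = [max_active, active].max
    end
    sleep 0.001 # simulate slow work inside the observer
    counter_lock.synchronize { active -= 1 }
  end

  producers = 2.times.map do
    Thread.new { 20.times { subject.on_next(1) } }
  end
  producers.each(&:join)
  max_active
end
```

With the synchronize call in place, max_overlap_with_two_producers never reports more than one callback running at once. Removing the synchronize wrapper from on_next lets the two producer threads run the observer concurrently, which corresponds to the behaviour reported in this issue.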

@mattpodwysocki From the viewpoint of the Reactive Extensions, is this race condition expected? Or should we fix it?

@mattpodwysocki
Contributor

@minamimonji race conditions should not be expected at all so we're probably missing some mutex blocks

@ghost

ghost commented Mar 9, 2016

@stugol sorry for taking so long to respond. I made some changes to Subject to synchronize on_next, on_error, and on_complete. Can you give it a try? Here is the branch.
https://github.com/minamimonji/RxRuby/tree/fix_conccurency_issue

@mattpodwysocki Can you review it and make some comments?
