Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Possible race condition in Subscription.scala #554

Closed
MickDavies opened this issue Dec 3, 2013 · 19 comments
Closed

Possible race condition in Subscription.scala #554

MickDavies opened this issue Dec 3, 2013 · 19 comments

Comments

@MickDavies
Copy link

Hi,

I'm looking at Subscription.scala because I'm doing the Coursera Reactive course.

The code below from Subscription.scala looks a bit suspect. I think there's a race condition that will happen especially if u takes a long time that could lead to multiple calls to u. I'm not sure if you consider this an issue as I am not sure if Subscriptions are shared across threads, but the use of the AtomicBoolean seems to imply this.

Would something like:

def unsubscribe() { if(unsubscribed.compareAndSet(false, true)) { u }}

be better - also looks like subclasses eg. BooleanSubscription may have this problem.

If you think this is an issue I can look to fix.

Regards

Mick

def apply(u: => Unit): Subscription = {
  new Subscription() {

    private val unsubscribed = new AtomicBoolean(false)
    def isUnsubscribed = unsubscribed.get()

    val asJavaSubscription = new rx.Subscription {
      def unsubscribe() { if(!unsubscribed.get()) { u ; unsubscribed.set(true) }}
    }
  }
}
@zsxwing
Copy link
Member

zsxwing commented Dec 4, 2013

A question: can unsubscribed be set to true before u finishes? If no, AtomicBoolean is not enough. I think we need a lock.

@benjchristensen
Copy link
Member

It doesn't need a lock, it just needs to use compareAndSet, getAndSet or some other similar atomic operation.

For example:

I'm somewhat curious as to why Scala adapters are reimplementing this logic rather than leveraging the Java core implementations.

@zsxwing
Copy link
Member

zsxwing commented Dec 4, 2013

Subscription in RxScala has one more method def isUnsubscribed: Boolean comparing with RxJava (https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala). If we only uss CAS, for example,

def unsubscribe() { if(unsubscribed.compareAndSet(false, true)) { u }}

in this implementation, isUnsubscribed can return true before u finishes. Is it OK? If so, I think we need a document to warn such problem.

The following implementation can guarantee isUnsubscribed always return false before u finishes.

def apply(u: => Unit): Subscription = {
    new Subscription() {

      private val lock = new ReentrantLock()
      private val unsubscribed = new AtomicBoolean(false)
      def isUnsubscribed = unsubscribed.get()

      val asJavaSubscription = new rx.Subscription {
        def unsubscribe() {
          lock.lock
          try {
            if (!unsubscribed.get()) {
              u;
              unsubscribed.set(true)
            }
          } finally {
            lock.unlock
          }
        }
      }
    }
  }

@zsxwing
Copy link
Member

zsxwing commented Dec 4, 2013

We need a lock because we need to guarantee the codes of the three operators get, u and set are a critical section.

@zsxwing
Copy link
Member

zsxwing commented Dec 4, 2013

@samuelgruetter can you help me confirm if the following conventions are right:

  • If unsubscribe returns, isUnsubscribed should return true.
  • If u does not finish, isUnsubscribed should return false.
  • u should only be called once.

@benjchristensen
Copy link
Member

Why are these not just wrapping Java implementations?

@samuelgruetter
Copy link
Contributor

isUnsubscribed was recently added to Subscription by @headinthebox , but I think this was a quick hack rather than the intended final solution. His idea was that Subscription should have this method, since all its subclasses have it anyways.
But I think we really should keep all logic in Java. So best would be to add isUnsubscribed to the Java Subscription as well, and have the default Subscriptions.create use a BooleanSubscription. And once it's fine in Java, just wrap it in Scala.

@MickDavies
Copy link
Author

So the upshot is. The implementation is wrong, but it is a temporary hack (probably) and the scala classes should really be wrappers so semantics reside in the java implementation with delegation from scala classes. Wrt to semantics, and question from zsxwing, the call to unsubscribe can return before u completes, which seems reasonable and seems to be the behaviour in the java classes.

@zsxwing
Copy link
Member

zsxwing commented Dec 4, 2013

@benjchristensen Could you help clarify the semantics of isUnsubscribed in SafeObservableSubscription?

@benjchristensen
Copy link
Member

I discussed this with @headinthebox last week and there isn't a very strong argument for making the major breaking change of adding isUnsubscribed to the Subscription interface. It greatly complicates people implementing the interface and creates little value.

Changing the Subscription interface makes it a non-functional interface so it can't be implemented with lambdas. It complicates the implementation for the many cases where someone doesn't need to check isUnsubscribed. It breaks most codebases using RxJava in a serious way to change this interface. It is not required behavior for a Subscription. Last of all, subscriptions are supposed to be idempotent anyways so unsubscribe can be called multiple times safely without a check to isUnsubscribed.

If the isUnsubscribed behavior is needed, there are many implementations of Subscription that include it:

Considering all of this, what is trying to be achieved by adding this method to the Subscription interface?

@benjchristensen
Copy link
Member

@zsxwing SafeObservableSubscription is managing a trinary state to know when it is unsubscribed.

null => not subscribed to yet
non-null and != to UNSUBSCRIBED => subscribed
non-null and == to UNSUBSCRIBED => unsubscribed

This is so that a race-condition for using wrap correctly unsubscribes while wrapping if it is already unsubscribed and uses compareAndSet in wrap to safely do this logic.

This is important so the unsubscribe action is propagated to the actual subscription either when unsubscribe is called or when wrap is called after already having received an unsubscribe request.

@samuelgruetter
Copy link
Contributor

@benjchristensen that makes sense, so I'll remove isUnsubscribed from Scala Subscription and hope everyone is happy with it?

@benjchristensen
Copy link
Member

Yes, I think it makes sense for Subscription to remain a functional interface with the single unsubscribe method in Java and for Scala to match that.

@zsxwing
Copy link
Member

zsxwing commented Dec 5, 2013

@benjchristensen So if SafeObservableSubscription.isUnsubscribed returns true, it means that either the actualSubscription has been unsubscribed, or it's a promise that it will be unsubscribed in the near future automatically. Am I right?

@benjchristensen
Copy link
Member

In practice yes. It is answering whether the outer SafeObservableSubscription has been unsubscribed - which it has been if it returns true. It the wrap method has not yet been called, then it will unsubscribe in the future to whatever is wrapped.

@headinthebox
Copy link
Contributor

For Scala it is OK to have isUnsubscribed since you just call Subscription{ block } making it as convenient as a functional interface.

@samuelgruetter
Copy link
Contributor

Well @headinthebox if you really want isUnsubscribed in Scala Subscription, you can have it ;-) But then Scala subscription should wrap a Java BooleanSubscription instead of implementing itself the "logic" of remembering if already unsubscribed.

@headinthebox
Copy link
Contributor

@samuelgruetter

  • If the implementation of BooleanSubscription is fixed.
  • What I won't see is what type to give to asJavaSubscription in that case.
  • The extra code in Scala is pretty thin.

@benjchristensen
Copy link
Member

Fixed in #598

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants