Decouple getting a subscription from subscribing #547

Closed
abersnaze opened this issue Dec 1, 2013 · 13 comments

Comments

@abersnaze
Contributor

I've been holding on to this for a while now because it's a major departure and I wanted to make sure that it was really necessary. Given all of the issues we've seen recently with Schedulers and other operators, I think it's time to discuss.

Right now the core method, Subscription subscribe(Observer), just doesn't work well for synchronous source Observables, because downstream operations don't get a Subscription until after the Observable has force-fed them all of its data. For example:

Observable.from([1,2,3]).map({ /* expensive operation here */ }).take(1)

The subscribers to this chain of Observables will see the correct result, but the author might not realize that unnecessary computation is going on: the expensive operation is also performed for 2 and 3, and those results are thrown away by the take operator.
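To make the cost concrete, here is a minimal, self-contained model of a one-phase subscribe. Source, Sink and Handle are hypothetical stand-ins, not RxJava's classes, and completion and errors are omitted. Because subscribe() only returns its handle after the synchronous loop has finished, take(1) has nothing to unsubscribe while items 2 and 3 are still flowing through map.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical one-phase model (not RxJava's API); completion and errors omitted.
interface Sink<T> { void onNext(T t); }

interface Handle { void unsubscribe(); }

interface Source<T> {
    // One phase: emission happens inside this call, the handle comes back afterwards.
    Handle subscribe(Sink<T> sink);

    static <T> Source<T> from(List<T> items) {
        return sink -> {
            boolean[] unsubscribed = {false};
            for (T t : items) {
                if (unsubscribed[0]) break;      // cannot become true here: nobody holds the handle yet
                sink.onNext(t);
            }
            return () -> unsubscribed[0] = true; // handed out only after every item was pushed
        };
    }

    default <R> Source<R> map(Function<T, R> f) {
        return sink -> subscribe(t -> sink.onNext(f.apply(t)));
    }

    default Source<T> take(int n) {
        return sink -> {
            int[] seen = {0};
            Handle[] upstream = {null};          // stays null while the source is emitting
            upstream[0] = subscribe(t -> {
                if (seen[0] >= n) return;        // can only ignore extra items, not prevent them
                sink.onNext(t);
                if (++seen[0] == n && upstream[0] != null) upstream[0].unsubscribe();
            });
            return upstream[0];
        };
    }
}
```

Running Source.from(Arrays.asList(1, 2, 3)).map(expensive).take(1).subscribe(...) in this model delivers only 1, yet the mapping function is invoked three times.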

The solution that I'm proposing is to split subscribing to an Observable into two phases. The first step is to get the subscription with a new method on Observable called:

public PartialSubscription<T> getSubscription() {...}

Then define PartialSubscription with all of the subscribe methods, which start sending data to the observer:

public class PartialSubscription<T> extends Subscription {
    public void subscribe( Observer<T> observer ) {...}
    ...
}

The existing subscribe and OnSubscribeFunc can be deprecated and reimplemented on top of partial subscriptions, so that existing operator implementations stay backwards compatible, but to get all of the advantages of this, all of the operators would have to be revised.
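A minimal, self-contained sketch of the two-phase idea follows. It is hypothetical and is not the code from the accompanying pull request: Source, Sink and this PartialSubscription are simplified stand-ins (here PartialSubscription is a standalone abstract class rather than a Subscription subtype), single-threaded, with completion and errors omitted. Because getSubscription() builds the whole chain of handles before subscribe() starts the data flow, take(1) can unsubscribe upstream after the first item, and from() sees the flag before emitting the second.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical single-threaded sketch; completion and errors are omitted.
interface Sink<T> { void onNext(T t); }

// Phase 1 returns this handle; phase 2, subscribe(), starts the data flow.
abstract class PartialSubscription<T> {
    private boolean unsubscribed;

    public void unsubscribe() { unsubscribed = true; }

    public boolean isUnsubscribed() { return unsubscribed; }

    public abstract void subscribe(Sink<T> sink);
}

interface Source<T> {
    PartialSubscription<T> getSubscription();

    static <T> Source<T> from(List<T> items) {
        return () -> new PartialSubscription<T>() {
            @Override
            public void subscribe(Sink<T> sink) {
                for (T t : items) {
                    if (isUnsubscribed()) return;   // downstream can now stop the loop early
                    sink.onNext(t);
                }
            }
        };
    }

    default <R> Source<R> map(Function<T, R> f) {
        return () -> {
            PartialSubscription<T> upstream = getSubscription();   // phase 1 for the source
            return new PartialSubscription<R>() {
                @Override
                public void unsubscribe() { super.unsubscribe(); upstream.unsubscribe(); }

                @Override
                public void subscribe(Sink<R> sink) { upstream.subscribe(t -> sink.onNext(f.apply(t))); }
            };
        };
    }

    default Source<T> take(int n) {
        return () -> {
            PartialSubscription<T> upstream = getSubscription();   // handle exists before any data
            return new PartialSubscription<T>() {
                private int seen;

                @Override
                public void unsubscribe() { super.unsubscribe(); upstream.unsubscribe(); }

                @Override
                public void subscribe(Sink<T> sink) {
                    if (n <= 0) { unsubscribe(); return; }          // take(0): never start the source
                    upstream.subscribe(t -> {
                        if (seen >= n) return;
                        sink.onNext(t);
                        if (++seen == n) upstream.unsubscribe();    // from() checks this before its next item
                    });
                }
            };
        };
    }
}
```

With this model, Source.from(Arrays.asList(1, 2, 3)).map(expensive).take(1).getSubscription().subscribe(...) calls the mapping function once, and take(0) never calls it.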

I've submitted an accompanying pull request with the implementation details.

@samuelgruetter
Contributor

On branch 2phase, this test

Func1<Integer, Integer> expensive = new Func1<Integer, Integer>() {
    @Override
    public Integer call(Integer t1) {
        System.out.println("omg this is so expensive");
        return t1;
    }
};

Action1<Integer> printIt = new Action1<Integer>() {
    @Override
    public void call(Integer t1) {
        System.out.println("Got " + t1);
    }
};

@Test
public void go() {
    Observable<Integer> o = Observable.from(1, 2, 3).map(expensive).take(1);
    o.getSubscription().subscribe(printIt);
}

outputs

omg this is so expensive
Got 1
omg this is so expensive
omg this is so expensive

On branch master (removing .getSubscription() in go()), it outputs exactly the same.

So I don't really get your point. I thought you wanted to have only this output:

omg this is so expensive
Got 1

Could you please elaborate?

@headinthebox
Contributor

Why complicate things unnecessarily? You should never use a synchronous source in the manner you describe in the first place.

That's why schedulers exist. A synchronous source is only acceptable for doing cheap short work.

@abersnaze
Contributor Author

I hadn't rewritten the from, map, or take operators to use two-phase subscriptions yet. I've just committed the unit test you wrote and made it pass.

@abersnaze
Contributor Author

I don't think it's unnecessary. If synchronous Observables shouldn't be used, then why do they exist?

Users can't tell whether an Observable is synchronous or not. If code is written against an Observable that is asynchronous, and that Observable is later changed to be synchronous, it could have unintended consequences.

Adding a thread doesn't guarantee that the receiver will get scheduled to issue the unsubscribe in time.

@headinthebox
Contributor

BTW,

The subscribers to this chain of observables will see the correct result but the author might not realize that there is unnecessary computation going on because the expensive operation is also done for 2 and 3 and thrown away by the take operator.

In C#

new[]{ 1,2,3}.ToObservable().Select(x => { Console.WriteLine("Expensive {0}", x); return x; })
.Take(1).Subscribe(Console.WriteLine)

prints as expected

Expensive 1
1

Only if you use the immediate scheduler do you get the following, which is expected, since Immediate is not really a proper scheduler (and should be used with extreme care, if at all):

Expensive 1
1
Expensive 2
Expensive 3

But all other schedulers behave properly.

Too lazy right now to check which scheduler is used by default for this in RxJava, but we should use CurrentThread, which should take care of your original problem.
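The CurrentThread suggestion can be illustrated with a minimal trampoline sketch. Everything here is hypothetical (Trampoline, Source, Sink and Handle are not RxJava classes), and it assumes two things: work scheduled while the queue is already draining is queued rather than run inline, and from(items, scheduler) schedules each item as its own task. Under those assumptions, a subscribe that itself runs inside the trampoline returns its handle before the first item is emitted, so take(1) can stop the source after one item.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.function.Function;

// Hypothetical trampoline: nested schedule() calls are queued; only the outermost call drains.
final class Trampoline {
    private final ArrayDeque<Runnable> queue = new ArrayDeque<>();
    private boolean draining;

    void schedule(Runnable work) {
        queue.add(work);
        if (draining) return;                    // nested call: the outer loop will run it later
        draining = true;
        try {
            Runnable next;
            while ((next = queue.poll()) != null) next.run();
        } finally {
            draining = false;
        }
    }
}

interface Sink<T> { void onNext(T t); }

final class Handle {
    boolean unsubscribed;
    void unsubscribe() { unsubscribed = true; }
}

interface Source<T> {
    Handle subscribe(Sink<T> sink);

    // Each item is a separate task, so subscribe() can return before any item is emitted.
    static <T> Source<T> from(List<T> items, Trampoline sched) {
        return sink -> {
            Handle h = new Handle();
            emit(items, 0, sink, h, sched);
            return h;
        };
    }

    static <T> void emit(List<T> items, int i, Sink<T> sink, Handle h, Trampoline sched) {
        sched.schedule(() -> {
            if (h.unsubscribed || i >= items.size()) return;
            sink.onNext(items.get(i));
            emit(items, i + 1, sink, h, sched);  // queue the next item instead of looping
        });
    }

    default <R> Source<R> map(Function<T, R> f) {
        return sink -> subscribe(t -> sink.onNext(f.apply(t)));
    }

    default Source<T> take(int n) {
        return sink -> {
            int[] seen = {0};
            Handle[] upstream = {null};
            upstream[0] = subscribe(t -> {
                if (seen[0] >= n) return;
                sink.onNext(t);
                if (++seen[0] == n && upstream[0] != null) upstream[0].unsubscribe();
            });
            return upstream[0];
        };
    }
}
```

In this model, wrapping the subscribe in sched.schedule(() -> ...) makes the mapping function run once. If the same chain is subscribed outside sched.schedule(...), the first scheduled item starts draining the queue inline, take(1) again has no handle while items are emitted, and the mapping function runs three times.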

@headinthebox
Contributor

@abersnaze as I pointed out, it should work if you use the current thread scheduler. Synchronous observables are like extremely sharp knives. Use them only when you know what you are doing, or you risk cutting your limbs off.

@abersnaze
Contributor Author

CurrentThreadScheduler only seems to work at the granularity of whether or not to subscribe to an Observable. On the master branch I added these two unit tests. The testDontStart() test uses take(0) and passes if I insert the current thread scheduler in two key places. No number of current thread schedulers seems to make testInterruptable() pass.

@headinthebox Is there something that I'm missing that will make testInterruptable() work?

    @Test
    public void testInterruptable() {
        Func1<Integer, Integer> expensive = mock(Func1.class);
        when(expensive.call(any(Integer.class))).thenReturn(10);
        Scheduler sched = Schedulers.currentThread();

        List<Integer> v = Observable.from(1, 2, 3).map(expensive).subscribeOn(sched).take(1).subscribeOn(sched).toList().toBlockingObservable().single();

        assertEquals(1, v.size());
        assertEquals(10, v.get(0).intValue());
        verify(expensive, times(1)).call(any(Integer.class));
    }

    @Test
    public void testDontStart() {
        Func1<Integer, Integer> expensive = mock(Func1.class);
        when(expensive.call(any(Integer.class))).thenReturn(10);
        Scheduler sched = Schedulers.currentThread();

        List<Integer> v = Observable.from(1, 2, 3).map(expensive).subscribeOn(sched).take(0).subscribeOn(sched).toList().toBlockingObservable().single();

        assertEquals(0, v.size());
        verify(expensive, never()).call(any(Integer.class));
    }

@headinthebox
Contributor

List<Integer> v = Observable.from(1, 2, 3).map(expensive).subscribeOn(sched)
==>
This will not work; you need to call from with a scheduler. See my remark about .NET (there, .ToObservable() takes a scheduler).

@abersnaze
Contributor Author

I've created a new branch in my fork called 1phase where I've been testing out your suggestions. I haven't been able to get the unit tests working with observeOn(currentThreadScheduler). I found these comments in rx.operators.OperationObserveOn interesting. Should observeOn(currentThreadScheduler) do something different?

        public Subscription onSubscribe(final Observer<? super T> observer) {
            if (scheduler instanceof ImmediateScheduler) {
                // do nothing if we request ImmediateScheduler so we don't invoke overhead
                return source.subscribe(observer);
            } else if (scheduler instanceof CurrentThreadScheduler) {
                // do nothing if we request CurrentThreadScheduler so we don't invoke overhead
                return source.subscribe(observer);
            } else {
                return new Observation(observer).init();
            }
        }

@headinthebox
Contributor

You should not start with Observable.from(1, 2, 3), and you should avoid ImmediateScheduler like the plague. Never ever use the Observable.from(1,2,3) overload. Instead, use the one below and pass CurrentThreadScheduler:

public static <T> Observable<T> from(Iterable<? extends T> iterable, Scheduler scheduler) {
    return from(iterable).observeOn(scheduler);
}

@abersnaze
Contributor Author

Sorry if I didn't make this clear, but that is what I did in the 1phase branch, and I still didn't see any change in behavior.

@benjchristensen
Member

This is related to fixing the CurrentThreadScheduler behavior. @headinthebox and I are planning on working on that soon.

@akarnokd
Member

akarnokd commented May 8, 2014

The new Subscriber/Operator/OnSubscribe should handle this case. Can you check & close?
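The Subscriber/Operator/OnSubscribe design can be approximated by the following hypothetical model. MiniObservable, Subscriber, Operator and OnSubscribe here are simplified stand-ins, not RxJava's real classes or signatures. The point it illustrates: the subscriber, and with it the unsubscribed flag, is created and wired up by lift() before the source emits anything, so the source can check isUnsubscribed() before each item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of the Subscriber-first shape; not RxJava's actual classes.
abstract class Subscriber<T> {
    private final List<Subscriber<?>> linked = new ArrayList<>();
    private boolean unsubscribed;

    abstract void onNext(T t);

    static <T> Subscriber<T> of(Consumer<T> consumer) {
        return new Subscriber<T>() {
            @Override
            void onNext(T t) { consumer.accept(t); }
        };
    }

    // Unsubscribing this subscriber also unsubscribes everything linked to it;
    // linking to an already-unsubscribed subscriber unsubscribes the new one at once.
    void add(Subscriber<?> upstream) {
        if (unsubscribed) upstream.unsubscribe(); else linked.add(upstream);
    }

    boolean isUnsubscribed() { return unsubscribed; }

    void unsubscribe() {
        if (unsubscribed) return;
        unsubscribed = true;
        for (Subscriber<?> s : linked) s.unsubscribe();
    }
}

interface OnSubscribe<T> extends Consumer<Subscriber<T>> {}

interface Operator<R, T> extends Function<Subscriber<R>, Subscriber<T>> {}

final class MiniObservable<T> {
    private final OnSubscribe<T> onSubscribe;

    MiniObservable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

    static <T> MiniObservable<T> from(List<T> items) {
        return new MiniObservable<T>(s -> {
            for (T t : items) {
                if (s.isUnsubscribed()) return;  // checked before every item
                s.onNext(t);
            }
        });
    }

    // Build the upstream-facing subscriber first, then start the source with it.
    <R> MiniObservable<R> lift(Operator<R, T> op) {
        return new MiniObservable<R>(down -> onSubscribe.accept(op.apply(down)));
    }

    <R> MiniObservable<R> map(Function<T, R> f) {
        return this.<R>lift(down -> {
            Subscriber<T> up = new Subscriber<T>() {
                @Override
                void onNext(T t) { down.onNext(f.apply(t)); }
            };
            down.add(up);                        // downstream unsubscription reaches upstream
            return up;
        });
    }

    MiniObservable<T> take(int n) {
        return this.<T>lift(down -> {
            Subscriber<T> up = new Subscriber<T>() {
                private int seen;

                @Override
                void onNext(T t) {
                    if (seen >= n) return;
                    down.onNext(t);
                    if (++seen == n) unsubscribe();  // the source sees this before its next item
                }
            };
            down.add(up);
            if (n <= 0) up.unsubscribe();        // take(0): the source never emits
            return up;
        });
    }

    void subscribe(Subscriber<T> subscriber) { onSubscribe.accept(subscriber); }
}
```

In this model, MiniObservable.from(Arrays.asList(1, 2, 3)).map(expensive).take(1) calls the mapping function once, and take(0) does not call it at all.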
