Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Behavior subject time gap fix 2 #1185

Merged
merged 6 commits into from
May 20, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 32 additions & 65 deletions rxjava-core/src/main/java/rx/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@
*/
package rx.subjects;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;

import rx.Notification;
import rx.Observer;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.operators.NotificationLite;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;

/**
Expand Down Expand Up @@ -56,88 +52,59 @@
public final class AsyncSubject<T> extends Subject<T, T> {

public static <T> AsyncSubject<T> create() {
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>(Notification.<T>createOnCompleted());

OnSubscribe<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
/**
* This function executes at beginning of subscription.
*
* This will always run, even if Subject is in terminal state.
*/
new Action1<SubjectObserver<? super T>>() {

@Override
public void call(SubjectObserver<? super T> o) {
// nothing to do if not terminated
}
},
/**
* This function executes if the Subject is terminated.
*/
new Action1<SubjectObserver<? super T>>() {

@Override
public void call(SubjectObserver<? super T> o) {
// we want the last value + completed so add this extra logic
// to send onCompleted if the last value is an onNext
emitValueToObserver(lastNotification.get(), o);
}
}, null);

return new AsyncSubject<T>(onSubscribe, subscriptionManager, lastNotification);
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
state.onTerminated = new Action1<SubjectObserver<T>>() {
@Override
public void call(SubjectObserver<T> o) {
Object v = state.get();
o.accept(v);
o.completeSingle(v);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is somewhat confusing to read. It took me a bit to realize that completeSingle is actually "complete only if not already completed in the previous o.accept if an onError occurred, or an onCompleted with no onNext".

Also, it seems only applicable to AsyncSubject yet lives in the SubjectSubscriptionManager.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll redo ReplaySubject anyway so I'll look at this.

}
};
return new AsyncSubject<T>(state, state);
}

protected static <T> void emitValueToObserver(Notification<T> n, Observer<? super T> o) {
n.accept(o);
if (n.isOnNext()) {
o.onCompleted();
}
}
final SubjectSubscriptionManager<T> state;
volatile Object lastValue;
private final NotificationLite<T> nl = NotificationLite.instance();

private final SubjectSubscriptionManager<T> subscriptionManager;
final AtomicReference<Notification<T>> lastNotification;

protected AsyncSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
protected AsyncSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {
super(onSubscribe);
this.subscriptionManager = subscriptionManager;
this.lastNotification = lastNotification;
this.state = state;
}

@Override
public void onCompleted() {
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {

@Override
public void call() {
if (state.active) {
Object last = lastValue;
if (last == null) {
last = nl.completed();
}
});
if (observers != null) {
for (Observer<? super T> o : observers) {
emitValueToObserver(lastNotification.get(), o);
for (SubjectObserver<T> bo : state.terminate(last)) {
if (last == nl.completed()) {
bo.onCompleted();
} else {
bo.onNext(nl.getValue(last));
bo.onCompleted();
}
}
}
}

@Override
public void onError(final Throwable e) {
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
@Override
public void call() {
lastNotification.set(Notification.<T> createOnError(e));
}
});
if (observers != null) {
for (Observer<? super T> o : observers) {
emitValueToObserver(lastNotification.get(), o);
if (state.active) {
Object n = nl.error(e);
for (SubjectObserver<T> bo : state.terminate(n)) {
bo.onError(e);
}
}

}

@Override
public void onNext(T v) {
lastNotification.set(Notification.createOnNext(v));
lastValue = nl.next(v);
}

}
130 changes: 53 additions & 77 deletions rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
*/
package rx.subjects;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;

import rx.Notification;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.operators.NotificationLite;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
import rx.subscriptions.Subscriptions;

/**
* Subject that publishes the most recent and all subsequent events to each subscribed {@link Observer}.
Expand Down Expand Up @@ -65,110 +65,86 @@
*
* @param <T>
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public final class BehaviorSubject<T> extends Subject<T, T> {

/**
* Create a {@link BehaviorSubject} without a default value.
* @param <T> the value type
* @return the constructed {@link BehaviorSubject}
*/
public static <T> BehaviorSubject<T> create() {
return create(null, false);
}
/**
* Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each {@link Observer} that subscribes to it.
*
* @param <T> the value type
* @param defaultValue
* the value which will be published to any {@link Observer} as long as the {@link BehaviorSubject} has not yet received any events
* @return the constructed {@link BehaviorSubject}
*/
public static <T> BehaviorSubject<T> create(T defaultValue) {
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
// set a default value so subscriptions will immediately receive this until a new notification is received
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>(Notification.createOnNext(defaultValue));

OnSubscribe<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
/**
* This function executes at beginning of subscription.
*
* This will always run, even if Subject is in terminal state.
*/
new Action1<SubjectObserver<? super T>>() {

@Override
public void call(SubjectObserver<? super T> o) {
/*
* When we subscribe we always emit the latest value to the observer.
*
* Here we only emit if it's an onNext as terminal states are handled in the next function.
*/
Notification<T> n = lastNotification.get();
if (n.isOnNext()) {
n.accept(o);
}
}
},
/**
* This function executes if the Subject is terminated before subscription occurs.
*/
new Action1<SubjectObserver<? super T>>() {

@Override
public void call(SubjectObserver<? super T> o) {
/*
* If we are already terminated, or termination happens while trying to subscribe
* this will be invoked and we emit whatever the last terminal value was.
*/
lastNotification.get().accept(o);
}
}, null);
return create(defaultValue, true);
}
private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault) {
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
if (hasDefault) {
state.set(NotificationLite.instance().next(defaultValue));
}
state.onAdded = new Action1<SubjectObserver<T>>() {

return new BehaviorSubject<T>(onSubscribe, subscriptionManager, lastNotification);
@Override
public void call(SubjectObserver<T> o) {
o.emitFirst(state.get());
}

};
state.onTerminated = state.onAdded;
return new BehaviorSubject<T>(state, state);
}

private final SubjectSubscriptionManager<T> subscriptionManager;
final AtomicReference<Notification<T>> lastNotification;
private final SubjectSubscriptionManager<T> state;
private final NotificationLite<T> nl = NotificationLite.instance();

protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {
super(onSubscribe);
this.subscriptionManager = subscriptionManager;
this.lastNotification = lastNotification;
this.state = state;
}

@Override
public void onCompleted() {
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {

@Override
public void call() {
lastNotification.set(Notification.<T> createOnCompleted());
}
});
if (observers != null) {
for (Observer<? super T> o : observers) {
o.onCompleted();
Object last = state.get();
if (last == null || state.active) {
Object n = nl.completed();
for (SubjectObserver<T> bo : state.terminate(n)) {
bo.emitNext(n);
}
}
}

@Override
public void onError(final Throwable e) {
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {

@Override
public void call() {
lastNotification.set(Notification.<T> createOnError(e));
}
});
if (observers != null) {
for (Observer<? super T> o : observers) {
o.onError(e);
public void onError(Throwable e) {
Object last = state.get();
if (last == null || state.active) {
Object n = nl.error(e);
for (SubjectObserver<T> bo : state.terminate(n)) {
bo.emitNext(n);
}
}
}

@Override
public void onNext(T v) {
// do not overwrite a terminal notification
// so new subscribers can get them
if (lastNotification.get().isOnNext()) {
lastNotification.set(Notification.createOnNext(v));
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
o.onNext(v);
Object last = state.get();
if (last == null || state.active) {
Object n = nl.next(v);
for (SubjectObserver<T> bo : state.next(n)) {
bo.emitNext(n);
}
}
}


/* test support */ int subscriberCount() {
return state.observers().length;
}
}
Loading