Skip to content

Commit

Permalink
Merge pull request #690 from Applied-Duality/benChanges
Browse files Browse the repository at this point in the history
Fixed Scala bindings
  • Loading branch information
benjchristensen committed Dec 24, 2013
2 parents 352683c + 622b661 commit 2952598
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 26 deletions.
4 changes: 2 additions & 2 deletions language-adaptors/rxjava-scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ sourceSets {
srcDir 'src/main/scala'
srcDir 'src/test/scala'
srcDir 'src/examples/scala'
srcDir 'src/examples/java'
//srcDir 'src/examples/java'
}
java.srcDirs = []
}
Expand All @@ -34,7 +34,7 @@ sourceSets {
// the scala source set:
scala {
srcDir 'src/examples/scala'
srcDir 'src/examples/java'
//srcDir 'src/examples/java'
}
java.srcDirs = []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1860,6 +1860,38 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.doOnEach(observer.asJavaObserver))
}

/**
* Invokes an action when the source Observable calls <code>onNext</code>.
*
* @param onNext the action to invoke when the source Observable calls <code>onNext</code>
* @return the source Observable with the side-effecting behavior applied
*/
def doOnNext(onNext: T => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnNext(onNext))
}

/**
* Invokes an action if the source Observable calls <code>onError</code>.
*
* @param onError the action to invoke if the source Observable calls
* <code>onError</code>
* @return the source Observable with the side-effecting behavior applied
*/
def doOnError(onError: Throwable => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnError(onError))
}

/**
* Invokes an action when the source Observable calls <code>onCompleted</code>.
*
* @param onCompleted the action to invoke when the source Observable calls
* <code>onCompleted</code>
* @return the source Observable with the side-effecting behavior applied
*/
def doOnCompleted(onCompleted: () => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnCompleted(onCompleted))
}

/**
* Returns an Observable that applies the given function to each item emitted by an
* Observable.
Expand All @@ -1869,9 +1901,7 @@ trait Observable[+T]
* @return an Observable with the side-effecting behavior applied.
*/
def doOnEach(onNext: T => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnEach(
onNext
))
toScalaObservable[T](asJavaObservable.doOnNext(onNext))
}

/**
Expand All @@ -1884,10 +1914,7 @@ trait Observable[+T]
* @return an Observable with the side-effecting behavior applied.
*/
def doOnEach(onNext: T => Unit, onError: Throwable => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnEach(
onNext,
onError
))
toScalaObservable[T](asJavaObservable.doOnEach(Observer(onNext, onError, ()=>{})))
}

/**
Expand All @@ -1901,11 +1928,7 @@ trait Observable[+T]
* @return an Observable with the side-effecting behavior applied.
*/
def doOnEach(onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnEach(
onNext,
onError,
onCompleted
))
toScalaObservable[T](asJavaObservable.doOnEach(Observer(onNext, onError,onCompleted)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@ private [scala] object BooleanSubscription {
private [scala] class BooleanSubscription private[scala] (boolean: rx.subscriptions.BooleanSubscription)
extends Subscription {

override val asJavaSubscription: rx.subscriptions.BooleanSubscription = new rx.subscriptions.BooleanSubscription() {
override val asJavaSubscription: rx.subscriptions.BooleanSubscription = boolean
}

/*
new rx.subscriptions.BooleanSubscription() {
override def unsubscribe(): Unit = {
if(unsubscribed.compareAndSet(false, true)) {
if(!boolean.isUnsubscribed) { boolean.unsubscribe() }
}
}
override def isUnsubscribed(): Boolean = unsubscribed.get() || boolean.isUnsubscribed
}
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ class MultipleAssignmentSubscription private[scala] (override val asJavaSubscrip
/**
* Gets the underlying subscription.
*/
def subscription: Subscription = Subscription(asJavaSubscription.getSubscription)
def subscription: Subscription = Subscription(asJavaSubscription.get)

/**
* Gets the underlying subscription
* @param that the new subscription
* @return the [[rx.lang.scala.subscriptions.MultipleAssignmentSubscription]] itself.
*/
def subscription_=(that: Subscription): this.type = {
asJavaSubscription.setSubscription(that.asJavaSubscription)
asJavaSubscription.set(that.asJavaSubscription)
this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ class SerialSubscription private[scala] (override val asJavaSubscription: rx.sub
override def isUnsubscribed: Boolean = asJavaSubscription.isUnsubscribed

def subscription_=(value: Subscription): this.type = {
asJavaSubscription.setSubscription(value.asJavaSubscription)
asJavaSubscription.set(value.asJavaSubscription)
this
}
def subscription: Subscription = Subscription(asJavaSubscription.getSubscription)
def subscription: Subscription = Subscription(asJavaSubscription.get)

}

4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6793,7 +6793,7 @@ public void onNext(T args) { }
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#dooneach">RxJava Wiki: doOnNext()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229804.aspx">MSDN: Observable.Do</a>
*/
public Observable<T> doOnNext(final Action1<T> onNext) {
public Observable<T> doOnNext(final Action1<? super T> onNext) {
Observer<T> observer = new Observer<T>() {
@Override
public void onCompleted() { }
Expand Down Expand Up @@ -6822,7 +6822,7 @@ public void onNext(T args) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#dooneach">RxJava Wiki: doOnEach()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229307.aspx">MSDN: Observable.Do</a>
*/
public Observable<T> doOnEach(final Action1<Notification<T>> onNotification) {
public Observable<T> doOnEach(final Action1<Notification<? super T>> onNotification) {
Observer<T> observer = new Observer<T>() {
@Override
public void onCompleted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ private static <T> List<List<T>> toLists(Observable<Observable<T>> observables)
final List<List<T>> lists = new ArrayList<List<T>>();
Observable.concat(observables.map(new Func1<Observable<T>, Observable<List<T>>>() {
@Override
public Observable<List<T>> call(Observable<T> xs) {
return xs.toList();
}
})).toBlockingObservable().forEach(new Action1<List<T>>() {
public Observable<List<T>> call(Observable<T> xs) { return xs.toList(); }
}))
.toBlockingObservable()
.forEach(new Action1<List<T>>() {
@Override
public void call(List<T> xs) {
lists.add(xs);
Expand Down

0 comments on commit 2952598

Please sign in to comment.