Skip to content

Commit

Permalink
Merge pull request #1053 from benjchristensen/deprecation-cleanup
Browse files Browse the repository at this point in the history
Deprecation Cleanup
  • Loading branch information
benjchristensen committed Apr 20, 2014
2 parents d40c868 + e505cee commit ccac9e7
Show file tree
Hide file tree
Showing 81 changed files with 141 additions and 5,919 deletions.
8 changes: 4 additions & 4 deletions language-adaptors/rxjava-clojure/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,22 +106,22 @@ or, at the REPL:
```

### Using rx/fn
Once the namespace is required, you can use the `rx/fn` macro anywhere RxJava wants a `rx.util.functions.Func` object. The syntax is exactly the same as `clojure.core/fn`:
Once the namespace is required, you can use the `rx/fn` macro anywhere RxJava wants a `rx.functions.Func` object. The syntax is exactly the same as `clojure.core/fn`:

```clojure
(-> my-observable
(.map (rx/fn [v] (* 2 v))))
```

If you already have a plain old Clojure function you'd like to use, you can pass it to the `rx/fn*` function to get a new object that implements `rx.util.functions.Func`:
If you already have a plain old Clojure function you'd like to use, you can pass it to the `rx/fn*` function to get a new object that implements `rx.functions.Func`:

```clojure
(-> my-numbers
(.reduce (rx/fn* +)))
```

### Using rx/action
The `rx/action` macro is identical to `rx/fn` except that the object returned implements `rx.util.functions.Action` interfaces. It's used in `subscribe` and other side-effect-y contexts:
The `rx/action` macro is identical to `rx/fn` except that the object returned implements `rx.functions.Action` interfaces. It's used in `subscribe` and other side-effect-y contexts:

```clojure
(-> my-observable
Expand All @@ -133,7 +133,7 @@ The `rx/action` macro is identical to `rx/fn` except that the object returned im
```

### Using Observable/create
As of 0.17, `rx.Observable/create` takes an implementation of `rx.Observable$OnSubscribe` which is basically an alias for `rx.util.functions.Action1` that takes an `rx.Subscriber` as its argument. Thus, you can just use `rx/action` when creating new observables:
As of 0.17, `rx.Observable/create` takes an implementation of `rx.Observable$OnSubscribe` which is basically an alias for `rx.functions.Action1` that takes an `rx.Subscriber` as its argument. Thus, you can just use `rx/action` when creating new observables:

```clojure
; A simple observable that emits 0..9 taking unsubscribe into account
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
(instance? rx.Observable action)
(rx/on-next observer
(.finallyDo ^rx.Observable action
(reify rx.util.functions.Action0
(reify rx.functions.Action0
(call [this]
(swap! state-atom update-in [:in-flight] disj action)
(advance! state-atom)))))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
BlockingObservable
GroupedObservable]
[rx.subscriptions Subscriptions]
[rx.util.functions Action0 Action1 Func0 Func1 Func2]))
[rx.functions Action0 Action1 Func0 Func1 Func2]))

(set! *warn-on-reflection* true)

Expand Down Expand Up @@ -292,16 +292,14 @@
;################################################################################
; Operators

(defn synchronize
"Synchronize execution.
(defn serialize
"Serialize execution.
See:
rx.Observable/synchronize
rx.Observable/serialize
"
([^Observable xs]
(.synchronize xs))
([lock ^Observable xs]
(.synchronize xs lock)))
(.serialize xs)))

(defn merge*
"Merge an Observable of Observables into a single Observable
Expand Down Expand Up @@ -472,10 +470,10 @@
empty Observable if xs is empty.
See:
rx.Observable/takeFirst
rx.Observable/take(1)
"
[^Observable xs]
(.takeFirst xs))
(.take xs 1))

(defn ^Observable group-by
"Returns an Observable of clojure.lang.MapEntry where the key is the result of
Expand Down Expand Up @@ -832,7 +830,7 @@
See:
rx.Observable/onErrorResumeNext
http://netflix.github.io/RxJava/javadoc/rx/Observable.html#onErrorResumeNext(rx.util.functions.Func1)
http://netflix.github.io/RxJava/javadoc/rx/Observable.html#onErrorResumeNext(rx.functions.Func1)
"
[p f ^Observable o]
(let [p (if (class? p)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,14 @@
(b/into []))]
(is (= [2 4] result))))

(deftest test-syncrhonize
; I'm going to believe synchronize works and just exercise it
(deftest test-serialize
; I'm going to believe serialize works and just exercise it
; here for sanity.
(is (= [1 2 3]
(->> [1 2 3]
(rx/seq->o)
(rx/synchronize)
(b/into []))))
(let [lock (Object.)]
(is (= [1 2 3]
(->> [1 2 3]
(rx/seq->o)
(rx/synchronize lock)
(b/into []))))))
(rx/serialize)
(b/into [])))))

(let [expected-result [[1 3 5] [2 4 6]]
sleepy-o #(f/future-generator*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ def class ObservableTests {

Observable.from("one", "two", "three", "four", "five", "six")
.groupBy({String s -> s.length()})
.mapMany({
.flatMap({
groupObservable ->

return groupObservable.map({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ trait Observable[+T]
* @return an Observable that is a chronologically well-behaved version of the source
* Observable, and that synchronously notifies its [[rx.lang.scala.Observer]]s
*/
def synchronize: Observable[T] = {
toScalaObservable[T](asJavaObservable.synchronize)
def serialize: Observable[T] = {
toScalaObservable[T](asJavaObservable.serialize)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@
*/
package rx.quasar;

import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.fibers.Suspendable;
import co.paralleluniverse.strands.AbstractFuture;
import co.paralleluniverse.strands.ConditionSynchronizer;
import co.paralleluniverse.strands.SimpleConditionSynchronizer;
import co.paralleluniverse.strands.Strand;
import co.paralleluniverse.strands.channels.Channels;
import co.paralleluniverse.strands.channels.DelegatingReceivePort;
import co.paralleluniverse.strands.channels.ProducerException;
import co.paralleluniverse.strands.channels.ReceivePort;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -38,6 +28,16 @@
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observers.SafeSubscriber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.fibers.Suspendable;
import co.paralleluniverse.strands.AbstractFuture;
import co.paralleluniverse.strands.ConditionSynchronizer;
import co.paralleluniverse.strands.SimpleConditionSynchronizer;
import co.paralleluniverse.strands.Strand;
import co.paralleluniverse.strands.channels.Channels;
import co.paralleluniverse.strands.channels.DelegatingReceivePort;
import co.paralleluniverse.strands.channels.ProducerException;
import co.paralleluniverse.strands.channels.ReceivePort;

/**
* An extension of {@link Observable} that provides blocking operators, compatible with both threads and fibers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
* limitations under the License.
*/package rx.quasar;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import co.paralleluniverse.fibers.FiberAsync;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.fibers.Suspendable;
Expand All @@ -22,12 +29,6 @@
import co.paralleluniverse.strands.channels.Channels;
import co.paralleluniverse.strands.channels.ReceivePort;
import co.paralleluniverse.strands.channels.SendPort;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;

/**
* This class contains static methods that connect {@link Observable}s and {@link Channel}s.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,19 @@
*/
package rx.quasar;

import co.paralleluniverse.fibers.DefaultFiberScheduler;
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.FiberScheduler;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.SuspendableRunnable;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.functions.Action0;
import co.paralleluniverse.fibers.DefaultFiberScheduler;
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.FiberScheduler;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.SuspendableRunnable;

/**
* Schedules work on a new fiber.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/
package rx.quasar;

import co.paralleluniverse.fibers.Suspendable;
import co.paralleluniverse.strands.channels.ReceivePort;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import co.paralleluniverse.fibers.Suspendable;
import co.paralleluniverse.strands.channels.ReceivePort;

/**
* Converts a {@link ReceivePort} into an Observable that emits each message received on the channel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
*/
package rx.quasar;

import co.paralleluniverse.fibers.instrument.MethodDatabase;
import co.paralleluniverse.fibers.instrument.SimpleSuspendableClassifier;
import co.paralleluniverse.fibers.instrument.SuspendableClassifier;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import co.paralleluniverse.fibers.instrument.MethodDatabase;
import co.paralleluniverse.fibers.instrument.SimpleSuspendableClassifier;
import co.paralleluniverse.fibers.instrument.SuspendableClassifier;

public class RxSuspendableClassifier implements SuspendableClassifier {
private static final Set<String> CORE_PACKAGES = new HashSet<String>(Arrays.asList(new String[]{
"rx", "rx.joins", "rx.observables", "rx.observers", "rx.operators", "rx.plugins", "rx.schedulers",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,8 @@
*/
package rx.quasar;

import co.paralleluniverse.strands.channels.ProducerException;
import co.paralleluniverse.strands.channels.ReceivePort;
import static org.junit.Assert.*;

import java.util.Iterator;
import java.util.NoSuchElementException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import org.junit.Assert;
import org.junit.Before;
Expand All @@ -32,10 +28,12 @@
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
import co.paralleluniverse.strands.channels.ProducerException;
import co.paralleluniverse.strands.channels.ReceivePort;

public class BlockingObservableTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,35 @@
*/
package rx.quasar;

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.fibers.Suspendable;
import co.paralleluniverse.strands.Strand;
import co.paralleluniverse.strands.SuspendableCallable;
import co.paralleluniverse.strands.channels.Channel;
import co.paralleluniverse.strands.channels.Channels;
import co.paralleluniverse.strands.channels.ProducerException;
import co.paralleluniverse.strands.channels.ReceivePort;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.subjects.PublishSubject;
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.fibers.Suspendable;
import co.paralleluniverse.strands.Strand;
import co.paralleluniverse.strands.SuspendableCallable;
import co.paralleluniverse.strands.channels.Channel;
import co.paralleluniverse.strands.channels.Channels;
import co.paralleluniverse.strands.channels.ProducerException;
import co.paralleluniverse.strands.channels.ReceivePort;

public class ChannelObservableTest {
@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
*/
package rx.quasar;

import org.junit.Rule;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import rx.Scheduler;
import rx.schedulers.AbstractSchedulerConcurrencyTests;

Expand Down
37 changes: 0 additions & 37 deletions rxjava-core/src/main/java/rx/Notification.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,43 +52,6 @@ private Notification(Kind kind, T value, Throwable e) {
this.kind = kind;
}

/**
* A constructor used to represent an onNext notification.
*
* @param value
* The data passed to the onNext method.
*/
@Deprecated
public Notification(T value) {
this.value = value;
this.throwable = null;
this.kind = Kind.OnNext;
}

/**
* A constructor used to represent an onError notification.
*
* @param exception
* The exception passed to the onError notification.
* @deprecated Because type Throwable can't disambiguate the constructors if both onNext and onError are type "Throwable"
*/
@Deprecated
public Notification(Throwable exception) {
this.throwable = exception;
this.value = null;
this.kind = Kind.OnError;
}

/**
* A constructor used to represent an onCompleted notification.
*/
@Deprecated
public Notification() {
this.throwable = null;
this.value = null;
this.kind = Kind.OnCompleted;
}

/**
* Retrieves the exception associated with an onError notification.
*
Expand Down
Loading

0 comments on commit ccac9e7

Please sign in to comment.