diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java index 3f96e97b14..01de861b04 100644 --- a/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java @@ -11,15 +11,15 @@ public final class DebugSubscriber extends Subscriber { private final Func1 onNextHook; final Action1 events; final Observer o; - Operator from = null; - Operator to = null; + Operator from = null; + Operator to = null; public DebugSubscriber( Func1 onNextHook, Action1 _events, Subscriber _o, - Operator _out, - Operator _in) { + Operator _out, + Operator _in) { super(_o); this.events = _events; this.o = _o; @@ -47,19 +47,19 @@ public void onNext(T t) { o.onNext(onNextHook.call(t)); } - public Operator getFrom() { + public Operator getFrom() { return from; } - public void setFrom(Operator op) { - this.from = op; + public void setFrom(Operator bind) { + this.from = bind; } - public Operator getTo() { + public Operator getTo() { return to; } - public void setTo(Operator op) { + public void setTo(Operator op) { this.to = op; } diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java index 8a50cd9da0..f52d5f945c 100644 --- a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java @@ -63,7 +63,7 @@ public void call(Subscriber o) { } @Override - public Operator onLift(final Operator bind) { + public Operator onLift(final Operator bind) { return new Operator() { @Override public Subscriber call(final Subscriber o) { @@ -78,7 +78,7 @@ public Subscription onAdd(Subscriber subscriber, Subscription s) { } @SuppressWarnings("unchecked") - private Subscriber wrapOutbound(Operator bind, Subscriber o) { + private Subscriber wrapOutbound(Operator bind, Subscriber o) { if (o instanceof DebugSubscriber) { if (bind != null) ((DebugSubscriber) o).setFrom(bind); @@ -88,7 +88,7 @@ private Subscriber wrapOutbound(Operator bind, Subscriber Subscriber wrapInbound(Operator bind, Subscriber o) { + private Subscriber wrapInbound(Operator bind, Subscriber o) { if (o instanceof DebugSubscriber) { if (bind != null) ((DebugSubscriber) o).setTo(bind); diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java index f50d6c5834..f4144fb2b0 100644 --- a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java @@ -13,17 +13,17 @@ public static enum Kind { } private final OnSubscribe source; - private final Operator from; + private final Operator from; private final Kind kind; private final Notification notification; - private final Operator to; + private final Operator to; private final long nanoTime; private final long threadId; private Observer o; public static DebugNotification createSubscribe(Observer o, OnSubscribe source) { - Operator to = null; - Operator from = null; + Operator to = null; + Operator from = null; if (o instanceof DebugSubscriber) { to = ((DebugSubscriber) o).getTo(); from = ((DebugSubscriber) o).getFrom(); @@ -32,23 +32,23 @@ public static DebugNotification createSubscribe(Observer o, On return new DebugNotification(o, from, Kind.Subscribe, null, to, source); } - public static DebugNotification createOnNext(Observer o, Operator from, T t, Operator to) { + public static DebugNotification createOnNext(Observer o, Operator from, T t, Operator to) { return new DebugNotification(o, from, Kind.OnNext, Notification.createOnNext(t), to, null); } - public static DebugNotification createOnError(Observer o, Operator from, Throwable e, Operator to) { + public static DebugNotification createOnError(Observer o, Operator from, Throwable e, Operator to) { return new DebugNotification(o, from, Kind.OnError, Notification. createOnError(e), to, null); } - public static DebugNotification createOnCompleted(Observer o, Operator from, Operator to) { + public static DebugNotification createOnCompleted(Observer o, Operator from, Operator to) { return new DebugNotification(o, from, Kind.OnCompleted, Notification. createOnCompleted(), to, null); } - public static DebugNotification createUnsubscribe(Observer o, Operator from, Operator to) { + public static DebugNotification createUnsubscribe(Observer o, Operator from, Operator to) { return new DebugNotification(o, from, Kind.Unsubscribe, null, to, null); } - private DebugNotification(Observer o, Operator from, Kind kind, Notification notification, Operator to, OnSubscribe source) { + private DebugNotification(Observer o, Operator from, Kind kind, Notification notification, Operator to, OnSubscribe source) { this.o = (o instanceof SafeSubscriber) ? ((SafeSubscriber) o).getActual() : o; this.from = from; this.kind = kind; @@ -59,7 +59,7 @@ private DebugNotification(Observer o, Operator from, Kind kind, Notificati this.threadId = Thread.currentThread().getId(); } - public Operator getFrom() { + public Operator getFrom() { return from; } @@ -67,7 +67,7 @@ public Notification getNotification() { return notification; } - public Operator getTo() { + public Operator getTo() { return to; } diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/NotificationEvent.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/NotificationEvent.java deleted file mode 100644 index 000e741b78..0000000000 --- a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/NotificationEvent.java +++ /dev/null @@ -1,104 +0,0 @@ -package rx.plugins; - -import rx.Notification; -import rx.Observable.OnSubscribe; -import rx.Observable.Operator; -import rx.Observer; -import rx.observers.SafeSubscriber; -import rx.operators.DebugSubscriber; - -public class NotificationEvent { - public static enum Kind { - OnNext, OnError, OnCompleted, Subscribe, Unsubscribe - } - - private final OnSubscribe source; - private final Operator from; - private final Kind kind; - private final Notification notification; - private final Operator to; - private final long nanoTime; - private final long threadId; - private Observer o; - - public static NotificationEvent createSubscribe(Observer o, OnSubscribe source) { - Operator to = null; - Operator from = null; - if (o instanceof DebugSubscriber) { - to = ((DebugSubscriber) o).getTo(); - from = ((DebugSubscriber) o).getFrom(); - o = ((DebugSubscriber) o).getActual(); - } - return new NotificationEvent(o, from, Kind.Subscribe, null, to, source); - } - - public static NotificationEvent createOnNext(Observer o, Operator from, T t, Operator to) { - return new NotificationEvent(o, from, Kind.OnNext, Notification.createOnNext(t), to, null); - } - - public static NotificationEvent createOnError(Observer o, Operator from, Throwable e, Operator to) { - return new NotificationEvent(o, from, Kind.OnError, Notification. createOnError(e), to, null); - } - - public static NotificationEvent createOnCompleted(Observer o, Operator from, Operator to) { - return new NotificationEvent(o, from, Kind.OnCompleted, Notification. createOnCompleted(), to, null); - } - - public static NotificationEvent createUnsubscribe(Observer o, Operator from, Operator to) { - return new NotificationEvent(o, from, Kind.Unsubscribe, null, to, null); - } - - private NotificationEvent(Observer o, Operator from, Kind kind, Notification notification, Operator to, OnSubscribe source) { - this.o = (o instanceof SafeSubscriber) ? ((SafeSubscriber) o).getActual() : o; - this.from = from; - this.kind = kind; - this.notification = notification; - this.to = to; - this.source = source; - this.nanoTime = System.nanoTime(); - this.threadId = Thread.currentThread().getId(); - } - - public Operator getFrom() { - return from; - } - - public Notification getNotification() { - return notification; - } - - public Operator getTo() { - return to; - } - - public long getNanoTime() { - return nanoTime; - } - - public long getThreadId() { - return threadId; - } - - @Override - public String toString() { - final StringBuilder s = new StringBuilder("{"); - s.append(" \"nano\": ").append(nanoTime); - s.append(", \"thread\": ").append(threadId); - s.append(", \"observer\": \"").append(o.getClass().getName()).append("@").append(Integer.toHexString(o.hashCode())).append("\""); - s.append(", \"type\": \"").append(kind).append("\""); - if (notification != null) { - if (notification.hasValue()) - s.append(", \"value\": \"").append(notification.getValue()).append("\""); - if (notification.hasThrowable()) - s.append(", \"exception\": \"").append(notification.getThrowable().getMessage().replace("\\", "\\\\").replace("\"", "\\\"")).append("\""); - } - if (source != null) - s.append(", \"source\": \"").append(source.getClass().getName()).append("@").append(Integer.toHexString(source.hashCode())).append("\""); - if (from != null) - s.append(", \"from\": \"").append(from.getClass().getName()).append("@").append(Integer.toHexString(from.hashCode())).append("\""); - if (to != null) - s.append(", \"to\": \"").append(to.getClass().getName()).append("@").append(Integer.toHexString(to.hashCode())).append("\""); - s.append("}"); - return s.toString(); - } -} diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d83e5a4dbe..1dedc27516 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -261,7 +261,7 @@ public static interface OnSubscribeFunc extends Function { * @param bind * @return an Observable that emits values that are the result of applying the bind function to the values of the current Observable */ - public Observable lift(final Operator bind) { + public Observable lift(final Operator bind) { return new Observable(new OnSubscribe() { @Override public void call(Subscriber o) { @@ -1777,7 +1777,7 @@ public final static Observable merge(IterableMSDN: Observable.Merge */ public final static Observable merge(Observable> source) { - return source.lift(new OperatorMerge()); // any idea how to get these generics working?! + return source.lift(new OperatorMerge()); } /** @@ -1821,7 +1821,7 @@ public final static Observable merge(ObservableMSDN: Observable.Merge */ public final static Observable merge(Observable t1, Observable t2) { - return merge(from(t1, t2)); + return merge(from(Arrays.asList(t1, t2))); } /** @@ -1843,7 +1843,7 @@ public final static Observable merge(Observable t1, Observab * @see MSDN: Observable.Merge */ public final static Observable merge(Observable t1, Observable t2, Observable t3) { - return merge(from(t1, t2, t3)); + return merge(from(Arrays.asList(t1, t2, t3))); } /** @@ -1867,7 +1867,7 @@ public final static Observable merge(Observable t1, Observab * @see MSDN: Observable.Merge */ public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4) { - return merge(from(t1, t2, t3, t4)); + return merge(from(Arrays.asList(t1, t2, t3, t4))); } /** @@ -1893,7 +1893,7 @@ public final static Observable merge(Observable t1, Observab * @see MSDN: Observable.Merge */ public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5) { - return merge(from(t1, t2, t3, t4, t5)); + return merge(from(Arrays.asList(t1, t2, t3, t4, t5))); } /** @@ -1921,7 +1921,7 @@ public final static Observable merge(Observable t1, Observab * @see MSDN: Observable.Merge */ public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6) { - return merge(from(t1, t2, t3, t4, t5, t6)); + return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6))); } /** @@ -1951,7 +1951,7 @@ public final static Observable merge(Observable t1, Observab * @see MSDN: Observable.Merge */ public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7) { - return merge(from(t1, t2, t3, t4, t5, t6, t7)); + return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7))); } /** @@ -1984,7 +1984,7 @@ public final static Observable merge(Observable t1, Observab * @see MSDN: Observable.Merge */ public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8) { - return merge(from(t1, t2, t3, t4, t5, t6, t7, t8)); + return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8))); } /** @@ -2019,7 +2019,7 @@ public final static Observable merge(Observable t1, Observab */ // suppress because the types are checked by the method signature before using a vararg public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8, Observable t9) { - return merge(from(t1, t2, t3, t4, t5, t6, t7, t8, t9)); + return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9))); } /** @@ -4863,7 +4863,7 @@ public final Observable map(Func1 func) { * @deprecated use {@link #flatMap(Func1)} */ @Deprecated - public final Observable mapMany(Func1> func) { + public final Observable mapMany(Func1> func) { return mergeMap(func); } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMerge.java b/rxjava-core/src/main/java/rx/operators/OperatorMerge.java index e6897764c6..9425916dac 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorMerge.java @@ -30,13 +30,13 @@ * You can combine the items emitted by multiple Observables so that they act like a single * Observable, by using the merge operation. */ -public final class OperatorMerge implements Operator> { +public final class OperatorMerge implements Operator> { @Override - public Subscriber> call(final Subscriber outerOperation) { + public Subscriber> call(final Subscriber outerOperation) { final Subscriber o = new SynchronizedSubscriber(outerOperation); - return new Subscriber>(outerOperation) { + return new Subscriber>(outerOperation) { private volatile boolean completed = false; private final AtomicInteger runningCount = new AtomicInteger(); @@ -55,7 +55,7 @@ public void onError(Throwable e) { } @Override - public void onNext(Observable innerObservable) { + public void onNext(Observable innerObservable) { runningCount.incrementAndGet(); innerObservable.subscribe(new InnerObserver()); } diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java b/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java index e367e8e3a2..2c1c3f67e9 100644 --- a/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java @@ -97,7 +97,7 @@ public OnSubscribe onCreate(OnSubscribe f) { return f; } - public Operator onLift(final Operator bind) { + public Operator onLift(final Operator bind) { return bind; }