diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan0.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan0.java index aaaf75c447..94382b4c67 100644 --- a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan0.java +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan0.java @@ -24,7 +24,7 @@ public abstract class ActivePlan0 { protected final Map joinObservers = new HashMap(); - public abstract void match(); + protected abstract void match(); protected void addJoinObserver(JoinObserver joinObserver) { joinObservers.put(joinObserver, joinObserver); diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan1.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan1.java index 1595e3f6d5..ea45aa5c27 100644 --- a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan1.java +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan1.java @@ -22,22 +22,22 @@ /** * Represents an active plan. */ -public class ActivePlan1 extends ActivePlan0 { +public final class ActivePlan1 extends ActivePlan0 { private final Action1 onNext; private final Action0 onCompleted; - private final JoinObserver1 first; + private final JoinObserver1 jo1; - public ActivePlan1(JoinObserver1 first, Action1 onNext, Action0 onCompleted) { + ActivePlan1(JoinObserver1 jo1, Action1 onNext, Action0 onCompleted) { this.onNext = onNext; this.onCompleted = onCompleted; - this.first = first; - addJoinObserver(first); + this.jo1 = jo1; + addJoinObserver(jo1); } @Override - public void match() { - if (!first.queue().isEmpty()) { - Notification n1 = first.queue().peek(); + protected void match() { + if (!jo1.queue().isEmpty()) { + Notification n1 = jo1.queue().peek(); if (n1.isOnCompleted()) { onCompleted.call(); } else { diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan2.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan2.java index a477e99066..66b8bc39c6 100644 --- a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan2.java +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan2.java @@ -22,26 +22,26 @@ /** * Represents an active plan. */ -public class ActivePlan2 extends ActivePlan0 { +public final class ActivePlan2 extends ActivePlan0 { private final Action2 onNext; private final Action0 onCompleted; - private final JoinObserver1 first; - private final JoinObserver1 second; + private final JoinObserver1 jo1; + private final JoinObserver1 jo2; - public ActivePlan2(JoinObserver1 first, JoinObserver1 second, Action2 onNext, Action0 onCompleted) { + ActivePlan2(JoinObserver1 jo1, JoinObserver1 jo2, Action2 onNext, Action0 onCompleted) { this.onNext = onNext; this.onCompleted = onCompleted; - this.first = first; - this.second = second; - addJoinObserver(first); - addJoinObserver(second); + this.jo1 = jo1; + this.jo2 = jo2; + addJoinObserver(jo1); + addJoinObserver(jo2); } @Override - public void match() { - if (!first.queue().isEmpty() && !second.queue().isEmpty()) { - Notification n1 = first.queue().peek(); - Notification n2 = second.queue().peek(); + protected void match() { + if (!jo1.queue().isEmpty() && !jo2.queue().isEmpty()) { + Notification n1 = jo1.queue().peek(); + Notification n2 = jo2.queue().peek(); if (n1.isOnCompleted() || n2.isOnCompleted()) { onCompleted.call(); diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan3.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan3.java index d0a90002e4..ddc9bc633e 100644 --- a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan3.java +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan3.java @@ -22,14 +22,14 @@ /** * Represents an active plan. */ -public class ActivePlan3 extends ActivePlan0 { +public final class ActivePlan3 extends ActivePlan0 { private final Action3 onNext; private final Action0 onCompleted; private final JoinObserver1 first; private final JoinObserver1 second; private final JoinObserver1 third; - public ActivePlan3(JoinObserver1 first, + ActivePlan3(JoinObserver1 first, JoinObserver1 second, JoinObserver1 third, Action3 onNext, @@ -45,7 +45,7 @@ public ActivePlan3(JoinObserver1 first, } @Override - public void match() { + protected void match() { if (!first.queue().isEmpty() && !second.queue().isEmpty() && !third.queue().isEmpty()) { diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan4.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan4.java new file mode 100644 index 0000000000..e3032dca4b --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan4.java @@ -0,0 +1,75 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Notification; +import rx.functions.Action0; +import rx.functions.Action4; + +/** + * Represents an active plan. + */ +public final class ActivePlan4 extends ActivePlan0 { + private final Action4 onNext; + private final Action0 onCompleted; + private final JoinObserver1 jo1; + private final JoinObserver1 jo2; + private final JoinObserver1 jo3; + private final JoinObserver1 jo4; + + ActivePlan4( + JoinObserver1 jo1, + JoinObserver1 jo2, + JoinObserver1 jo3, + JoinObserver1 jo4, + Action4 onNext, + Action0 onCompleted) { + this.onNext = onNext; + this.onCompleted = onCompleted; + this.jo1 = jo1; + this.jo2 = jo2; + this.jo3 = jo3; + this.jo4 = jo4; + addJoinObserver(jo1); + addJoinObserver(jo2); + addJoinObserver(jo3); + addJoinObserver(jo4); + } + + @Override + protected void match() { + if (!jo1.queue().isEmpty() + && !jo2.queue().isEmpty() + && !jo3.queue().isEmpty() + && !jo4.queue().isEmpty()) { + Notification n1 = jo1.queue().peek(); + Notification n2 = jo2.queue().peek(); + Notification n3 = jo3.queue().peek(); + Notification n4 = jo4.queue().peek(); + + if (n1.isOnCompleted() + || n2.isOnCompleted() + || n3.isOnCompleted() + || n4.isOnCompleted()) { + onCompleted.call(); + } else { + dequeue(); + onNext.call(n1.getValue(), n2.getValue(), n3.getValue(), n4.getValue()); + } + } + } + +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan5.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan5.java new file mode 100644 index 0000000000..4fff0da151 --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan5.java @@ -0,0 +1,90 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Notification; +import rx.functions.Action0; +import rx.functions.Action5; + +/** + * Represents an active plan. + */ +public final class ActivePlan5 extends ActivePlan0 { + private final Action5 onNext; + private final Action0 onCompleted; + private final JoinObserver1 jo1; + private final JoinObserver1 jo2; + private final JoinObserver1 jo3; + private final JoinObserver1 jo4; + private final JoinObserver1 jo5; + + ActivePlan5( + JoinObserver1 jo1, + JoinObserver1 jo2, + JoinObserver1 jo3, + JoinObserver1 jo4, + JoinObserver1 jo5, + Action5 onNext, + Action0 onCompleted) { + this.onNext = onNext; + this.onCompleted = onCompleted; + this.jo1 = jo1; + this.jo2 = jo2; + this.jo3 = jo3; + this.jo4 = jo4; + this.jo5 = jo5; + addJoinObserver(jo1); + addJoinObserver(jo2); + addJoinObserver(jo3); + addJoinObserver(jo4); + addJoinObserver(jo5); + } + + @Override + protected void match() { + if (!jo1.queue().isEmpty() + && !jo2.queue().isEmpty() + && !jo3.queue().isEmpty() + && !jo4.queue().isEmpty() + && !jo5.queue().isEmpty() + ) { + Notification n1 = jo1.queue().peek(); + Notification n2 = jo2.queue().peek(); + Notification n3 = jo3.queue().peek(); + Notification n4 = jo4.queue().peek(); + Notification n5 = jo5.queue().peek(); + + if (n1.isOnCompleted() + || n2.isOnCompleted() + || n3.isOnCompleted() + || n4.isOnCompleted() + || n5.isOnCompleted() + ) { + onCompleted.call(); + } else { + dequeue(); + onNext.call( + n1.getValue(), + n2.getValue(), + n3.getValue(), + n4.getValue(), + n5.getValue() + ); + } + } + } + +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan6.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan6.java new file mode 100644 index 0000000000..36076aa9b4 --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan6.java @@ -0,0 +1,98 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Notification; +import rx.functions.Action0; +import rx.functions.Action6; + +/** + * Represents an active plan. + */ +public final class ActivePlan6 extends ActivePlan0 { + private final Action6 onNext; + private final Action0 onCompleted; + private final JoinObserver1 jo1; + private final JoinObserver1 jo2; + private final JoinObserver1 jo3; + private final JoinObserver1 jo4; + private final JoinObserver1 jo5; + private final JoinObserver1 jo6; + + ActivePlan6( + JoinObserver1 jo1, + JoinObserver1 jo2, + JoinObserver1 jo3, + JoinObserver1 jo4, + JoinObserver1 jo5, + JoinObserver1 jo6, + Action6 onNext, + Action0 onCompleted) { + this.onNext = onNext; + this.onCompleted = onCompleted; + this.jo1 = jo1; + this.jo2 = jo2; + this.jo3 = jo3; + this.jo4 = jo4; + this.jo5 = jo5; + this.jo6 = jo6; + addJoinObserver(jo1); + addJoinObserver(jo2); + addJoinObserver(jo3); + addJoinObserver(jo4); + addJoinObserver(jo5); + addJoinObserver(jo6); + } + + @Override + protected void match() { + if (!jo1.queue().isEmpty() + && !jo2.queue().isEmpty() + && !jo3.queue().isEmpty() + && !jo4.queue().isEmpty() + && !jo5.queue().isEmpty() + && !jo6.queue().isEmpty() + ) { + Notification n1 = jo1.queue().peek(); + Notification n2 = jo2.queue().peek(); + Notification n3 = jo3.queue().peek(); + Notification n4 = jo4.queue().peek(); + Notification n5 = jo5.queue().peek(); + Notification n6 = jo6.queue().peek(); + + if (n1.isOnCompleted() + || n2.isOnCompleted() + || n3.isOnCompleted() + || n4.isOnCompleted() + || n5.isOnCompleted() + || n6.isOnCompleted() + ) { + onCompleted.call(); + } else { + dequeue(); + onNext.call( + n1.getValue(), + n2.getValue(), + n3.getValue(), + n4.getValue(), + n5.getValue(), + n6.getValue() + ); + } + } + } + +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan7.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan7.java new file mode 100644 index 0000000000..cac20bdbcd --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan7.java @@ -0,0 +1,106 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Notification; +import rx.functions.Action0; +import rx.functions.Action7; + +/** + * Represents an active plan. + */ +public final class ActivePlan7 extends ActivePlan0 { + private final Action7 onNext; + private final Action0 onCompleted; + private final JoinObserver1 jo1; + private final JoinObserver1 jo2; + private final JoinObserver1 jo3; + private final JoinObserver1 jo4; + private final JoinObserver1 jo5; + private final JoinObserver1 jo6; + private final JoinObserver1 jo7; + + ActivePlan7( + JoinObserver1 jo1, + JoinObserver1 jo2, + JoinObserver1 jo3, + JoinObserver1 jo4, + JoinObserver1 jo5, + JoinObserver1 jo6, + JoinObserver1 jo7, + Action7 onNext, + Action0 onCompleted) { + this.onNext = onNext; + this.onCompleted = onCompleted; + this.jo1 = jo1; + this.jo2 = jo2; + this.jo3 = jo3; + this.jo4 = jo4; + this.jo5 = jo5; + this.jo6 = jo6; + this.jo7 = jo7; + addJoinObserver(jo1); + addJoinObserver(jo2); + addJoinObserver(jo3); + addJoinObserver(jo4); + addJoinObserver(jo5); + addJoinObserver(jo6); + addJoinObserver(jo7); + } + + @Override + protected void match() { + if (!jo1.queue().isEmpty() + && !jo2.queue().isEmpty() + && !jo3.queue().isEmpty() + && !jo4.queue().isEmpty() + && !jo5.queue().isEmpty() + && !jo6.queue().isEmpty() + && !jo7.queue().isEmpty() + ) { + Notification n1 = jo1.queue().peek(); + Notification n2 = jo2.queue().peek(); + Notification n3 = jo3.queue().peek(); + Notification n4 = jo4.queue().peek(); + Notification n5 = jo5.queue().peek(); + Notification n6 = jo6.queue().peek(); + Notification n7 = jo7.queue().peek(); + + if (n1.isOnCompleted() + || n2.isOnCompleted() + || n3.isOnCompleted() + || n4.isOnCompleted() + || n5.isOnCompleted() + || n6.isOnCompleted() + || n7.isOnCompleted() + ) { + onCompleted.call(); + } else { + dequeue(); + onNext.call( + n1.getValue(), + n2.getValue(), + n3.getValue(), + n4.getValue(), + n5.getValue(), + n6.getValue(), + n7.getValue() + ); + } + } + } + +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan8.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan8.java new file mode 100644 index 0000000000..20d054abf7 --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan8.java @@ -0,0 +1,114 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Notification; +import rx.functions.Action0; +import rx.functions.Action8; + +/** + * Represents an active plan. + */ +public final class ActivePlan8 extends ActivePlan0 { + private final Action8 onNext; + private final Action0 onCompleted; + private final JoinObserver1 jo1; + private final JoinObserver1 jo2; + private final JoinObserver1 jo3; + private final JoinObserver1 jo4; + private final JoinObserver1 jo5; + private final JoinObserver1 jo6; + private final JoinObserver1 jo7; + private final JoinObserver1 jo8; + + ActivePlan8( + JoinObserver1 jo1, + JoinObserver1 jo2, + JoinObserver1 jo3, + JoinObserver1 jo4, + JoinObserver1 jo5, + JoinObserver1 jo6, + JoinObserver1 jo7, + JoinObserver1 jo8, + Action8 onNext, + Action0 onCompleted) { + this.onNext = onNext; + this.onCompleted = onCompleted; + this.jo1 = jo1; + this.jo2 = jo2; + this.jo3 = jo3; + this.jo4 = jo4; + this.jo5 = jo5; + this.jo6 = jo6; + this.jo7 = jo7; + this.jo8 = jo8; + addJoinObserver(jo1); + addJoinObserver(jo2); + addJoinObserver(jo3); + addJoinObserver(jo4); + addJoinObserver(jo5); + addJoinObserver(jo6); + addJoinObserver(jo7); + addJoinObserver(jo8); + } + + @Override + protected void match() { + if (!jo1.queue().isEmpty() + && !jo2.queue().isEmpty() + && !jo3.queue().isEmpty() + && !jo4.queue().isEmpty() + && !jo5.queue().isEmpty() + && !jo6.queue().isEmpty() + && !jo7.queue().isEmpty() + && !jo8.queue().isEmpty() + ) { + Notification n1 = jo1.queue().peek(); + Notification n2 = jo2.queue().peek(); + Notification n3 = jo3.queue().peek(); + Notification n4 = jo4.queue().peek(); + Notification n5 = jo5.queue().peek(); + Notification n6 = jo6.queue().peek(); + Notification n7 = jo7.queue().peek(); + Notification n8 = jo8.queue().peek(); + + if (n1.isOnCompleted() + || n2.isOnCompleted() + || n3.isOnCompleted() + || n4.isOnCompleted() + || n5.isOnCompleted() + || n6.isOnCompleted() + || n7.isOnCompleted() + || n8.isOnCompleted() + ) { + onCompleted.call(); + } else { + dequeue(); + onNext.call( + n1.getValue(), + n2.getValue(), + n3.getValue(), + n4.getValue(), + n5.getValue(), + n6.getValue(), + n7.getValue(), + n8.getValue() + ); + } + } + } + +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan9.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan9.java new file mode 100644 index 0000000000..d2d281de22 --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan9.java @@ -0,0 +1,122 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Notification; +import rx.functions.Action0; +import rx.functions.Action9; + +/** + * Represents an active plan. + */ +public final class ActivePlan9 extends ActivePlan0 { + private final Action9 onNext; + private final Action0 onCompleted; + private final JoinObserver1 jo1; + private final JoinObserver1 jo2; + private final JoinObserver1 jo3; + private final JoinObserver1 jo4; + private final JoinObserver1 jo5; + private final JoinObserver1 jo6; + private final JoinObserver1 jo7; + private final JoinObserver1 jo8; + private final JoinObserver1 jo9; + + ActivePlan9( + JoinObserver1 jo1, + JoinObserver1 jo2, + JoinObserver1 jo3, + JoinObserver1 jo4, + JoinObserver1 jo5, + JoinObserver1 jo6, + JoinObserver1 jo7, + JoinObserver1 jo8, + JoinObserver1 jo9, + Action9 onNext, + Action0 onCompleted) { + this.onNext = onNext; + this.onCompleted = onCompleted; + this.jo1 = jo1; + this.jo2 = jo2; + this.jo3 = jo3; + this.jo4 = jo4; + this.jo5 = jo5; + this.jo6 = jo6; + this.jo7 = jo7; + this.jo8 = jo8; + this.jo9 = jo9; + addJoinObserver(jo1); + addJoinObserver(jo2); + addJoinObserver(jo3); + addJoinObserver(jo4); + addJoinObserver(jo5); + addJoinObserver(jo6); + addJoinObserver(jo7); + addJoinObserver(jo8); + addJoinObserver(jo9); + } + + @Override + protected void match() { + if (!jo1.queue().isEmpty() + && !jo2.queue().isEmpty() + && !jo3.queue().isEmpty() + && !jo4.queue().isEmpty() + && !jo5.queue().isEmpty() + && !jo6.queue().isEmpty() + && !jo7.queue().isEmpty() + && !jo8.queue().isEmpty() + && !jo9.queue().isEmpty() + ) { + Notification n1 = jo1.queue().peek(); + Notification n2 = jo2.queue().peek(); + Notification n3 = jo3.queue().peek(); + Notification n4 = jo4.queue().peek(); + Notification n5 = jo5.queue().peek(); + Notification n6 = jo6.queue().peek(); + Notification n7 = jo7.queue().peek(); + Notification n8 = jo8.queue().peek(); + Notification n9 = jo9.queue().peek(); + + if (n1.isOnCompleted() + || n2.isOnCompleted() + || n3.isOnCompleted() + || n4.isOnCompleted() + || n5.isOnCompleted() + || n6.isOnCompleted() + || n7.isOnCompleted() + || n8.isOnCompleted() + || n9.isOnCompleted() + ) { + onCompleted.call(); + } else { + dequeue(); + onNext.call( + n1.getValue(), + n2.getValue(), + n3.getValue(), + n4.getValue(), + n5.getValue(), + n6.getValue(), + n7.getValue(), + n8.getValue(), + n9.getValue() + ); + } + } + } + +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlanN.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlanN.java new file mode 100644 index 0000000000..c2a7596ed1 --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlanN.java @@ -0,0 +1,68 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import java.util.ArrayList; +import java.util.List; + +import rx.Notification; +import rx.functions.Action0; +import rx.functions.ActionN; + +/** + * Represents an active plan. + */ +public final class ActivePlanN extends ActivePlan0 { + private final ActionN onNext; + private final Action0 onCompleted; + private final List> observers; + + ActivePlanN(List> observers, + ActionN onNext, + Action0 onCompleted) { + this.onNext = onNext; + this.onCompleted = onCompleted; + this.observers = new ArrayList>(observers); + for (JoinObserver1 jo : this.observers) { + addJoinObserver(jo); + } + } + + @Override + protected void match() { + Object[] notifications = new Object[this.observers.size()]; + int j = 0; + int completedCount = 0; + for (JoinObserver1 jo : this.observers) { + if (jo.queue().isEmpty()) { + return; + } + Notification n = jo.queue().peek(); + if (n.isOnCompleted()) { + completedCount++; + } + notifications[j] = n.getValue(); + j++; + } + if (completedCount == j) { + onCompleted.call(); + } else { + dequeue(); + onNext.call(notifications); + } + } + +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/JoinObserver1.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/JoinObserver1.java index c4cf5d3e25..e281ff2118 100644 --- a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/JoinObserver1.java +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/JoinObserver1.java @@ -30,7 +30,7 @@ /** * Default implementation of a join observer. */ -public final class JoinObserver1 extends Subscriber> implements JoinObserver { +final class JoinObserver1 extends Subscriber> implements JoinObserver { private Object gate; private final Observable source; private final Action1 onError; @@ -39,7 +39,7 @@ public final class JoinObserver1 extends Subscriber> implemen private final AtomicBoolean subscribed = new AtomicBoolean(false); private final SafeSubscriber> safeObserver; - public JoinObserver1(Observable source, Action1 onError) { + JoinObserver1(Observable source, Action1 onError) { this.source = source; this.onError = onError; queue = new LinkedList>(); diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern1.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern1.java index 2f39a6b3c4..c14525e255 100644 --- a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern1.java +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern1.java @@ -21,15 +21,15 @@ /** * Represents a join pattern over one observable sequence. */ -public class Pattern1 implements Pattern { - private final Observable first; +public final class Pattern1 implements Pattern { + private final Observable o1; - public Pattern1(Observable first) { - this.first = first; + public Pattern1(Observable o1) { + this.o1 = o1; } - public Observable first() { - return first; + Observable o1() { + return o1; } /** diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern2.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern2.java index b967ad2930..ea6f51d8e9 100644 --- a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern2.java +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern2.java @@ -21,21 +21,21 @@ /** * Represents a join pattern over observable sequences. */ -public class Pattern2 implements Pattern { - private final Observable first; - private final Observable second; +public final class Pattern2 implements Pattern { + private final Observable o1; + private final Observable o2; - public Pattern2(Observable first, Observable second) { - this.first = first; - this.second = second; + public Pattern2(Observable o1, Observable o2) { + this.o1 = o1; + this.o2 = o2; } - public Observable first() { - return first; + Observable o1() { + return o1; } - public Observable second() { - return second; + Observable o2() { + return o2; } /** @@ -49,9 +49,19 @@ public Pattern3 and(Observable other) { if (other == null) { throw new NullPointerException(); } - return new Pattern3(first, second, other); + return new Pattern3(o1, o2, other); } + /** + * Matches when all observable sequences have an available + * element and projects the elements by invoking the selector function. + * + * @param selector + * the function that will be invoked for elements in the source sequences. + * @return the plan for the matching + * @throws NullPointerException + * if selector is null + */ public Plan0 then(Func2 selector) { if (selector == null) { throw new NullPointerException(); diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern3.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern3.java index 60d4daf2f8..5a8b231f95 100644 --- a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern3.java +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern3.java @@ -21,36 +21,54 @@ /** * Represents a join pattern over observable sequences. */ -public class Pattern3 implements Pattern { - private final Observable first; - private final Observable second; - private final Observable third; +public final class Pattern3 implements Pattern { + private final Observable o1; + private final Observable o2; + private final Observable o3; - public Pattern3(Observable first, Observable second, - Observable third) { - this.first = first; - this.second = second; - this.third = third; + public Pattern3(Observable o1, Observable o2, + Observable o3) { + this.o1 = o1; + this.o2 = o2; + this.o3 = o3; } - public Observable first() { - return first; + Observable o1() { + return o1; } - public Observable second() { - return second; + Observable o2() { + return o2; } - public Observable third() { - return third; + Observable o3() { + return o3; } - // public Pattern4 and(Observable other) { - // if (other == null) { - // throw new NullPointerException(); - // } - // return new Pattern4(first, second, third, other); - // } + /** + * Creates a pattern that matches when all three observable sequences have an available element. + * + * @param other + * Observable sequence to match with the two previous sequences. + * @return Pattern object that matches when all observable sequences have an available element. + */ + public Pattern4 and(Observable other) { + if (other == null) { + throw new NullPointerException(); + } + return new Pattern4(o1, o2, o3, other); + } + + /** + * Matches when all observable sequences have an available + * element and projects the elements by invoking the selector function. + * + * @param selector + * the function that will be invoked for elements in the source sequences. + * @return the plan for the matching + * @throws NullPointerException + * if selector is null + */ public Plan0 then(Func3 selector) { if (selector == null) { throw new NullPointerException(); diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern4.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern4.java new file mode 100644 index 0000000000..58d69e86a2 --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern4.java @@ -0,0 +1,87 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Observable; +import rx.functions.Func4; + +/** + * Represents a join pattern over observable sequences. + */ +public final class Pattern4 implements Pattern { + private final Observable o1; + private final Observable o2; + private final Observable o3; + private final Observable o4; + + public Pattern4( + Observable o1, + Observable o2, + Observable o3, + Observable o4 + ) { + this.o1 = o1; + this.o2 = o2; + this.o3 = o3; + this.o4 = o4; + } + + Observable o1() { + return o1; + } + + Observable o2() { + return o2; + } + + Observable o3() { + return o3; + } + + Observable o4() { + return o4; + } + + /** + * Creates a pattern that matches when all four observable sequences have an available element. + * + * @param other + * Observable sequence to match with the three previous sequences. + * @return Pattern object that matches when all observable sequences have an available element. + */ + public Pattern5 and(Observable other) { + if (other == null) { + throw new NullPointerException(); + } + return new Pattern5(o1, o2, o3, o4, other); + } + /** + * Matches when all observable sequences have an available + * element and projects the elements by invoking the selector function. + * + * @param selector + * the function that will be invoked for elements in the source sequences. + * @return the plan for the matching + * @throws NullPointerException + * if selector is null + */ + public Plan0 then(Func4 selector) { + if (selector == null) { + throw new NullPointerException(); + } + return new Plan4(this, selector); + } +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern5.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern5.java new file mode 100644 index 0000000000..ce068c3c41 --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern5.java @@ -0,0 +1,94 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Observable; +import rx.functions.Func5; + +/** + * Represents a join pattern over observable sequences. + */ +public final class Pattern5 implements Pattern { + private final Observable o1; + private final Observable o2; + private final Observable o3; + private final Observable o4; + private final Observable o5; + + public Pattern5( + Observable o1, + Observable o2, + Observable o3, + Observable o4, + Observable o5 + ) { + this.o1 = o1; + this.o2 = o2; + this.o3 = o3; + this.o4 = o4; + this.o5 = o5; + } + + Observable o1() { + return o1; + } + + Observable o2() { + return o2; + } + + Observable o3() { + return o3; + } + + Observable o4() { + return o4; + } + + Observable o5() { + return o5; + } + + /** + * Creates a pattern that matches when all five observable sequences have an available element. + * + * @param other + * Observable sequence to match with the four previous sequences. + * @return Pattern object that matches when all observable sequences have an available element. + */ + public Pattern6 and(Observable other) { + if (other == null) { + throw new NullPointerException(); + } + return new Pattern6(o1, o2, o3, o4, o5, other); + } + /** + * Matches when all observable sequences have an available + * element and projects the elements by invoking the selector function. + * + * @param selector + * the function that will be invoked for elements in the source sequences. + * @return the plan for the matching + * @throws NullPointerException + * if selector is null + */ + public Plan0 then(Func5 selector) { + if (selector == null) { + throw new NullPointerException(); + } + return new Plan5(this, selector); + } +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern6.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern6.java new file mode 100644 index 0000000000..eae8e20be4 --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern6.java @@ -0,0 +1,101 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Observable; +import rx.functions.Func6; + +/** + * Represents a join pattern over observable sequences. + */ +public final class Pattern6 implements Pattern { + private final Observable o1; + private final Observable o2; + private final Observable o3; + private final Observable o4; + private final Observable o5; + private final Observable o6; + + public Pattern6( + Observable o1, + Observable o2, + Observable o3, + Observable o4, + Observable o5, + Observable o6 + ) { + this.o1 = o1; + this.o2 = o2; + this.o3 = o3; + this.o4 = o4; + this.o5 = o5; + this.o6 = o6; + } + + Observable o1() { + return o1; + } + + Observable o2() { + return o2; + } + + Observable o3() { + return o3; + } + + Observable o4() { + return o4; + } + + Observable o5() { + return o5; + } + + Observable o6() { + return o6; + } + + /** + * Creates a pattern that matches when all six observable sequences have an available element. + * + * @param other + * Observable sequence to match with the five previous sequences. + * @return Pattern object that matches when all observable sequences have an available element. + */ + public Pattern7 and(Observable other) { + if (other == null) { + throw new NullPointerException(); + } + return new Pattern7(o1, o2, o3, o4, o5, o6, other); + } + /** + * Matches when all observable sequences have an available + * element and projects the elements by invoking the selector function. + * + * @param selector + * the function that will be invoked for elements in the source sequences. + * @return the plan for the matching + * @throws NullPointerException + * if selector is null + */ + public Plan0 then(Func6 selector) { + if (selector == null) { + throw new NullPointerException(); + } + return new Plan6(this, selector); + } +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern7.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern7.java new file mode 100644 index 0000000000..70df807fd8 --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern7.java @@ -0,0 +1,108 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Observable; +import rx.functions.Func7; + +/** + * Represents a join pattern over observable sequences. + */ +public final class Pattern7 implements Pattern { + private final Observable o1; + private final Observable o2; + private final Observable o3; + private final Observable o4; + private final Observable o5; + private final Observable o6; + private final Observable o7; + + public Pattern7( + Observable o1, + Observable o2, + Observable o3, + Observable o4, + Observable o5, + Observable o6, + Observable o7 + ) { + this.o1 = o1; + this.o2 = o2; + this.o3 = o3; + this.o4 = o4; + this.o5 = o5; + this.o6 = o6; + this.o7 = o7; + } + + Observable o1() { + return o1; + } + + Observable o2() { + return o2; + } + + Observable o3() { + return o3; + } + + Observable o4() { + return o4; + } + + Observable o5() { + return o5; + } + + Observable o6() { + return o6; + } + + Observable o7() { + return o7; + } + + /** + * Creates a pattern that matches when all seven observable sequences have an available element. + * + * @param other + * Observable sequence to match with the six previous sequences. + * @return Pattern object that matches when all observable sequences have an available element. + */ + public Pattern8 and(Observable other) { + if (other == null) { + throw new NullPointerException(); + } + return new Pattern8(o1, o2, o3, o4, o5, o6, o7, other); + } + /** + * Matches when all observable sequences have an available + * element and projects the elements by invoking the selector function. + * + * @param selector + * the function that will be invoked for elements in the source sequences. + * @return the plan for the matching + * @throws NullPointerException + * if selector is null + */ + public Plan0 then(Func7 selector) { + if (selector == null) { + throw new NullPointerException(); + } + return new Plan7(this, selector); + } +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern8.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern8.java new file mode 100644 index 0000000000..17557ae3e6 --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern8.java @@ -0,0 +1,115 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Observable; +import rx.functions.Func8; + +/** + * Represents a join pattern over observable sequences. + */ +public final class Pattern8 implements Pattern { + private final Observable o1; + private final Observable o2; + private final Observable o3; + private final Observable o4; + private final Observable o5; + private final Observable o6; + private final Observable o7; + private final Observable o8; + + public Pattern8( + Observable o1, + Observable o2, + Observable o3, + Observable o4, + Observable o5, + Observable o6, + Observable o7, + Observable o8 + ) { + this.o1 = o1; + this.o2 = o2; + this.o3 = o3; + this.o4 = o4; + this.o5 = o5; + this.o6 = o6; + this.o7 = o7; + this.o8 = o8; + } + + Observable o1() { + return o1; + } + + Observable o2() { + return o2; + } + + Observable o3() { + return o3; + } + + Observable o4() { + return o4; + } + + Observable o5() { + return o5; + } + + Observable o6() { + return o6; + } + + Observable o7() { + return o7; + } + + Observable o8() { + return o8; + } + + /** + * Creates a pattern that matches when all eight observable sequences have an available element. + * + * @param other + * Observable sequence to match with the seven previous sequences. + * @return Pattern object that matches when all observable sequences have an available element. + */ + public Pattern9 and(Observable other) { + if (other == null) { + throw new NullPointerException(); + } + return new Pattern9(o1, o2, o3, o4, o5, o6, o7, o8, other); + } + /** + * Matches when all observable sequences have an available + * element and projects the elements by invoking the selector function. + * + * @param selector + * the function that will be invoked for elements in the source sequences. + * @return the plan for the matching + * @throws NullPointerException + * if selector is null + */ + public Plan0 then(Func8 selector) { + if (selector == null) { + throw new NullPointerException(); + } + return new Plan8(this, selector); + } +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern9.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern9.java new file mode 100644 index 0000000000..238e86e4be --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Pattern9.java @@ -0,0 +1,136 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import java.util.ArrayList; +import java.util.List; + +import rx.Observable; +import rx.functions.Func9; + +/** + * Represents a join pattern over observable sequences. + */ +public final class Pattern9 implements Pattern { + private final Observable o1; + private final Observable o2; + private final Observable o3; + private final Observable o4; + private final Observable o5; + private final Observable o6; + private final Observable o7; + private final Observable o8; + private final Observable o9; + + public Pattern9( + Observable o1, + Observable o2, + Observable o3, + Observable o4, + Observable o5, + Observable o6, + Observable o7, + Observable o8, + Observable o9 + ) { + this.o1 = o1; + this.o2 = o2; + this.o3 = o3; + this.o4 = o4; + this.o5 = o5; + this.o6 = o6; + this.o7 = o7; + this.o8 = o8; + this.o9 = o9; + } + + Observable o1() { + return o1; + } + + Observable o2() { + return o2; + } + + Observable o3() { + return o3; + } + + Observable o4() { + return o4; + } + + Observable o5() { + return o5; + } + + Observable o6() { + return o6; + } + + Observable o7() { + return o7; + } + + Observable o8() { + return o8; + } + + Observable o9() { + return o9; + } + + /** + * Creates a pattern that matches when all nine observable sequences have an available element. + * + * @param other + * Observable sequence to match with the eight previous sequences. + * @return Pattern object that matches when all observable sequences have an available element. + */ + public PatternN and(Observable other) { + if (other == null) { + throw new NullPointerException(); + } + List> list = new ArrayList>(); + list.add(o1); + list.add(o2); + list.add(o3); + list.add(o4); + list.add(o5); + list.add(o6); + list.add(o7); + list.add(o8); + list.add(o9); + list.add(other); + return new PatternN(list); + } + /** + * Matches when all observable sequences have an available + * element and projects the elements by invoking the selector function. + * + * @param selector + * the function that will be invoked for elements in the source sequences. + * @return the plan for the matching + * @throws NullPointerException + * if selector is null + */ + public Plan0 then(Func9 selector) { + if (selector == null) { + throw new NullPointerException(); + } + return new Plan9(this, selector); + } +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/PatternN.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/PatternN.java new file mode 100644 index 0000000000..085235ed22 --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/PatternN.java @@ -0,0 +1,84 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import java.util.ArrayList; +import java.util.List; + +import rx.Observable; +import rx.functions.FuncN; + +/** + * Represents a join pattern over observable sequences. + */ +public final class PatternN implements Pattern { + private final List> observables; + + public PatternN(List> observables) { + this.observables = observables; + } + + public PatternN(List> observables, Observable other) { + this.observables = new ArrayList>(observables); + this.observables.add(other); + } + + /** + * @return the number of observables in this pattern. + */ + int size() { + return observables.size(); + } + /** + * Returns the specific Observable from this pattern. + * @param index the index + * @return the observable + */ + Observable get(int index) { + return observables.get(index); + } + + /** + * Creates a pattern that matches when all previous observable sequences have an available element. + * + * @param other + * Observable sequence to match with the previous sequences. + * @return Pattern object that matches when all observable sequences have an available element. + */ + public PatternN and(Observable other) { + if (other == null) { + throw new NullPointerException(); + } + return new PatternN(observables, other); + } + + /** + * Matches when all observable sequences have an available + * element and projects the elements by invoking the selector function. + * + * @param selector + * the function that will be invoked for elements in the source sequences. + * @return the plan for the matching + * @throws NullPointerException + * if selector is null + */ + public Plan0 then(FuncN selector) { + if (selector == null) { + throw new NullPointerException(); + } + return new PlanN(this, selector); + } +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan0.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan0.java index c10d5c1be7..452cf30e4a 100644 --- a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan0.java +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan0.java @@ -29,7 +29,7 @@ public abstract ActivePlan0 activate(Map externalSubscript Observer observer, Action1 deactivate); @SuppressWarnings("unchecked") - public static JoinObserver1 createObserver( + public static final JoinObserver1 createObserver( Map externalSubscriptions, Observable observable, Action1 onError diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan1.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan1.java index 7d1912e979..e70fa727c5 100644 --- a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan1.java +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan1.java @@ -27,28 +27,20 @@ /** * Represents an execution plan for join patterns. */ -public class Plan1 extends Plan0 { - protected Pattern1 expression; - protected Func1 selector; +public final class Plan1 extends Plan0 { + protected final Pattern1 expression; + protected final Func1 selector; public Plan1(Pattern1 expression, Func1 selector) { this.expression = expression; this.selector = selector; } - public Pattern1 expression() { - return expression; - } - - public Func1 selector() { - return selector; - } - @Override public ActivePlan0 activate(Map externalSubscriptions, final Observer observer, final Action1 deactivate) { Action1 onError = Actions.onErrorFrom(observer); - final JoinObserver1 firstJoinObserver = createObserver(externalSubscriptions, expression.first(), onError); + final JoinObserver1 firstJoinObserver = createObserver(externalSubscriptions, expression.o1(), onError); final AtomicReference> self = new AtomicReference>(); @@ -68,8 +60,9 @@ public void call(T1 t1) { new Action0() { @Override public void call() { - firstJoinObserver.removeActivePlan(self.get()); - deactivate.call(self.get()); + ActivePlan0 ap = self.get(); + firstJoinObserver.removeActivePlan(ap); + deactivate.call(ap); } }); diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan2.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan2.java index 06495a51ca..288fad97a7 100644 --- a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan2.java +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan2.java @@ -28,9 +28,9 @@ /** * Represents an execution plan for join patterns. */ -public class Plan2 extends Plan0 { - protected Pattern2 expression; - protected Func2 selector; +public final class Plan2 extends Plan0 { + protected final Pattern2 expression; + protected final Func2 selector; public Plan2(Pattern2 expression, Func2 selector) { this.expression = expression; @@ -42,12 +42,12 @@ public ActivePlan0 activate(Map externalSubscriptions, final Observer observer, final Action1 deactivate) { Action1 onError = Actions.onErrorFrom(observer); - final JoinObserver1 firstJoinObserver = createObserver(externalSubscriptions, expression.first(), onError); - final JoinObserver1 secondJoinObserver = createObserver(externalSubscriptions, expression.second(), onError); + final JoinObserver1 jo1 = createObserver(externalSubscriptions, expression.o1(), onError); + final JoinObserver1 jo2 = createObserver(externalSubscriptions, expression.o2(), onError); final AtomicReference> self = new AtomicReference>(); - ActivePlan2 activePlan = new ActivePlan2(firstJoinObserver, secondJoinObserver, new Action2() { + ActivePlan2 activePlan = new ActivePlan2(jo1, jo2, new Action2() { @Override public void call(T1 t1, T2 t2) { R result; @@ -63,16 +63,17 @@ public void call(T1 t1, T2 t2) { new Action0() { @Override public void call() { - firstJoinObserver.removeActivePlan(self.get()); - secondJoinObserver.removeActivePlan(self.get()); - deactivate.call(self.get()); + ActivePlan0 ap = self.get(); + jo1.removeActivePlan(ap); + jo2.removeActivePlan(ap); + deactivate.call(ap); } }); self.set(activePlan); - firstJoinObserver.addActivePlan(activePlan); - secondJoinObserver.addActivePlan(activePlan); + jo1.addActivePlan(activePlan); + jo2.addActivePlan(activePlan); return activePlan; } diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan3.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan3.java index 9ed8fa4313..4f95ac3b60 100644 --- a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan3.java +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan3.java @@ -28,9 +28,9 @@ /** * Represents an execution plan for join patterns. */ -public class Plan3 extends Plan0 { - protected Pattern3 expression; - protected Func3 selector; +public final class Plan3 extends Plan0 { + protected final Pattern3 expression; + protected final Func3 selector; public Plan3(Pattern3 expression, Func3 selector) { this.expression = expression; @@ -42,14 +42,15 @@ public ActivePlan0 activate(Map externalSubscriptions, final Observer observer, final Action1 deactivate) { Action1 onError = Actions.onErrorFrom(observer); - final JoinObserver1 firstJoinObserver = createObserver(externalSubscriptions, expression.first(), onError); - final JoinObserver1 secondJoinObserver = createObserver(externalSubscriptions, expression.second(), onError); - final JoinObserver1 thirdJoinObserver = createObserver(externalSubscriptions, expression.third(), onError); + final JoinObserver1 jo1 = createObserver(externalSubscriptions, expression.o1(), onError); + final JoinObserver1 jo2 = createObserver(externalSubscriptions, expression.o2(), onError); + final JoinObserver1 jo3 = createObserver(externalSubscriptions, expression.o3(), onError); final AtomicReference> self = new AtomicReference>(); - ActivePlan3 activePlan = new ActivePlan3(firstJoinObserver, secondJoinObserver, - thirdJoinObserver, new Action3() { + ActivePlan3 activePlan = new ActivePlan3( + jo1, jo2, jo3, + new Action3() { @Override public void call(T1 t1, T2 t2, T3 t3) { R result; @@ -65,18 +66,19 @@ public void call(T1 t1, T2 t2, T3 t3) { new Action0() { @Override public void call() { - firstJoinObserver.removeActivePlan(self.get()); - secondJoinObserver.removeActivePlan(self.get()); - thirdJoinObserver.removeActivePlan(self.get()); - deactivate.call(self.get()); + ActivePlan0 ap = self.get(); + jo1.removeActivePlan(ap); + jo2.removeActivePlan(ap); + jo3.removeActivePlan(ap); + deactivate.call(ap); } }); self.set(activePlan); - firstJoinObserver.addActivePlan(activePlan); - secondJoinObserver.addActivePlan(activePlan); - thirdJoinObserver.addActivePlan(activePlan); + jo1.addActivePlan(activePlan); + jo2.addActivePlan(activePlan); + jo3.addActivePlan(activePlan); return activePlan; } diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan4.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan4.java new file mode 100644 index 0000000000..3856be263b --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan4.java @@ -0,0 +1,89 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import rx.Observer; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Action4; +import rx.functions.Actions; +import rx.functions.Func4; + +/** + * Represents an execution plan for join patterns. + */ +public final class Plan4 extends Plan0 { + protected final Pattern4 expression; + protected final Func4 selector; + + public Plan4(Pattern4 expression, Func4 selector) { + this.expression = expression; + this.selector = selector; + } + + @Override + public ActivePlan0 activate(Map externalSubscriptions, + final Observer observer, final Action1 deactivate) { + Action1 onError = Actions.onErrorFrom(observer); + + final JoinObserver1 jo1 = createObserver(externalSubscriptions, expression.o1(), onError); + final JoinObserver1 jo2 = createObserver(externalSubscriptions, expression.o2(), onError); + final JoinObserver1 jo3 = createObserver(externalSubscriptions, expression.o3(), onError); + final JoinObserver1 jo4 = createObserver(externalSubscriptions, expression.o4(), onError); + + final AtomicReference self = new AtomicReference(); + + ActivePlan0 activePlan = new ActivePlan4( + jo1, jo2, jo3, jo4, + new Action4() { + @Override + public void call(T1 t1, T2 t2, T3 t3, T4 t4) { + R result; + try { + result = selector.call(t1, t2, t3, t4); + } catch (Throwable t) { + observer.onError(t); + return; + } + observer.onNext(result); + } + }, + new Action0() { + @Override + public void call() { + ActivePlan0 ap = self.get(); + jo1.removeActivePlan(ap); + jo2.removeActivePlan(ap); + jo3.removeActivePlan(ap); + jo4.removeActivePlan(ap); + deactivate.call(ap); + } + }); + + self.set(activePlan); + + jo1.addActivePlan(activePlan); + jo2.addActivePlan(activePlan); + jo3.addActivePlan(activePlan); + jo4.addActivePlan(activePlan); + + return activePlan; + } + +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan5.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan5.java new file mode 100644 index 0000000000..98b47448d2 --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan5.java @@ -0,0 +1,92 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import rx.Observer; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Action5; +import rx.functions.Actions; +import rx.functions.Func5; + +/** + * Represents an execution plan for join patterns. + */ +public final class Plan5 extends Plan0 { + protected final Pattern5 expression; + protected final Func5 selector; + + public Plan5(Pattern5 expression, Func5 selector) { + this.expression = expression; + this.selector = selector; + } + + @Override + public ActivePlan0 activate(Map externalSubscriptions, + final Observer observer, final Action1 deactivate) { + Action1 onError = Actions.onErrorFrom(observer); + + final JoinObserver1 jo1 = createObserver(externalSubscriptions, expression.o1(), onError); + final JoinObserver1 jo2 = createObserver(externalSubscriptions, expression.o2(), onError); + final JoinObserver1 jo3 = createObserver(externalSubscriptions, expression.o3(), onError); + final JoinObserver1 jo4 = createObserver(externalSubscriptions, expression.o4(), onError); + final JoinObserver1 jo5 = createObserver(externalSubscriptions, expression.o5(), onError); + + final AtomicReference self = new AtomicReference(); + + ActivePlan0 activePlan = new ActivePlan5( + jo1, jo2, jo3, jo4, jo5, + new Action5() { + @Override + public void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5) { + R result; + try { + result = selector.call(t1, t2, t3, t4, t5); + } catch (Throwable t) { + observer.onError(t); + return; + } + observer.onNext(result); + } + }, + new Action0() { + @Override + public void call() { + ActivePlan0 ap = self.get(); + jo1.removeActivePlan(ap); + jo2.removeActivePlan(ap); + jo3.removeActivePlan(ap); + jo4.removeActivePlan(ap); + jo5.removeActivePlan(ap); + deactivate.call(ap); + } + }); + + self.set(activePlan); + + jo1.addActivePlan(activePlan); + jo2.addActivePlan(activePlan); + jo3.addActivePlan(activePlan); + jo4.addActivePlan(activePlan); + jo5.addActivePlan(activePlan); + + return activePlan; + } + +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan6.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan6.java new file mode 100644 index 0000000000..cd87e855cf --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan6.java @@ -0,0 +1,95 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import rx.Observer; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Action6; +import rx.functions.Actions; +import rx.functions.Func6; + +/** + * Represents an execution plan for join patterns. + */ +public final class Plan6 extends Plan0 { + protected final Pattern6 expression; + protected final Func6 selector; + + public Plan6(Pattern6 expression, Func6 selector) { + this.expression = expression; + this.selector = selector; + } + + @Override + public ActivePlan0 activate(Map externalSubscriptions, + final Observer observer, final Action1 deactivate) { + Action1 onError = Actions.onErrorFrom(observer); + + final JoinObserver1 jo1 = createObserver(externalSubscriptions, expression.o1(), onError); + final JoinObserver1 jo2 = createObserver(externalSubscriptions, expression.o2(), onError); + final JoinObserver1 jo3 = createObserver(externalSubscriptions, expression.o3(), onError); + final JoinObserver1 jo4 = createObserver(externalSubscriptions, expression.o4(), onError); + final JoinObserver1 jo5 = createObserver(externalSubscriptions, expression.o5(), onError); + final JoinObserver1 jo6 = createObserver(externalSubscriptions, expression.o6(), onError); + + final AtomicReference self = new AtomicReference(); + + ActivePlan0 activePlan = new ActivePlan6( + jo1, jo2, jo3, jo4, jo5, jo6, + new Action6() { + @Override + public void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6) { + R result; + try { + result = selector.call(t1, t2, t3, t4, t5, t6); + } catch (Throwable t) { + observer.onError(t); + return; + } + observer.onNext(result); + } + }, + new Action0() { + @Override + public void call() { + ActivePlan0 ap = self.get(); + jo1.removeActivePlan(ap); + jo2.removeActivePlan(ap); + jo3.removeActivePlan(ap); + jo4.removeActivePlan(ap); + jo5.removeActivePlan(ap); + jo6.removeActivePlan(ap); + deactivate.call(ap); + } + }); + + self.set(activePlan); + + jo1.addActivePlan(activePlan); + jo2.addActivePlan(activePlan); + jo3.addActivePlan(activePlan); + jo4.addActivePlan(activePlan); + jo5.addActivePlan(activePlan); + jo6.addActivePlan(activePlan); + + return activePlan; + } + +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan7.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan7.java new file mode 100644 index 0000000000..64eef70c12 --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan7.java @@ -0,0 +1,98 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import rx.Observer; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Action7; +import rx.functions.Actions; +import rx.functions.Func7; + +/** + * Represents an execution plan for join patterns. + */ +public final class Plan7 extends Plan0 { + protected final Pattern7 expression; + protected final Func7 selector; + + public Plan7(Pattern7 expression, Func7 selector) { + this.expression = expression; + this.selector = selector; + } + + @Override + public ActivePlan0 activate(Map externalSubscriptions, + final Observer observer, final Action1 deactivate) { + Action1 onError = Actions.onErrorFrom(observer); + + final JoinObserver1 jo1 = createObserver(externalSubscriptions, expression.o1(), onError); + final JoinObserver1 jo2 = createObserver(externalSubscriptions, expression.o2(), onError); + final JoinObserver1 jo3 = createObserver(externalSubscriptions, expression.o3(), onError); + final JoinObserver1 jo4 = createObserver(externalSubscriptions, expression.o4(), onError); + final JoinObserver1 jo5 = createObserver(externalSubscriptions, expression.o5(), onError); + final JoinObserver1 jo6 = createObserver(externalSubscriptions, expression.o6(), onError); + final JoinObserver1 jo7 = createObserver(externalSubscriptions, expression.o7(), onError); + + final AtomicReference self = new AtomicReference(); + + ActivePlan0 activePlan = new ActivePlan7( + jo1, jo2, jo3, jo4, jo5, jo6, jo7, + new Action7() { + @Override + public void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7) { + R result; + try { + result = selector.call(t1, t2, t3, t4, t5, t6, t7); + } catch (Throwable t) { + observer.onError(t); + return; + } + observer.onNext(result); + } + }, + new Action0() { + @Override + public void call() { + ActivePlan0 ap = self.get(); + jo1.removeActivePlan(ap); + jo2.removeActivePlan(ap); + jo3.removeActivePlan(ap); + jo4.removeActivePlan(ap); + jo5.removeActivePlan(ap); + jo6.removeActivePlan(ap); + jo7.removeActivePlan(ap); + deactivate.call(ap); + } + }); + + self.set(activePlan); + + jo1.addActivePlan(activePlan); + jo2.addActivePlan(activePlan); + jo3.addActivePlan(activePlan); + jo4.addActivePlan(activePlan); + jo5.addActivePlan(activePlan); + jo6.addActivePlan(activePlan); + jo7.addActivePlan(activePlan); + + return activePlan; + } + +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan8.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan8.java new file mode 100644 index 0000000000..1dead50de7 --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan8.java @@ -0,0 +1,101 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import rx.Observer; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Action8; +import rx.functions.Actions; +import rx.functions.Func8; + +/** + * Represents an execution plan for join patterns. + */ +public final class Plan8 extends Plan0 { + protected final Pattern8 expression; + protected final Func8 selector; + + public Plan8(Pattern8 expression, Func8 selector) { + this.expression = expression; + this.selector = selector; + } + + @Override + public ActivePlan0 activate(Map externalSubscriptions, + final Observer observer, final Action1 deactivate) { + Action1 onError = Actions.onErrorFrom(observer); + + final JoinObserver1 jo1 = createObserver(externalSubscriptions, expression.o1(), onError); + final JoinObserver1 jo2 = createObserver(externalSubscriptions, expression.o2(), onError); + final JoinObserver1 jo3 = createObserver(externalSubscriptions, expression.o3(), onError); + final JoinObserver1 jo4 = createObserver(externalSubscriptions, expression.o4(), onError); + final JoinObserver1 jo5 = createObserver(externalSubscriptions, expression.o5(), onError); + final JoinObserver1 jo6 = createObserver(externalSubscriptions, expression.o6(), onError); + final JoinObserver1 jo7 = createObserver(externalSubscriptions, expression.o7(), onError); + final JoinObserver1 jo8 = createObserver(externalSubscriptions, expression.o8(), onError); + + final AtomicReference self = new AtomicReference(); + + ActivePlan0 activePlan = new ActivePlan8( + jo1, jo2, jo3, jo4, jo5, jo6, jo7, jo8, + new Action8() { + @Override + public void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8) { + R result; + try { + result = selector.call(t1, t2, t3, t4, t5, t6, t7, t8); + } catch (Throwable t) { + observer.onError(t); + return; + } + observer.onNext(result); + } + }, + new Action0() { + @Override + public void call() { + ActivePlan0 ap = self.get(); + jo1.removeActivePlan(ap); + jo2.removeActivePlan(ap); + jo3.removeActivePlan(ap); + jo4.removeActivePlan(ap); + jo5.removeActivePlan(ap); + jo6.removeActivePlan(ap); + jo7.removeActivePlan(ap); + jo8.removeActivePlan(ap); + deactivate.call(ap); + } + }); + + self.set(activePlan); + + jo1.addActivePlan(activePlan); + jo2.addActivePlan(activePlan); + jo3.addActivePlan(activePlan); + jo4.addActivePlan(activePlan); + jo5.addActivePlan(activePlan); + jo6.addActivePlan(activePlan); + jo7.addActivePlan(activePlan); + jo8.addActivePlan(activePlan); + + return activePlan; + } + +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan9.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan9.java new file mode 100644 index 0000000000..6a628eef18 --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/Plan9.java @@ -0,0 +1,104 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import rx.Observer; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Action9; +import rx.functions.Actions; +import rx.functions.Func9; + +/** + * Represents an execution plan for join patterns. + */ +public final class Plan9 extends Plan0 { + protected final Pattern9 expression; + protected final Func9 selector; + + public Plan9(Pattern9 expression, Func9 selector) { + this.expression = expression; + this.selector = selector; + } + + @Override + public ActivePlan0 activate(Map externalSubscriptions, + final Observer observer, final Action1 deactivate) { + Action1 onError = Actions.onErrorFrom(observer); + + final JoinObserver1 jo1 = createObserver(externalSubscriptions, expression.o1(), onError); + final JoinObserver1 jo2 = createObserver(externalSubscriptions, expression.o2(), onError); + final JoinObserver1 jo3 = createObserver(externalSubscriptions, expression.o3(), onError); + final JoinObserver1 jo4 = createObserver(externalSubscriptions, expression.o4(), onError); + final JoinObserver1 jo5 = createObserver(externalSubscriptions, expression.o5(), onError); + final JoinObserver1 jo6 = createObserver(externalSubscriptions, expression.o6(), onError); + final JoinObserver1 jo7 = createObserver(externalSubscriptions, expression.o7(), onError); + final JoinObserver1 jo8 = createObserver(externalSubscriptions, expression.o8(), onError); + final JoinObserver1 jo9 = createObserver(externalSubscriptions, expression.o9(), onError); + + final AtomicReference self = new AtomicReference(); + + ActivePlan0 activePlan = new ActivePlan9( + jo1, jo2, jo3, jo4, jo5, jo6, jo7, jo8, jo9, + new Action9() { + @Override + public void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9) { + R result; + try { + result = selector.call(t1, t2, t3, t4, t5, t6, t7, t8, t9); + } catch (Throwable t) { + observer.onError(t); + return; + } + observer.onNext(result); + } + }, + new Action0() { + @Override + public void call() { + ActivePlan0 ap = self.get(); + jo1.removeActivePlan(ap); + jo2.removeActivePlan(ap); + jo3.removeActivePlan(ap); + jo4.removeActivePlan(ap); + jo5.removeActivePlan(ap); + jo6.removeActivePlan(ap); + jo7.removeActivePlan(ap); + jo8.removeActivePlan(ap); + jo9.removeActivePlan(ap); + deactivate.call(ap); + } + }); + + self.set(activePlan); + + jo1.addActivePlan(activePlan); + jo2.addActivePlan(activePlan); + jo3.addActivePlan(activePlan); + jo4.addActivePlan(activePlan); + jo5.addActivePlan(activePlan); + jo6.addActivePlan(activePlan); + jo7.addActivePlan(activePlan); + jo8.addActivePlan(activePlan); + jo9.addActivePlan(activePlan); + + return activePlan; + } + +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/PlanN.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/PlanN.java new file mode 100644 index 0000000000..33e8591320 --- /dev/null +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/PlanN.java @@ -0,0 +1,85 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import rx.Observer; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.ActionN; +import rx.functions.Actions; +import rx.functions.FuncN; + +/** + * Represents an execution plan for join patterns. + */ +public final class PlanN extends Plan0 { + protected final PatternN expression; + protected final FuncN selector; + + public PlanN(PatternN expression, FuncN selector) { + this.expression = expression; + this.selector = selector; + } + + @Override + public ActivePlan0 activate(Map externalSubscriptions, + final Observer observer, final Action1 deactivate) { + Action1 onError = Actions.onErrorFrom(observer); + + final List> observers = new ArrayList>(); + for (int i = 0; i < expression.size(); i++) { + observers.add(createObserver(externalSubscriptions, expression.get(i), onError)); + } + final AtomicReference self = new AtomicReference(); + + ActivePlanN activePlan = new ActivePlanN(observers, new ActionN() { + @Override + public void call(Object... args) { + R result; + try { + result = selector.call(args); + } catch (Throwable t) { + observer.onError(t); + return; + } + observer.onNext(result); + } + }, + new Action0() { + @Override + public void call() { + for (JoinObserver1 jo : observers) { + jo.removeActivePlan(self.get()); + } + deactivate.call(self.get()); + } + }); + + self.set(activePlan); + + for (JoinObserver1 jo : observers) { + jo.addActivePlan(activePlan); + } + + return activePlan; + } + +} diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/operators/OperatorJoinPatterns.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/operators/OperatorJoinPatterns.java index 98e93479e7..5ec7e1c3df 100644 --- a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/operators/OperatorJoinPatterns.java +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/operators/OperatorJoinPatterns.java @@ -37,7 +37,8 @@ /** * Join patterns: And, Then, When. */ -public class OperatorJoinPatterns { +public final class OperatorJoinPatterns { + public OperatorJoinPatterns() { throw new IllegalStateException("No instances!"); } /** * Creates a pattern that matches when both observable sequences have an available element. */ diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/observables/JoinObservable.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/observables/JoinObservable.java index 2faefd2d4c..3b8762e27f 100644 --- a/rxjava-contrib/rxjava-joins/src/main/java/rx/observables/JoinObservable.java +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/observables/JoinObservable.java @@ -6,7 +6,12 @@ import rx.joins.Plan0; import rx.joins.operators.OperatorJoinPatterns; -public class JoinObservable { +/** + * Represents an observable that supports join operations. + * + * @param the value type joined + */ +public final class JoinObservable { private final Observable o; @@ -14,6 +19,11 @@ private JoinObservable(Observable o) { this.o = o; } + /** + * Creates a JoinObservable from a regular Observable. + * @param o the observable to wrap + * @return the created JoinObservable instance + */ public static JoinObservable from(Observable o) { return new JoinObservable(o); } diff --git a/rxjava-contrib/rxjava-joins/src/test/java/rx/joins/operators/OperatorJoinsTest.java b/rxjava-contrib/rxjava-joins/src/test/java/rx/joins/operators/OperatorJoinsTest.java index b6b5dfe5a3..4d9e01f2a8 100644 --- a/rxjava-contrib/rxjava-joins/src/test/java/rx/joins/operators/OperatorJoinsTest.java +++ b/rxjava-contrib/rxjava-joins/src/test/java/rx/joins/operators/OperatorJoinsTest.java @@ -15,8 +15,10 @@ */ package rx.joins.operators; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -31,10 +33,20 @@ import rx.Observable; import rx.Observer; +import rx.exceptions.TestException; +import rx.functions.Func0; import rx.functions.Func1; import rx.functions.Func2; import rx.functions.Func3; +import rx.functions.Func4; +import rx.functions.Func5; +import rx.functions.Func6; +import rx.functions.Func7; +import rx.functions.Func8; +import rx.functions.Func9; +import rx.functions.FuncN; import rx.functions.Functions; +import rx.joins.PatternN; import rx.joins.Plan0; import rx.observables.JoinObservable; import rx.observers.TestSubscriber; @@ -44,12 +56,76 @@ public class OperatorJoinsTest { @Mock Observer observer; - Func2 add2 = new Func2() { - @Override - public Integer call(Integer t1, Integer t2) { - return t1 + t2; - } - }; + static final class Adder implements + Func2, + Func3, + Func4, + Func5, + Func6, + Func7, + Func8, + Func9, + FuncN + { + + @Override + public Integer call(Object... args) { + int sum = 0; + + for(Object o : args) { + sum += (Integer)o; + } + + return sum; + } + + @Override + public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, + Integer t5, Integer t6, Integer t7, Integer t8, Integer t9) { + return t1 + t2 + t3 + t4 + t5 + t6 + t7 + t8 + t9; + } + + @Override + public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, + Integer t5, Integer t6, Integer t7, Integer t8) { + return t1 + t2 + t3 + t4 + t5 + t6 + t7 + t8; + } + + @Override + public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, + Integer t5, Integer t6, Integer t7) { + return t1 + t2 + t3 + t4 + t5 + t6 + t7; + } + + @Override + public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, + Integer t5, Integer t6) { + return t1 + t2 + t3 + t4 + t5 + t6; + } + + @Override + public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, + Integer t5) { + return t1 + t2 + t3 + t4 + t5; + } + + @Override + public Integer call(Integer t1, Integer t2, Integer t3, Integer t4) { + return t1 + t2 + t3 + t4; + } + + @Override + public Integer call(Integer t1, Integer t2, Integer t3) { + return t1 + t2 + t3; + } + + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + + } + Adder add = new Adder(); Func2 mul2 = new Func2() { @Override public Integer call(Integer t1, Integer t2) { @@ -63,30 +139,73 @@ public Integer call(Integer t1, Integer t2) { } }; - Func3 add3 = new Func3() { - @Override - public Integer call(Integer t1, Integer t2, Integer t3) { - return t1 + t2 + t3; - } - }; - Func1 func1Throw = new Func1() { - @Override - public Integer call(Integer t1) { - throw new RuntimeException("Forced failure"); - } - }; - Func2 func2Throw = new Func2() { - @Override - public Integer call(Integer t1, Integer t2) { - throw new RuntimeException("Forced failure"); - } - }; - Func3 func3Throw = new Func3() { - @Override - public Integer call(Integer t1, Integer t2, Integer t3) { - throw new RuntimeException("Forced failure"); - } - }; + static final class ThrowFunc implements + Func0, + Func1, + Func2, + Func3, + Func4, + Func5, + Func6, + Func7, + Func8, + Func9, + FuncN + { + @Override + public R call() { + throw new TestException("Forced failure"); + } + @Override + public R call(Integer t1) { + return call(); + } + @Override + public R call(Object... args) { + return call(); + } + @Override + public R call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, + Integer t6, Integer t7, Integer t8, Integer t9) { + return call(); + } + @Override + public R call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, + Integer t6, Integer t7, Integer t8) { + return call(); + } + @Override + public R call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, + Integer t6, Integer t7) { + return call(); + } + @Override + public R call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, + Integer t6) { + return call(); + } + @Override + public R call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5) { + return call(); + } + @Override + public R call(Integer t1, Integer t2, Integer t3, Integer t4) { + return call(); + } + @Override + public R call(Integer t1, Integer t2, Integer t3) { + return call(); + } + @Override + public R call(Integer t1, Integer t2) { + return call(); + } + } + ThrowFunc throwFunc = new ThrowFunc(); + + Observable some = Observable.just(1); + + Observable error = Observable.error(new TestException("Forced failure")); @Before public void before() { @@ -95,196 +214,164 @@ public void before() { @Test(expected = NullPointerException.class) public void and2ArgumentNull() { - Observable some = Observable.just(1); JoinObservable.from(some).and(null); } @Test(expected = NullPointerException.class) public void and3argumentNull() { - Observable some = Observable.just(1); JoinObservable.from(some).and(some).and(null); } + void verifyAnd(JoinObservable m, int count) { + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + m.toObservable().subscribe(o); + + verify(o, never()).onError(any(Throwable.class)); + verify(o, times(1)).onNext(count); + verify(o, times(1)).onCompleted(); + } + void verifyError(JoinObservable m) { + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + m.toObservable().subscribe(o); + + verify(o, times(1)).onError(any(TestException.class)); + verify(o, never()).onNext(any(Integer.class)); + verify(o, never()).onCompleted(); + } + @Test public void and2() { - Observable some = Observable.just(1); - - Observable m = JoinObservable.when(JoinObservable.from(some).and(some).then(add2)).toObservable(); - - m.subscribe(observer); - - verify(observer, never()).onError(any(Throwable.class)); - verify(observer, times(1)).onNext(2); - verify(observer, times(1)).onCompleted(); + verifyAnd(JoinObservable.when(JoinObservable.from(some).and(some).then(add)), 2); } @Test public void and2Error1() { - Observable error = Observable.error(new RuntimeException("Forced failure")); - - Observable some = Observable.just(1); - - Observable m = JoinObservable.when(JoinObservable.from(error).and(some).then(add2)).toObservable(); - - m.subscribe(observer); - - verify(observer, times(1)).onError(any(Throwable.class)); - verify(observer, never()).onNext(any(Integer.class)); - verify(observer, never()).onCompleted(); + verifyError(JoinObservable.when(JoinObservable.from(error).and(some).then(add))); } @Test public void and2Error2() { - Observable error = Observable.error(new RuntimeException("Forced failure")); - - Observable some = Observable.just(1); - - Observable m = JoinObservable.when(JoinObservable.from(some).and(error).then(add2)).toObservable(); - - m.subscribe(observer); - - verify(observer, times(1)).onError(any(Throwable.class)); - verify(observer, never()).onNext(any(Integer.class)); - verify(observer, never()).onCompleted(); + verifyError(JoinObservable.when(JoinObservable.from(some).and(error).then(add))); } @Test public void and3() { - Observable some = Observable.just(1); - - Observable m = JoinObservable.when(JoinObservable.from(some).and(some).and(some).then(add3)).toObservable(); - - m.subscribe(observer); - - verify(observer, never()).onError(any(Throwable.class)); - verify(observer, times(1)).onNext(3); - verify(observer, times(1)).onCompleted(); + verifyAnd(JoinObservable.when(JoinObservable.from(some).and(some).and(some).then(add)), 3); } @Test public void and3Error1() { - Observable error = Observable.error(new RuntimeException("Forced failure")); - - Observable some = Observable.just(1); - - Observable m = JoinObservable.when(JoinObservable.from(error).and(some).and(some).then(add3)).toObservable(); - - m.subscribe(observer); - - verify(observer, times(1)).onError(any(Throwable.class)); - verify(observer, never()).onNext(any(Integer.class)); - verify(observer, never()).onCompleted(); + verifyError(JoinObservable.when(JoinObservable.from(error).and(some).and(some).then(add))); } @Test public void and3Error2() { - Observable error = Observable.error(new RuntimeException("Forced failure")); - - Observable some = Observable.just(1); - - Observable m = JoinObservable.when(JoinObservable.from(some).and(error).and(some).then(add3)).toObservable(); - - m.subscribe(observer); - - verify(observer, times(1)).onError(any(Throwable.class)); - verify(observer, never()).onNext(any(Integer.class)); - verify(observer, never()).onCompleted(); + verifyError(JoinObservable.when(JoinObservable.from(some).and(error).and(some).then(add))); } @Test public void and3Error3() { - Observable error = Observable.error(new RuntimeException("Forced failure")); - - Observable some = Observable.just(1); - - Observable m = JoinObservable.when(JoinObservable.from(some).and(some).and(error).then(add3)).toObservable(); - - m.subscribe(observer); - - verify(observer, times(1)).onError(any(Throwable.class)); - verify(observer, never()).onNext(any(Integer.class)); - verify(observer, never()).onCompleted(); + verifyError(JoinObservable.when(JoinObservable.from(some).and(some).and(error).then(add))); } @Test(expected = NullPointerException.class) public void thenArgumentNull() { - Observable some = Observable.just(1); - JoinObservable.from(some).then(null); } @Test(expected = NullPointerException.class) public void then2ArgumentNull() { - Observable some = Observable.just(1); - JoinObservable.from(some).and(some).then(null); } @Test(expected = NullPointerException.class) public void then3ArgumentNull() { - Observable some = Observable.just(1); - JoinObservable.from(some).and(some).and(some).then(null); } + + @Test(expected = NullPointerException.class) + public void then4ArgumentNull() { + JoinObservable.from(some).and(some).and(some).and(some).then(null); + } - @Test - public void then1() { - Observable some = Observable.just(1); - - Observable m = JoinObservable.when(JoinObservable.from(some).then(Functions. identity())).toObservable(); - m.subscribe(observer); + @Test(expected = NullPointerException.class) + public void then5ArgumentNull() { + JoinObservable.from(some).and(some).and(some).and(some).and(some).then(null); + } - verify(observer, never()).onError(any(Throwable.class)); - verify(observer, times(1)).onNext(1); - verify(observer, times(1)).onCompleted(); + @Test(expected = NullPointerException.class) + public void then6ArgumentNull() { + JoinObservable.from(some).and(some).and(some).and(some).and(some).and(some).then(null); } - @Test - public void then1Error() { - Observable some = Observable.error(new RuntimeException("Forced failure")); + @Test(expected = NullPointerException.class) + public void then7ArgumentNull() { + JoinObservable.from(some).and(some).and(some).and(some).and(some).and(some).and(some).then(null); + } - Observable m = JoinObservable.when(JoinObservable.from(some).then(Functions. identity())).toObservable(); - m.subscribe(observer); + @Test(expected = NullPointerException.class) + public void then8ArgumentNull() { + JoinObservable.from(some).and(some).and(some).and(some).and(some).and(some).and(some).and(some).then(null); + } - verify(observer, times(1)).onError(any(Throwable.class)); - verify(observer, never()).onNext(any(Integer.class)); - verify(observer, never()).onCompleted(); + @Test(expected = NullPointerException.class) + public void then9ArgumentNull() { + JoinObservable.from(some).and(some).and(some).and(some).and(some).and(some).and(some).and(some).and(some).then(null); } @Test - public void then1Throws() { - Observable some = Observable.just(1); + public void thenNArgumentNull() { + for (int n = 10; n < 100; n++) { + PatternN p = JoinObservable.from(some).and(some) + .and(some).and(some) + .and(some).and(some) + .and(some).and(some) + .and(some).and(some); + try { + for (int j = 0; j < n - 10; j++) { + p = p.and(some); + } + p.then(null); + fail("Failed to throw exception with pattern length " + n); + } catch (NullPointerException ex) { + // expected, continue + } + } + } - Observable m = JoinObservable.when(JoinObservable.from(some).then(func1Throw)).toObservable(); - m.subscribe(observer); + @Test(expected = NullPointerException.class) + public void then10ArgumentNull() { + JoinObservable.from(some).and(some).and(some).and(some).and(some).and(some).and(some).and(some).and(some).and(some).then(null); + } - verify(observer, times(1)).onError(any(Throwable.class)); - verify(observer, never()).onNext(any(Integer.class)); - verify(observer, never()).onCompleted(); + @Test + public void then1() { + verifyAnd(JoinObservable.when(JoinObservable.from(some).then(Functions. identity())), 1); } @Test - public void then2Throws() { - Observable some = Observable.just(1); + public void then1Error() { + verifyError(JoinObservable.when(JoinObservable.from(error).then(Functions. identity()))); + } - Observable m = JoinObservable.when(JoinObservable.from(some).and(some).then(func2Throw)).toObservable(); - m.subscribe(observer); + @Test + public void then1Throws() { + verifyError(JoinObservable.when(JoinObservable.from(some).then(throwFunc))); + } - verify(observer, times(1)).onError(any(Throwable.class)); - verify(observer, never()).onNext(any(Integer.class)); - verify(observer, never()).onCompleted(); + @Test + public void then2Throws() { + verifyError(JoinObservable.when(JoinObservable.from(some).and(some).then(throwFunc))); } @Test public void then3Throws() { - Observable some = Observable.just(1); - - Observable m = JoinObservable.when(JoinObservable.from(some).and(some).and(some).then(func3Throw)).toObservable(); - m.subscribe(observer); - - verify(observer, times(1)).onError(any(Throwable.class)); - verify(observer, never()).onNext(any(Integer.class)); - verify(observer, never()).onCompleted(); + verifyError(JoinObservable.when(JoinObservable.from(some).and(some).and(some).then(throwFunc))); } @Test(expected = NullPointerException.class) @@ -302,7 +389,7 @@ public void whenMultipleSymmetric() { Observable source1 = Observable.from(1, 2, 3); Observable source2 = Observable.from(4, 5, 6); - Observable m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add2)).toObservable(); + Observable m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add)).toObservable(); m.subscribe(observer); verify(observer, never()).onError(any(Throwable.class)); @@ -317,7 +404,7 @@ public void whenMultipleAsymSymmetric() { Observable source1 = Observable.from(1, 2, 3); Observable source2 = Observable.from(4, 5); - Observable m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add2)).toObservable(); + Observable m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add)).toObservable(); m.subscribe(observer); verify(observer, never()).onError(any(Throwable.class)); @@ -331,7 +418,7 @@ public void whenEmptyEmpty() { Observable source1 = Observable.empty(); Observable source2 = Observable.empty(); - Observable m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add2)).toObservable(); + Observable m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add)).toObservable(); m.subscribe(observer); verify(observer, never()).onError(any(Throwable.class)); @@ -344,7 +431,7 @@ public void whenNeverNever() { Observable source1 = Observable.never(); Observable source2 = Observable.never(); - Observable m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add2)).toObservable(); + Observable m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add)).toObservable(); m.subscribe(observer); verify(observer, never()).onError(any(Throwable.class)); @@ -355,9 +442,9 @@ public void whenNeverNever() { @Test public void whenThrowNonEmpty() { Observable source1 = Observable.empty(); - Observable source2 = Observable.error(new RuntimeException("Forced failure")); + Observable source2 = Observable.error(new TestException("Forced failure")); - Observable m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add2)).toObservable(); + Observable m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add)).toObservable(); m.subscribe(observer); verify(observer, times(1)).onError(any(Throwable.class)); @@ -372,7 +459,7 @@ public void whenComplicated() { PublishSubject zs = PublishSubject.create(); Observable m = JoinObservable.when( - JoinObservable.from(xs).and(ys).then(add2), // 1+4=5, 2+5=7, 3+6=9 + JoinObservable.from(xs).and(ys).then(add), // 1+4=5, 2+5=7, 3+6=9 JoinObservable.from(xs).and(zs).then(mul2), // 1*7=7, 2*8=16, 3*9=27 JoinObservable.from(ys).and(zs).then(sub2) // 4-7=-3, 5-8=-3, 6-9=-3 ).toObservable(); @@ -414,4 +501,889 @@ public void whenComplicated() { inOrder.verify(observer, times(1)).onCompleted(); verify(observer, never()).onError(any(Throwable.class)); } + + // ----------------- + + @Test + public void and4() { + verifyAnd(JoinObservable.when(JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .then(add)), 4); + } + + @Test + public void and4Error1() { + verifyError(JoinObservable.when( + JoinObservable.from(error) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and4Error2() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(error) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and4Error3() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(error) + .and(some) + .then(add))); + } + + @Test + public void and4Error4() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(error) + .then(add))); + } + + @Test + public void then4Throws() { + verifyError(JoinObservable.when( + JoinObservable + .from(some) + .and(some) + .and(some) + .and(some) + .then(throwFunc))); + } + + // ----------------- + + @Test + public void and5() { + verifyAnd(JoinObservable.when(JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add)), 5); + } + + @Test + public void and5Error1() { + verifyError(JoinObservable.when( + JoinObservable.from(error) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and5Error2() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(error) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and5Error3() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(error) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and5Error4() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(error) + .and(some) + .then(add))); + } + @Test + public void and5Error5() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(error) + .then(add))); + } + + @Test + public void then5Throws() { + verifyError(JoinObservable.when( + JoinObservable + .from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(throwFunc))); + } + + // ----------------- + + @Test + public void and6() { + verifyAnd(JoinObservable.when(JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add)), 6); + } + + @Test + public void and6Error1() { + verifyError(JoinObservable.when( + JoinObservable.from(error) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and6Error2() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(error) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and6Error3() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(error) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and6Error4() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(error) + .and(some) + .and(some) + .then(add))); + } + @Test + public void and6Error5() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(error) + .and(some) + .then(add))); + } + + @Test + public void and6Error6() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(error) + .then(add))); + } + + @Test + public void then6Throws() { + verifyError(JoinObservable.when( + JoinObservable + .from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(throwFunc))); + } + // ----------------- + + @Test + public void and7() { + verifyAnd(JoinObservable.when(JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add)), 7); + } + + @Test + public void and7Error1() { + verifyError(JoinObservable.when( + JoinObservable.from(error) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and7Error2() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(error) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and7Error3() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(error) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and7Error4() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(error) + .and(some) + .and(some) + .and(some) + .then(add))); + } + @Test + public void and7Error5() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(error) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and7Error6() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(error) + .and(some) + .then(add))); + } + + @Test + public void and7Error7() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(error) + .then(add))); + } + + @Test + public void then7Throws() { + verifyError(JoinObservable.when( + JoinObservable + .from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(throwFunc))); + } + // ----------------- + + @Test + public void and8() { + verifyAnd(JoinObservable.when(JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add)), 8); + } + + @Test + public void and8Error1() { + verifyError(JoinObservable.when( + JoinObservable.from(error) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and8Error2() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(error) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and8Error3() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(error) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and8Error4() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(error) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + @Test + public void and8Error5() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(error) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and8Error6() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(error) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and8Error7() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(error) + .and(some) + .then(add))); + } + + @Test + public void and8Error8() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(error) + .then(add))); + } + + @Test + public void then8Throws() { + verifyError(JoinObservable.when( + JoinObservable + .from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(throwFunc))); + } + // ----------------- + + @Test + public void and9() { + verifyAnd(JoinObservable.when(JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add)), 9); + } + + @Test + public void and9Error1() { + verifyError(JoinObservable.when( + JoinObservable.from(error) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and9Error2() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(error) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and9Error3() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(error) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and9Error4() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(error) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + @Test + public void and9Error5() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(error) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and9Error6() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(error) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and9Error7() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(error) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void and9Error8() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(error) + .and(some) + .then(add))); + } + @Test + public void and9Error9() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(error) + .then(add))); + } + + @Test + public void then9Throws() { + verifyError(JoinObservable.when( + JoinObservable + .from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(throwFunc))); + } + // ----------------- + + @Test + public void andN() { + int s = 10; + for (int n = s; n < 100; n++) { + System.out.println("AndN(" + n + ")"); + PatternN p = JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some); + + for (int j = 0; j < n - s; j++) { + p = p.and(some); + } + verifyAnd(JoinObservable.when(p.then(add)), n); + } + } + + @Test + public void andNError1() { + verifyError(JoinObservable.when( + JoinObservable.from(error) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void andNError2() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(error) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void andNError3() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(error) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void andNError4() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(error) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + @Test + public void andNError5() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(error) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void andNError6() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(error) + .and(some) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void andNError7() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(error) + .and(some) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void andNError8() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(error) + .and(some) + .and(some) + .then(add))); + } + + @Test + public void andNError9() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(error) + .and(some) + .then(add))); + } + + @Test + public void andNErrorN() { + verifyError(JoinObservable.when( + JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(error) + .then(add))); + } + + @Test + public void andNErrorNRange() { + for (int n = 10; n < 100; n++) { + PatternN p = JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some); + + for (int j = 0; j < n - 10; j++) { + p = p.and(some); + } + p = p.and(error); + + verifyError(JoinObservable.when(p.then(add))); + } + } + + + @Test + public void thenNThrows() { + for (int n = 10; n < 100; n++) { + PatternN p = JoinObservable.from(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some) + .and(some); + + for (int j = 0; j < n - 10; j++) { + p = p.and(some); + } + verifyError(JoinObservable.when(p.then(throwFunc))); + } + } }