diff --git a/language-adaptors/rxjava-kotlin/README.md b/language-adaptors/rxjava-kotlin/README.md
new file mode 100644
index 0000000000..86f8125152
--- /dev/null
+++ b/language-adaptors/rxjava-kotlin/README.md
@@ -0,0 +1,36 @@
+# Kotlin Adaptor for RxJava
+
+This adaptor allows Kotlin Functions to be used and RxJava will know how to invoke them
+
+This enable code such as:
+
+```kotlin
+Observable.toObservable("one", "two", "three")
+ .take(2)
+ .subscribe{ (arg:String) ->
+ println(arg)
+ }
+```
+
+In the future this module will expose a more idiomatic way to use RxJava inside Kotlin
+
+# Binaries
+
+Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-kotlin%22).
+
+Example for Maven:
+
+```xml
+
+ com.netflix.rxjava
+ rxjava-kotlin
+ x.y.z
+
+```
+
+and for Ivy:
+
+```xml
+
+```
+
diff --git a/language-adaptors/rxjava-kotlin/build.gradle b/language-adaptors/rxjava-kotlin/build.gradle
new file mode 100644
index 0000000000..b717a0aff6
--- /dev/null
+++ b/language-adaptors/rxjava-kotlin/build.gradle
@@ -0,0 +1,59 @@
+buildscript {
+ repositories {
+ mavenCentral()
+ maven {
+ url 'http://repository.jetbrains.com/all'
+ }
+ }
+ dependencies {
+ classpath 'org.jetbrains.kotlin:kotlin-gradle-plugin:0.5.748'
+ }
+}
+
+apply plugin: 'java'
+apply plugin: 'kotlin'
+apply plugin: 'eclipse'
+apply plugin: 'idea'
+apply plugin: 'osgi'
+
+
+repositories {
+ maven {
+ url 'http://repository.jetbrains.com/all'
+ }
+}
+
+dependencies {
+ compile project(':rxjava-core')
+ compile 'org.jetbrains.kotlin:kotlin-stdlib:0.5.748'
+ provided 'junit:junit-dep:4.10'
+ provided 'org.mockito:mockito-core:1.8.5'
+ provided 'com.google.guava:guava:14.0.1'
+}
+
+eclipse {
+ classpath {
+ // include 'provided' dependencies on the classpath
+ plusConfigurations += configurations.provided
+
+ downloadSources = true
+ downloadJavadoc = true
+ }
+}
+
+idea {
+ module {
+ // include 'provided' dependencies on the classpath
+ scopes.PROVIDED.plus += configurations.provided
+ }
+}
+
+jar {
+ manifest {
+ name = 'rxjava-kotlin'
+ instruction 'Bundle-Vendor', 'Netflix'
+ instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
+ instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
+ instruction 'Fragment-Host', 'com.netflix.rxjava.core'
+ }
+}
\ No newline at end of file
diff --git a/language-adaptors/rxjava-kotlin/src/main/kotlin/rx/lang/kotlin/KotlinAdaptor.kt b/language-adaptors/rxjava-kotlin/src/main/kotlin/rx/lang/kotlin/KotlinAdaptor.kt
new file mode 100644
index 0000000000..fce004502f
--- /dev/null
+++ b/language-adaptors/rxjava-kotlin/src/main/kotlin/rx/lang/kotlin/KotlinAdaptor.kt
@@ -0,0 +1,78 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rx.lang.kotlin
+
+import rx.util.functions.FunctionLanguageAdaptor
+
+public class KotlinAdaptor: FunctionLanguageAdaptor {
+
+ public override fun call(function: Any?, args: Array?): Any? {
+ return when(args!!.size){
+ 0 -> (function!! as Function0)()
+ 1 -> (function!! as Function1)(args[0])
+ 2 -> (function!! as Function2)(args[0], args[1])
+ 3 -> (function!! as Function3)(args[0], args[1], args[2])
+ 4 -> (function!! as Function4)(args[0], args[1], args[2], args[3])
+ 5 -> (function!! as Function5)(args[0], args[1], args[2], args[3], args[4])
+ 6 -> (function!! as Function6)(args[0], args[1], args[2], args[3], args[4], args[5])
+ 7 -> (function!! as Function7)(args[0], args[1], args[2], args[3], args[4], args[5], args[6])
+ 8 -> (function!! as Function8)(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7])
+ 9 -> (function!! as Function9)(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8])
+ 10 -> (function!! as Function10)(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8], args[9])
+ 11 -> (function!! as Function11)(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10])
+ 12 -> (function!! as Function12)(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], args[11])
+ 13 -> (function!! as Function13)(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], args[11], args[12])
+ 14 -> (function!! as Function14)(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], args[11], args[12], args[13])
+ 15 -> (function!! as Function15)(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], args[11], args[12], args[13], args[14])
+ 16 -> (function!! as Function16)(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], args[11], args[12], args[13], args[14], args[15])
+ 17 -> (function!! as Function17)(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], args[11], args[12], args[13], args[14], args[15], args[16])
+ 18 -> (function!! as Function18)(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], args[11], args[12], args[13], args[14], args[15], args[16], args[17])
+ 19 -> (function!! as Function19)(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], args[11], args[12], args[13], args[14], args[15], args[16], args[17], args[18])
+ 20 -> (function!! as Function20)(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], args[11], args[12], args[13], args[14], args[15], args[16], args[17], args[18], args[19])
+ 21 -> (function!! as Function21)(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], args[11], args[12], args[13], args[14], args[15], args[16], args[17], args[18], args[19], args[20])
+ 22 -> (function!! as Function22)(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], args[11], args[12], args[13], args[14], args[15], args[16], args[17], args[18], args[19], args[20], args[21])
+ else -> throw UnsupportedOperationException("")
+ }
+ }
+
+ public override fun getFunctionClass(): Array>? {
+ return array(
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>(),
+ javaClass>())
+ }
+}
\ No newline at end of file
diff --git a/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/ObservableTests.kt b/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/ObservableTests.kt
new file mode 100644
index 0000000000..8134ae6b26
--- /dev/null
+++ b/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/ObservableTests.kt
@@ -0,0 +1,332 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rx.lang.kotlin
+
+import rx.Observable
+import rx.Observer
+import rx.Subscription
+import rx.subscriptions.Subscriptions
+import org.mockito.Mock
+import org.junit.Before
+import org.mockito.MockitoAnnotations
+import org.junit.Test
+import org.junit.Assert.*
+import org.mockito.Mockito.*
+import org.mockito.Matchers.*
+import rx.Notification
+
+public class ObservableTests {
+
+
+ [Mock] var a: ScriptAssertion? = null
+
+ [Mock] var w: Observer? = null
+
+ [Before] public fun before() {
+ MockitoAnnotations.initMocks(this)
+ }
+
+ private val receiveInt = {(result: Int) ->
+ a!!.received(result)
+ }
+
+ private val receiveListOfInt = {(result: List) ->
+ a!!.received(result)
+ }
+
+ private val receiveString = {(result: String) ->
+ a!!.received(result)
+ }
+
+ private val lengthEqualsTo3 = {(x: String) ->
+ x.length() == 3
+ }
+
+ [Test] public fun testCreate() {
+
+ Observable.create{(observer: Observer) ->
+ observer.onNext("hello")
+ observer.onCompleted()
+ }!!.subscribe(receiveString)
+ verify(a, times(1))!!.received("hello")
+ }
+
+ [Test] public fun testFilter() {
+
+ Observable.filter(Observable.toObservable(1, 2, 3)) {(i: Int) ->
+ i >= 2
+ }!!.subscribe(receiveInt)
+ verify(a, times(0))!!.received(1)
+ verify(a, times(1))!!.received(2)
+ verify(a, times(1))!!.received(3)
+ }
+
+ [Test] public fun testLast() {
+ assertEquals("three", Observable.toObservable("one", "two", "three")!!.toBlockingObservable()!!.last())
+ }
+
+ [Test] public fun testLastWithPredicate() {
+ assertEquals("two", Observable.toObservable("one", "two", "three")!!.toBlockingObservable()!!.last{(x: String) ->
+ x.length == 3
+ })
+ }
+
+ [Test] public fun testMap() {
+ TestFactory().getObservable().map{(it: String) ->
+ "say${it}"
+ }!!.subscribe(receiveString)
+ verify(a, times(1))!!.received("sayhello_1")
+
+ Observable.map(Observable.toObservable(1, 2, 3)) {(i: Int) ->
+ "hello_$i"
+ }!!.subscribe(receiveString)
+ verify(a, times(1))!!.received("hello_${1}")
+ verify(a, times(1))!!.received("hello_${2}")
+ verify(a, times(1))!!.received("hello_${3}")
+ }
+
+ [Test] public fun testMaterialize() {
+ Observable.materialize(Observable.toObservable(1, 2, 3))!!.subscribe{(result: Notification) ->
+ a!!.received(result)
+ }
+ verify(a, times(4))!!.received(any(javaClass>()))
+ verify(a, times(0))!!.error(any(javaClass()))
+ }
+
+ [Test] public fun testMerge() {
+ Observable.merge(
+ Observable.toObservable(1, 2, 3),
+ Observable.merge(
+ Observable.toObservable(6),
+ Observable.error(NullPointerException()),
+ Observable.toObservable(7)
+ ),
+ Observable.toObservable(4, 5)
+ )!!.subscribe(onNext = receiveInt, onError = {(exception: Exception) -> a!!.error(exception) })
+
+ verify(a, times(1))!!.received(1)
+ verify(a, times(1))!!.received(2)
+ verify(a, times(1))!!.received(3)
+ verify(a, times(0))!!.received(4) // the NPE will cause this sequence to be skipped
+ verify(a, times(0))!!.received(5) // the NPE will cause this sequence to be skipped
+ verify(a, times(1))!!.received(6) // this comes before the NPE so should exist
+ verify(a, times(0))!!.received(7)// this comes in the sequence after the NPE
+ verify(a, times(1))!!.error(any(javaClass()))
+ }
+
+ [Test] public fun testMergeDelayError() {
+ Observable.mergeDelayError(
+ Observable.toObservable(1, 2, 3),
+ Observable.merge(
+ Observable.toObservable(6),
+ Observable.error(NullPointerException()),
+ Observable.toObservable(7)
+ ),
+ Observable.toObservable(4, 5)
+ )!!.subscribe(onNext = receiveInt, onError = {(exception: Exception) -> a!!.error(exception) })
+ verify(a, times(1))!!.received(1)
+ verify(a, times(1))!!.received(2)
+ verify(a, times(1))!!.received(3)
+ verify(a, times(1))!!.received(4)
+ verify(a, times(1))!!.received(5)
+ verify(a, times(1))!!.received(6)
+ verify(a, times(0))!!.received(7)
+ verify(a, times(1))!!.error(any(javaClass()))
+ }
+
+ [Test] public fun testScriptWithMaterialize() {
+ TestFactory().getObservable().materialize()!!.subscribe{(result: Notification) ->
+ a!!.received(result)
+ }
+ verify(a, times(2))!!.received(any(javaClass>()))
+ }
+
+ [Test] public fun testScriptWithMerge() {
+ val factory = TestFactory()
+ Observable.merge(factory.getObservable(), factory.getObservable())!!.subscribe(receiveString)
+ verify(a, times(1))!!.received("hello_1")
+ verify(a, times(1))!!.received("hello_2")
+ }
+
+ [Test] public fun testScriptWithOnNext() {
+ TestFactory().getObservable().subscribe(receiveString)
+ verify(a, times(1))!!.received("hello_1")
+ }
+
+ [Test] public fun testScriptWithOnNextUsingMao() {
+ TestFactory().getObservable().subscribe(hashMapOf("onNext" to receiveString))
+ verify(a, times(1))!!.received("hello_1")
+ }
+
+ [Test] public fun testSkipTake() {
+ Observable.skip(Observable.toObservable(1, 2, 3), 1)!!.take(1)!!.subscribe(receiveInt)
+ verify(a, times(0))!!.received(1)
+ verify(a, times(1))!!.received(2)
+ verify(a, times(0))!!.received(3)
+ }
+
+ [Test] public fun testTakeWhile() {
+ Observable.takeWhile(Observable.toObservable(1, 2, 3)) {(x: Int) ->
+ x < 3
+ }!!.subscribe(receiveInt)
+ verify(a, times(1))!!.received(1)
+ verify(a, times(1))!!.received(2)
+ verify(a, times(0))!!.received(3)
+ }
+
+ [Test] public fun testTakeWhileWithIndex() {
+ Observable.takeWhileWithIndex(Observable.toObservable(1, 2, 3)) {(x: Int, i: Int) ->
+ i < 2
+ }!!.subscribe(receiveInt)
+ verify(a, times(1))!!.received(1)
+ verify(a, times(1))!!.received(2)
+ verify(a, times(0))!!.received(3)
+ }
+
+
+ [Test]
+ public fun testToSortedList() {
+ TestFactory().getNumbers().toSortedList()!!.subscribe(receiveListOfInt)
+ verify(a, times(1))!!.received(arrayListOf(1, 2, 3, 4, 5))
+ }
+
+ [Test]
+ public fun testToSortedListStatic() {
+ Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4))!!.subscribe(receiveListOfInt)
+ verify(a, times(1))!!.received(arrayListOf(1, 2, 3, 4, 5))
+ }
+
+ [Test]
+ public fun testToSortedListWithFunction() {
+ TestFactory().getNumbers().toSortedList{(a: Int, b: Int) -> a - b }!!.subscribe(receiveListOfInt)
+ verify(a, times(1))!!.received(arrayListOf(1, 2, 3, 4, 5))
+ }
+
+ [Test]
+ public fun testToSortedListWithFunctionStatic() {
+ Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4)) {(a: Int, b: Int) -> a - b }!!.subscribe(receiveListOfInt)
+ verify(a, times(1))!!.received(arrayListOf(1, 2, 3, 4, 5))
+ }
+
+ [Test]
+ public fun testForEach() {
+ asyncObservable.toBlockingObservable()!!.forEach(receiveInt)
+ verify(a, times(1))!!.received(1)
+ verify(a, times(1))!!.received(2)
+ verify(a, times(1))!!.received(3)
+ }
+
+ [Test]
+ public fun testForEachWithError() {
+ try {
+ asyncObservable.toBlockingObservable()!!.forEach{(result: Any) -> throw RuntimeException("err") }
+ fail("we expect an exception to be thrown")
+ }catch(e: Exception) {
+ // do nothing as we expect this
+ }
+ }
+
+ [Test]
+ public fun testLastOrDefault() {
+
+ assertEquals("two", Observable.toObservable("one", "two")!!.toBlockingObservable()!!.lastOrDefault("default", lengthEqualsTo3))
+ }
+
+ [Test]
+ public fun testLastOrDefault2() {
+
+ assertEquals("default", Observable.toObservable("one", "two")!!.toBlockingObservable()!!.lastOrDefault("default") {(x: String) ->
+ x.length() > 3
+ })
+ }
+
+ public fun testSingle1() {
+
+ assertEquals("one", Observable.toObservable("one")!!.toBlockingObservable()!!.single(lengthEqualsTo3))
+ }
+
+ [Test(expected = javaClass())]
+ public fun testSingle2() {
+ Observable.toObservable("one", "two")!!.toBlockingObservable()!!.single(lengthEqualsTo3)
+ }
+
+ [Test]
+ public fun testDefer() {
+ val obs = Observable.toObservable(1, 2)!!
+ Observable.defer{ obs }!!.subscribe(receiveInt)
+ verify(a, times(1))!!.received(1)
+ verify(a, times(1))!!.received(2)
+
+ }
+
+ [Test]
+ public fun testAll() {
+ Observable.toObservable(1, 2, 3)!!.all{(x: Int) -> x > 0 }!!.subscribe{(result: Boolean) ->
+ a!!.received(result)
+ }
+ verify(a, times(1))!!.received(true)
+ }
+
+
+ val asyncObservable = {(observer: Observer) ->
+ Thread(Runnable{
+ try{
+ Thread.sleep(50)
+ }catch(e: Exception) {
+ //Do nothing
+ }
+ observer.onNext(1)
+ observer.onNext(2)
+ observer.onNext(3)
+ observer.onCompleted()
+ }).start()
+ Subscriptions.empty()!!
+ }.asObservable()
+
+ trait ScriptAssertion{
+ public fun error(e: Exception?)
+ public fun received(a: Any?)
+ }
+
+ class TestObservable(val count: Int): Observable(){
+
+ public override fun subscribe(observer: Observer?): Subscription? {
+ observer!!.onNext("hello_$count")
+ observer.onCompleted()
+ return Subscription{ }
+ }
+ }
+
+ class TestFactory {
+ var counter = 1
+
+ public fun getNumbers(): Observable {
+ return Observable.toObservable(1, 2, 3, 4, 5)!!
+ }
+
+ public fun getObservable(): TestObservable {
+ return TestObservable(counter++)
+ }
+ }
+
+ fun Function1, Subscription>.asObservable(): Observable {
+ return Observable.create(rx.util.functions.Func1, Subscription>{
+ this(it!!)
+ })!!
+ }
+}
+
diff --git a/rxjava-core/src/main/java/rx/util/functions/Functions.java b/rxjava-core/src/main/java/rx/util/functions/Functions.java
index 53671c210d..728d9da2a9 100644
--- a/rxjava-core/src/main/java/rx/util/functions/Functions.java
+++ b/rxjava-core/src/main/java/rx/util/functions/Functions.java
@@ -15,6 +15,7 @@
*/
package rx.util.functions;
+
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,6 +36,7 @@ public class Functions {
loadLanguageAdaptor("JRuby");
loadLanguageAdaptor("Clojure");
loadLanguageAdaptor("Scala");
+ loadLanguageAdaptor("Kotlin");
// as new languages arise we can add them here but this does not prevent someone from using 'registerLanguageAdaptor' directly
}
diff --git a/settings.gradle b/settings.gradle
index f07f904404..6b0620cbb5 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -4,4 +4,5 @@ include 'rxjava-core', \
'language-adaptors:rxjava-jruby', \
'language-adaptors:rxjava-clojure', \
'language-adaptors:rxjava-scala', \
+'language-adaptors:rxjava-kotlin', \
'rxjava-contrib:rxjava-swing'