Skip to content

Commit

Permalink
resolves #267 Coroutine and reactive streams modules missing projecti…
Browse files Browse the repository at this point in the history
…on extension
  • Loading branch information
zigzago committed Mar 21, 2021
1 parent 8a36aa0 commit 3d490b1
Show file tree
Hide file tree
Showing 8 changed files with 659 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright (C) 2016/2020 Litote
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.litote.kmongo.reactivestreams

import com.mongodb.bulk.BulkWriteResult
import com.mongodb.client.result.InsertOneResult
import org.junit.Test
import org.litote.kmongo.ascending
import org.litote.kmongo.div
import org.litote.kmongo.eq
import org.litote.kmongo.insertOne
import org.litote.kmongo.model.Coordinate
import org.litote.kmongo.model.Friend
import kotlin.test.assertEquals

/**
*
*/
class ProjectionTest : KMongoReactiveStreamsBaseTest<Friend>() {

@Test
fun `single projection is ok`() {
col.bulkWrite(
insertOne(Friend("Joe")),
insertOne(Friend("Bob"))
).listenSingle { _: BulkWriteResult?, _: Throwable? ->

col.projection(Friend::name, Friend::name eq "Joe")
.listenList { list, _ ->
assertEquals(
listOf("Joe"),
list
)
}

col.projection(Friend::name, options = { it.sort(ascending(Friend::name)) })
.listenList { list, _ ->
asyncTest {
assertEquals(
listOf("Bob", "Joe"),
list
)
}
}
}
}

@Test
fun `single multi fields projection is ok`() {
col.insertOne(Friend("Joe", Coordinate(2, 2))).listenSingle { _: InsertOneResult?, _: Throwable? ->
col.projection(Friend::coordinate / Coordinate::lat).first().listenSingle { r, _ ->
asyncTest {
assertEquals(
2,
r
)
}
}
}
}

@Test
fun `pair projection is ok`() {
col.bulkWrite(
insertOne(Friend("Joe", coordinate = Coordinate(1, 2))),
insertOne(Friend("Bob", coordinate = Coordinate(3, 4)))
).listenSingle { _: BulkWriteResult?, _: Throwable? ->

col.projection(Friend::name, Friend::coordinate, Friend::name eq "Joe").first().listenSingle { r, _ ->
asyncTest {
assertEquals(
"Joe" to Coordinate(1, 2),
r
)
}
}
}
}

@Test
fun `pair multi fields projection is ok`() {
col.insertOne(Friend("Joe", Coordinate(1, 2))).listenSingle { _: InsertOneResult?, _: Throwable? ->
col.projection(
Friend::coordinate / Coordinate::lat,
Friend::coordinate / Coordinate::lng
).first().listenSingle { r, _ ->
asyncTest {
assertEquals(1, r!!.first)
assertEquals(2, r.second)
}
}
}
}

@Test
fun `triple projection is ok`() {
col.bulkWrite(
insertOne(Friend("Joe", "Here", coordinate = Coordinate(1, 2), tags = listOf("t1"))),
insertOne(Friend("Bob", "Here", coordinate = Coordinate(3, 4), tags = listOf("t2")))
).listenSingle { _: BulkWriteResult?, _: Throwable? ->

col.projection(
Friend::name,
Friend::coordinate,
Friend::tags,
Friend::name eq "Joe"
).first().listenSingle { r, _ ->
asyncTest {
assertEquals(
Triple("Joe", Coordinate(1, 2), listOf("t1")),
r
)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class WatchTest : KMongoReactiveStreamsBaseTest<Watch>() {
asyncTest {
dropped.set(true)
println("drop")
col.drop().waitSingle { _, _ ->
col.drop().listenSingle { _, _ ->

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,20 @@ import org.bson.conversions.Bson
import org.litote.kmongo.EMPTY_BSON
import org.litote.kmongo.SetTo
import org.litote.kmongo.and
import org.litote.kmongo.excludeId
import org.litote.kmongo.fields
import org.litote.kmongo.include
import org.litote.kmongo.path
import org.litote.kmongo.set
import org.litote.kmongo.util.KMongoUtil
import org.litote.kmongo.util.PairProjection
import org.litote.kmongo.util.SingleProjection
import org.litote.kmongo.util.TripleProjection
import org.litote.kmongo.util.pairProjectionCodecRegistry
import org.litote.kmongo.util.singleProjectionCodecRegistry
import org.litote.kmongo.util.tripleProjectionCodecRegistry
import org.reactivestreams.Publisher
import kotlin.reflect.KProperty
import kotlin.reflect.KProperty1

/**
Expand Down Expand Up @@ -574,3 +584,86 @@ inline fun <reified T : Any> MongoCollection<T>.bulkWrite(
options: BulkWriteOptions = BulkWriteOptions()
): Publisher<BulkWriteResult> = bulkWrite(requests.toList(), options)

/**
* Returns the specified field for all matching documents.
*
* @param property the property to return
* @param query the optional find query
* @param options the optional [FindPublisher] modifiers
* @return a property value FindPublisher
*/
inline fun <T, reified F> MongoCollection<T>.projection(
property: KProperty<F>,
query: Bson = EMPTY_BSON,
options: (FindPublisher<SingleProjection<F>>) -> FindPublisher<SingleProjection<F>> = { it }
): FindPublisher<F> =
withDocumentClass<SingleProjection<F>>()
.withCodecRegistry(singleProjectionCodecRegistry(property.path(), F::class, codecRegistry))
.find(query)
.let { options(it) }
.projection(fields(excludeId(), include(property)))
.map { it?.field }

/**
* Returns the specified two fields for all matching documents.
*
* @param property1 the first property to return
* @param property2 the second property to return
* @param query the optional find query
* @param options the optional [FindPublisher] modifiers
* @return a pair of property values FindPublisher
*/
inline fun <T, reified F1, reified F2> MongoCollection<T>.projection(
property1: KProperty<F1>,
property2: KProperty<F2>,
query: Bson = EMPTY_BSON,
options: (FindPublisher<PairProjection<F1, F2>>) -> FindPublisher<PairProjection<F1, F2>> = { it }
): FindPublisher<Pair<F1?, F2?>> =
withDocumentClass<PairProjection<F1, F2>>()
.withCodecRegistry(
pairProjectionCodecRegistry(
property1.path(),
F1::class,
property2.path(),
F2::class,
codecRegistry
)
)
.find(query)
.let { options(it) }
.projection(fields(excludeId(), include(property1), include(property2)))
.map { it?.field1 to it?.field2 }

/**
* Returns the specified three fields for all matching documents.
*
* @param property1 the first property to return
* @param property2 the second property to return
* @param property3 the third property to return
* @param query the optional find query
* @param options the optional [FindPublisher] modifiers
* @return a triple of property values FindPublisher
*/
inline fun <T, reified F1, reified F2, reified F3> MongoCollection<T>.projection(
property1: KProperty<F1>,
property2: KProperty<F2>,
property3: KProperty<F3>,
query: Bson = EMPTY_BSON,
options: (FindPublisher<TripleProjection<F1, F2, F3>>) -> FindPublisher<TripleProjection<F1, F2, F3>> = { it }
): FindPublisher<Triple<F1?, F2?, F3?>> =
withDocumentClass<TripleProjection<F1, F2, F3>>()
.withCodecRegistry(
tripleProjectionCodecRegistry(
property1.path(),
F1::class,
property2.path(),
F2::class,
property3.path(),
F3::class,
codecRegistry
)
)
.find(query)
.let { options(it) }
.projection(fields(excludeId(), include(property1), include(property2), include(property3)))
.map { Triple(it?.field1, it?.field2, it?.field3) }
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
/**
* Listen one element from the publisher.
*/
fun <T> Publisher<T>.waitSingle(listener: (T?, Throwable?) -> Unit): Unit =
fun <T> Publisher<T>.listenSingle(listener: (T?, Throwable?) -> Unit): Unit =
subscribe(object : Subscriber<T> {
override fun onComplete() {
//do nothing
Expand Down Expand Up @@ -90,4 +90,5 @@ fun <T> Publisher<T>.listenList(listener: (List<T>?, Throwable?) -> Unit) {
listener(null, t)
}
})
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,18 @@

package org.litote.kmongo.reactivestreams

import com.mongodb.CursorType
import com.mongodb.ExplainVerbosity
import com.mongodb.client.model.Collation
import com.mongodb.reactivestreams.client.FindPublisher
import org.bson.Document
import org.bson.conversions.Bson
import org.litote.kmongo.include
import org.litote.kmongo.util.KMongoUtil
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import java.util.concurrent.TimeUnit
import kotlin.reflect.KProperty

/**
Expand Down Expand Up @@ -54,3 +63,96 @@ fun <T> FindPublisher<T>.projection(vararg projections: KProperty<*>): FindPubli
* @return this
*/
fun <T> FindPublisher<T>.sort(sort: String): FindPublisher<T> = sort(KMongoUtil.toBson(sort))

/**
* Maps a value and returns the new FindPublisher.
*/
@Suppress("UNCHECKED_CAST")
fun <I, O> FindPublisher<I>.map(mapper: (I?) -> O?): FindPublisher<O> =
object : FindPublisher<O> {

override fun first(): Publisher<O> = this@map.first().map(mapper)

override fun filter(filter: Bson): FindPublisher<O> = this@map.filter(filter).map(mapper)

override fun limit(limit: Int): FindPublisher<O> = this@map.limit(limit).map(mapper)

override fun skip(skip: Int): FindPublisher<O> = this@map.skip(skip).map(mapper)

override fun maxTime(maxTime: Long, timeUnit: TimeUnit): FindPublisher<O> =
this@map.maxTime(maxTime, timeUnit).map(mapper)

override fun maxAwaitTime(maxAwaitTime: Long, timeUnit: TimeUnit): FindPublisher<O> =
this@map.maxAwaitTime(maxAwaitTime, timeUnit).map(mapper)

override fun projection(projection: Bson): FindPublisher<O> = this@map.projection(projection).map(mapper)

override fun sort(sort: Bson): FindPublisher<O> = this@map.sort(sort).map(mapper)

override fun noCursorTimeout(noCursorTimeout: Boolean): FindPublisher<O> =
this@map.noCursorTimeout(noCursorTimeout).map(mapper)

override fun oplogReplay(oplogReplay: Boolean): FindPublisher<O> = this@map.oplogReplay(oplogReplay).map(mapper)

override fun partial(partial: Boolean): FindPublisher<O> = this@map.partial(partial).map(mapper)

override fun cursorType(cursorType: CursorType): FindPublisher<O> = this@map.cursorType(cursorType).map(mapper)

override fun collation(collation: Collation): FindPublisher<O> = this@map.collation(collation).map(mapper)

override fun comment(comment: String): FindPublisher<O> = this@map.comment(comment).map(mapper)

override fun hint(hint: Bson): FindPublisher<O> = this@map.hint(hint).map(mapper)

override fun hintString(hint: String): FindPublisher<O> = this@map.hintString(hint).map(mapper)

override fun max(max: Bson): FindPublisher<O> = this@map.max(max).map(mapper)

override fun min(min: Bson): FindPublisher<O> = this@map.min(min).map(mapper)

override fun returnKey(returnKey: Boolean): FindPublisher<O> = this@map.returnKey(returnKey).map(mapper)

override fun showRecordId(showRecordId: Boolean): FindPublisher<O> =
this@map.showRecordId(showRecordId).map(mapper)

override fun batchSize(batchSize: Int): FindPublisher<O> = this@map.batchSize(batchSize).map(mapper)

override fun allowDiskUse(allowDiskUse: Boolean?): FindPublisher<O> =
this@map.allowDiskUse(allowDiskUse).map(mapper)

override fun explain(): Publisher<Document> = this@map.explain()

override fun explain(verbosity: ExplainVerbosity): Publisher<Document> = this@map.explain(verbosity)

override fun <E : Any?> explain(explainResultClass: Class<E>): Publisher<E> =
this@map.explain(explainResultClass)

override fun <E : Any?> explain(explainResultClass: Class<E>, verbosity: ExplainVerbosity): Publisher<E> =
this@map.explain(explainResultClass, verbosity)

override fun subscribe(subscriber: Subscriber<in O>) {
this@map.subscribe(mapper, subscriber)
}
}

fun <I, O> Publisher<I>.map(mapper: (I?) -> O?): Publisher<O> =
Publisher<O> { subscriber -> subscribe(mapper, subscriber) }

private fun <I, O> Publisher<I>.subscribe(mapper: (I?) -> O?, subscriber: Subscriber<in O>) =
subscribe(object : Subscriber<I> {
override fun onComplete() {
subscriber.onComplete()
}

override fun onSubscribe(s: Subscription) {
subscriber.onSubscribe(s)
}

override fun onNext(t: I?) {
subscriber.onNext(mapper(t))
}

override fun onError(t: Throwable) {
subscriber.onError(t)
}
})
Loading

0 comments on commit 3d490b1

Please sign in to comment.