Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Coroutines to 1.6.0, Kotlin API to 1.5 and refactored tests #790

Merged
merged 1 commit into from
Jan 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions kotlin/src/main/kotlin/io/smallrye/mutiny/coroutines/Multi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@ package io.smallrye.mutiny.coroutines
import io.smallrye.mutiny.Multi
import io.smallrye.mutiny.subscription.MultiEmitter
import io.smallrye.mutiny.subscription.MultiSubscriber
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import org.reactivestreams.Subscription
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.coroutineContext

/**
* Subscribe to this [Multi] and provide the items as [Flow].
Expand Down
4 changes: 2 additions & 2 deletions kotlin/src/main/kotlin/io/smallrye/mutiny/coroutines/Uni.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package io.smallrye.mutiny.coroutines

import io.smallrye.mutiny.Uni
import io.smallrye.mutiny.subscription.UniEmitter
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

/**
* Awaits the result of this [Uni] while suspending the surrounding coroutine.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,28 @@
package io.smallrye.mutiny.coroutines

import io.smallrye.mutiny.helpers.test.UniAssertSubscriber
import java.util.UUID
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.test.Test
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.assertj.core.api.Assertions.assertThat
import java.util.UUID
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.test.Test

@ExperimentalCoroutinesApi
class DeferredAsUniTest {

@Test
fun `test immediate value`() {
runBlocking {
testBlocking {
// Given
val value = UUID.randomUUID()
val deferred = GlobalScope.async { value }
val deferred = embeddedScope().async { value }

// When
val subscriber = UniAssertSubscriber.create<UUID>()
Expand All @@ -37,9 +35,9 @@ class DeferredAsUniTest {

@Test
fun `test immediate failure`() {
runBlocking {
// Given
val deferred = GlobalScope.async { error("kaboom") }
testBlocking {
// Given an error produced by a Deferred created in an isolated CoroutineScope
val deferred = embeddedScope().async { error("kaboom") }

// When
val subscriber = UniAssertSubscriber<Any>()
Expand All @@ -52,9 +50,9 @@ class DeferredAsUniTest {

@Test
fun `test coroutine cancellation before value is computed`() {
runBlocking {
testBlocking {
// Given
val deferred = GlobalScope.async {
val deferred = embeddedScope().async {
delay(250)
UUID.randomUUID()
}
Expand All @@ -71,9 +69,9 @@ class DeferredAsUniTest {

@Test
fun `test null item`() {
runBlocking {
testBlocking {
// Given
val deferred = GlobalScope.async { null }
val deferred = embeddedScope().async { null }

// When
val subscriber = UniAssertSubscriber.create<Any>()
Expand All @@ -86,10 +84,10 @@ class DeferredAsUniTest {

@Test
fun `test uni cancellation before deferred completes`() {
runBlocking {
testBlocking {
// Given
val deferredLatch = CountDownLatch(1)
val deferred = GlobalScope.async {
val deferred = embeddedScope().async {
delay(10_000) // simulate long running job
"Hello, World!"
}
Expand All @@ -110,13 +108,13 @@ class DeferredAsUniTest {
awaitLatch("Deferred", deferredLatch)
assertThat(deferred.isCancelled)
.`as`("Check that Deferred is cancelled")
.isTrue()
.isTrue
}
}

@Test
fun `test cancelling deferred does not impact siblings`() {
runBlocking {
testBlocking {
// Start before async.
val child1Latch = CountDownLatch(1)
val child1 = launch {
Expand All @@ -136,7 +134,7 @@ class DeferredAsUniTest {
deferred.invokeOnCompletion { exception ->
deferredLatch.countDown()
assertThat(exception)
.isNotNull()
.isNotNull
.isInstanceOf(CancellationException::class.java)
}

Expand All @@ -162,7 +160,7 @@ class DeferredAsUniTest {
awaitLatch("Child 1", child1Latch)
assertThat(child1.isCompleted)
.`as`("Check that child 1 completed")
.isTrue()
.isTrue

awaitLatch("Uni", uniLatch)
awaitLatch("Deferred", deferredLatch)
Expand All @@ -171,11 +169,11 @@ class DeferredAsUniTest {
.isNotNull
assertThat(deferred.isCancelled)
.`as`("Check that Deferred was cancelled")
.isTrue()
.isTrue
awaitLatch("Sibling 2", child2Latch)
assertThat(child2.isCompleted)
.`as`("Check that child 2 completed")
.isTrue()
.isTrue
}
}
}
Expand All @@ -184,5 +182,5 @@ class DeferredAsUniTest {
private fun awaitLatch(name: String, latch: CountDownLatch) {
assertThat(latch.await(2, TimeUnit.SECONDS))
.`as`("Check that $name completes within 2s")
.isTrue()
.isTrue
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
package io.smallrye.mutiny.coroutines

import io.smallrye.mutiny.helpers.test.AssertSubscriber
import kotlinx.coroutines.*
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.Test
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import org.assertj.core.api.Assertions.assertThat
import java.util.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.Test

class FlowAsMultiTest {

@Test
fun `test immediate item`() {
runBlocking(Dispatchers.Default) {
testBlocking {
// Given
val item = UUID.randomUUID()
val flow = flowOf(item)
Expand All @@ -35,7 +37,7 @@ class FlowAsMultiTest {

@Test
fun `test immediate items`() {
runBlocking(Dispatchers.Default) {
testBlocking {
// Given
val items = arrayOf(5, 23, 42)
val flow = flowOf(*items)
Expand All @@ -54,7 +56,7 @@ class FlowAsMultiTest {

@Test
fun `test immediate failure`() {
runBlocking(Dispatchers.Default) {
testBlocking {
// Given
val flow = flow<Any> {
error("boom")
Expand All @@ -66,13 +68,13 @@ class FlowAsMultiTest {

// Then
subscriber.awaitFailure()
.assertFailedWith(IllegalStateException::class.java, "boom")
.assertFailedWith(IllegalStateException::class.java, "boom")
}
}

@Test
fun `verify that coroutine cancellation result in failure`() {
runBlocking(Dispatchers.Default) {
testBlocking {
// Given
val flow = flow<UUID> {
delay(200)
Expand All @@ -81,13 +83,11 @@ class FlowAsMultiTest {
val subscriber = AssertSubscriber.create<UUID>(1)

// When
runBlocking {
val job = launch {
flow.asMulti().subscribe().withSubscriber(subscriber)
}
delay(50)
job.cancel(CancellationException("abort"))
val job = launch {
flow.asMulti().subscribe().withSubscriber(subscriber)
}
delay(50)
job.cancel(CancellationException("abort"))
Thread.sleep(350)

// Then
Expand All @@ -97,7 +97,7 @@ class FlowAsMultiTest {

@Test
fun `verify that Flow cancels on Multi subscription cancellation`() {
runBlocking(Dispatchers.IO) {
testBlocking {
// Given
val counter = AtomicInteger()
var exitException: Throwable? = null
Expand All @@ -107,7 +107,7 @@ class FlowAsMultiTest {
emit(counter.incrementAndGet())
}
} catch (err: Throwable) {
exitException = err
exitException = err
}
}

Expand All @@ -122,15 +122,14 @@ class FlowAsMultiTest {
assertThat(counter.get()).isGreaterThan(0)
subscriber.assertItems(*(1..42).toList().toTypedArray())
subscriber.assertNotTerminated()
assertThat(subscriber.isCancelled).isTrue()
assertThat(exitException).isNotNull().isInstanceOf(CancellationException::class.java)
assertThat(subscriber.isCancelled).isTrue
assertThat(exitException).isNotNull.isInstanceOf(CancellationException::class.java)
}
}

@ExperimentalCoroutinesApi
@Test
fun `verify that callbackFlow cancels on Multi subscription cancellation`() {
runBlocking(Dispatchers.IO) {
testBlocking {
// Given
val closed = AtomicBoolean(false)
val flow = callbackFlow<Int> {
Expand All @@ -154,7 +153,7 @@ class FlowAsMultiTest {

// Then
subscriber.assertNotTerminated()
assertThat(subscriber.isCancelled).isTrue()
assertThat(subscriber.isCancelled).isTrue
assertThat(closed).isTrue
}
}
Expand Down
Loading