Skip to content

Commit

Permalink
#1103 Refactor Multi.toFlow() to avoid blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
heubeck committed Oct 30, 2022
1 parent cdf9523 commit a437c85
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 32 deletions.
45 changes: 14 additions & 31 deletions kotlin/src/main/kotlin/io/smallrye/mutiny/coroutines/Multi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@ import io.smallrye.mutiny.Multi
import io.smallrye.mutiny.subscription.MultiEmitter
import io.smallrye.mutiny.subscription.MultiSubscriber
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.cancellation.CancellationException
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import org.reactivestreams.Subscription

/**
Expand All @@ -23,23 +28,12 @@ import org.reactivestreams.Subscription
* The subscription to this [Multi] gets automatically cancelled when the surrounding coroutine fails or gets cancelled.
*/
@ExperimentalCoroutinesApi

This comment has been minimized.

Copy link
@j128919965

j128919965 Oct 31, 2022

This line might be deleted if invokeOnClose is not used

suspend fun <T> Multi<T>.asFlow(): Flow<T> = channelFlow<T> {
suspend fun <T> Multi<T>.asFlow(): Flow<T> = callbackFlow<T> {
val parentCtx = coroutineContext
val terminationFailure = AtomicReference<Throwable?>(null)
val terminationLock = Mutex(locked = true)

val subscriber = object : MultiSubscriber<T> {
private val subscription = AtomicReference<Subscription?>()

init {
invokeOnClose { reason ->
if (reason != null) {
// coroutine was cancelled or is failed
subscription.get()?.cancel()
}
}
}

override fun onSubscribe(s: Subscription) {
if (subscription.compareAndSet(null, s)) {
s.request(Long.MAX_VALUE)
Expand All @@ -50,36 +44,25 @@ suspend fun <T> Multi<T>.asFlow(): Flow<T> = channelFlow<T> {

override fun onItem(item: T) {
if (parentCtx.isActive) {
runBlocking {
channel.send(item)
}
} else if (terminationLock.isLocked) {
channel.trySendBlocking(item).getOrThrow()
} else {
// Coroutine is completed or was cancelled:
// regular cancellation may happen by Flow abortion like with Flow.first(), no failure must be thrown
// in case of a failure, 'invokeOnClose' is called with a reason separately - setting a terminatinoFailure is also not necessary here
subscription.get()?.cancel()
terminationLock.unlock()
}
}

override fun onFailure(failure: Throwable) {
terminationFailure.set(failure)
terminationLock.unlock()
cancel(CancellationException(failure.message, failure))
}

override fun onCompletion() {
if (terminationLock.isLocked) {
terminationLock.unlock()
}
channel.close()
}

}

subscribe().withSubscriber(subscriber)
// wait (suspending) for completion or failure of Multi to terminate the Flow
terminationLock.lock()
terminationFailure.get()?.also { throw it }
}
awaitClose()
}.buffer(Channel.UNLIMITED) // Switching to unlimited channel capacity avoids blocking

/**
* Provide this [Flow]s items or failure as [Multi].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.yield
import org.assertj.core.api.Assertions.assertThat

@ExperimentalCoroutinesApi
Expand Down Expand Up @@ -147,13 +148,13 @@ class MultiAsFlowTest {
delay(100)
emit()
emit()
yield()
fail()
}

// Then
assertThat(assertJob.await()).hasMessage("boom").isInstanceOf(IllegalStateException::class.java)
assertThat(eventItems).containsExactly(5, 5)

}
}

Expand Down

0 comments on commit a437c85

Please sign in to comment.