Skip to content

Commit

Permalink
fix throttle
Browse files Browse the repository at this point in the history
  • Loading branch information
hoc081098 committed May 1, 2022
1 parent c973042 commit a713ef2
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
22 changes: 13 additions & 9 deletions src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt
Original file line number Diff line number Diff line change
Expand Up @@ -262,18 +262,20 @@ public fun <T> Flow<T>.throttleTime(
}
}

val onWindowClosed = suspend {
throttled = null

if (trailing) {
trySend()
}
}

// Now consume the values until the original flow is complete.
while (lastValue !== DONE_VALUE) {
// wait for the next value
select<Unit> {
// When a throttling window ends, send the value if there is a pending value.
throttled?.onJoin?.invoke {
throttled = null

if (trailing) {
trySend()
}
}
throttled?.onJoin?.invoke(onWindowClosed)

values.onReceiveCatching { result ->
result
Expand All @@ -288,8 +290,10 @@ public fun <T> Flow<T>.throttleTime(
trySend()
}

val duration = durationSelector(NULL_VALUE.unbox(value))
throttled = scope.launch { delay(duration) }
when (val duration = durationSelector(NULL_VALUE.unbox(value))) {
Duration.ZERO -> onWindowClosed()
else -> throttled = scope.launch { delay(duration) }
}
}
.onFailure {
it?.let { throw it }
Expand Down
3 changes: 1 addition & 2 deletions src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.StandardTestDispatcher
import java.util.concurrent.Executors
import kotlin.test.Test
import kotlin.test.assertEquals
Expand Down Expand Up @@ -178,7 +177,7 @@ class ThrottleFirstJvmTest {
}

@Test
fun throttleFailureUpstream() = runBlocking(StandardTestDispatcher()) {
fun throttleFailureUpstream() = runBlocking {
flow {
emit(1)
throw TestException("Broken!")
Expand Down

0 comments on commit a713ef2

Please sign in to comment.