From f81e1813dabc687eef690de21e482c367f5b03aa Mon Sep 17 00:00:00 2001 From: Renovate Bot Date: Mon, 4 Apr 2022 15:56:49 +0000 Subject: [PATCH 01/13] chore(deps): update jamesives/github-pages-deploy-action action to v4.3.0 --- .github/workflows/build.yml | 2 +- .github/workflows/publish-release.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f46ff075..7f894707 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -148,7 +148,7 @@ jobs: - name: Deploy docs 🚀 to website if: ${{ github.ref == 'refs/heads/master' && github.repository == 'hoc081098/FlowExt' && matrix.os == 'macos-11' }} - uses: JamesIves/github-pages-deploy-action@v4.2.5 + uses: JamesIves/github-pages-deploy-action@v4.3.0 with: branch: gh-pages # The branch the action should deploy to. folder: build/dokka/html # The folder the action should deploy. diff --git a/.github/workflows/publish-release.yml b/.github/workflows/publish-release.yml index 385bf67e..39eb383b 100644 --- a/.github/workflows/publish-release.yml +++ b/.github/workflows/publish-release.yml @@ -86,7 +86,7 @@ jobs: - name: Deploy docs 🚀 to website if: ${{ matrix.os == 'macos-11' }} - uses: JamesIves/github-pages-deploy-action@v4.2.5 + uses: JamesIves/github-pages-deploy-action@v4.3.0 with: branch: gh-pages # The branch the action should deploy to. folder: build/dokka/html # The folder the action should deploy. From f782d2017b4549b7c4abc5d9eeeca00204ce5ed4 Mon Sep 17 00:00:00 2001 From: Renovate Bot Date: Mon, 4 Apr 2022 18:30:48 +0000 Subject: [PATCH 02/13] chore(deps): update coroutinesversion to v1.6.1 --- build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle.kts b/build.gradle.kts index e7744557..60c629c9 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -15,7 +15,7 @@ plugins { id("org.jetbrains.kotlinx.kover") version "0.5.0" } -val coroutinesVersion = "1.6.0" +val coroutinesVersion = "1.6.1" val ktlintVersion = "0.44.0" repositories { From 92ca3c2902efe5cd90264411cf0df183b071c919 Mon Sep 17 00:00:00 2001 From: Renovate Bot Date: Tue, 5 Apr 2022 16:26:10 +0000 Subject: [PATCH 03/13] chore(deps): update codecov/codecov-action action to v3 --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f46ff075..c5f1f7e3 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -65,7 +65,7 @@ jobs: run: ./gradlew koverXmlReport - name: Upload Test Report - uses: codecov/codecov-action@v2.1.0 + uses: codecov/codecov-action@v3.0.0 - name: Upload test report artifact if: ${{ failure() }} From 75602748aaa2c318aa7dd1117ffd0d7c31db73c7 Mon Sep 17 00:00:00 2001 From: Renovate Bot Date: Wed, 6 Apr 2022 18:27:47 +0000 Subject: [PATCH 04/13] chore(deps): update plugin com.diffplug.spotless to v6.4.2 --- build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle.kts b/build.gradle.kts index e7744557..51a95d1a 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -7,7 +7,7 @@ import java.net.URL plugins { kotlin("multiplatform") version "1.6.20" - id("com.diffplug.spotless") version "6.4.1" + id("com.diffplug.spotless") version "6.4.2" id("maven-publish") id("com.vanniktech.maven.publish") version "0.19.0" id("org.jetbrains.kotlinx.binary-compatibility-validator") version "0.8.0" From f92e8c9020390e610bb253487a412e213c3a2cde Mon Sep 17 00:00:00 2001 From: Renovate Bot Date: Wed, 27 Apr 2022 22:06:59 +0000 Subject: [PATCH 05/13] chore(deps): update plugin com.diffplug.spotless to v6.5.1 --- build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle.kts b/build.gradle.kts index e7744557..d0ca8c35 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -7,7 +7,7 @@ import java.net.URL plugins { kotlin("multiplatform") version "1.6.20" - id("com.diffplug.spotless") version "6.4.1" + id("com.diffplug.spotless") version "6.5.1" id("maven-publish") id("com.vanniktech.maven.publish") version "0.19.0" id("org.jetbrains.kotlinx.binary-compatibility-validator") version "0.8.0" From a27bfb26f48f2857fcb7c9f3dccda9379eee481a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Thu, 28 Apr 2022 10:27:44 +0700 Subject: [PATCH 06/13] refactor(throttle): convert throttle to throttleTime --- .idea/misc.xml | 2 +- CHANGELOG.md | 1 - README.md | 1 - api/FlowExt.api | 4 +- gradle.properties | 2 - .../hoc081098/flowext/internal/AtomicRef.kt | 2 + .../kotlin/com/hoc081098/flowext/throttle.kt | 31 +- .../com/hoc081098/flowext/withLatestFrom.kt | 10 +- .../com/hoc081098/flowext/ThrottleTest.kt | 112 ++-- .../com/hoc081098/flowext/utils/BaseTest.kt | 8 +- .../hoc081098/flowext/internal/AtomicRef.kt | 9 +- .../hoc081098/flowext/internal/AtomicRef.kt | 2 + .../com/hoc081098/flowext/TakeUntilJvmTest.kt | 118 ++++ .../com/hoc081098/flowext/ThrottleJvmTest.kt | 632 ++++++++++++++++++ .../hoc081098/flowext/WithLatestFromTest.kt | 166 +++++ .../hoc081098/flowext/internal/AtomicRef.kt | 2 + 16 files changed, 1012 insertions(+), 90 deletions(-) create mode 100644 src/jvmTest/kotlin/com/hoc081098/flowext/TakeUntilJvmTest.kt create mode 100644 src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt create mode 100644 src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt diff --git a/.idea/misc.xml b/.idea/misc.xml index 46eeceb6..ee593b9c 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,7 +1,7 @@ - + diff --git a/CHANGELOG.md b/CHANGELOG.md index 6957e5fe..5ba3dc01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,6 @@ - Add `Symbol` class. - Add - - `Flow.throttle`. - `Flow.throttleTime`. - `Event.flatMap`. - `Event.valueOrDefault`. diff --git a/README.md b/README.md index 40eef348..0f0279e9 100644 --- a/README.md +++ b/README.md @@ -64,7 +64,6 @@ - `retryWhenWithExponentialBackoff` - `retryWithExponentialBackoff` - [`takeUntil`](#takeUntil) - - `throttle` - `throttleTime` - [`withLatestFrom`](#withLatestFrom) diff --git a/api/FlowExt.api b/api/FlowExt.api index e1739f3f..0051200f 100644 --- a/api/FlowExt.api +++ b/api/FlowExt.api @@ -154,10 +154,10 @@ public final class com/hoc081098/flowext/ThrottleConfiguration : java/lang/Enum } public final class com/hoc081098/flowext/ThrottleKt { - public static final fun throttle (Lkotlinx/coroutines/flow/Flow;Lcom/hoc081098/flowext/ThrottleConfiguration;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; - public static synthetic fun throttle$default (Lkotlinx/coroutines/flow/Flow;Lcom/hoc081098/flowext/ThrottleConfiguration;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun throttleTime (Lkotlinx/coroutines/flow/Flow;JLcom/hoc081098/flowext/ThrottleConfiguration;)Lkotlinx/coroutines/flow/Flow; + public static final fun throttleTime (Lkotlinx/coroutines/flow/Flow;Lcom/hoc081098/flowext/ThrottleConfiguration;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun throttleTime$default (Lkotlinx/coroutines/flow/Flow;JLcom/hoc081098/flowext/ThrottleConfiguration;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; + public static synthetic fun throttleTime$default (Lkotlinx/coroutines/flow/Flow;Lcom/hoc081098/flowext/ThrottleConfiguration;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun throttleTime-8Mi8wO0 (Lkotlinx/coroutines/flow/Flow;JLcom/hoc081098/flowext/ThrottleConfiguration;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun throttleTime-8Mi8wO0$default (Lkotlinx/coroutines/flow/Flow;JLcom/hoc081098/flowext/ThrottleConfiguration;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; } diff --git a/gradle.properties b/gradle.properties index 2857a61f..d9840e89 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,4 @@ kotlin.code.style=official -kotlin.mpp.enableGranularSourceSetsMetadata=true -kotlin.native.enableDependencyPropagation=false kotlin.js.generate.executable.default=false GROUP=io.github.hoc081098 diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt b/src/commonMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt index e1da2281..973087cd 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt @@ -26,4 +26,6 @@ package com.hoc081098.flowext.internal internal expect class AtomicRef(value: T) { var value: T + + fun compareAndSet(expect: T, update: T): Boolean } diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt b/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt index c2e9efce..33585655 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt @@ -36,12 +36,13 @@ import kotlinx.coroutines.channels.onFailure import kotlinx.coroutines.channels.onSuccess import kotlinx.coroutines.channels.produce import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.take +import kotlinx.coroutines.launch import kotlinx.coroutines.selects.select import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds /** * Define leading and trailing behavior. @@ -123,10 +124,7 @@ private inline val ThrottleConfiguration.isTrailing: Boolean public fun Flow.throttleTime( duration: Duration, throttleConfiguration: ThrottleConfiguration = LEADING, -): Flow { - val timerFlow = timer(Unit, duration) - return throttle(throttleConfiguration) { timerFlow } -} +): Flow = throttleTime(throttleConfiguration) { duration } /** * Returns a [Flow] that emits a value from the source [Flow], then ignores subsequent source values @@ -181,10 +179,7 @@ public fun Flow.throttleTime( public fun Flow.throttleTime( timeMillis: Long, throttleConfiguration: ThrottleConfiguration = LEADING, -): Flow { - val timerFlow = timer(Unit, timeMillis) - return throttle(throttleConfiguration) { timerFlow } -} +): Flow = throttleTime(throttleConfiguration) { timeMillis.milliseconds } /** * Returns a [Flow] that emits a value from the source [Flow], then ignores subsequent source values @@ -236,9 +231,9 @@ public fun Flow.throttleTime( * ``` */ @ExperimentalCoroutinesApi -public fun Flow.throttle( +public fun Flow.throttleTime( throttleConfiguration: ThrottleConfiguration = LEADING, - durationSelector: (value: T) -> Flow, + durationSelector: (value: T) -> Duration, ): Flow = flow { val leading = throttleConfiguration.isLeading val trailing = throttleConfiguration.isTrailing @@ -292,9 +287,15 @@ public fun Flow.throttle( if (leading) { trySend() } - throttled = durationSelector(NULL_VALUE.unbox(value)) - .take(1) - .launchIn(scope) + throttled = when (val duration = durationSelector(NULL_VALUE.unbox(value))) { + Duration.ZERO -> { + if (trailing) { + trySend() + } + null + } + else -> scope.launch { delay(duration) } + } } .onFailure { it?.let { throw it } diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt b/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt index fdbca4db..aa81d792 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt @@ -26,11 +26,11 @@ package com.hoc081098.flowext import com.hoc081098.flowext.internal.AtomicRef import com.hoc081098.flowext.utils.NULL_VALUE +import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch /** * Merges two [Flow]s into one [Flow] by combining each value from self with the latest value from the second [Flow], if any. @@ -48,9 +48,9 @@ public fun Flow.withLatestFrom( try { coroutineScope { - other - .onEach { otherRef.value = it ?: NULL_VALUE } - .launchIn(this) + launch(start = CoroutineStart.UNDISPATCHED) { + other.collect { otherRef.value = it ?: NULL_VALUE } + } collect { value -> emit( diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/ThrottleTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/ThrottleTest.kt index 0abbfa80..15d3d0b7 100644 --- a/src/commonTest/kotlin/com/hoc081098/flowext/ThrottleTest.kt +++ b/src/commonTest/kotlin/com/hoc081098/flowext/ThrottleTest.kt @@ -39,11 +39,14 @@ import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.take +import kotlinx.coroutines.test.StandardTestDispatcher import kotlinx.coroutines.test.advanceTimeBy import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFalse import kotlin.test.assertIs +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds @ExperimentalCoroutinesApi class ThrottleFirstTest : BaseTest() { @@ -116,7 +119,7 @@ class ThrottleFirstTest : BaseTest() { fun throttleWithCompletedAndNotDelay_C() = runTest { (1..10) .asFlow() - .throttle { emptyFlow() } + .throttleTime { Duration.ZERO } .test((1..10).map { Event.Value(it) } + Event.Complete) } @@ -172,7 +175,7 @@ class ThrottleFirstTest : BaseTest() { } @Test - fun throttleFailureUpstream() = runTest { + fun throttleFailureUpstream() = runTest(StandardTestDispatcher()) { flow { emit(1) throw TestException("Broken!") @@ -180,6 +183,8 @@ class ThrottleFirstTest : BaseTest() { assertIs(it.single().errorOrThrow()) } + println("-".repeat(50)) + flow { emit(1) delay(500) @@ -187,14 +192,16 @@ class ThrottleFirstTest : BaseTest() { delay(500) throw TestException("Broken!") }.throttleTime(200).test(null) { events -> - assertEquals(3, events.size) + assertEquals(3, events.size, "[size]") val (a, b, c) = events - assertEquals(1, a.valueOrThrow()) - assertEquals(2, b.valueOrThrow()) - assertIs(c.errorOrThrow()) + assertEquals(1, a.valueOrThrow(), "[1]") + assertEquals(2, b.valueOrThrow(), "[2]") + assertIs(c.errorOrThrow(), "[3]") } + println("-".repeat(50)) + flow { emit(1) // Should be published since it is first delay(100) @@ -202,6 +209,7 @@ class ThrottleFirstTest : BaseTest() { delay(100) // Should be published as soon as the timeout expires. throw TestException("Broken!") }.throttleTime(400).test(null) { events -> + println(events) assertEquals(2, events.size) val (a, b) = events @@ -214,7 +222,7 @@ class ThrottleFirstTest : BaseTest() { fun throttleFailureSelector() = runTest { (1..10) .asFlow() - .throttle { throw TestException("Broken!") } + .throttleTime { throw TestException("Broken!") } .test(null) { events -> assertEquals(2, events.size) val (a, b) = events @@ -230,11 +238,11 @@ class ThrottleFirstTest : BaseTest() { delay(400) emit(3) } - .throttle { + .throttleTime { when (it) { - 1 -> timer(Unit, 400) - 3 -> flow { throw TestException("1") } - else -> flow { throw TestException("2") } + 1 -> 400.milliseconds + 3 -> throw TestException("1") + else -> throw TestException("2") } } .test(null) { events -> @@ -258,14 +266,11 @@ class ThrottleFirstTest : BaseTest() { delay(600) emit(4) } - .throttle { + .throttleTime { when (it) { - 1 -> timer(Unit, 400) - 3 -> flow { - delay(500) - throw TestException("1") - } - else -> flow { throw TestException("2") } + 1 -> 400.milliseconds + 3 -> throw TestException("1") + else -> throw TestException("2") } } .test(null) { events -> @@ -304,24 +309,19 @@ class ThrottleFirstTest : BaseTest() { var count = 0 (1..10).asFlow() .onEach { delay(200) } - .throttle { + .throttleTime { if (count++ % 2 == 0) { - flow { throw CancellationException("") } + throw CancellationException("") } else { - timer(Unit, 500) + 500.milliseconds } } - .test( - listOf( - Event.Value(1), - Event.Value(2), - Event.Value(5), - Event.Value(6), - Event.Value(9), - Event.Value(10), - Event.Complete, - ) - ) + .test(null) { events -> + assertEquals(2, events.size) + val (a, b) = events + assertEquals(1, a.valueOrThrow()) + assertIs(b.errorOrThrow()) + } } } @@ -426,7 +426,7 @@ class ThrottleLastTest : BaseTest() { fun throttleWithCompletedAndNotDelay_C() = runTest { (1..10) .asFlow() - .throttle(TRAILING) { emptyFlow() } + .throttleTime(TRAILING) { Duration.ZERO } .test((1..10).map { Event.Value(it) } + Event.Complete) } @@ -526,7 +526,7 @@ class ThrottleLastTest : BaseTest() { fun throttleFailureSelector() = runTest { (1..10) .asFlow() - .throttle(TRAILING) { throw TestException("Broken!") } + .throttleTime(TRAILING) { throw TestException("Broken!") } .test(null) { events -> assertIs(events.single().errorOrThrow()) } @@ -541,11 +541,11 @@ class ThrottleLastTest : BaseTest() { delay(400) emit(3) } - .throttle(TRAILING) { + .throttleTime(TRAILING) { when (it) { - 1 -> timer(Unit, 400) - 3 -> flow { throw TestException("1") } - else -> flow { throw TestException("2") } + 1 -> 400.milliseconds + 3 -> throw TestException("1") + else -> throw TestException("2") } } .test(null) { events -> @@ -571,14 +571,11 @@ class ThrottleLastTest : BaseTest() { delay(600) emit(4) } - .throttle(TRAILING) { + .throttleTime(TRAILING) { when (it) { - 1 -> timer(Unit, 400) - 3 -> flow { - delay(500) - throw TestException("1") - } - else -> flow { throw TestException("2") } + 1 -> 400.milliseconds + 3 -> throw TestException("1") + else -> throw TestException("2") } } .test(null) { events -> @@ -615,26 +612,21 @@ class ThrottleLastTest : BaseTest() { fun throttleCancellation() = runTest { // --1--2--3--4--5--6--7--8--9--10 - var count = 0 + var count = 1 (1..10).asFlow() .onEach { delay(200) } - .throttle(TRAILING) { + .throttleTime(TRAILING) { if (count++ % 2 == 0) { - flow { throw CancellationException("") } + throw CancellationException("") } else { - timer(Unit, 500) + 500.milliseconds } } - .test( - listOf( - Event.Value(1), - Event.Value(4), - Event.Value(5), - Event.Value(8), - Event.Value(9), - Event.Value(10), - Event.Complete, - ) - ) + .test(null) { events -> + assertEquals(2, events.size) + val (a, b) = events + assertEquals(3, a.valueOrThrow()) + assertIs(b.errorOrThrow()) + } } } diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/utils/BaseTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/utils/BaseTest.kt index 2a1448e8..9c150b98 100644 --- a/src/commonTest/kotlin/com/hoc081098/flowext/utils/BaseTest.kt +++ b/src/commonTest/kotlin/com/hoc081098/flowext/utils/BaseTest.kt @@ -25,15 +25,19 @@ package com.hoc081098.flowext.utils import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.TestDispatcher import kotlinx.coroutines.test.TestResult import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.UnconfinedTestDispatcher abstract class BaseTest { @ExperimentalCoroutinesApi - protected fun runTest(testBody: suspend TestScope.() -> Unit): TestResult { + protected fun runTest( + testDispatcher: TestDispatcher? = null, + testBody: suspend TestScope.() -> Unit + ): TestResult { return kotlinx.coroutines.test.runTest( - context = UnconfinedTestDispatcher(name = "${this::class.simpleName}-dispatchers"), + context = testDispatcher ?: UnconfinedTestDispatcher(name = "${this::class.simpleName}-dispatcher"), testBody = testBody, ) } diff --git a/src/jsMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt b/src/jsMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt index 8a7e5e0c..c83946ad 100644 --- a/src/jsMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt +++ b/src/jsMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt @@ -24,4 +24,11 @@ package com.hoc081098.flowext.internal -internal actual class AtomicRef actual constructor(actual var value: T) +internal actual class AtomicRef actual constructor(actual var value: T) { + actual fun compareAndSet(expect: T, update: T): Boolean = if (expect == value) { + value = update + true + } else { + false + } +} diff --git a/src/jvmMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt b/src/jvmMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt index 441b18f6..0cc53e39 100644 --- a/src/jvmMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt +++ b/src/jvmMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt @@ -32,4 +32,6 @@ internal actual class AtomicRef actual constructor(value: T) { actual var value: T get() = atomic.get() set(value) = atomic.set(value) + + actual fun compareAndSet(expect: T, update: T): Boolean = atomic.compareAndSet(expect, update) } diff --git a/src/jvmTest/kotlin/com/hoc081098/flowext/TakeUntilJvmTest.kt b/src/jvmTest/kotlin/com/hoc081098/flowext/TakeUntilJvmTest.kt new file mode 100644 index 00000000..23e8c1c9 --- /dev/null +++ b/src/jvmTest/kotlin/com/hoc081098/flowext/TakeUntilJvmTest.kt @@ -0,0 +1,118 @@ +/* + * MIT License + * + * Copyright (c) 2021-2022 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.hoc081098.flowext + +import com.hoc081098.flowext.utils.TestException +import com.hoc081098.flowext.utils.test +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.runBlocking +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertIs +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds + +@InternalCoroutinesApi +@ExperimentalCoroutinesApi +class TakeUntilJvmTest { + @Test + fun takeUntilSingle() = runBlocking { + range(0, 10) + .takeUntil(flowOf(1)) + .test(listOf(Event.Complete)) + + flowOf(1) + .takeUntil(flowOf(1)) + .test(listOf(Event.Complete)) + } + + @Test + fun sourceCompletesAfterNotifier() = runBlocking { + range(0, 10) + .onEach { delay(100) } + .onCompletion { println(it) } + .takeUntil(timer(Unit, 470.milliseconds)) + .test( + listOf( + Event.Value(0), + Event.Value(1), + Event.Value(2), + Event.Value(3), + Event.Complete, + ) + ) + } + + @Test + fun sourceCompletesBeforeNotifier() = runBlocking { + range(0, 10) + .onEach { delay(30) } + .takeUntil(timer(Unit, 10.seconds)) + .test( + (0 until 10).map { Event.Value(it) } + + Event.Complete + ) + } + + @Test + fun upstreamError() = runBlocking { + flow { throw TestException() } + .takeUntil(timer(Unit, 100)) + .test(null) { + assertIs(it.single().errorOrThrow()) + } + + flow { + emit(1) + throw TestException() + } + .takeUntil(timer(Unit, 100)) + .test(null) { + assertEquals(2, it.size) + assertEquals(1, it[0].valueOrThrow()) + assertIs(it[1].errorOrThrow()) + } + } + + @Test + fun take() = runBlocking { + flowOf(1, 2, 3) + .takeUntil(timer(Unit, 100)) + .take(1) + .test( + listOf( + Event.Value(1), + Event.Complete, + ) + ) + } +} diff --git a/src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt b/src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt new file mode 100644 index 00000000..4a41086b --- /dev/null +++ b/src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt @@ -0,0 +1,632 @@ +/* + * MIT License + * + * Copyright (c) 2021-2022 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.hoc081098.flowext + +import com.hoc081098.flowext.ThrottleConfiguration.TRAILING +import com.hoc081098.flowext.utils.BaseTest +import com.hoc081098.flowext.utils.TestException +import com.hoc081098.flowext.utils.test +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.test.StandardTestDispatcher +import kotlinx.coroutines.test.advanceTimeBy +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertIs +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds + +@ExperimentalCoroutinesApi +class ThrottleFirstJvmTest : BaseTest() { + @Test + fun throttleWithCompleted_A() = runTest { + (1..10) + .asFlow() + .onEach { delay(200) } + .throttleTime(500) + .test( + listOf( + Event.Value(1), + Event.Value(4), + Event.Value(7), + Event.Value(10), + Event.Complete, + ) + ) + } + + @Test + fun throttleWithCompleted_B() = runTest { + flow { + emit(1) // deliver + emit(2) // skip + delay(501) // 501 + + emit(3) // deliver + delay(99) // 600 + + emit(4) // skip + delay(100) // 700 + + emit(5) // skip + emit(6) // skip + delay(301) // 1001 + + emit(7) // deliver + delay(400) // 1501 + } + .throttleTime(500) + .test( + listOf( + Event.Value(1), + Event.Value(3), + Event.Value(7), + Event.Complete, + ) + ) + } + + @Test + fun throttleWithCompletedAndNotDelay_A() = runTest { + (1..10) + .asFlow() + .onEach { delay(200) } + .throttleTime(0) + .test((1..10).map { Event.Value(it) } + Event.Complete) + } + + @Test + fun throttleWithCompletedAndNotDelay_B() = runTest { + (1..10) + .asFlow() + .throttleTime(0) + .test((1..10).map { Event.Value(it) } + Event.Complete) + } + + @Test + fun throttleWithCompletedAndNotDelay_C() = runTest { + (1..10) + .asFlow() + .throttleTime { Duration.ZERO } + .test((1..10).map { Event.Value(it) } + Event.Complete) + } + + @Test + fun throttleNullableWithCompleted() = runTest { + (1..10) + .asFlow() + .onEach { delay(200) } + .map { v -> v.takeIf { it % 2 == 0 } } + .throttleTime(500) + .test( + listOf( + Event.Value(null), + Event.Value(4), + Event.Value(null), + Event.Value(10), + Event.Complete, + ) + ) + } + + @Test + fun throttleSingleFlow() = runTest { + flowOf(1) + .throttleTime(100) + .test( + listOf( + Event.Value(1), + Event.Complete, + ) + ) + } + + @Test + fun throttleEmptyFlow() = runTest { + emptyFlow() + .throttleTime(100) + .test(listOf(Event.Complete)) + } + + @Test + fun throttleNeverFlow() = runTest { + var hasValue = false + + val job = neverFlow() + .throttleTime(100) + .onEach { hasValue = true } + .launchIn(this) + advanceTimeBy(1000) + job.cancel() + + assertFalse(hasValue) + } + + @Test + fun throttleFailureUpstream() = runTest(StandardTestDispatcher()) { + flow { + emit(1) + throw TestException("Broken!") + }.throttleTime(200).test(null) { + assertIs(it.single().errorOrThrow()) + } + + println("-".repeat(50)) + + flow { + emit(1) + delay(500) + emit(2) + delay(500) + throw TestException("Broken!") + }.throttleTime(200).test(null) { events -> + assertEquals(3, events.size, "[size]") + val (a, b, c) = events + + assertEquals(1, a.valueOrThrow(), "[1]") + assertEquals(2, b.valueOrThrow(), "[2]") + assertIs(c.errorOrThrow(), "[3]") + } + + println("-".repeat(50)) + + flow { + emit(1) // Should be published since it is first + delay(100) + emit(2) // Should be skipped since error will arrive before the timeout expires + delay(100) // Should be published as soon as the timeout expires. + throw TestException("Broken!") + }.throttleTime(400).test(null) { events -> + println(events) + assertEquals(2, events.size) + val (a, b) = events + + assertEquals(1, a.valueOrThrow()) + assertIs(b.errorOrThrow()) + } + } + + @Test + fun throttleFailureSelector() = runTest { + (1..10) + .asFlow() + .throttleTime { throw TestException("Broken!") } + .test(null) { events -> + assertEquals(2, events.size) + val (a, b) = events + + assertEquals(1, a.valueOrThrow()) + assertIs(b.errorOrThrow()) + } + + flow { + emit(1) + delay(100) + emit(2) + delay(400) + emit(3) + } + .throttleTime { + when (it) { + 1 -> 400.milliseconds + 3 -> throw TestException("1") + else -> throw TestException("2") + } + } + .test(null) { events -> + assertEquals(3, events.size) + val (a, b, c) = events + + assertEquals(1, a.valueOrThrow()) + assertEquals(3, b.valueOrThrow()) + assertEquals( + "1", + assertIs(c.errorOrThrow()).message + ) + } + + flow { + emit(1) + delay(100) + emit(2) + delay(400) + emit(3) + delay(600) + emit(4) + } + .throttleTime { + when (it) { + 1 -> 400.milliseconds + 3 -> throw TestException("1") + else -> throw TestException("2") + } + } + .test(null) { events -> + assertEquals(3, events.size) + val (a, b, c) = events + + assertEquals(1, a.valueOrThrow()) + assertEquals(3, b.valueOrThrow()) + assertEquals( + "1", + assertIs(c.errorOrThrow()).message + ) + } + } + + @Test + fun throttleTake() = runTest { + (1..10) + .asFlow() + .onEach { delay(200) } + .throttleTime(500) + .take(1) + .test(listOf(Event.Value(1), Event.Complete)) + + (1..10) + .asFlow() + .onEach { delay(200) } + .concatWith(flow { throw TestException() }) + .throttleTime(500) + .take(1) + .test(listOf(Event.Value(1), Event.Complete)) + } + + @Test + fun throttleCancellation() = runTest { + var count = 0 + (1..10).asFlow() + .onEach { delay(200) } + .throttleTime { + if (count++ % 2 == 0) { + throw CancellationException("") + } else { + 500.milliseconds + } + } + .test(null) { events -> + assertEquals(2, events.size) + val (a, b) = events + assertEquals(1, a.valueOrThrow()) + assertIs(b.errorOrThrow()) + } + } +} + +@ExperimentalCoroutinesApi +class ThrottleLastJvmTest : BaseTest() { + @Test + fun throttleWithCompleted_A() = runTest { + flow { + // -1---2----3- + // -@-----!--@-----! + // -------2--------3 + + delay(100) + emit(1) + delay(300) + emit(2) + delay(400) + emit(3) + delay(100) + } + .throttleTime(500, TRAILING) + .test( + listOf( + Event.Value(2), + Event.Value(3), + Event.Complete, + ) + ) + } + + @Test + fun throttleWithCompleted_B() = runTest { + flow { + // -1---2----3----4 + // -@-----!--@-----! + // -------2--------4 + + delay(100) + emit(1) + delay(300) + emit(2) + delay(400) + emit(3) + delay(450) + emit(4) + } + .throttleTime(500, TRAILING) + .test( + listOf( + Event.Value(2), + Event.Value(4), + Event.Complete, + ) + ) + } + + @Test + fun throttleWithCompleted_C() = runTest { + flow { + // -1---2----3------4| + // -@-----!--@-----! + // -------2--------3 4 + + delay(100) + emit(1) + delay(300) + emit(2) + delay(400) + emit(3) + delay(550) + emit(4) + } + .throttleTime(500, TRAILING) + .test( + listOf( + Event.Value(2), + Event.Value(3), + Event.Value(4), + Event.Complete, + ) + ) + } + + @Test + fun throttleWithCompletedAndNotDelay_A() = runTest { + (1..10) + .asFlow() + .onEach { delay(200) } + .throttleTime(0, TRAILING) + .test((1..10).map { Event.Value(it) } + Event.Complete) + } + + @Test + fun throttleWithCompletedAndNotDelay_B() = runTest { + (1..10) + .asFlow() + .throttleTime(0, TRAILING) + .test((1..10).map { Event.Value(it) } + Event.Complete) + } + + @Test + fun throttleWithCompletedAndNotDelay_C() = runTest { + (1..10) + .asFlow() + .throttleTime(TRAILING) { Duration.ZERO } + .test((1..10).map { Event.Value(it) } + Event.Complete) + } + + @Test + fun throttleNullableWithCompleted() = runTest { + (1..10) + .asFlow() + .onEach { delay(200) } + .map { v -> v.takeIf { it % 2 == 0 } } + .throttleTime(500, TRAILING) + .test( + listOf( + Event.Value(null), + Event.Value(6), + Event.Value(null), + Event.Value(10), + Event.Complete, + ) + ) + } + + @Test + fun throttleSingleFlow() = runTest { + flowOf(1) + .throttleTime(100, TRAILING) + .test( + listOf( + Event.Value(1), + Event.Complete, + ) + ) + } + + @Test + fun throttleEmptyFlow() = runTest { + emptyFlow() + .throttleTime(100, TRAILING) + .test(listOf(Event.Complete)) + } + + @Test + fun throttleNeverFlow() = runTest { + var hasValue = false + + val job = neverFlow() + .throttleTime(100, TRAILING) + .onEach { hasValue = true } + .launchIn(this) + advanceTimeBy(1000) + job.cancel() + + assertFalse(hasValue) + } + + @Test + fun throttleFailureUpstream() = runTest { + flow { + emit(1) + throw TestException("Broken!") + }.throttleTime(200, TRAILING).test(null) { + assertIs(it.single().errorOrThrow()) + } + + flow { + // 1-----2----X + // --1 --2 + + emit(1) + delay(500) + emit(2) + delay(500) + throw TestException("Broken!") + }.throttleTime(200, TRAILING).test(null) { events -> + assertEquals(3, events.size) + val (a, b, c) = events + + assertEquals(1, a.valueOrThrow()) + assertEquals(2, b.valueOrThrow()) + assertIs(c.errorOrThrow()) + } + + flow { + // 1-2-X + // ----X + + emit(1) + delay(100) + emit(2) + delay(100) + throw TestException("Broken!") + }.throttleTime(400, TRAILING).test(null) { events -> + assertIs(events.single().errorOrThrow()) + } + } + + @Test + fun throttleFailureSelector() = runTest { + (1..10) + .asFlow() + .throttleTime(TRAILING) { throw TestException("Broken!") } + .test(null) { events -> + assertIs(events.single().errorOrThrow()) + } + + flow { + // 1-2----3 + // ----2 X + + emit(1) + delay(100) + emit(2) + delay(400) + emit(3) + } + .throttleTime(TRAILING) { + when (it) { + 1 -> 400.milliseconds + 3 -> throw TestException("1") + else -> throw TestException("2") + } + } + .test(null) { events -> + assertEquals(2, events.size) + val (a, b) = events + + assertEquals(2, a.valueOrThrow()) + assertEquals( + "1", + assertIs(b.errorOrThrow()).message + ) + } + + flow { + // 1-2----3------4 + // ----2 -----X + + emit(1) + delay(100) + emit(2) + delay(400) + emit(3) + delay(600) + emit(4) + } + .throttleTime(TRAILING) { + when (it) { + 1 -> 400.milliseconds + 3 -> throw TestException("1") + else -> throw TestException("2") + } + } + .test(null) { events -> + assertEquals(2, events.size) + val (a, b) = events + + assertEquals(2, a.valueOrThrow()) + assertEquals( + "1", + assertIs(b.errorOrThrow()).message + ) + } + } + + @Test + fun throttleTake() = runTest { + (1..10) + .asFlow() + .onEach { delay(200) } + .throttleTime(500, TRAILING) + .take(1) + .test(listOf(Event.Value(3), Event.Complete)) + + (1..10) + .asFlow() + .onEach { delay(200) } + .concatWith(flow { throw TestException() }) + .throttleTime(500, TRAILING) + .take(1) + .test(listOf(Event.Value(3), Event.Complete)) + } + + @Test + fun throttleCancellation() = runTest { + // --1--2--3--4--5--6--7--8--9--10 + + var count = 1 + (1..10).asFlow() + .onEach { delay(200) } + .throttleTime(TRAILING) { + if (count++ % 2 == 0) { + throw CancellationException("") + } else { + 500.milliseconds + } + } + .test(null) { events -> + assertEquals(2, events.size) + val (a, b) = events + assertEquals(3, a.valueOrThrow()) + assertIs(b.errorOrThrow()) + } + } +} diff --git a/src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt b/src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt new file mode 100644 index 00000000..563d804f --- /dev/null +++ b/src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt @@ -0,0 +1,166 @@ +/* + * MIT License + * + * Copyright (c) 2021-2022 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.hoc081098.flowext + +import com.hoc081098.flowext.utils.TestException +import com.hoc081098.flowext.utils.test +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import java.util.concurrent.Executors +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +@InternalCoroutinesApi +@ExperimentalCoroutinesApi +class WithLatestFromJvmTest { + @Test + fun basic() = runBlocking { + withContext( + Executors.newSingleThreadExecutor { + Thread(it).apply { + name = "My thrad" + } + }.asCoroutineDispatcher() + ) { + println("1" + Thread.currentThread()) + + flowOf(1).withLatestFrom( + flow { + println("2" + Thread.currentThread()) + emit(2) + }.flowOn( + Executors.newSingleThreadExecutor { + Thread(it).apply { + name = "Hihi" + } + }.asCoroutineDispatcher() + ) + ).collect { + println("3" + Thread.currentThread()) + println(it) + } + } + error("") + + val f1 = flowOf(1, 2, 3, 4) + val f2 = flowOf("a", "b", "c", "d", "e") + assertEquals( + f2.withLatestFrom(f1).toList(), + listOf( + "a" to 4, + "b" to 4, + "c" to 4, + "d" to 4, + "e" to 4, + ) + ) + } + + @Test + fun basicWithNull() = runBlocking { + val f1 = flowOf(1, 2, 3, 4, null) + val f2 = flowOf("a", "b", "c", "d", "e") + assertEquals( + f2.withLatestFrom(f1).toList(), + listOf( + "a" to null, + "b" to null, + "c" to null, + "d" to null, + "e" to null, + ) + ) + } + + @Test + fun basic2() = runBlocking { + val f1 = flowOf(1, 2, 3, 4).onEach { delay(300) } + val f2 = flowOf("a", "b", "c", "d", "e").onEach { delay(100) } + assertEquals( + f2.withLatestFrom(f1).toList(), + listOf( + "c" to 1, + "d" to 1, + "e" to 1, + ) + ) + } + + @Test + fun testWithLatestFrom_failureUpStream() = runBlocking { + assertFailsWith { + flow { throw TestException() } + .withLatestFrom(neverFlow()) + .collect() + } + + assertFailsWith { + neverFlow() + .withLatestFrom(flow { throw TestException() }) + .collect() + } + + Unit + } + + @Test + fun testWithLatestFrom_cancellation() = runBlocking { + assertFailsWith { + flow { + emit(1) + throw CancellationException("") + } + .withLatestFrom(emptyFlow()) + .collect() + } + + flowOf(1) + .withLatestFrom( + flow { + emit(2) + throw CancellationException("") + } + ) + .test( + listOf( + Event.Value(1 to 2), + Event.Complete + ) + ) + } +} diff --git a/src/nativeMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt b/src/nativeMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt index 0df0a4be..bb4f59bb 100644 --- a/src/nativeMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt +++ b/src/nativeMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt @@ -30,4 +30,6 @@ internal actual class AtomicRef actual constructor(value: T) { private val atomic = NativeAtomicReference(value) actual var value: T by atomic::value + + actual fun compareAndSet(expect: T, update: T): Boolean = atomic.compareAndSet(expect, update) } From 95fd3ac54d274247d71b402e99f986cfb1c8ffcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Thu, 28 Apr 2022 10:38:42 +0700 Subject: [PATCH 07/13] refactor(throttle): convert throttle to throttleTime --- .../kotlin/com/hoc081098/flowext/throttle.kt | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt b/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt index 33585655..77d52a57 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt @@ -303,10 +303,12 @@ public fun Flow.throttleTime( // Once the original flow has completed, there may still be a pending value // waiting to be emitted. If so, wait for the throttling window to end and then // send it. That will complete this throttled flow. - if (trailing && throttled != null && lastValue != null) { - throttled!!.join() - throttled = null - trySend() + if (trailing && lastValue != null) { + throttled?.run { + throttled = null + join() + trySend() + } } lastValue = DONE_VALUE From e5f3d4d07c8393dc504f23751fc794c59245c295 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Thu, 28 Apr 2022 13:49:51 +0700 Subject: [PATCH 08/13] refactor --- .../kotlin/com/hoc081098/flowext/mapTo.kt | 1 + .../hoc081098/flowext/WithLatestFromTest.kt | 46 +++++++++---------- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/mapTo.kt b/src/commonMain/kotlin/com/hoc081098/flowext/mapTo.kt index 8393ba18..61b099f5 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/mapTo.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/mapTo.kt @@ -36,4 +36,5 @@ import kotlinx.coroutines.flow.transform public inline fun Flow.mapTo(value: R): Flow = transform { return@transform emit(value) } +@Suppress("NOTHING_TO_INLINE") public inline fun Flow.mapToUnit(): Flow = mapTo(Unit) diff --git a/src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt b/src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt index 563d804f..aaf77f1e 100644 --- a/src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt +++ b/src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt @@ -50,32 +50,32 @@ import kotlin.test.assertFailsWith class WithLatestFromJvmTest { @Test fun basic() = runBlocking { - withContext( - Executors.newSingleThreadExecutor { - Thread(it).apply { - name = "My thrad" - } - }.asCoroutineDispatcher() - ) { - println("1" + Thread.currentThread()) + val context1 = Executors.newSingleThreadExecutor { + Thread(it).apply { + name = "My thread 1" + } + }.asCoroutineDispatcher() - flowOf(1).withLatestFrom( - flow { - println("2" + Thread.currentThread()) - emit(2) - }.flowOn( - Executors.newSingleThreadExecutor { - Thread(it).apply { - name = "Hihi" - } - }.asCoroutineDispatcher() - ) - ).collect { - println("3" + Thread.currentThread()) - println(it) + val context2 = Executors.newSingleThreadExecutor { + Thread(it).apply { + name = "My thread 2" } + }.asCoroutineDispatcher() + + withContext(context1) { + flowOf(1) + .withLatestFrom(flow { emit(2) }.flowOn(context2)) + .onEach { + println("3" + Thread.currentThread()) + println(it) + } + .test( + listOf( + Event.Value(1 to 2), + Event.Complete + ) + ) } - error("") val f1 = flowOf(1, 2, 3, 4) val f2 = flowOf("a", "b", "c", "d", "e") From fb3fb91b5823482ca19fef08661dc1f465850a3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Thu, 28 Apr 2022 14:00:19 +0700 Subject: [PATCH 09/13] kotlin 1.6.21, coroutines 1.6.1 --- .idea/misc.xml | 2 +- CHANGELOG.md | 3 ++- build.gradle.kts | 14 ++++++++++---- .../kotlin/com/hoc081098/flowext/throttle.kt | 6 +++--- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/.idea/misc.xml b/.idea/misc.xml index ee593b9c..46eeceb6 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,7 +1,7 @@ - + diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ba3dc01..11aabff6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,8 @@ ## 0.3.0 - Update - - `Kotlin` to `1.6.20`. + - `Kotlin` to `1.6.21`. + - `KotlinX Coroutines` to `1.6.1`. - `Gradle` to `7.4.2`. - Refactor `withLatestFrom`'s implementation. diff --git a/build.gradle.kts b/build.gradle.kts index d0ca8c35..ff7631b7 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -6,16 +6,16 @@ import org.jetbrains.kotlin.gradle.plugin.mpp.NativeBuildType import java.net.URL plugins { - kotlin("multiplatform") version "1.6.20" + kotlin("multiplatform") version "1.6.21" id("com.diffplug.spotless") version "6.5.1" id("maven-publish") id("com.vanniktech.maven.publish") version "0.19.0" - id("org.jetbrains.kotlinx.binary-compatibility-validator") version "0.8.0" - id("org.jetbrains.dokka") version "1.6.10" + id("org.jetbrains.kotlinx.binary-compatibility-validator") version "0.9.0" + id("org.jetbrains.dokka") version "1.6.21" id("org.jetbrains.kotlinx.kover") version "0.5.0" } -val coroutinesVersion = "1.6.0" +val coroutinesVersion = "1.6.1" val ktlintVersion = "0.44.0" repositories { @@ -218,6 +218,12 @@ tasks.withType().configureEach { externalDocumentationLink { url.set(URL("https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/")) } + + sourceLink { + localDirectory.set(file("src/commonMain/kotlin")) + remoteUrl.set(URL("https://github.com/hoc081098/FlowExt/tree/master/src/commonMain/kotlin")) + remoteLineSuffix.set("#L") + } } } } diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt b/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt index 77d52a57..81d448bd 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt @@ -191,7 +191,7 @@ public fun Flow.throttleTime( * (1..10) * .asFlow() * .onEach { delay(200) } - * .throttle { timer(Unit, 500) } + * .throttleTime { 500.milliseconds } * ``` * * produces the following emissions @@ -206,7 +206,7 @@ public fun Flow.throttleTime( * (1..10) * .asFlow() * .onEach { delay(200) } - * .throttle(TRAILING) { timer(Unit, 500) } + * .throttleTime(TRAILING) { 500.milliseconds } * ``` * * produces the following emissions @@ -221,7 +221,7 @@ public fun Flow.throttleTime( * (1..10) * .asFlow() * .onEach { delay(200) } - * .throttle(LEADING_AND_TRAILING) { timer(Unit, 500) } + * .throttleTime(LEADING_AND_TRAILING) { 500.milliseconds } * ``` * * produces the following emissions From c3944b197f33ddf71150673e847c244b6f76ad9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Thu, 28 Apr 2022 14:06:13 +0700 Subject: [PATCH 10/13] refactor(throttle) --- .../kotlin/com/hoc081098/flowext/throttle.kt | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt b/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt index 81d448bd..e42bbbd7 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt @@ -287,15 +287,9 @@ public fun Flow.throttleTime( if (leading) { trySend() } - throttled = when (val duration = durationSelector(NULL_VALUE.unbox(value))) { - Duration.ZERO -> { - if (trailing) { - trySend() - } - null - } - else -> scope.launch { delay(duration) } - } + + val duration = durationSelector(NULL_VALUE.unbox(value)) + throttled = scope.launch { delay(duration) } } .onFailure { it?.let { throw it } From c97304206f7ca5dd09a44dd08f3e0affd6be0d89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Sun, 1 May 2022 12:52:54 +0700 Subject: [PATCH 11/13] fix test --- .../com/hoc081098/flowext/ThrottleJvmTest.kt | 70 ++++++++++--------- ...stFromTest.kt => WithLatestFromJvmTest.kt} | 31 -------- 2 files changed, 37 insertions(+), 64 deletions(-) rename src/jvmTest/kotlin/com/hoc081098/flowext/{WithLatestFromTest.kt => WithLatestFromJvmTest.kt} (82%) diff --git a/src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt b/src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt index 4a41086b..c25bf5e7 100644 --- a/src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt +++ b/src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt @@ -25,22 +25,24 @@ package com.hoc081098.flowext import com.hoc081098.flowext.ThrottleConfiguration.TRAILING -import com.hoc081098.flowext.utils.BaseTest import com.hoc081098.flowext.utils.TestException import com.hoc081098.flowext.utils.test import kotlinx.coroutines.CancellationException import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.delay import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.take +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.StandardTestDispatcher -import kotlinx.coroutines.test.advanceTimeBy +import java.util.concurrent.Executors import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFalse @@ -49,9 +51,9 @@ import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds @ExperimentalCoroutinesApi -class ThrottleFirstJvmTest : BaseTest() { +class ThrottleFirstJvmTest { @Test - fun throttleWithCompleted_A() = runTest { + fun throttleWithCompleted_A() = runBlocking { (1..10) .asFlow() .onEach { delay(200) } @@ -68,7 +70,7 @@ class ThrottleFirstJvmTest : BaseTest() { } @Test - fun throttleWithCompleted_B() = runTest { + fun throttleWithCompleted_B() = runBlocking { flow { emit(1) // deliver emit(2) // skip @@ -99,7 +101,7 @@ class ThrottleFirstJvmTest : BaseTest() { } @Test - fun throttleWithCompletedAndNotDelay_A() = runTest { + fun throttleWithCompletedAndNotDelay_A() = runBlocking { (1..10) .asFlow() .onEach { delay(200) } @@ -108,7 +110,7 @@ class ThrottleFirstJvmTest : BaseTest() { } @Test - fun throttleWithCompletedAndNotDelay_B() = runTest { + fun throttleWithCompletedAndNotDelay_B() = runBlocking { (1..10) .asFlow() .throttleTime(0) @@ -116,7 +118,7 @@ class ThrottleFirstJvmTest : BaseTest() { } @Test - fun throttleWithCompletedAndNotDelay_C() = runTest { + fun throttleWithCompletedAndNotDelay_C() = runBlocking { (1..10) .asFlow() .throttleTime { Duration.ZERO } @@ -124,7 +126,7 @@ class ThrottleFirstJvmTest : BaseTest() { } @Test - fun throttleNullableWithCompleted() = runTest { + fun throttleNullableWithCompleted() = runBlocking { (1..10) .asFlow() .onEach { delay(200) } @@ -142,7 +144,7 @@ class ThrottleFirstJvmTest : BaseTest() { } @Test - fun throttleSingleFlow() = runTest { + fun throttleSingleFlow() = runBlocking { flowOf(1) .throttleTime(100) .test( @@ -154,28 +156,29 @@ class ThrottleFirstJvmTest : BaseTest() { } @Test - fun throttleEmptyFlow() = runTest { + fun throttleEmptyFlow() = runBlocking { emptyFlow() .throttleTime(100) .test(listOf(Event.Complete)) } @Test - fun throttleNeverFlow() = runTest { + fun throttleNeverFlow() = runBlocking { var hasValue = false val job = neverFlow() .throttleTime(100) .onEach { hasValue = true } + .flowOn(Executors.newSingleThreadExecutor().asCoroutineDispatcher()) .launchIn(this) - advanceTimeBy(1000) + delay(1000) job.cancel() assertFalse(hasValue) } @Test - fun throttleFailureUpstream() = runTest(StandardTestDispatcher()) { + fun throttleFailureUpstream() = runBlocking(StandardTestDispatcher()) { flow { emit(1) throw TestException("Broken!") @@ -219,7 +222,7 @@ class ThrottleFirstJvmTest : BaseTest() { } @Test - fun throttleFailureSelector() = runTest { + fun throttleFailureSelector() = runBlocking { (1..10) .asFlow() .throttleTime { throw TestException("Broken!") } @@ -287,7 +290,7 @@ class ThrottleFirstJvmTest : BaseTest() { } @Test - fun throttleTake() = runTest { + fun throttleTake() = runBlocking { (1..10) .asFlow() .onEach { delay(200) } @@ -305,7 +308,7 @@ class ThrottleFirstJvmTest : BaseTest() { } @Test - fun throttleCancellation() = runTest { + fun throttleCancellation() = runBlocking { var count = 0 (1..10).asFlow() .onEach { delay(200) } @@ -326,9 +329,9 @@ class ThrottleFirstJvmTest : BaseTest() { } @ExperimentalCoroutinesApi -class ThrottleLastJvmTest : BaseTest() { +class ThrottleLastJvmTest { @Test - fun throttleWithCompleted_A() = runTest { + fun throttleWithCompleted_A() = runBlocking { flow { // -1---2----3- // -@-----!--@-----! @@ -353,7 +356,7 @@ class ThrottleLastJvmTest : BaseTest() { } @Test - fun throttleWithCompleted_B() = runTest { + fun throttleWithCompleted_B() = runBlocking { flow { // -1---2----3----4 // -@-----!--@-----! @@ -379,7 +382,7 @@ class ThrottleLastJvmTest : BaseTest() { } @Test - fun throttleWithCompleted_C() = runTest { + fun throttleWithCompleted_C() = runBlocking { flow { // -1---2----3------4| // -@-----!--@-----! @@ -406,7 +409,7 @@ class ThrottleLastJvmTest : BaseTest() { } @Test - fun throttleWithCompletedAndNotDelay_A() = runTest { + fun throttleWithCompletedAndNotDelay_A() = runBlocking { (1..10) .asFlow() .onEach { delay(200) } @@ -415,7 +418,7 @@ class ThrottleLastJvmTest : BaseTest() { } @Test - fun throttleWithCompletedAndNotDelay_B() = runTest { + fun throttleWithCompletedAndNotDelay_B() = runBlocking { (1..10) .asFlow() .throttleTime(0, TRAILING) @@ -423,7 +426,7 @@ class ThrottleLastJvmTest : BaseTest() { } @Test - fun throttleWithCompletedAndNotDelay_C() = runTest { + fun throttleWithCompletedAndNotDelay_C() = runBlocking { (1..10) .asFlow() .throttleTime(TRAILING) { Duration.ZERO } @@ -431,7 +434,7 @@ class ThrottleLastJvmTest : BaseTest() { } @Test - fun throttleNullableWithCompleted() = runTest { + fun throttleNullableWithCompleted() = runBlocking { (1..10) .asFlow() .onEach { delay(200) } @@ -449,7 +452,7 @@ class ThrottleLastJvmTest : BaseTest() { } @Test - fun throttleSingleFlow() = runTest { + fun throttleSingleFlow() = runBlocking { flowOf(1) .throttleTime(100, TRAILING) .test( @@ -461,28 +464,29 @@ class ThrottleLastJvmTest : BaseTest() { } @Test - fun throttleEmptyFlow() = runTest { + fun throttleEmptyFlow() = runBlocking { emptyFlow() .throttleTime(100, TRAILING) .test(listOf(Event.Complete)) } @Test - fun throttleNeverFlow() = runTest { + fun throttleNeverFlow() = runBlocking { var hasValue = false val job = neverFlow() .throttleTime(100, TRAILING) .onEach { hasValue = true } + .flowOn(Executors.newSingleThreadExecutor().asCoroutineDispatcher()) .launchIn(this) - advanceTimeBy(1000) + delay(1000) job.cancel() assertFalse(hasValue) } @Test - fun throttleFailureUpstream() = runTest { + fun throttleFailureUpstream() = runBlocking { flow { emit(1) throw TestException("Broken!") @@ -523,7 +527,7 @@ class ThrottleLastJvmTest : BaseTest() { } @Test - fun throttleFailureSelector() = runTest { + fun throttleFailureSelector() = runBlocking { (1..10) .asFlow() .throttleTime(TRAILING) { throw TestException("Broken!") } @@ -591,7 +595,7 @@ class ThrottleLastJvmTest : BaseTest() { } @Test - fun throttleTake() = runTest { + fun throttleTake() = runBlocking { (1..10) .asFlow() .onEach { delay(200) } @@ -609,7 +613,7 @@ class ThrottleLastJvmTest : BaseTest() { } @Test - fun throttleCancellation() = runTest { + fun throttleCancellation() = runBlocking { // --1--2--3--4--5--6--7--8--9--10 var count = 1 diff --git a/src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt b/src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromJvmTest.kt similarity index 82% rename from src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt rename to src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromJvmTest.kt index aaf77f1e..d514c774 100644 --- a/src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt +++ b/src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromJvmTest.kt @@ -29,18 +29,14 @@ import com.hoc081098.flowext.utils.test import kotlinx.coroutines.CancellationException import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf -import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext -import java.util.concurrent.Executors import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -50,33 +46,6 @@ import kotlin.test.assertFailsWith class WithLatestFromJvmTest { @Test fun basic() = runBlocking { - val context1 = Executors.newSingleThreadExecutor { - Thread(it).apply { - name = "My thread 1" - } - }.asCoroutineDispatcher() - - val context2 = Executors.newSingleThreadExecutor { - Thread(it).apply { - name = "My thread 2" - } - }.asCoroutineDispatcher() - - withContext(context1) { - flowOf(1) - .withLatestFrom(flow { emit(2) }.flowOn(context2)) - .onEach { - println("3" + Thread.currentThread()) - println(it) - } - .test( - listOf( - Event.Value(1 to 2), - Event.Complete - ) - ) - } - val f1 = flowOf(1, 2, 3, 4) val f2 = flowOf("a", "b", "c", "d", "e") assertEquals( From a713ef2146387241d1a58bd6e30c9497684fea9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Sun, 1 May 2022 19:47:44 +0700 Subject: [PATCH 12/13] fix throttle --- .../kotlin/com/hoc081098/flowext/throttle.kt | 22 +++++++++++-------- .../com/hoc081098/flowext/ThrottleJvmTest.kt | 3 +-- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt b/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt index e42bbbd7..d05b6e51 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt @@ -262,18 +262,20 @@ public fun Flow.throttleTime( } } + val onWindowClosed = suspend { + throttled = null + + if (trailing) { + trySend() + } + } + // Now consume the values until the original flow is complete. while (lastValue !== DONE_VALUE) { // wait for the next value select { // When a throttling window ends, send the value if there is a pending value. - throttled?.onJoin?.invoke { - throttled = null - - if (trailing) { - trySend() - } - } + throttled?.onJoin?.invoke(onWindowClosed) values.onReceiveCatching { result -> result @@ -288,8 +290,10 @@ public fun Flow.throttleTime( trySend() } - val duration = durationSelector(NULL_VALUE.unbox(value)) - throttled = scope.launch { delay(duration) } + when (val duration = durationSelector(NULL_VALUE.unbox(value))) { + Duration.ZERO -> onWindowClosed() + else -> throttled = scope.launch { delay(duration) } + } } .onFailure { it?.let { throw it } diff --git a/src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt b/src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt index c25bf5e7..afc0e150 100644 --- a/src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt +++ b/src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt @@ -41,7 +41,6 @@ import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.take import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.test.StandardTestDispatcher import java.util.concurrent.Executors import kotlin.test.Test import kotlin.test.assertEquals @@ -178,7 +177,7 @@ class ThrottleFirstJvmTest { } @Test - fun throttleFailureUpstream() = runBlocking(StandardTestDispatcher()) { + fun throttleFailureUpstream() = runBlocking { flow { emit(1) throw TestException("Broken!") From c0130805e3161d1690ea07ddd32fa84bba9061cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Sun, 1 May 2022 20:13:05 +0700 Subject: [PATCH 13/13] ignore jvm tests --- src/jvmTest/kotlin/com/hoc081098/flowext/TakeUntilJvmTest.kt | 2 ++ src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt | 3 +++ .../kotlin/com/hoc081098/flowext/WithLatestFromJvmTest.kt | 2 ++ 3 files changed, 7 insertions(+) diff --git a/src/jvmTest/kotlin/com/hoc081098/flowext/TakeUntilJvmTest.kt b/src/jvmTest/kotlin/com/hoc081098/flowext/TakeUntilJvmTest.kt index 23e8c1c9..9bd447e2 100644 --- a/src/jvmTest/kotlin/com/hoc081098/flowext/TakeUntilJvmTest.kt +++ b/src/jvmTest/kotlin/com/hoc081098/flowext/TakeUntilJvmTest.kt @@ -35,12 +35,14 @@ import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.take import kotlinx.coroutines.runBlocking +import kotlin.test.Ignore import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertIs import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds +@Ignore("Ignore JVM tests. Run only locally.") @InternalCoroutinesApi @ExperimentalCoroutinesApi class TakeUntilJvmTest { diff --git a/src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt b/src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt index afc0e150..a8621de0 100644 --- a/src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt +++ b/src/jvmTest/kotlin/com/hoc081098/flowext/ThrottleJvmTest.kt @@ -42,6 +42,7 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.take import kotlinx.coroutines.runBlocking import java.util.concurrent.Executors +import kotlin.test.Ignore import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFalse @@ -49,6 +50,7 @@ import kotlin.test.assertIs import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds +@Ignore("Ignore JVM tests. Run only locally.") @ExperimentalCoroutinesApi class ThrottleFirstJvmTest { @Test @@ -327,6 +329,7 @@ class ThrottleFirstJvmTest { } } +@Ignore("Ignore JVM tests. Run only locally.") @ExperimentalCoroutinesApi class ThrottleLastJvmTest { @Test diff --git a/src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromJvmTest.kt b/src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromJvmTest.kt index d514c774..0db58e3d 100644 --- a/src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromJvmTest.kt +++ b/src/jvmTest/kotlin/com/hoc081098/flowext/WithLatestFromJvmTest.kt @@ -37,10 +37,12 @@ import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking +import kotlin.test.Ignore import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith +@Ignore("Ignore JVM tests. Run only locally.") @InternalCoroutinesApi @ExperimentalCoroutinesApi class WithLatestFromJvmTest {