From 02141bdf0e9aee5a164ca94e2e80af71e6b298a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Wed, 22 Jun 2022 15:54:23 +0700 Subject: [PATCH 1/6] feat(skipUntil) --- .../kotlin/com/hoc081098/flowext/skipUntil.kt | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt b/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt new file mode 100644 index 00000000..cec46c2f --- /dev/null +++ b/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt @@ -0,0 +1,83 @@ +/* + * MIT License + * + * Copyright (c) 2021-2022 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.hoc081098.flowext + +import com.hoc081098.flowext.utils.NULL_VALUE +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.onFailure +import kotlinx.coroutines.channels.onSuccess +import kotlinx.coroutines.channels.produce +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.selects.select + +@ExperimentalCoroutinesApi +public fun Flow.skipUntil(notifier: Flow): Flow = flow { + coroutineScope { + val values = produce { + collect { send(it ?: NULL_VALUE) } + } + val notifierChannel = produce(capacity = 1) { + notifier.take(1).collect() + send(Unit) + } + + var shouldEmit = false + var loop = true + + while (loop) { + select { + if (!shouldEmit) { + notifierChannel.onReceiveCatching { result -> + result + .onSuccess { shouldEmit = true } + .onFailure { it?.let { throw it } } + } + } + + values + .onReceiveCatching { result -> + result + .onSuccess { + if (shouldEmit) { + emit(NULL_VALUE.unbox(it)) + } + } + .onFailure { + it?.let { throw it } + loop = false + } + } + } + } + } +} + +@Suppress("NOTHING_TO_INLINE") +@ExperimentalCoroutinesApi +public inline fun Flow.dropUntil(notifier: Flow): Flow = skipUntil(notifier) From 803b09469c686a1a7223e8089d97794705ebbdd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Tue, 28 Jun 2022 16:09:45 +0700 Subject: [PATCH 2/6] feat(skipUntil): add test --- CHANGELOG.md | 1 + api/FlowExt.api | 5 + .../kotlin/com/hoc081098/flowext/skipUntil.kt | 34 +++-- .../com/hoc081098/flowext/SkipUntilTest.kt | 119 ++++++++++++++++++ 4 files changed, 147 insertions(+), 12 deletions(-) create mode 100644 src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d991841..c62e4aed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - `defer`. - `flowFromSuspend`. - `mapEager`, `flatMapConcatEager`, `flattenConcatEager`. + - `skipUntil`, `dropUntil`. ## 0.3.0 - May 2, 2022 diff --git a/api/FlowExt.api b/api/FlowExt.api index 585b7716..025d8d5f 100644 --- a/api/FlowExt.api +++ b/api/FlowExt.api @@ -158,6 +158,11 @@ public final class com/hoc081098/flowext/RetryWhenWithDelayStrategyKt { public static synthetic fun retryWithExponentialBackoff-f6PB7jA$default (Lkotlinx/coroutines/flow/Flow;JDJJLkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; } +public final class com/hoc081098/flowext/SkipUntilKt { + public static final fun dropUntil (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; + public static final fun skipUntil (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; +} + public final class com/hoc081098/flowext/TakeUntilKt { public static final fun takeUntil (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; } diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt b/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt index cec46c2f..c1498018 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt @@ -24,39 +24,43 @@ package com.hoc081098.flowext -import com.hoc081098.flowext.utils.NULL_VALUE import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.channels.onFailure import kotlinx.coroutines.channels.onSuccess import kotlinx.coroutines.channels.produce import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.produceIn import kotlinx.coroutines.flow.take import kotlinx.coroutines.selects.select +/** + * TODO + */ +@FlowPreview @ExperimentalCoroutinesApi public fun Flow.skipUntil(notifier: Flow): Flow = flow { coroutineScope { - val values = produce { - collect { send(it ?: NULL_VALUE) } - } - val notifierChannel = produce(capacity = 1) { - notifier.take(1).collect() - send(Unit) - } + val values = produceIn(this) + val notifierChannel = produce() { notifier.take(1).collect() } var shouldEmit = false var loop = true while (loop) { + ensureActive() + select { if (!shouldEmit) { notifierChannel.onReceiveCatching { result -> - result - .onSuccess { shouldEmit = true } - .onFailure { it?.let { throw it } } + result.onFailure { + it?.let { throw it } + shouldEmit = true + } } } @@ -65,12 +69,14 @@ public fun Flow.skipUntil(notifier: Flow): Flow = flow { result .onSuccess { if (shouldEmit) { - emit(NULL_VALUE.unbox(it)) + emit(it) } } .onFailure { it?.let { throw it } + loop = false + notifierChannel.cancel() } } } @@ -78,6 +84,10 @@ public fun Flow.skipUntil(notifier: Flow): Flow = flow { } } +/** + * TODO + */ @Suppress("NOTHING_TO_INLINE") +@FlowPreview @ExperimentalCoroutinesApi public inline fun Flow.dropUntil(notifier: Flow): Flow = skipUntil(notifier) diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt new file mode 100644 index 00000000..9ae1bc63 --- /dev/null +++ b/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt @@ -0,0 +1,119 @@ +/* + * MIT License + * + * Copyright (c) 2021-2022 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.hoc081098.flowext + +import com.hoc081098.flowext.utils.BaseStepTest +import com.hoc081098.flowext.utils.TestException +import com.hoc081098.flowext.utils.assertFailsWith +import com.hoc081098.flowext.utils.test +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.onEach +import kotlin.test.Test +import kotlin.test.assertEquals + +@FlowPreview +@InternalCoroutinesApi +@ExperimentalCoroutinesApi +class SkipUntilTest : BaseStepTest() { + @Test + fun testSkipUntil() = runTest { + // ----------1----------2----------3 + // ---------------| + flowOf(1, 2, 3) + .onEach { delay(100) } + .skipUntil(timer(Unit, 150)) + .test( + listOf( + Event.Value(2), + Event.Value(3), + Event.Complete + ) + ) + } + + @Test + fun testSkipUntilNever() = runTest { + flowOf(1, 2, 3, 4) + .skipUntil(neverFlow()) + .test(listOf(Event.Complete)) + } + + @Test + fun testSkipUntilEmpty() = runTest { + flowOf(1, 2, 3, 4) + .skipUntil(emptyFlow()) + .test( + listOf( + Event.Value(1), + Event.Value(2), + Event.Value(3), + Event.Value(4), + Event.Complete + ) + ) + } + + @Test + fun testSkipUntilFailureUpstream() = runTest { + // 01--------------------2X + // ----------100 + + val source = flow { + expect(2) + emit(0) + expect(3) + emit(1) + + delay(20) + expect(5) + + emit(2) + expect(7) + throw TestException() + } + + val notifier = flowOf(100).onEach { + delay(10) + expect(4) + } + + expect(1) + assertFailsWith( + source + .skipUntil(notifier) + .onEach { + assertEquals(2, it) + expect(6) + } + ) + finish(8) + } +} From 4df7f8197685cdbbf810b0b5e6ef5290938b3441 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Tue, 5 Jul 2022 11:56:06 +0700 Subject: [PATCH 3/6] style: spotlessApply --- src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt | 2 +- src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt b/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt index c1498018..ff2bb327 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt @@ -46,7 +46,7 @@ import kotlinx.coroutines.selects.select public fun Flow.skipUntil(notifier: Flow): Flow = flow { coroutineScope { val values = produceIn(this) - val notifierChannel = produce() { notifier.take(1).collect() } + val notifierChannel = produce { notifier.take(1).collect() } var shouldEmit = false var loop = true diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt index 9ae1bc63..65b53029 100644 --- a/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt +++ b/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt @@ -28,6 +28,8 @@ import com.hoc081098.flowext.utils.BaseStepTest import com.hoc081098.flowext.utils.TestException import com.hoc081098.flowext.utils.assertFailsWith import com.hoc081098.flowext.utils.test +import kotlin.test.Test +import kotlin.test.assertEquals import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.InternalCoroutinesApi @@ -36,8 +38,6 @@ import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.onEach -import kotlin.test.Test -import kotlin.test.assertEquals @FlowPreview @InternalCoroutinesApi From 74939d1d1ae1e4accbd506e294b1a70480105654 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Tue, 5 Jul 2022 15:12:49 +0700 Subject: [PATCH 4/6] update impl. --- .../kotlin/com/hoc081098/flowext/skipUntil.kt | 51 ++---- .../com/hoc081098/flowext/SkipUntilTest.kt | 43 +++++ .../com/hoc081098/flowext/SkipUntilJvmTest.kt | 165 ++++++++++++++++++ 3 files changed, 221 insertions(+), 38 deletions(-) create mode 100644 src/jvmTest/kotlin/com/hoc081098/flowext/SkipUntilJvmTest.kt diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt b/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt index ff2bb327..f55498d7 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt @@ -24,19 +24,16 @@ package com.hoc081098.flowext +import com.hoc081098.flowext.internal.AtomicBoolean +import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.channels.onFailure -import kotlinx.coroutines.channels.onSuccess -import kotlinx.coroutines.channels.produce import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.produceIn import kotlinx.coroutines.flow.take -import kotlinx.coroutines.selects.select +import kotlinx.coroutines.launch /** * TODO @@ -45,42 +42,20 @@ import kotlinx.coroutines.selects.select @ExperimentalCoroutinesApi public fun Flow.skipUntil(notifier: Flow): Flow = flow { coroutineScope { - val values = produceIn(this) - val notifierChannel = produce { notifier.take(1).collect() } + val shouldEmit = AtomicBoolean(false) - var shouldEmit = false - var loop = true - - while (loop) { - ensureActive() - - select { - if (!shouldEmit) { - notifierChannel.onReceiveCatching { result -> - result.onFailure { - it?.let { throw it } - shouldEmit = true - } - } - } - - values - .onReceiveCatching { result -> - result - .onSuccess { - if (shouldEmit) { - emit(it) - } - } - .onFailure { - it?.let { throw it } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + notifier.take(1).collect() + shouldEmit.value = true + } - loop = false - notifierChannel.cancel() - } - } + collect { + if (shouldEmit.value) { + emit(it) } } + + job.cancel() } } diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt index 65b53029..c5148058 100644 --- a/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt +++ b/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt @@ -38,6 +38,7 @@ import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.take @FlowPreview @InternalCoroutinesApi @@ -116,4 +117,46 @@ class SkipUntilTest : BaseStepTest() { ) finish(8) } + + @Test + fun testSkipUntilCancellation() = runTest { + flow { + emit(0) + delay(200) + emit(1) + emit(2) + emit(3) + expectUnreached() // Cancelled by take + emit(5) + }.skipUntil(timer(Unit, 100)) + .take(2) + .test( + listOf( + Event.Value(1), + Event.Value(2), + Event.Complete + ) + ) + } + + @Test + fun testSkipUntilNotifierFailure() = runTest { + flow { + emit(0) + delay(200) + emit(1) + emit(2) + emit(3) + }.skipUntil( + timer(Unit, 100).onEach { + throw TestException() + } + ) + .let { + it.test(null) { events -> + assertEquals(1, events.size) + } + assertFailsWith(it) + } + } } diff --git a/src/jvmTest/kotlin/com/hoc081098/flowext/SkipUntilJvmTest.kt b/src/jvmTest/kotlin/com/hoc081098/flowext/SkipUntilJvmTest.kt new file mode 100644 index 00000000..51c42271 --- /dev/null +++ b/src/jvmTest/kotlin/com/hoc081098/flowext/SkipUntilJvmTest.kt @@ -0,0 +1,165 @@ +/* + * MIT License + * + * Copyright (c) 2021-2022 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.hoc081098.flowext + +import com.hoc081098.flowext.utils.BaseStepTest +import com.hoc081098.flowext.utils.TestException +import com.hoc081098.flowext.utils.assertFailsWith +import com.hoc081098.flowext.utils.test +import kotlin.test.Ignore +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.runBlocking + +@Ignore("Ignore JVM tests. Run only locally.") +@FlowPreview +@InternalCoroutinesApi +@ExperimentalCoroutinesApi +class SkipUntilJvmTest : BaseStepTest() { + @Test + fun testSkipUntil() = runBlocking { + // ----------1----------2----------3 + // ---------------| + flowOf(1, 2, 3) + .onEach { delay(100) } + .skipUntil(timer(Unit, 150)) + .test( + listOf( + Event.Value(2), + Event.Value(3), + Event.Complete + ) + ) + } + + @Test + fun testSkipUntilNever() = runBlocking { + flowOf(1, 2, 3, 4) + .skipUntil(neverFlow()) + .test(listOf(Event.Complete)) + } + + @Test + fun testSkipUntilEmpty() = runBlocking { + flowOf(1, 2, 3, 4) + .skipUntil(emptyFlow()) + .test( + listOf( + Event.Value(1), + Event.Value(2), + Event.Value(3), + Event.Value(4), + Event.Complete + ) + ) + } + + @Test + fun testSkipUntilFailureUpstream() = runBlocking { + // 01--------------------2X + // ----------100 + + val source = flow { + expect(2) + emit(0) + expect(3) + emit(1) + + delay(20) + expect(5) + + emit(2) + expect(7) + throw TestException() + } + + val notifier = flowOf(100).onEach { + delay(10) + expect(4) + } + + expect(1) + assertFailsWith( + source + .skipUntil(notifier) + .onEach { + assertEquals(2, it) + expect(6) + } + ) + finish(8) + } + + @Test + fun testSkipUntilCancellation() = runBlocking { + flow { + emit(0) + delay(200) + emit(1) + emit(2) + emit(3) + expectUnreached() // Cancelled by take + emit(5) + }.skipUntil(timer(Unit, 100)) + .take(2) + .test( + listOf( + Event.Value(1), + Event.Value(2), + Event.Complete + ) + ) + } + + @Test + fun testSkipUntilNotifierFailure() = runBlocking { + flow { + emit(0) + delay(200) + emit(1) + emit(2) + emit(3) + }.skipUntil( + timer(Unit, 100).onEach { + throw TestException() + } + ) + .let { + it.test(null) { events -> + assertEquals(1, events.size) + } + assertFailsWith(it) + } + } +} From f9357fa0ae3daf90939407e259b81774a4c9eb22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Tue, 5 Jul 2022 16:05:21 +0700 Subject: [PATCH 5/6] docs --- README.md | 8 ++++++++ src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt | 9 +++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 5c024b0f..a2dd9f86 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,8 @@ - `retryWhenWithDelayStrategy` - `retryWhenWithExponentialBackoff` - `retryWithExponentialBackoff` + - [`skipUntil`](#skipuntil--dropuntil) + - [`dropUntil`](#skipuntil--dropuntil) - [`takeUntil`](#takeUntil) - `throttleTime` - [`withLatestFrom`](#withLatestFrom) @@ -169,6 +171,12 @@ . - Similar to [RxJS raceWith](https://rxjs.dev/api/operators/raceWith) +#### skipUntil / dropUntil + +- ReactiveX docs: https://reactivex.io/documentation/operators/skipuntil.html +- Similar to [RxJS skipUntil](https://rxjs.dev/api/index/function/skipUntil) +- Similar to [RxJava skipUntil](http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html#skipUntil-org.reactivestreams.Publisher-) + #### takeUntil - ReactiveX docs: http://reactivex.io/documentation/operators/takeuntil.html diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt b/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt index f55498d7..85f6bed6 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt @@ -36,7 +36,10 @@ import kotlinx.coroutines.flow.take import kotlinx.coroutines.launch /** - * TODO + * Returns a [Flow] that skips items emitted by the source [Flow] until a second [Flow] emits a value or completes. + * + * @param notifier The second [Flow] that has to emit a value before the source [Flow]'s values + * begin to be mirrored by the resulting [Flow]. */ @FlowPreview @ExperimentalCoroutinesApi @@ -60,7 +63,9 @@ public fun Flow.skipUntil(notifier: Flow): Flow = flow { } /** - * TODO + * This function is an alias to [skipUntil] operator. + * + * @see skipUntil */ @Suppress("NOTHING_TO_INLINE") @FlowPreview From 682306b265d5551e4465ad36be6e1cb3b1909ac0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Tue, 5 Jul 2022 16:17:38 +0700 Subject: [PATCH 6/6] tests --- .../kotlin/com/hoc081098/flowext/SkipUntilTest.kt | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt index c5148058..94602c25 100644 --- a/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt +++ b/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt @@ -58,6 +58,17 @@ class SkipUntilTest : BaseStepTest() { Event.Complete ) ) + + flowOf(1, 2, 3) + .onEach { delay(100) } + .dropUntil(timer(Unit, 150)) + .test( + listOf( + Event.Value(2), + Event.Value(3), + Event.Complete + ) + ) } @Test