diff --git a/CHANGELOG.md b/CHANGELOG.md index ae681848..079f25f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - `defer`. - `flowFromSuspend`. - `mapEager`, `flatMapConcatEager`, `flattenConcatEager`. + - `skipUntil`, `dropUntil`. - Support for Apple Silicon targets - `iosSimulatorArm64`. diff --git a/README.md b/README.md index 5c024b0f..a2dd9f86 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,8 @@ - `retryWhenWithDelayStrategy` - `retryWhenWithExponentialBackoff` - `retryWithExponentialBackoff` + - [`skipUntil`](#skipuntil--dropuntil) + - [`dropUntil`](#skipuntil--dropuntil) - [`takeUntil`](#takeUntil) - `throttleTime` - [`withLatestFrom`](#withLatestFrom) @@ -169,6 +171,12 @@ . - Similar to [RxJS raceWith](https://rxjs.dev/api/operators/raceWith) +#### skipUntil / dropUntil + +- ReactiveX docs: https://reactivex.io/documentation/operators/skipuntil.html +- Similar to [RxJS skipUntil](https://rxjs.dev/api/index/function/skipUntil) +- Similar to [RxJava skipUntil](http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html#skipUntil-org.reactivestreams.Publisher-) + #### takeUntil - ReactiveX docs: http://reactivex.io/documentation/operators/takeuntil.html diff --git a/api/FlowExt.api b/api/FlowExt.api index 585b7716..025d8d5f 100644 --- a/api/FlowExt.api +++ b/api/FlowExt.api @@ -158,6 +158,11 @@ public final class com/hoc081098/flowext/RetryWhenWithDelayStrategyKt { public static synthetic fun retryWithExponentialBackoff-f6PB7jA$default (Lkotlinx/coroutines/flow/Flow;JDJJLkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; } +public final class com/hoc081098/flowext/SkipUntilKt { + public static final fun dropUntil (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; + public static final fun skipUntil (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; +} + public final class com/hoc081098/flowext/TakeUntilKt { public static final fun takeUntil (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; } diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt b/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt new file mode 100644 index 00000000..85f6bed6 --- /dev/null +++ b/src/commonMain/kotlin/com/hoc081098/flowext/skipUntil.kt @@ -0,0 +1,73 @@ +/* + * MIT License + * + * Copyright (c) 2021-2022 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.hoc081098.flowext + +import com.hoc081098.flowext.internal.AtomicBoolean +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.launch + +/** + * Returns a [Flow] that skips items emitted by the source [Flow] until a second [Flow] emits a value or completes. + * + * @param notifier The second [Flow] that has to emit a value before the source [Flow]'s values + * begin to be mirrored by the resulting [Flow]. + */ +@FlowPreview +@ExperimentalCoroutinesApi +public fun Flow.skipUntil(notifier: Flow): Flow = flow { + coroutineScope { + val shouldEmit = AtomicBoolean(false) + + val job = launch(start = CoroutineStart.UNDISPATCHED) { + notifier.take(1).collect() + shouldEmit.value = true + } + + collect { + if (shouldEmit.value) { + emit(it) + } + } + + job.cancel() + } +} + +/** + * This function is an alias to [skipUntil] operator. + * + * @see skipUntil + */ +@Suppress("NOTHING_TO_INLINE") +@FlowPreview +@ExperimentalCoroutinesApi +public inline fun Flow.dropUntil(notifier: Flow): Flow = skipUntil(notifier) diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt new file mode 100644 index 00000000..94602c25 --- /dev/null +++ b/src/commonTest/kotlin/com/hoc081098/flowext/SkipUntilTest.kt @@ -0,0 +1,173 @@ +/* + * MIT License + * + * Copyright (c) 2021-2022 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.hoc081098.flowext + +import com.hoc081098.flowext.utils.BaseStepTest +import com.hoc081098.flowext.utils.TestException +import com.hoc081098.flowext.utils.assertFailsWith +import com.hoc081098.flowext.utils.test +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.take + +@FlowPreview +@InternalCoroutinesApi +@ExperimentalCoroutinesApi +class SkipUntilTest : BaseStepTest() { + @Test + fun testSkipUntil() = runTest { + // ----------1----------2----------3 + // ---------------| + flowOf(1, 2, 3) + .onEach { delay(100) } + .skipUntil(timer(Unit, 150)) + .test( + listOf( + Event.Value(2), + Event.Value(3), + Event.Complete + ) + ) + + flowOf(1, 2, 3) + .onEach { delay(100) } + .dropUntil(timer(Unit, 150)) + .test( + listOf( + Event.Value(2), + Event.Value(3), + Event.Complete + ) + ) + } + + @Test + fun testSkipUntilNever() = runTest { + flowOf(1, 2, 3, 4) + .skipUntil(neverFlow()) + .test(listOf(Event.Complete)) + } + + @Test + fun testSkipUntilEmpty() = runTest { + flowOf(1, 2, 3, 4) + .skipUntil(emptyFlow()) + .test( + listOf( + Event.Value(1), + Event.Value(2), + Event.Value(3), + Event.Value(4), + Event.Complete + ) + ) + } + + @Test + fun testSkipUntilFailureUpstream() = runTest { + // 01--------------------2X + // ----------100 + + val source = flow { + expect(2) + emit(0) + expect(3) + emit(1) + + delay(20) + expect(5) + + emit(2) + expect(7) + throw TestException() + } + + val notifier = flowOf(100).onEach { + delay(10) + expect(4) + } + + expect(1) + assertFailsWith( + source + .skipUntil(notifier) + .onEach { + assertEquals(2, it) + expect(6) + } + ) + finish(8) + } + + @Test + fun testSkipUntilCancellation() = runTest { + flow { + emit(0) + delay(200) + emit(1) + emit(2) + emit(3) + expectUnreached() // Cancelled by take + emit(5) + }.skipUntil(timer(Unit, 100)) + .take(2) + .test( + listOf( + Event.Value(1), + Event.Value(2), + Event.Complete + ) + ) + } + + @Test + fun testSkipUntilNotifierFailure() = runTest { + flow { + emit(0) + delay(200) + emit(1) + emit(2) + emit(3) + }.skipUntil( + timer(Unit, 100).onEach { + throw TestException() + } + ) + .let { + it.test(null) { events -> + assertEquals(1, events.size) + } + assertFailsWith(it) + } + } +} diff --git a/src/jvmTest/kotlin/com/hoc081098/flowext/SkipUntilJvmTest.kt b/src/jvmTest/kotlin/com/hoc081098/flowext/SkipUntilJvmTest.kt new file mode 100644 index 00000000..51c42271 --- /dev/null +++ b/src/jvmTest/kotlin/com/hoc081098/flowext/SkipUntilJvmTest.kt @@ -0,0 +1,165 @@ +/* + * MIT License + * + * Copyright (c) 2021-2022 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.hoc081098.flowext + +import com.hoc081098.flowext.utils.BaseStepTest +import com.hoc081098.flowext.utils.TestException +import com.hoc081098.flowext.utils.assertFailsWith +import com.hoc081098.flowext.utils.test +import kotlin.test.Ignore +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.runBlocking + +@Ignore("Ignore JVM tests. Run only locally.") +@FlowPreview +@InternalCoroutinesApi +@ExperimentalCoroutinesApi +class SkipUntilJvmTest : BaseStepTest() { + @Test + fun testSkipUntil() = runBlocking { + // ----------1----------2----------3 + // ---------------| + flowOf(1, 2, 3) + .onEach { delay(100) } + .skipUntil(timer(Unit, 150)) + .test( + listOf( + Event.Value(2), + Event.Value(3), + Event.Complete + ) + ) + } + + @Test + fun testSkipUntilNever() = runBlocking { + flowOf(1, 2, 3, 4) + .skipUntil(neverFlow()) + .test(listOf(Event.Complete)) + } + + @Test + fun testSkipUntilEmpty() = runBlocking { + flowOf(1, 2, 3, 4) + .skipUntil(emptyFlow()) + .test( + listOf( + Event.Value(1), + Event.Value(2), + Event.Value(3), + Event.Value(4), + Event.Complete + ) + ) + } + + @Test + fun testSkipUntilFailureUpstream() = runBlocking { + // 01--------------------2X + // ----------100 + + val source = flow { + expect(2) + emit(0) + expect(3) + emit(1) + + delay(20) + expect(5) + + emit(2) + expect(7) + throw TestException() + } + + val notifier = flowOf(100).onEach { + delay(10) + expect(4) + } + + expect(1) + assertFailsWith( + source + .skipUntil(notifier) + .onEach { + assertEquals(2, it) + expect(6) + } + ) + finish(8) + } + + @Test + fun testSkipUntilCancellation() = runBlocking { + flow { + emit(0) + delay(200) + emit(1) + emit(2) + emit(3) + expectUnreached() // Cancelled by take + emit(5) + }.skipUntil(timer(Unit, 100)) + .take(2) + .test( + listOf( + Event.Value(1), + Event.Value(2), + Event.Complete + ) + ) + } + + @Test + fun testSkipUntilNotifierFailure() = runBlocking { + flow { + emit(0) + delay(200) + emit(1) + emit(2) + emit(3) + }.skipUntil( + timer(Unit, 100).onEach { + throw TestException() + } + ) + .let { + it.test(null) { events -> + assertEquals(1, events.size) + } + assertFailsWith(it) + } + } +}