Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add plus operator idea from concat #192

Merged
merged 12 commits into from
Nov 11, 2023
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

- Remove now-unsupported targets: `iosArm32`, `watchosX86`.

### Added

- Add `Flow.plus` operator, it is an alias to `concatWith` operator.

## [0.7.3] - Oct 29, 2023

### Changed
Expand Down
20 changes: 18 additions & 2 deletions README.md
hoangchungk53qx1 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ dependencies {
- [`castNullable`](#cast--castnotnull--castnullable--safeCast)
- [`chunked`](#buffercount--chunked)
- [`safeCast`](#cast--castnotnull--castnullable--safeCast)
- [`concatWith`](#concatwith)
- [`concatWith`](#concatwith--plus)
- [`startWith`](#startwith)
- [`flatMapFirst`](#flatmapfirst--exhaustmap)
- [`exhaustMap`](#flatmapfirst--exhaustmap)
Expand Down Expand Up @@ -161,6 +161,7 @@ dependencies {
- [`throttleTime`](#throttletime)
- [`withLatestFrom`](#withlatestfrom)
- [`zipWithNext`](#pairwise--zipWithNext)
- [`plus`](#concatwith--plus)

#### bufferCount / chunked

Expand Down Expand Up @@ -207,6 +208,7 @@ bufferCount: [8, 9]
to [RxJava concat](http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html#concat-java.lang.Iterable-)

Creates an output `Flow` which sequentially emits all values from the first given `Flow` and then moves on to the next.
Note, `plus` is an alias to `concat`.

```kotlin
concat(
Expand Down Expand Up @@ -519,7 +521,7 @@ safeCast: null

----

#### concatWith
#### concatWith / plus

- Similar to [RxJS concatWith](https://rxjs.dev/api/operators/concatWith)
- Similar
Expand All @@ -531,6 +533,13 @@ Returns a `Flow` that emits the items emitted from the current `Flow`, then the
flowOf(1, 2, 3)
.concatWith(flowOf(4, 5, 6))
.collect { println("concatWith: $it") }

println("---")

val flow1 = flowOf(1, 2, 3)
val flow2 = flowOf(4, 5, 6)

(flow1 + flow2).collect { println("plus: $it") }
```

Output:
Expand All @@ -542,6 +551,13 @@ concatWith: 3
concatWith: 4
concatWith: 5
concatWith: 6
---
plus: 1
plus: 2
plus: 3
plus: 4
plus: 5
plus: 6
```

----
Expand Down
1 change: 1 addition & 0 deletions api/FlowExt.api
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public final class com/hoc081098/flowext/ConcatKt {
public static final fun concatWith (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun concatWith (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun concatWith (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun plus (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun startWith (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow;
public static final fun startWith (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun startWith (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
Expand Down
16 changes: 16 additions & 0 deletions src/commonMain/kotlin/com/hoc081098/flowext/concat.kt
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,19 @@ public fun <T> Flow<T>.startWith(others: Sequence<T>): Flow<T> = concat(others.a
* Returns a [Flow] that emits the items in a specified [Flow] before it begins to emit items emitted by the current [Flow].
*/
public fun <T> Flow<T>.startWith(other: Flow<T>): Flow<T> = concat(other, this)

/**
* This function is an alias to [concatWith] operator.
*
* Returns a [Flow] that emits the items emitted from this [Flow], then the next, one after the other, without interleaving them.
*
* @see concatWith
*
* Example:
* ``` kotlin
* val flow1 = flowOf(1, 2, 3)
* val flow2 = flowOf(4, 5, 6)
* val result = flow1 + flow2 // produces the following emissions 1, 2, 3, 4, 5, 6
* ```
*/
public operator fun <T, R : T> Flow<T>.plus(other: Flow<R>): Flow<T> = concat(this, other)
57 changes: 57 additions & 0 deletions src/commonTest/kotlin/com/hoc081098/flowext/ConcatTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,63 @@ class ConcatTest : BaseTest() {
),
).test(null, expectation)
}

@Test
fun plusTwoFlow() = runTest {
val flow1 = flowOf(1, 2, 3)
val flow2 = flowOf(4, 5, 6)

(flow1 + flow2)
.test(
listOf(
Event.Value(1),
Event.Value(2),
Event.Value(3),
Event.Value(4),
Event.Value(5),
Event.Value(6),
Event.Complete,
),
)
}

@Test
fun plusTwoFlow2() = runTest {
(flowOf("a", 2, 3) + flowOf(4, 5, 6))
.test(
listOf(
Event.Value("a"),
Event.Value(2),
Event.Value(3),
Event.Value(4),
Event.Value(5),
Event.Value(6),
Event.Complete,
),
)
}

@Test
fun testPlus_firstFailureUpstream() = runTest {
val flow = flowOf(1, 2, 3)
val failureFlow = kotlinx.coroutines.flow.flow<Nothing> { throw TestException("Crash!") }
val expectation: suspend (List<Event<Int>>) -> Unit = { events ->
val message = assertIs<TestException>(events.single().errorOrThrow()).message
assertEquals("Crash!", message)
}

(failureFlow + flow)
.test(null, expectation)

(failureFlow + flow + flow)
.test(null, expectation)

(failureFlow + flow + flow + flow)
.test(null, expectation)

(failureFlow + flow + flow + flow + flow)
.test(null, expectation)
}
}

private operator fun <T> Iterable<T>.times(times: Int): List<T> = (0 until times).flatMap { this }