Skip to content

Commit

Permalink
Merge pull request #79 from hoc081098/refactor_
Browse files Browse the repository at this point in the history
Refactor all
  • Loading branch information
hoc081098 authored May 1, 2022
2 parents d2941a6 + c013080 commit 10a31d4
Show file tree
Hide file tree
Showing 19 changed files with 1,021 additions and 112 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
run: ./gradlew koverXmlReport

- name: Upload Test Report
uses: codecov/codecov-action@v2.1.0
uses: codecov/codecov-action@v3.0.0

- name: Upload test report artifact
if: ${{ failure() }}
Expand Down Expand Up @@ -148,7 +148,7 @@ jobs:

- name: Deploy docs 🚀 to website
if: ${{ github.ref == 'refs/heads/master' && github.repository == 'hoc081098/FlowExt' && matrix.os == 'macos-11' }}
uses: JamesIves/github-pages-deploy-action@v4.2.5
uses: JamesIves/github-pages-deploy-action@v4.3.0
with:
branch: gh-pages # The branch the action should deploy to.
folder: build/dokka/html # The folder the action should deploy.
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ jobs:

- name: Deploy docs 🚀 to website
if: ${{ matrix.os == 'macos-11' }}
uses: JamesIves/github-pages-deploy-action@v4.2.5
uses: JamesIves/github-pages-deploy-action@v4.3.0
with:
branch: gh-pages # The branch the action should deploy to.
folder: build/dokka/html # The folder the action should deploy.
Expand Down
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
## 0.3.0

- Update
- `Kotlin` to `1.6.20`.
- `Kotlin` to `1.6.21`.
- `KotlinX Coroutines` to `1.6.1`.
- `Gradle` to `7.4.2`.

- Refactor `withLatestFrom`'s implementation.
Expand All @@ -11,7 +12,6 @@
- Add `Symbol` class.

- Add
- `Flow.throttle`.
- `Flow.throttleTime`.
- `Event.flatMap`.
- `Event.valueOrDefault`.
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
- `retryWhenWithExponentialBackoff`
- `retryWithExponentialBackoff`
- [`takeUntil`](#takeUntil)
- `throttle`
- `throttleTime`
- [`withLatestFrom`](#withLatestFrom)

Expand Down
4 changes: 2 additions & 2 deletions api/FlowExt.api
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,10 @@ public final class com/hoc081098/flowext/ThrottleConfiguration : java/lang/Enum
}

public final class com/hoc081098/flowext/ThrottleKt {
public static final fun throttle (Lkotlinx/coroutines/flow/Flow;Lcom/hoc081098/flowext/ThrottleConfiguration;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun throttle$default (Lkotlinx/coroutines/flow/Flow;Lcom/hoc081098/flowext/ThrottleConfiguration;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun throttleTime (Lkotlinx/coroutines/flow/Flow;JLcom/hoc081098/flowext/ThrottleConfiguration;)Lkotlinx/coroutines/flow/Flow;
public static final fun throttleTime (Lkotlinx/coroutines/flow/Flow;Lcom/hoc081098/flowext/ThrottleConfiguration;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun throttleTime$default (Lkotlinx/coroutines/flow/Flow;JLcom/hoc081098/flowext/ThrottleConfiguration;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun throttleTime$default (Lkotlinx/coroutines/flow/Flow;Lcom/hoc081098/flowext/ThrottleConfiguration;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun throttleTime-8Mi8wO0 (Lkotlinx/coroutines/flow/Flow;JLcom/hoc081098/flowext/ThrottleConfiguration;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun throttleTime-8Mi8wO0$default (Lkotlinx/coroutines/flow/Flow;JLcom/hoc081098/flowext/ThrottleConfiguration;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
}
Expand Down
16 changes: 11 additions & 5 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ import org.jetbrains.kotlin.gradle.plugin.mpp.NativeBuildType
import java.net.URL

plugins {
kotlin("multiplatform") version "1.6.20"
id("com.diffplug.spotless") version "6.4.1"
kotlin("multiplatform") version "1.6.21"
id("com.diffplug.spotless") version "6.5.1"
id("maven-publish")
id("com.vanniktech.maven.publish") version "0.19.0"
id("org.jetbrains.kotlinx.binary-compatibility-validator") version "0.8.0"
id("org.jetbrains.dokka") version "1.6.10"
id("org.jetbrains.kotlinx.binary-compatibility-validator") version "0.9.0"
id("org.jetbrains.dokka") version "1.6.21"
id("org.jetbrains.kotlinx.kover") version "0.5.0"
}

val coroutinesVersion = "1.6.0"
val coroutinesVersion = "1.6.1"
val ktlintVersion = "0.44.0"

repositories {
Expand Down Expand Up @@ -218,6 +218,12 @@ tasks.withType<DokkaTask>().configureEach {
externalDocumentationLink {
url.set(URL("https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/"))
}

sourceLink {
localDirectory.set(file("src/commonMain/kotlin"))
remoteUrl.set(URL("https://github.com/hoc081098/FlowExt/tree/master/src/commonMain/kotlin"))
remoteLineSuffix.set("#L")
}
}
}
}
2 changes: 0 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
kotlin.code.style=official
kotlin.mpp.enableGranularSourceSetsMetadata=true
kotlin.native.enableDependencyPropagation=false
kotlin.js.generate.executable.default=false

GROUP=io.github.hoc081098
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ package com.hoc081098.flowext.internal

internal expect class AtomicRef<T>(value: T) {
var value: T

fun compareAndSet(expect: T, update: T): Boolean
}
1 change: 1 addition & 0 deletions src/commonMain/kotlin/com/hoc081098/flowext/mapTo.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ import kotlinx.coroutines.flow.transform
public inline fun <T, R> Flow<T>.mapTo(value: R): Flow<R> =
transform { return@transform emit(value) }

@Suppress("NOTHING_TO_INLINE")
public inline fun <T> Flow<T>.mapToUnit(): Flow<Unit> = mapTo(Unit)
59 changes: 30 additions & 29 deletions src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ import kotlinx.coroutines.channels.onFailure
import kotlinx.coroutines.channels.onSuccess
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

/**
* Define leading and trailing behavior.
Expand Down Expand Up @@ -123,10 +124,7 @@ private inline val ThrottleConfiguration.isTrailing: Boolean
public fun <T> Flow<T>.throttleTime(
duration: Duration,
throttleConfiguration: ThrottleConfiguration = LEADING,
): Flow<T> {
val timerFlow = timer(Unit, duration)
return throttle(throttleConfiguration) { timerFlow }
}
): Flow<T> = throttleTime(throttleConfiguration) { duration }

/**
* Returns a [Flow] that emits a value from the source [Flow], then ignores subsequent source values
Expand Down Expand Up @@ -181,10 +179,7 @@ public fun <T> Flow<T>.throttleTime(
public fun <T> Flow<T>.throttleTime(
timeMillis: Long,
throttleConfiguration: ThrottleConfiguration = LEADING,
): Flow<T> {
val timerFlow = timer(Unit, timeMillis)
return throttle(throttleConfiguration) { timerFlow }
}
): Flow<T> = throttleTime(throttleConfiguration) { timeMillis.milliseconds }

/**
* Returns a [Flow] that emits a value from the source [Flow], then ignores subsequent source values
Expand All @@ -196,7 +191,7 @@ public fun <T> Flow<T>.throttleTime(
* (1..10)
* .asFlow()
* .onEach { delay(200) }
* .throttle { timer(Unit, 500) }
* .throttleTime { 500.milliseconds }
* ```
*
* produces the following emissions
Expand All @@ -211,7 +206,7 @@ public fun <T> Flow<T>.throttleTime(
* (1..10)
* .asFlow()
* .onEach { delay(200) }
* .throttle(TRAILING) { timer(Unit, 500) }
* .throttleTime(TRAILING) { 500.milliseconds }
* ```
*
* produces the following emissions
Expand All @@ -226,7 +221,7 @@ public fun <T> Flow<T>.throttleTime(
* (1..10)
* .asFlow()
* .onEach { delay(200) }
* .throttle(LEADING_AND_TRAILING) { timer(Unit, 500) }
* .throttleTime(LEADING_AND_TRAILING) { 500.milliseconds }
* ```
*
* produces the following emissions
Expand All @@ -236,9 +231,9 @@ public fun <T> Flow<T>.throttleTime(
* ```
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.throttle(
public fun <T> Flow<T>.throttleTime(
throttleConfiguration: ThrottleConfiguration = LEADING,
durationSelector: (value: T) -> Flow<Unit>,
durationSelector: (value: T) -> Duration,
): Flow<T> = flow {
val leading = throttleConfiguration.isLeading
val trailing = throttleConfiguration.isTrailing
Expand Down Expand Up @@ -267,18 +262,20 @@ public fun <T> Flow<T>.throttle(
}
}

val onWindowClosed = suspend {
throttled = null

if (trailing) {
trySend()
}
}

// Now consume the values until the original flow is complete.
while (lastValue !== DONE_VALUE) {
// wait for the next value
select<Unit> {
// When a throttling window ends, send the value if there is a pending value.
throttled?.onJoin?.invoke {
throttled = null

if (trailing) {
trySend()
}
}
throttled?.onJoin?.invoke(onWindowClosed)

values.onReceiveCatching { result ->
result
Expand All @@ -292,20 +289,24 @@ public fun <T> Flow<T>.throttle(
if (leading) {
trySend()
}
throttled = durationSelector(NULL_VALUE.unbox(value))
.take(1)
.launchIn(scope)

when (val duration = durationSelector(NULL_VALUE.unbox(value))) {
Duration.ZERO -> onWindowClosed()
else -> throttled = scope.launch { delay(duration) }
}
}
.onFailure {
it?.let { throw it }

// Once the original flow has completed, there may still be a pending value
// waiting to be emitted. If so, wait for the throttling window to end and then
// send it. That will complete this throttled flow.
if (trailing && throttled != null && lastValue != null) {
throttled!!.join()
throttled = null
trySend()
if (trailing && lastValue != null) {
throttled?.run {
throttled = null
join()
trySend()
}
}

lastValue = DONE_VALUE
Expand Down
10 changes: 5 additions & 5 deletions src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ package com.hoc081098.flowext

import com.hoc081098.flowext.internal.AtomicRef
import com.hoc081098.flowext.utils.NULL_VALUE
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch

/**
* Merges two [Flow]s into one [Flow] by combining each value from self with the latest value from the second [Flow], if any.
Expand All @@ -48,9 +48,9 @@ public fun <A, B, R> Flow<A>.withLatestFrom(

try {
coroutineScope {
other
.onEach { otherRef.value = it ?: NULL_VALUE }
.launchIn(this)
launch(start = CoroutineStart.UNDISPATCHED) {
other.collect { otherRef.value = it ?: NULL_VALUE }
}

collect { value ->
emit(
Expand Down
Loading

0 comments on commit 10a31d4

Please sign in to comment.