Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

增加/优化部分 Collectable(s) 相关的API、说明等 #773

Merged
merged 2 commits into from
Feb 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,17 @@
package love.forte.simbot.event

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toCollection
import kotlinx.coroutines.future.future
import kotlinx.coroutines.launch
import kotlinx.coroutines.reactor.asFlux
import love.forte.simbot.common.collectable.collectBy
import love.forte.simbot.common.collection.asIterator
import love.forte.simbot.suspendrunner.runInNoScopeBlocking
import reactor.core.publisher.Flux
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.function.Function
import java.util.stream.Collector
import java.util.stream.Collector.Characteristics.*
import java.util.stream.Stream
import java.util.stream.StreamSupport
import kotlin.coroutines.CoroutineContext
Expand All @@ -54,50 +53,28 @@ import kotlin.coroutines.EmptyCoroutineContext
public fun EventProcessor.pushAndAsFlux(event: Event): Flux<EventResult> =
push(event).asFlux()

@Suppress("UNCHECKED_CAST")
internal suspend fun <T, R> kotlinx.coroutines.flow.Flow<T>.collectBy(
@Deprecated(
"Use collectBy", ReplaceWith(
"collectBy(scope, launchContext, collector)",
"love.forte.simbot.common.collectable.collectBy"
),
DeprecationLevel.HIDDEN
)
@JvmName("collectBy")
internal suspend fun <T, R> Flow<T>.collectBy0(
scope: CoroutineScope,
launchContext: CoroutineContext = EmptyCoroutineContext,
collector: Collector<T, *, R>
): R {
val container = collector.supplier().get()
val accumulator = collector.accumulator() as java.util.function.BiConsumer<Any?, T>
val characteristics = collector.characteristics()

if (CONCURRENT in characteristics && UNORDERED in characteristics) {
// collect in launch
collect { result ->
scope.launch(launchContext) { accumulator.accept(container, result) }
}
} else {
collect { result ->
accumulator.accept(container, result)
}
}

return if (IDENTITY_FINISH in characteristics) {
container as R
} else {
(collector.finisher() as Function<Any?, R>).apply(container)
}
}

@Suppress("UNCHECKED_CAST")
internal suspend fun <T, R> kotlinx.coroutines.flow.Flow<T>.collectBy(collector: Collector<T, *, R>): R {
val container = collector.supplier().get()
val accumulator = collector.accumulator() as java.util.function.BiConsumer<Any?, T>
val characteristics = collector.characteristics()

collect { result ->
accumulator.accept(container, result)
}

return if (IDENTITY_FINISH in characteristics) {
container as R
} else {
(collector.finisher() as Function<Any?, R>).apply(container)
}
}
): R = collectBy(scope, launchContext, collector)

@Deprecated(
"Use collectBy",
ReplaceWith("collectBy(collector)", "love.forte.simbot.common.collectable.collectBy"),
DeprecationLevel.HIDDEN
)
@JvmName("collectBy")
internal suspend fun <T, R> Flow<T>.collectBy0(collector: Collector<T, *, R>): R =
collectBy(collector)

//region async
/**
Expand Down
2 changes: 1 addition & 1 deletion simbot-api/src/jvmTest/kotlin/DispatcherTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import love.forte.simbot.event.collectBy
import love.forte.simbot.common.collectable.collectBy
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.function.Function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import kotlin.jvm.JvmSynthetic
*
* 当 [Collectable] 中实际没有需要被挂起的类型时,考虑对外提供普通可迭代的收集器类型 [IterableCollectable]。
*
* 在 JVM 中,会使用 `Collectables` 静态类提供更多辅助API。
*
* @see IterableCollectable
*
* @author ForteScarlet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,31 @@

package love.forte.simbot.common.collectable

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.future.future
import kotlinx.coroutines.reactor.asFlux
import love.forte.simbot.annotations.InternalSimbotAPI
import love.forte.simbot.common.async.Async
import love.forte.simbot.common.async.asAsync
import love.forte.simbot.common.collection.asIterator
import love.forte.simbot.common.function.Action
import love.forte.simbot.suspendrunner.reserve.SuspendReserve
import love.forte.simbot.suspendrunner.runInAsync
import love.forte.simbot.suspendrunner.runInNoScopeBlocking
import reactor.core.publisher.Flux
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.function.Function
import java.util.stream.Collector
import java.util.stream.Collectors
import java.util.stream.Stream
import java.util.stream.StreamSupport
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.streams.asSequence
import kotlin.streams.asStream

Expand Down Expand Up @@ -207,6 +218,42 @@ public fun <T> IterableCollectable<T>.asStream(): Stream<T> = asSequence().asStr
*/
public fun <T> SequenceCollectable<T>.asStream(): Stream<T> = asSequence().asStream()

/**
* 将一个 [Collectable] 转化为 [Stream]。
* 如果是 [IterableCollectable] 或 [SequenceCollectable],
* 则会使用它们的 `asSequence` 进行转化,
* 否则会使用 [Collectable] 中合适的方法进行适当的**阻塞转化**。
*
* 如果 collectable 不是 [IterableCollectable] 或 [SequenceCollectable],
* 则需要提供 [produceScope] 来将异步的收集器转化为 stream。默认使用 [GlobalScope]。
* 注意:请参考并了解有关 [GlobalScope] 的各种注意事项,避免出现预期外的结果或错误。
*
* @param produceScope 如果 collectable 不是 [IterableCollectable] 或 [SequenceCollectable],
* 则需要提供一个作用域来将异步的收集器转化为 stream。默认使用 [GlobalScope]。
* 注意:请参考并了解有关 [GlobalScope] 的各种注意事项,避免出现预期外的结果或错误。
*/
@OptIn(DelicateCoroutinesApi::class)
@JvmOverloads
public fun <T> Collectable<T>.asStream(produceScope: CoroutineScope? = null): Stream<T> {
return when (this) {
is IterableCollectable -> asStream()
is SequenceCollectable -> asStream()
else -> {
val scope = produceScope ?: GlobalScope
val iter = asFlow().asIterator(
producerScope = scope,
hasNext = { runInNoScopeBlocking { hasNext() } },
next = { runInNoScopeBlocking { next() } })

StreamSupport.stream(
{ Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED) },
Spliterator.ORDERED,
false
)
}
}
}


/**
* 将 [Stream] 转换为 [Collectable] 的函数.
Expand All @@ -220,8 +267,109 @@ private class StreamCollectableImpl<T>(private val stream: Stream<T>) : Sequence
override fun toList(): List<T> = stream.collect(Collectors.toUnmodifiableList())
}

/// List

// reactor
/**
* 将 [Collectable] 阻塞地收集为 [List]。
* 会根据类型适当地优化与避免阻塞挂起操作。
*
* @see runInNoScopeBlocking
*/
public fun <T> Collectable<T>.toList(): List<T> = when (this) {
is SynchronouslyIterateCollectable -> toList()
else -> runInNoScopeBlocking { asFlow().toList() }
}

/**
* 将 [Collectable] 异步地收集为 [List]。
* 如果 [scope] 为 `null`,则会视情况选择使用一个内部的 CoroutineScope 异步调度器
* (see [runInAsync])
* 或使用 [CompletableFuture.supplyAsync]。
*
* @see runInAsync
* @see CompletableFuture.supplyAsync
*/
@OptIn(InternalSimbotAPI::class)
@JvmOverloads
public fun <T> Collectable<T>.toListAsync(scope: CoroutineScope? = null): CompletableFuture<List<T>> = when (this) {
is SynchronouslyIterateCollectable -> scope?.future { toList() }
?: CompletableFuture.supplyAsync { toList() }

else -> scope?.future { asFlow().toList() }
?: runInAsync { asFlow().toList() }
}

/// collector

/**
* 使用 [Collector] **阻塞地**收集 [Collectable] 中的元素。
*/
public fun <T, R> Collectable<T>.collect(collector: Collector<T, *, R>): R {
return when (this) {
is SynchronouslyIterateCollectable -> when (this) {
is SequenceCollectable -> asSequence().asStream().collect(collector)
else -> asSequence().asStream().collect(collector)
}

else -> {
runInNoScopeBlocking { asFlow().collectBy(collector) }
}
}
}

/**
* 使用 [Collector] **异步地**收集 [Collectable] 中的元素。
* 如果 [scope] 为 `null`,则会视情况选择使用一个内部的 CoroutineScope 异步调度器
* (see [runInAsync])
* 或使用 [CompletableFuture.supplyAsync]。
*
* @see runInAsync
* @see CompletableFuture.supplyAsync
*/
@OptIn(InternalSimbotAPI::class)
@JvmOverloads
public fun <T, R> Collectable<T>.collectAsync(
scope: CoroutineScope? = null,
collector: Collector<T, *, R>
): CompletableFuture<R> {
return when (this) {
is SynchronouslyIterateCollectable -> when (this) {
is SequenceCollectable -> scope?.future { asSequence().asStream().collect(collector) }
?: CompletableFuture.supplyAsync { asSequence().asStream().collect(collector) }

else -> scope?.future { asSequence().asStream().collect(collector) }
?: CompletableFuture.supplyAsync { asSequence().asStream().collect(collector) }
}

else -> {
scope?.future { asFlow().collectBy(scope = scope, collector = collector) }
runInAsync { asFlow().collectBy(scope = this, collector = collector) }
}
}
}

/// transform

/**
* 使用 [SuspendReserve.Transformer] 对 [Collectable.asFlow] 的结果进行转化,
* 例如可以使用 `SuspendReserves.flux()` 转化为 [Flux] 或 `SuspendReserves.list()`
* 转化为 [List]。
* 注意:部分转化器可能会要求运行时存在一些依赖,请注意参考它们的注释与说明。
*
* 建议主要使用 [transform] 转化为其他响应式类型,例如 [Flux]。
* 对列表等普通的集合类型可以选择其他可能有更多判断与优化的API,
* 例如 [Collectable.toList]。
*
*/
@OptIn(DelicateCoroutinesApi::class)
@JvmOverloads
public fun <T, R> Collectable<T>.transform(
scope: CoroutineScope = GlobalScope,
transformer: SuspendReserve.Transformer<Flow<T>, R>
): R =
transformer(scope, EmptyCoroutineContext) { asFlow() }

/// reactor

/**
* 将 [Collectable] 转化为 [Flux]。
Expand All @@ -234,3 +382,55 @@ public fun <T : Any> Collectable<T>.asFlux(): Flux<T> =
asFlow().asFlux()


/**
* 使用 [Collector] 收集 [Flow] 中的元素。
*/
@Suppress("UNCHECKED_CAST")
@JvmSynthetic
public suspend fun <T, R> Flow<T>.collectBy(
scope: CoroutineScope,
launchContext: CoroutineContext = EmptyCoroutineContext,
collector: Collector<T, *, R>
): R {
val container = collector.supplier().get()
val accumulator = collector.accumulator() as java.util.function.BiConsumer<Any?, T>
val characteristics = collector.characteristics()

if (Collector.Characteristics.CONCURRENT in characteristics && Collector.Characteristics.UNORDERED in characteristics) {
// collect in launch
collect { result ->
scope.launch(launchContext) { accumulator.accept(container, result) }
}
} else {
collect { result ->
accumulator.accept(container, result)
}
}

return if (Collector.Characteristics.IDENTITY_FINISH in characteristics) {
container as R
} else {
(collector.finisher() as Function<Any?, R>).apply(container)
}
}

/**
* 使用 [Collector] 收集 [Flow] 中的元素。
*/
@Suppress("UNCHECKED_CAST")
@JvmSynthetic
public suspend fun <T, R> Flow<T>.collectBy(collector: Collector<T, *, R>): R {
val container = collector.supplier().get()
val accumulator = collector.accumulator() as java.util.function.BiConsumer<Any?, T>
val characteristics = collector.characteristics()

collect { result ->
accumulator.accept(container, result)
}

return if (Collector.Characteristics.IDENTITY_FINISH in characteristics) {
container as R
} else {
(collector.finisher() as Function<Any?, R>).apply(container)
}
}
Loading
Loading