Skip to content

Commit

Permalink
Add/fix lots of documentation (#52)
Browse files Browse the repository at this point in the history
* Add more documentation to Async
* Also temporarily remove `*:` tuple comparison due to a scaladoc crash, see scala/scala3#19925
  • Loading branch information
natsukagami authored Mar 14, 2024
1 parent dc313af commit cf269c8
Show file tree
Hide file tree
Showing 15 changed files with 423 additions and 166 deletions.
184 changes: 150 additions & 34 deletions shared/src/main/scala/async/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,40 @@ import gears.async.Listener.withLock
import gears.async.Listener.NumberedLock
import scala.util.boundary

/** A context that allows to suspend waiting for asynchronous data sources
/** The async context: provides the capability to asynchronously [[Async.await await]] for [[Async.Source Source]]s, and
* defines a scope for structured concurrency through a [[CompletionGroup]].
*
* As both a context and a capability, the idiomatic way of using [[Async]] is to be implicitly passed around
* functions, as an `using` parameter:
* {{{
* def function()(using Async): T = ???
* }}}
*
* It is not recommended to store [[Async]] in a class field, since it complicates scoping rules.
*
* @param support
* An implementation of the underlying asynchronous operations (suspend and resume). See [[AsyncSupport]].
* @param scheduler
* An implementation of a scheduler, for scheduling computation as they are spawned or resumed. See [[Scheduler]].
*
* @see
* [[Async$.blocking Async.blocking]] for a way to construct an [[Async]] instance.
* @see
* [[Async$.group Async.group]] and [[Future$.apply Future.apply]] for [[Async]]-subscoping operations.
*/
trait Async(using val support: AsyncSupport, val scheduler: support.Scheduler):
/** Wait for completion of async source `src` and return the result */
/** Waits for completion of source `src` and returns the result. Suspends the computation.
*
* @see
* [[Async.Source.awaitResult]] and [[Async$.await]] for extension methods calling [[Async!.await]] from the source
* itself.
*/
def await[T](src: Async.Source[T]): T

/** The cancellation group for this Async */
/** Returns the cancellation group for this [[Async]] context. */
def group: CompletionGroup

/** An Async of the same kind as this one, with a new cancellation group */
/** Returns an [[Async]] context of the same kind as this one, with a new cancellation group. */
def withGroup(group: CompletionGroup): Async

object Async:
Expand Down Expand Up @@ -53,7 +77,7 @@ object Async:
def blocking[T](body: Async.Spawn ?=> T)(using support: AsyncSupport, scheduler: support.Scheduler): T =
group(body)(using Blocking(CompletionGroup.Unlinked))

/** The currently executing Async context */
/** Returns the currently executing Async context. Equivalent to `summon[Async]`. */
inline def current(using async: Async): Async = async

/** [[Async.Spawn]] is a special subtype of [[Async]], also capable of spawning runnable [[Future]]s.
Expand All @@ -63,8 +87,8 @@ object Async:
*/
opaque type Spawn <: Async = Async

/** Runs [[body]] inside a spawnable context where it is allowed to spawning concurrently runnable [[Future]]s. When
* the body returns, all spawned futures are cancelled and waited for.
/** Runs `body` inside a spawnable context where it is allowed to spawn concurrently runnable [[Future]]s. When the
* body returns, all spawned futures are cancelled and waited for.
*/
def group[T](body: Async.Spawn ?=> T)(using Async): T =
withNewCompletionGroup(CompletionGroup().link())(body)
Expand All @@ -86,51 +110,85 @@ object Async:
group.waitCompletion()(using completionAsync)

/** An asynchronous data source. Sources can be persistent or ephemeral. A persistent source will always pass same
* data to calls of `poll and `onComplete`. An ephemeral source can pass new data in every call. An example of a
* persistent source is `Future`. An example of an ephemeral source is `Channel`.
* data to calls of [[Source!.poll]] and [[Source!.onComplete]]. An ephemeral source can pass new data in every call.
*
* @see
* An example of a persistent source is [[gears.async.Future]].
* @see
* An example of an ephemeral source is [[gears.async.Channel]].
*/
trait Source[+T]:

/** Check whether data is available at present and pass it to k if so. If no element is available, does not lock k
* and returns false immediately. If there is (or may be) data available, the listener is locked and if it fails,
* true is returned to signal this source's general availability. If locking k succeeds, only return true iff k's
* complete is called. Calls to `poll` are always synchronous.
/** Checks whether data is available at present and pass it to `k` if so. Calls to `poll` are always synchronous and
* non-blocking.
*
* The process is as follows:
* - If no data is immediately available, return `false` immediately.
* - If there is data available, attempt to lock `k`.
* - If `k` is no longer available, `true` is returned to signal this source's general availability.
* - If locking `k` succeeds:
* - If data is still available, complete `k` and return true.
* - Otherwise, unlock `k` and return false.
*
* Note that in all cases, a return value of `false` indicates that `k` should be put into `onComplete` to receive
* data in a later point in time.
*
* @return
* Whether poll was able to pass data to `k`. Note that this is regardless of `k` being available to receive the
* data. In most cases, one should pass `k` into [[Source!.onComplete]] if `poll` returns `false`.
*/
def poll(k: Listener[T]): Boolean

/** Once data is available, pass it to function `k`. `k` returns true iff the data was consumed in an async block.
* Calls to `onComplete` are usually asynchronous, meaning that the passed continuation `k` is a suspension.
/** Once data is available, pass it to the listener `k`. `onComplete` is always non-blocking.
*
* Note that `k`'s methods will be executed on the same thread as the [[Source]], usually in sequence. It is hence
* important that the listener itself does not perform expensive operations.
*/
def onComplete(k: Listener[T]): Unit

/** Signal that listener `k` is dead (i.e. will always return `false` from now on). This permits original, (i.e.
* non-derived) sources like futures or channels to drop the listener from their waiting sets.
/** Signal that listener `k` is dead (i.e. will always fail to acquire locks from now on), and should be removed
* from `onComplete` queues.
*
* This permits original, (i.e. non-derived) sources like futures or channels to drop the listener from their
* waiting sets.
*/
def dropListener(k: Listener[T]): Unit

/** Utility method for direct polling. */
/** Similar to [[Async.Source!.poll(k:Listener[T])* poll]], but instead of passing in a listener, directly return
* the value `T` if it is available.
*/
def poll(): Option[T] =
var resultOpt: Option[T] = None
poll(Listener.acceptingListener { (x, _) => resultOpt = Some(x) })
resultOpt

/** Utility method for direct waiting with `Async`. */
/** Waits for an item to arrive from the source. Suspends until an item returns.
*
* This is an utility method for direct waiting with `Async`, instead of going through listeners.
*/
final def awaitResult(using ac: Async) = ac.await(this)
end Source

extension [T](src: Source[scala.util.Try[T]])
/** Waits for an item to arrive from the source, then automatically unwraps it. */
/** Waits for an item to arrive from the source, then automatically unwraps it. Suspends until an item returns.
* @see
* [[Source!.awaitResult awaitResult]] for non-unwrapping await.
*/
inline def await(using Async) = src.awaitResult.get
extension [E, T](src: Source[Either[E, T]])
/** Waits for an item to arrive from the source, then automatically unwraps it. */
/** Waits for an item to arrive from the source, then automatically unwraps it. Suspends until an item returns.
* @see
* [[Source!.awaitResult awaitResult]] for non-unwrapping await.
*/
inline def await(using Async) = src.awaitResult.right.get

/** An original source has a standard definition of `onComplete` in terms of `poll` and `addListener`. Implementations
* should be the resource owner to handle listener queue and completion using an object monitor on the instance.
/** An original source has a standard definition of [[Source.onComplete onComplete]] in terms of [[Source.poll poll]]
* and [[OriginalSource.addListener addListener]].
*
* Implementations should be the resource owner to handle listener queue and completion using an object monitor on
* the instance.
*/
abstract class OriginalSource[+T] extends Source[T]:

/** Add `k` to the listener set of this source */
/** Add `k` to the listener set of this source. */
protected def addListener(k: Listener[T]): Unit

def onComplete(k: Listener[T]): Unit = synchronized:
Expand All @@ -139,7 +197,12 @@ object Async:
end OriginalSource

object Source:
/** Create a [[Source]] containing the given values, resolved once for each. */
/** Create a [[Source]] containing the given values, resolved once for each.
*
* @return
* an ephemeral source of values arriving to listeners in a queue. Once all values are received, attaching a
* listener with [[Source!.onComplete onComplete]] will be a no-op (i.e. the listener will never be called).
*/
def values[T](values: T*) =
import scala.collection.JavaConverters._
val q = java.util.concurrent.ConcurrentLinkedQueue[T]()
Expand All @@ -163,8 +226,14 @@ object Async:

extension [T](src: Source[T])
/** Create a new source that requires the original source to run the given transformation function on every value
* received. Note that [[f]] is **always** run on the computation that produces the values from the original
* source, so this is very likely to run **sequentially** and be a performance bottleneck.
* received.
*
* Note that `f` is **always** run on the computation that produces the values from the original source, so this is
* very likely to run **sequentially** and be a performance bottleneck.
*
* @param f
* the transformation function to be run on every value. `f` is run *before* the item is passed to the
* [[Listener]].
*/
def transformValuesWith[U](f: T => U) =
new Source[U]:
Expand All @@ -182,7 +251,23 @@ object Async:
def dropListener(k: Listener[U]): Unit =
src.dropListener(transform(k))

/** Creates a source that "races" a list of sources.
*
* Listeners attached to this source is resolved with the first item arriving from one of the sources. If multiple
* sources are available at the same time, one of the items will be returned with no priority. Items that are not
* returned are '''not''' consumed from the upstream sources.
*
* @see
* [[raceWithOrigin]] for a race source that also returns the upstream origin of the item.
* @see
* [[Async$.select Async.select]] for a convenient syntax to race sources and awaiting them with [[Async]].
*/
def race[T](sources: Source[T]*): Source[T] = raceImpl[T, T]((v, _) => v)(sources*)

/** Like [[race]], but the returned value includes a reference to the upstream source that the item came from.
* @see
* [[Async$.select Async.select]] for a convenient syntax to race sources and awaiting them with [[Async]].
*/
def raceWithOrigin[T](sources: Source[T]*): Source[(T, Source[T])] =
raceImpl[(T, Source[T]), T]((v, src) => (v, src))(sources*)

Expand Down Expand Up @@ -260,28 +345,59 @@ object Async:

/** Cases for handling async sources in a [[select]]. [[SelectCase]] can be constructed by extension methods `handle`
* of [[Source]].
*
* @see
* [[handle Source.handle]] (and its operator alias [[~~> ~~>]])
* @see
* [[Async$.select Async.select]] where [[SelectCase]] is used.
*/
opaque type SelectCase[T] = (Source[?], Nothing => T)
// ^ unsafe types, but we only construct SelectCase from `handle` which is safe

extension [T](src: Source[T])
/** Attach a handler to [[src]], creating a [[SelectCase]]. */
/** Attach a handler to `src`, creating a [[SelectCase]].
* @see
* [[Async$.select Async.select]] where [[SelectCase]] is used.
*/
inline def handle[U](f: T => U): SelectCase[U] = (src, f)

/** Alias for [[handle]] */
/** Alias for [[handle]]
* @see
* [[Async$.select Async.select]] where [[SelectCase]] is used.
*/
inline def ~~>[U](f: T => U): SelectCase[U] = src.handle(f)

/** Race a list of sources with the corresponding handler functions, once an item has come back. Like [[race]],
* [[select]] guarantees exactly one of the sources are polled. Unlike `map`ping a [[Source]], the handler in
* [[select]] guarantees exactly one of the sources are polled. Unlike [[transformValuesWith]], the handler in
* [[select]] is run in the same async context as the calling context of [[select]].
*
* @see
* [[handle Source.handle]] (and its operator alias [[~~> ~~>]]) for methods to create [[SelectCase]]s.
* @example
* {{{
* // Race a channel read with a timeout
* val ch = SyncChannel[Int]()
* // ...
* val timeout = Future(sleep(1500.millis))
*
* Async.select(
* ch.readSrc.handle: item =>
* Some(item * 2),
* timeout ~~> _ => None
* )
* }}}
*/
def select[T](cases: SelectCase[T]*)(using Async) =
val (input, which) = raceWithOrigin(cases.map(_._1)*).awaitResult
val (_, handler) = cases.find(_._1 == which).get
handler.asInstanceOf[input.type => T](input)

/** If left (respectively, right) source succeeds with `x`, pass `Left(x)`, (respectively, Right(x)) on to the
* continuation.
/** Race two sources, wrapping them respectively in [[Left]] and [[Right]] cases.
* @return
* a new [[Source]] that resolves with [[Left]] if `src1` returns an item, [[Right]] if `src2` returns an item,
* whichever comes first.
* @see
* [[race]] and [[select]] for racing more than two sources.
*/
def either[T1, T2](src1: Source[T1], src2: Source[T2]): Source[Either[T1, T2]] =
race(src1.transformValuesWith(Left(_)), src2.transformValuesWith(Right(_)))
Expand Down
13 changes: 9 additions & 4 deletions shared/src/main/scala/async/AsyncOperations.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@ import scala.concurrent.duration.FiniteDuration
import java.util.concurrent.TimeoutException
import gears.async.AsyncOperations.sleep

/** Defines fundamental operations that require the support of the scheduler. This is commonly provided alongside with
* the given implementation of [[Scheduler]].
* @see
* [[Scheduler]] for the definition of the scheduler itself.
*/
trait AsyncOperations:
/** Suspends the current [[Async]] context for at least [[millis]] milliseconds. */
/** Suspends the current [[Async]] context for at least `millis` milliseconds. */
def sleep(millis: Long)(using Async): Unit

object AsyncOperations:
/** Suspends the current [[Async]] context for at least [[millis]] milliseconds.
/** Suspends the current [[Async]] context for at least `millis` milliseconds.
* @param millis
* The duration to suspend. Must be a positive integer.
* The duration to suspend, in milliseconds. Must be a positive integer.
*/
inline def sleep(millis: Long)(using AsyncOperations, Async): Unit =
summon[AsyncOperations].sleep(millis)

/** Suspends the current [[Async]] context for at least [[millis]] milliseconds.
/** Suspends the current [[Async]] context for `duration`.
* @param duration
* The duration to suspend. Must be positive.
*/
Expand Down
11 changes: 11 additions & 0 deletions shared/src/main/scala/async/AsyncSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@ package gears.async

import scala.concurrent.duration._

/** The delimited continuation suspension interface. Represents a suspended computation asking for a value of type `T`
* to continue (and eventually returning a value of type `R`).
*/
trait Suspension[-T, +R]:
def resume(arg: T): R

/** Support for suspension capabilities through a delimited continuation interface. */
trait SuspendSupport:
/** A marker for the "limit" of "delimited continuation". */
type Label[R]

/** The provided suspension type. */
type Suspension[-T, +R] <: gears.async.Suspension[T, R]

/** Set the suspension marker as the body's caller, and execute `body`. */
def boundary[R](body: Label[R] ?=> R): R

/** Should return immediately if resume is called from within body */
Expand All @@ -18,12 +26,15 @@ trait SuspendSupport:
trait AsyncSupport extends SuspendSupport:
type Scheduler <: gears.async.Scheduler

/** Resume a [[Suspension]] at some point in the future, scheduled by the scheduler. */
private[async] def resumeAsync[T, R](suspension: Suspension[T, R])(arg: T)(using s: Scheduler): Unit =
s.execute(() => suspension.resume(arg))

/** Schedule a computation with the suspension boundary already created. */
private[async] def scheduleBoundary(body: Label[Unit] ?=> Unit)(using s: Scheduler): Unit =
s.execute(() => boundary(body))

/** A scheduler implementation, with the ability to execute a computation immediately or after a delay. */
trait Scheduler:
def execute(body: Runnable): Unit
def schedule(delay: FiniteDuration, body: Runnable): Cancellable
Expand Down
3 changes: 2 additions & 1 deletion shared/src/main/scala/async/Cancellable.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package gears.async

/** A trait for cancellable entities that can be grouped */
/** A trait for cancellable entities that can be grouped. */
trait Cancellable:

private var group: CompletionGroup = CompletionGroup.Unlinked
Expand Down Expand Up @@ -28,6 +28,7 @@ trait Cancellable:
end Cancellable

object Cancellable:
/** A special [[Cancellable]] object that just tracks whether its linked group was cancelled. */
trait Tracking extends Cancellable:
def isCancelled: Boolean

Expand Down
1 change: 1 addition & 0 deletions shared/src/main/scala/async/CompletionGroup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class CompletionGroup extends Cancellable.Tracking:
members.toSeq
.foreach(_.cancel())

/** Wait for all members of the group to complete and unlink themselves. */
private[async] def waitCompletion()(using Async): Unit =
synchronized:
if members.nonEmpty && cancelWait.isEmpty then cancelWait = Some(Promise())
Expand Down
Loading

0 comments on commit cf269c8

Please sign in to comment.