Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async collect functions to TaskSeq and improve tooltip info #11

Merged
merged 3 commits into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,47 @@
# TaskSeq
An implementation `IAsyncEnumerableM<'T>` as a `taskSeq` CE for F# with accompanying `TaskSeq` module.

## Coming up!!!
## In progress!!!

It's based on [Don Symes `taskSeq.fs`](https://github.com/dotnet/fsharp/blob/d5312aae8aad650f0043f055bb14c3aa8117e12e/tests/benchmarks/CompiledCodeBenchmarks/TaskPerf/TaskPerf/taskSeq.fs)
but expanded with useful utility functions and a few extra binding overloads.

## Current set of `TaskSeq` utility functions

The following is the current surface area of the `TaskSeq` utility functions. This is just a dump of the signatures, proper
documentation will be added soon(ish):

```f#
module TaskSeq =
val toList: t: taskSeq<'T> -> 'T list
val toArray: taskSeq: taskSeq<'T> -> 'T[]
val empty<'T> : IAsyncEnumerable<'T>
val ofArray: array: 'T[] -> IAsyncEnumerable<'T>
val ofList: list: 'T list -> IAsyncEnumerable<'T>
val ofSeq: sequence: seq<'T> -> IAsyncEnumerable<'T>
val ofResizeArray: data: ResizeArray<'T> -> IAsyncEnumerable<'T>
val ofTaskSeq: sequence: seq<#Task<'T>> -> IAsyncEnumerable<'T>
val ofTaskList: list: #Task<'T> list -> IAsyncEnumerable<'T>
val ofTaskArray: array: #Task<'T> array -> IAsyncEnumerable<'T>
val ofAsyncSeq: sequence: seq<Async<'T>> -> IAsyncEnumerable<'T>
val ofAsyncList: list: Async<'T> list -> IAsyncEnumerable<'T>
val ofAsyncArray: array: Async<'T> array -> IAsyncEnumerable<'T>
val toArrayAsync: taskSeq: taskSeq<'a> -> Task<'a[]>
val toListAsync: taskSeq: taskSeq<'a> -> Task<'a list>
val toResizeArrayAsync: taskSeq: taskSeq<'a> -> Task<ResizeArray<'a>>
val toIListAsync: taskSeq: taskSeq<'a> -> Task<IList<'a>>
val toSeqCachedAsync: taskSeq: taskSeq<'a> -> Task<seq<'a>>
val iter: action: ('a -> unit) -> taskSeq: taskSeq<'a> -> Task<unit>
val iteri: action: (int -> 'a -> unit) -> taskSeq: taskSeq<'a> -> Task<unit>
val iterAsync: action: ('a -> #Task<unit>) -> taskSeq: taskSeq<'a> -> Task<unit>
val iteriAsync: action: (int -> 'a -> #Task<unit>) -> taskSeq: taskSeq<'a> -> Task<unit>
val map: mapper: ('T -> 'U) -> taskSeq: taskSeq<'T> -> IAsyncEnumerable<'U>
val mapi: mapper: (int -> 'T -> 'U) -> taskSeq: taskSeq<'T> -> IAsyncEnumerable<'U>
val mapAsync: mapper: ('a -> #Task<'c>) -> taskSeq: taskSeq<'a> -> IAsyncEnumerable<'c>
val mapiAsync: mapper: (int -> 'a -> #Task<'c>) -> taskSeq: taskSeq<'a> -> IAsyncEnumerable<'c>
val collect: binder: ('T -> #IAsyncEnumerable<'U>) -> taskSeq: taskSeq<'T> -> IAsyncEnumerable<'U>
val collectSeq: binder: ('T -> #seq<'U>) -> taskSeq: taskSeq<'T> -> IAsyncEnumerable<'U>
val collectAsync: binder: ('T -> #Task<'b>) -> taskSeq: taskSeq<'T> -> taskSeq<'U> when 'b :> IAsyncEnumerable<'U>
val collectSeqAsync: binder: ('T -> #Task<'b>) -> taskSeq: taskSeq<'T> -> taskSeq<'U> when 'b :> seq<'U>
val zip: taskSeq1: taskSeq<'a> -> taskSeq2: taskSeq<'b> -> IAsyncEnumerable<'a * 'b>
```
25 changes: 16 additions & 9 deletions src/FSharpy.TaskSeq/TaskSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module TaskSeq =
open FSharpy.TaskSeqBuilders

// Just for convenience
module Internal = FSharpy.TaskSeqInternal
module Internal = TaskSeqInternal

/// Returns taskSeq as an array. This function is blocking until the sequence is exhausted.
let toList (t: taskSeq<'T>) = [
Expand Down Expand Up @@ -132,32 +132,32 @@ module TaskSeq =
// iter/map/collect functions
//

/// Iterates over the taskSeq. This function is non-blocking
/// Iterates over the taskSeq applying the action function to each item. This function is non-blocking
/// exhausts the sequence as soon as the task is evaluated.
let iter action taskSeq = Internal.iter (SimpleAction action) taskSeq

/// Iterates over the taskSeq. This function is non-blocking,
/// Iterates over the taskSeq applying the action function to each item. This function is non-blocking,
/// exhausts the sequence as soon as the task is evaluated.
let iteri action taskSeq = Internal.iter (CountableAction action) taskSeq

/// Iterates over the taskSeq. This function is non-blocking
/// Iterates over the taskSeq applying the async action to each item. This function is non-blocking
/// exhausts the sequence as soon as the task is evaluated.
let iterAsync action taskSeq = Internal.iter (AsyncSimpleAction action) taskSeq

/// Iterates over the taskSeq. This function is non-blocking,
/// Iterates over the taskSeq, applying the async action to each item. This function is non-blocking,
/// exhausts the sequence as soon as the task is evaluated.
let iteriAsync action taskSeq = Internal.iter (AsyncCountableAction action) taskSeq

/// Maps over the taskSeq. This function is non-blocking.
/// Maps over the taskSeq, applying the mapper function to each item. This function is non-blocking.
let map (mapper: 'T -> 'U) taskSeq = Internal.map (SimpleAction mapper) taskSeq

/// Maps over the taskSeq with an index. This function is non-blocking.
/// Maps over the taskSeq with an index, applying the mapper function to each item. This function is non-blocking.
let mapi (mapper: int -> 'T -> 'U) taskSeq = Internal.map (CountableAction mapper) taskSeq

/// Maps over the taskSeq. This function is non-blocking.
/// Maps over the taskSeq, applying the async mapper function to each item. This function is non-blocking.
let mapAsync mapper taskSeq = Internal.map (AsyncSimpleAction mapper) taskSeq

/// Maps over the taskSeq with an index. This function is non-blocking.
/// Maps over the taskSeq with an index, applying the async mapper function to each item. This function is non-blocking.
let mapiAsync mapper taskSeq = Internal.map (AsyncCountableAction mapper) taskSeq

/// Applies the given function to the items in the taskSeq and concatenates all the results in order.
Expand All @@ -166,6 +166,13 @@ module TaskSeq =
/// Applies the given function to the items in the taskSeq and concatenates all the results in order.
let collectSeq (binder: 'T -> #seq<'U>) taskSeq = Internal.collectSeq binder taskSeq

/// Applies the given async function to the items in the taskSeq and concatenates all the results in order.
let collectAsync (binder: 'T -> #Task<#IAsyncEnumerable<'U>>) taskSeq : taskSeq<'U> =
Internal.collectAsync binder taskSeq

/// Applies the given async function to the items in the taskSeq and concatenates all the results in order.
let collectSeqAsync (binder: 'T -> #Task<#seq<'U>>) taskSeq : taskSeq<'U> = Internal.collectSeqAsync binder taskSeq

//
// zip/unzip etc functions
//
Expand Down
14 changes: 13 additions & 1 deletion src/FSharpy.TaskSeq/TaskSeqInternal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ module internal TaskSeqInternal =
return res
}

let inline toResizeArrayAndMapAsync mapper taskSeq = (toResizeArrayAsync >> Task.map mapper) taskSeq
let toResizeArrayAndMapAsync mapper taskSeq = (toResizeArrayAsync >> Task.map mapper) taskSeq

let map mapper (taskSequence: taskSeq<_>) =
match mapper with
Expand Down Expand Up @@ -143,6 +143,18 @@ module internal TaskSeqInternal =
yield! binder c :> seq<_>
}

let collectAsync (binder: _ -> #Task<#IAsyncEnumerable<_>>) (taskSequence: taskSeq<_>) = taskSeq {
for c in taskSequence do
let! result = binder c
yield! result :> IAsyncEnumerable<_>
}

let collectSeqAsync (binder: _ -> #Task<#seq<_>>) (taskSequence: taskSeq<_>) = taskSeq {
for c in taskSequence do
let! result = binder c
yield! result :> seq<_>
}

/// Returns taskSeq as an array. This function is blocking until the sequence is exhausted.
let toListResult (t: taskSeq<'T>) = [
let e = t.GetAsyncEnumerator(CancellationToken())
Expand Down