diff --git a/src/.editorconfig b/src/.editorconfig new file mode 100644 index 00000000..8a8b2c98 --- /dev/null +++ b/src/.editorconfig @@ -0,0 +1,265 @@ +root = true + +############################### +# Core EditorConfig Options # +############################### +# All files +# [*] # Do not apply to all files not to break something +# Either crlf | lf, default is system-dependent (when not specified at all) +# end_of_line=crlf +# Remove whitespace at the end of any line + +# Code files +[*.{cs,csx,fs,fsi,fsx}] +trim_trailing_whitespace = true +insert_final_newline = true +indent_style = space # default=space +indent_size = 4 # default=4 +charset = utf-8 + +# Project files and app specific XML files +[*.{csproj,fsproj,shproj,projitems,props,targets,xaml}] +trim_trailing_whitespace = true +insert_final_newline = true +indent_style = space +indent_size = 2 +charset = utf-8 + +# XML configuration files +[{nuget.config,packages.config,app*.config,web*.config}] +trim_trailing_whitespace = true +insert_final_newline = true +indent_style = space +indent_size = 2 +charset = utf-8 + +# XML files +[*.xml] +trim_trailing_whitespace = false # do not trim as it affects CData +insert_final_newline = true +indent_style = space +indent_size = 2 + +# JSON and YAML files +[*.{json,yml,yaml}] +trim_trailing_whitespace = true +insert_final_newline = true +indent_style = space +indent_size = 2 + +# Proto files +[*.proto] +trim_trailing_whitespace = true +insert_final_newline = true +indent_style = space +indent_size = 4 + +############################### +# F# Coding Conventions # +############################### +# filetypes that need to be formatted by Fantomas: +[*.{fs,fsx}] + +# files to be ignored for Fantomas may go into this file, if present: +# .fantomasignore + +# indentation size, default=4 +indent_size=4 + +# line length before it gets broken down into multiple lines +# default 120 +max_line_length=120 + +# Either crlf | lf, default is system-dependent (when not specified at all) +# end_of_line=crlf + +# Whether end-of-file has a newline, default=true +insert_final_newline=true + +# false: someLineOfCode +# true: someLineOfCode; +# default false +fsharp_semicolon_at_end_of_line=false + +# false: f(1,2) +# true: f(1, 2) +# default true +fsharp_space_before_parameter=true + +# false: Option.map(fun x -> x) +# true: Option.map (fun x -> x) +# default true +fsharp_space_before_lowercase_invocation=true + +# false: x.ToString() +# true: x.ToString () +# default false +fsharp_space_before_uppercase_invocation=false + +# false: new Ship(withBeans) +# true: new Ship (withBeans) +# default false +fsharp_space_before_class_constructor=false + +# false: __.member Foo(x) = x +# true: __.member Foo (x) = x +# default false +fsharp_space_before_member=false + +# false: type Point = { x: int; y: int } +# true: type Point = { x : int; y : int } +# default false +fsharp_space_before_colon=false + +# false: (a,b,c) +# true: (a, b, c) +# default true +fsharp_space_after_comma=true + +# false: [a; b; 42] +# true: [a ; b ; 42] +# default false +fsharp_space_before_semicolon=false + +# false: [a;b;42] +# true: [a; b; 42] +# default true +fsharp_space_after_semicolon=true + +# false: no indent after `with` in a `try-with` +# true: must indent after `with` in a `try-with` +# default false +fsharp_indent_on_try_with=false + +# false: let a = [1;2;3] +# true: let a = [ 1;2;3 ] +# default true +fsharp_space_around_delimiter=true + +# breaks an if-then-else in smaller parts if it is on one line +# default 40 +fsharp_max_if_then_else_short_width=60 + +# breaks an infix operator expression if it is on one line +# infix: a + b + c +# default 50 +fsharp_max_infix_operator_expression=60 + +# breaks a single-line record declaration +# i.e. if this gets too wide: { X = 10; Y = 12 } +# default 40 +fsharp_max_record_width=80 + +# breaks a record into one item per line if items exceed this number +# i.e. if set to 1, this will be on three lines: { X = 10; Y = 12 } +# requires fsharp_record_multiline_formatter=number_of_items to take effect +# default 1 +fsharp_max_record_number_of_items=1 + +# whether to use line-length (by counting chars) or items (by counting fields) +# for the record settings above +# either number_of_items or character_width +# default character_width +fsharp_record_multiline_formatter=character_width + +# breaks a single line array or list if it exceeds this size +# default 40 +fsharp_max_array_or_list_width=100 + +# breaks an array or list into one item per line if items exceeds this number +# i.e. if set to 1, this will be shown on three lines [1; 2; 3] +# requires fsharp_array_or_list_multiline_formatter=number_of_items to take effect +# default 1 +fsharp_max_array_or_list_number_of_items=1 + +# whether to use line-length (by counting chars) or items (by counting fields) +# for the list and array settings above +# either number_of_items or character_width +# default character_width +fsharp_array_or_list_multiline_formatter=character_width + +# maximum with of a value binding, does not include keyword "let" +# default 80 +fsharp_max_value_binding_width=100 + +# maximum width for function and member binding (rh-side) +# default 40 +fsharp_max_function_binding_width=80 + +# maximum width for expressions like X.DoY().GetZ(10).Help() +# default 50 +fsharp_max_dot_get_expression_width=80 + +# whether open/close brackets go on same colum +# false: type Range = +# { From: float +# To: float } +# true: type Range = +# { +# From: float +# To: float +# } +# default false +fsharp_multiline_block_brackets_on_same_column=true + +# whether a newline should be placed before members +# false: type Range = +# { From: float } +# member this.Length = this.To - this.From +# false: type Range = +# { From: float } +# +# member this.Length = this.To - this.From +# default false +fsharp_newline_between_type_definition_and_members=true + +# deprecated setting, has no effect anymore +# default false +fsharp_keep_if_then_in_same_line=true + +# configures max width of Elmish expressions +# default 40 +fsharp_max_elmish_width=40 + +# Applies to Elmish expressions +# default false +fsharp_single_argument_web_mode=false + +# if a function sign exceeds max_line_length, then: +# false: do not place the equal-sign on a single line +# true: place the equal-sign on a single line +# default false +fsharp_align_function_signature_to_indentation=false + +# see docs: https://github.com/fsprojects/fantomas/blob/master/docs/Documentation.md#fsharp_alternative_long_member_definitions +# default false +fsharp_alternative_long_member_definitions=false + +# places closing paren in lambda on a newline in multiline lambdas +# default false +fsharp_multi_line_lambda_closing_newline=false + +# disables Elmish syntax parsing +fsharp_disable_elmish_syntax=false + +# allows the 'else'-branch to be aligned at same level as 'else' if the ret type allows it +# default false +fsharp_keep_indent_in_branch=false + +# multiline, nested expressions must be surrounded by blank lines +# default true +fsharp_blank_lines_around_nested_multiline_expressions=true + +# whether a bar is placed before DU +# false: type MyDU = Short of int +# true: type MyDU = | Short of int +# default false +fsharp_bar_before_discriminated_union_declaration=false + +# whether to use stroustrup style for records, lists and CEs +# To work reliably, fsharp_multiline_block_brackets_on_same_column must be "true" +fsharp_experimental_stroustrup_style=true + +# from docs: Please do not use this setting for formatting hand written code! +# default false +fsharp_strict_mode=false diff --git a/src/FSharpy.TaskSeq.sln b/src/FSharpy.TaskSeq.sln new file mode 100644 index 00000000..764051df --- /dev/null +++ b/src/FSharpy.TaskSeq.sln @@ -0,0 +1,25 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.3.32811.315 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FSharpy.TaskSeq", "FSharpy.TaskSeq\FSharpy.TaskSeq.fsproj", "{9A723760-A7AB-4C8D-9A6E-F0A38341827C}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {9A723760-A7AB-4C8D-9A6E-F0A38341827C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9A723760-A7AB-4C8D-9A6E-F0A38341827C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9A723760-A7AB-4C8D-9A6E-F0A38341827C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9A723760-A7AB-4C8D-9A6E-F0A38341827C}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {2AE57787-A847-4460-A627-1EB1D224FBC3} + EndGlobalSection +EndGlobal diff --git a/src/FSharpy.TaskSeq/FSharpy.TaskSeq.fsproj b/src/FSharpy.TaskSeq/FSharpy.TaskSeq.fsproj new file mode 100644 index 00000000..b96bafd8 --- /dev/null +++ b/src/FSharpy.TaskSeq/FSharpy.TaskSeq.fsproj @@ -0,0 +1,12 @@ + + + + net6.0 + true + + + + + + + diff --git a/src/FSharpy.TaskSeq/TaskSeq.fs b/src/FSharpy.TaskSeq/TaskSeq.fs new file mode 100644 index 00000000..27733ff4 --- /dev/null +++ b/src/FSharpy.TaskSeq/TaskSeq.fs @@ -0,0 +1,148 @@ +namespace FSharpy.TaskSeq + +open System.Collections.Generic +open System.Threading +open System.Threading.Tasks + +module TaskSeq = + open TaskSeq + + /// Returns taskSeq as an array. This function is blocking until the sequence is exhausted. + let toList (t: taskSeq<'T>) = [ + let e = t.GetAsyncEnumerator(CancellationToken()) + + try + while (let vt = e.MoveNextAsync() in if vt.IsCompleted then vt.Result else vt.AsTask().Result) do + yield e.Current + finally + e.DisposeAsync().AsTask().Wait() + ] + + + /// Returns taskSeq as an array. This function is blocking until the sequence is exhausted. + let toArray (taskSeq: taskSeq<'T>) = [| + let e = taskSeq.GetAsyncEnumerator(CancellationToken()) + + try + while (let vt = e.MoveNextAsync() in if vt.IsCompleted then vt.Result else vt.AsTask().Result) do + yield e.Current + finally + e.DisposeAsync().AsTask().Wait() + |] + + let empty<'T> = taskSeq { + for c: 'T in [] do + yield c + } + + let ofArray (array: 'T[]) = taskSeq { + for c in array do + yield c + } + + let ofList (list: 'T list) = taskSeq { + for c in list do + yield c + } + + let ofSeq (sequence: 'T seq) = taskSeq { + for c in sequence do + yield c + } + + let ofResizeArray (data: 'T ResizeArray) = taskSeq { + for c in data do + yield c + } + + let ofTaskSeq (sequence: #Task<'T> seq) = taskSeq { + for c in sequence do + let! c = c + yield c + } + + let ofTaskList (list: #Task<'T> list) = taskSeq { + for c in list do + let! c = c + yield c + } + + let ofTaskArray (array: #Task<'T> array) = taskSeq { + for c in array do + let! c = c + yield c + } + + let ofAsyncSeq (sequence: Async<'T> seq) = taskSeq { + for c in sequence do + let! c = task { return! c } + yield c + } + + let ofAsyncList (list: Async<'T> list) = taskSeq { + for c in list do + let! c = Task.ofAsync c + yield c + } + + let ofAsyncArray (array: Async<'T> array) = taskSeq { + for c in array do + let! c = Async.toTask c + yield c + } + + /// Unwraps the taskSeq as a Task>. This function is non-blocking. + let toArrayAsync taskSeq = + Internal.toResizeArrayAsync taskSeq + |> Task.map (fun a -> a.ToArray()) + + /// Unwraps the taskSeq as a Task>. This function is non-blocking. + let toListAsync taskSeq = (Internal.toResizeArrayAsync >> Task.map List.ofSeq) taskSeq + + /// Unwraps the taskSeq as a Task>. This function is non-blocking. + let toResizeArrayAsync taskSeq = + Internal.toResizeArrayAsync taskSeq + |> Task.map (fun a -> a.ToArray()) + + /// Unwraps the taskSeq as a Task>. This function is non-blocking. + let toIListAsync taskSeq = + (Internal.toResizeArrayAsync + >> Task.map (fun x -> x :> IList<_>)) + taskSeq + + /// Unwraps the taskSeq as a Task>. This function is non-blocking, + /// exhausts the sequence and caches the results of the tasks in the sequence. + let toSeqCachedAsync taskSeq = + (Internal.toResizeArrayAsync + >> Task.map (fun x -> x :> seq<_>)) + taskSeq + + /// Iterates over the taskSeq. This function is non-blocking + /// exhausts the sequence as soon as the task is evaluated. + let iterAsync action taskSeq = Internal.iteriAsync (fun _ -> action) taskSeq + + /// Iterates over the taskSeq. This function is non-blocking, + /// exhausts the sequence as soon as the task is evaluated. + let iteriAsync action taskSeq = Internal.iteriAsync action taskSeq + + /// Maps over the taskSeq. This function is non-blocking. + let map (mapper: 'T -> 'U) taskSeq = Internal.mapi (fun _ -> mapper) taskSeq + + /// Maps over the taskSeq with an index. This function is non-blocking. + let mapi (mapper: int -> 'T -> 'U) taskSeq = Internal.mapi mapper taskSeq + + /// Maps over the taskSeq. This function is non-blocking. + let mapAsync (mapper: 'T -> Task<'U>) taskSeq = Internal.mapiAsync (fun _ -> mapper) taskSeq + + /// Maps over the taskSeq with an index. This function is non-blocking. + let mapiAsync (mapper: int -> 'T -> Task<'U>) taskSeq = Internal.mapiAsync mapper taskSeq + + /// Applies the given function to the items in the taskSeq and concatenates all the results in order. + let collect (binder: 'T -> #IAsyncEnumerable<'U>) taskSeq = Internal.collect binder taskSeq + + /// Applies the given function to the items in the taskSeq and concatenates all the results in order. + let collectSeq (binder: 'T -> #seq<'U>) taskSeq = Internal.collectSeq binder taskSeq + + /// Zips two task sequences, returning a taskSeq of the tuples of each sequence, in order. May raise ArgumentException + /// if the sequences are or unequal length. + let zip taskSeq1 taskSeq2 = Internal.zip taskSeq1 taskSeq2 diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs new file mode 100644 index 00000000..9b80d8f7 --- /dev/null +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -0,0 +1,545 @@ +namespace FSharpy.TaskSeq + +open System +open System.Collections.Generic +open System.Threading +open System.Threading.Tasks +open System.Runtime.CompilerServices +open System.Threading.Tasks.Sources + +open FSharp.Core.CompilerServices +open FSharp.Core.CompilerServices.StateMachineHelpers + +#nowarn "57" // note: this is *not* an experimental feature, but they forgot to switch off the flag + +[] +module Internal = // cannot be marked with 'internal' scope + let verbose = false + + let inline MoveNext (x: byref<'T> when 'T :> IAsyncStateMachine) = x.MoveNext() + + // F# requires that we implement interfaces even on an abstract class + let inline raiseNotImpl () = + NotImplementedException "Abstract Class: method or property not implemented" + |> raise + +type taskSeq<'T> = IAsyncEnumerable<'T> + +type IPriority1 = + interface + end + +type IPriority2 = + interface + end + +[] +type TaskSeqStateMachineData<'T>() = + [] + val mutable cancellationToken: CancellationToken + + [] + val mutable disposalStack: ResizeArray<(unit -> Task)> + + [] + val mutable awaiter: ICriticalNotifyCompletion + + [] + val mutable promiseOfValueOrEnd: ManualResetValueTaskSourceCore + + [] + val mutable builder: AsyncIteratorMethodBuilder + + [] + val mutable taken: bool + + [] + val mutable current: ValueOption<'T> + + [] + val mutable boxed: TaskSeq<'T> + // For tailcalls using 'return!' + [] + val mutable tailcallTarget: TaskSeq<'T> option + + member data.PushDispose(f: unit -> Task) = + match data.disposalStack with + | null -> data.disposalStack <- ResizeArray() + | _ -> () + + data.disposalStack.Add(f) + + member data.PopDispose() = + match data.disposalStack with + | null -> () + | _ -> data.disposalStack.RemoveAt(data.disposalStack.Count - 1) + +and [] TaskSeq<'T>() = + + abstract TailcallTarget: TaskSeq<'T> option + abstract MoveNextAsyncResult: unit -> ValueTask + + interface IAsyncEnumerator<'T> with + member _.Current = raiseNotImpl () + member _.MoveNextAsync() = raiseNotImpl () + + interface IAsyncDisposable with + member _.DisposeAsync() = raiseNotImpl () + + interface IAsyncEnumerable<'T> with + member _.GetAsyncEnumerator(ct) = raiseNotImpl () + + interface IAsyncStateMachine with + member _.MoveNext() = raiseNotImpl () + member _.SetStateMachine(_state) = raiseNotImpl () + + interface IValueTaskSource with + member _.GetResult(_token: int16) = raiseNotImpl () + member _.GetStatus(_token: int16) = raiseNotImpl () + member _.OnCompleted(_continuation, _state, _token, _flags) = raiseNotImpl () + + interface IValueTaskSource with + member _.GetStatus(_token: int16) = raiseNotImpl () + member _.GetResult(_token: int16) = raiseNotImpl () + member _.OnCompleted(_continuation, _state, _token, _flags) = raiseNotImpl () + +and [] TaskSeq<'Machine, 'T + when 'Machine :> IAsyncStateMachine and 'Machine :> IResumableStateMachine>>() = + inherit TaskSeq<'T>() + let initialThreadId = Environment.CurrentManagedThreadId + + [] + val mutable Machine: 'Machine + + member internal ts.hijack() = + let res = ts.Machine.Data.tailcallTarget + + match res with + | Some tg -> + match tg.TailcallTarget with + | None -> res + | (Some tg2 as res2) -> + // Cut out chains of tailcalls + ts.Machine.Data.tailcallTarget <- Some tg2 + res2 + | None -> res + + // Note: Not entirely clear if this is needed, everything still compiles without it + interface IValueTaskSource with + member ts.GetResult(token: int16) = + match ts.hijack () with + | Some tg -> (tg :> IValueTaskSource).GetResult(token) + | None -> + ts.Machine.Data.promiseOfValueOrEnd.GetResult(token) + |> ignore + + member ts.GetStatus(token: int16) = + match ts.hijack () with + | Some tg -> (tg :> IValueTaskSource).GetStatus(token) + | None -> ts.Machine.Data.promiseOfValueOrEnd.GetStatus(token) + + member ts.OnCompleted(continuation, state, token, flags) = + match ts.hijack () with + | Some tg -> (tg :> IValueTaskSource).OnCompleted(continuation, state, token, flags) + | None -> ts.Machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags) + + // Needed for MoveNextAsync to return a ValueTask + interface IValueTaskSource with + member ts.GetStatus(token: int16) = + match ts.hijack () with + | Some tg -> (tg :> IValueTaskSource).GetStatus(token) + | None -> ts.Machine.Data.promiseOfValueOrEnd.GetStatus(token) + + member ts.GetResult(token: int16) = + match ts.hijack () with + | Some tg -> (tg :> IValueTaskSource).GetResult(token) + | None -> ts.Machine.Data.promiseOfValueOrEnd.GetResult(token) + + member ts.OnCompleted(continuation, state, token, flags) = + match ts.hijack () with + | Some tg -> (tg :> IValueTaskSource).OnCompleted(continuation, state, token, flags) + | None -> ts.Machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags) + + interface IAsyncStateMachine with + member ts.MoveNext() = + match ts.hijack () with + | Some tg -> (tg :> IAsyncStateMachine).MoveNext() + | None -> MoveNext(&ts.Machine) + + member _.SetStateMachine(_state) = () // not needed for reference type + + interface IAsyncEnumerable<'T> with + member ts.GetAsyncEnumerator(ct) = + let data = ts.Machine.Data + + if + (not data.taken + && initialThreadId = Environment.CurrentManagedThreadId) + then + data.taken <- true + data.cancellationToken <- ct + data.builder <- AsyncIteratorMethodBuilder.Create() + (ts :> IAsyncEnumerator<_>) + else + if verbose then + printfn "GetAsyncEnumerator, cloning..." + + let clone = ts.MemberwiseClone() :?> TaskSeq<'Machine, 'T> + data.taken <- true + clone.Machine.Data.cancellationToken <- ct + (clone :> System.Collections.Generic.IAsyncEnumerator<'T>) + + interface IAsyncDisposable with + member ts.DisposeAsync() = + match ts.hijack () with + | Some tg -> (tg :> IAsyncDisposable).DisposeAsync() + | None -> + if verbose then + printfn "DisposeAsync..." + + task { + match ts.Machine.Data.disposalStack with + | null -> () + | _ -> + let mutable exn = None + + for d in Seq.rev ts.Machine.Data.disposalStack do + try + do! d () + with e -> + if exn.IsNone then + exn <- Some e + + match exn with + | None -> () + | Some e -> raise e + } + |> ValueTask + + interface System.Collections.Generic.IAsyncEnumerator<'T> with + member ts.Current = + match ts.hijack () with + | Some tg -> (tg :> IAsyncEnumerator<'T>).Current + | None -> + match ts.Machine.Data.current with + | ValueSome x -> x + | ValueNone -> failwith "no current value" + + member ts.MoveNextAsync() = + match ts.hijack () with + | Some tg -> (tg :> IAsyncEnumerator<'T>).MoveNextAsync() + | None -> + if verbose then + printfn "MoveNextAsync..." + + if ts.Machine.ResumptionPoint = -1 then // can't use as IAsyncEnumerator before IAsyncEnumerable + ValueTask() + else + let data = ts.Machine.Data + data.promiseOfValueOrEnd.Reset() + let mutable ts = ts + data.builder.MoveNext(&ts) + + // If the move did a hijack then get the result from the final one + match ts.hijack () with + | Some tg -> tg.MoveNextAsyncResult() + | None -> ts.MoveNextAsyncResult() + + override ts.MoveNextAsyncResult() = + let data = ts.Machine.Data + let version = data.promiseOfValueOrEnd.Version + let status = data.promiseOfValueOrEnd.GetStatus(version) + + if status = ValueTaskSourceStatus.Succeeded then + let result = data.promiseOfValueOrEnd.GetResult(version) + ValueTask(result) + else + if verbose then + printfn "MoveNextAsync pending/faulted/cancelled..." + + ValueTask(ts, version) // uses IValueTaskSource<'T> + + override cr.TailcallTarget = cr.hijack () + +and TaskSeqCode<'T> = ResumableCode, unit> +and TaskSeqStateMachine<'T> = ResumableStateMachine> +and TaskSeqResumptionFunc<'T> = ResumptionFunc> +and TaskSeqResumptionDynamicInfo<'T> = ResumptionDynamicInfo> + +type TaskSeqBuilder() = + + member inline _.Delay(f: unit -> TaskSeqCode<'T>) : TaskSeqCode<'T> = TaskSeqCode<'T>(fun sm -> f().Invoke(&sm)) + + member inline _.Run(code: TaskSeqCode<'T>) : IAsyncEnumerable<'T> = + if __useResumableCode then + // This is the static implementation. A new struct type is created. + __stateMachine, IAsyncEnumerable<'T>> + // IAsyncStateMachine.MoveNext + (MoveNextMethodImpl<_>(fun sm -> + //-- RESUMABLE CODE START + __resumeAt sm.ResumptionPoint + + try + //printfn "at Run.MoveNext start" + //Console.WriteLine("[{0}] resuming by invoking {1}....", sm.MethodBuilder.Task.Id, hashq sm.ResumptionFunc ) + let __stack_code_fin = code.Invoke(&sm) + //printfn $"at Run.MoveNext, __stack_code_fin={__stack_code_fin}" + if __stack_code_fin then + //printfn $"at Run.MoveNext, done" + sm.Data.promiseOfValueOrEnd.SetResult(false) + sm.Data.builder.Complete() + elif sm.Data.current.IsSome then + //printfn $"at Run.MoveNext, yield" + sm.Data.promiseOfValueOrEnd.SetResult(true) + else + // Goto request + match sm.Data.tailcallTarget with + | Some tg -> + //printfn $"at Run.MoveNext, hijack" + let mutable tg = tg + MoveNext(&tg) + | None -> + //printfn $"at Run.MoveNext, await" + let boxed = sm.Data.boxed + + sm.Data.awaiter.UnsafeOnCompleted( + Action(fun () -> + let mutable boxed = boxed + MoveNext(&boxed)) + ) + + with exn -> + //Console.WriteLine("[{0}] SetException {1}", sm.MethodBuilder.Task.Id, exn) + sm.Data.promiseOfValueOrEnd.SetException(exn) + sm.Data.builder.Complete() + //-- RESUMABLE CODE END + )) + (SetStateMachineMethodImpl<_>(fun sm state -> ())) + (AfterCode<_, _>(fun sm -> + let ts = TaskSeq, 'T>() + ts.Machine <- sm + ts.Machine.Data <- TaskSeqStateMachineData() + ts.Machine.Data.boxed <- ts + ts :> IAsyncEnumerable<'T>)) + else + failwith "no dynamic implementation as yet" + // let initialResumptionFunc = TaskSeqResumptionFunc<'T>(fun sm -> code.Invoke(&sm)) + // let resumptionFuncExecutor = TaskSeqResumptionExecutor<'T>(fun sm f -> + // // TODO: add exception handling? + // if f.Invoke(&sm) then + // sm.ResumptionPoint <- -2) + // let setStateMachine = SetStateMachineMethodImpl<_>(fun sm f -> ()) + // sm.Machine.ResumptionFuncInfo <- (initialResumptionFunc, resumptionFuncExecutor, setStateMachine) + //sm.Start() + + + member inline _.Zero() : TaskSeqCode<'T> = ResumableCode.Zero() + + member inline _.Combine(task1: TaskSeqCode<'T>, task2: TaskSeqCode<'T>) : TaskSeqCode<'T> = + ResumableCode.Combine(task1, task2) + + member inline _.WhileAsync + ( + [] condition: unit -> ValueTask, + body: TaskSeqCode<'T> + ) : TaskSeqCode<'T> = + let mutable condition_res = true + + ResumableCode.While( + (fun () -> condition_res), + ResumableCode<_, _>(fun sm -> + let mutable __stack_condition_fin = true + let __stack_vtask = condition () + + if __stack_vtask.IsCompleted then + __stack_condition_fin <- true + condition_res <- __stack_vtask.Result + else + let task = __stack_vtask.AsTask() + let mutable awaiter = task.GetAwaiter() + // This will yield with __stack_fin = false + // This will resume with __stack_fin = true + let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm) + __stack_condition_fin <- __stack_yield_fin + + if __stack_condition_fin then + condition_res <- task.Result + else + //if verbose then printfn "calling AwaitUnsafeOnCompleted" + sm.Data.awaiter <- awaiter + sm.Data.current <- ValueNone + + if __stack_condition_fin then + if condition_res then body.Invoke(&sm) else true + else + false) + ) + + member inline b.While([] condition: unit -> bool, body: TaskSeqCode<'T>) : TaskSeqCode<'T> = + b.WhileAsync((fun () -> ValueTask(condition ())), body) + + member inline _.TryWith(body: TaskSeqCode<'T>, catch: exn -> TaskSeqCode<'T>) : TaskSeqCode<'T> = + ResumableCode.TryWith(body, catch) + + member inline _.TryFinallyAsync(body: TaskSeqCode<'T>, compensation: unit -> Task) : TaskSeqCode<'T> = + ResumableCode.TryFinallyAsync( + TaskSeqCode<'T>(fun sm -> + sm.Data.PushDispose(fun () -> compensation ()) + body.Invoke(&sm)), + ResumableCode<_, _>(fun sm -> + sm.Data.PopDispose() + let mutable __stack_condition_fin = true + let __stack_vtask = compensation () + + if not __stack_vtask.IsCompleted then + let mutable awaiter = __stack_vtask.GetAwaiter() + let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm) + __stack_condition_fin <- __stack_yield_fin + + if not __stack_condition_fin then + sm.Data.awaiter <- awaiter + + __stack_condition_fin) + ) + + member inline _.TryFinally(body: TaskSeqCode<'T>, compensation: unit -> unit) : TaskSeqCode<'T> = + ResumableCode.TryFinally( + TaskSeqCode<'T>(fun sm -> + sm.Data.PushDispose(fun () -> + compensation () + Task.CompletedTask) + + body.Invoke(&sm)), + ResumableCode<_, _>(fun sm -> + sm.Data.PopDispose() + compensation () + true) + ) + + member inline this.Using + ( + disp: #IDisposable, + body: #IDisposable -> TaskSeqCode<'T>, + ?priority: IPriority2 + ) : TaskSeqCode<'T> = + ignore priority + // A using statement is just a try/finally with the finally block disposing if non-null. + this.TryFinally( + (fun sm -> (body disp).Invoke(&sm)), + (fun () -> + if not (isNull (box disp)) then + disp.Dispose()) + ) + + member inline this.Using + ( + disp: #IAsyncDisposable, + body: #IAsyncDisposable -> TaskSeqCode<'T>, + ?priority: IPriority1 + ) : TaskSeqCode<'T> = + ignore priority + // A using statement is just a try/finally with the finally block disposing if non-null. + this.TryFinallyAsync( + (fun sm -> (body disp).Invoke(&sm)), + (fun () -> + if not (isNull (box disp)) then + disp.DisposeAsync().AsTask() + else + Task.CompletedTask) + ) + + member inline this.For(sequence: seq<'TElement>, body: 'TElement -> TaskSeqCode<'T>) : TaskSeqCode<'T> = + // A for loop is just a using statement on the sequence's enumerator... + this.Using( + sequence.GetEnumerator(), + // ... and its body is a while loop that advances the enumerator and runs the body on each element. + (fun e -> this.While((fun () -> e.MoveNext()), (fun sm -> (body e.Current).Invoke(&sm)))) + ) + + member inline this.For(source: #IAsyncEnumerable<'TElement>, body: 'TElement -> TaskSeqCode<'T>) : TaskSeqCode<'T> = + TaskSeqCode<'T>(fun sm -> + this + .Using( + source.GetAsyncEnumerator(sm.Data.cancellationToken), + (fun e -> this.WhileAsync((fun () -> e.MoveNextAsync()), (fun sm -> (body e.Current).Invoke(&sm)))) + ) + .Invoke(&sm)) + + member inline _.Yield(v: 'T) : TaskSeqCode<'T> = + TaskSeqCode<'T>(fun sm -> + // This will yield with __stack_fin = false + // This will resume with __stack_fin = true + let __stack_fin = ResumableCode.Yield().Invoke(&sm) + sm.Data.current <- ValueSome v + sm.Data.awaiter <- null + __stack_fin) + + member inline this.YieldFrom(source: IAsyncEnumerable<'T>) : TaskSeqCode<'T> = + this.For(source, (fun v -> this.Yield(v))) + + member inline this.YieldFrom(source: seq<'T>) : TaskSeqCode<'T> = this.For(source, (fun v -> this.Yield(v))) + + member inline _.Bind(task: Task<'TResult1>, continuation: ('TResult1 -> TaskSeqCode<'T>)) : TaskSeqCode<'T> = + TaskSeqCode<'T>(fun sm -> + let mutable awaiter = task.GetAwaiter() + let mutable __stack_fin = true + + if not awaiter.IsCompleted then + // This will yield with __stack_fin2 = false + // This will resume with __stack_fin2 = true + let __stack_fin2 = ResumableCode.Yield().Invoke(&sm) + __stack_fin <- __stack_fin2 + + if __stack_fin then + let result = awaiter.GetResult() + (continuation result).Invoke(&sm) + else + if verbose then + printfn "calling AwaitUnsafeOnCompleted" + + sm.Data.awaiter <- awaiter + sm.Data.current <- ValueNone + false) + + member inline _.Bind(task: ValueTask<'TResult1>, continuation: ('TResult1 -> TaskSeqCode<'T>)) : TaskSeqCode<'T> = + TaskSeqCode<'T>(fun sm -> + let mutable awaiter = task.GetAwaiter() + let mutable __stack_fin = true + + if not awaiter.IsCompleted then + // This will yield with __stack_fin2 = false + // This will resume with __stack_fin2 = true + let __stack_fin2 = ResumableCode.Yield().Invoke(&sm) + __stack_fin <- __stack_fin2 + + if __stack_fin then + let result = awaiter.GetResult() + (continuation result).Invoke(&sm) + else + if verbose then + printfn "calling AwaitUnsafeOnCompleted" + + sm.Data.awaiter <- awaiter + sm.Data.current <- ValueNone + false) + + // TODO: using return! for tailcalls is wrong. We should use yield! and have F# + // desugar to a different builder method when in tailcall position + // + // Because of this using return! from non-tailcall position e.g. in a try-finally or try-with will + // giv incorrect results (escaping the exception handler - 'close up shop and draw results from somewhere else') + member inline b.ReturnFrom(other: IAsyncEnumerable<'T>) : TaskSeqCode<'T> = + TaskSeqCode<_>(fun sm -> + match other with + | :? TaskSeq<'T> as other -> + sm.Data.tailcallTarget <- Some other + sm.Data.awaiter <- null + sm.Data.current <- ValueNone + // For tailcalls we return 'false' and re-run from the entry (trampoline) + false + | _ -> b.YieldFrom(other).Invoke(&sm)) + +[] +module TaskSeqBuilder = + /// A TaskSeq workflow for IAsyncEnumerable<'T> types. + let taskSeq = TaskSeqBuilder() diff --git a/src/FSharpy.TaskSeq/TaskSeqInternal.fs b/src/FSharpy.TaskSeq/TaskSeqInternal.fs new file mode 100644 index 00000000..1ba5ee47 --- /dev/null +++ b/src/FSharpy.TaskSeq/TaskSeqInternal.fs @@ -0,0 +1,101 @@ +namespace FSharpy.TaskSeq + +open System.Collections.Generic +open System.Threading +open System.Threading.Tasks + +module TaskSeq = + module internal Internal = + let iteriAsync action (taskSeq: taskSeq<_>) = task { + let e = taskSeq.GetAsyncEnumerator(CancellationToken()) + let mutable go = true + let mutable i = 0 + let! step = e.MoveNextAsync() + go <- step + + while go do + action i e.Current + let! step = e.MoveNextAsync() + i <- i + 1 + go <- step + } + + let fold (action: 'State -> 'T -> 'State) initial (taskSeq: taskSeq<_>) = task { + let e = taskSeq.GetAsyncEnumerator(CancellationToken()) + let mutable go = true + let mutable result = initial + let! step = e.MoveNextAsync() + go <- step + + while go do + result <- action result e.Current + let! step = e.MoveNextAsync() + go <- step + + return result + } + + let toResizeArrayAsync taskSeq = task { + let res = ResizeArray() + do! taskSeq |> iteriAsync (fun _ item -> res.Add item) + return res + } + + let mapi mapper (taskSequence: taskSeq<_>) = taskSeq { + let mutable i = 0 + + for c in taskSequence do + yield mapper i c + i <- i + 1 + } + + let mapiAsync (mapper: _ -> _ -> Task<'T>) (taskSequence: taskSeq<_>) = taskSeq { + let mutable i = 0 + + for c in taskSequence do + let! x = mapper i c + yield x + i <- i + 1 + } + + let zip (taskSequence1: taskSeq<_>) (taskSequence2: taskSeq<_>) = taskSeq { + let e1 = taskSequence1.GetAsyncEnumerator(CancellationToken()) + let e2 = taskSequence2.GetAsyncEnumerator(CancellationToken()) + let mutable go = true + let! step1 = e1.MoveNextAsync() + let! step2 = e1.MoveNextAsync() + go <- step1 && step2 + + while go do + yield e1.Current, e2.Current + let! step1 = e1.MoveNextAsync() + let! step2 = e1.MoveNextAsync() + go <- step1 && step2 + + if step1 then + invalidArg "taskSequence1" "The task sequences had different lengths." + + if step2 then + invalidArg "taskSequence2" "The task sequences had different lengths." + } + + let collect (binder: _ -> #IAsyncEnumerable<_>) (taskSequence: taskSeq<_>) = taskSeq { + for c in taskSequence do + yield! binder c :> IAsyncEnumerable<_> + } + + let collectSeq (binder: _ -> #seq<_>) (taskSequence: taskSeq<_>) = taskSeq { + for c in taskSequence do + yield! binder c :> seq<_> + } + + /// Returns taskSeq as an array. This function is blocking until the sequence is exhausted. + let toListResult (t: taskSeq<'T>) = [ + let e = t.GetAsyncEnumerator(CancellationToken()) + + try + while (let vt = e.MoveNextAsync() in if vt.IsCompleted then vt.Result else vt.AsTask().Result) do + yield e.Current + finally + e.DisposeAsync().AsTask().Wait() + ] diff --git a/src/FSharpy.TaskSeq/Utils.fs b/src/FSharpy.TaskSeq/Utils.fs new file mode 100644 index 00000000..69c654ac --- /dev/null +++ b/src/FSharpy.TaskSeq/Utils.fs @@ -0,0 +1,63 @@ +namespace FSharpy.TaskSeq + +open System.Threading.Tasks + +module Task = + /// Convert an Async<'T> into a Task<'T> + let inline ofAsync (async: Async<'T>) = task { return! async } + + /// Convert a unit-task into a Task + let inline ofTask (task': Task) = task { do! task' } + + /// Convert a Task<'T> into an Async<'T> + let inline toAsync (task: Task<'T>) = Async.AwaitTask task + + /// Convert a Task into a Task + let inline toTask (task: Task) = task :> Task + + /// Convert a Task<'T> into a Task, ignoring the result + let inline ignore (task: Task<'T>) = + TaskBuilder.task { + let! _ = task + return () + } + :> Task + + /// Map a Tas<'T> + let inline map mapper (task: Task<'T>) : Task<'U> = + TaskBuilder.task { + let! result = task + return mapper result + } + + /// Bind a Task<'T> + let inline bind binder (task: Task<'T>) : Task<'U> = + TaskBuilder.task { return! binder task } + +module Async = + /// Convert an Task<'T> into an Async<'T> + let inline ofTask (task: Task<'T>) = Async.AwaitTask task + + /// Convert a unit-task into an Async + let inline ofUnitTask (task: Task) = Async.AwaitTask task + + /// Convert a Task<'T> into an Async<'T> + let inline toTask (async: Async<'T>) = task { return! async } + + /// Convert an Async<'T> into an Async, ignoring the result + let inline ignore (async': Async<'T>) = + async { + let! _ = async' + return () + } + + /// Map an Async<'T> + let inline map mapper (async: Async<'T>) : Async<'U> = + ExtraTopLevelOperators.async { + let! result = async + return mapper result + } + + /// Bind an Async<'T> + let inline bind binder (task: Async<'T>) : Async<'U> = + ExtraTopLevelOperators.async { return! binder task }