-
Notifications
You must be signed in to change notification settings - Fork 25
/
Internal.fs
394 lines (334 loc) · 18.6 KB
/
Internal.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
module Propulsion.Internal
open System
module TimeSpan =

    /// Builds a TimeSpan from a (possibly fractional) number of seconds
    let seconds (x: float) = TimeSpan.FromSeconds x
    /// Builds a TimeSpan from a (possibly fractional) number of minutes
    let minutes (x: float) = TimeSpan.FromMinutes x
    /// Builds a TimeSpan from a (possibly fractional) number of hours
    let hours (x: float) = TimeSpan.FromHours x
    /// Builds a TimeSpan from a (possibly fractional) number of milliseconds
    let ms (x: float) = TimeSpan.FromMilliseconds x
    /// The span's TotalMilliseconds converted to an int (the fractional part is dropped by the conversion)
    let toMs (ts: TimeSpan): int = ts.TotalMilliseconds |> int

    /// Renders a compact, magnitude-dependent representation of a time span:
    /// - a day or more:    days, hours and minutes, e.g. `1d02h03m`
    /// - an hour or more:  hours, minutes and seconds, e.g. `1h01m01s`
    /// - a minute or more: minutes and seconds to 2 decimals, e.g. `1m30.00s`
    /// - otherwise:        seconds to 3 decimals, e.g. `1.500s`
    /// Negative spans are rendered as the equivalent positive span prefixed with `-`
    let humanize (ts: TimeSpan): string =
        // Custom TimeSpan format strings never emit a sign and only render component values; feeding a negative span
        // straight through would also fail every `Total* >= 1.` guard, yielding e.g. `30.000s` for -90s.
        // Hence we format the magnitude and add the sign ourselves.
        // TimeSpan.MinValue has no positive counterpart (Duration() throws), so it is clamped to MaxValue
        let magnitude = if ts = TimeSpan.MinValue then TimeSpan.MaxValue else ts.Duration()
        let rendered =
            match magnitude with
            | x when x.TotalDays >= 1. -> x.ToString "d\dhh\hmm\m"
            | x when x.TotalHours >= 1. -> x.ToString "h\hmm\mss\s"
            | x when x.TotalMinutes >= 1. -> x.ToString "m\mss\.ff\s"
            | x -> x.ToString("s\.fff\s")
        if ts < TimeSpan.Zero then "-" + rendered else rendered
module Stopwatch =

    /// Creates a stopwatch that has not been started
    let inline create () = System.Diagnostics.Stopwatch()
    /// Creates a stopwatch that is already running
    let inline start () = System.Diagnostics.Stopwatch.StartNew()
    /// Current value of the Stopwatch timestamp counter, in Stopwatch ticks
    let inline timestamp () = System.Diagnostics.Stopwatch.GetTimestamp()
    /// Converts a number of Stopwatch ticks to seconds by dividing by Stopwatch.Frequency
    let inline ticksToSeconds ticks =
        let ticksPerSecond = double System.Diagnostics.Stopwatch.Frequency
        double ticks / ticksPerSecond
    /// Converts a number of Stopwatch ticks to a TimeSpan
    let inline ticksToTimeSpan ticks = TimeSpan.FromSeconds(ticksToSeconds ticks)
    /// Stopwatch ticks that have passed since the timestamp `ts` was captured
    let inline elapsedTicks (ts: int64) = timestamp () - ts
    /// Seconds that have passed since the timestamp `ts` was captured
    let inline elapsedSeconds (ts: int64) = ticksToSeconds (elapsedTicks ts)
    /// Time that has passed since the timestamp `ts` was captured
    /// (equivalent to .NET 7 System.Diagnostics.Stopwatch.GetElapsedTime())
    let inline elapsed (ts: int64) = ticksToTimeSpan (elapsedTicks ts)
type System.Diagnostics.Stopwatch with

    /// Elapsed time in seconds, derived from the whole-millisecond ElapsedMilliseconds reading
    member sw.ElapsedSeconds =
        let elapsedMs = sw.ElapsedMilliseconds |> float
        elapsedMs / 1000.
    /// Elapsed time in minutes, i.e. ElapsedSeconds divided by 60
    member sw.ElapsedMinutes =
        let elapsedS = sw.ElapsedSeconds
        elapsedS / 60.
/// Tracks a repeating cycle of length `period`; Trigger() makes the cycle report as due ahead of schedule
type IntervalTimer(period: TimeSpan) =

    let sw = Stopwatch.start ()
    let mutable force = false
    // The period, pre-computed both in Stopwatch ticks and in whole milliseconds
    let periodT = int64 (period.TotalSeconds * double System.Diagnostics.Stopwatch.Frequency)
    let periodMs = int64 period.TotalMilliseconds

    /// Flags the timer as due, regardless of how much of the period has elapsed
    member _.Trigger() = force <- true

    /// Clears any pending Trigger() and starts timing a fresh period
    member _.Restart() =
        force <- false
        sw.Restart()

    member val Period = period

    /// True once more than the period has elapsed since the last (re)start, or if Trigger() has been invoked
    member _.IsDue = force || sw.ElapsedTicks > periodT

    /// True if Trigger() has been invoked since the last Restart()
    member _.IsTriggered = force

    /// Milliseconds left in the current period (never negative). Does not take Trigger() into account
    member _.RemainingMs =
        let remaining = periodMs - sw.ElapsedMilliseconds
        if remaining <= 0L then 0 else int remaining

    // NOTE asking the question is destructive - the timer is reset as a side effect
    /// Yields true (and restarts the timer) if due; yields false (leaving the timer untouched) otherwise
    member x.IfDueRestart() =
        let due = x.IsDue
        if due then x.Restart()
        due

    /// Awaits reaction to a Trigger() invocation: blocks the calling thread until the trigger has been cleared,
    /// or `timeout` (default 2s) has passed, sleeping `sleepMs` (default 1ms) between checks
    member x.SleepUntilTriggerCleared(?timeout, ?sleepMs) =
        if x.IsTriggered then
            // The processing loops run on 1s timers, so we busy-wait until they wake
            let timeout = IntervalTimer(defaultArg timeout (TimeSpan.FromSeconds 2))
            while x.IsTriggered && not timeout.IsDue do
                System.Threading.Thread.Sleep(defaultArg sleepMs 1)
type System.Net.Http.HttpClient with

    /// Issues a GET for `uri` and copies the response stream into the file at `filePath`
    member client.GetToFile(uri: Uri, filePath) = task {
        use! source = client.GetStreamAsync(uri)
        // NOTE File.Create will silently truncate the file if it already exists
        use target = System.IO.File.Create(filePath)
        do! source.CopyToAsync(target) }
module Uri =

    /// Yields `Some uri` if the input parses as an absolute URI with the http or https scheme; `None` for anything
    /// else (e.g. a file path, a relative reference, or an absolute URI with another scheme)
    let tryParseHttp uriOrFilepath =
        match Uri.TryCreate(uriOrFilepath, UriKind.Absolute) with
        | false, _ -> None
        | true, uri ->
            let isHttp = uri.Scheme = Uri.UriSchemeHttp || uri.Scheme = Uri.UriSchemeHttps
            if isHttp then Some uri else None
module Exception =

    /// Strips away AggregateException wrappers that hold exactly one inner exception (repeatedly, if nested);
    /// any other exception, including an AggregateException with several inner exceptions, is returned untouched
    let inner (e: exn) =
        let mutable current = e
        let mutable unwrapping = true
        while unwrapping do
            match current with
            | :? AggregateException as agg when agg.InnerExceptions.Count = 1 -> current <- agg.InnerException
            | _ -> unwrapping <- false
        current

    /// Total active pattern that binds the result of applying `inner` to the matched exception
    let (|Inner|) (e: exn) = inner e

    /// Partial active pattern that never matches: it passes the exception to `log` as a side effect and falls through
    let [<return: Struct>] (|Log|_|) log (e: exn) =
        log e
        ValueNone
// Type abbreviations giving these BCL types short, unqualified names
type CancellationToken = System.Threading.CancellationToken
type IAsyncEnumerable<'T> = System.Collections.Generic.IAsyncEnumerable<'T>
// Non-generic and generic Task, respectively
type Task = System.Threading.Tasks.Task
type Task<'T> = System.Threading.Tasks.Task<'T>
open System.Threading.Tasks
module Channel =

    open System.Threading.Channels

    /// An unbounded channel configured with SingleReader = true
    let unboundedSr<'t> =
        let options = UnboundedChannelOptions(SingleReader = true)
        Channel.CreateUnbounded<'t>(options)
    /// An unbounded channel configured with SingleWriter = true
    let unboundedSw<'t> =
        let options = UnboundedChannelOptions(SingleWriter = true)
        Channel.CreateUnbounded<'t>(options)
    /// An unbounded channel configured with both SingleWriter = true and SingleReader = true
    let unboundedSwSr<'t> =
        let options = UnboundedChannelOptions(SingleWriter = true, SingleReader = true)
        Channel.CreateUnbounded<'t>(options)
    /// A channel bounded to capacity `c`, configured with SingleWriter = true
    let boundedSw<'t> c =
        let options = BoundedChannelOptions(c, SingleWriter = true)
        Channel.CreateBounded<'t>(options)

    /// Exposes the writer's WaitToWriteAsync as a (non-generic) Task
    let waitToWrite (w: ChannelWriter<_>) ct = w.WaitToWriteAsync(ct).AsTask() :> Task
    /// The writer's TryWrite, as a function yielding whether the item was accepted
    let tryWrite (w: ChannelWriter<_>) = w.TryWrite
    /// As tryWrite, but discarding the outcome
    let write (w: ChannelWriter<_>) = w.TryWrite >> ignore

    /// Exposes the reader's WaitToReadAsync as a Task<bool>
    let inline awaitRead (r: ChannelReader<_>) ct = r.WaitToReadAsync(ct).AsTask()
    /// Attempts to take a single item, yielding ValueNone if none is immediately available
    let inline tryRead (r: ChannelReader<_>) () =
        match r.TryRead() with
        | true, item -> ValueSome item
        | false, _ -> ValueNone
    /// Feeds every immediately available item to `f`; yields whether at least one item was handled
    let inline apply (r: ChannelReader<_>) f =
        let mutable handledAny = false
        let mutable draining = true
        while draining do
            match r.TryRead() with
            | true, item ->
                handledAny <- true
                f item
            | false, _ -> draining <- false
        handledAny
    /// Lazily yields items for as long as TryRead keeps supplying them; nothing is read until the sequence is enumerated
    let inline readAll (r: ChannelReader<_>) () =
        () |> Seq.unfold (fun () ->
            match r.TryRead() with
            | true, item -> Some (item, ())
            | false, _ -> None)
module Async =

    /// Adapts a non-generic Task to an Async<unit> via Async.AwaitTaskCorrect
    let ofUnitTask (t: Task) = Async.AwaitTaskCorrect t
    /// Adapts a Task<'t> to an Async<'t> via Async.AwaitTaskCorrect
    let ofTask (t: Task<'t>) = Async.AwaitTaskCorrect t

    /// Wraps a Task-returning function as an Async, handing it the Async's ambient CancellationToken
    let inline call (start: CancellationToken -> Task<'T>): Async<'T> = async {
        let! ct = Async.CancellationToken
        let pending = start ct
        return! ofTask pending }
    /// As `call`, for functions returning a non-generic Task
    let inline callUnit (start: CancellationToken -> Task): Async<unit> = async {
        let! ct = Async.CancellationToken
        let pending = start ct
        return! ofUnitTask pending }

    /// Async.StartImmediateAsTask, with the CancellationToken as a trailing curried argument
    let inline startImmediateAsTask (computation: Async<'T>) ct: Task<'T> =
        Async.StartImmediateAsTask(computation, ct)
    /// Async.StartImmediateAsTask, with the CancellationToken as a leading curried argument (pipeline friendly)
    let inline executeAsTask ct (computation: Async<'T>): Task<'T> =
        Async.StartImmediateAsTask(computation, ct)

    /// Async.Parallel, passing `maxDop` as its maxDegreeOfParallelism
    let parallelLimit maxDop xs: Async<'t []> =
        Async.Parallel(xs, maxDegreeOfParallelism = maxDop)
/// Helpers for starting, composing and adapting Tasks
module Task =
/// Hands `create` to Task.Run, yielding the Task<unit> that Task.Run returns
let inline run create = Task.Run<unit>(Func<Task<unit>> create)
/// As `run`, but discarding the resulting Task
// NOTE `ignore` here is the built-in function; this module's own `ignore` is only defined further down
let inline start create = run create |> ignore<Task>
/// Task.Delay for `ts`, passing `ct` as the cancellation token
let inline delay (ts: TimeSpan) ct = Task.Delay(ts, ct)
/// Awaits `t`, yielding `Ok` with its result, or `Error` with the exception caught while awaiting it
let inline Catch (t: Task<'t>) = task { try let! r = t in return Ok r with e -> return Error e }
// Shared implementation of parallelLimit/sequential/parallelUnlimited: adapts each function via Async.call and
// runs them through Async.Parallel; a `maxDop` of 0 is mapped to None, i.e. no maxDegreeOfParallelism is supplied
let private parallel_ maxDop ct (xs: seq<CancellationToken -> Task<'t>>): Task<'t []> =
Async.Parallel(xs |> Seq.map Async.call, ?maxDegreeOfParallelism = match maxDop with 0 -> None | x -> Some x) |> Async.executeAsTask ct
/// Runs an inner task with a dedicated Linked Token Source. Cancels via the ct upon completion, before Disposing the LCTS
let inline runWithCancellation (ct: CancellationToken) ([<InlineIfLambda>]f: CancellationToken -> Task) = task {
use cts = System.Threading.CancellationTokenSource.CreateLinkedTokenSource(ct) // https://stackoverflow.com/questions/6960520/when-to-dispose-cancellationtokensource
try do! f cts.Token
// The finally guarantees the linked token is cancelled whether `f` completes or throws
finally cts.Cancel() }
/// Runs the supplied functions, passing `maxDop` as the degree of parallelism (0 = no limit); yields an array of the results
let parallelLimit maxDop ct xs: Task<'t []> =
parallel_ maxDop ct xs
/// Runs the supplied functions with a degree of parallelism of 1; yields an array of the results
let sequential ct xs: Task<'t []> =
parallel_ 1 ct xs
/// Runs the supplied functions without specifying a degree of parallelism; yields an array of the results
let parallelUnlimited ct xs: Task<'t []> =
parallel_ 0 ct xs
/// Awaits `a`, discarding its result. NOTE shadows the built-in `ignore` for the remainder of this module
let inline ignore<'T> (a: Task<'T>): Task<unit> = task { let! _ = a in return () }
/// Adapts a non-generic Task to a Task<unit>
let ofUnitTask (x: Task): Task<unit> = task { return! x }
/// Invokes `f` each time a PeriodicTimer with the given `interval` ticks, looping until `ct` is cancelled
let periodically (f: CancellationToken -> Task<unit>) interval (ct: CancellationToken) = task {
let t = new System.Threading.PeriodicTimer(interval) // no use as ct will Dispose
// Ties disposal of the timer to cancellation of ct; the registration itself is released on exit
use _ = ct.Register(Action t.Dispose)
while not ct.IsCancellationRequested do
// The wait is not passed ct; presumably it is the Dispose registered above that ends a pending wait with `false` - TODO confirm
match! t.WaitForNextTickAsync CancellationToken.None with
| false -> ()
| true -> do! f ct }
/// Wrapper over a SemaphoreSlim whose initial count is `max`
type Sem(max) =

    let inner = new System.Threading.SemaphoreSlim(max)

    /// True while at least one slot is available
    member _.HasCapacity = inner.CurrentCount <> 0

    /// struct (slots currently taken, max)
    member _.State =
        let taken = max - inner.CurrentCount
        struct (taken, max)

    /// Attempts to take a slot without waiting; yields whether one was obtained
    member _.TryTake() = inner.Wait 0

    /// Hands a slot back
    member _.Release() = ignore (inner.Release())

    /// Asynchronously waits to take a slot, observing `ct`
    member _.Wait(ct: CancellationToken) = inner.WaitAsync(ct)

    /// Yields a Task that completes once a slot could be taken; the slot is handed straight back
    // see https://stackoverflow.com/questions/31621644/task-whenany-and-semaphoreslim-class/73197290?noredirect=1#comment129334330_73197290
    member x.WaitButRelease(ct: CancellationToken) =
        if x.TryTake() then
            x.Release()
            Task.CompletedTask
        else
            // The continuation only runs (synchronously) when the wait ran to completion, i.e. a slot was actually taken
            let tco = TaskContinuationOptions.OnlyOnRanToCompletion ||| TaskContinuationOptions.ExecuteSynchronously
            x.Wait(ct).ContinueWith((fun _ -> x.Release()), ct, tco, TaskScheduler.Default)

    /// Manage a controlled shutdown by accumulating reservations of the full capacity.
    /// The `max` slots taken are not released; yields struct (0, max) once all have been obtained
    member x.WaitForCompleted(ct: CancellationToken) = task {
        for _ in 1..max do
            do! x.Wait(ct)
        return struct (0, max) }
/// Helper for use in Propulsion.Tool and/or equivalent apps; needs to be (informally) exposed
type Async with

    /// Asynchronously awaits the next keyboard interrupt event, throwing a TaskCanceledException
    /// Honors cancellation so it can be used with Async.Parallel to have multiple pump loops couple their fates
    static member AwaitKeyboardInterruptAsTaskCanceledException() = async {
        let! ct = Async.CancellationToken
        let tcs = TaskCompletionSource()
        // Cancellation of the ambient token moves the task to the Canceled state
        let onCancelled () = tcs.TrySetCanceled() |> ignore
        // Ctrl-C/Break faults the task with a TaskCanceledException
        let onInterrupt (args: ConsoleCancelEventArgs) =
            args.Cancel <- true // We're using this exception to drive a controlled shutdown so inhibit the standard behavior
            tcs.TrySetException(TaskCanceledException "Execution cancelled via Ctrl-C/Break; exiting...") |> ignore
        use _ = ct.Register(Action onCancelled)
        use _ = Console.CancelKeyPress.Subscribe(onInterrupt)
        return! tcs.Task |> Async.ofUnitTask }
// Short aliases for the interop attributes used to declare optional parameters and their default values
type OAttribute = System.Runtime.InteropServices.OptionalAttribute
type DAttribute = System.Runtime.InteropServices.DefaultParameterValueAttribute
module ValueTuple =

    /// First element of a struct pair
    let inline fst (pair: struct (_ * _)) =
        match pair with
        | struct (first, _) -> first
    /// Second element of a struct pair
    let inline snd (pair: struct (_ * _)) =
        match pair with
        | struct (_, second) -> second
    /// Converts a KeyValuePair to a struct (key, value) pair
    let inline ofKvp (x: System.Collections.Generic.KeyValuePair<_, _>) =
        let key, value = x.Key, x.Value
        struct (key, value)
    /// Converts a struct (key, value) pair to a KeyValuePair
    let inline toKvp (pair: struct (_ * _)) =
        let struct (k, v) = pair
        System.Collections.Generic.KeyValuePair(k, v)
    /// Groups struct pairs by their first element, yielding each key alongside `f` applied to the grouped second elements
    let inline groupWith ([<InlineIfLambda>] f) xs =
        xs
        |> Seq.groupBy fst
        |> Seq.map (fun (key, group) ->
            let values = group |> Seq.map snd
            struct (key, f values))
module ValueOption =

    /// Converts an Option to the equivalent ValueOption
    let inline ofOption x =
        match x with
        | None -> ValueNone
        | Some v -> ValueSome v
    /// Converts a ValueOption to the equivalent Option
    let inline toOption x =
        match x with
        | ValueNone -> None
        | ValueSome v -> Some v
    /// Applies `f` to the value held, if any
    let inline map f x =
        match x with
        | ValueNone -> ValueNone
        | ValueSome v -> ValueSome (f v)
module Seq =

    /// Splits `xs` in a single pass into (items satisfying `predicate`, all other items), each as an array in source order
    let partition predicate xs =
        let matching, rest = ResizeArray(), ResizeArray()
        for item in xs do
            let target = if predicate item then matching else rest
            target.Add item
        matching.ToArray(), rest.ToArray()

    /// Yields the first ValueSome produced by applying `f` to successive items (consuming no further items), or ValueNone
    let tryPickV f (xs: _ seq) =
        use e = xs.GetEnumerator()
        let rec loop () =
            if e.MoveNext() then
                match f e.Current with
                | ValueNone -> loop ()
                | found -> found
            else ValueNone
        loop ()

    /// Lazily applies `f` to each item, yielding the values of the ValueSome results
    let inline chooseV f xs = seq {
        for x in xs do
            match f x with
            | ValueNone -> ()
            | ValueSome v -> yield v }
module Array =

    /// True if the array holds at least one element
    let inline any xs = not (Array.isEmpty xs)
    /// Applies `f` to each item, gathering the values of the ValueSome results into an array
    let inline chooseV f xs =
        [| for item in xs do
            match f item with
            | ValueNone -> ()
            | ValueSome v -> yield v |]
module Stats =
open System.Collections.Generic
/// Gathers stats relating to how many items have been observed, indexed by a string name
type Counters() =

    let cats = Dictionary<string, int64>()

    /// Adds `weight` (default: 1) to the running total held for `cat`, starting from zero on first sight
    member _.Ingest(cat, ?weight) =
        let increment = defaultArg weight 1L
        let previous =
            match cats.TryGetValue cat with
            | true, total -> total
            | false, _ -> 0L
        cats[cat] <- previous + increment

    /// Number of distinct names observed
    member _.Count = cats.Count
    /// True once anything has been ingested
    member x.Any = x.Count <> 0
    /// The name and total for each entry, as struct pairs
    member _.All = cats |> Seq.map ValueTuple.ofKvp
    /// As `All`, ordered from the highest total to the lowest
    member x.StatsDescending = Seq.sortByDescending ValueTuple.snd x.All
    /// Discards all totals
    member _.Clear() = cats.Clear()
// Yields a function that logs a label (right-padded to the width of the longest of `names`) alongside its stats
let private logStatsPadded (log: Serilog.ILogger) names =
    // NOTE Seq.max throws on an empty input, so the caller must guarantee at least one name
    let width = Seq.max (Seq.map String.length names)
    fun (label: string) stats ->
        let padded = label.PadRight width
        log.Information(" {label} {stats}", padded, stats)
/// Logs a line of per-name totals summed across all categories under `totalLabel`,
/// followed by one line per category (in ascending category name order) showing its totals in descending order
let dumpCounterSet (log: Serilog.ILogger) totalLabel (cats: IReadOnlyDictionary<string, Counters>) =
    let keys = cats.Keys
    let emit = logStatsPadded log keys
    // Flatten every category's entries, then total them per name
    let totals =
        cats.Values
        |> Seq.collect (fun counters -> counters.All)
        |> Seq.groupBy ValueTuple.fst
        |> Seq.map (fun (name, entries) -> struct (name, entries |> Seq.sumBy ValueTuple.snd))
        |> Seq.sortByDescending ValueTuple.snd
    emit totalLabel totals
    for cat in Seq.sort keys do
        emit cat (cats[cat].StatsDescending)
/// Summary statistics for a set of latencies, each held as a TimeSpan
[<Struct>]
type private Data =
{ min : TimeSpan
p50 : TimeSpan
p95 : TimeSpan
p99 : TimeSpan
max : TimeSpan
avg : TimeSpan
// ValueNone represents the case where no standard deviation is available
stddev : TimeSpan voption }
open MathNet.Numerics.Statistics
// Logs a single line summarizing `xs`: count, max, p99, p95, p50, min, mean and standard deviation, all in seconds
let private logLatencyPercentiles (log: Serilog.ILogger) (label: string) (xs: TimeSpan seq) =
    let sortedLatencies = xs |> Seq.map (fun ts -> ts.TotalSeconds) |> Seq.sort |> Seq.toArray
    let ofSeconds (s: float) = TimeSpan.FromSeconds s
    let percentile (p: int) = SortedArrayStatistics.Percentile(sortedLatencies, p) |> ofSeconds
    let stddev =
        match ArrayStatistics.StandardDeviation sortedLatencies with
        | sd when Double.IsNaN sd -> ValueNone // stddev of singletons is NaN
        | sd -> ValueSome (ofSeconds sd)
    let l =
        {   min = ofSeconds (SortedArrayStatistics.Minimum sortedLatencies)
            p50 = percentile 50
            p95 = percentile 95
            p99 = percentile 99
            max = ofSeconds (SortedArrayStatistics.Maximum sortedLatencies)
            avg = ofSeconds (ArrayStatistics.Mean sortedLatencies)
            stddev = stddev }
    // An absent standard deviation is rendered as NaN
    let stdDev =
        match l.stddev with
        | ValueSome d -> d.TotalSeconds
        | ValueNone -> Double.NaN
    log.Information(" {kind} {count,5} : max={max:n3}s p99={p99:n3}s p95={p95:n3}s p50={p50:n3}s min={min:n3}s avg={avg:n3}s stddev={stddev:n3}s",
                    label, sortedLatencies.Length, l.max.TotalSeconds, l.p99.TotalSeconds, l.p95.TotalSeconds, l.p50.TotalSeconds, l.min.TotalSeconds, l.avg.TotalSeconds, stdDev)
/// Yields a function that logs latency percentiles under a label right-padded to the width of the longest of `names`
let logLatencyPercentilesPadded log names =
    // NOTE Seq.max throws on an empty input, so the caller must guarantee at least one name
    let width = Seq.max (Seq.map String.length names)
    fun (label: string) ->
        logLatencyPercentiles log (label.PadRight width)
/// Operations on an instance are safe cross-thread
type ConcurrentLatencyStats(label) =

    let buffer = System.Collections.Concurrent.ConcurrentStack<TimeSpan>()

    /// Adds a latency observation to the buffer
    member _.Record(value) = buffer.Push(value)

    /// Logs the percentiles of the buffered observations (if any), then empties the buffer
    member _.Dump(log: Serilog.ILogger) =
        if buffer.IsEmpty then () else
            logLatencyPercentiles log label buffer
            buffer.Clear() // yes, there is a race
/// Not thread-safe, i.e. suitable for use in a Stats handler only
type LatencyStats(label) =

    let buffer = ResizeArray<TimeSpan>()

    /// Adds a latency observation to the buffer
    member _.Record(value) = buffer.Add(value)

    /// Logs the percentiles of the buffered observations (if any), then empties the buffer
    member _.Dump(log: Serilog.ILogger) =
        if buffer.Count = 0 then () else
            logLatencyPercentiles log label buffer
            buffer.Clear()
/// Not thread-safe, i.e. suitable for use in a Stats handler only
type LatencyStatsSet() =

    let buckets = Dictionary<string, ResizeArray<TimeSpan>>()

    /// Adds a latency observation to the named bucket, creating the bucket on first use
    member _.Record(bucket, value: TimeSpan) =
        match buckets.TryGetValue bucket with
        | true, existing -> existing.Add value
        | false, _ ->
            let fresh = ResizeArray()
            fresh.Add value
            buckets.Add(bucket, fresh)

    /// Logs percentiles per bucket, ordered by `labelSortOrder` applied to the bucket name (default: the name itself).
    /// When a `totalLabel` is supplied, a line covering the observations of all buckets is logged first
    member _.Dump(log: Serilog.ILogger, ?totalLabel, ?labelSortOrder) =
        if buckets.Count <> 0 then
            let emit = logLatencyPercentilesPadded log buckets.Keys
            match totalLabel with
            | Some l -> emit l (buckets |> Seq.collect (fun kv -> kv.Value))
            | None -> ()
            let order = defaultArg labelSortOrder id
            for name in buckets.Keys |> Seq.sortBy order do
                emit name buckets[name]

    /// Logs percentiles per group of buckets, where `bucketGroup` maps a bucket name to its group name; groups are
    /// emitted in ascending group name order. When a `totalLabel` is supplied, an overall line is logged first
    member _.DumpGrouped(bucketGroup, log: Serilog.ILogger, ?totalLabel) =
        if buckets.Count <> 0 then
            let clusters =
                buckets
                |> Seq.groupBy (fun kv -> bucketGroup kv.Key)
                |> Seq.sortBy fst
                |> Seq.toArray
            let emit = logLatencyPercentilesPadded log (Seq.map fst clusters)
            match totalLabel with
            | Some l -> emit l (buckets |> Seq.collect (fun kv -> kv.Value))
            | None -> ()
            for name, items in clusters do
                emit name (items |> Seq.collect (fun kv -> kv.Value))

    /// Discards all buckets
    member _.Clear() = buckets.Clear()
/// Not thread-safe, i.e. suitable for use in a Stats handler only
type CategoryCounters() =

    let cats = Dictionary<string, Counters>()

    /// Adds each (event, count) pair in `counts` to the Counters held for `category`, creating them on first use
    member _.Ingest(category, counts) =
        let cat =
            match cats.TryGetValue category with
            | true, existing -> existing
            | false, _ ->
                let created = Counters()
                cats.Add(category, created)
                created
        for event, count : int in counts do
            cat.Ingest(event, count)

    /// The names of the categories observed
    member _.Categories = cats.Keys

    /// The totals for category `cat` in descending order; empty if the category has not been observed
    member _.StatsDescending cat =
        match cats.TryGetValue cat with
        | false, _ -> Seq.empty
        | true, acc -> acc.StatsDescending

    /// Logs the overall and per-category totals via dumpCounterSet, provided anything has been ingested
    member _.DumpGrouped(log: Serilog.ILogger, totalLabel) =
        if cats.Count <> 0 then dumpCounterSet log totalLabel cats

    /// Discards all categories
    member _.Clear() = cats.Clear()
/// Not thread-safe, i.e. suitable for use in a Stats handler only
type EventTypeLatencies() =

    let inner = LatencyStatsSet()

    /// Records a latency against a bucket keyed by the category and event type, separated by a `/`
    member _.Record(category: string, eventType: string, latency) =
        inner.Record($"{category}/{eventType}", latency)

    /// Logs the latencies grouped by category (led by a line labelled `totalLabel`), then per individual bucket
    member _.Dump(log, totalLabel) =
        // The category is whatever precedes the first `/` in the bucket key
        let catFromKey (key: string) =
            let separatorIndex = key.IndexOf '/'
            key.Substring(0, separatorIndex)
        inner.DumpGrouped(catFromKey, log, totalLabel = totalLabel)
        inner.Dump log

    /// Discards everything recorded
    member _.Clear() = inner.Clear()
// Shorthand alias for Serilog's LogEventLevel
type LogEventLevel = Serilog.Events.LogEventLevel
module Log =

    /// Divides the value (converted to a float) by 1024 twice, i.e. converts a byte count to MiB
    let inline miB x = (float x / 1024.) / 1024.

    /// Attach a property to the captured event record to hold the metric information
    // Sidestep Log.ForContext converting to a string; see https://github.com/serilog/serilog/issues/1124
    let withScalarProperty (key: string) (value: 'T) (log: Serilog.ILogger) =
        let enricher =
            { new Serilog.Core.ILogEventEnricher with
                member _.Enrich(evt, _) =
                    // The value is wrapped as a ScalarValue; a property already present under `key` is left alone
                    let property = Serilog.Events.LogEventProperty(key, Serilog.Events.ScalarValue(value))
                    evt.AddPropertyIfAbsent(property) }
        log.ForContext(enricher)

    /// Matches property values that are a Serilog ScalarValue, binding the underlying object
    let [<return: Struct>] (|ScalarValue|_|): Serilog.Events.LogEventPropertyValue -> obj voption = fun propertyValue ->
        match propertyValue with
        | :? Serilog.Events.ScalarValue as scalar -> ValueSome scalar.Value
        | _ -> ValueNone