// Sequence.fs
// Manages a sequence of ids, without provision for returning unused ones in cases where we're potentially leaving a gap
// see Gapless.fs for a potential approach for handling such a desire
module Sequence
open System
// Category name supplied when building the store category for this aggregate
[<Literal>]
let private CategoryName = "Sequence"

// Derives the stream id from a SequenceId via SequenceId.toString
let private streamId = FsCodec.StreamId.gen SequenceId.toString
// NOTE: the type names and union case names below mirror the actual storage format, so any change must be versioned with care
module Events =

    /// Body of the Reserved event: the `next` value of the sequence once the reservation has been applied
    type Reserved = { next: int64 }

    /// Event contract for the stream; there is a single case
    type Event =
        | Reserved of Reserved
        interface TypeShape.UnionContract.IUnionContract

    /// Codec for Event, created via FsCodec.SystemTextJson
    let codec = FsCodec.SystemTextJson.CodecJsonElement.Create<Event>()
module Fold =

    /// Folded state: the `next` value carried by the most recent event
    type State = { next: int64 }

    /// State of a stream that has no events yet
    let initial: State = { next = 0L }

    // Every event fully supersedes what came before, so the prior state is never consulted
    let private evolve (_prior: State) (event: Events.Event) : State =
        match event with
        | Events.Reserved body -> { next = body.next }

    /// Only the final event of a batch is applied; an empty batch leaves the state as it was
    let fold (state: State) (events: seq<Events.Event>) : State =
        match Seq.tryLast events with
        | Some latest -> evolve state latest
        | None -> state

    /// Re-expresses the current state as a Reserved event
    let snapshot (state: State) : Events.Event =
        Events.Reserved { next = state.next }
/// Decision function reserving <c>count</c> consecutive ids, starting at the state's current <c>next</c> value.
/// Yields the first reserved id paired with the event that advances <c>next</c> by <c>count</c>.
/// Raises ArgumentException when <c>count</c> is negative, as that would move the sequence backwards
/// and cause ids that were already handed out to be issued again.
let decideReserve (count : int) (state : Fold.State) : int64 * Events.Event[] =
    if count < 0 then invalidArg "count" "must not be negative"
    let first = state.next
    first, [| Events.Reserved { next = first + int64 count } |]
/// Reserves ids against the decider obtained from <c>resolve</c> for a given series; the constructor is internal, see <c>create</c>
type Service internal (resolve: SequenceId -> Equinox.Decider<Events.Event, Fold.State>) =

    /// Reserves an id, yielding the reserved value. Optional <c>count</c> enables reserving more than the default count of <c>1</c> in a single transaction
    member _.Reserve(series, ?count) : Async<int64> =
        // Fix the number of ids up front so the decision only needs the folded state
        let reserve = decideReserve (defaultArg count 1)
        let decider = resolve series
        decider.Transact(reserve)
/// Builds a Service that resolves each series to a decider over the supplied category
let create cat =
    let log = Serilog.Log.ForContext<Service>()
    // SequenceId -> stream id -> decider
    let resolve = streamId >> Equinox.Decider.forStream log cat
    Service(resolve)
module Cosmos =

    open Equinox.CosmosStore

    // Shared builder: both access strategies below use the same codec, fold, initial state and caching
    let private create (context, cache, accessStrategy) =
        // Sliding window of 20 minutes; CachingStrategy.NoCaching is the alternative
        let window = TimeSpan.FromMinutes 20.
        let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, window)
        CosmosStoreCategory(context, CategoryName, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy)

    /// Category using AccessStrategy.LatestKnownEvent
    module LatestKnownEvent =

        let category (context, cache) =
            create (context, cache, AccessStrategy.LatestKnownEvent)

    /// Category using AccessStrategy.RollingState fed by Fold.snapshot
    module RollingUnfolds =

        let category (context, cache) =
            let accessStrategy = AccessStrategy.RollingState Fold.snapshot
            create (context, cache, accessStrategy)