WIP: Equinox.Cosmos Storage + Programming Model description #50

Closed
bartelink opened this issue Nov 24, 2018 · 4 comments

Comments

@bartelink
Collaborator

bartelink commented Nov 24, 2018

NB this is long and needs lots of editing.

Storage model (see source)

Batches

Events are stored in immutable batches consisting of:

  • partitionKey: string // stream identifier
  • i (index): int64 // base index of this batch (0 for the first event in a stream)
  • id: string // same as i (CosmosDb forces every doc to have one, and it must be a string)
  • e (events): Event[] // (see next section) typically there is one item in the array (there can be many if events are small, for RU and perf-efficiency reasons)
  • ts // CosmosDb-intrinsic last-updated date for this record (changes when replicated etc., hence see t below)

Events

Per Event, we have the following:

  • c (case) - the case of this union in the Discriminated Union of events this stream bears (aka Event Type)
  • d (data) - json data (CosmosDb maintains it as actual json; it can be indexed and queried if desired)
  • metadata - carries ancillary information for an event
  • t - creation timestamp
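As a minimal sketch of the layouts above, the stored shapes can be mirrored as F# records. The record names, the `metadata` field name/type and the `mkBatch`/`nextIndex` helpers are hypothetical, chosen only for illustration; the short field names `i`, `e`, `c`, `d`, `t` follow the stream example further down, and the real definitions live in the Equinox.Cosmos source.

```fsharp
open System

// Hypothetical record for one stored event (c/d/t as in the stream example)
type StoredEvent =
    { t: DateTimeOffset         // creation timestamp
      c: string                 // case (event type)
      d: string                 // json data, kept as a raw string in this sketch
      metadata: string option } // ancillary information (name/type assumed)

// Hypothetical record for an immutable batch; ts is set by CosmosDb, so it is omitted
type Batch =
    { partitionKey: string      // stream identifier
      id: string                // string form of i
      i: int64                  // base index of the first event in e
      e: StoredEvent[] }        // typically one event, sometimes several small ones

// Builds a batch, deriving the string id from the base index
let mkBatch (stream: string) (baseIndex: int64) (events: StoredEvent[]) =
    { partitionKey = stream; id = string baseIndex; i = baseIndex; e = events }

// Index at which the following batch (or the tip) would start
let nextIndex (batch: Batch) = batch.i + int64 batch.e.Length
```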

Tip Batch

The tip is readable via a point-read, as the id has a fixed known value (-1). It uses the same base layout as an Event-Batch, but adds the following:

  • _etag: CosmosDb-managed field updated per-touch (facilitates NotModified result, see below)
  • id: always -1 so one can reference it in a point-read GET request and not pay the cost and latency associated with a full query
  • u: Array of unfolded events based on a point-in-time state (see State, Snapshots, Events and Unfolds, Unfolded Events and unfold in the programming model section)
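A sketch of the tip document, with event and unfold payloads reduced to (case, data) pairs for brevity. The `Tip` record, `tipId` and `streamLength` are hypothetical names; the values in the usage check come from the tip in the stream example further down (base index 4, one event), which implies a stream of five events.

```fsharp
// Hypothetical shape of the tip; payloads reduced to (case, data) pairs
type Tip =
    { id: string                 // always "-1", so the tip can be fetched by a point read
      i: int64                   // base index of the events in e
      e: (string * string) list  // events not yet frozen into a batch
      u: (string * string) list  // unfolded events as of the tip's position
      _etag: string }            // CosmosDb-managed; changes on every write

// The fixed, well-known id that makes a point read (rather than a query) possible
let tipId = "-1"

// Number of events in the stream once the tip's own events are counted
let streamLength (tip: Tip) = tip.i + int64 (List.length tip.e)
```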

State, Snapshots, Events and Unfolds

In an Event Sourced system, we typically distinguish between the following basic elements:

  • Events - Domain Events representing actual real world events that have occurred, reflecting the domain as understood by domain experts - see Event Storming. The customer favorited the item, the customer saved SKU Y for later, $200 got charged with transaction id X.

  • State - derived representations established from Events. A given set of code in an environment will, in service of some decision-making process, interpret those events as implying a particular state in a model. If we change the code slightly or add a field, we wouldn't necessarily expect a version of the code from a year ago to generate equivalent state that we could simply blast into our object model and go. (We can, however, easily hold a copy in memory for as long as the process runs.)

  • Snapshots - A snapshot is an intentionally roundtrippable version of a State, which can be saved and restored. Typically one would do this to save the cost of loading all the Events in a long running sequence of Events to re-establish the State. The EventStore folks have a great walkthrough on Rolling Snapshots.

  • Projections - the term projection is heavily overloaded, meaning anything from the proceeds of a SELECT statement, the result of a map operation, an EventStore projection, an event being propagated via Kafka (no, further examples are not required).

.... and:

  • Unfolds - the term unfold is based on the FP function of that name, bearing the signature 'state -> 'event seq. When using Equinox.Cosmos, the unfold produces projections, represented as events, to snapshot the state at a position in the stream.
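To make the signature concrete, here is a toy counter stream (entirely hypothetical, not part of Equinox) whose unfold emits a single snapshot event. The property worth checking is the contract implied above: folding the unfolded events from `initial` re-establishes the same state.

```fsharp
// Toy counter stream, purely to illustrate the unfold signature and its contract
type Event =
    | Incremented
    | Snapshotted of total: int // unfolded event carrying the whole state

let initial = 0

let evolve state event =
    match event with
    | Incremented -> state + 1
    | Snapshotted total -> total

let fold state events = Seq.fold evolve state events

// unfold : 'state -> 'event seq
let unfold (state: int) : Event seq = Seq.singleton (Snapshotted state)
```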

Generating and saving unfolded events

Periodically, along with writing the events that a decision function yields to represent the implications of a command given the present state, we also unfold the resulting 'state and supply the unfolded events to the sync function too. The unfold function takes the state and projects one or more snapshot-events which can be used to re-establish the same state we have thus far derived from watching the events on the stream. Unlike normal events, unfolded events do not get replicated to other systems and can be thrown away at will (we also compress them rather than storing them as fully expanded json).
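The text does not say which compression scheme is used, so the following sketch assumes Deflate via .NET's `System.IO.Compression.DeflateStream` purely as a stand-in. `compress` and `decompress` are hypothetical helper names; the point is that an unfold's json payload can be stored compactly and restored exactly.

```fsharp
open System.IO
open System.IO.Compression
open System.Text

// Deflate-compress a JSON payload (the choice of Deflate is an assumption)
let compress (json: string) : byte[] =
    let bytes = Encoding.UTF8.GetBytes(json)
    use output = new MemoryStream()
    // leaveOpen = true so output remains usable after the deflater is disposed
    let deflater = new DeflateStream(output, CompressionLevel.Optimal, true)
    deflater.Write(bytes, 0, bytes.Length)
    deflater.Dispose() // flushes the final compressed block into output
    output.ToArray()

// Inverse of compress: inflate and decode back to the JSON text
let decompress (data: byte[]) : string =
    use input = new MemoryStream(data)
    use inflater = new DeflateStream(input, CompressionMode.Decompress)
    use reader = new StreamReader(inflater, Encoding.UTF8)
    reader.ReadToEnd()
```

Snapshots of list-like state tend to be repetitive json, which is where this kind of compression pays off.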

Reading from the Storage Model

Most reads request the tip with an IfNoneMatch precondition citing the etag it bore when we last saw it. Combined with a cache, this means one of the following happens when a reader is trying to establish the state of a stream prior to processing a Command:

  • NotModified (depending on workload, can be the dominant case) - for 1 RU, minimal latency and close-to-0 network bandwidth, we know the present state
  • NotFound (there's nothing in the stream) - for equivalently low cost, we know the state is initial
  • Found - (if there are multiple writers and/or we don't have a cached version) - for the minimal possible cost (a point read, not a query), we have all we need to establish the state:
    i: a version number
    e: events since that version number
    u: unfolded auxiliary events computed at the same time as the batch of events was sent (aka projections/snapshots) - (these enable us to establish the state without further queries or roundtrips to load and fold all preceding events)
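The three outcomes can be sketched as a small union plus a function that turns an outcome (and an optional cache entry) into a state. Everything here is hypothetical: `TipReadResult`, `readTip` (an in-memory stand-in for the conditional point read) and `establish` are illustrative names, and `ofTip` stands in for folding the tip's unfolds and events.

```fsharp
// Hypothetical outcomes of a point read of the tip carrying an If-None-Match etag
type TipReadResult<'tip> =
    | NotModified                       // etag matched: our cached state is current
    | NotFound                          // no tip: the stream is empty
    | Found of etag: string * tip: 'tip // tip returned along with its current etag

// Simulates the store side: stored is the tip and its etag, if the stream exists
let readTip (stored: (string * 'tip) option) (ifNoneMatch: string option) : TipReadResult<'tip> =
    match stored, ifNoneMatch with
    | None, _ -> NotFound
    | Some (etag, _), Some expected when etag = expected -> NotModified
    | Some (etag, tip), _ -> Found(etag, tip)

// Establish (state, etag) from an optional cache entry and the read outcome
let establish (initial: 'state) (ofTip: 'tip -> 'state)
              (cached: ('state * string) option) (result: TipReadResult<'tip>) =
    match result, cached with
    | NotModified, Some (state, etag) -> state, Some etag
    | NotModified, None -> failwith "NotModified implies a cached etag was supplied"
    | NotFound, _ -> initial, None
    | Found (etag, tip), _ -> ofTip tip, Some etag
```

A reader with nothing cached sends no etag and gets `Found`; on the next read it sends the etag it got back and, if nothing changed, gets `NotModified` and reuses the cached state.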

Building a state from the Storage Model and/or the Cache

Given a stream with:

{ id:0, i:0, e: [{c:c1, d:d1}]},
{ id:1, i:1, e: [{c:c2, d:d2}]}, 
{ id:2, i:2, e: [{c:c2, d:d3}]}, 
{ id:3, i:3, e: [{c:c1, d:d4}]}, 
{ id:-1,
  i:4,
  e: [{i:4, c:c3, d:d5}],
  u: [{i:4, c:s1, d:s5Compressed}, {i:3, c:s2, d:s4Compressed}],
  _etag: "etagXYZ"
}  

If we have state4 based on the events up to {i:3, c:c1, d:d4} and the tip document, we can produce the state by folding in a variety of ways:

  • fold initial [ C1 d1; C2 d2; C2 d3; C1 d4; C3 d5 ] (but this needs a query to load the four preceding batches, with the associated RUs and roundtrips)
  • fold state4 [ C3 d5 ] (only need to pay to transport the tip document as a point read)
  • (if isStart (S1 s5) = true): fold initial [S1 s5] (point read + transport + decompress s5)
  • (if isStart (S2 s4) = true): fold initial [S2 s4; C3 d5] (only need to pay to transport the tip document as a point read and decompress s4)

If we have state3 based on the events up to {i:2, c:c2, d:d3}, we can produce the state by folding in a variety of ways:

  • fold initial [ C1 d1; C2 d2; C2 d3; C1 d4; C3 d5 ] (but query, roundtrips)
  • fold state3 [ C1 d4; C3 d5 ] (C1 d4 lives in batch 3 rather than in the tip, so this still needs a query to load that batch)
  • (if isStart (S1 s5) = true): fold initial [S1 s5] (point read + transport + decompress s5)
  • (if isStart (S2 s4) = true): fold initial [S2 s4; C3 d5] (point read + transport + decompress s4)
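A sketch of how a reader could pick the cheapest of these routes using only the tip, encoding the stream example above with a toy state (the list of payloads folded so far). The `Snapshot` case stands in for S1/S2, and `resolve` is a hypothetical helper: it uses a cached state when that state already covers everything before the tip's base index, and otherwise starts from the most recent usable unfold.

```fsharp
// Toy model: the state is the list of event payloads folded so far
type Event =
    | C1 of string
    | C2 of string
    | C3 of string
    | Snapshot of string list // an unfolded event (S1 / S2 in the text)

let initial : string list = []
let evolve state event =
    match event with
    | C1 d | C2 d | C3 d -> state @ [ d ]
    | Snapshot s -> s
let fold state events = Seq.fold evolve state events
let isStart = function Snapshot _ -> true | _ -> false

// The stream from the example, as (index, event) pairs
let frozen = [ (0, C1 "d1"); (1, C2 "d2"); (2, C2 "d3"); (3, C1 "d4") ] // needs a query
let tipBase = 4
let tipEvents = [ (4, C3 "d5") ]                                          // point read
let tipUnfolds =
    [ (4, Snapshot [ "d1"; "d2"; "d3"; "d4"; "d5" ]) // S1 s5: state after event 4
      (3, Snapshot [ "d1"; "d2"; "d3"; "d4" ]) ]     // S2 s4: state after event 3

let after n events = events |> List.filter (fun (i, _) -> i >= n) |> List.map snd

// Establish the state from the tip alone; None would mean a query is needed
let resolve (cached: (int * string list) option) =
    match cached with
    | Some (count, state) when count >= tipBase ->
        Some (fold state (after count tipEvents)) // e.g. fold state4 [C3 d5]
    | _ ->
        tipUnfolds
        |> List.filter (fun (i, u) -> isStart u && i + 1 >= tipBase)
        |> List.sortByDescending fst
        |> List.tryHead
        |> Option.map (fun (i, u) -> fold initial (u :: after (i + 1) tipEvents))
```

With state4 cached this folds just `C3 d5`; with state3 (or nothing) cached it falls back to S1, so neither path touches the frozen batches.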

If we have state5 based on the events up to C3 d5 and (being the writer, or a recent reader) have the etag etagXYZ, we can do an HTTP GET with an If-None-Match: etagXYZ header, which will return 304 Not Modified with < 1K of data and a charge of 1.00 RU, allowing us to derive the state as:

  • state5

Programming model

In F#, the Equinox programming model involves, per aggregation of events on a given category of stream:

  • 'state: the state required to support the decision or query being supported (not serializable or stored; can be held in a .NET MemoryCache)
  • initial: 'state: the implied state of an empty stream
  • 'event: a discriminated union representing all the possible Events from which a state can be evolved (see e and u in the data model). Typically the mapping of the json to an 'event case is driven by a UnionContractEncoder
  • fold : 'state -> 'event seq -> 'state: function used to fold events (real ones and/or unfolded ones) into the running 'state
  • evolve: 'state -> 'event -> 'state - the folder function from which fold is built, representing the application of the delta the 'event implies for the model to the state
  • decide: 'command -> 'state -> 'event list: responsible for (in an idempotent manner) interpreting a command in the context of a state as the events that should be written to the stream to record the decision
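One way to picture these pieces together is as a record of functions per category of stream. `Model`, `fold` and `handle` below are hypothetical names for illustration, not the Equinox API; the toy feature-flag category shows what "idempotent" means for decide: repeating a command that is already satisfied yields no events.

```fsharp
// Hypothetical record bundling the per-category functions listed above
type Model<'event, 'state, 'command> =
    { initial: 'state
      evolve: 'state -> 'event -> 'state
      decide: 'command -> 'state -> 'event list }

// fold is derived from evolve
let fold (m: Model<'event, 'state, 'command>) (state: 'state) (events: 'event seq) : 'state =
    Seq.fold m.evolve state events

// Run a decision and compute the resulting state
let handle (m: Model<'event, 'state, 'command>) (command: 'command) (state: 'state) =
    let events = m.decide command state
    events, fold m state events

// Toy category: a feature flag whose commands are idempotent
type FlagEvent = Enabled | Disabled
type FlagCommand = Enable | Disable

let evolveFlag (_: bool) event =
    match event with
    | Enabled -> true
    | Disabled -> false

let decideFlag command state =
    match command, state with
    | Enable, false -> [ Enabled ]
    | Disable, true -> [ Disabled ]
    | _ -> [] // already in the requested state: record nothing

let flag = { initial = false; evolve = evolveFlag; decide = decideFlag }
```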

When using the Equinox.Cosmos adapter, one will typically implement two further functions so that the 'state can be built without loading and processing every 'event in the stream (instead, a single cheap point read from CosmosDb retrieves the tip):

  • unfold: 'state -> 'event seq: function used to render events representing the state which facilitate quickly re-establishing a state without needing to go back to the first event that occurred on a stream
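  • isStart: 'event -> bool: predicate indicating whether a given 'event is sufficient as a starting point for a fold, i.e. folding it into initial re-establishes the full state as of that event (e.g. the Compacted event in the Example below)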
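A sketch of how isStart lets a loader ignore everything before the most recent starting point. `foldFromStart` is a hypothetical helper, and the cut-down Favorites-style model (items as plain ints) is illustrative; it assumes the candidate events are presented in stream order with the unfold positioned where it snapshots the state.

```fsharp
// Fold from the last event satisfying isStart, ignoring anything before it;
// falls back to folding everything from initial when there is no such event
let foldFromStart (isStart: 'event -> bool) (fold: 'state -> 'event list -> 'state)
                  (initial: 'state) (events: 'event list) : 'state =
    match List.tryFindIndexBack isStart events with
    | Some i -> fold initial (List.skip i events)
    | None -> fold initial events

// Illustration with a cut-down Favorites-style model (hypothetical)
type Event =
    | Added of int
    | Removed of int
    | Compacted of int list // unfolded snapshot of the whole state

let evolve state event =
    match event with
    | Added x -> x :: state
    | Removed x -> List.filter ((<>) x) state
    | Compacted xs -> xs

let fold state events = List.fold evolve state events
let isStart = function Compacted _ -> true | _ -> false
```

Because `Compacted` carries the whole state, the events before it never need to be read, which is exactly what makes the tip-only load possible.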

High level Command Processing flow

When running a decision process, we thus have the following stages:

  1. establish a known 'state (based on a given position in the stream of Events)
  2. let the decide function look at the request/command and yield a set of events (or none) that represent the effect of that decision in terms of events
  3. update the stream contingent on the stream still being in the same State/Position it was in at step 1
    3a. if there is no conflict (nobody else decided anything since we decided what we'd do), append the events to the stream (record the new position and etag)
    3b. if there is a conflict, take the conflicting events that other writers have produced since step 1, fold them into our state, and go back to 2 (the CosmosDb stored procedure returns them in its response, so there is no extra roundtrip to fetch them)
  4. if it makes sense for our scenario, hold the state, position and etag in our cache. When a reader comes along, do a point-read of the tip and jump straight to step 2 if nothing has been modified.
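Steps 2 and 3 amount to an optimistic-concurrency loop. In this sketch, `Store` is a hypothetical in-memory stand-in for the stream (not Equinox or CosmosDb code), and its `TrySync` mimics the stored procedure by returning the conflicting events when the expected version is stale. `run` re-folds those events and decides again.

```fsharp
// Hypothetical in-memory stand-in for a stream with an append-if-version-matches check
type Store<'event>() =
    let mutable events : 'event list = []
    member this.Read() = List.length events, events
    /// Appends only if nobody has written since expectedVersion; on conflict,
    /// returns the events others wrote since then
    member this.TrySync(expectedVersion: int, newEvents: 'event list) : Result<int, 'event list> =
        if List.length events = expectedVersion then
            events <- events @ newEvents
            Ok(List.length events)
        else
            Error(List.skip expectedVersion events)

// Steps 2-3: decide against a known (version, state); on conflict, fold in the
// other writers' events and decide again
let rec run (store: Store<'event>) (fold: 'state -> 'event list -> 'state)
            (decide: 'command -> 'state -> 'event list) (version: int, state: 'state) (command: 'command) =
    match decide command state with
    | [] -> version, state // nothing to write
    | events ->
        match store.TrySync(version, events) with
        | Ok newVersion -> newVersion, fold state events // 3a
        | Error conflicting ->                           // 3b
            run store fold decide (version + List.length conflicting, fold state conflicting) command
```

Because `decide` is idempotent, re-running it after folding in the other writers' events never duplicates work they already recorded.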

Sync stored procedure high level flow (see source)

The sync stored procedure takes a document as input which is almost identical to the format of the tip batch (in fact, if the stream is found to be empty, it forms the template for the first document created in the stream). The request includes the following elements:

  • expectedVersion: the position the requestor is basing their proposed batch of events on (no, an etag would not be relevant)
  • e: array of Events (see Events, above) to append if the expectedVersion check is fulfilled
  • u: array of unfolded events which supersede items with equivalent case values (aka snapshots, projections)
  • maxEvents: the maximum number of events to record in an individual batch. For example:
    • if e contains 2 events, the tip document's e has 2 events and maxEvents is 5, the events get merged into the tip
    • if maxEvents is 1, the tip gets frozen as a Batch, and the new request becomes the tip (as an atomic transaction on the server side)
  • (PROPOSAL/FUTURE) thirdPartyUnfoldRetention: how many events to keep before the base (i) of the batch if required by lagging unfolds which would otherwise fall out of scope as a result of the appends in this batch (this will default to 0, so for example if a writer says maxEvents 10 and there is an unfold based on an event more than 10 old it will be removed as part of the appending process)
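A simplified model of the append rules above, written in F# rather than as the actual JavaScript stored procedure: events are plain strings, unfolds are (case, data) pairs, and `Tip`, `Stream`, `mergeUnfolds` and `sync` are hypothetical names. It checks expectedVersion, lets incoming unfolds supersede those with the same case, and either merges into the tip or freezes the tip as a batch depending on maxEvents. thirdPartyUnfoldRetention is not modelled, nor is a request whose e alone exceeds maxEvents.

```fsharp
// Hypothetical, simplified model: events are strings, unfolds are (case, data) pairs
type Tip = { i: int; e: string list; u: (string * string) list }
type Stream = { batches: (int * string list) list; tip: Tip }

// Incoming unfolds supersede existing ones that have the same case
let mergeUnfolds (existing: (string * string) list) (incoming: (string * string) list) =
    let incomingCases = incoming |> List.map fst |> Set.ofList
    incoming @ (existing |> List.filter (fun (c, _) -> not (Set.contains c incomingCases)))

// Append e (and u) if expectedVersion matches, honouring maxEvents per document.
// Error carries the actual version so the caller can catch up and retry.
let sync (stream: Stream) (expectedVersion: int) (e: string list) (u: (string * string) list) (maxEvents: int) =
    let tip = stream.tip
    let version = tip.i + List.length tip.e
    if expectedVersion <> version then
        Error version
    elif List.length tip.e + List.length e <= maxEvents then
        // room left in the tip: merge the new events into it
        Ok { stream with tip = { tip with e = tip.e @ e; u = mergeUnfolds tip.u u } }
    else
        // freeze the current tip as an immutable batch; the request becomes the new tip
        let batches = if List.isEmpty tip.e then stream.batches else stream.batches @ [ (tip.i, tip.e) ]
        Ok { batches = batches; tip = { i = version; e = e; u = mergeUnfolds tip.u u } }
```

In the real system the freeze-and-replace happens atomically on the server side; here the immutability of the records gives the same all-or-nothing effect.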

Example

The following example is a minimal version of the Favorites model, with shortcuts for brevity (yes, and imperfect performance characteristics):

open System

(* Event schemas *)

type Item = { id: int; name: string; added: DateTimeOffset }
type Event =
    | Added of Item
    | Removed of itemId: int
    | Compacted of items: Item[]

(* State types *)

type State = Item list
let initial : State = []

let contains (state: State) id = state |> List.exists (fun x -> x.id = id)

(* Folding functions to build state from events *)

let evolve (state: State) event =
    match event with
    | Compacted items -> List.ofArray items
    | Added item -> item :: state
    | Removed id -> state |> List.filter (fun x -> x.id <> id)
let fold state events = Seq.fold evolve state events

(* Decision Processing *)

type Command =
    | Add of Item
    | Remove of itemId: int

let decide command state =
    match command with
    | Add item ->
        if contains state item.id then [] else [Added item]
    | Remove id ->
        if contains state id then [Removed id] else []

(* Equinox.Cosmos Unfold Functions to allow loading without queries *)

let unfold (state: State) =
    [Compacted (Array.ofList state)]
let isStart = function
    | Compacted _ -> true
    | _ -> false

@voronoipotato
Contributor

Maybe this should go in the wiki with a WIP label; it looks quite useful.

@bartelink
Collaborator Author

bartelink commented Nov 29, 2018

Good point - in fact, the thought crossed my mind just this morning (reason I made it an Issue is that some form of this needs to go in the README too). But the perfect shouldn't be the enemy of the good, so I'm on it...

@bartelink
Collaborator Author

@voronoipotato
Contributor

Thank you!
