Documentation

Please refer to the FAQ, README.md, the Ideas and the Issues for background info on what's outstanding (aside from there being lots of room for more and better docs).

Background reading

While there is no canonical book on doing event-sourced Domain Modelling, there are some must-read books that are timeless in their own right and also specifically relevant to the considerations involved in building such systems:

Domain Driven Design, Eric Evans, 2003 aka 'The Blue Book'; not a leisurely read but timeless and continues to be authoritative

Domain Modelling Made Functional, Scott Wlaschin, 2018; extremely well edited traversal of Domain Modelling in F# which is a pleasure to read. Does not talk specifically about event sourcing, but is still very relevant.

Implementing Domain Driven Design, Vaughn Vernon, 2013; aka 'The Red Book'. Worked examples and a slightly different take on DDD (the appendix on Functional Event sourcing is not far from what we have around these parts; which is not surprising given there's input from Jérémie Chassaing)

Functional Event Sourcing Decider, Jérémie Chassaing, 2021. Precursor to an excellent book covering this space more broadly. There are teasers with an extensive code walk-through and discussion in this 2h45m video on Event Driven Information Systems

  • Your link here - Please add notable materials that helped you on your journey here via PRs!

Overview

The following diagrams are based on the style defined in @simonbrowndotje's C4 model, rendered using @skleanthous's PlantUmlSkin. It's highly recommended to view the talk linked from c4model.com. See the acknowledgments section of README.md.

Context Diagram

Equinox and Propulsion together provide a loosely related set of libraries that you can leverage in an application as you see fit. These diagrams are intended to give a rough orientation; what you actually build is up to you...

Equinox focuses on the Consistent Processing elements of building an event-sourced system, offering tailored components that interact with a specific Consistent Event Store, as laid out here in this C4 System Context Diagram:

Equinox c4model.com Context Diagram

☝️ Propulsion elements (which we consider External to Equinox) support the building of complementary facilities as part of an overall Application:

  • Ingesters: read stuff from outside the Bounded Context of the System. This kind of service covers aspects such as feeding reference data into Read Models, ingesting changes into a consistent model via Consistent Processing. These services are not acting in reaction to events emanating from the Consistent Event Store, as opposed to...
  • Publishers: react to events as they arrive from the Consistent Event Store by filtering, rendering and producing to feeds for downstreams. While these services may in some cases rely on synchronous queries via Consistent Processing, they are never transacting or driving follow-on work; which brings us to...
  • Reactors: drive reactive actions triggered by either upstream feeds, or events observed in the Consistent Event Store. These services handle anything beyond the duties of Ingesters or Publishers, and will often drive follow-on processing via Process Managers and/or transacting via Consistent Processing. In some cases, a reactor app's function may be to progressively compose a notification for a Publisher to eventually publish.

Container diagram

The Systems and Components involved break out roughly like this:

Equinox c4model.com Container Diagram

Equinox.MemoryStore

Equinox encourages sticking with Test Pyramid principles: focus on unit testing things by default (based on calling interpret/decide, initial and fold from the Aggregate module).
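As a minimal sketch of that default, the following exercises a decision function directly, with no store involved. The `Events`, `Fold`, `Command` and `decideGiven` definitions are hypothetical stand-ins (a trivial open/closed aggregate), not part of Equinox; the point is only the shape: fold the preceding events from `initial`, then call `interpret`.

```fsharp
// Hypothetical aggregate, for illustration only (not from the Equinox codebase)
module Events =
    type Event =
        | Opened
        | Closed

module Fold =
    type State = { isOpen : bool }
    let initial : State = { isOpen = false }
    let evolve (_state : State) (event : Events.Event) : State =
        match event with
        | Events.Opened -> { isOpen = true }
        | Events.Closed -> { isOpen = false }
    let fold (state : State) (events : Events.Event seq) : State =
        Seq.fold evolve state events

type Command = Open | Close

// Idempotent decision: yields no events when the state already matches the intent
let interpret (command : Command) (state : Fold.State) : Events.Event list =
    match command, state.isOpen with
    | Open, false -> [ Events.Opened ]
    | Close, true -> [ Events.Closed ]
    | _ -> []

// Given/When/Then helper: fold the history, then run the decision against it
let decideGiven (history : Events.Event list) (command : Command) : Events.Event list =
    let state = Fold.fold Fold.initial history
    interpret command state

let opened = decideGiven [] Open                  // a fresh stream accepts Open
let noop   = decideGiven [ Events.Opened ] Open   // repeating the command is a no-op
let closed = decideGiven [ Events.Opened ] Close  // Close applies once opened
```

Because nothing here touches a store, such tests run quickly and can be used freely with property-based testing.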

However, the Equinox MemoryStore package can also be relevant as part of your overall testing strategy. The aims are to:

  • provide a mechanism where one can provide an empty and/or specifically prepared set of streams initialized in ways that make sense for your test suite
  • allow one to test with fully configured Service types if necessary
  • enable one to test flows or scenarios (e.g. Process Managers) crossing multiple Service types
  • allow one to validate the above logic works well independent of the effects of any of the stores
  • allow one to reduce reliance on mechanisms such as the CosmosDB simulator

NOTE: MemoryStore is a complement to testing with a real store - it's absolutely not a substitute for testing how your app really performs with your load against your actual store

A primary supported pattern is to be able to define a test suite and then run the suite with the right store for the context - e.g.:

  • for unit tests, you might opt to run some important scenarios with a MemoryStore
  • for integration tests, you might run lots of iterations of a Property Based Test against a memory store, and a reduced number of iterations of the same test against your concrete store
  • for acceptance Tests, you'll likely primarily focus on using your concrete store

Container Diagram for Equinox.MemoryStore

This diagram shows the high level building blocks used in constructing an integration test using Equinox.MemoryStore

Equinox.MemoryStore c4model.com Container Diagram

Component Diagram for Equinox.MemoryStore

This breaks down the components involved internally with the layout above in terms of the actual structures involved:

Equinox.MemoryStore c4model.com Component Diagram

Equinox.EventStore / Equinox.SqlStreamStore

From the point of view of Equinox, SqlStreamStore and EventStore have a lot in common in terms of how Equinox interacts with them. For this reason, it's safe to treat them as equivalent for the purposes of this overview.

Component Diagram for Equinox.EventStore / Equinox.SqlStreamStore

Equinox.EventStore/SqlStreamStore c4model.com Component Diagram

Code Diagrams for Equinox.EventStore / Equinox.SqlStreamStore

This diagram walks through the basic sequence of operations, where:

  • this node has not yet read this stream (i.e. there's nothing in the Cache)
  • when we do read it, it's empty (no events):

Equinox.EventStore/SqlStreamStore c4model.com Code - first Time

Next, we extend the scenario to show:

  • how the State held in the Cache influences the EventStore/SqlStreamStore APIs used
  • how writes are managed:
    • when there's no conflict
    • when there's conflict and we're retrying (handle WrongExpectedVersionException, read the conflicting events, loop using those)
    • when there's conflict and we're giving up (throw MaxAttemptsExceededException; no need to read the conflicting events)

Equinox.EventStore/SqlStreamStore c4model.com Code - with cache, snapshotting

After the write, we circle back to illustrate the effect of the caching when we have correct state

Equinox.EventStore/SqlStreamStore c4model.com Code - next time; same process, i.e. cached

In other processes (when a cache is not fully in sync), the sequence runs slightly differently:

Equinox.EventStore/SqlStreamStore c4model.com Code - another process; using snapshotting

Equinox.CosmosStore

Container Diagram for Equinox.CosmosStore

Equinox.CosmosStore c4model.com Container Diagram

Component Diagram for Equinox.CosmosStore

Equinox.CosmosStore c4model.com Component Diagram

Code Diagrams for Equinox.CosmosStore

This diagram walks through the basic sequence of operations, where:

  • this node has not yet read this stream (i.e. there's nothing in the Cache)
  • when we do read it, the Read call returns 404 (with a charge of 1 RU)

Equinox.CosmosStore c4model.com Code - first Time

Next, we extend the scenario to show:

  • how state held in the Cache influences the Cosmos APIs used
  • how reads work when a snapshot is held within the Tip
  • how reads work when the state is built from the events via a Query
  • how writes are managed:
    • when there's no conflict (Sync stored procedure returns no conflicting events)
    • when there's conflict and we're retrying (re-run the decision against the conflicting events that the call to Sync yielded)
    • when there's conflict and we're giving up (throw MaxAttemptsExceededException)

Equinox.CosmosStore c4model.com Code - with cache, snapshotting

After the write, we circle back to illustrate the effect of the caching when we have correct state (we get a 304 Not Modified and pay only 1 RU)

Equinox.CosmosStore c4model.com Code - next time; same process, i.e. cached

In other processes (when a cache is not fully in sync), the sequence runs slightly differently:

  • we read the Tip document, and can work from that snapshot
  • the same Loading Fallback sequence shown in the initial read will take place if no suitable snapshot that passes the isOrigin predicate is found within the Tip

Equinox.CosmosStore c4model.com Code - another process; using snapshotting
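The cache-assisted read paths described above can be sketched independently of the Cosmos SDK. Everything in the following block is an assumption made for illustration: `TipResult`, `Cache`, `load` and `fakeReadTip` are hypothetical names, and the fake read function merely mimics the 404 / 304 / fresh-Tip outcomes; it is not how Equinox.CosmosStore is implemented.

```fsharp
open System.Collections.Generic

// Outcome of a conditional point read of a stream's Tip (hypothetical model, not SDK types)
type TipResult<'state> =
    | NotFound                                    // 404: the stream has never been written
    | NotModified                                 // 304: the etag we hold is still current
    | Modified of etag : string * state : 'state  // a newer Tip was returned

// Holds the (etag, state) pair last observed for each stream name
type Cache<'state>() =
    let entries = Dictionary<string, string * 'state>()
    member _.TryGet(stream : string) : (string * 'state) option =
        match entries.TryGetValue stream with
        | true, entry -> Some entry
        | false, _ -> None
    member _.Set(stream : string, etag : string, state : 'state) : unit =
        entries.[stream] <- (etag, state)

// Load state, reusing the cached value whenever the store reports nothing has changed
let load (cache : Cache<'state>)
         (readTip : string -> string option -> TipResult<'state>)
         (initial : 'state)
         (stream : string) : 'state =
    let cached = cache.TryGet stream
    let knownEtag = cached |> Option.map fst
    match readTip stream knownEtag, cached with
    | NotModified, Some (_, state) -> state
    | Modified (etag, state), _ ->
        cache.Set(stream, etag, state)
        state
    | NotFound, _ -> initial
    | NotModified, None -> initial

// Fake store holding a single Tip, standing in for the real point read
let fakeReadTip (stream : string) (knownEtag : string option) : TipResult<int> =
    match stream, knownEtag with
    | "Favorites-c1", Some "etag-1" -> NotModified
    | "Favorites-c1", _ -> Modified ("etag-1", 42)
    | _ -> NotFound

let cache = Cache<int>()
let first   = load cache fakeReadTip 0 "Favorites-c1"  // fresh Tip: state gets cached
let second  = load cache fakeReadTip 0 "Favorites-c1"  // not modified: served from the cache
let missing = load cache fakeReadTip 0 "Favorites-c2"  // not found: falls back to initial
```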

Glossary

Event Sourcing is easier and harder than you think. This document is not a tutorial, and you can and will make a mess on your first forays. This glossary attempts to map terminology from established documentation outside to terms used in this documentation.

Event-sourcing

| Term | Description |
| --- | --- |
| Aggregate | Boundary within which a set of Invariants are to be preserved across a set of related Entities and Value Objects |
| Append | Add Events reflecting a Decision to a Stream, contingent on an Optimistic Concurrency Check |
| Bounded Context | Domain Driven Design term for a cohesive set of application functionality. Events should not pass directly between BCs (see Ingestion, Publishing) |
| Command | Arguments supplied to one of a Stream's Decision functions; may result in Events being Appended |
| CQRS | Command/Query Responsibility Segregation: Architectural principle critical to understand (but not necessarily slavishly follow) when building an Event Sourced System |
| Decision | Application logic function representing the mapping of an Aggregate State together with arguments reflecting a Command. Yields a response and/or Events to Append to the Stream in order to manifest the intent implied; the rules it considers in doing so are in effect the Invariants of the Aggregate |
| Event | Details representing the facts of something that has occurred, or a Decision that was made with regard to an Aggregate state as represented in a Stream |
| Eventually Consistent | A Read Model can momentarily lag a Stream's current State as events are being Reacted to |
| Fold | FP Term used to describe process of building State from the sequence of Events observed on a Stream |
| Idempotent | Multiple executions have the same net effect; can safely be processed >1 time without adverse effects |
| Ingestion | The act of importing and reconciling data from external systems into the Models and/or Read Models of a given Bounded Context |
| Invariants | Rules that an Aggregate's Fold and Decision process work to uphold |
| Optimistic Concurrency Check | (non-) Locking/transaction mechanism used to ensure that Appends to a Stream maintain the Aggregate's Invariants in the presence of multiple concurrent writers |
| Projection | Umbrella term for the production, emission or synchronization of models or outputs from the Events being Appended to Streams. Lots of ways of skinning this cat, see Reactions, Read Model, Query, Synchronous Query, Publishing |
| Publishing | Selectively rendering Rich Events for a downstream consumer to Ingest into an external Read Model (as opposed to Replication) |
| Query | Eventually Consistent read from a Read Model managed via Projections. See also Synchronous Query |
| Reactions | Work carried out as a Projection that drives ripple effects, including maintaining Read Models to support Queries or carrying out a chain of activities that conclude in the Publishing of Rich Events |
| Read Models | Denormalized data maintained inside a Bounded Context as Reactions, honoring CQRS. As opposed to: Replication, Synchronous Query, Publishing |
| Replication | Rendering Events as an unfiltered feed in order to facilitate generic consumption/syncing. Can be a useful tool to scale or decouple Publishing / Reactions from a Store's feed; BUT: can just as easily be abused to be functionally equivalent to Database Integration -- maintaining a Read Model as a Reaction and/or deliberately Publishing Rich Events is preferred |
| Rich Events | Messages deliberately emitted from a Bounded Context (as opposed to Replication) via Publishing |
| State | Information inferred from tracking the sequence of Events on a Stream in support of Decision (and/or Synchronous Queries) |
| Store | Holds Events for a Bounded Context as ordered Streams |
| Stream | Ordered sequence of Events in a Store |
| Synchronous Query | Consistent read direct from Stream State (breaking CQRS and coupling implementation to the State used to support the Decision process). See CQRS, Query, Reactions |

CosmosDB

| Term | Description |
| --- | --- |
| Change Feed | Set of query patterns enabling the running of continuous queries reading Items (documents) in a Range (physical partition) in order of their last update |
| Change Feed Processor | Library from Microsoft exposing facilities to Project from a Change Feed, maintaining Offsets per Range of the Monitored Container in a Lease Container |
| Container | Logical space in a CosmosDB holding [loosely] related Items (aka Documents). Items bear logical Partition Keys. Formerly Collection. Can be allocated Request Units. |
| CosmosDB | Microsoft Azure's managed document database system |
| Database | Group of Containers. Can be allocated Request Units. |
| DocumentDb | Original offering of CosmosDB, now entitled the SQL Query Model, Microsoft.Azure.DocumentDb.Client[.Core] |
| Document id | Identifier used to load a document (Item) directly as a point read without a Query |
| Lease Container | Container (separate from the Monitored Container to avoid feedback effects) that maintains a set of Offsets per Range, together with leases reflecting instances of the Change Feed Processors and their Range assignments (aka aux container) |
| Partition Key | Logical key identifying a Stream (a Range is a set of logical partitions identified by such keys). A Logical Partition is limited to a max of 10GB (as is a Range) |
| Projector | Process running a [set of] Change Feed Processors across the Ranges of a Monitored Container |
| Query | Using indices to walk a set of relevant items in a Container, yielding Items (documents). Normally confined to a single Partition Key (unless one enters into the parallel universe of cross-partition queries) |
| Range | Physical Partition managing a subset of the Partition Key space of a Container (based on hashing) consisting of colocated data running as an individual CosmosDB node. Can be split as part of scaling up the RUs allocated to a Container. Typically limited to a maximum capacity of 10 GB. |
| Replay | The ability to re-run the processing of the Change Feed from the oldest Item (document) forward at will |
| Request Units | Pre-provisioned virtual units used to govern the per-second capacity of a Range's query processor (while they are assigned at Container or Database level, the load shedding / rate limiting takes effect at the Range level) |
| Request Charge | Number of Request Units charged for a specific action, apportioned against the RU capacity of the relevant Range for that second |
| Stored Procedure | JavaScript code stored in a Container that (repeatedly) maps an input request to a set of actions to be transacted as a group within CosmosDB. Incurs equivalent Request Charges for work performed; can chain to a continuation internally after a read or write. Limited to 5 seconds of processing time per action. |

DynamoDB

| Term | Description |
| --- | --- |
| Table | Defined storage area in DynamoDB, defining a schema and (optionally), a Streams configuration. (There's no notion of a Database) |
| Streams | Buffer used to record information about all changes with a 24h retention window |
| Transactions | Feature allowing up to 100 atomic updates across multiple tables and logical partitions. Doubles the RU cost of a write |
| DynamoStore Index | Custom implementation (in Propulsion) that indexes the DynamoDB Streams output to enable traversing all the events in the store akin to how the CosmosDB ChangeFeed enables that |
| Export | A Table can be exported in full to an S3 bucket as a set of json files containing all items |

EventStore

| Term | Description |
| --- | --- |
| Category | Group of Streams bearing a common prefix {category}-{streamId} (events are indexed into $ec-{Category} by system projections) |
| Event | json or blob payload, together with an Event Type name representing an Event |
| EventStore | Open source Event Sourcing-optimized data store server and programming model with powerful integrated projection facilities |
| Rolling Snapshot | Event written to an EventStore stream in order to ensure minimal store roundtrips when there is a Cache miss |
| Stream | Core abstraction presented by the API - an ordered sequence of Events |
| WrongExpectedVersion | Low level exception thrown to convey the occurrence of an Optimistic Concurrency Violation |

Equinox

| Term | Description |
| --- | --- |
| Cache | System.Runtime.Caching.MemoryCache or equivalent holding State and/or etag information for a Stream with a view to reducing roundtrips, latency and/or Request Charges |
| Unfolds | Snapshot information, stored in an appropriate storage location (not as a Stream's actual Events), but represented as Events, to minimize Queries and the attendant Request Charges when there is a Cache miss |
| Version | When a decision function is invoked, it's presented with a State derived from the Stream's Events and/or Unfolds up to a given position. If the newest event has Index 9 (or it was loaded from an Unfold with i=9) the Version is 10 |

Programming Model

NB this has lots of room for improvement, having started as a placeholder in #50; improvements are absolutely welcome, as this is intended for an audience with diverse levels of familiarity with event sourcing in general, and Equinox in particular.

Aggregate Module

All the code handling any given Aggregate’s Invariants, Commands and Synchronous Queries should be encapsulated within a single module. It's highly recommended to use the following canonical skeleton layout:

module Aggregate

(* StreamName section *)

let [<Literal>] Category = "category"
let streamId = Equinox.StreamId.gen Id.toString

(* Optionally, Helpers/Types *)

// NOTE - these types and the union case names reflect the actual storage
//        formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
module Events =

    type Event =
        | ...
    // optionally: `encode`, `tryDecode` (only if you're doing manual decoding)
    let codec = FsCodec ... Codec.Create<Event>(...)

Some notes about the intents being satisfied here:

  • types and cases in Events cannot be used without prefixing with Events. - while it can make sense to assert in terms of them in tests, in general sibling code in adjacent modules should not be using them directly (in general interaction should be via the type Service)
module Fold =

    type State = ...
    let initial : State = ...
    let evolve state = function
        | Events.X -> (state update)
        | Events.Y -> (state update)
    let fold : State -> Events.Event seq -> State = Seq.fold evolve

    (* Storage Model helpers *)

    let isOrigin : Events.Event -> bool = function
       | Events.Snapshotted _ -> true
       | _ -> false
    let snapshot (state : State) : Events.Event =
       Events.Snapshotted { ... }

let interpretX ... (state : Fold.State) : Events.Event list = ...

type Decision =
    | Accepted
    | Failed of Reason

let decideY ... (state : Fold.State) : Decision * Events.Event list = ...
  • interpret, decide and related input and output types / interfaces are public and top-level for use in unit tests (often unit tests will open the module Fold to use initial and fold)
type Service internal (resolve : Id -> Equinox.Decider<Events.Event, Fold.State>) =

    member _.Execute(id, command) : Async<unit> =
        let decider = resolve id
        decider.Transact(interpretX command)

    member _.Decide(id, inputs) : Async<Decision> =
        let decider = resolve id
        decider.Transact(decideY inputs)

let create category = Service(streamId >> Equinox.Decider.resolve Serilog.Log.Logger category)
  • Service's constructor is internal; create is the main way in which one wires things up (using either a concrete store or a MemoryStore) - there should not be a need to have it implement an interface and/or go down mocking rabbit holes.

While not all sections are omnipresent, significant thought and discussion have gone into arriving at this layout. Having everything laid out consistently is a big win, so customizing your layout / grouping is something to avoid doing until you have at least 3 representative aggregates of your own implemented.

Storage Binding Module

Over the top of the Aggregate Module structure, one then binds this to a concrete storage subsystem. Depending on how you structure your app, you may opt to maintain such a module either within the module Aggregate, or somewhere outside, closer to the Composition Root.

For example:

module EventStore =
    let accessStrategy =
        Equinox.EventStore.AccessStrategy.RollingSnapshots (Fold.isOrigin, Fold.snapshot)
    let create (context, cache) =
        let cacheStrategy =
            Equinox.EventStore.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
        let cat =
            Equinox.EventStore.EventStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
        create cat

module Cosmos =
    let accessStrategy =
        Equinox.CosmosStore.AccessStrategy.Snapshot (Fold.isOrigin, Fold.snapshot)
    let create (context, cache) =
        let cacheStrategy =
            Equinox.CosmosStore.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
        let cat =
            Equinox.CosmosStore.CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
        create cat

MemoryStore Storage Binding Module

For integration testing higher level functionality in Application Services (straddling multiple Domain Services and/or layering behavior over them), you can use the MemoryStore in the context of your tests:

module MemoryStore =
    let create (store : Equinox.MemoryStore.VolatileStore) =
        let cat = Equinox.MemoryStore.MemoryStoreCategory(store, Events.codec, Fold.fold, Fold.initial)
        create cat

Typically that binding module can live with your test helpers rather than making your Domain Assemblies depend on it.

Core concepts

In F#, independent of the Store being used, the Equinox programming model involves (largely by convention, see FAQ), per aggregation of events on a given category of stream:

  • Category: the common part of the Stream Name, i.e., the "Favorites" part of the "Favorites-clientId"

  • streamId: function responsible for mapping from the input elements that define the Aggregate's identity to the streamId portion of the {categoryName}-{streamId} StreamName that's used within the concrete store. In general, the inputs should be strongly typed ids

  • 'event: a discriminated union representing all the possible Events from which a state can be evolved (see events and unfolds in the Storage Model). Typically the mapping of the json to an 'event case is driven by a UnionContractEncoder

  • 'state: the rolling state maintained to enable Decisions or Queries to be made given a command and/or other context (not expected to be serializable or stored directly in a Store; can be held in a .NET MemoryCache)

  • initial: 'state: the [implied] state of an empty stream. See also Null Object Pattern, Identity element

  • fold : 'state -> 'event seq -> 'state: function used to fold one or more loaded (or proposed) events (real ones and/or unfolded ones) into a given running persistent data structure of type 'state

  • (evolve: 'state -> 'event -> 'state - the folder function from which fold is built, representing the application of a single delta that the 'event implies for the model to the state. Note: evolve is an implementation detail of a given Aggregate; fold is the function used in tests and used to parameterize the Category's storage configuration. Sometimes named apply)

  • interpret: (context/command etc ->) 'state -> 'event list or decide : (context/command etc ->) 'state -> 'result*'event list: responsible for Deciding (in an idempotent manner) how the intention represented by context/command should be mapped with regard to the provided state in terms of: a) the 'events that should be written to the stream to record the decision b) (for the 'result in the decide signature) any response to be returned to the invoker (NB returning a result likely represents a violation of the CQS and/or CQRS principles, see Synchronous Query in the Glossary)
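To make the Category and streamId items above concrete, here is a minimal sketch of composing and splitting a `{categoryName}-{streamId}` name using plain strings. `ClientId`, `streamName` and `tryParse` are hypothetical helpers written for this illustration (real code would use the Equinox.StreamId helpers shown in the skeleton), and splitting on the first `-` assumes that category names never contain one.

```fsharp
// Hypothetical strongly typed id, for illustration only
type ClientId = ClientId of string
module ClientId =
    let toString (ClientId value) : string = value

let [<Literal>] Category = "Favorites"

// Maps the identity of the Aggregate to the streamId portion of the name
let streamId (clientId : ClientId) : string = ClientId.toString clientId

// Composes the {categoryName}-{streamId} form used within the concrete store
let streamName (clientId : ClientId) : string =
    sprintf "%s-%s" Category (streamId clientId)

// Splits a stream name at the first '-' (assumes the category has no '-')
let tryParse (name : string) : (string * string) option =
    match name.IndexOf('-') with
    | -1 -> None
    | i -> Some (name.Substring(0, i), name.Substring(i + 1))

let name = streamName (ClientId "c1")
let parsed = tryParse name
```

Accepting only a `ClientId` (rather than a raw string) is what stops an id of another Aggregate from being passed in by accident.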

When using a Store with support for synchronous unfolds and/or snapshots, one will typically implement two further functions in order to avoid having every 'event in the stream be loaded and processed in order to build the 'state per Decision or Query (versus a single cheap point read from CosmosDB to read the tip):

  • isOrigin: 'event -> bool: predicate indicating whether a given 'event is sufficient as a starting point i.e., provides sufficient information for the evolve function to yield a correct state without any preceding event being supplied to the evolve/fold functions

  • unfold: 'state -> 'event seq: function used to render events representing the 'state which facilitate short circuiting the building of state, i.e., isOrigin should be able to yield true when presented with this 'event. (in some cases, the store implementation will provide a custom AccessStrategy where the unfold function should only produce a single event; where this is the case, typically this is referred to as toSnapshot : 'state -> 'event).
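The role of isOrigin can be sketched without any store: walk backwards from the newest event, stop at the first event for which isOrigin holds, and fold only from there. The types and the `loadFromOrigin` helper below are hypothetical and work over an in-memory list; a real store reads in batches and/or consults its snapshot location, so treat this as an illustration of the principle only.

```fsharp
type Event =
    | Added of string
    | Removed of string
    | Snapshotted of string list

type State = string list
let initial : State = []
let evolve (state : State) (event : Event) : State =
    match event with
    | Snapshotted items -> items
    | Added item -> item :: state
    | Removed item -> state |> List.filter (fun x -> x <> item)
let fold (state : State) (events : Event seq) : State = Seq.fold evolve state events

// A Snapshotted event carries the full state, so nothing before it is needed
let isOrigin (event : Event) : bool =
    match event with
    | Snapshotted _ -> true
    | _ -> false
let toSnapshot (state : State) : Event = Snapshotted state

// Returns the state plus the number of events that actually had to be folded
let loadFromOrigin (eventsOldestFirst : Event list) : State * int =
    let rec collect acc remainingNewestFirst =
        match remainingNewestFirst with
        | [] -> acc
        | e :: _ when isOrigin e -> e :: acc      // stop: no earlier event is required
        | e :: older -> collect (e :: acc) older
    let relevant = collect [] (List.rev eventsOldestFirst)
    fold initial relevant, List.length relevant

let history = [ Added "a"; Added "b"; Snapshotted [ "b"; "a" ]; Removed "a"; Added "c" ]
let state, eventsFolded = loadFromOrigin history
```

Here only the snapshot and the two events after it are folded, yet the resulting state matches a fold over the full history.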

Decision Flow

When running a decision process, we have the following stages:

  1. establish a known 'state (as at a given Position in the stream of Events)

  2. present the request/command and the state to the interpret function in order to determine appropriate events (can be many, or none) that represent the decision in terms of events

  3. append to the stream, contingent on the stream still being in the same State/Position it was in step 1:

    a. if there is no conflict (nobody else decided anything since we decided what we'd do given that command and state), append the events to the stream (retaining the updated position and etag)

    b. if there is a conflict, obtain the conflicting events [that other writers have produced] since the Position used in step 1, fold them into our state, and go back to 2 (aside: the CosmosDB stored procedure can send them back immediately at zero cost or latency, and there is a proposal for EventStore to afford the same facility)

  4. [if it makes sense for our scenario], hold the state, position and etag in our cache. When a Decision or Synchronous Query is needed, do a point-read of the tip and jump straight to step 2 if nothing has been modified.

See Cosmos Storage Model for a more detailed discussion of the role of the Sync Stored Procedure in step 3
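Steps 1 to 3 can be sketched as a retry loop over an in-memory stream. This is a simplified, single-threaded illustration under stated assumptions, not the Equinox implementation: `InMemoryStream`, `transact` and `AttemptsExhausted` are hypothetical names, caching (step 4) is omitted, and the competing writer is simulated from inside the decision function.

```fsharp
exception AttemptsExhausted of int

// In-memory stand-in for a single stream; the version is the number of events held
type InMemoryStream<'event>() =
    let mutable events : 'event list = []   // oldest first
    member _.Version : int = List.length events
    member _.ReadFrom(position : int) : 'event list = List.skip position events
    // Appends only if nobody else has written since expectedVersion was observed
    member _.TryAppend(expectedVersion : int, newEvents : 'event list) : bool =
        if List.length events <> expectedVersion then false
        else
            events <- events @ newEvents
            true

let transact (maxAttempts : int)
             (fold : 'state -> 'event list -> 'state)
             (initial : 'state)
             (stream : InMemoryStream<'event>)
             (interpret : 'state -> 'event list) : 'state =
    let rec attempt (n : int) (state : 'state) (version : int) : 'state =
        // Step 2: run the decision against the known state
        match interpret state with
        | [] -> state                                   // nothing to write
        | proposed ->
            // Step 3: append, contingent on the position established in step 1
            if stream.TryAppend(version, proposed) then
                fold state proposed                     // 3a: no conflict
            elif n >= maxAttempts then
                raise (AttemptsExhausted n)             // give up
            else
                // 3b: fold in the conflicting events, then decide again
                let conflicting = stream.ReadFrom version
                attempt (n + 1) (fold state conflicting) (version + List.length conflicting)
    // Step 1: establish a known state as at a given position
    let loaded = stream.ReadFrom 0
    attempt 1 (fold initial loaded) (List.length loaded)

// Usage: the first decision is raced by another writer, forcing one retry
let stream = InMemoryStream<string>()
let interfered = ref false
let fold (state : string list) (events : string list) : string list = state @ events
let interpret (state : string list) : string list =
    if not interfered.Value then
        interfered.Value <- true
        stream.TryAppend(stream.Version, [ "other-writer" ]) |> ignore
    if List.contains "mine" state then [] else [ "mine" ]
let finalState = transact 3 fold [] stream interpret
```

Note that the decision is re-run against the merged state on the retry, which is why `interpret` needs to be idempotent.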

Canonical example Aggregate + Service

The following example is a minimal version of the Favorites model, with shortcuts for brevity, that implements all the relevant functions above:

open System

(* Event stream naming + schemas *)

let [<Literal>] Category = "Favorites"
let streamId = Equinox.StreamId.gen ClientId.toString

type Item = { id: int; name: string; added: DateTimeOffset }
type Event =
    | Added of Item
    | Removed of itemId: int
    | Snapshotted of items: Item[]

(* State types/helpers *)

type State = Item list  // NB IRL don't mix Event and State types
let is id x = x.id = id

(* Folding functions to build state from events *)

let initial : State = []
let evolve state = function
    | Snapshotted items -> List.ofArray items
    | Added item -> item :: state
    | Removed id -> state |> List.filter (not << is id) // drop the removed item, keep the rest
let fold state = Seq.fold evolve state

(*
 * Decision Processing to translate a Command's intent to Events that would
 * Make It So
 *)

type Command =
    | Add of Item  // IRL do not use Event records in Command types
    | Remove of itemId: int

let interpret command state =
    let has id = state |> List.exists (is id)
    match command with
    | Add item -> if has item.id then [] else [Added item]
    | Remove id -> if has id then [Removed id] else []

(*
 * Optional: Snapshot/Unfold-related functions to allow establish state
 * efficiently, without having to read/fold all Events in a Stream
 *)

let isOrigin = function Snapshotted _ -> true | _ -> false
let toSnapshot state = [Event.Snapshotted (Array.ofList state)]

(*
 * The Service defines operations in business terms, neutral to any concrete
 * store selection or implementation supplied only a `resolve` function that can
 * be used to map from ids (as supplied to the `streamId` function) to an
 * Equinox.Decider; Typically the service should be a stateless Singleton
 *)

type Service internal (resolve : ClientId -> Equinox.Decider<Event, State>) =

    let execute clientId command : Async<unit> =
        let decider = resolve clientId
        decider.Transact(interpret command)

    let read clientId : Async<Item list> =
        let decider = resolve clientId
        decider.Query id // `id` is the identity function: return the State as-is

    member _.Execute(clientId, command) =
        execute clientId command
    member _.Favorite(clientId, item) =
        execute clientId (Command.Add item)
    member _.Unfavorite(clientId, itemId) =
        execute clientId (Command.Remove itemId)
    member _.List clientId : Async<Item list> =
        read clientId

let create resolve : Service =
    Service(streamId >> resolve Category)

Equinox API Usage Guide

The most terse walkthrough of what's involved in using Equinox to do a Synchronous Query and/or Execute a Decision process is in the Programming Model section. In this section we’ll walk through how one implements common usage patterns using the Equinox Decider API in more detail.

ES, CQRS, Event-driven architectures etc

There are a plethora of basics underlying Event Sourcing and its cousin, the CQRS architectural style. There are also myriad ways of arranging event driven processing across a system.

The goal of CQRS is to enable representation of the same data using multiple models. It’s not a panacea, and most definitely not a top level architecture, but you should at least be considering whether it’s relevant in your context. There are various ways of applying the general CQRS concept as part of an architecture.

Applying Equinox in an Event-sourced architecture

There are many trade-offs to be considered along the journey from an initial proof of concept implementation to a working system and evolving that over time to a successful and maintainable model. There is no official magical combination of CQRS and ES that is always correct, so it’s important to look at the following information as a map of all the facilities available - it’s certainly not a checklist; not all achievements must be unlocked.

At a high level we have:

  • Aggregates - a set of information (Entities and Value Objects) within which Invariants need to be maintained, leading us to logically group them in order to consider them when making a Decision
  • Events - Events that have been accepted into the model as part of a Transaction represent the basic Facts from which State or Projections can be derived
  • Commands - taking intent implied by an upstream request or other impetus (e.g., automated synchronization based on an upstream data source) driving a need to sync a system’s model to reflect that implied need (while upholding the Aggregate's Invariants). The Decision process is responsible for proposing Events to be appended to the Stream representing the relevant events in the timeline of that Aggregate.
  • State - the State at any point is inferred by folding the events in order; this state typically feeds into the Decision with the goal of ensuring Idempotent handling (if it's a retry and/or the desired state already pertains, typically one would expect the decision to be "no-op, no Events to emit")
  • Projections - while most State we'll fold from the events will be dictated by what we need (in addition to a Command's arguments) to be able to make the Decision implied by the command, the same Events that are necessary for Command Handling to be able to uphold the Invariants can also be used as the basis for various summaries at the single aggregate level, Rich Events exposed as notification feeds at the boundary of the Bounded Context, and/or as denormalized representations forming a Materialized View.
  • Queries - as part of the processing, one might wish to expose the state before or after the Decision and/or a computation based on that to the caller as a result. In its simplest form (just reading the value and emitting it without any potential Decision/Command even applying), such a Synchronous Query is a gross violation of CQRS - reads should ideally be served from a Read Model

Programming Model walkthrough

Flows and Deciders

Equinox’s Command Handling consists of < 200 lines including interfaces and comments in https://github.com/jet/equinox/tree/master/src/Equinox - the elements you'll touch in a normal application are:

  • module Impl - internal implementation of Optimistic Concurrency Control / retry loop used by Decider. It's recommended to at least scan this file as it defines the Transaction semantics that are central to Equinox and the overall Decider concept.
  • type Decider - surface API one uses to Transact or Query against a specific stream's state
  • type LoadOption Discriminated Union - used to specify optimization overrides to be applied when a Decider's Query or Transact operations establishes the state of the stream

It's recommended to read the examples in conjunction with perusing the code in order to see the relatively simple implementations that underlie the abstractions; the 2 files can tell many of the thousands of words about to follow!

Decider Members

type Equinox.Decider(...) =
    // Run interpret function with present state, retrying with Optimistic Concurrency
    member _.Transact(interpret : State -> Event list) : Async<unit>

    // Run decide function with present state, retrying with Optimistic Concurrency, yielding Result on exit
    member _.Transact(decide : State -> Result*Event list) : Async<Result>

    // Runs a Null Flow that simply yields a `projection` of `Context.State`
    member _.Query(projection : State -> View) : Async<View>

Favorites walkthrough

In this section, we’ll use possibly the simplest toy example: an unbounded list of items a user has 'favorited' (starred) in an e-commerce system.

See samples/Tutorial/Favorites.fsx. It’s recommended to load this in Visual Studio and feed it into the F# Interactive REPL to observe it step by step. Here, we'll skip some steps and annotate some aspects with regard to trade-offs that should be considered.

Events + initial(+evolve)+fold

type Event =
    | Added of string
    | Removed of string

let initial : string list = []
let evolve state = function
    | Added sku -> sku :: state
    | Removed sku -> state |> List.filter (fun x -> x <> sku)
let fold s xs = Seq.fold evolve s xs

Events are represented as an F# Discriminated Union; see the article on the UnionContractEncoder for information about how that's typically applied to map to/from an Event Type/Case in an underlying Event storage system.

The evolve function is responsible for computing the post-State that should result from taking a given State and incorporating the effect that single Event implies in that context and yielding that result without mutating either input.

While the evolve function operates on a state and a single event, fold (named for the standard FP operation of that name) walks a chain of events, propagating the running state into each evolve invocation. It is the fold operation that's typically used a) in tests and b) when passing a function to an Equinox Resolver to manage the behavior.

It should also be called out that Events represent Facts about things that have happened - an evolve or fold should not throw Exceptions or log. There should be absolutely minimal conditional logic.

In order to fulfill the without mutating either input constraint, typically fold and evolve either deep clone to a new mutable structure with space for the new events, or use a persistent/immutable/incremental data structure, such as F#'s list (see https://en.wikipedia.org/wiki/Persistent_data_structure). This is necessary because the result from fold can end up being used in any of the following situations:

  • computing a 'proposed' state that never materializes due to a failure to save and/or an Optimistic Concurrency failure
  • the store can sometimes take a state from the cache and fold in different events when the conflicting events are supplied after having been loaded for the retry in the loop
  • concurrent executions against the stream may be taking place in parallel within the same process; this is permitted, Equinox makes no attempt to constrain the behavior in such a case
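
As a quick check of the behavior described above, the following sketch repeats the definitions from this section so it can be pasted into F# Interactive on its own. The `before` and `after` names are illustrative only; the point is that folding further events onto an existing state leaves that state untouched.

```fsharp
type Event =
    | Added of string
    | Removed of string

let initial : string list = []
let evolve state = function
    | Added sku -> sku :: state
    | Removed sku -> state |> List.filter (fun x -> x <> sku)
let fold s xs = Seq.fold evolve s xs

// Fold a short history, starting from the initial state
let before = fold initial [ Added "a"; Added "b" ]
// Fold further events on top of the state computed above
let after = fold before [ Removed "a"; Added "c" ]

printfn "%A" before // ["b"; "a"] (unchanged by the second fold)
printfn "%A" after  // ["c"; "b"]
```

Because `before` is an immutable F# list, it remains safe to reuse (for instance as a cached value) regardless of how many 'proposed' states are derived from it.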

Commands + interpret

type Command =
    | Add of string
    | Remove of string
let interpret command state =
    match command with
    | Add sku -> if state |> List.contains sku then [] else [Added sku]
    | Remove sku -> if state |> List.contains sku |> not then [] else [Removed sku]

Command handling should almost invariably be implemented in an Idempotent fashion. In some cases, a blind append can arguably be an OK way to do this, especially if one is doing simple add/removes that are not problematic if repeated or reordered. However it should be noted that:

  • each write roundtrip (i.e. each Transact), and ripple effects resulting from all subscriptions having to process an event are not free either. As the cache can be used to validate whether an Event is actually necessary in the first instance, it's highly recommended to follow the convention as above and return an empty Event list in the case of a Command not needing to trigger events to move the model toward its intended end-state

  • understanding the reasons for each event typically yields a more correct model and/or test suite, which pays off in more understandable code

  • under load, retries frequently bunch up, and being able to deduplicate them without hitting the store and causing a conflict can significantly reduce feedback effects that cause inordinate impacts on stability at the worst possible time

It should also be noted that, when executing a Command, the interpret function is expected to behave statelessly; as with fold, multiple concurrent calls within the same process can occur.

A final consideration to mention is that, even when correct idempotent handling is in place, two writers can still produce conflicting events. In this instance, the Transact loop's Optimistic Concurrency control will cause the 'loser' to re-interpret the Command with an updated state [incorporating the conflicting events the other writer (thread/process/machine) produced] as context.
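
The idempotency convention can be checked directly, without any store involved. The sketch below repeats the Favorites definitions so it stands alone; `first`, `state'` and `second` are illustrative names.

```fsharp
type Event = Added of string | Removed of string
type Command = Add of string | Remove of string

let evolve state = function
    | Added sku -> sku :: state
    | Removed sku -> state |> List.filter (fun x -> x <> sku)
let fold s xs = Seq.fold evolve s xs

let interpret command state =
    match command with
    | Add sku -> if state |> List.contains sku then [] else [Added sku]
    | Remove sku -> if state |> List.contains sku |> not then [] else [Removed sku]

// First application: the sku is not yet present, so an event is proposed
let first = interpret (Add "a") []
// Fold the proposed events in to obtain the post-state
let state' = fold [] first
// Re-applying the same Command against the post-state proposes nothing
let second = interpret (Add "a") state'
```

Here `first` is `[Added "a"]` and `second` is the empty list, which is what allows a retried request to complete without a write.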

Decider usage

let [<Literal>] Category = "Favorites"
let streamId = Equinox.StreamId.gen ClientId.toString

type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.State>) =

    let execute clientId command : Async<unit> =
        let decider = resolve clientId
        decider.Transact(interpret command)

    let read clientId : Async<string list> =
        let decider = resolve clientId
        decider.Query id

let create resolve = Service(streamId >> resolve Category)

Read above will do a roundtrip to the Store in order to fetch the most recent state (in AnyCachedValue or AllowStale modes, the store roundtrip can be optimized out by reading through the cache). This Synchronous Read can be used to Read-your-writes to establish a state incorporating the effects of any Command invocation you know to have been completed.

Execute runs an Optimistic Concurrency Controlled Transact loop in order to effect the intent of the [write-only] Command. This involves:

  1. establish state
  2. use interpret to determine what (if any) Events need to be appended
  3. submit the Events, if any, for appending
  4. retrying steps 2 and 3 against the updated state where there's a conflict (i.e., the version of the stream that pertained in step 1 has been superseded)
  5. after maxAttempts / 3 retries, a MaxResyncsExhaustedException is thrown, and an upstream can retry as necessary (depending on SLAs, a timeout may further constrain the number of retries that can occur)
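
The loop described above can be sketched against a toy in-memory stream. This is not Equinox's implementation: `ToyStream`, `transact`, `MaxResyncsExhausted` and `racingInterpret` are hypothetical names introduced purely for illustration, and the conflict is simulated by sneaking in a competing write during the first attempt.

```fsharp
type Event = Added of string | Removed of string
type Command = Add of string | Remove of string

let initial : string list = []
let evolve state = function
    | Added sku -> sku :: state
    | Removed sku -> state |> List.filter (fun x -> x <> sku)
let fold s xs = Seq.fold evolve s xs
let interpret command state =
    match command with
    | Add sku -> if state |> List.contains sku then [] else [Added sku]
    | Remove sku -> if state |> List.contains sku |> not then [] else [Removed sku]

exception MaxResyncsExhausted of int

/// Toy single-stream store (hypothetical): the version is simply the event count
type ToyStream<'event>() =
    let events = ResizeArray<'event>()
    member _.Load() : int * 'event seq =
        lock events (fun () -> events.Count, Seq.ofArray (events.ToArray()))
    /// Appends only if nothing has been written since `expectedVersion` was read
    member _.TrySync(expectedVersion : int, newEvents : 'event list) : bool =
        lock events (fun () ->
            if events.Count <> expectedVersion then false
            else
                events.AddRange(newEvents)
                true)

let transact (fold : 'state -> 'event seq -> 'state) (initial : 'state) maxAttempts (stream : ToyStream<'event>) (interpret : 'state -> 'event list) : unit =
    let rec loop attempt =
        // 1. establish state
        let version, loaded = stream.Load()
        let state = fold initial loaded
        // 2. determine what (if any) Events need to be appended
        match interpret state with
        | [] -> () // nothing to write, so no further roundtrip
        | events ->
            // 3. submit the Events, guarded by the version observed in step 1
            if stream.TrySync(version, events) then ()
            // 4. conflict: reload and re-run the decision
            elif attempt < maxAttempts then loop (attempt + 1)
            // 5. out of attempts: surface the failure to the caller
            else raise (MaxResyncsExhausted attempt)
    loop 1

let stream = ToyStream<Event>()
let calls = ref 0
let racingInterpret state =
    calls.Value <- calls.Value + 1
    // Simulate a competing writer getting in first during attempt 1
    if calls.Value = 1 then stream.TrySync(0, [ Added "b" ]) |> ignore
    interpret (Add "a") state

transact fold initial 3 stream racingInterpret
```

The first attempt loses the race, so `racingInterpret` runs twice; the second run sees the competitor's `Added "b"` in its state and the stream ends up holding `Added "b"` followed by `Added "a"`.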

Aside from reading the various documentation regarding the concepts underlying CQRS, it's important to consider that (unless you're trying to leverage the Read-your-writes guarantee), doing reads direct from an event-sourced store is generally not considered a best practice (the opposite, in fact). Any state you surface to a caller is by definition out of date the millisecond you obtain it, so in many cases a caller might as well use an eventually-consistent version of the same state obtained via a Projection (see terms above).

All that said, if you're in a situation where your cache hit ratio is going to be high and/or you have reason to believe the underlying Event-Streams are not going to be long, pretty good performance can be achieved nonetheless; just consider that taking this shortcut will impede scaling and, at worst, can result in you ending up with a model that's potentially both:

  • overly simplistic - you're passing up the opportunity to provide a Read Model that directly models the requirement by providing a Materialized View

  • unnecessarily complex - the increased complexity of the fold function and/or any output from unfold (and its storage cost) is a drag on one's ability to read, test, extend and generally maintain the Command Handling/Decision logic that can only live on the write side

Service Members

    member _.Favorite(clientId, sku) =
        execute clientId (Add sku)
    member _.Unfavorite(clientId, sku) =
        execute clientId (Remove sku)
    member _.List clientId : Async<string list> =
        read clientId
  • while the decision processing logic in the Service type can arguably be extracted into a standalone type (called something like Handler) in order to separate the stream logic from the business function being accomplished, it's become clear in the course of writing tutorials and explaining verbally that the extra concept count is not justified. This can be further exacerbated by the need to hover in an IDE and/or add type annotations in order to understand what types are flowing about.

  • while the Command pattern can help clarify a high level flow, there's no substitute for representing actual business functions as well-named methods representing specific behaviors that are meaningful in the context of the application's Ubiquitous Language, can be documented and tested.

  • the resolve parameter affords one a sufficient seam that facilitates testing independently with MemoryStore (which does necessitate a reference to a separate Assembly) as desired.

Todo[Backend] walkthrough

See the TodoBackend.com sample for reference info regarding this sample, and the .fsx file from where this code is copied. Note that the bulk of the design of the events stems from the nature of the TodoBackend spec; there are many aspects of the implementation that constitute less than ideal design; please note the provisos below...

Events

module Events =
    type Todo = { id: int; order: int; title: string; completed: bool }
    type Event =
        | Added     of Todo
        | Updated   of Todo
        | Deleted   of int
        | Cleared
        | Snapshotted of Todo[]

The fact that we have a Cleared Event stems from the fact that the spec defines such an operation. While one could implement this by emitting a Deleted event per currently existing item, there are many reasons to model this as a first class event:

  1. Events should reflect user intent in its most direct form possible; if the user clicked Delete All, it's not the same to implement that as a set of individual deletes that happen to be united by having timestamps within a very low number of ms of each other.
  2. Because the Cleared Event establishes a known State, one can have isOrigin flag the event as being the furthest one needs to search backwards before starting to fold events to establish the state. This also prevents the growing length of the stream (in terms of number of events) from impacting the efficiency of the processing.
  3. While having a Cleared event happens to work, it also represents a technical trick in a toy domain and should not be considered some cure-all Pattern - real Todo apps don't have a 'declare bankruptcy' function. An example alternate approach might be to represent each Todo list as its own stream, and then have a TodoLists aggregate coordinating those.

The Snapshotted event is used to represent Rolling Snapshots (stored in-stream) and/or Unfolds (stored in Tip document-Item). For a real Todo list, using this facility may well make sense - the State can fit in a reasonable space, and the likely number of Events may reach an interesting enough count to justify applying such a feature:

  1. it should also be noted that Caching may be a better answer - note Snapshotted is also an isOrigin event - there's no need to go back any further if you meet one.
  2. we use an Array in preference to a [F#] list; while there are ListConverters out there (notably not in FsCodec), in this case an Array is better from a GC and memory-efficiency stance, and does not need any special consideration when using Newtonsoft.Json to serialize.

State + initial + evolve/fold

type State = { items : Todo list; nextId : int }
let initial = { items = []; nextId = 0 }
let evolve s = function
    | Added item -> { s with items = item :: s.items; nextId = s.nextId + 1 }
    | Updated value -> { s with items = s.items |> List.map (function { id = id } when id = value.id -> value | item -> item) }
    | Deleted id -> { s with items = s.items |> List.filter (fun x -> x.id <> id) }
    | Cleared -> { s with items = [] }
    | Snapshotted items -> { s with items = List.ofArray items }
let fold : State -> Events.Event seq -> State = Seq.fold evolve
let isOrigin = function Cleared | Snapshotted _ -> true | _ -> false
let snapshot state = Snapshotted (Array.ofList state.items)
  • for State we use records and lists as the state needs to be a Persistent data structure.
  • in general the evolve function is straightforward idiomatic F# - while there are plenty of ways to improve the efficiency (primarily by implementing fold using mutable state), in reality this would reduce the legibility and malleability of the code.
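
To illustrate how isOrigin can bound the amount of history that needs folding, here is a minimal sketch. `eventsSinceOrigin` is a hypothetical helper, not part of Equinox; a real store would typically apply the predicate while reading batches backwards rather than over an in-memory list.

```fsharp
type Todo = { id: int; order: int; title: string; completed: bool }
type Event =
    | Added of Todo
    | Updated of Todo
    | Deleted of int
    | Cleared
    | Snapshotted of Todo[]

let isOrigin = function Cleared | Snapshotted _ -> true | _ -> false

/// Walks backwards from the newest event, stopping at (and including) the
/// first origin event; yields the retained events in chronological order
let eventsSinceOrigin (history : Event list) : Event list =
    let rec walk acc = function
        | [] -> acc
        | e :: _ when isOrigin e -> e :: acc
        | e :: older -> walk (e :: acc) older
    walk [] (List.rev history)

let t1 = { id = 0; order = 0; title = "first"; completed = false }
let t2 = { id = 1; order = 1; title = "second"; completed = false }
// Chronological history: everything before `Cleared` is irrelevant to the current state
let history = [ Added t1; Cleared; Added t2; Deleted 5 ]
let relevant = eventsSinceOrigin history
```

With this history, `relevant` is `[Cleared; Added t2; Deleted 5]`; the earlier `Added t1` never needs to be read or folded.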

Commands + interpret

type Command = Add of Todo | Update of Todo | Delete of id: int | Clear
let interpret c (state : State) =
    match c with
    | Add value -> [Added { value with id = state.nextId }]
    | Update value ->
        match state.items |> List.tryFind (function { id = id } -> id = value.id) with
        | Some current when current <> value -> [Updated value]
        | _ -> []
    | Delete id -> if state.items |> List.exists (fun x -> x.id = id) then [Deleted id] else []
    | Clear -> if state.items |> List.isEmpty then [] else [Cleared]
  • Note Add does not adhere to the normal idempotency constraint, being unconditional. If the spec provided an id or token to deduplicate requests, we'd track that in the fold and use it to rule out duplicate requests.

  • For Update, we can lean on structural equality in when current <> value to cleanly rule out redundant updates

  • The current implementation is 'good enough' but there's always room to argue for adding more features. For Clear, we could maintain a flag about whether we've just seen a clear, or have a request identifier to deduplicate, rather than risk omitting a chance to mark the stream clear and hence leverage the isOrigin aspect of having the event.
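
As a sketch of the deduplication idea mentioned for Add, suppose requests carried a client-supplied token. Everything about `requestId` and `seenRequests` below is an assumption made for illustration (the TodoBackend spec provides no such token), and the types are trimmed down to keep the example short.

```fsharp
type Todo = { id: int; title: string }

// `requestId` is a hypothetical client-supplied token; it is not part of the TodoBackend spec
type Event = Added of requestId : string * item : Todo
type Command = Add of requestId : string * template : Todo

type State = { items : Todo list; nextId : int; seenRequests : Set<string> }
let initial = { items = []; nextId = 0; seenRequests = Set.empty }
let evolve (s : State) = function
    | Added (requestId, item) ->
        { items = item :: s.items
          nextId = s.nextId + 1
          seenRequests = Set.add requestId s.seenRequests } // remember the token in the fold
let fold (s : State) (xs : Event seq) = Seq.fold evolve s xs

let interpret c (state : State) =
    match c with
    | Add (requestId, value) ->
        if Set.contains requestId state.seenRequests then [] // duplicate request: no-op
        else [ Added (requestId, { value with id = state.nextId }) ]

let cmd = Add ("req-1", { id = 0; title = "buy milk" })
let first = interpret cmd initial
let second = interpret cmd (fold initial first)
```

Here `first` holds a single `Added` event while `second` is empty. An unbounded set of tokens is itself a trade-off; a real design would need to decide how long tokens are retained.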

Service

type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.State>) =

    let execute clientId command : Async<unit> =
        let decider = resolve clientId
        decider.Transact(interpret command)
    let handle clientId command : Async<Todo list> =
        let decider = resolve clientId
        decider.Transact(fun state ->
            let events = interpret command state
            let state' = fold state events
            state'.items,events)
    let query clientId (projection : State -> 'T) : Async<'T> =
        let decider = resolve clientId
        decider.Query projection

    member _.List clientId : Async<Todo seq> =
        query clientId (fun s -> s.items |> Seq.ofList)
    member _.TryGet(clientId, id) =
        query clientId (fun x -> x.items |> List.tryFind (fun x -> x.id = id))
    member _.Execute(clientId, command) : Async<unit> =
        execute clientId command
    member _.Create(clientId, template: Todo) : Async<Todo> = async {
        let! updated = handle clientId (Command.Add template)
        return List.head updated }
    member _.Patch(clientId, item: Todo) : Async<Todo> = async {
        let! updated = handle clientId (Command.Update item)
        return List.find (fun x -> x.id = item.id) updated }
  • handle represents a command processing flow where we (idempotently) apply a command, but then also emit the state to the caller, as dictated by the needs of the call as specified in the TodoBackend spec. We use the fold function to compute the post-state, and then project from that, along with the (pending) events as computed.

  • While we could theoretically use Projections to service queries from an eventually consistent Read Model, this is not in alignment with the Read-your-writes expectation embodied in the tests (i.e. it would not pass the tests), and, more importantly, would not work correctly as a backend for the app. Because we have more than one query required, we make a generic query method, even though a specific read method (as in the Favorite example) might make sense to expose too

  • The main conclusion to be drawn from the Favorites and TodoBackend Service implementations' use of Decider Methods is that, while there can be commonality in terms of the sorts of transactions one might encapsulate in this manner, there are also It Depends factors; for instance:

    1. the design doesn't provide complete idempotency and/or follow the CQRS style
    2. the fact that this is a toy system with lots of artificial constraints and/or simplifications when compared to aspects that might present in a more complete implementation.
  • the streamId helper (and optional Match Active Patterns) provide succinct ways to map an incoming clientId (which is not a string in the real implementation but instead an id using FSharp.UMX) in an unobtrusive manner.

Queries

Queries are handled by Equinox.Decider's Query method.

A query projects a value from the 'state of an Aggregate. Queries should be used sparingly, as loading and folding the events each time is against the general principle of Command Query Responsibility Segregation (CQRS). A query should not simply expose the 'state of an aggregate, as this will inevitably lead to the leaking of decision logic outside of the Aggregate's module.

// Query function exposing part of the state
member _.ReadAddress(clientId) =
    let decider = resolve clientId
    decider.Query(fun state -> state.address)

// Return the entire state we hold for this aggregate (NOTE: generally not a good idea)
member _.Read(clientId) =
    let decider = resolve clientId
    decider.Query id

Command+Decision Handling functions

Commands or Decisions are handled via Equinox.Decider's Transact method

Commands (interpret signature)

The normal command pattern involves taking the execution context (e.g., the principal on behalf of which the processing is happening), a command (with relevant parameters) reflecting the intent and the present 'state of the Aggregate into account and mapping that to one or more Events that represent that intent as a decision on the stream.

In this case, the Decision Process is interpreting the Command in the context of a 'state.

The function signature is: let interpret (context, command, args) state : Events.Event list

Note the 'state is the last parameter; it's computed and supplied by the Equinox Flow.

If the interpret function does not yield any events, there will be no trip to the store to append them.

A command may be rejected by throwing from within the interpret function.

Note that emitting an event dictates that the processing may be rerun should a conflicting write have taken place since the loading of the state

let interpret (context, command) state : Events.Event list =
    match tryCommand context command state with
    | None ->
        [] // not relevant / already in effect
    | Some eventDetails -> // accepted, mapped to event details record
        [Event.HandledCommand eventDetails]

type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.State>) =

    // Given the supplied context, apply the command for the specified clientId
    member _.Execute(clientId, context, command) : Async<unit> =
        let decider = resolve clientId
        decider.Transact(fun state -> interpret (context, command) state)

    // Alternative to the above: given the supplied context, apply the command for the specified clientId
    // Throws if this client's data is marked Read Only
    member _.Execute(clientId, context, command) : Async<unit> =
        let decider = resolve clientId
        decider.Transact(fun state ->
            if state.isReadOnly then raise (AccessDeniedException()) // Mapped to 403 externally
            interpret (context, command) state)

Decisions (Transacting Commands that also emit a result using the decide signature)

In some cases, depending on the domain in question, it may be appropriate to record some details of the request (that are represented as Events that become Facts), even if the 'command' is logically ignored. In such cases, the necessary function is a hybrid of a projection and the preceding interpret signature: you're both potentially emitting events and yielding an outcome or projecting some of the 'state'.

In this case, the signature is: let decide (context, command, args) state : 'result * Events.Event list

Note that the return value is a tuple of ('result, Event list):

  • the fst element is returned from decider.Transact
  • the snd element of the tuple represents the events (if any) that should represent the state change implied by the request.

Note if the decision function yields events, and a conflict is detected, the flow may result in the decide function being rerun with the conflicting state until either no events are emitted, or there were no further conflicting writes supplied by competing writers.

let decide (context, command) state : int * Events.Event list =
   // ... if `snd` contains event, they are written
   // `fst` (an `int` in this instance) is returned as the outcome to the caller

type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.State>) =

    // Given the supplied context, attempt to apply the command for the specified clientId
    // NOTE Try will return the `fst` of the tuple that `decide` returned
    // If >1 attempt was necessary (e.g., due to conflicting events), the `fst`
    // from the last attempt is the outcome
    member _.Try(clientId, context, command) : Async<int> =
        let decider = resolve clientId
        decider.Transact(fun state ->
            decide (context, command) state)
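
For a concrete instance of the decide shape, the following sketch reuses the Favorites events. `decideAdd` is a hypothetical function (not taken from the samples) that reports whether the sku was newly favorited.

```fsharp
type Event = Added of string | Removed of string

/// Yields `true` alongside an `Added` event if the sku was newly favorited,
/// or `false` with no events if it was already present
let decideAdd sku (state : string list) : bool * Event list =
    if state |> List.contains sku then (false, [])
    else (true, [ Added sku ])

let fresh = decideAdd "a" []        // (true, [Added "a"])
let repeat = decideAdd "a" [ "a" ]  // (false, [])
```

When passed to Transact as `decideAdd sku`, the `bool` is what the caller would receive, and the event list is what would be appended.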

DOs

  • Identify Invariants you're seeking to maintain. Events are ordered and updates consistency checked for this reason; it'll also be an important part of how you test things.

  • In general, you want to default to separating reads from writes for ease of understanding, testing, maintenance and scaling (see CQRS)

  • Any command's processing should take into account the current 'state of the aggregate, interpreting the state in an idempotent manner; applying the same Command twice should result in no events being written when the same logical request is made the second time.

DONTs

  • Write blindly: blind writes (ignoring idempotence principles) are normally a design smell

  • Mix Commands and Queries - in general, the read and write paths should be separated as much as possible (see CQRS)

Testing interpret functions

The canonical interpret and decide signatures above make unit testing possible without imposing the use of any support libraries or DSLs.

Given an opening state and an interpret command, you can validate the handling is idempotent as follows:

let fold, initial = Aggregate.Fold.fold, Aggregate.Fold.initial
// Alternately: open Aggregate.Fold

let validateInterpret contextAndOrArgsAndOrCommand state =
    let events = interpret contextAndOrArgsAndOrCommand state
    // TODO assert/match against the events to validate correct events
    //      considering the contextAndOrArgsAndOrCommand
    let state' = fold state events
    // TODO assert the events, when `fold`ed, yield the correct successor state
    //      (in general, prefer asserting against `events` rather than `state'`)
    state'

// Validate handling is idempotent in nature
let validateIdempotent contextAndOrArgsAndOrCommand state' =
    let events' = interpret contextAndOrArgsAndOrCommand state'
    match events' with
    | [] -> ()
    // TODO add clauses to validate edge cases that should still generate events on a re-run
    | xs -> failwithf "Not idempotent; Generated %A in response to %A" xs contextAndOrArgsAndOrCommand

Example FsCheck.xUnit test to validate that any generated command is handled idempotently, starting from the Aggregate's initial state:

let [<Property>] properties contextAndOrArgsAndOrCommand =
    let state' = validateInterpret contextAndOrArgsAndOrCommand initial
    validateIdempotent contextAndOrArgsAndOrCommand state'

With xUnit TheoryData

type InterpretCases() as this =
    inherit TheoryData<Command>() // substitute the type of your own test argument for Command
    do this.Add( case1 )
    do this.Add( case2 )

let [<Theory; ClassData(typeof<InterpretCases>)>] examples args =
    let state' = validateInterpret args initial
    validateIdempotent args state'

Handling sequences of Commands as a single transaction

In some cases, a Command is logically composed of separable actions against the aggregate. It's advisable in general to represent each aspect of the processing in terms of the above interpret function signature. This allows that aspect of the behavior to be unit tested cleanly. The overall chain of processing can then be represented as a composed method which can then summarize the overall transaction.

Idiomatic approach - composed method based on side-effect free functions

There's an example of such a case in the Cart's Domain Service:

let interpretMany fold interpreters (state : 'state) : 'state * 'event list =
    ((state,[]),interpreters)
    ||> Seq.fold (fun (state : 'state, acc : 'event list) interpret ->
        let events = interpret state
        let state' = fold state events
        state', acc @ events)

type Service internal (resolve : CartId -> Equinox.Decider<Events.Event, Fold.State>) =

    member _.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async<Fold.State> =
        let decider = resolve cartId
        let opt = if optimistic then Equinox.AnyCachedValue else Equinox.RequireLoad
        decider.Transact(fun state -> async {
            match prepare with None -> () | Some prep -> do! prep
            return interpretMany Fold.fold (Seq.map interpret commands) state }, opt)
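
To see what interpretMany produces without involving a store, the sketch below applies it to the Favorites definitions from earlier in this document (repeated here so the example stands alone). The particular `commands` list is illustrative.

```fsharp
type Event = Added of string | Removed of string
type Command = Add of string | Remove of string

let initial : string list = []
let evolve state = function
    | Added sku -> sku :: state
    | Removed sku -> state |> List.filter (fun x -> x <> sku)
let fold s xs = Seq.fold evolve s xs
let interpret command state =
    match command with
    | Add sku -> if state |> List.contains sku then [] else [Added sku]
    | Remove sku -> if state |> List.contains sku |> not then [] else [Removed sku]

let interpretMany fold interpreters (state : 'state) : 'state * 'event list =
    ((state,[]),interpreters)
    ||> Seq.fold (fun (state : 'state, acc : 'event list) interpret ->
        let events = interpret state
        let state' = fold state events
        state', acc @ events)

// Each command is interpreted against the state left behind by its predecessors
let commands = [ Add "a"; Add "a"; Add "b"; Remove "a" ]
let state', events = interpretMany fold (Seq.map interpret commands) initial
```

The second `Add "a"` contributes nothing because it is interpreted against a state that already contains "a", so `events` is `[Added "a"; Added "b"; Removed "a"]` and `state'` is `["b"]`.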

Alternate approach - composing with an Accumulator encapsulating the folding

NOTE: This is an example of an alternate approach provided as a counterpoint - there's no need to read it, as the preceding approach is the recommended default strategy.

As illustrated in Cart's Domain Service, an alternate approach is to encapsulate the folding (Equinox in V1 exposed an interface that encouraged such patterns; this was removed in two steps, as code written using the idiomatic approach is intrinsically simpler, even if it seems not as Easy at first)

/// Maintains a rolling folded State while Accumulating Events pended as part
/// of a decision flow
type Accumulator<'event, 'state>(fold : 'state -> 'event seq -> 'state, originState : 'state) =
    let accumulated = ResizeArray<'event>()

    /// The Events that have thus far been pended via the `decide` functions
    /// `Execute`/`Decide`d during the course of this flow
    member _.Accumulated : 'event list =
        accumulated |> List.ofSeq

    /// The current folded State, based on the Stream's `originState` + any
    /// events that have been Accumulated during the decision flow
    member _.State : 'state =
        accumulated |> fold originState

    /// Invoke a decision function, gathering the events (if any) that it
    /// decides are necessary into the `Accumulated` sequence
    member x.Transact(interpret : 'state -> 'event list) : unit =
        interpret x.State |> accumulated.AddRange
    /// Invoke an Async decision function, gathering the events (if any) that
    /// it decides are necessary into the `Accumulated` sequence
    member x.Transact(interpret : 'state -> Async<'event list>) : Async<unit> = async {
        let! events = interpret x.State
        accumulated.AddRange events }
    /// Invoke a decision function, while also propagating a result yielded as
    /// the fst of an (result, events) pair
    member x.Transact(decide : 'state -> 'result * 'event list) : 'result =
        let result, newEvents = decide x.State
        accumulated.AddRange newEvents
        result
    /// Invoke a decision function, while also propagating a result yielded as
    /// the fst of an (result, events) pair
    member x.Transact(decide : 'state -> Async<'result * 'event list>) : Async<'result> = async {
        let! result, newEvents = decide x.State
        accumulated.AddRange newEvents
        return result }

type Service ... =
    member _.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async<Fold.State> =
        let decider = resolve cartId
        let opt = if optimistic then Equinox.AnyCachedValue else Equinox.RequireLoad
        decider.Transact(fun state -> async {
            match prepare with None -> () | Some prep -> do! prep
            let acc = Accumulator(Fold.fold, state)
            for cmd in commands do
                acc.Transact(interpret cmd)
            return acc.State, acc.Accumulated
        }, opt)

Equinox Architectural Overview

There are virtually unlimited ways to build an event-sourced model. For any set of components to be useful, it's critical that they are designed in a manner where one combines small elements to compose a whole, versus trying to provide a hardwired end-to-end 'framework'.

While going the library route leaves plenty of seams needing to be tied together at the point of consumption (with resulting unavoidable complexity), that's a necessary cost if one is to provide a system that can work in the real world.

This section outlines key concerns that the Equinox Programming Model is specifically taking a view on, and those that it is going to particular lengths to leave open.

Concerns leading to need for a programming model

F#, out of the box, has a very relevant feature set for building Domain models in an event sourced fashion (DUs, persistent data structures, total matching, list comprehensions, async builders etc). However, there are still lots of ways to split the process of folding the events, encoding them, deciding events to produce etc.

In the general case, it doesn't really matter what way one opts to model the events, folding and decision processing.

However, given one has a specific store (or set of stores) in mind for the events, a number of key aspects need to be taken into consideration:

  1. Encoding/decoding events - Per aggregate or system, there is commonality in how one might wish to encode and/or deal with versioning of event representations. Per store, the most efficient way to bridge to that concern can vary. Per domain and encoding system, the degree to which one wants to unit or integration test this codec process will vary.

  2. Caching - Per store, there are different tradeoffs/benefits for Caching. Per system, caching may or may not even make sense. For some stores, it makes sense to integrate caching into the primary storage.

  3. Snapshotting - The store and/or the business need may provide a strong influence on whether or not (and how) one might employ a snapshotting mechanism.

Store-specific concerns mapping to the programming model

This section enumerates key concerns feeding into how Stores in general, and specific concrete Stores bind to the Programming Model:

EventStore

TL;DR caching not really needed, storing snapshots has many considerations in play, projections built in

Overview: EventStore is a mature and complete system, explicitly designed to address key aspects of building an event-sourced system. There are myriad bindings for multiple languages and various programming models. The docs present various ways to do snapshotting. The projection system provides ways in which to manage snapshotting, projections, building read models etc.

Key aspects relevant to the Equinox programming model:

  • In general, EventStore provides excellent caching and performance characteristics intrinsically by virtue of its design

  • Projections can be managed by either tailing streams (including the synthetic $all stream) or using the Projections facility - there's no obvious reason to wrap it, aside from being able to uniformly target CosmosDB (i.e. one could build an Equinox.EventStore.Projection library and an eqx project stats es with very little code).

  • In general event streams should be considered append only, with no mutations or deletes

  • For snapshotting, one can either maintain a separate stream with a maximum count or TTL rule, or include faux Compaction events in the normal streams (to make it possible to combine reading of events and a snapshot in a single roundtrip). The latter is presently implemented in Equinox.EventStore

  • While there is no generic querying facility, the APIs are designed in such a manner that it's generally possible to achieve any typically useful event access pattern needed in an optimal fashion (projections, catchup subscriptions, backward reads, caching)

  • While EventStore allows either json or binary data, it's generally accepted that json (presented as UTF-8 byte arrays) is a good default for reasons of interoperability (the projections facility also strongly implies json)

Azure CosmosDB concerns

TL;DR caching can optimize RU consumption significantly. Due to the intrinsic ability to mutate easily, the potential to integrate rolling snapshots into core storage is clear. Providing ways to cache and snapshot matters a lot on CosmosDB, as lowest-common-denominator queries loading lots of events cost in performance and cash. The specifics of how you use the changefeed matter more than one might think from the CosmosDB high level docs.

Overview: CosmosDB has been in production for >5 years and is a mature Document database. The initial DocumentDb offering is at this point a mere projected programming model atop a generic Document data store. Its changefeed mechanism affords a base upon which one can manage projections, but there is no directly provided mechanism that lends itself to building Projections that map directly to EventStore's facilities in this regard (i.e., there is nowhere to maintain consumer offsets in the store itself).

Key aspects relevant to the Equinox programming model:

  • CosmosDB has pervasive optimization feedback per call in the form of a Request Charge attached to each and every action. Working to optimize one's request charges per scenario is critical, both in terms of the effect it has on the amount of Request Units/s one needs to pre-provision (which translates directly to costs on your bill), and in terms of then living predictably within that allocation if one is not to be throttled with 429 responses. In general, the request charging structure can be considered a very strong mechanical sympathy feedback signal

  • Point reads of single documents based on their identifier are charged as 1 RU plus a price per KB and are optimal. Queries, even ones returning that same single document, have significant overhead and hence are to be avoided

  • One key mechanism CosmosDB provides to allow one to work efficiently is that any point-read request where one supplies a valid etag is charged at 1 RU, regardless of the size one would be transferring in the case of a cache miss (the other key benefits of using this are that it avoids unnecessarily clogging the bandwidth, and yields optimal latencies as there are no unnecessary data transfers)

  • Indexing things surfaces in terms of increased request charges; at scale, each indexing hence needs to be justified

  • Similarly to EventStore, the default ARS encoding CosmosDB provides, together with interoperability concerns, means that straight json makes sense as an encoding form for events (UTF-8 arrays)

  • Collectively, the above implies (arguably counter-intuitively) that using the powerful generic querying facility that CosmosDB provides should actually be a last resort.

  • See Cosmos Storage Model for further information on the specific encoding used, informed by these concerns.

  • Because reads, writes and updates of items in the Tip document are charged based on the size of the item in units of 1KB, it's worth compressing and/or storing snapshots outside of the Tip-document (while those factors are also a concern with EventStore, the key difference is their direct effect on charges in this case).

How the changefeed mechanism works also has implications for how events and snapshots should be encoded and/or stored:

  • Each write results in a potential cost per changefeed consumer, hence one should minimize the number of changefeed consumers

  • Each update of a document can have the same effect in terms of Request Charges incurred in tracking the changefeed (each write results in a document "moving to the tail" in the consumption order - if multiple writes occur within a polling period, you'll only see the last one)

  • The ChangeFeedProcessor presents a programming model which needs to maintain a position. Typically one should store that in an auxiliary collection in order to avoid feedback and/or interaction between the changefeed and those writes

It can be useful to consider keeping snapshots in the auxiliary collection employed by the changefeed in order to optimize the interrelated concerns of not reading data redundantly, and not feeding back into oneself (although having separate roundtrips obviously has implications).

Equinox.CosmosStore CosmosDB Storage Model

This article provides a walkthrough of how Equinox.CosmosStore encodes, writes and reads records from a stream under its control.

The code (see source) contains lots of comments and is intended to be read - this just provides some background.

Batches

Events are stored in immutable batches consisting of:

  • p (partitionKey): string // stream identifier, e.g. "Cart-{guid}"
  • i (index): int64 // base index position of this batch (0 for first event in a stream)
  • n (nextIndex): int64 // base index (i) position value of the next record in the stream - NB this always corresponds to i+e.length (in the case of the Tip record, there won't actually be such a record yet)
  • id: string // same as i (CosmosDB forces every item (document) to have one[, and it must be a string])
  • e (events): Event[] // (see next section) typically there is one item in the array (can be many if events are small, for RU and performance/efficiency reasons; RU charges are per 1024 byte block)
  • _ts // CosmosDB-intrinsic last updated date for this record (changes when replicated etc, hence see t below)

Events

Per Event, we have the following:

  • c (case) - the case of this union in the Discriminated Union of Events this stream bears (aka Event Type)
  • d (data) - json data (CosmosDB maintains it as actual json; you are free to index it and/or query based on that if desired)
  • m (metadata) - carries ancillary information for an event; also json
  • t - creation timestamp

Tip [Batch]

The tip is always readable via a point-read, as the id has a fixed, well-known value ("-1"). It uses the same base layout as the aforementioned Batch (Tip is-a Batch), adding the following:

  • id: always -1 so one can reference it in a point-read GET request and not pay the cost and latency associated with a full indexed query
  • _etag: CosmosDB-managed field updated per-touch (facilitates NotModified result, see below)
  • u: Array of unfolded events based on a point-in-time state (see State, Snapshots, Events and Unfolds, Unfolded Events and unfold in the programming model section). Not indexed. While the data is json, the actual data and metadata fields are compressed and encoded as base64 (and hence can not be queried in any reasonable manner).

State, Snapshots, Events and Unfolds

In an Event Sourced system, we typically distinguish between the following basic elements

  • Events - Domain Events representing real world events that have occurred (always past-tense; it's happened and is not up for debate), reflecting the domain as understood by domain experts - see Event Storming. Examples: The customer favorited the item, the customer added SKU Y to their saved for later list, A charge of $200 was submitted successfully with transaction id X.

  • State - derived representations established from Events. A given set of code in an environment will, in service of some decision making process, interpret the Events as implying particular state changes in a model. If you change the code slightly or add a field, you wouldn't necessarily expect a version of your code from a year ago to generate an equivalent state that you can simply blast into your object model and go. (But you can easily and safely hold a copy in memory as long as your process runs, as this presents no such interoperability or versioning concerns). State is not necessarily always serializable, nor should it be.

  • Snapshots - A snapshot is an intentionally roundtrippable version of a State, that can be saved and restored. Typically one would do this to save the (latency, roundtrips, RUs, deserialization and folding) cost of loading all the Events in a long running sequence of Events to re-establish the State. The EventStore folks have a great walkthrough on Rolling Snapshots.

  • Projections - the term projection is heavily overloaded, meaning anything from the proceeds of a SELECT statement, the result of a map operation, an EventStore projection to an event being propagated via Kafka (no, further examples are not required!).

... and:

  • Unfolds - the term unfold is based on the well known 'standard' FP function of that name, bearing the signature 'state -> 'event seq. For Equinox.CosmosStore, one might say unfold yields projections as events to snapshot the state as at that position in the stream.

Generating and saving unfolded events

Periodically, along with writing the events that a decision function yields to represent the implications of a command given the present state, we also unfold the resulting state' and supply the resulting unfolded events to the Sync function too. The unfold function takes the state and projects one or more snapshot-events that can be used to re-establish a state equivalent to that which we have thus far derived from watching the events on the stream. Unlike normal events, unfolded events do not get replicated to other systems, and can also be jettisoned at will (we also compress them rather than storing them as fully expanded json).

NB, in the present implementation, unfolds are generated, transmitted and updated upon every write; this makes no difference from a Request Charge perspective, but is clearly suboptimal due to the extra computational effort and network bandwidth consumption. This will likely be optimized by exposing controls on the frequency at which unfolds are triggered.
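The relationship between fold, unfold and isOrigin can be sketched as follows. This is a minimal, self-contained illustration rather than Equinox code: the Cart-style events, the Snapshotted case and all function bodies are assumptions made purely for the example.

```fsharp
// Hypothetical Cart aggregate, for illustration only
type Event =
    | ItemAdded of sku: string
    | ItemRemoved of sku: string
    | Snapshotted of skus: string list // snapshot-event; only ever stored as an unfold

type State = string list

let initial : State = []

let evolve (state : State) (event : Event) : State =
    match event with
    | ItemAdded sku -> sku :: state
    | ItemRemoved sku -> state |> List.filter (fun s -> s <> sku)
    | Snapshotted skus -> skus // a snapshot replaces whatever state preceded it

let fold (state : State) (events : Event seq) : State = Seq.fold evolve state events

// 'state -> 'event seq: project the state as one or more snapshot-events
let unfold (state : State) : Event seq = Seq.singleton (Snapshotted state)

// Identifies events from which folding can start without looking further back
let isOrigin (event : Event) : bool =
    match event with
    | Snapshotted _ -> true
    | _ -> false

let events = [ ItemAdded "a"; ItemAdded "b"; ItemRemoved "a" ]
let state' = fold initial events
// Folding the unfolded events over `initial` should yield an equivalent state
let restored = fold initial (unfold state')
printfn "%A %A" state' restored
```

The key property is that folding the unfolded events yields the same state as folding the full event history, which is what allows the unfolds to be discarded and regenerated at will.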

Reading from the Storage Model

The dominant pattern is that reads request Tip with an IfNoneMatch precondition citing the etag it bore when we last saw it. That, when combined with a cache, means one of the following happens when a reader is trying to establish the state of a stream prior to processing a Command:

  • NotModified (depending on workload, can be the dominant case) - for 1 RU, minimal latency and close-to-0 network bandwidth, we know the present state

  • NotFound (there's nothing in the stream) - for equivalently low cost (1 RU), we know the state is initial

  • Found - (if there are multiple writers and/or we don't have a cached version) - for the minimal possible cost (a point read, not a query), we have all we need to establish the state:

    • i: a version number
    • e: events since that version number
    • u: unfolded (auxiliary) events computed at the same time as the batch of events was sent (informally known as snapshots) - (these enable us to establish the state without further queries or roundtrips to load and fold all preceding events)
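The three outcomes above can be modelled roughly as follows. This is a sketch under stated assumptions, not the Equinox implementation: the Tip, ReadResult and Cached types and the establishState function are hypothetical names, and the sketch assumes any unfold accepted by isOrigin already reflects every event up to and including those in the Tip.

```fsharp
// Hypothetical types modelling the outcomes of a conditional point read of the Tip
type Tip<'event> = { etag : string; i : int64; e : 'event list; u : 'event list }

type ReadResult<'event> =
    | NotModified             // 304: the etag we supplied is still current
    | NotFound                // the stream has no Tip, i.e. it is empty
    | Found of Tip<'event>    // the Tip changed, or we had no cached etag

// What a cache might hold per stream (field names are illustrative)
type Cached<'state> = { tipEtag : string; state : 'state }

/// Some state if it can be established from this response alone;
/// None signals that a real reader would need to query preceding batches
let establishState
        (fold : 'state -> 'event seq -> 'state)
        (isOrigin : 'event -> bool)
        (initial : 'state)
        (cached : Cached<'state> option)
        (result : ReadResult<'event>)
        : 'state option =
    match result, cached with
    | NotModified, Some c -> Some c.state
    | NotModified, None -> None // should not arise: an etag can only come from a cache entry
    | NotFound, _ -> Some initial
    | Found tip, _ ->
        // Simplification: an accepted unfold is assumed to include the Tip's events
        tip.u
        |> List.tryFind isOrigin
        |> Option.map (fun snapshot -> fold initial (Seq.singleton snapshot))

// Toy domain: the state is a running total
type Event = Added of int | Total of int
let fold state events = Seq.fold (fun s e -> match e with Added n -> s + n | Total n -> n) state events
let isOrigin = function Total _ -> true | Added _ -> false

let fromCache = establishState fold isOrigin 0 (Some { tipEtag = "etagXYZ"; state = 5 }) NotModified
let fromEmpty = establishState fold isOrigin 0 None NotFound
let fromTip = establishState fold isOrigin 0 None (Found { etag = "etagABC"; i = 2L; e = [ Added 3 ]; u = [ Total 7 ] })
printfn "%A %A %A" fromCache fromEmpty fromTip
```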

Building a state from the Storage Model and/or the Cache

Given a stream with:

[
    { "id":0, "i":0, "e": [{"c":"c1", "d":"d1"}]},
    { "id":1, "i":1, "e": [{"c":"c2", "d":"d2"}]},
    { "id":2, "i":2, "e": [{"c":"c3", "d":"d3"}]},
    { "id":3, "i":3, "e": [{"c":"c1", "d":"d4"}]},
    { "id":-1,
        "i": 4,
        "e": [{"i":4, "c":"c3", "d":"d5"}],
        "u": [{"i":4, "c":"s1", "d":"s5Compressed"}, {"i":3, "c":"s2", "d":"s4Compressed"}],
        "_etag": "etagXYZ"
    }
]

If we have state4 based on the events up to {i:3, c:c1, d: d4} and the Tip Item-document, we can produce the state by folding in a variety of ways:

  • fold initial [ C1 d1; C2 d2; C3 d3; C1 d4; C3 d5 ] (but would need a query to load the preceding batches, with associated RUs and roundtrips)
  • fold state4 [ C3 d5 ] (only need to pay to transport the tip document as a point read)
  • (if isOrigin (S1 s5) = true): fold initial [S1 s5] (point read + transport + decompress s5)
  • (if isOrigin (S2 s4) = true): fold initial [S2 s4; C3 d5] (only need to pay to transport the tip document as a point read and decompress s4 and s5)

If we have state3 based on the events up to index 2 (d3), we can produce the state by folding in a variety of ways:

  • fold initial [ C1 d1; C2 d2; C3 d3; C1 d4; C3 d5 ] (but query, round-trips)
  • fold state3 [C1 d4; C3 d5] (only pay for point read+transport)
  • fold initial [S2 s4; C3 d5] (only pay for point read+transport)
  • (if isOrigin (S1 s5) = true): fold initial [S1 s5] (point read + transport + decompress s5)
  • (if isOrigin (S2 s4) = true): fold initial [S2 s4; C3 d5] (only need to pay to transport the tip document as a point read and decompress s4 and s5)

If we have state5 based on the events up to C3 d5, and (being the writer, or a recent reader) have the etag: etagXYZ, we can do an HTTP GET with etag: IfNoneMatch etagXYZ, which will return 304 Not Modified with < 1K of data, and a charge of 1.00 RU, allowing us to derive the state as:

  • state5

See Programming Model for what happens in the application based on the events presented.
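To illustrate how isOrigin determines which of the above folds applies, here is a small sketch of the backwards walk. The eventsFromOrigin helper is a hypothetical name, and placing the snapshot-events inline in the timeline is a simplification for the example (in storage they live in the Tip's u array).

```fsharp
type Event =
    | C1 of string
    | C2 of string
    | C3 of string
    | S1 of string // snapshot-events, as held in the Tip's unfolds
    | S2 of string

/// Walks backwards from the newest item until isOrigin matches (or the start of
/// the stream is reached), returning the items to fold in their original order
let eventsFromOrigin (isOrigin : 'e -> bool) (items : 'e list) : 'e list =
    let rec loop acc remaining =
        match remaining with
        | [] -> acc
        | x :: _ when isOrigin x -> x :: acc
        | x :: rest -> loop (x :: acc) rest
    loop [] (List.rev items)

let events = [ C1 "d1"; C2 "d2"; C3 "d3"; C1 "d4"; C3 "d5" ]

// No snapshot accepted: everything must be loaded and folded
let all = eventsFromOrigin (fun _ -> false) events
// isOrigin accepts S1: the newest unfold alone suffices
let viaS1 = eventsFromOrigin (function S1 _ -> true | _ -> false) (events @ [ S1 "s5" ])
// isOrigin accepts S2 (representing the state before C3 d5): S2 s4 plus the event after it
let viaS2 =
    eventsFromOrigin
        (function S2 _ -> true | _ -> false)
        [ C1 "d1"; C2 "d2"; C3 "d3"; C1 "d4"; S2 "s4"; C3 "d5" ]
printfn "%A\n%A\n%A" all viaS1 viaS2
```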

Sync stored procedure

This covers what the V3 implementation of the JS Stored Procedure (see source) does when presented with a batch to be written.

The sync stored procedure takes as input a document that is almost identical to the format of the Tip batch (in fact, if the stream is found to be empty, it is pretty much the template for the first document created in the stream). The request includes the following elements:

  • expectedVersion: the position the requester has based their [proposed] events on (no, providing an etag to save on Request Charges is not possible in the Stored Proc)

  • the expectedEtag enables competing writers to maintain and update unfold data in a consistent fashion (backing off and retrying in the case of conflict, without any events being written per state change) (See AccessStrategy.RollingState, AccessStrategy.Custom)

  • e: array of Events (see Event, above) to append if, and only if, the expectedVersion check is fulfilled

  • u: array of unfolded events (aka snapshots) that supersede items with equivalent case values

  • maxEventsInTip: the maximum number of events permitted to be retained in the Tip (subject to that not exceeding the maxStringifyLen rule). For example:

    • if e contains 2 events, the tip document's e has 2 events and the maxEventsInTip is 5, the events get appended onto the tip's events
    • if the total length including the new events would exceed maxEventsInTip, the Tip is 'renamed' (gets its id set to i.toString()) to become a batch (with the new events included in that calved Batch), and the new Tip has a zero-length events array as a Batch, and a set of unfolds (as an atomic transaction on the server side)
  • maxStringifyLen: secondary constraint on the events retained in the tip (in addition to maxEventsInTip constraint) - constrains the maximum length of the events being buffered in the Tip by applying a size limit in characters (as computed via JSON.stringify(events).length)

  • (PROPOSAL/FUTURE) thirdPartyUnfoldRetention: how many events to keep before the base (i) of the batch if required by lagging unfolds which would otherwise fall out of scope as a result of the appends in this batch (this will default to 0, so for example if a writer says maxEvents 10 and there is an unfold based on an event more than 10 old it will be removed as part of the appending process)
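The expectedVersion check and the maxEventsInTip calving rule can be sketched as a pure function. Note the actual stored procedure is JavaScript; the F# below is only a model of the rules as described above. It deliberately omits the expectedEtag path and the maxStringifyLen constraint, replaces all unfolds on each write, and the sync function, its parameter order and the SyncResult type are hypothetical.

```fsharp
type Event = { c : string; d : string }
type Batch = { id : string; i : int64; n : int64; e : Event list; u : Event list }

type SyncResult =
    | Conflict of actualVersion : int64
    | Written of calved : Batch option * tip : Batch

let sync (maxEventsInTip : int) (tip : Batch option) (expectedVersion : int64)
         (events : Event list) (unfolds : Event list) : SyncResult =
    // An empty stream behaves like an empty Tip at version 0
    let current = defaultArg tip { id = "-1"; i = 0L; n = 0L; e = []; u = [] }
    if current.n <> expectedVersion then
        Conflict current.n
    else
        let combined = current.e @ events
        let n' = current.n + int64 (List.length events)
        if List.length combined > maxEventsInTip then
            // Calve: the Tip's events plus the new ones become an immutable Batch whose id is its i
            let batch = { current with id = string current.i; n = n'; e = combined; u = [] }
            Written (Some batch, { id = "-1"; i = n'; n = n'; e = []; u = unfolds })
        else
            Written (None, { current with n = n'; e = combined; u = unfolds })

let ev = { c = "c1"; d = "d1" }
let snapshot = { c = "s1"; d = "compressed" }
let tip = { id = "-1"; i = 0L; n = 2L; e = [ ev; ev ]; u = [] }
let appended = sync 5 (Some tip) 2L [ ev; ev ] [ snapshot ] // 4 events fit within the Tip
let rejected = sync 5 (Some tip) 0L [ ev ] [ snapshot ]     // stale expectedVersion
let calved = sync 3 (Some tip) 2L [ ev; ev ] [ snapshot ]   // 4 > 3, so the Tip is calved
printfn "%A\n%A\n%A" appended rejected calved
```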

Equinox.CosmosStore.Core.Events

The Equinox.CosmosStore.Core namespace provides a lower level API that can be used to manipulate events stored within Azure CosmosDB using optimized native access patterns.

The higher level APIs (i.e. not Core), as demonstrated by the dotnet new templates, are recommended to be used in the general case, as they provide the following key benefits:

  • Domain logic is store-agnostic, leaving it easy to:

    • Unit Test in isolation (verifying decisions produce correct events)
    • Integration test using the MemoryStore, where relevant
  • Decouples encoding/decoding of events from the decision process of what events to write (means your Domain layer does not couple to a specific storage layer or encoding mechanism)

  • Enables efficient caching and/or snapshotting (providing Equinox with fold, initial, isOrigin, unfold and a codec allows it to manage this efficiently)

  • Provides Optimistic Concurrency Control with retries in the case of conflicting events

Example Code

open System
open Serilog
open Equinox.CosmosStore
open Equinox.CosmosStore.Core
// open MyCodecs.Json // example of using specific codec which can yield UTF-8
                      // byte arrays from a type using `Json.toBytes` via Fleece
                      // or similar

type EventData with
    static member FromT eventType value =
        EventData.FromUtf8Bytes(eventType, Json.toBytes value)

// Load connection string from your Key Vault (example here is the CosmosDB
// simulator's well known key)
// see https://github.com/jet/equinox-provisioning-cosmosdb
let connectionString : string =
    "AccountEndpoint=https://localhost:8081;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;"

// Forward to Log (you can use `Log.Logger` and/or `Log.ForContext` if your app
// uses Serilog already)
let outputLog = LoggerConfiguration().WriteTo.NLog().CreateLogger()
// Serilog has a `ForContext<T>()`, but if you are using a `module` for the
// wiring, you might create a tagged logger like this:
let gatewayLog =
    outputLog.ForContext(Serilog.Core.Constants.SourceContextPropertyName, "Equinox")

let discovery = Discovery.ConnectionString connectionString
let connector : Equinox.CosmosStore.CosmosStoreConnector =
    CosmosStoreConnector(
        discovery,
        requestTimeout = TimeSpan.FromSeconds 5.,
        maxRetryAttemptsOnRateLimitedRequests = 1,
        maxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromSeconds 3.)

// If storing in a single collection, one specifies the db and collection when using Connect()
// alternately use factory.CreateUninitialized, which defers that until the stream one is writing to becomes clear
// NOTE: the `let!` bindings imply that this and the subsequent code runs inside an `async { ... }` block
let! storeClient = CosmosStoreClient.Connect(connector.CreateAndInitialize, "databaseName", "containerName")

let ctx = EventsContext(storeClient, gatewayLog)

//
// Write an event
//

let expectedSequenceNumber = 0 // new stream
let streamName, eventType, eventJson = "stream-1", "myEvent", Request.ToJson event
let eventData = EventData.FromT eventType eventJson |> Array.singleton

let! res =
    Events.append
        ctx
        streamName 
        expectedSequenceNumber 
        eventData
match res with
| AppendResult.Ok -> ()
| c -> failwithf "conflict %A" c

Access Strategies

An Access Strategy defines any optimizations regarding how one arrives at a State of an Aggregate based on the Events stored in a Stream in a Store.

The specifics of an Access Strategy depend on what makes sense for a given Store, i.e. Equinox.CosmosStore necessarily has a significantly different set of strategies than Equinox.EventStore (although there is an intersection).

Access Strategies only affect performance; you should still be able to infer the state of the aggregate based on the fold of all the events ever written on top of an initial state

NOTE: it's not important to select a strategy until you've actually modelled your aggregate, see what if I change my access strategy

Equinox.CosmosStore.AccessStrategy

TL;DR Equinox.CosmosStore: (see also: the storage model for a deep dive, and glossary, below the table for definition of terms)

  • keeps all the Events for a Stream in a single CosmosDB logical partition

  • Transaction guarantees are provided at the logical partition level only. As for most typical Event Stores, the mechanism is based on Optimistic Concurrency Control. There's no holding of a lock involved - it's based on conveying your premise alongside the proposed change. In terms of what we are doing, you observe a state, propose events, and the store is responsible for applying the change, or rejecting it if the state turns out to no longer be the case when you get around to syncing the change. The change is rejected if your premise (things have not changed since I saw them) is invalidated (whereupon you loop, working from the updated state).

  • always has a special 'index' document (we term it the Tip document), per logical partition/stream that is accessible via an efficient point read (it always has a CosmosDB id value of "-1")

  • Events are stored in Batches in immutable documents in the logical partition, the Tip document is the only document that's ever updated (it's always updated, as it happens...).

  • As all writes touch the Tip, we have a natural way to invalidate any cached State for a given Stream; we retain the _etag of the Tip document, and updates (or consistent reads) are contingent on it not having changed.

  • The (optimistic) concurrency control of updates is by virtue of the fact that every update to the Tip touches the document and thus alters (invalidates) the _etag value. This means that, in contrast to how many SQL based stores (and most CosmosDB based ones) implement concurrency control, we don't rely on a primary key constraint to prevent two writers writing conflicting events to the same stream.

  • A secondary benefit of not basing consistency control on a primary key constraint or equivalent is that we no longer have to insert an Event every time we are updating something. (This fact is crucial for the RollingState and Custom strategies).

  • The interpret/decide function is expected to deduplicate writes by not producing events if the state implies such updates would be redundant. Equinox does not have any internal mechanism to deduplicate events, thus having correct deduplication is the key to reducing round-trips and hence minimizing RU consumption (and the adverse effects that the retry cycles due to contention have, which will most likely arise when load is at its highest).

  • The unfolds maintained in Tip have the bodies (the d and m fields) 1) deflated 2) base64 encoded (as everyone is reading the Tip, it's worthwhile having the writer take on the burden of compressing, with the payback being that write amplification effects are reduced by readers paying fewer RUs to read them). The snapshots can be inspected securely via the eqx tool's dump facility, or insecurely online via the decode button on jgraph's drawio-tools at https://jgraph.github.io/drawio-tools/tools/convert.html, if the data is not sensitive.

  • The Tip document (and the fact we hold its _etag in our cache alongside the State we have derived from the Events) is at the heart of why consistent reads are guaranteed to be efficient (Equinox does the read of the Tip document contingent on the _etag not having changed; a read of any size costs only 1 RU if the result is 304 Not Modified)

  • Specific Access Strategies:

    • define what we put in the Tip
    • control how we short circuit the process of loading all the Events and folding them from the start, if we encounter a Snapshot or Reset Event
    • allow one to post-process the events we are writing as required for reasons of optimization
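The observe / propose / sync-or-retry loop described in the bullets above can be sketched with an in-memory stand-in for the store. Everything here is an illustrative assumption rather than an Equinox API: InMemoryStream, TrySync and transact are hypothetical names, and an event count stands in for the _etag as the premise being checked.

```fsharp
/// In-memory stand-in for a stream; the count of events acts as the version
type InMemoryStream<'event>() =
    let mutable events : 'event list = []
    member _.Load() : int * 'event list = List.length events, events
    /// Applies the append only if nobody has written since expectedVersion was observed
    member _.TrySync(expectedVersion : int, proposed : 'event list) : bool =
        if List.length events <> expectedVersion then
            false
        else
            events <- events @ proposed
            true

/// Observe the state, run decide, attempt to sync; on conflict, loop from the updated state
let rec transact
        (fold : 'state -> 'event list -> 'state)
        (initial : 'state)
        (stream : InMemoryStream<'event>)
        (decide : 'state -> 'event list)
        (attemptsLeft : int)
        : 'state =
    let version, events = stream.Load()
    let state = fold initial events
    match decide state with
    | [] -> state // decide deduplicated the request: nothing to write
    | proposed ->
        if stream.TrySync(version, proposed) then fold state proposed
        elif attemptsLeft > 1 then transact fold initial stream decide (attemptsLeft - 1)
        else failwith "Too many conflicts"

let stream = InMemoryStream<string>()
let interfered = ref false
let decide (state : string list) =
    if not interfered.Value then
        interfered.Value <- true
        stream.TrySync(0, [ "other" ]) |> ignore // simulate a competing writer winning the race
    if List.contains "mine" state then [] else [ "mine" ]

let fold (state : string list) (events : string list) = state @ events
let result = transact fold [] stream decide 3
let repeated = transact fold [] stream decide 3 // idempotent: decide yields no events
printfn "%A %A" result (stream.Load())
```

In this run the first attempt is rejected because the competing write invalidated the premise, and the second attempt succeeds from the updated state. The repeated call writes nothing, which is the deduplication behaviour expected of decide/interpret.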

Cosmos Access Strategy overviews

| Strategy | TL;DR | Tip document maintains | Best suited for |
|---|---|---|---|
| Unoptimized | Keep Events only (but there's still an (empty) Tip document) | Count of event (and ability to have any insertion invalidate the cache for any reader) | No load, event counts or event sizes worth talking about. Initial implementations. |
| LatestKnownEvent | Special mode for when every event is completely independent, so we completely short-circuit loading the events and folding them and instead only use the latest event (if any) | A copy of the most recent event, together with the count | 1) Maintaining a local copy of a Summary Event representing information obtained from a partner service that is authoritative for that data. 2) Tracking changes to a document where we're not really modelling events as such, but with optimal efficiency (every read is a point read of a single document) |
| Snapshot | Keep a (single) snapshot in Tip at all times, guaranteed to include all events | A single unfold produced by toSnapshot based on the state' (i.e., including any events being written) every time, together with the event count | Typical event-sourced usage patterns. Good default approach. Very applicable where you have lots of small 'delta' events that you only consider collectively. |
| MultiSnapshot | As per Snapshot, but toSnapshots can return arbitrary (0, one or many) unfolds | Multiple (consistent) snapshots (and _etag/event count for concurrency control as per other strategies) | Blue-green deployments where an old version and a new version of the app cannot share a single snapshot schema |
| RollingState | "No event sourcing" mode - no events, just state. Perf as good as Snapshot, but don't even store the events so we will never hit CosmosDB stream size limits | toSnapshot state': the squashed state + events which replace the incoming state | Maintaining state of an Aggregate with lots of changes a) that you don't need a record of the individual changes of yet b) you would like to model, test and develop as if one did. DO NOT use if a) you want to be able to debug state transitions by looking at individual change events b) you need to react to and/or project events relating to individual updates (CosmosDB does not provide a way to provide a notification of every single update, even if it does have a guarantee of always showing the final state on the Change Feed eventually) |
| Custom | General form that all the preceding strategies are implemented in terms of | Anything transmute yields as the fst of its result (but, typically the equivalent of what Snapshot writes) | Limited by your imagination, e.g. emitting events once per hour but otherwise like RollingState |

Glossary

  • decide/interpret: Application function that inspects a state to propose events, under control of a Transact loop.

  • Transact loop: Equinox Core function that runs your decide/interpret, and then syncs any generated events to the Store.

  • state: The (potentially cached) version of the State of the Aggregate that Transact supplied to your decide/ interpret.

  • events: The changes the decide/interpret generated (that Transact is trying to sync to the Store).

  • fold: standard function, supplied per Aggregate, which is used to apply Events to a given State in order to evolve the State per implications of each Event that has occurred

  • state': The State of an Aggregate (post-state in FP terms), derived from the current state + the proposed events being synced.

  • Tip: Special document stored alongside the Events (in the same logical partition) which holds the unfolds associated with the current State of the stream.

  • sync: Stored procedure we use to manage consistent update of the Tip alongside insertion of Event Batch Documents (contingent on the Tip's _etag not having changed)

  • unfold: JSON objects maintained in the Tip, which represent Snapshots taken at a given point in the event timeline for this stream.

    • the fold/evolve function is presented Snapshots as if it was yet-another-Event
    • the only differences are
      1. they are not stored in (immutable) Event documents as other Events are
      2. every write replaces all the unfolds in Tip with the result of the toSnapshot(s) function as defined in the given Access Strategy.
  • isOrigin: A predicate function supplied to an Access Strategy that defines the starting point from which we'll build a state.

    • Must yield true for relevant Snapshots or Reset Events.
  • initial: The (Application-defined) state value all loaded events fold into, if an isOrigin event is not encountered while walking back through the unfolds and Events and we instead hit the start of the stream.

  • Snapshot: a single serializable representation of the state'

    • Facilitates optimal retrieval patterns when a stream contains a significant number of events
    • NOTE: Snapshots should not ever yield an observable difference in the state when compared to building it from the timeline of events; it should be solely a behavior-preserving optimization.
  • Reset Event: an event (i.e. a permanent event, not a Snapshot) for which an isOrigin predicate yields true

    • e.g., for a Cart aggregate, a CartCleared event means there is no point in looking at any preceding events in order to determine what's in the cart; we can start folding from that point.
    • Multiple Reset Event Types are possible per Category, and a stream can often have multiple reset points (e.g., each time a Cart is Cleared, we enter a known state)
    • A Tombstone Event can also be viewed as a Reset Event, e.g. if you have a (long running) bank account represented as a Stream per year, one might annually write a TrancheCarriedForwardAndClosed event which a) bears everything we care about (the final balance) b) signifies the fact that this tranche has now transitioned to read-only mode. Conversely, a Closed event is not by itself a Tombstone Event - while you can infer the Open/Closed mode aspect of the Stream's State, you would still need to look further back through its history to be able to determine the balance that applied at the point the period was marked Closed.

Cosmos Read and Write policies

| Strategy | Reads involve | Writes involve |
|---|---|---|
| Unoptimized | Querying for, and folding all events (although the cache means it only reads events it has not seen). The Tip is never read, even e.g. if someone previously put a snapshot in there | 1) Insert a document with the events; 2) Update Tip to reflect updated event count (as a transaction, as with all updates) |
| LatestKnownEvent | Reading the Tip (never the events) | 1) Inserting a document with the new event; 2) Updating the Tip to a) up count/invalidate the _etag b) CC the event for efficient access |
| Snapshot | 1) read Tip; stop if isOrigin accepts a snapshot from within; 2) read backwards until the provided isOrigin function returns true for an Event, or we hit start of stream | 1) Produce proposed state'; 2) write events to new document + toSnapshot state' result into Tip with new event count |
| MultiSnapshot | As per Snapshot, stop if isOrigin yields true for any unfold (then fall back to folding from base event or a reset event) | 1) Produce state'; 2) Write events to new document + toSnapshots state' to Tip (could be 0 or many, vs exactly one) |
| RollingState | Read Tip (can fall back to building from events as per Snapshot mode if nothing in Tip, but normally there are no events) | 1) produce state'; 2) update Tip with toSnapshot state'; 3) no events are written; 4) Concurrency Control is based on the _etag of the Tip, not an expectedVersion / event count |
| Custom | As per Snapshot or MultiSnapshot: 1) see if any unfolds pass the isOrigin test; 2) Otherwise, work backward until a Reset Event or start of stream | 1) produce state'; 2) use transmute events state to determine a) the unfolds (if any) to write b) the events (if any) to emit; 3) execute the insert and/or upsert operations, contingent on the _etag of the opening state |
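The RollingState write path (no events written, concurrency control based on the _etag of the Tip) can be illustrated with a minimal in-memory sketch. It is not the CosmosStore implementation; `InMemoryContainer`, `Tip`, `EtagMismatch` and `transact` are hypothetical names, and the retry limit is an arbitrary assumption.

```python
import uuid
from dataclasses import dataclass, field


class EtagMismatch(Exception):
    """Raised when the Tip changed since it was read."""


@dataclass
class Tip:
    unfold: dict = field(default_factory=dict)  # the single snapshot held in the Tip
    etag: str = ""


class InMemoryContainer:
    """Stand-in for a store that supports etag-contingent replacement of the Tip."""

    def __init__(self) -> None:
        self.tip = Tip(etag=uuid.uuid4().hex)

    def read_tip(self) -> Tip:
        return Tip(dict(self.tip.unfold), self.tip.etag)

    def replace_tip(self, unfold: dict, expected_etag: str) -> None:
        if expected_etag != self.tip.etag:
            raise EtagMismatch()
        # every successful write yields a fresh etag
        self.tip = Tip(dict(unfold), uuid.uuid4().hex)


def transact(container: InMemoryContainer, decide, max_attempts: int = 3) -> dict:
    """Read Tip, produce state', write it back contingent on the etag; retry on conflict."""
    for _ in range(max_attempts):
        tip = container.read_tip()
        new_state = decide(tip.unfold)
        try:
            container.replace_tip(new_state, tip.etag)
            return new_state
        except EtagMismatch:
            continue  # another writer got in first: re-read and re-run the decision
    raise RuntimeError("too many concurrency conflicts")
```

Note that the decision function is re-run against the freshly read state after a conflict, so a competing write is never silently overwritten.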

DynamoStore

Document stores share many common traits. Thus, applying Mechanical Sympathy in optimizing the storage representation and access patterns will naturally yield an overlapping set of access patterns that work well. This commonality definitely applies when contrasting CosmosDB and DynamoDB. As a result, Equinox.DynamoStore can and does implement pretty much the same feature set, API patterns and access strategies as Equinox.CosmosStore.

The implementation uses the excellent FSharp.AWS.DynamoDB library 🙏 @eiriktsarpalis @samritchie, which wraps the standard AWS AWSSDK.DynamoDBv2 SDK Package. It also leans on significant preparatory research carried out under the fsharp.org mentorship program 🙏 @pierregoudjo.

Contrasted with CosmosStore

The following focuses on explaining the differences in terms of low level technical detail; the actual usage experience is identical.

The vast majority of the API design and implementation concerns detailed regarding CosmosStore apply (scroll up ☝️)

Request Charge Structures

In broad terms, the charging structure and rate limiting scheme in DynamoDB has only minor differences that manifest in terms of the call patterns that Equinox uses. The most relevant variance in the charge structure is that TransactWriteItems costs twice the (Read and) Write Capacity Units of equivalent single-item PutItem / UpdateItem calls; therefore its use needs to be considered carefully.

Sync logic in a Stored Procedure vs TransactWriteItems/PutItem/UpdateItem API calls

The Append operation adds events and/or updates the unfolds in the Tip. Instead of using a Stored Procedure as CosmosStore does, the implementation involves conditional PutItem and UpdateItem requests to accumulate events in the Tip (where there is space available).

At the point where the Tip exceeds any of the configured and/or implicit limits, a TransactWriteItems request is used (see implementation in FSharp.AWS.DynamoDB), to Calve a Batch of items from the Tip. The calving is triggered by any of:

  • maximum event count (not limited by default)
  • maximum accumulated event size (default 32KiB)
  • DynamoDB Item Size Limit (hard limit of 400KiB)
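The three triggers above can be sketched as a single decision function. This is an illustration only, not the DynamoStore code: `TipLimits`, `should_calve` and their parameters are hypothetical names, and whether each comparison is strict is an assumption.

```python
from dataclasses import dataclass
from typing import Optional

ITEM_SIZE_LIMIT_BYTES = 400 * 1024  # DynamoDB hard limit per Item


@dataclass(frozen=True)
class TipLimits:
    max_events: Optional[int] = None  # not limited by default
    max_event_bytes: int = 32 * 1024  # default 32KiB of accumulated events


def should_calve(
    limits: TipLimits,
    tip_event_count: int,
    tip_event_bytes: int,
    tip_total_bytes: int,
    new_event_count: int,
    new_event_bytes: int,
) -> bool:
    """True if appending the new events to the Tip would breach any limit."""
    event_count = tip_event_count + new_event_count
    event_bytes = tip_event_bytes + new_event_bytes
    total_bytes = tip_total_bytes + new_event_bytes
    if limits.max_events is not None and event_count > limits.max_events:
        return True
    if event_bytes > limits.max_event_bytes:
        return True
    return total_bytes > ITEM_SIZE_LIMIT_BYTES
```

When the function yields False, the cheaper conditional PutItem / UpdateItem path applies; only a True result warrants the doubled cost of TransactWriteItems.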

Further information:

  • DynamoDB Transactions: Use Cases and Examples by Alex DeBrie provides a thorough review of the TransactWriteItems facility (TL;DR: it's far more general than the stream level atomic transactions afforded by CosmosDB's Stored Procedures)

Differences in read and resync paths

  • DynamoDB does not support an etag-checked Read API, which means a cache hit is not as efficient in RC terms as it is on CosmosDB (and the data travels and is deserialized unnecessarily).
  • Concurrency conflicts necessitate an additional roundtrip to resync as the DynamoDB Service does not yield the item in the event of a ConditionalCheckFailedException
  • DynamoStore's Position structure (in the StreamToken held by Equinox while a Transact call is in flight) includes the (compressed) events in the Tip and holds byte counts for use in the size calculations required to guarantee a Calve request will happen if the 400KiB size limit (or, more realistically, lower limits dictated by the increasing charges based on Item size) is due to be breached. The CosmosStore equivalent consists of only the _etag and the event count (the stored procedure uses JSON.stringify to compute an indicative size, but the 2 MB UTF-8 JSON size limit is ample given the fact that RU costs get prohibitive long before you hit such a limit).

Event Bodies are BLOBS vs JsonElements

CosmosStore dictates (as of V4) that event bodies be supplied as System.Text.Json.JsonElements in order that events can be included in the Document/ Items as JSON directly. This is also to underscore the fact that the only reasonable format to use is valid JSON; binary data would need to be base64 encoded.

DynamoStore accepts and yields event bodies as arbitrary ReadOnlyMemory<byte> BLOBs (the AWS SDK round-trips such blobs as a MemoryStream and does not impose any restrictions on the blobs in terms of required format)

CosmosStore defaults to compressing (with System.IO.Compression.DeflateStream) the event bodies for Unfolds; DynamoStore round-trips an Encoding field per blob (one for the data, one for the metadata) for both Events and Unfolds in order to enable the IEventCodec to decompress the blobs as required. In both cases, minimizing Request Charges is imperative: request size directly maps to financial charges, 429s, reduced throughput and a lowered scaling ceiling.
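The idea of round-tripping an Encoding field alongside each blob can be sketched as below. This is a Python illustration under stated assumptions: the tag values, the `min_size` threshold and the `encode` / `decode` names are invented for the example and do not reflect the actual wire format.

```python
import zlib
from typing import Tuple

ENCODING_RAW = 0      # hypothetical tag: blob stored as-is
ENCODING_DEFLATE = 1  # hypothetical tag: blob is raw-deflate compressed


def encode(body: bytes, min_size: int = 48) -> Tuple[int, bytes]:
    """Compress only when it actually saves space; return (encoding tag, blob)."""
    if len(body) < min_size:
        return ENCODING_RAW, body
    compressor = zlib.compressobj(wbits=-15)  # negative wbits: raw deflate, no header
    compressed = compressor.compress(body) + compressor.flush()
    if len(compressed) < len(body):
        return ENCODING_DEFLATE, compressed
    return ENCODING_RAW, body


def decode(encoding: int, blob: bytes) -> bytes:
    """Use the stored tag to decide whether the blob needs inflating."""
    if encoding == ENCODING_RAW:
        return blob
    if encoding == ENCODING_DEFLATE:
        return zlib.decompress(blob, wbits=-15)
    raise ValueError(f"unknown encoding {encoding}")
```

Storing the tag per blob means a reader never has to guess whether a body was compressed, and a writer can skip compression for small bodies where it would not pay off.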

Features not ported and/or not easily implementable

  • Equinox.Cosmos.Core.Events.appendAtEnd/NonIdempotentAppend has not been ported (there's no obvious clean and efficient way to do a conditional insert/update/split as the CosmosDB stored proc can, and this is a low usage feature)

Support for Reactions / Change Feed equivalent features

Azure CosmosDB's ChangeFeed mechanism naturally supports replays of all the documents/Items in a Store (with ordering guarantees at stream level, but not across streams). The API implementation spins up a querying loop per consumer, which can efficiently do a 'SELECT *' query based on its intrinsic ordering to cover both bulk reading and tailing requirements.

On the other hand, the DynamoDB Streams facility retains 24h of individual insert/update/delete records with concurrent readers capped at ~2.

In order to provide facilities equivalent to those that Propulsion.CosmosStore's CosmosStoreReader (a lightweight wrapper over the CosmosDB ChangeFeed API) provides, there are ancillary components that collectively provide equivalent functionality.

Propulsion.DynamoStore.Lambda

CosmosDB intrinsically maintains and surfaces the documents/Items (and physical partition metadata as that shifts over time) in such a manner that any number of consumers can concurrently walk all the data across all the physical partitions and be guaranteed to have traversed every change (though notably not including deletes; this is of course fine as our model is append-only) when one has reached the current 'end' of each physical partition (even in the face of physical partition splits and/or document updates during the walk).

It should be noted that these walks are not free; each reader induces RU consumption on the Container that impacts the capacity available for other reads and writes. There is also an amplification effect: each write immediately triggers N reads of the same size.

DynamoDB does not provide an equivalent mechanism for an online traversal that's guaranteed to see all Items. (You can do an export to an S3 bucket that is guaranteed to include all items at a point in time, but you'd then need to supplement that with subsequent changes via DynamoDB Streams).

The mechanism employed is to provision DynamoDB Streams for the Table holding the events, which is fed to an AWS Lambda via a DynamoDB Streams Trigger that is configured to reliably ingest the writes in the order of their occurrence at the stream level via an Indexer.

Relevant background articles from Amazon:

DynamoStoreIndexer

The DynamoStoreIndexer represents each batch (1-10,000 DDB Streams Records fed to the Lambda) as an Ingested event in a sequence of $AppendsEpoch-<trancheId>_<epoch> streams in near real time after they are Appended.

AppendsEpoch

Each Span in an Ingested event within an AppendsEpoch stream consists of:

  • the Stream Name
  • Index/offset within that stream
  • the Event Types
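As an illustration of why this compact shape is useful, the sketch below models a Span and shows how a reader could decide whether a reaction is relevant from the Stream Name and Event Types alone. The field and function names are hypothetical and do not reflect the actual event contract.

```python
from dataclasses import dataclass
from typing import Set, Tuple


@dataclass(frozen=True)
class Span:
    stream_name: str              # e.g. "Cart-123"
    index: int                    # offset of the first event within that stream
    event_types: Tuple[str, ...]  # one entry per event, in order


def next_index(span: Span) -> int:
    """Index the stream will be at once this span has been applied."""
    return span.index + len(span.event_types)


def category_of(stream_name: str) -> str:
    # the category is the part of the stream name before the first '-'
    return stream_name.split("-", 1)[0]


def matches(span: Span, category: str, event_types: Set[str]) -> bool:
    """True if the span belongs to the category and bears any event type of interest."""
    return category_of(span.stream_name) == category and any(
        t in event_types for t in span.event_types
    )


span = Span("Cart-123", 4, ("ItemAdded", "CartCleared"))
```

A consumer filtering this way never needs to load event bodies from the Events Table for spans it does not care about.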

NOTE: as illustrated in the slide deck, as DynamoDB varies the number of Shards, the number of concurrent instances of the Lambda can also rise. In the present implementation, multiple Lambda instances will be competing to write to the Tip of a single chain of epochs.

It's conceivable that one might internally partition the overall Index (based on the StreamName), but the current implementation does not address this, hence the value of the FeedTrancheId is always 0.

AppendsIndex

$AppendsIndex-0 maintains the epoch number where the Indexer will be appending.

DynamoStoreSource

Propulsion.DynamoStore.DynamoStoreSource fulfils the role that CosmosStoreSource fulfils when using Propulsion.CosmosStore: catching up on any events a consumer group has not yet seen (in a batched fashion), and thereafter tailing the index at sub-second intervals.

DynamoStoreSource Reader logic

The reader logic in DynamoStoreSource walks the chain of Index Epoch Streams in sequence.

There is a configurable LoadMode which offers:

As the Propulsion Projector completes the processing of the full set of items in a batch of items submitted to it, checkpointing (asynchronously) moves the Position in the $ReaderCheckpoint forward.

ReaderCheckpoint

$ReaderCheckpoint maintains the current Position (as an int64 that encodes the Epoch index and the offset within that Epoch). Each consumer group has a single position.

The state lives in its own stream in the Index Table, named $ReaderCheckpoint-dynamoStore_0_<consumerGroup>, where:

  • dynamoStore is the well-known Feed.SourceId, another example is eventStoreDb
  • 0 is the well-known Feed.TrancheId
  • consumerGroup is the application-supplied consumer group (equivalent to the ProcessorName in CosmosDB ChangeFeedProcessor parlance or consumer group name in Kafka terms)
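One way to pack an Epoch index and an offset into a single int64 Position, and to derive the checkpoint stream name, is sketched below. The `MAX_ITEMS_PER_EPOCH` constant and the multiply-and-add packing scheme are assumptions made for illustration; the text above only states that the int64 encodes both values.

```python
from typing import Tuple

MAX_ITEMS_PER_EPOCH = 1_000_000  # assumed capacity of an epoch, for illustration only


def to_position(epoch: int, offset: int) -> int:
    """Pack (epoch, offset) into a single integer that sorts in reading order."""
    if not 0 <= offset < MAX_ITEMS_PER_EPOCH:
        raise ValueError("offset out of range for an epoch")
    return epoch * MAX_ITEMS_PER_EPOCH + offset


def of_position(position: int) -> Tuple[int, int]:
    """Unpack a position into (epoch, offset)."""
    return divmod(position, MAX_ITEMS_PER_EPOCH)


def checkpoint_stream_name(source_id: str, tranche_id: int, consumer_group: str) -> str:
    return f"$ReaderCheckpoint-{source_id}_{tranche_id}_{consumer_group}"
```

Because later epochs always map to larger integers, comparing two positions numerically is enough to tell which one is further along the index.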

Separated Index Table

By convention, the $AppendsIndex-0, $AppendsEpoch-0_<epoch> and $ReaderCheckpoint-dynamoStore_0_<consumerGroup> streams are maintained in a separated <tableName>-index Table in DynamoDB. The reasons for this separation include:

  • being able to independently scale (and ring-fence) the activity in order to vary the ingestion and consumption capacity
  • removing ripple effects where an inactive system keeps reading checkpoint update events, which then self-perpetuate as the updated position is written
  • (future proofing: e.g. being able to re-index an entire dataset based on an S3 backup dataset)

DynamoStoreIndexer+Source vs CosmosDB ChangeFeed + CosmosStoreSource

Taking an example case where you have a single Container in CosmosDB / Table in DynamoDB, and 5 reactor applications that consume events as they are written, we can illustrate the differences by looking at the moving parts involved.

With CosmosDB's ChangeFeed and Propulsion.CosmosStoreSource:

  • an -aux Container that maintains 5 sets of checkpoints and leases (a document per physical partition within the container).
  • a polling loop that reads every document from the (single) Events Container (there is an individual loop per physical partition). NOTE the leasing mechanism and the fact that processing is split by physical partition also means one can distribute processing activity across multiple processing nodes.
  • because the CosmosDB ChangeFeed read APIs don't provide for filtering etc, every document that's updated triggers 5 sets of reads of the entire document (even if you only appended one event or only updated an unfold). This implies you need to be particularly careful about limiting how many readers you have and/or how large documents/Items in the store get.
  • if you use Azure Functions to process the ChangeFeed, it's pretty much the same equations in terms of activity, charges and scaling but you can conceptually service all your reactions without a hosting application.
  • There is nothing special you need to do to enable the ChangeFeed.
  • There is no buffering of events involved at any level; it's all query loops driven by consumers.
  • Architecture Astronauts frequently jump to an incorrect conclusion, that the single correct use of the ChangeFeed is thus to push events to Kafka.

With DynamoDB Streams, Propulsion.DynamoStore.Lambda and Propulsion.DynamoStore.DynamoStoreSource:

  • The DynamoDB Table that stores the events must have DynamoDB Streams configured (in either NEW_IMAGE or NEW_AND_OLD_IMAGES mode).
  • The DynamoDB Index Table does not require DynamoDB Streams configured (if you wanted to use it to drive replication, you'd simply use DynamoStoreSource to tail the epochs directly).
  • The Lambda package must be provisioned, with permissions to write to the Index Table (and alerting so you will know if its IteratorAge metric exceeds your latency requirements and/or is approaching the 24h limit after which the guaranteed delivery of notifications would be lost).
  • The Lambda must be coupled to the Events Table by establishing a DynamoDB Streams Trigger.
  • The indexing activity runs in AWS Lambda. Aside from throughput being limited by the read and write capacity of the Index Table, it's self managing. The running costs are pretty trivial.
  • Each of the 5 reactor applications reads from the Index Table. Because the format is pretty compact, tens or hundreds of competing readers are conceivable as long as you configure the table to provide the relevant read capacity units (AutoScale mode may make sense).
  • You will ideally want to have your reactions be predicated on just Stream Names and Event Types. Failing that, the use of Hydrated mode will induce load on the Events Table in proportion to the number of reactor applications that require that (although a low number of readers doing that is unlikely to be problematic).

Propulsion.DynamoStore side notes / FAQs

What about using the Lambda Parallelization Factor instead?

As described in AWS Lambda Supports Parallelization Factor for Kinesis and DynamoDB Event Sources and New AWS Lambda scaling controls for Kinesis and DynamoDB event sources, there is a powerful feature that enables one to configure a Lambda per stream to process reactions in parallel. This works up to a point and clearly has fewer moving parts. It should thus be seriously considered. The key weaknesses of the approach relative to a more idiomatic event sourcing approach are:

  • the lack of ability to replay
  • the fact that the DynamoDB Streams facility realistically limits you to 1 or 2 such Lambdas.

Stream Management Policies

The stores supported by Equinox are primarily intended to house Domain Events (Facts) from an event-sourced model. Such events are retained indefinitely in an immutable form.

Often, the management of Ephemeral Events (that one might equivalently record on a bus, queue or a topic in systems such as Apache Kafka) involves needs that overlap significantly with those of managing Domain Events. However, there's a point at which maintaining equivalent levels of access to such data is of significantly lesser value than it is for Domain Events.

In theory, it can be argued that events with an ephemeral aspect are not True Event-Sourcing Events, and as such should be considered entirely separately.

In practice, for myriad reasons, stores such as EventStoreDB, CosmosDB and SqlStreamStore become candidates for and/or victims of the blurring of the divide between ephemeral events and Domain Events.

For the above reasons, a key aspect of designing, maintaining and evolving an event-sourced system involves the management of the overall set of events comprising the system's state:

  • grouping events into streams in accordance with the goals of the system as a whole (i.e. how one models the system in terms of aggregates), with consideration for how well a given structure aligns with the characteristics of a given Store
  • implementing policies reflecting the relevance of a stream and/or its events over time via various mechanisms: from shifting them to lower performance storage, archiving them to a separated store that's not accessible from the current online system all the way to outright deletion
  • drawing the line with regard to ephemeral events representing state that truly does not belong alongside your Domain Events

Aggregate streams

While the store's capabilities and restrictions are where the rubber meets the road in your streams/events layout, it should not be the primary driver.

When considering which events should be united in a given stream-category, some key factors are:

  • is there an invariant that the Aggregate is seeking to uphold? (remember, the stream is the principal unit of consistency control)
  • do all the events relate to a meaningful atomic structure within your system?
  • when making decisions based on grouping the events in a given way, is the resulting state a reasonable size? (this feeds into whether it's feasible to snapshot the state)
  • is the state cohesive, or is it possible to partition the grouping even further? (think SRP and ISP)
  • is there a natural characteristic of the aggregate that bounds the number of events that will occur over its lifetime? (e.g., "the appointments" vs splitting by day/month/year/facility)

Topic streams

When you don't load and fold events to arrive at a state.

In some cases, a stream may not even have a meaningful state, invariant or a business process that it's supporting:

  • example: a stream is used to queue up commands and/or post outcomes as part of some process. In such a case, the 'state' boils down to checkpointing how far a given consumer has walked along the topic (as opposed to maintaining a rolling state derived primarily from the events that one queries, renders or uses to support a decision flow).

Such topic-streams are not aggregates as such, and are not addressed as a primary use case in the Equinox Programming Model.

However, such topic-streams are nonetheless subject to almost identical considerations in terms of how we deal with managing the lifetime of the data.

Store-specific stream bounding / event retention considerations

Across both Aggregate and Topic use cases, there are specific facilities afforded (and restrictions imposed) by the specific store you're using. For instance:

  • Stream size limits - EventStoreDB: EventStore does not impose any limitations on the maximum size or event count that a single stream can bear. This allows one to maintain a perpetual queue and/or an ordered sequence of events, with or without using a retention policy to control the trimming of expired/excess events.
  • Stream size limits - CosmosDB: The total size of the events and Tip-document of a stream must adhere to the CosmosDB logical partition limit of 20GB.
  • Retention policies - EventStoreDB: Streams can have retention policies defined via each stream's metadata stream. The server cluster manages the application of these rules. The scavenging process removes the events, compacting the data by rewriting chunks with deleted, extraneous or aged-out events elided.
  • Retention policies - CosmosDB: the CosmosDB TTL facility allows one to define a TTL at the document level. CosmosDB removes expired items automatically (whenever residual RU capacity allows).

NOTE: Equinox does not presently expose specific controls to allow specification of either a CosmosDB TTL or EventStoreDB stream metadata.

Mutation, archival and pruning of Events

Considerations regarding mutation or deletion of events

You don't rewrite events or streams in a Store, for reasons

For Domain Events in an event-sourced model, their permanence and immutability is typically considered axiomatic; readers expect to be able to cache them forever, rely on their index on a stream remaining fixed etc. Some (rare) corner cases where one might wish to deviate from such axioms in terms of Domain Events in a model include:

  • rewriting streams as an expedient solution to a bug etc: as with the rewriting of history in git, the first rule is: DON'T. (But it's technically possible and in some cases this nuclear option can solve a problem)
  • intentionally removing data: for GDPR or CCPA reasons, you may opt to mutate or remove events as part of addressing a need to conclusively end-of-life some data (many better solutions are available...)

It should be noted with regard to such requirements:

  • EventStoreDB does not present any APIs for mutation of events, though deleting events is a fully supported operation (although that can be restricted). Rewrites are typically approached by doing an offline database rebuild.
  • Equinox.Cosmos and Equinox.CosmosStore include support for pruning events (only) from the head of a stream. Obviously, there's nothing stopping you deleting or altering the Batch documents out of band via the underlying CosmosDB APIs directly (Note however that the semantics of document ordering within a logical partition means it's strongly advised not to mutate any event Batch documents as this will cause their ordering to become incorrect relative to other events, invalidating a key tenet that Change Feed Processors rely on).

Growth handling strategies

No matter what the vendor tells you, it's literally not going to scale linearly...

A more typical concern for an event-sourced model is managing what should happen when an Aggregate falls out of scope. For instance, a pick ticket entity in a warehouse is only of historical interest after a certain period of time (the customer's Order History maintains long-lived state pertaining to orders and/or associated returns etc.)

With regard to such needs, here are some store-specific considerations:

  • EventStoreDB caches only in-use streams and events. Hosting streams that are no longer relevant is considered a completely normal use case:

    • streams and/or regions of streams that are no longer relevant don't induce a major cost on the system
    • each client maintains a single connection to the server cluster; there is no incremental cost in terms of the potential network or client process resource consumption related directly to the size of your dataset
    • however, there is a non-zero cost; the overall dataset needs to be colocated and backed up as a whole (there are also internal index structures maintained alongside the event chunk files, with rebuild times directly related to the store's event count etc).
  • For CosmosDB, the costs and impacts of retaining events and/or streams that are no longer relevant are more direct; ways they manifest include:

    • the billing model imposes a linear cost per GB that applies equally to all event-batch documents in your store, plus the size of the associated indexes (which strongly relate to the number of items stored). These costs are multiplied by the number of regions to which you replicate.
    • the total size of your dataset affects the minimum number of nodes across which the data will spread. i.e. 1 TB of data will require at least 10,000 RU/s to be allocated to it regardless of traffic
    • the more nodes you have, the more TCP connections and other related fixed resources each client instance requires
    • the RU/s allocated to your container can only be spread equally across all nodes. Thus, if you have 100GB spread over 5 nodes and allocate 10,000 RU/s to the Container, each node gets 2,000 RU/s and callers get 429s if there happen to be more than that incurred for that given node in that second (with significant latency impact as all such rate-limited clients need to back off for >= 1s for each rate-limited attempt).
    • the cost of over-provisioning to ensure appropriate capacity for spikes in load and/or to handle hotspots (where one node happens to host a stream that's accessed disproportionately heavily relative to data on other nodes) is multiplied by the number of nodes. Example: if you have a single node with 5GB of data with 2,000 RU/s allocated and want to double the peak capacity, you simply assign it 4,000 RU/s; if you have 100GB over 5 nodes, you need to double your 5x2,000 to 5x4,000 to achieve the same effect
    • there are significant jumps in cost for writes based on the indexing cost as the number of items in a logical partition increases (empirically derived data; subject to change: for instance inserts of a minimal (<100 bytes) event that initially costs ~20RU becomes > 40RU with 128 items, > 50RU with 1600 items, >60 at 2800 items and >110RU at 4900 items as snapshots or event sizes hit certain thresholds).
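The provisioning arithmetic in the node-related points above can be made concrete with a small sketch. The function names are hypothetical; the numbers are the ones used in the examples above.

```python
def ru_per_node(container_ru_per_sec: float, node_count: int) -> float:
    """RU/s are spread equally, so each node gets an equal share of the allocation."""
    return container_ru_per_sec / node_count


def container_ru_for_node_peak(node_peak_ru_per_sec: float, node_count: int) -> float:
    """To give any one node a given peak, every node must be provisioned for it."""
    return node_peak_ru_per_sec * node_count


# 100GB over 5 nodes with 10,000 RU/s allocated: each node gets 2,000 RU/s
share = ru_per_node(10_000, 5)

# doubling the per-node peak to 4,000 RU/s costs 4,000 RU/s on 1 node, 20,000 RU/s on 5
single_node_cost = container_ru_for_node_peak(4_000, 1)
five_node_cost = container_ru_for_node_peak(4_000, 5)
```

The cost of headroom for a single hot stream therefore grows with the number of nodes, even though only one node benefits from it.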

There are myriad approaches to resolving these forces. Let's examine the trade-offs of some relevant ones...

Database epochs

Perhaps we can just leave it all behind and switch to a new blank database?

In some systems, where there's a relevant natural cycle in the domain, the answer to managing database growth may be simpler than you think. For instance:

  • you may be able to start with a blank database for each trading day for the bulk of the events your system operates on.
  • your domain may have a natural end of year business process that dictates a formal closing of accounts with selective migration of relevant summarized data to be carried forward into a successor epoch. In such instances, each closed year can be managed as a separated (effectively read-only) dataset.

As a fresh epoch of data becomes the active dataset, other options open up:

  • one might migrate the now-of-secondary-importance data to cheaper hardware or network resources
  • one might archive the database once you've validated the transition has been effected completely

Stream epochs

Replace a perpetual stream with a series of finite epoch-streams, allowing superseded ones to be archived or deleted

As covered above, long streams bring associated costs. A key one that hasn't been mentioned is that, because the unit of storage is a stream, there's no easy way to distinguish historic events from current ones. This has various effects on processing costs such as (for Aggregate streams), that of loading and folding the state (or generating a snapshot).

Analogous to how data can be retired (as described in Database epochs), it may be possible to manage the growth cycle of continuous streams by having readers and writers coordinate the state of a given stream cooperatively via the following elements:

  • one Series aggregate: maintains the current active epoch id for the series
  • many Epoch streams: independent streams (sharing a root name), suffixed by the epoch id
  • having a deterministic way of coordinating to ensure each (independent) writer will recognize that a given epoch is closed (e.g., based on event count, elapsed time since the epoch started, total event payload bytes, etc.)

Depending on whether there's state associated with a given stream, the system periodically transitions the Series to a new Epoch by algorithms with mechanisms such as:

  • Topic-stream: write a Closed event; have all writes be contingent on no such event preceding any write to an epoch-stream
  • Aggregate stream:
    1. write a Closed event to the outgoing epoch-stream, followed by (as a separate action with idempotent semantics) ...
    2. write a CarriedForward event to open the new Epoch (again, all writers follow the same rules in order to be able to make writes idempotent even in the face of concurrent writers)

The writing of the event to move the active Epoch id forward in the Series aggregate can take place at any point after the Closed event has been written to the outgoing epoch-stream (including concurrently with the writing of the CarriedForward event). The reason for this is that the active epoch can be inferred by walking forward from any given epoch until one arrives at an epoch that's not Closed.
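The Aggregate stream transition and the inference of the active epoch can be sketched with an in-memory model. This is an illustration only: `Streams`, `roll_over` and `find_active_epoch` are hypothetical names, and the carried-forward payload is reduced to a single balance value.

```python
from typing import Dict, List, Tuple

# epoch id -> ordered list of (event type, payload); a stand-in for the epoch-streams
Streams = Dict[int, List[Tuple[str, int]]]


def is_closed(streams: Streams, epoch: int) -> bool:
    return any(kind == "Closed" for kind, _ in streams.get(epoch, []))


def find_active_epoch(streams: Streams, start: int = 0) -> int:
    """Walk forward from any known epoch until reaching one that is not Closed."""
    epoch = start
    while is_closed(streams, epoch):
        epoch += 1
    return epoch


def roll_over(streams: Streams, epoch: int, balance: int) -> int:
    """Close the outgoing epoch, then open its successor; safe to repeat."""
    # step 1: write Closed to the outgoing epoch-stream (skipped if already present)
    if not is_closed(streams, epoch):
        streams.setdefault(epoch, []).append(("Closed", balance))
    # step 2: as a separate action, write CarriedForward as the successor's first event
    successor = streams.setdefault(epoch + 1, [])
    if not successor:
        successor.append(("CarriedForward", balance))
    return epoch + 1


streams: Streams = {0: [("Deposited", 10)]}
roll_over(streams, 0, 10)
roll_over(streams, 0, 10)  # a concurrent or retried writer repeating the same steps
```

Because both steps check before writing, a second writer following the same rules leaves the streams unchanged, and a reader holding a stale epoch id can still locate the active epoch by walking forward.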

WIP implementation of a dotnet new template illustrating the Stream Epochs approach

Monitoring a primary dataset for Archival/Pruning

Move or delete out-of-scope data from a primary (hot) dataset to a cheaper (warm/cold) stream

As with 'Database epochs', once a given 'Stream epoch' has been marked active in a Series, we gain options as to what to do with the preceding ones:

  • we may opt to retain them in order to enable replaying of projections for currently-unknown reasons
  • if we intend to retain them for a significant period: we can replicate/sync/mirror/archive them to an archive store, then prune them from the primary dataset
  • if they are only relevant to assist troubleshooting over some short term: we can delete them after a given period (without copying them anywhere)

When writing to an archival store, there's also an opportunity to vary the writing process from that forced by the constraints imposed when writing as part of normal online transaction processing:

  • it will often make sense to have the archiver add a minimal placeholder to the archival store regardless of whether a given stream is being archived, which can then be used to drive the walk of the primary instead of placing avoidable load on the primary by having to continually loop over all the data in order to re-assess archival criteria over time
  • when copying from primary to archive, there's an opportunity to optimally pack events into batches (for instance in Equinox.CosmosStore, batching writes means fewer documents, which reduces document count, per-document overhead, the overall data and index size in the container and hence query costs)
  • when writing to warm archival storage, it may make sense to compress the events (under normal circumstances, compressing event data is rarely considered a worthwhile tradeoff).
  • where the nature of traffic on the system has peaks and troughs, there's an opportunity to shift the process of traversing the data for archival purposes to a window outside of the peak load period (although, in general, the impact of reads for the purposes of archival won't be significant enough to warrant optimizing this factor)

Archiver + Pruner roles

Outlining the roles of the proArchiver and proPruner templates

It's conceivable that one might establish a single service combining the activities of:

  1. copying (archiving) to the archive store in reaction to changes in the primary
  2. pruning from the primary when the copying is complete
  3. deleting immediately
  4. continually visiting all streams in the primary in order to archive and/or prune streams that have fallen out of use

However, splitting the work into two distinct facilities allows better delineation of responsibilities:

  • clarifies the relative responsibilities (and allows them to be considered individually)
  • allows the load (deletes can be costly in RU terms on CosmosDB) on the primary dataset to be more closely controlled

Archiver

An archiver tails a monitored store and bears the following responsibilities:

  • minimizing the load on the source it's monitoring
  • listens to all event writes (via $all in the case of EventStoreDB or a ChangeFeed Processor in the case of CosmosDB)
  • ensuring the archive becomes aware of all new streams (especially in the case of Equinox.CosmosStore streams in AccessStrategy.RollingState mode, which do not yield a new event-batch per write)

Pruner

The pruner cyclically (i.e., when it reaches the end, it loops back to the start) walks the archive store:

  • visits each stream, identifying the current write position in the archive
  • uses that as input into a decision as to whether / how many events can be trimmed from the primary (deletion does not need to take place right away - Equinox will deal with events spread over a Primary/Archive pair of Containers via the Loading Fallback mechanism)
  • (for Equinox.CosmosStore) can optimize the packing of the events (e.g. if the most recent 4 events have arrived as 2 batches, the pruner can merge the two batches to minimize storage and index size). When writing to a primary collection, batches are never mutated for packing purposes both due to write costs and read amplification.
  • (for Equinox.CosmosStore) can opt to delete from the primary if one or more full Batches have been copied to the archive (note the unit of deletion is a Batch - mutating a Batch in order to remove an event would trigger a reordering of the document's position in the logical partition)
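
The deletion decision described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the logic of the proPruner template: the `Batch` record, the `deletableBatches` function, and the interpretation of the archive write position as "the number of events already copied" are all hypothetical.

```fsharp
/// Hypothetical, simplified view of a Batch document in the primary:
/// `index` is the position of its first event, `count` the number of events it holds
type Batch = { index: int64; count: int }

/// Given the archive's write position (assumed here to be the number of events
/// already copied to the archive), returns the leading Batches in the primary
/// that are fully covered by the archive. A Batch that is only partially
/// archived is retained whole, as the unit of deletion is a Batch.
let deletableBatches (archiveWritePos: int64) (primaryBatches: Batch list) : Batch list =
    primaryBatches
    |> List.sortBy (fun b -> b.index)
    |> List.takeWhile (fun b -> b.index + int64 b.count <= archiveWritePos)

// Usage: events 0..5 have been archived; the third Batch (events 5..8) is only partly covered
let batches = [ { index = 0L; count = 3 }; { index = 3L; count = 2 }; { index = 5L; count = 4 } ]
let toDelete = deletableBatches 6L batches
printfn "%A" (toDelete |> List.map (fun b -> b.index))
```

In this example only the first two Batches qualify; the third stays in the primary until the archive has caught up with all of its events.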

Ideas

Things that are incomplete and/or require work

This is a very loose laundry list of items that have occurred to us to do, given infinite time. No conclusions about the likelihood of starting, finishing, or even committing to adding a feature should be inferred, but most represent things that would be likely to be accepted into the codebase.

  • Extend samples and templates; see #57

Wouldn't it be nice - Equinox.EventStoreDb

  • Provide a low-level F# API for walking events, akin to Equinox.CosmosStore.Core.Events; this would allow consumers to jump from direct use of EventStore.Client -> Equinox.EventStore.Core.Events -> Equinox.Decider (with the potential to swap stores once one gets to using Equinox.Decider)
  • Get conflict handling as efficient and predictable as for Equinox.CosmosStore jet#38
  • Provide for snapshots to be stored out of the stream, and loaded in a customizable manner, analogous to the proposed comparable Equinox.CosmosStore facility
  • Provide a facility in FsCodec.IEventCodec to walk the Event DU to generate a list of event types; use that to generate the server-side event loading filter e.g. when a Decider uses a highly selective subset of the known Event Types
  • (If Server started to support it), provide a hint when loading as to the isOrigin Event Type so backward load can stop when it meets an Embedded Snapshot or Reset (e.g. CartCleared) event
  • Port MessageDb.AccessStrategy.AdjacentSnapshots, automatically configuring the metadata of the snapshots stream to a maxCount of 1 event for automatic purging of superseded snapshots
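
To give a feel for the idea above of walking the Event DU to list event types, here is a minimal sketch using the standard `FSharp.Reflection` API. The `Event` union and the `eventTypeNames` function are hypothetical; this is not an FsCodec facility, and it assumes the event type name equals the union case name, which may not hold where a codec derives names from attributes.

```fsharp
open Microsoft.FSharp.Reflection

/// Hypothetical event contract, for illustration only
type Event =
    | ItemAdded of sku: string
    | ItemRemoved of sku: string
    | CartCleared

/// Lists the union case names of 'Event, in declaration order
/// (assumption: the case name doubles as the event type name)
let eventTypeNames<'Event> () : string list =
    FSharpType.GetUnionCases(typeof<'Event>)
    |> Array.map (fun case -> case.Name)
    |> List.ofArray

let names = eventTypeNames<Event> ()
printfn "%A" names
```

A list produced this way could then feed whatever server-side filter the store supports.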

Wouldn't it be nice - Equinox.MessageDb

Wouldn't it be nice - Equinox.SqlStreamStore

  • Provide support for an isOrigin overload that works on the event type string; implement a two phase load algorithm that loads the events first, and then the bodies only from the origin point forward
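
The two-phase load proposed above might look roughly like the following. This is only a sketch of the idea: `StoredEvent`, `twoPhaseLoad`, `readTypes` and `readFrom` are hypothetical names standing in for store access, not the SqlStreamStore or Equinox API, and the first phase is assumed to fetch event types without bodies.

```fsharp
/// Hypothetical event shape, for illustration only
type StoredEvent = { index: int; eventType: string; body: string }

/// Phase 1 reads only the event types and locates the most recent origin event;
/// phase 2 loads full events (including bodies) from that point forward
let twoPhaseLoad
        (isOrigin: string -> bool)
        (readTypes: unit -> string list)     // phase 1: event types, in stream order
        (readFrom: int -> StoredEvent list)  // phase 2: full events from the given index
        : StoredEvent list =
    let originIndex =
        match readTypes () |> List.tryFindIndexBack isOrigin with
        | Some i -> i
        | None -> 0 // no origin event found: load the whole stream
    readFrom originIndex

// Usage against an in-memory stand-in for a stream
let stream =
    [ "ItemAdded"; "ItemAdded"; "CartCleared"; "ItemAdded" ]
    |> List.mapi (fun i t -> { index = i; eventType = t; body = sprintf "body-%d" i })

let loaded =
    twoPhaseLoad
        (fun t -> t = "CartCleared")
        (fun () -> stream |> List.map (fun e -> e.eventType))
        (fun i -> stream |> List.skip i)

printfn "%A" (loaded |> List.map (fun e -> e.index))
```

Here the bodies of the two events preceding `CartCleared` are never fetched.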

Wouldn't it be nice - Equinox.CosmosStore

  • Switching to using MS V4 SDK eventually (Parked in #197). See also #232
  • Refactor the usage of the Stored Proc to instead use the V3 API's Transactional Batch support (will likely happen as a backport from Equinox.DynamoStore)
  • Enable snapshots to be stored outside of the main collection in Equinox.CosmosStore #61
  • Multiple writers support for unfolds (at present a sync completely replaces the unfolds in the Tip; this will be extended by having the stored proc maintain the union of the unfolds in play (both for semi-related services and for blue/green deploy scenarios); TBD how we decide when a union that's no longer in use gets removed) #108
  • Low-level performance improvements in loading logic (reducing allocations etc)
  • Propulsion.CosmosStore: provide a Serverless mode that can be used with Azure Functions to execute a batch of projections based on a set of documents from the change feed