Skip to content

Commit

Permalink
Events in Tip Optimization (#251)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Oct 21, 2020
1 parent 539e7ef commit 25ebc33
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 163 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- Reorganized `QueryRetryPolicy` to handle `IAsyncEnumerable` coming in Cosmos SDK V4 [#246](https://github.com/jet/equinox/pull/246) :pray: [@ylibrach](https://github.com/ylibrach)
- Added Secondary store fallback for Event loading, enabling Streams to be hot-migrated (archived to a secondary/clone, then pruned from the primary/active) between Primary and Secondary stores [#247](https://github.com/jet/equinox/pull/247)
- Replaced `BatchingPolicy`, `RetryPolicy` with `TipOptions`, `QueryOptions` to better align with Cosmos SDK V4 [#253](https://github.com/jet/equinox/pull/253)
- Added support for accumulating events in Tip [#251](https://github.com/jet/equinox/pull/251) see also [#110](https://github.com/jet/equinox/pull/110)
- target `EventStore.Client` v `20.6` (instead of v `5.0.x`) [#224](https://github.com/jet/equinox/pull/224)
- Retarget `netcoreapp2.1` apps to `netcoreapp3.1` with `SystemTextJson`
- Retarget Todobackend to `aspnetcore` v `3.1`
Expand Down
27 changes: 11 additions & 16 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1740,11 +1740,10 @@ based on the events presented.

## Sync stored procedure

This covers what the most complete possible implementation of the JS Stored
This covers the V3 implementation of the JS Stored
Procedure (see
[source](https://github.com/jet/equinox/blob/tip-isa-batch/src/Equinox.Cosmos/Cosmos.fs#L302))
does when presented with a batch to be written. (NB The present implementation
is slightly simplified; see [source](src/Equinox.CosmosStore/CosmosStore.fs#L404).
[source](https://github.com/jet/equinox/blob/master/src/Equinox.CosmosStore/CosmosStore.fs#L396))
does when presented with a batch to be written.

The `sync` stored procedure takes as input, a document that is almost identical
to the format of the _`Tip`_ batch (in fact, if the stream is found to be
Expand All @@ -1762,15 +1761,14 @@ stream). The request includes the following elements:
expectedVersion check is fulfilled
- `u`: array of `unfold`ed events (aka snapshots) that supersede items with
equivalent `c`ase values
- `maxEvents`: the maximum number of events permitted to be retained in the Tip (subject to that not exceeding the `maxStringifyLen` rule). For example:

- if `e` contains 2 events, the _tip_ document's `e` has 2 events and the
`maxEvents` is `5`, the events get appended onto the tip's `e`vents
- if the total length including the new `e`vents would exceed `maxEvents`, the Tip is 'renamed' (gets its `id` set to `i.toString()`) to become a
batch (with the new `e`vents included in that calved Batch), and the new Tip
has a zero-length `e`vents array, and a set of `u`nfolds
(as an atomic transaction on the server side)

- `maxEventsInTip`: the maximum number of events permitted to be retained in the Tip (subject to that not exceeding the `maxStringifyLen` rule). For example:

- if `e` contains 2 events, the _tip_ document's `e` has 2 events and the `maxEventsInTip` is `5`, the events get appended onto the tip's `e`vents
- if the total length including the new `e`vents would exceed `maxEventsInTip`, the Tip is 'renamed' (gets its `id` set to `i.toString()`) to become a
batch (with the new `e`vents included in that calved Batch), and the new Tip has a zero-length `e`vents array
as a `Batch`, and a set of `u`nfolds (as an atomic
transaction on the server side)
- `maxStringifyLen`: secondary constraint on the events retained in the tip (in addition to `maxEventsInTip` constraint) - constrains the maximum length of the events being buffered in the Tip by applying a size limit in characters (as computed via `JSON.stringify(events).length`)
- (PROPOSAL/FUTURE) `thirdPartyUnfoldRetention`: how many events to keep before
the base (`i`) of the batch if required by lagging `u`nfolds which would
otherwise fall out of scope as a result of the appends in this batch (this
Expand Down Expand Up @@ -2256,9 +2254,6 @@ rich relative to the need of consumers to date. Some things remain though:
services and for blue/green deploy scenarios); TBD how we decide when a union
that's no longer in use gets removed)
[#108](https://github.com/jet/equinox/issues/108)
- performance, efficiency and concurrency improvements based on
[`tip-isa-batch`](https://github.com/jet/equinox/tree/tip-isa-batch) schema
generalization [#109](https://github.com/jet/equinox/issues/109)
- low level performance improvements in loading logic (reducing allocations etc)

## Wouldn't it be nice - `Equinox.DynamoDb`
Expand Down
8 changes: 3 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,17 @@ Some aspects of the implementation are distilled from [`Jet.com` systems dating
- **`Equinox.CosmosStore` 'Tip with Unfolds' schema**: (In contrast to `Equinox.EventStore`'s `AccessStrategy.RollingSnapshots`) when using `Equinox.CosmosStore`, optimized loading and command processing is managed via the `Tip`; a document per stream that facilitates Syncing via a single point-read as the 'Tip' maintains:
a) the present Position of the stream - i.e. the index at which the next events will be appended, which is used for [optimistic concurrency control](https://en.wikipedia.org/wiki/Optimistic_concurrency_control)
b) ephemeral (`deflate+base64` compressed by default, in order to optimize RU costs) [_unfolds_](DOCUMENTATION.md#Cosmos-Storage-Model)
c) (potentially in future) a holding buffer for events since those unfolded events ([presently removed](https://github.com/jet/equinox/pull/58), but [should return](DOCUMENTATION.md#Roadmap), see [#109](https://github.com/jet/equinox/pull/109))
c) pending events, up to a specified count (or `JSON.stringify` length). When the events in tip accumulation limit is reached, they are shifted out to a thereafter-immutable `Batch`.

This yields many of the benefits of the in-stream Rolling Snapshots approach while reducing latency and RU provisioning requirements due to meticulously tuned Request Charge costs:-
- when the stream is empty, the initial `Load` operation involves a single point read that yields a `404 NotFound` response, costing 1.0 RU
- when coupled with the cache, a typical `Reload` operation is a _point read_ [with `IfNoneMatch` on an etag], costing 1.0 RU if in-date [to get the `302 Not Found` response]
- when coupled with snapshots/unfolds mechanism, a cache miss on `Reload` only triggers paying the cost for reading of the compressed snapshot stored in the Tip (i.e. instead of a `302`, the `IfNoneMatch` yields a `200` and returns all relevant information in the same roundtrip)
- writes are via a single invocation of the `Sync` stored procedure which:
a) does a point read
b) performs a concurrency check and then either...
c) applies the write OR returns the conflicting unfolds
b) performs a concurrency check and then either
c) applies the write OR returns the conflicting events and unfolds
- no additional round trips to the store needed at either the `Load`, `Reload` or `Sync` points in the flow

It should be noted that from a querying perspective, the `Tip` shares the same structure as `Batch` documents (a potential future extension would be to carry some events in the `Tip` as [some interim versions of the implementation once did](https://github.com/jet/equinox/pull/58), see also [#109](https://github.com/jet/equinox/pull/109).
- **`Equinox.CosmosStore` `RollingState` and `Custom` 'non-event-sourced' modes**: Uses 'Tip with Unfolds' encoding to avoid having to write event documents at all - this enables one to build, reason about and test your aggregates in the normal manner, but inhibit event documents from being generated. This enables one to benefit from the caching and consistency management mechanisms without having to bear the cost of writing and storing the events themselves (and/or dealing with an ever-growing store size). Search for `transmute` or `RollingState` in the `samples` and/or see [the `Checkpoint` Aggregate in Propulsion](https://github.com/jet/propulsion/blob/master/src/Propulsion.EventStore/Checkpoint.fs). One chief use of this mechanism is for tracking Summary Event feeds in [the `dotnet-templates` `summaryConsumer` template](https://github.com/jet/dotnet-templates/tree/master/propulsion-summary-consumer).

## Components
Expand Down
12 changes: 9 additions & 3 deletions samples/Infrastructure/Storage.fs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ module Cosmos =
| [<AltCommandLine "-s2">] Connection2 of string
| [<AltCommandLine "-d2">] Database2 of string
| [<AltCommandLine "-c2">] Container2 of string
| [<AltCommandLine "-te">] TipMaxEvents of int
| [<AltCommandLine "-tl">] TipMaxJsonLength of int
| [<AltCommandLine "-b">] QueryMaxItems of int
interface IArgParserTemplate with
member a.Usage =
Expand All @@ -60,6 +62,8 @@ module Cosmos =
| Connection2 _ -> "specify a connection string for Secondary Cosmos account. Default: use same as Primary Connection"
| Database2 _ -> "specify a database name for Secondary store. Default: use same as Primary Database"
| Container2 _ -> "specify a container name for store. Default: use same as Primary Container"
| TipMaxEvents _ -> "specify maximum number of events to hold in Tip before calving off to a frozen Batch. Default: 256"
| TipMaxJsonLength _ -> "specify maximum length of JSON (as measured by JSON.stringify) to hold in Tip before calving off to a frozen Batch. Default: 30,000"
| QueryMaxItems _ -> "specify maximum number of batches of events to retrieve in per query response. Default: 10"
type Info(args : ParseResults<Arguments>) =
member __.Mode = args.GetResult(ConnectionMode,Microsoft.Azure.Cosmos.ConnectionMode.Direct)
Expand All @@ -76,6 +80,8 @@ module Cosmos =
member __.Timeout = args.GetResult(Timeout,5.) |> TimeSpan.FromSeconds
member __.Retries = args.GetResult(Retries,1)
member __.MaxRetryWaitTime = args.GetResult(RetriesWaitTimeS, 5.) |> TimeSpan.FromSeconds
member __.TipMaxEvents = args.GetResult(TipMaxEvents, 256)
member __.TipMaxJsonLength = args.GetResult(TipMaxJsonLength, 30000)
member __.QueryMaxItems = args.GetResult(QueryMaxItems, 10)

/// Standing up an Equinox instance is necessary to run for test purposes; You'll need to either:
Expand Down Expand Up @@ -106,9 +112,9 @@ module Cosmos =
CosmosStoreConnection(client, databaseId, containerId)
| (client, databaseId, containerId), Some (client2, db2, cont2) ->
CosmosStoreConnection(client, databaseId, containerId, client2 = client2, databaseId2 = db2, containerId2 = cont2)
log.Information("CosmosStore Max Items in Query: {queryMaxItems}",
a.QueryMaxItems)
let ctx = CosmosStoreContext(conn, queryMaxItems = a.QueryMaxItems)
log.Information("CosmosStore Max Events in Tip: {maxTipEvents}e {maxTipJsonLength}b Items in Query: {queryMaxItems}",
a.TipMaxEvents, a.TipMaxJsonLength, a.QueryMaxItems)
let ctx = CosmosStoreContext(conn, queryMaxItems = a.QueryMaxItems, tipMaxEvents = a.TipMaxEvents, tipMaxJsonLength = a.TipMaxJsonLength)
let cacheStrategy = match cache with Some c -> CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) | None -> CachingStrategy.NoCaching
StorageConfig.Cosmos (ctx, cacheStrategy, unfolds)

Expand Down
Loading

0 comments on commit 25ebc33

Please sign in to comment.