Skip to content

Commit

Permalink
Removing Capacity union from configuration #5
Browse files Browse the repository at this point in the history
  • Loading branch information
Dzoukr committed Dec 7, 2018
1 parent f3c3e63 commit f7305ab
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 81 deletions.
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### 1.3.0 - November 24 2018
* Removing Capacity union from configuration #5
* Reflexing changes in throughput range announced on MS Build 2018

### 1.2.0 - November 24 2018
* Fixing private functions visibility
* Unused DatabaseName configuration property renamed to TableName and used for specifying destination table
Expand Down
33 changes: 10 additions & 23 deletions src/CosmoStore/CosmosDb/CosmosDb.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,26 @@ namespace CosmoStore.CosmosDb

open System

type Capacity =
| Fixed
| Unlimited
with
member x.ThroughputLimits =
match x with
| Fixed -> 400, 10000
| Unlimited -> 1000, 100000

member x.CorrectThroughput (i:int) =
let throughput = Math.Round((decimal i) / 100m, 0) * 100m |> int
let min,max = x.ThroughputLimits
if throughput > max then max
else if throughput < min then min
else throughput

member x.UsePartitionKey =
match x with
| Fixed -> false
| Unlimited -> true
module internal Throughput =
let min = 400
let max = 1_000_000

let correct (i:int) =
let throughput = Math.Round((decimal i) / 100m, 0) * 100m |> int
if throughput > max then max
else if throughput < min then min
else throughput

type Configuration = {
DatabaseName : string
ServiceEndpoint : Uri
AuthKey : string
Capacity: Capacity
Throughput: int
}
with
static member CreateDefault serviceEndpoint authKey = {
DatabaseName = "EventStore"
ServiceEndpoint = serviceEndpoint
AuthKey = authKey
Capacity = Fixed
Throughput = Fixed.ThroughputLimits |> fst
Throughput = Throughput.min
}
21 changes: 9 additions & 12 deletions src/CosmoStore/CosmosDb/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ open FSharp.Control.Tasks.V2
open CosmoStore
open System.Reflection
open System.IO
open CosmoStore.CosmosDb

let private collectionName = "Events"
let private partitionKey = "streamId"
Expand All @@ -19,12 +20,11 @@ let private createDatabase dbName (client:DocumentClient) =
return ()
}

let private createCollection (dbUri:Uri) (capacity:Capacity) (throughput:int) (client:DocumentClient) =
let private createCollection (dbUri:Uri) (throughput:int) (client:DocumentClient) =
let collection = DocumentCollection( Id = collectionName)

// partition key
if capacity.UsePartitionKey then
collection.PartitionKey.Paths.Add(sprintf "/%s" partitionKey)
// always use partition key
collection.PartitionKey.Paths.Add(sprintf "/%s" partitionKey)

// unique keys
let streamPath = new System.Collections.ObjectModel.Collection<string>()
Expand All @@ -35,7 +35,7 @@ let private createCollection (dbUri:Uri) (capacity:Capacity) (throughput:int) (c
collection.UniqueKeyPolicy <- new UniqueKeyPolicy(UniqueKeys = keys)

// throughput
let throughput = throughput |> capacity.CorrectThroughput
let throughput = throughput |> Throughput.correct
let ro = new RequestOptions()
ro.OfferThroughput <- Nullable<int>(throughput)

Expand Down Expand Up @@ -133,9 +133,7 @@ let private getEvent (client:DocumentClient) (collectionUri:Uri) streamId positi
return events.Head
}

let private getRequestOptions usePartitionKey streamId =
if usePartitionKey then RequestOptions(PartitionKey = PartitionKey(streamId))
else RequestOptions()
let private getRequestOptions streamId = RequestOptions(PartitionKey = PartitionKey(streamId))

let getEventStore (configuration:Configuration) =
let client = new DocumentClient(configuration.ServiceEndpoint, configuration.AuthKey)
Expand All @@ -145,14 +143,13 @@ let getEventStore (configuration:Configuration) =

task {
do! createDatabase configuration.DatabaseName client
do! createCollection dbUri configuration.Capacity configuration.Throughput client
do! createCollection dbUri configuration.Throughput client
do! createStoreProcedures eventsCollectionUri appendEventProcUri client
} |> Async.AwaitTask |> Async.RunSynchronously

let getOpts = getRequestOptions configuration.Capacity.UsePartitionKey
{
AppendEvent = appendEvent getOpts client appendEventProcUri
AppendEvents = appendEvents getOpts client appendEventProcUri
AppendEvent = appendEvent getRequestOptions client appendEventProcUri
AppendEvents = appendEvents getRequestOptions client appendEventProcUri
GetEvent = getEvent client eventsCollectionUri
GetEvents = getEvents client eventsCollectionUri
GetStreams = getStreams client eventsCollectionUri
Expand Down
71 changes: 27 additions & 44 deletions tests/CosmoStore.Tests/BasicTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,24 @@ open CosmoStore.TableStorage
module CosmosDb =
open CosmoStore.CosmosDb

let smallSize = 1000
let bigSize = 100000


let private getConfig throughput =
let conf =
CosmoStore.CosmosDb.Configuration.CreateDefault
(Uri "https://localhost:8081")
"C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="
let c = if throughput > 10000 then { conf with Capacity = Unlimited; Throughput = throughput } else { conf with Throughput = throughput }
let n = sprintf "EventStore_%i" throughput
{ c with DatabaseName = n }

let getCleanEventStore throughput =
let conf = getConfig throughput
let client = new DocumentClient(conf.ServiceEndpoint, conf.AuthKey)
let private config =
CosmoStore.CosmosDb.Configuration.CreateDefault
(Uri "https://localhost:8081")
"C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="

let getCleanEventStore() =
let client = new DocumentClient(config.ServiceEndpoint, config.AuthKey)
try
do client.DeleteDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(conf.DatabaseName, "Events"))
do client.DeleteDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(config.DatabaseName, "Events"))
|> Async.AwaitTask
|> Async.RunSynchronously
|> ignore
with ex -> ()
conf |> EventStore.getEventStore
config |> EventStore.getEventStore

let eventStoreSmall =
smallSize |> getCleanEventStore
let eventStoreBig =
bigSize |> getCleanEventStore
let eventStore = getCleanEventStore()

module TableStorage =
open CosmoStore.TableStorage
open Microsoft.WindowsAzure.Storage

let private conf = Configuration.CreateDefaultForLocalEmulator()
Expand Down Expand Up @@ -78,23 +65,19 @@ let checkPosition acc (item:EventRead) =
item.Position

type StoreType =
| CosmosSmall = 0
| CosmosBig = 1
| TableStorage = 2
| CosmosDB = 0
| TableStorage = 1

let getEventStore = function
| StoreType.CosmosSmall -> CosmosDb.eventStoreSmall
| StoreType.CosmosBig -> CosmosDb.eventStoreBig
| StoreType.CosmosDB -> CosmosDb.eventStore
| StoreType.TableStorage -> TableStorage.eventStore

let getCleanEventStore = function
| StoreType.CosmosSmall -> CosmosDb.getCleanEventStore CosmosDb.smallSize
| StoreType.CosmosBig -> CosmosDb.getCleanEventStore CosmosDb.bigSize
| StoreType.CosmosDB -> CosmosDb.getCleanEventStore()
| StoreType.TableStorage -> TableStorage.getCleanEventStore()


[<Test>]
let ``Appends event`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, StoreType.TableStorage)>] (typ:StoreType)) =
let ``Appends event`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getEventStore
let streamId = getStreamId()

Expand All @@ -107,7 +90,7 @@ let ``Appends event`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, Stor
)

[<Test>]
let ``Get event`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, StoreType.TableStorage)>] (typ:StoreType)) =
let ``Get event`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getEventStore
let streamId = getStreamId()

Expand All @@ -122,7 +105,7 @@ let ``Get event`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, StoreTyp
Assert.AreEqual("Created_3", event.Name)

[<Test>]
let ``Get events (all)`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, StoreType.TableStorage)>] (typ:StoreType)) =
let ``Get events (all)`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getEventStore
let streamId = getStreamId()

Expand All @@ -137,7 +120,7 @@ let ``Get events (all)`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, S
events |> List.fold checkPosition 0L |> ignore

[<Test>]
let ``Get events (from position)`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, StoreType.TableStorage)>] (typ:StoreType)) =
let ``Get events (from position)`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getEventStore
let streamId = getStreamId()

Expand All @@ -152,7 +135,7 @@ let ``Get events (from position)`` ([<Values(StoreType.CosmosSmall, StoreType.Co
events |> List.fold checkPosition 5L |> ignore

[<Test>]
let ``Get events (to position)`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, StoreType.TableStorage)>] (typ:StoreType)) =
let ``Get events (to position)`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getEventStore
let streamId = getStreamId()

Expand All @@ -167,7 +150,7 @@ let ``Get events (to position)`` ([<Values(StoreType.CosmosSmall, StoreType.Cosm
events |> List.fold checkPosition 0L |> ignore

[<Test>]
let ``Get events (position range)`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, StoreType.TableStorage)>] (typ:StoreType)) =
let ``Get events (position range)`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getEventStore
let streamId = getStreamId()

Expand All @@ -182,7 +165,7 @@ let ``Get events (position range)`` ([<Values(StoreType.CosmosSmall, StoreType.C
events |> List.fold checkPosition 4L |> ignore

[<Test>]
let ``Get streams (all)`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, StoreType.TableStorage)>] (typ:StoreType)) =
let ``Get streams (all)`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getEventStore
let addEventToStream i =
[1..99]
Expand All @@ -200,7 +183,7 @@ let ``Get streams (all)`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig,
Assert.AreEqual("A_3", streams.[2].Id)

[<Test>]
let ``Get streams (startswith)`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, StoreType.TableStorage)>] (typ:StoreType)) =
let ``Get streams (startswith)`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getEventStore
let startsWith = Guid.NewGuid().ToString("N")
let addEventToStream i =
Expand All @@ -214,7 +197,7 @@ let ``Get streams (startswith)`` ([<Values(StoreType.CosmosSmall, StoreType.Cosm
Assert.AreEqual(sprintf "X2_%s" startsWith, streams.Head.Id)

[<Test>]
let ``Get streams (endswith)`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, StoreType.TableStorage)>] (typ:StoreType)) =
let ``Get streams (endswith)`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getEventStore
let endsWith = Guid.NewGuid().ToString("N")
let addEventToStream i =
Expand All @@ -229,7 +212,7 @@ let ``Get streams (endswith)`` ([<Values(StoreType.CosmosSmall, StoreType.Cosmos
Assert.AreEqual(sprintf "X1_%s" endsWith, streams.Head.Id)

[<Test>]
let ``Get streams (contains)`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, StoreType.TableStorage)>] (typ:StoreType)) =
let ``Get streams (contains)`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getEventStore
let contains = Guid.NewGuid().ToString("N")
let addEventToStream i =
Expand All @@ -244,7 +227,7 @@ let ``Get streams (contains)`` ([<Values(StoreType.CosmosSmall, StoreType.Cosmos
Assert.AreEqual(sprintf "C_%s_1" contains, streams.Head.Id)

[<Test>]
let ``Fails to append to existing position`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, StoreType.TableStorage)>] (typ:StoreType)) =
let ``Fails to append to existing position`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getEventStore
let streamId = getStreamId()

Expand All @@ -266,7 +249,7 @@ let ``Fails to append to existing position`` ([<Values(StoreType.CosmosSmall, St
)

[<Test>]
let ``Fails to append to existing stream if is not expected to exist`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, StoreType.TableStorage)>] (typ:StoreType)) =
let ``Fails to append to existing stream if is not expected to exist`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getEventStore
let streamId = getStreamId()
Assert.Throws<AggregateException>(fun _ ->
Expand All @@ -287,7 +270,7 @@ let ``Fails to append to existing stream if is not expected to exist`` ([<Values
)

[<Test>]
let ``Appends events`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, StoreType.TableStorage)>] (typ:StoreType)) =
let ``Appends events`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getEventStore
let streamId = getStreamId()

Expand Down
4 changes: 2 additions & 2 deletions tests/CosmoStore.Tests/Issues.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ open CosmoStore
open CosmoStore.Tests.BasicTests

[<Test>]
let ``Can read back Events stored without metadata`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, StoreType.TableStorage)>] (typ:StoreType)) =
let ``Can read back Events stored without metadata`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getCleanEventStore
let streamId = getStreamId()
let event =
Expand All @@ -19,7 +19,7 @@ let ``Can read back Events stored without metadata`` ([<Values(StoreType.CosmosS
|> ignore

[<Test>]
let ``NoStream Position check works for non-existing stream`` ([<Values(StoreType.CosmosSmall, StoreType.CosmosBig, StoreType.TableStorage)>] (typ:StoreType)) =
let ``NoStream Position check works for non-existing stream`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getEventStore
let streamId = getStreamId()
let event =
Expand Down

0 comments on commit f7305ab

Please sign in to comment.