Skip to content

Commit

Permalink
Add stream deletion feature
Browse files Browse the repository at this point in the history
  • Loading branch information
Dave Thompson committed Oct 5, 2018
1 parent 9c9049f commit e0e4763
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System.Threading.Tasks;
using NUnit.Framework;
using SimpleEventStore.Tests;

namespace SimpleEventStore.AzureDocumentDb.Tests
{
[TestFixture]
public class AzureDocumentDbEventStoreDeletingStream : EventStoreDeletingStream
{
protected override Task<IStorageEngine> CreateStorageEngine()
{
return StorageEngineFactory.Create("DeletingStreamTests");
}
}
}
40 changes: 32 additions & 8 deletions SimpleEventStore.AzureDocumentDb/AzureDocumentDbStorageEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ namespace SimpleEventStore.AzureDocumentDb
internal class AzureDocumentDbStorageEngine : IStorageEngine
{
private const string AppendStoredProcedureName = "appendToStream";
private const string DeleteStoredProcedureName = "deleteStream";
private const string ConcurrencyConflictErrorKey = "Concurrency conflict.";

private readonly DocumentClient client;
private readonly string databaseName;
private readonly CollectionOptions collectionOptions;
private readonly Uri commitsLink;
private readonly Uri storedProcLink;
private readonly Uri appendStoredProcedureLink;
private readonly Uri deleteStoredProcedureLink;
private readonly LoggingOptions loggingOptions;
private readonly ISerializationTypeMap typeMap;

Expand All @@ -27,7 +29,8 @@ internal AzureDocumentDbStorageEngine(DocumentClient client, string databaseName
this.databaseName = databaseName;
this.collectionOptions = collectionOptions;
commitsLink = UriFactory.CreateDocumentCollectionUri(databaseName, collectionOptions.CollectionName);
storedProcLink = UriFactory.CreateStoredProcedureUri(databaseName, collectionOptions.CollectionName, AppendStoredProcedureName);
appendStoredProcedureLink = UriFactory.CreateStoredProcedureUri(databaseName, collectionOptions.CollectionName, AppendStoredProcedureName);
deleteStoredProcedureLink = UriFactory.CreateStoredProcedureUri(databaseName, collectionOptions.CollectionName, DeleteStoredProcedureName);
this.loggingOptions = loggingOptions;
this.typeMap = typeMap;
}
Expand All @@ -36,7 +39,8 @@ public async Task<IStorageEngine> Initialise()
{
await CreateDatabaseIfItDoesNotExist();
await CreateCollectionIfItDoesNotExist();
await CreateAppendStoredProcedureIfItDoesNotExist();
await CreateStoredProcedureIfItDoesNotExist(AppendStoredProcedureName, "appendToStream.js");
await CreateStoredProcedureIfItDoesNotExist(DeleteStoredProcedureName, "deleteStream.js");

return this;
}
Expand All @@ -48,7 +52,7 @@ public async Task AppendToStream(string streamId, IEnumerable<StorageEvent> even
try
{
var result = await client.ExecuteStoredProcedureAsync<dynamic>(
storedProcLink,
appendStoredProcedureLink,
new RequestOptions { PartitionKey = new PartitionKey(streamId), ConsistencyLevel = collectionOptions.ConsistencyLevel },
docs);

Expand Down Expand Up @@ -90,6 +94,26 @@ public async Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string s
return events.AsReadOnly();
}

public async Task DeleteStream(string streamId)
{
while (true)
{
var result = await client.ExecuteStoredProcedureAsync<dynamic>(
deleteStoredProcedureLink,
new RequestOptions { PartitionKey = new PartitionKey(streamId), ConsistencyLevel = collectionOptions.ConsistencyLevel },
streamId);

if ((bool)result.Response.continuation)
{
continue;
}

loggingOptions.OnSuccess(ResponseInformation.FromWriteResponse(nameof(DeleteStream), result));

break;
}
}

private async Task CreateDatabaseIfItDoesNotExist()
{
await client.CreateDatabaseIfNotExistsAsync(new Database { Id = databaseName });
Expand Down Expand Up @@ -117,18 +141,18 @@ private async Task CreateCollectionIfItDoesNotExist()
await client.CreateDocumentCollectionIfNotExistsAsync(databaseUri, collection, requestOptions);
}

private async Task CreateAppendStoredProcedureIfItDoesNotExist()
private async Task CreateStoredProcedureIfItDoesNotExist(string procedureName, string resourceName)
{
var query = client.CreateStoredProcedureQuery(commitsLink)
.Where(x => x.Id == AppendStoredProcedureName)
.Where(x => x.Id == procedureName)
.AsDocumentQuery();

if (!(await query.ExecuteNextAsync<StoredProcedure>()).Any())
{
await client.CreateStoredProcedureAsync(commitsLink, new StoredProcedure
{
Id = AppendStoredProcedureName,
Body = Resources.GetString("appendToStream.js")
Id = procedureName,
Body = Resources.GetString(resourceName)
});
}
}
Expand Down
73 changes: 73 additions & 0 deletions SimpleEventStore.AzureDocumentDb/Resources/deleteStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
function deleteStream(streamId) {
var context = getContext();
var collection = context.getCollection();
var collectionLink = collection.getSelfLink();
var response = getContext().getResponse();

// Based on https://raw.githubusercontent.com/Azure/azure-cosmosdb-js-server/master/samples/stored-procedures/bulkDelete.js
var query = {
query: "SELECT c._self FROM Commits c WHERE c.streamId = @streamId ORDER BY c.eventNumber ASC",
parameters: [{ name: "@streamId", value: streamId }]
};

var responseBody = {
deleted: 0,
continuation: true
};

tryQueryAndDelete();

// Recursively runs the query w/ support for continuation tokens.
// Calls tryDelete(documents) as soon as the query returns documents.
function tryQueryAndDelete(continuation) {
var requestOptions = { continuation: continuation };

var isAccepted = collection.queryDocuments(collectionLink, query, requestOptions, function (err, retrievedDocs, responseOptions) {
if (err) throw err;

if (retrievedDocs.length > 0) {
// Begin deleting documents as soon as documents are returned form the query results.
// tryDelete() resumes querying after deleting; no need to page through continuation tokens.
// - this is to prioritize writes over reads given timeout constraints.
tryDelete(retrievedDocs);
} else if (responseOptions.continuation) {
// Else if the query came back empty, but with a continuation token; repeat the query w/ the token.
tryQueryAndDelete(responseOptions.continuation);
} else {
// Else if there are no more documents and no continuation token - we are finished deleting documents.
responseBody.continuation = false;
response.setBody(responseBody);
}
});

// If we hit execution bounds - return continuation: true.
if (!isAccepted) {
response.setBody(responseBody);
}
}

// Recursively deletes documents passed in as an array argument.
// Attempts to query for more on empty array.
function tryDelete(documents) {
if (documents.length > 0) {
// Delete the first document in the array.
var isAccepted = collection.deleteDocument(documents[0]._self, {}, function (err, responseOptions) {
if (err) throw err;

responseBody.deleted++;
documents.shift();

// Delete the next document in the array.
tryDelete(documents);
});

// If we hit execution bounds - return continuation: true.
if (!isAccepted) {
response.setBody(responseBody);
}
} else {
// If the document array is empty, query for more documents.
tryQueryAndDelete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
<PackageProjectUrl>https://github.com/GivePenny/SimpleEventStore</PackageProjectUrl>
<OutputTypeEx>library</OutputTypeEx>
</PropertyGroup>
<ItemGroup>
<None Remove="Resources\deleteStream.js" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.DocumentDB.Core" Version="2.1.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\SimpleEventStore\SimpleEventStore.csproj" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="Resources\deleteStream.js" />
<EmbeddedResource Include="Resources\appendToStream.js" />
</ItemGroup>
</Project>
43 changes: 43 additions & 0 deletions SimpleEventStore.Tests/EventStoreDeletingStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using NUnit.Framework;
using SimpleEventStore.Tests.Events;

namespace SimpleEventStore.Tests
{
[TestFixture]
public abstract class EventStoreDeletingStream : EventStoreTestBase
{
[Test]
public async Task when_deleting_stream_all_events_in_stream_are_deleted()
{
var streamId = Guid.NewGuid().ToString();
var subject = await GetEventStore();
var @event = new EventData(Guid.NewGuid(), new OrderCreated(streamId));

await subject.AppendToStream(streamId, 0, @event);

await subject.DeleteStream(streamId);

var stream = await subject.ReadStreamForwards(streamId);
Assert.That(stream.Count, Is.EqualTo(0));
}

[Test]
public async Task when_deleting_stream_events_in_other_streams_are_preserved()
{
var deleteStreamId = Guid.NewGuid().ToString();
var keepStreamId = Guid.NewGuid().ToString();
var subject = await GetEventStore();

await subject.AppendToStream(keepStreamId, 0, new EventData(Guid.NewGuid(), new OrderCreated(keepStreamId)));
await subject.AppendToStream(deleteStreamId, 0, new EventData(Guid.NewGuid(), new OrderCreated(deleteStreamId)));

await subject.DeleteStream(deleteStreamId);

var stream = await subject.ReadStreamForwards(keepStreamId);
Assert.That(stream.Count, Is.EqualTo(1));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System.Threading.Tasks;
using NUnit.Framework;
using SimpleEventStore.InMemory;

namespace SimpleEventStore.Tests.InMemory
{
[TestFixture]
public class InMemoryEventStoreDeletingStream : EventStoreDeletingStream
{
protected override Task<IStorageEngine> CreateStorageEngine()
{
return Task.FromResult((IStorageEngine)new InMemoryStorageEngine());
}
}
}
19 changes: 8 additions & 11 deletions SimpleEventStore.sln
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 14
VisualStudioVersion = 14.0.25420.1
# Visual Studio 15
VisualStudioVersion = 15.0.28010.2041
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SimpleEventStore", "SimpleEventStore\SimpleEventStore.csproj", "{73235465-69BF-4762-B8C5-20C8E45795FF}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SimpleEventStore", "SimpleEventStore\SimpleEventStore.csproj", "{73235465-69BF-4762-B8C5-20C8E45795FF}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{CD241C9A-0A56-42C9-8309-D68890C78B64}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SimpleEventStore.Tests", "SimpleEventStore.Tests\SimpleEventStore.Tests.csproj", "{ACA6B3AE-FCB9-45F4-9D6B-66196F98F819}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SimpleEventStore.Tests", "SimpleEventStore.Tests\SimpleEventStore.Tests.csproj", "{ACA6B3AE-FCB9-45F4-9D6B-66196F98F819}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SimpleEventStore.AzureDocumentDb", "SimpleEventStore.AzureDocumentDb\SimpleEventStore.AzureDocumentDb.csproj", "{48C71940-D9B0-446A-9F3D-E6275CD43440}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SimpleEventStore.AzureDocumentDb", "SimpleEventStore.AzureDocumentDb\SimpleEventStore.AzureDocumentDb.csproj", "{48C71940-D9B0-446A-9F3D-E6275CD43440}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SimpleEventStore.AzureDocumentDb.Tests", "SimpleEventStore.AzureDocumentDb.Tests\SimpleEventStore.AzureDocumentDb.Tests.csproj", "{205A7F81-A496-4400-9A97-D156F88B7883}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SimpleEventStore.AzureDocumentDb.Tests", "SimpleEventStore.AzureDocumentDb.Tests\SimpleEventStore.AzureDocumentDb.Tests.csproj", "{205A7F81-A496-4400-9A97-D156F88B7883}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -39,8 +37,7 @@ Global
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{ACA6B3AE-FCB9-45F4-9D6B-66196F98F819} = {CD241C9A-0A56-42C9-8309-D68890C78B64}
{205A7F81-A496-4400-9A97-D156F88B7883} = {CD241C9A-0A56-42C9-8309-D68890C78B64}
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {936D1C6C-3474-4DFB-A983-FF729D21B72B}
EndGlobalSection
EndGlobal
9 changes: 8 additions & 1 deletion SimpleEventStore/EventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public Task AppendToStream(string streamId, int expectedVersion, params EventDat
var storageEvents = new List<StorageEvent>();
var eventVersion = expectedVersion;

for (int i = 0; i < events.Length; i++)
for (var i = 0; i < events.Length; i++)
{
storageEvents.Add(new StorageEvent(streamId, events[i], ++eventVersion));
}
Expand All @@ -41,5 +41,12 @@ public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamI

return engine.ReadStreamForwards(streamId, startPosition, numberOfEventsToRead);
}

public Task DeleteStream(string streamId)
{
Guard.IsNotNullOrEmpty(nameof(streamId), streamId);

return engine.DeleteStream(streamId);
}
}
}
2 changes: 2 additions & 0 deletions SimpleEventStore/IStorageEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ public interface IStorageEngine
Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamId, int startPosition, int numberOfEventsToRead);

Task<IStorageEngine> Initialise();

Task DeleteStream(string streamId);
}
}
17 changes: 17 additions & 0 deletions SimpleEventStore/InMemory/InMemoryStorageEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,23 @@ public Task AppendToStream(string streamId, IEnumerable<StorageEvent> events)
});
}

public Task DeleteStream(string streamId)
{
if (!streams.ContainsKey(streamId))
{
return Task.CompletedTask;
}

foreach (var @event in streams[streamId])
{
allEvents.Remove(@event);
}

streams.TryRemove(streamId, out var removedStream);

return Task.CompletedTask;
}

private void AddEventsToAllStream(IEnumerable<StorageEvent> events)
{
foreach (var e in events)
Expand Down

0 comments on commit e0e4763

Please sign in to comment.