Skip to content

Commit

Permalink
EdgeHub: Fix MessageStore initial offset after restart (#603)
Browse files Browse the repository at this point in the history
* Fix sequential store

* Add tests
  • Loading branch information
varunpuranik authored and myagley committed Dec 5, 2018
1 parent 36bf9df commit 81f93dc
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ public class CheckpointStore : ICheckpointStore
{
readonly IEntityStore<string, CheckpointEntity> underlyingStore;

CheckpointStore(IEntityStore<string, CheckpointEntity> underlyingStore)
internal CheckpointStore(IEntityStore<string, CheckpointEntity> underlyingStore)
{
this.underlyingStore = underlyingStore;
}

public static CheckpointStore Create(IStoreProvider storeProvider)
{
IEntityStore<string, CheckpointEntity> underlyingStore = Preconditions.CheckNotNull(storeProvider, nameof(storeProvider)).GetEntityStore<string, CheckpointEntity>(Constants.CheckpointStorePartitionKey);
IEntityStore<string, CheckpointEntity> underlyingStore = Preconditions.CheckNotNull(storeProvider, nameof(storeProvider))
.GetEntityStore<string, CheckpointEntity>(Constants.CheckpointStorePartitionKey);
return new CheckpointStore(underlyingStore);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public void SetTimeToLive(TimeSpan timeSpan)

public async Task AddEndpoint(string endpointId)
{
ISequentialStore<MessageRef> sequentialStore = await this.storeProvider.GetSequentialStore<MessageRef>(endpointId);
CheckpointData checkpointData = await this.checkpointStore.GetCheckpointDataAsync(endpointId, CancellationToken.None);
ISequentialStore<MessageRef> sequentialStore = await this.storeProvider.GetSequentialStore<MessageRef>(endpointId, checkpointData.Offset + 1);
if (this.endpointSequentialStores.TryAdd(endpointId, sequentialStore))
{
Events.SequentialStoreAdded(endpointId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,26 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Test.Storage
[Integration]
public class MessageStoreTest
{
[Fact]
public async Task BasicTest()
[Theory]
[InlineData(0)]
[InlineData(10150)]
[InlineData(-1)]
public async Task BasicTest(long initialCheckpointOffset)
{
(IMessageStore messageStore, ICheckpointStore checkpointStore) result = await this.GetMessageStore();
(IMessageStore messageStore, ICheckpointStore checkpointStore) result = await this.GetMessageStore(initialCheckpointOffset);
using (IMessageStore messageStore = result.messageStore)
{
for (int i = 0; i < 10000; i++)
{
if (i % 2 == 0)
{
long offset = await messageStore.Add("module1", this.GetMessage(i));
Assert.Equal(i / 2, offset);
Assert.Equal(initialCheckpointOffset + 1 + i / 2, offset);
}
else
{
long offset = await messageStore.Add("module2", this.GetMessage(i));
Assert.Equal(i / 2, offset);
Assert.Equal(initialCheckpointOffset + 1 + i / 2, offset);
}
}

Expand Down Expand Up @@ -285,6 +288,25 @@ IMessage GetMessage(int i)
return (messageStore, checkpointStore);
}

async Task<(IMessageStore, ICheckpointStore)> GetMessageStore(long initialCheckpointOffset, int ttlSecs = 300)
{
var dbStoreProvider = new InMemoryDbStoreProvider();
IStoreProvider storeProvider = new StoreProvider(dbStoreProvider);

IEntityStore<string, CheckpointStore.CheckpointEntity> checkpointUnderlyingStore = storeProvider.GetEntityStore<string, CheckpointStore.CheckpointEntity>($"Checkpoint{Guid.NewGuid().ToString()}");
if (initialCheckpointOffset >= 0)
{
await checkpointUnderlyingStore.Put("module1", new CheckpointStore.CheckpointEntity(initialCheckpointOffset, null, null));
await checkpointUnderlyingStore.Put("module2", new CheckpointStore.CheckpointEntity(initialCheckpointOffset, null, null));
}

ICheckpointStore checkpointStore = new CheckpointStore(checkpointUnderlyingStore);
IMessageStore messageStore = new MessageStore(storeProvider, checkpointStore, TimeSpan.FromSeconds(ttlSecs));
await messageStore.AddEndpoint("module1");
await messageStore.AddEndpoint("module2");
return (messageStore, checkpointStore);
}

[Fact]
public void MessageWrapperRoundtripTest()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace Microsoft.Azure.Devices.Edge.Storage
{
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Util;

/// <summary>
/// Provides stores that are higher level abstractions over the underlying key/value stores.
Expand All @@ -14,6 +15,8 @@ public interface IStoreProvider : IDisposable

Task<ISequentialStore<T>> GetSequentialStore<T>(string entityName);

Task<ISequentialStore<T>> GetSequentialStore<T>(string entityName, long defaultHeadOffset);

Task RemoveStore<T>(ISequentialStore<T> sequentialStore);

Task RemoveStore<TK, TV>(IEntityStore<TK, TV> entityStore);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ namespace Microsoft.Azure.Devices.Edge.Storage
/// </summary>
class SequentialStore<T> : ISequentialStore<T>
{
const int DefaultTailOffset = -1;
const int DefaultHeadOffset = 0;
const long DefaultHeadOffset = 0;
readonly IEntityStore<byte[], T> entityStore;
readonly AsyncLock headLockObject = new AsyncLock();
readonly AsyncLock tailLockObject = new AsyncLock();
Expand All @@ -40,14 +39,17 @@ class SequentialStore<T> : ISequentialStore<T>

public Task<IEnumerable<(long, T)>> GetBatch(long startingOffset, int batchSize) => this.GetBatch(startingOffset, batchSize, CancellationToken.None);

public static async Task<ISequentialStore<T>> Create(IEntityStore<byte[], T> entityStore)
public static Task<ISequentialStore<T>> Create(IEntityStore<byte[], T> entityStore)
=> Create(entityStore, DefaultHeadOffset);

public static async Task<ISequentialStore<T>> Create(IEntityStore<byte[], T> entityStore, long defaultHeadOffset)
{
Preconditions.CheckNotNull(entityStore, nameof(entityStore));
Option<(byte[] key, T value)> firstEntry = await entityStore.GetFirstEntry(CancellationToken.None);
Option<(byte[] key, T value)> lastEntry = await entityStore.GetLastEntry(CancellationToken.None);
long MapLocalFunction((byte[] key, T) e) => StoreUtils.GetOffsetFromKey(e.key);
long headOffset = firstEntry.Map(MapLocalFunction).GetOrElse(DefaultHeadOffset);
long tailOffset = lastEntry.Map(MapLocalFunction).GetOrElse(DefaultTailOffset);
long headOffset = firstEntry.Map(MapLocalFunction).GetOrElse(defaultHeadOffset);
long tailOffset = lastEntry.Map(MapLocalFunction).GetOrElse(defaultHeadOffset - 1);
var sequentialStore = new SequentialStore<T>(entityStore, headOffset, tailOffset);
return sequentialStore;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ public async Task<ISequentialStore<T>> GetSequentialStore<T>(string entityName)
return sequentialStore;
}

public async Task<ISequentialStore<T>> GetSequentialStore<T>(string entityName, long defaultHeadOffset)
{
IEntityStore<byte[], T> underlyingStore = this.GetEntityStore<byte[], T>(entityName);
ISequentialStore<T> sequentialStore = await SequentialStore<T>.Create(underlyingStore, defaultHeadOffset);
return sequentialStore;
}

public Task RemoveStore<T>(ISequentialStore<T> sequentialStore)
{
this.dbStoreProvider.RemoveDbStore(sequentialStore.EntityName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace Microsoft.Azure.Devices.Edge.Storage
{
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Azure.Devices.Edge.Util;

public static class StoreUtils
Expand All @@ -13,6 +14,7 @@ public static long GetOffsetFromKey(byte[] key)
Preconditions.CheckNotNull(key, nameof(key));
if (BitConverter.IsLittleEndian)
{
key = key.ToArray();
Array.Reverse(key);
}

Expand All @@ -26,6 +28,7 @@ public static byte[] GetKeyFromOffset(long offset)
byte[] bytes = BitConverter.GetBytes(offset);
if (BitConverter.IsLittleEndian)
{
bytes = bytes.ToArray();
Array.Reverse(bytes);
}

Expand Down
Loading

0 comments on commit 81f93dc

Please sign in to comment.