Skip to content

Commit

Permalink
Fixed #2643 Added a data adapter to azure queue stream provider by @j…
Browse files Browse the repository at this point in the history
…ason-bragg (#2658)

* Added a data adapter to azure queue stream provider
The data adapter allows users to use the new v2 versions of sequence token and batch container without risking changes to the original versions, which are being used in production services.
The adapter also allows service developers to control how data is structured in azure queues, enabling the streaming of azure queue events of legacy systems or systems integrated with other non-orleans services.
  • Loading branch information
sergeybykov authored and benjaminpetit committed Jan 30, 2017
1 parent 9ab2d2a commit c6457f2
Show file tree
Hide file tree
Showing 30 changed files with 487 additions and 433 deletions.
4 changes: 2 additions & 2 deletions src/OrleansAWSUtils/Streams/SQSBatchContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace OrleansAWSUtils.Streams
internal class SQSBatchContainer : IBatchContainer
{
[JsonProperty]
private EventSequenceToken sequenceToken;
private EventSequenceTokenV2 sequenceToken;

[JsonProperty]
private readonly List<object> events;
Expand Down Expand Up @@ -45,7 +45,7 @@ private SQSBatchContainer(
String streamNamespace,
List<object> events,
Dictionary<string, object> requestContext,
EventSequenceToken sequenceToken)
EventSequenceTokenV2 sequenceToken)
: this(streamGuid, streamNamespace, events, requestContext)
{
this.sequenceToken = sequenceToken;
Expand Down
30 changes: 23 additions & 7 deletions src/OrleansAzureUtils/OrleansAzureUtils.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@
<Compile Include="MultiClusterNetwork\GossipTableInstanceManager.cs" />
<Compile Include="Providers\AzureConfigurationExtensions.cs" />
<Compile Include="Providers\Storage\AzureBlobStorage.cs" />
<Compile Include="Providers\Streams\AzureQueue\AzureQueueAdapterConstants.cs" />
<Compile Include="Providers\Streams\AzureQueue\AzureQueueBatchContainerV2.cs" />
<Compile Include="Providers\Streams\AzureQueue\IAzureQueueDataAdapter.cs" />
<Compile Include="Providers\Streams\PersistentStreams\AzureTableStorageStreamFailureHandler.cs" />
<Compile Include="Providers\Streams\PersistentStreams\StreamDeliveryFailureEntity.cs" />
<Compile Include="Storage\AzureBasedMembershipTable.cs" />
Expand Down Expand Up @@ -112,11 +114,25 @@
<None Include="project.json" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
<!-- Begin Orleans: Without these lines the project won't build properly -->
<PropertyGroup>
<OrleansProjectType>Server</OrleansProjectType>
</PropertyGroup>
<!-- Set path to ClientGenerator.exe -->
<Choose>
<When Condition="'$(builduri)' != ''">
<PropertyGroup>
<!-- TFS build -->
<OrleansReferencesBase>$(TargetDir)</OrleansReferencesBase>
</PropertyGroup>
</When>
<Otherwise>
<PropertyGroup>
<!-- Visual Studio or MsBuild .sln build -->
<OrleansReferencesBase>$(ProjectDir)..\ClientGenerator\$(OutputPath)</OrleansReferencesBase>
</PropertyGroup>
</Otherwise>
</Choose>
<Import Project="$(ProjectDir)..\Orleans.SDK.targets" />
<!--End Orleans -->
</Project>
4 changes: 3 additions & 1 deletion src/OrleansAzureUtils/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using Orleans.CodeGeneration;
using Orleans.Providers.Streams.AzureQueue;

// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
Expand All @@ -23,4 +24,5 @@
[assembly: InternalsVisibleTo("UnitTestGrains")]
[assembly: InternalsVisibleTo("Orleans.NonSiloTests")]
[assembly: InternalsVisibleTo("Tester.AzureUtils")]
[assembly: SkipCodeGeneration]
[assembly: GenerateSerializer(typeof(AzureQueueBatchContainerV2))]

68 changes: 61 additions & 7 deletions src/OrleansAzureUtils/Providers/AzureConfigurationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ public static void AddAzureQueueStreamProvider(
this ClusterConfiguration config,
string providerName,
string connectionString = null,
int numberOfQueues = AzureQueueAdapterFactory.NumQueuesDefaultValue,
int numberOfQueues = AzureQueueAdapterConstants.NumQueuesDefaultValue,
string deploymentId = null,
int cacheSize = AzureQueueAdapterFactory.CacheSizeDefaultValue,
int cacheSize = AzureQueueAdapterConstants.CacheSizeDefaultValue,
PersistentStreamProviderState startupState = AzureQueueStreamProvider.StartupStateDefaultValue,
PersistentStreamProviderConfig persistentStreamProviderConfig = null)
{
Expand All @@ -112,6 +112,33 @@ public static void AddAzureQueueStreamProvider(
config.Globals.RegisterStreamProvider<AzureQueueStreamProvider>(providerName, properties);
}

/// <summary>
/// Adds a stream provider of type <see cref="AzureQueueStreamProviderV2"/>.
/// </summary>
/// <param name="config">The cluster configuration object to add provider to.</param>
/// <param name="providerName">The provider name</param>
/// <param name="connectionString">The azure storage connection string. If none is provided, it will use the same as in the Globals configuration.</param>
/// <param name="numberOfQueues">The number of queues to use as partitions.</param>
/// <param name="deploymentId">The deployment ID used for partitioning. If none is specified, the provider will use the same DeploymentId as the Cluster.</param>
/// <param name="cacheSize">The cache size.</param>
/// <param name="startupState">The startup state of the persistent stream provider.</param>
/// <param name="persistentStreamProviderConfig">Settings related to all persistent stream providers.</param>
public static void AddAzureQueueStreamProviderV2(
this ClusterConfiguration config,
string providerName,
string connectionString = null,
int numberOfQueues = AzureQueueAdapterConstants.NumQueuesDefaultValue,
string deploymentId = null,
int cacheSize = AzureQueueAdapterConstants.CacheSizeDefaultValue,
PersistentStreamProviderState startupState = AzureQueueStreamProvider.StartupStateDefaultValue,
PersistentStreamProviderConfig persistentStreamProviderConfig = null)
{
connectionString = GetConnectionString(connectionString, config);
deploymentId = deploymentId ?? config.Globals.DeploymentId;
var properties = GetAzureQueueStreamProviderProperties(providerName, connectionString, numberOfQueues, deploymentId, cacheSize, startupState, persistentStreamProviderConfig);
config.Globals.RegisterStreamProvider<AzureQueueStreamProviderV2>(providerName, properties);
}

/// <summary>
/// Adds a stream provider of type <see cref="AzureQueueStreamProvider"/>.
/// </summary>
Expand All @@ -127,9 +154,9 @@ public static void AddAzureQueueStreamProvider(
this ClientConfiguration config,
string providerName,
string connectionString = null,
int numberOfQueues = AzureQueueAdapterFactory.NumQueuesDefaultValue,
int numberOfQueues = AzureQueueAdapterConstants.NumQueuesDefaultValue,
string deploymentId = null,
int cacheSize = AzureQueueAdapterFactory.CacheSizeDefaultValue,
int cacheSize = AzureQueueAdapterConstants.CacheSizeDefaultValue,
PersistentStreamProviderState startupState = AzureQueueStreamProvider.StartupStateDefaultValue,
PersistentStreamProviderConfig persistentStreamProviderConfig = null)
{
Expand All @@ -139,16 +166,43 @@ public static void AddAzureQueueStreamProvider(
config.RegisterStreamProvider<AzureQueueStreamProvider>(providerName, properties);
}

/// <summary>
/// Adds a stream provider of type <see cref="AzureQueueStreamProviderV2"/>.
/// </summary>
/// <param name="config">The cluster configuration object to add provider to.</param>
/// <param name="providerName">The provider name</param>
/// <param name="connectionString">The azure storage connection string. If none is provided, it will use the same as in the Globals configuration.</param>
/// <param name="numberOfQueues">The number of queues to use as partitions.</param>
/// <param name="deploymentId">The deployment ID used for partitioning. If none is specified, the provider will use the same DeploymentId as the Cluster.</param>
/// <param name="cacheSize">The cache size.</param>
/// <param name="startupState">The startup state of the persistent stream provider.</param>
/// <param name="persistentStreamProviderConfig">Settings related to all persistent stream providers.</param>
public static void AddAzureQueueStreamProviderV2(
this ClientConfiguration config,
string providerName,
string connectionString = null,
int numberOfQueues = AzureQueueAdapterConstants.NumQueuesDefaultValue,
string deploymentId = null,
int cacheSize = AzureQueueAdapterConstants.CacheSizeDefaultValue,
PersistentStreamProviderState startupState = AzureQueueStreamProvider.StartupStateDefaultValue,
PersistentStreamProviderConfig persistentStreamProviderConfig = null)
{
connectionString = GetConnectionString(connectionString, config);
deploymentId = deploymentId ?? config.DeploymentId;
var properties = GetAzureQueueStreamProviderProperties(providerName, connectionString, numberOfQueues, deploymentId, cacheSize, startupState, persistentStreamProviderConfig);
config.RegisterStreamProvider<AzureQueueStreamProviderV2>(providerName, properties);
}

private static Dictionary<string, string> GetAzureQueueStreamProviderProperties(string providerName, string connectionString, int numberOfQueues, string deploymentId, int cacheSize, PersistentStreamProviderState startupState, PersistentStreamProviderConfig persistentStreamProviderConfig)
{
if (string.IsNullOrWhiteSpace(providerName)) throw new ArgumentNullException(nameof(providerName));
if (numberOfQueues < 1) throw new ArgumentOutOfRangeException(nameof(numberOfQueues));

var properties = new Dictionary<string, string>
{
{ AzureQueueAdapterFactory.DataConnectionStringPropertyName, connectionString },
{ AzureQueueAdapterFactory.NumQueuesPropertyName, numberOfQueues.ToString() },
{ AzureQueueAdapterFactory.DeploymentIdPropertyName, deploymentId },
{ AzureQueueAdapterConstants.DataConnectionStringPropertyName, connectionString },
{ AzureQueueAdapterConstants.NumQueuesPropertyName, numberOfQueues.ToString() },
{ AzureQueueAdapterConstants.DeploymentIdPropertyName, deploymentId },
{ SimpleQueueAdapterCache.CacheSizePropertyName, cacheSize.ToString() },
{ AzureQueueStreamProvider.StartupStatePropertyName, startupState.ToString() },
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
Expand All @@ -7,42 +8,42 @@

namespace Orleans.Providers.Streams.AzureQueue
{
internal class AzureQueueAdapter : IQueueAdapter
internal class AzureQueueAdapter<TDataAdapter> : IQueueAdapter
where TDataAdapter : IAzureQueueDataAdapter, new()
{
protected readonly string DeploymentId;
protected readonly string DataConnectionString;
protected readonly TimeSpan? MessageVisibilityTimeout;
private readonly HashRingBasedStreamQueueMapper streamQueueMapper;
protected readonly ConcurrentDictionary<QueueId, AzureQueueDataManager> Queues = new ConcurrentDictionary<QueueId, AzureQueueDataManager>();
protected readonly IAzureQueueDataAdapter dataAdapter;

public string Name { get ; private set; }
public bool IsRewindable { get { return false; } }
public string Name { get ; }
public bool IsRewindable => false;

public StreamProviderDirection Direction { get { return StreamProviderDirection.ReadWrite; } }
public StreamProviderDirection Direction => StreamProviderDirection.ReadWrite;

public AzureQueueAdapter(HashRingBasedStreamQueueMapper streamQueueMapper, string dataConnectionString, string deploymentId, string providerName, TimeSpan? messageVisibilityTimeout = null)
{
if (String.IsNullOrEmpty(dataConnectionString)) throw new ArgumentNullException("dataConnectionString");
if (String.IsNullOrEmpty(deploymentId)) throw new ArgumentNullException("deploymentId");
if (string.IsNullOrEmpty(dataConnectionString)) throw new ArgumentNullException(nameof(dataConnectionString));
if (string.IsNullOrEmpty(deploymentId)) throw new ArgumentNullException(nameof(deploymentId));

DataConnectionString = dataConnectionString;
DeploymentId = deploymentId;
Name = providerName;
MessageVisibilityTimeout = messageVisibilityTimeout;
this.streamQueueMapper = streamQueueMapper;
this.dataAdapter = new TDataAdapter();
}

public IQueueAdapterReceiver CreateReceiver(QueueId queueId)
{
return AzureQueueAdapterReceiver.Create(queueId, DataConnectionString, DeploymentId, MessageVisibilityTimeout);
return AzureQueueAdapterReceiver.Create(queueId, DataConnectionString, DeploymentId, this.dataAdapter, MessageVisibilityTimeout);
}

public async Task QueueMessageBatchAsync<T>(Guid streamGuid, String streamNamespace, IEnumerable<T> events, StreamSequenceToken token, Dictionary<string, object> requestContext)
public async Task QueueMessageBatchAsync<T>(Guid streamGuid, string streamNamespace, IEnumerable<T> events, StreamSequenceToken token, Dictionary<string, object> requestContext)
{
if(token != null)
{
throw new ArgumentException("AzureQueue stream provider currently does not support non-null StreamSequenceToken.", "token");
}
if(token != null) throw new ArgumentException("AzureQueue stream provider currently does not support non-null StreamSequenceToken.", nameof(token));
var queueId = streamQueueMapper.GetQueueForStream(streamGuid, streamNamespace);
AzureQueueDataManager queue;
if (!Queues.TryGetValue(queueId, out queue))
Expand All @@ -51,7 +52,7 @@ public async Task QueueMessageBatchAsync<T>(Guid streamGuid, String streamNamesp
await tmpQueue.InitQueueAsync();
queue = Queues.GetOrAdd(queueId, tmpQueue);
}
var cloudMsg = AzureQueueBatchContainer.ToCloudQueueMessage(streamGuid, streamNamespace, events, requestContext);
var cloudMsg = this.dataAdapter.ToCloudQueueMessage(streamGuid, streamNamespace, events, requestContext);
await queue.AddQueueMessage(cloudMsg);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@

namespace Orleans.Providers.Streams.AzureQueue
{
/// <summary>
/// Azure queue stream provider constants.
/// </summary>
public static class AzureQueueAdapterConstants
{
internal const int CacheSizeDefaultValue = 4096;

/// <summary>"DataConnectionString".</summary>
public const string DataConnectionStringPropertyName = "DataConnectionString";
/// <summary>"DeploymentId".</summary>
public const string DeploymentIdPropertyName = "DeploymentId";
/// <summary>"MessageVisibilityTimeout".</summary>
public const string MessageVisibilityTimeoutPropertyName = "VisibilityTimeout";

/// <summary>"NumQueues".</summary>
public const string NumQueuesPropertyName = "NumQueues";
/// <summary> Default number of Azure Queue used in this stream provider.</summary>
public const int NumQueuesDefaultValue = 8; // keep as power of 2.
}
}
Loading

0 comments on commit c6457f2

Please sign in to comment.