diff --git a/Orleans.sln b/Orleans.sln
index de03b2d8f4..e87bc8bf51 100644
--- a/Orleans.sln
+++ b/Orleans.sln
@@ -192,10 +192,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.TestingHost.Legacy"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.TestingHost.AppDomain", "test\TestInfrastructure\Orleans.TestingHost.AppDomain\Orleans.TestingHost.AppDomain.csproj", "{DF911257-3617-4B5C-9B78-AED17BA6DC9C}"
EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Transactions.DynamoDB", "src\AWS\Orleans.Transactions.DynamoDB\Orleans.Transactions.DynamoDB.csproj", "{17A7F27C-DBDE-4339-A7F5-18BD80C3F205}"
-EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Transactions.DynamoDB.Test", "test\Transactions\Orleans.Transactions.DynamoDB.Test\Orleans.Transactions.DynamoDB.Test.csproj", "{8B3EEA6B-BE00-482D-9625-15A70B534183}"
-EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DependencyInjection.Tests", "test\DependencyInjection.Tests\DependencyInjection.Tests.csproj", "{F23930FE-A219-49E1-8ECB-5A94F271EDC1}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Grains", "Grains", "{2A128E88-B281-4BFB-ADEB-E515437F2385}"
@@ -1252,30 +1248,6 @@ Global
{DF911257-3617-4B5C-9B78-AED17BA6DC9C}.Release|x64.Build.0 = Release|Any CPU
{DF911257-3617-4B5C-9B78-AED17BA6DC9C}.Release|x86.ActiveCfg = Release|Any CPU
{DF911257-3617-4B5C-9B78-AED17BA6DC9C}.Release|x86.Build.0 = Release|Any CPU
- {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Debug|x64.ActiveCfg = Debug|Any CPU
- {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Debug|x64.Build.0 = Debug|Any CPU
- {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Debug|x86.ActiveCfg = Debug|Any CPU
- {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Debug|x86.Build.0 = Debug|Any CPU
- {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Release|Any CPU.Build.0 = Release|Any CPU
- {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Release|x64.ActiveCfg = Release|Any CPU
- {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Release|x64.Build.0 = Release|Any CPU
- {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Release|x86.ActiveCfg = Release|Any CPU
- {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Release|x86.Build.0 = Release|Any CPU
- {8B3EEA6B-BE00-482D-9625-15A70B534183}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {8B3EEA6B-BE00-482D-9625-15A70B534183}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {8B3EEA6B-BE00-482D-9625-15A70B534183}.Debug|x64.ActiveCfg = Debug|Any CPU
- {8B3EEA6B-BE00-482D-9625-15A70B534183}.Debug|x64.Build.0 = Debug|Any CPU
- {8B3EEA6B-BE00-482D-9625-15A70B534183}.Debug|x86.ActiveCfg = Debug|Any CPU
- {8B3EEA6B-BE00-482D-9625-15A70B534183}.Debug|x86.Build.0 = Debug|Any CPU
- {8B3EEA6B-BE00-482D-9625-15A70B534183}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {8B3EEA6B-BE00-482D-9625-15A70B534183}.Release|Any CPU.Build.0 = Release|Any CPU
- {8B3EEA6B-BE00-482D-9625-15A70B534183}.Release|x64.ActiveCfg = Release|Any CPU
- {8B3EEA6B-BE00-482D-9625-15A70B534183}.Release|x64.Build.0 = Release|Any CPU
- {8B3EEA6B-BE00-482D-9625-15A70B534183}.Release|x86.ActiveCfg = Release|Any CPU
- {8B3EEA6B-BE00-482D-9625-15A70B534183}.Release|x86.Build.0 = Release|Any CPU
{F23930FE-A219-49E1-8ECB-5A94F271EDC1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F23930FE-A219-49E1-8ECB-5A94F271EDC1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F23930FE-A219-49E1-8ECB-5A94F271EDC1}.Debug|x64.ActiveCfg = Debug|Any CPU
@@ -1385,8 +1357,6 @@ Global
{262A898E-0EED-4235-908B-322A699CDD01} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A}
{AC640F05-E013-4D80-A1A4-E577D6883D16} = {B4E538C2-F1C4-4314-84EE-188C59579DED}
{DF911257-3617-4B5C-9B78-AED17BA6DC9C} = {B4E538C2-F1C4-4314-84EE-188C59579DED}
- {17A7F27C-DBDE-4339-A7F5-18BD80C3F205} = {DA8E126B-BCDB-4E8F-BFB9-2DBFD41F8F70}
- {8B3EEA6B-BE00-482D-9625-15A70B534183} = {E4550469-BCFB-4F3E-B778-3769DE18F45A}
{F23930FE-A219-49E1-8ECB-5A94F271EDC1} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A}
{2A128E88-B281-4BFB-ADEB-E515437F2385} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A}
{082D25DB-70CA-48F4-93E0-EC3455F494B8} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A}
diff --git a/Test.cmd b/Test.cmd
index 17a8ccb979..0e334c1d9b 100644
--- a/Test.cmd
+++ b/Test.cmd
@@ -35,7 +35,6 @@ set TESTS=^
%CMDHOME%\test\RuntimeCodeGen.Tests,^
%CMDHOME%\test\Transactions\Orleans.Transactions.Tests,^
%CMDHOME%\test\Transactions\Orleans.Transactions.Azure.Test,^
-%CMDHOME%\test\Transactions\Orleans.Transactions.DynamoDB.Test,^
%CMDHOME%\test\TestInfrastructure\Orleans.TestingHost.Tests,^
%CMDHOME%\test\DependencyInjection.Tests
diff --git a/src/AWS/Orleans.Transactions.DynamoDB/Directory.Build.props b/src/AWS/Orleans.Transactions.DynamoDB/Directory.Build.props
deleted file mode 100644
index 05abe39f6f..0000000000
--- a/src/AWS/Orleans.Transactions.DynamoDB/Directory.Build.props
+++ /dev/null
@@ -1,35 +0,0 @@
-
-
- <_ParentDirectoryBuildPropsPath Condition="'$(_DirectoryBuildPropsFile)' != ''">$([System.IO.Path]::Combine('..', '$(_DirectoryBuildPropsFile)'))
-
-
-
-
-
- false
-
-
-
- true
-
-
-
-
- true
- https://github.com/dotnet/orleans
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/src/AWS/Orleans.Transactions.DynamoDB/Hosting/SiloBuilderExtensions.cs b/src/AWS/Orleans.Transactions.DynamoDB/Hosting/SiloBuilderExtensions.cs
deleted file mode 100644
index 355857e428..0000000000
--- a/src/AWS/Orleans.Transactions.DynamoDB/Hosting/SiloBuilderExtensions.cs
+++ /dev/null
@@ -1,46 +0,0 @@
-using System;
-using Microsoft.Extensions.DependencyInjection;
-using Orleans.Configuration;
-using Orleans.Transactions.DynamoDB;
-
-namespace Orleans.Hosting
-{
- public static class SiloBuilderExtensions
- {
- /// <summary>
- /// Configure cluster to use DynamoDB transaction log using configure action.
- /// </summary>
- public static ISiloHostBuilder UseDynamoDBTransactionLog(this ISiloHostBuilder builder, Action<DynamoDBTransactionLogOptions> configureOptions)
- {
- return builder.UseDynamoDBTransactionLog(ob => ob.Configure(configureOptions));
- }
-
- /// <summary>
- /// Configure cluster to use DynamoDB transaction log using configuration builder.
- /// </summary>
- public static ISiloHostBuilder UseDynamoDBTransactionLog(this ISiloHostBuilder builder, Action<OptionsBuilder<DynamoDBTransactionLogOptions>> configureOptions)
- {
- return builder.ConfigureServices(services => services.UseDynamoDBTransactionLog(configureOptions));
- }
-
- /// <summary>
- /// Configure cluster service to use DynamoDB transaction log using configure action.
- /// </summary>
- public static IServiceCollection UseDynamoDBTransactionLog(this IServiceCollection services, Action<DynamoDBTransactionLogOptions> configureOptions)
- {
- return services.UseDynamoDBTransactionLog(ob => ob.Configure(configureOptions));
- }
-
- /// <summary>
- /// Configure cluster service to use DynamoDB transaction log using configuration builder.
- /// </summary>
- public static IServiceCollection UseDynamoDBTransactionLog(this IServiceCollection services,
- Action<OptionsBuilder<DynamoDBTransactionLogOptions>> configureOptions)
- {
- configureOptions?.Invoke(services.AddOptions<DynamoDBTransactionLogOptions>());
- services.AddTransient<DynamoDBTransactionLogOptionsValidator>();
- services.AddTransient(DynamoDBTransactionLogStorage.Create);
- return services;
- }
- }
-}
diff --git a/src/AWS/Orleans.Transactions.DynamoDB/Orleans.Transactions.DynamoDB.csproj b/src/AWS/Orleans.Transactions.DynamoDB/Orleans.Transactions.DynamoDB.csproj
deleted file mode 100644
index 9040e20df7..0000000000
--- a/src/AWS/Orleans.Transactions.DynamoDB/Orleans.Transactions.DynamoDB.csproj
+++ /dev/null
@@ -1,27 +0,0 @@
-
-
- Microsoft.Orleans.Transactions.DynamoDB
- Microsoft Orleans Transactions on DynamoDB
- DynamoDB Transaction library of Microsoft Orleans used on the server.
- $(PackageTags) DynamoDB Transactions
-
-
-
- Orleans.Transactions.DynamoDB
- Orleans.Transactions.DynamoDB
- $(DefineConstants);TRANSACTIONS_DYNAMODB
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/AWS/Orleans.Transactions.DynamoDB/Properties/AssemblyInfo.cs b/src/AWS/Orleans.Transactions.DynamoDB/Properties/AssemblyInfo.cs
deleted file mode 100644
index bf170eabdf..0000000000
--- a/src/AWS/Orleans.Transactions.DynamoDB/Properties/AssemblyInfo.cs
+++ /dev/null
@@ -1,3 +0,0 @@
-using System.Runtime.CompilerServices;
-
-[assembly: InternalsVisibleTo("AWSUtils.Tests")]
diff --git a/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogOptions.cs b/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogOptions.cs
deleted file mode 100644
index 4b66e48794..0000000000
--- a/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogOptions.cs
+++ /dev/null
@@ -1,67 +0,0 @@
-using Microsoft.Extensions.Options;
-using Orleans.Runtime;
-using Orleans.Transactions.DynamoDB;
-
-namespace Orleans.Configuration
-{
- public class DynamoDBTransactionLogOptions
- {
- /// <summary>
- /// AccessKey string for DynamoDB Storage
- /// </summary>
- [Redact]
- public string AccessKey { get; set; }
-
- /// <summary>
- /// Secret key for DynamoDB storage
- /// </summary>
- [Redact]
- public string SecretKey { get; set; }
-
- /// <summary>
- /// DynamoDB Service name
- /// </summary>
- public string Service { get; set; }
-
- /// <summary>
- /// Read capacity unit for DynamoDB storage
- /// </summary>
- public int ReadCapacityUnits { get; set; } = DynamoDBStorage.DefaultReadCapacityUnits;
-
- /// <summary>
- /// Write capacity unit for DynamoDB storage
- /// </summary>
- public int WriteCapacityUnits { get; set; } = DynamoDBStorage.DefaultWriteCapacityUnits;
-
- /// <summary>
- /// DynamoDB table name.
- /// Defaults to 'TransactionLog'.
- /// </summary>
- public string TableName { get; set; } = "TransactionLog";
- }
-
- public class DynamoDBTransactionLogOptionsValidator : IConfigurationValidator
- {
- private readonly DynamoDBTransactionLogOptions options;
-
- public DynamoDBTransactionLogOptionsValidator(IOptions<DynamoDBTransactionLogOptions> configurationOptions)
- {
- this.options = configurationOptions.Value;
- }
-
- public void ValidateConfiguration()
- {
- if (string.IsNullOrWhiteSpace(this.options.TableName))
- throw new OrleansConfigurationException(
- $"Configuration for DynamoDBTransactionLogStorage is invalid. {nameof(this.options.TableName)} is not valid.");
-
- if (this.options.ReadCapacityUnits == 0)
- throw new OrleansConfigurationException(
- $"Configuration for DynamoDBTransactionLogStorage is invalid. {nameof(this.options.ReadCapacityUnits)} is not valid.");
-
- if (this.options.WriteCapacityUnits == 0)
- throw new OrleansConfigurationException(
- $"Configuration for DynamoDBTransactionLogStorage is invalid. {nameof(this.options.WriteCapacityUnits)} is not valid.");
- }
- }
-}
diff --git a/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogStorage.cs b/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogStorage.cs
deleted file mode 100644
index f6df373684..0000000000
--- a/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogStorage.cs
+++ /dev/null
@@ -1,407 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-using Microsoft.Extensions.Options;
-using Microsoft.Extensions.DependencyInjection;
-using Orleans.Configuration;
-using Orleans.Serialization;
-using Orleans.Transactions.Abstractions;
-using Microsoft.Extensions.Logging;
-using Amazon.DynamoDBv2.Model;
-using Amazon.DynamoDBv2;
-using System.Globalization;
-using System.IO;
-
-namespace Orleans.Transactions.DynamoDB
-{
- public class DynamoDBTransactionLogStorage : ITransactionLogStorage
- {
-
- private const string RowKey = "RowKey";
- private const string PartitionKey = "PartitionKey";
- private const string AllocatedTransactionIdsKey = "AllocatedTransactionIds";
- private const string TransactionsKey = "Transactions";
- private const string RowKeyAlias = ":RowKey";
- private const string PartitionKeyAlias = ":PartitionKey";
-
- private const int BatchOperationLimit = 25;
- private const int CommitRecordsPerRow = 40;
- private const string CommitRecordPartitionKey = "0";
-
- private const string StartRowPartitionKey = "1";
- private static readonly AttributeValue StartRowRowKey = new AttributeValue { N = "0" };
-
- //TODO: jbragg - Do not use serializationManager for persistent data!!
- private readonly SerializationManager serializationManager;
- private readonly DynamoDBTransactionLogOptions options;
- private readonly ILoggerFactory loggerFactory;
-
- private DynamoDBStorage storage;
-
- private long startRecordValue;
- private long nextLogSequenceNumber;
-
- // Log iteration indexes, reused between operations
- private Dictionary<string, AttributeValue> currentLastEvaluatedKey;
- private List<CommitRow> currentQueryResult;
- private int currentQueryResultIndex;
- private List<CommitRecord> currentRowTransactions;
- private int currentRowTransactionsIndex;
-
- public DynamoDBTransactionLogStorage(SerializationManager serializationManager, IOptions<DynamoDBTransactionLogOptions> configurationOptions, ILoggerFactory loggerFactory)
- {
- this.serializationManager = serializationManager;
- this.options = configurationOptions.Value;
- this.loggerFactory = loggerFactory;
- }
-
- public async Task Initialize()
- {
- storage = new DynamoDBStorage(this.loggerFactory, this.options.Service, this.options.AccessKey, this.options.SecretKey,
- this.options.ReadCapacityUnits, this.options.WriteCapacityUnits);
- await storage.InitializeTable(this.options.TableName,
- new List<KeySchemaElement>
- {
- new KeySchemaElement { AttributeName = PartitionKey, KeyType = KeyType.HASH },
- new KeySchemaElement { AttributeName = RowKey, KeyType = KeyType.RANGE }
- },
- new List<AttributeDefinition>
- {
- new AttributeDefinition { AttributeName = PartitionKey, AttributeType = ScalarAttributeType.S },
- new AttributeDefinition { AttributeName = RowKey, AttributeType = ScalarAttributeType.N }
- }).ConfigureAwait(false);
-
- var (results, lastEvaluatedKey) = await storage.QueryAsync(this.options.TableName,
- new Dictionary<string, AttributeValue>
- {
- { PartitionKeyAlias, new AttributeValue(StartRowPartitionKey) },
- { RowKeyAlias, StartRowRowKey }
- },
- $"{PartitionKey} = {PartitionKeyAlias} AND {RowKey} <= {RowKeyAlias}",
- (fields) =>
- {
- return new StartRow(AttributeToLong(fields[AllocatedTransactionIdsKey]));
- }).ConfigureAwait(false);
-
- if (results.Count == 0)
- {
- // This is a fresh deployment, the StartRecord isn't created yet.
- // Create it here.
- await storage.PutEntryAsync(this.options.TableName,
- new Dictionary<string, AttributeValue>
- {
- { PartitionKey, new AttributeValue(StartRowPartitionKey) },
- { RowKey, StartRowRowKey },
- { AllocatedTransactionIdsKey, new AttributeValue { N = "0" } }
- }).ConfigureAwait(false);
-
- startRecordValue = 0;
- }
- else
- {
- startRecordValue = results[0].AllocatedTransactionIds;
- }
- }
-
- public static Factory<Task<ITransactionLogStorage>> Create(IServiceProvider serviceProvider)
- {
- return async () =>
- {
- DynamoDBTransactionLogStorage logStorage = ActivatorUtilities.CreateInstance<DynamoDBTransactionLogStorage>(serviceProvider, new object[0]);
- await logStorage.Initialize();
- return logStorage;
- };
- }
-
- public async Task<CommitRecord> GetFirstCommitRecord()
- {
- currentLastEvaluatedKey = null;
-
- await ReadRowsFromTable(0);
-
- if (currentQueryResult.Count == 0)
- {
- // The log has no log entries
- currentQueryResult = null;
-
- nextLogSequenceNumber = 1;
-
- return null;
- }
-
- currentRowTransactions = DeserializeCommitRecords(currentQueryResult[0].Transactions);
-
- // TODO: Assert not empty?
-
- nextLogSequenceNumber = currentRowTransactions[currentRowTransactionsIndex].LSN + 1;
-
- return currentRowTransactions[currentRowTransactionsIndex++];
- }
-
- public async Task<CommitRecord> GetNextCommitRecord()
- {
- // Based on the current implementation logic in TransactionManager, this must not be null, since
- // GetFirstCommitRecord sets a query or if no start record, the TransactionManager exits its loop.
- if (currentQueryResult == null)
- {
- throw new InvalidOperationException("GetNextCommitRecord called but currentQueryResult is null.");
- }
-
- if (currentRowTransactionsIndex == currentRowTransactions.Count)
- {
- currentQueryResultIndex++;
- currentRowTransactionsIndex = 0;
- currentRowTransactions = null;
- }
-
- if (currentQueryResultIndex == currentQueryResult.Count)
- {
- // No more rows in our current segment, retrieve the next segment from the Table.
- if (currentLastEvaluatedKey == null || currentLastEvaluatedKey.Count == 0)
- {
- currentQueryResult = null;
- return null;
- }
-
- await ReadRowsFromTable(0);
- }
-
- if (currentRowTransactions == null)
- {
- // TODO: assert currentRowTransactionsIndex = 0?
- currentRowTransactions = DeserializeCommitRecords(currentQueryResult[currentQueryResultIndex].Transactions);
- }
-
- var currentTransaction = currentRowTransactions[currentRowTransactionsIndex++];
-
- nextLogSequenceNumber = currentTransaction.LSN + 1;
-
- return currentTransaction;
- }
-
- public Task<long> GetStartRecord()
- {
- return Task.FromResult(startRecordValue);
- }
-
- public async Task UpdateStartRecord(long transactionId)
- {
- await storage.UpsertEntryAsync(this.options.TableName,
- new Dictionary<string, AttributeValue>
- {
- { PartitionKey, new AttributeValue(StartRowPartitionKey) },
- { RowKey, StartRowRowKey }
- },
- new Dictionary<string, AttributeValue>
- {
- { AllocatedTransactionIdsKey, LongToAttribute(transactionId) }
- }).ConfigureAwait(false);
-
- startRecordValue = transactionId;
- }
-
- public async Task Append(IEnumerable<CommitRecord> commitRecords)
- {
- var batchOperation = new List<Dictionary<string, AttributeValue>>();
-
- // TODO modify this to be able to use IEnumerable, fixed size array for serialization in the size of CommitRecordsPerRow, list is temporary
- var transactionList = new List<CommitRecord>(commitRecords);
-
- for (int nextRecord = 0; nextRecord < transactionList.Count; nextRecord += CommitRecordsPerRow)
- {
- var recordCount = Math.Min(transactionList.Count - nextRecord, CommitRecordsPerRow);
- var transactionSegment = transactionList.GetRange(nextRecord, recordCount);
- var commitRow = new CommitRow(nextLogSequenceNumber);
-
- foreach (var transaction in transactionSegment)
- {
- transaction.LSN = nextLogSequenceNumber++;
- }
-
- commitRow.Transactions = SerializeCommitRecords(transactionSegment);
-
- batchOperation.Add(
- new Dictionary<string, AttributeValue>
- {
- { PartitionKey, new AttributeValue(CommitRecordPartitionKey) },
- { RowKey, commitRow.FirstLSNAttribute },
- { TransactionsKey, new AttributeValue { B = new MemoryStream(commitRow.Transactions.Value.Array) } }
- });
-
- if (batchOperation.Count == BatchOperationLimit)
- {
- await storage.PutEntriesAsync(this.options.TableName, batchOperation).ConfigureAwait(false);
-
- batchOperation = new List<Dictionary<string, AttributeValue>>();
- }
- }
-
- if (batchOperation.Count > 0)
- {
- await storage.PutEntriesAsync(this.options.TableName, batchOperation).ConfigureAwait(false);
- }
- }
-
- public async Task TruncateLog(long lsn)
- {
- var keyValues = new Dictionary<string, AttributeValue>
- {
- { PartitionKeyAlias, new AttributeValue(CommitRecordPartitionKey) },
- { RowKeyAlias, LongToAttribute(lsn) }
- };
- string query = $"{PartitionKey} = {PartitionKeyAlias} AND {RowKey} <= {RowKeyAlias}";
- Dictionary<string, AttributeValue> lastEvaluatedKey = null;
- var batchOperation = new List<Dictionary<string, AttributeValue>>();
-
- do
- {
- var result = await storage.QueryAsync(this.options.TableName, keyValues, query,
- CommitRowResolver, lastEvaluatedKey: lastEvaluatedKey).ConfigureAwait(false);
- lastEvaluatedKey = result.lastEvaluatedKey;
-
- foreach (var row in result.results)
- {
- var transactions = DeserializeCommitRecords(row.Transactions);
-
- if (transactions.Count > 0 && transactions[transactions.Count - 1].LSN <= lsn)
- {
- batchOperation.Add(
- new Dictionary<string, AttributeValue>
- {
- { PartitionKey, new AttributeValue(CommitRecordPartitionKey) },
- { RowKey, row.FirstLSNAttribute }
- });
-
- if (batchOperation.Count == BatchOperationLimit)
- {
- await storage.DeleteEntriesAsync(this.options.TableName, batchOperation).ConfigureAwait(false);
-
- batchOperation = new List<Dictionary<string, AttributeValue>>();
- }
- }
- else
- {
- break;
- }
- }
-
- } while (lastEvaluatedKey.Count != 0);
-
- if (batchOperation.Count > 0)
- {
- await storage.DeleteEntriesAsync(this.options.TableName, batchOperation).ConfigureAwait(false);
- }
- }
-
- private async Task ReadRowsFromTable(long keyLowerBound)
- {
- var (results, lastEvaluatedKey) = await storage.QueryAsync(this.options.TableName,
- new Dictionary<string, AttributeValue>
- {
- { PartitionKeyAlias, new AttributeValue(CommitRecordPartitionKey) },
- { RowKeyAlias, LongToAttribute(keyLowerBound) }
- },
- $"{PartitionKey} = {PartitionKeyAlias} AND {RowKey} >= {RowKeyAlias}",
- CommitRowResolver,
- lastEvaluatedKey: currentLastEvaluatedKey).ConfigureAwait(false);
- currentQueryResult = results;
-
- // Reset the indexes
- currentQueryResultIndex = 0;
- currentRowTransactionsIndex = 0;
- currentLastEvaluatedKey = lastEvaluatedKey;
- }
-
- private ArraySegment<byte> SerializeCommitRecords(List<CommitRecord> commitRecords)
- {
- var serializableList = new List>>(commitRecords.Count);
-
- foreach (var commitRecord in commitRecords)
- {
- serializableList.Add(new Tuple>(commitRecord.LSN, commitRecord.TransactionId, commitRecord.Resources));
- }
-
- var streamWriter = new BinaryTokenStreamWriter();
-
- serializationManager.Serialize(serializableList, streamWriter);
-
- return new ArraySegment<byte>(streamWriter.ToByteArray());
- }
-
- private List<CommitRecord> DeserializeCommitRecords(ArraySegment<byte>? serializerCommitRecords)
- {
- if (!serializerCommitRecords.HasValue)
- {
- return new List<CommitRecord>();
- }
-
- var streamReader = new BinaryTokenStreamReader(serializerCommitRecords.Value);
-
- var deserializedList = serializationManager.Deserialize>>>(streamReader);
-
- var commitRecords = new List<CommitRecord>(deserializedList.Count);
-
- foreach (var item in deserializedList)
- {
- commitRecords.Add(new CommitRecord { LSN = item.Item1, TransactionId = item.Item2, Resources = item.Item3 });
- }
-
- return commitRecords;
- }
-
- private static AttributeValue LongToAttribute(long value)
- {
- return new AttributeValue { N = value.ToString("d", CultureInfo.InvariantCulture) };
- }
-
- private static long AttributeToLong(AttributeValue value)
- {
- return long.Parse(value.N, CultureInfo.InvariantCulture);
- }
-
- private static Func<Dictionary<string, AttributeValue>, CommitRow> CommitRowResolver => (fields) =>
- {
- var commitRow = new CommitRow(AttributeToLong(fields[RowKey]));
- var stream = fields[TransactionsKey].B;
- if (stream.TryGetBuffer(out ArraySegment<byte> buffer))
- {
- commitRow.Transactions = buffer;
- }
- else
- {
- commitRow.Transactions = new ArraySegment<byte>(stream.ToArray());
- }
- return commitRow;
- };
-
- private class CommitRow
- {
- public CommitRow(long firstLSN)
- {
- FirstLSN = firstLSN;
- }
-
- public ArraySegment<byte>? Transactions { get; set; }
-
- public long FirstLSN { get; set; }
-
- public AttributeValue FirstLSNAttribute
- {
- get
- {
- return LongToAttribute(FirstLSN);
- }
- }
- }
-
- private class StartRow
- {
- public StartRow(long transactionId)
- {
- AllocatedTransactionIds = transactionId;
- }
-
- public long AllocatedTransactionIds { get; set; }
- }
- }
-}
diff --git a/src/Azure/Orleans.Transactions.AzureStorage/Hosting/AzureTableTransactionsSiloBuilderExtensions.cs b/src/Azure/Orleans.Transactions.AzureStorage/Hosting/AzureTableTransactionsSiloBuilderExtensions.cs
index 7f358eb40d..d7f6f50564 100644
--- a/src/Azure/Orleans.Transactions.AzureStorage/Hosting/AzureTableTransactionsSiloBuilderExtensions.cs
+++ b/src/Azure/Orleans.Transactions.AzureStorage/Hosting/AzureTableTransactionsSiloBuilderExtensions.cs
@@ -3,9 +3,9 @@
using Orleans.Runtime;
using Orleans.Configuration;
using Orleans.Transactions.Abstractions;
-using Orleans.Transactions.AzureStorage.TransactionalState;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Orleans.Providers;
+using Orleans.Transactions.AzureStorage;
namespace Orleans.Hosting
{
@@ -48,17 +48,10 @@ private static IServiceCollection AddAzureTableTransactionalStateStorage(this IS
{
configureOptions?.Invoke(services.AddOptions<AzureTableTransactionalStateOptions>(name));
- // single TM
- services.ConfigureNamedOptionForLogging<AzureTableTransactionalStateOptions>(name);
services.TryAddSingleton<ITransactionalStateStorageFactory>(sp => sp.GetServiceByName<ITransactionalStateStorageFactory>(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME));
services.AddSingletonNamedService<ITransactionalStateStorageFactory>(name, AzureTableTransactionalStateStorageFactory.Create);
services.AddSingletonNamedService<ILifecycleParticipant<ISiloLifecycle>>(name, (s, n) => (ILifecycleParticipant<ISiloLifecycle>)s.GetRequiredServiceByName<ITransactionalStateStorageFactory>(n));
- // distributed TM
- services.TryAddSingleton(sp => sp.GetServiceByName(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME));
- services.AddSingletonNamedService(name, Orleans.Transactions.DistributedTM.AzureStorage.AzureTableTransactionalStateStorageFactory.Create);
- services.AddSingletonNamedService>(name, (s, n) => (ILifecycleParticipant)s.GetRequiredServiceByName(n));
-
return services;
}
diff --git a/src/Azure/Orleans.Transactions.AzureStorage/Hosting/SiloBuilderExtensions.cs b/src/Azure/Orleans.Transactions.AzureStorage/Hosting/SiloBuilderExtensions.cs
deleted file mode 100644
index 223dc7c196..0000000000
--- a/src/Azure/Orleans.Transactions.AzureStorage/Hosting/SiloBuilderExtensions.cs
+++ /dev/null
@@ -1,31 +0,0 @@
-using System;
-using Microsoft.Extensions.DependencyInjection;
-using Orleans.Configuration;
-using Orleans.Transactions.AzureStorage;
-
-namespace Orleans.Hosting
-{
- public static class SiloBuilderExtensions
- {
- /// <summary>
- /// Configure cluster to use Azure transaction log using configure action.
- /// </summary>
- public static ISiloHostBuilder UseAzureTransactionLog(this ISiloHostBuilder builder, Action<AzureTransactionLogOptions> configureOptions)
- {
- return builder.UseAzureTransactionLog(ob => ob.Configure(configureOptions));
- }
-
- /// <summary>
- /// Configure cluster to use Azure transaction log using configuration builder.
- /// </summary>
- public static ISiloHostBuilder UseAzureTransactionLog(this ISiloHostBuilder builder, Action<OptionsBuilder<AzureTransactionLogOptions>> configureOptions)
- {
- return builder.ConfigureServices(services =>
- {
- configureOptions?.Invoke(services.AddOptions<AzureTransactionLogOptions>());
- services.AddTransient<AzureTransactionLogOptionsValidator>();
- services.AddTransient(AzureTransactionLogStorage.Create);
- });
- }
- }
-}
\ No newline at end of file
diff --git a/src/Azure/Orleans.Transactions.AzureStorage/Storage/AzureTransactionArchiveLogOptions.cs b/src/Azure/Orleans.Transactions.AzureStorage/Storage/AzureTransactionArchiveLogOptions.cs
deleted file mode 100644
index 53f4f74eae..0000000000
--- a/src/Azure/Orleans.Transactions.AzureStorage/Storage/AzureTransactionArchiveLogOptions.cs
+++ /dev/null
@@ -1,16 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-
-namespace Orleans.Configuration.Development
-{
- /// <summary>
- /// Option class to configure Azure transaction log archive behavior
- /// </summary>
- public class AzureTransactionArchiveLogOptions
- {
- public const bool DEFAULT_ARCHIVE_LOG = false;
- // whether to archive committed transaction log or not. turned off by default
- public bool ArchiveLog { get; set; } = DEFAULT_ARCHIVE_LOG;
- }
-}
diff --git a/src/Azure/Orleans.Transactions.AzureStorage/Storage/AzureTransactionLogConfiguration.cs b/src/Azure/Orleans.Transactions.AzureStorage/Storage/AzureTransactionLogConfiguration.cs
deleted file mode 100644
index 604678014a..0000000000
--- a/src/Azure/Orleans.Transactions.AzureStorage/Storage/AzureTransactionLogConfiguration.cs
+++ /dev/null
@@ -1,17 +0,0 @@
-
-namespace Orleans.Transactions.AzureStorage
-{
- public class AzureTransactionLogConfiguration
- {
- public string ConnectionString { get; set; }
-
- public string TableName { get; set; } = "TransactionLog";
-
- internal void Copy(AzureTransactionLogConfiguration other)
- {
- if (other == null) Copy(new AzureTransactionLogConfiguration());
- this.ConnectionString = other.ConnectionString;
- this.TableName = other.TableName;
- }
- }
-}
diff --git a/src/Azure/Orleans.Transactions.AzureStorage/Storage/AzureTransactionLogOptions.cs b/src/Azure/Orleans.Transactions.AzureStorage/Storage/AzureTransactionLogOptions.cs
deleted file mode 100644
index 25e5b1fcb6..0000000000
--- a/src/Azure/Orleans.Transactions.AzureStorage/Storage/AzureTransactionLogOptions.cs
+++ /dev/null
@@ -1,36 +0,0 @@
-
-using Microsoft.Extensions.Options;
-using Orleans.Runtime;
-
-namespace Orleans.Configuration
-{
- public class AzureTransactionLogOptions
- {
- [RedactConnectionString]
- public string ConnectionString { get; set; }
-
- public string TableName { get; set; } = "TransactionLog";
- }
-
- public class AzureTransactionLogOptionsValidator : IConfigurationValidator
- {
- private readonly AzureTransactionLogOptions options;
-
- public AzureTransactionLogOptionsValidator(IOptions<AzureTransactionLogOptions> configurationOptions)
- {
- this.options = configurationOptions.Value;
- }
-
- public void ValidateConfiguration()
- {
- if (string.IsNullOrWhiteSpace(this.options.ConnectionString))
- {
- throw new OrleansConfigurationException($"Invalid AzureTransactionLogOptions. ConnectionString is required.");
- }
- if (string.IsNullOrWhiteSpace(this.options.TableName))
- {
- throw new OrleansConfigurationException($"Invalid AzureTransactionLogOptions. TableName is required.");
- }
- }
- }
-}
diff --git a/src/Azure/Orleans.Transactions.AzureStorage/Storage/AzureTransactionLogStorage.cs b/src/Azure/Orleans.Transactions.AzureStorage/Storage/AzureTransactionLogStorage.cs
deleted file mode 100644
index bd8fab0a68..0000000000
--- a/src/Azure/Orleans.Transactions.AzureStorage/Storage/AzureTransactionLogStorage.cs
+++ /dev/null
@@ -1,449 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading.Tasks;
-using Microsoft.WindowsAzure.Storage;
-using Microsoft.WindowsAzure.Storage.Table;
-using Microsoft.WindowsAzure.Storage.RetryPolicies;
-using Microsoft.Extensions.Options;
-using Microsoft.Extensions.DependencyInjection;
-using Orleans.Configuration;
-using Orleans.Configuration.Development;
-using Orleans.Serialization;
-using Orleans.Transactions.Abstractions;
-
-namespace Orleans.Transactions.AzureStorage
-{
- /// <summary>
- /// TransactionLog ported from research. Placeholder, is being rewritten.
- /// </summary>
- public class AzureTransactionLogStorage : ITransactionLogStorage
- {
- private const string RowKey = "RowKey";
- private const string PartitionKey = "PartitionKey";
-
- private const int BatchOperationLimit = 100;
- private const int CommitRecordsPerRow = 10;
-
- //TODO: jbragg - Do not use serializationManager for persistent data!!
- private readonly SerializationManager serializationManager;
- private readonly AzureTransactionLogOptions options;
-
- // Azure Tables objects for persistent storage
- private CloudTable table;
-
- private long startRecordValue;
- private long nextLogSequenceNumber;
- private readonly string commitRecordPartitionKey;
-
- // Log iteration indexes, reused between operations
- private TableContinuationToken currentContinuationToken;
- private TableQuerySegment<CommitRow> currentQueryResult;
- private int currentQueryResultIndex;
- private List<CommitRecord> currentRowTransactions;
- private int currentRowTransactionsIndex;
- private readonly ClusterOptions clusterOptions;
- private readonly AzureTransactionArchiveLogOptions archiveLogOptions;
- public AzureTransactionLogStorage(SerializationManager serializationManager, IOptions<AzureTransactionLogOptions> configurationOptions,
- IOptions<AzureTransactionArchiveLogOptions> archiveOptions, IOptions<ClusterOptions> clusterOptions)
- {
- this.serializationManager = serializationManager;
- this.options = configurationOptions.Value;
- this.clusterOptions = clusterOptions.Value;
- this.archiveLogOptions = archiveOptions.Value;
- this.commitRecordPartitionKey = ArchivalRow.MakePartitionKey(this.clusterOptions.ServiceId);
- }
-
- public async Task Initialize()
- {
- if (string.IsNullOrWhiteSpace(this.options.ConnectionString))
- {
- throw new ArgumentNullException(nameof(this.options.ConnectionString));
- }
-
- // Retrieve the storage account from the connection string.
- var storageAccount = CloudStorageAccount.Parse(this.options.ConnectionString);
-
- // Create the table if not exists.
- CloudTableClient creationClient = storageAccount.CreateCloudTableClient();
- // TODO - do not hard code DefaultRequestOptions in rewrite.
- creationClient.DefaultRequestOptions.RetryPolicy = new LinearRetry(TimeSpan.FromSeconds(1), 60);
- creationClient.DefaultRequestOptions.ServerTimeout = TimeSpan.FromMinutes(3);
- creationClient.DefaultRequestOptions.PayloadFormat = TablePayloadFormat.JsonNoMetadata;
- CloudTable creationTable = creationClient.GetTableReference(this.options.TableName);
- await creationTable.CreateIfNotExistsAsync().ConfigureAwait(false);
-
- // get table for operations
- CloudTableClient operationClient = storageAccount.CreateCloudTableClient();
- // TODO - do not hard code DefaultRequestOptions in rewrite.
- operationClient.DefaultRequestOptions.RetryPolicy = new LinearRetry(TimeSpan.FromMilliseconds(100), 5);
- operationClient.DefaultRequestOptions.ServerTimeout = TimeSpan.FromSeconds(3);
- operationClient.DefaultRequestOptions.PayloadFormat = TablePayloadFormat.JsonNoMetadata;
- this.table = operationClient.GetTableReference(this.options.TableName);
-
- var query = new TableQuery<StartRow>().Where(TableQuery.CombineFilters(
- TableQuery.GenerateFilterCondition(PartitionKey, QueryComparisons.Equal, StartRow.MakePartitionkey(this.clusterOptions.ServiceId)),
- TableOperators.And,
- TableQuery.GenerateFilterCondition(RowKey, QueryComparisons.LessThanOrEqual, StartRow.StartRowRowKey)));
- var queryResult = await table.ExecuteQuerySegmentedAsync(query, null).ConfigureAwait(false);
-
- if (queryResult.Results.Count == 0)
- {
- // This is a fresh deployment, the StartRecord isn't created yet.
- // Create it here.
- var row = new StartRow(this.clusterOptions, 0);
- var operation = TableOperation.Insert(row);
-
- await table.ExecuteAsync(operation).ConfigureAwait(false);
-
- startRecordValue = 0;
- }
- else
- {
- startRecordValue = queryResult.Results[0].AllocatedTransactionIds;
- }
- }
-
- public async Task<CommitRecord> GetFirstCommitRecord()
- {
- currentContinuationToken = null;
-
- await ReadRowsFromTable(0);
-
- if (currentQueryResult.Results.Count == 0)
- {
- // The log has no log entries
- currentQueryResult = null;
-
- nextLogSequenceNumber = 1;
-
- return null;
- }
-
- currentRowTransactions = DeserializeCommitRecords(currentQueryResult.Results[0].Transactions);
-
- // TODO: Assert not empty?
-
- nextLogSequenceNumber = currentRowTransactions[currentRowTransactionsIndex].LSN + 1;
-
- return currentRowTransactions[currentRowTransactionsIndex++];
- }
-
- public async Task<CommitRecord> GetNextCommitRecord()
- {
- // Based on the current implementation logic in TransactionManager, this must not be null, since
- // GetFirstCommitRecord sets a query or if no start record, the TransactionManager exits its loop.
- if (currentQueryResult == null)
- {
- throw new InvalidOperationException("GetNextCommitRecord called but currentQueryResult is null.");
- }
-
- if (currentRowTransactionsIndex == currentRowTransactions.Count)
- {
- currentQueryResultIndex++;
- currentRowTransactionsIndex = 0;
- currentRowTransactions = null;
- }
-
- if (currentQueryResultIndex == currentQueryResult.Results.Count)
- {
- // No more rows in our current segment, retrieve the next segment from the Table.
- if (currentContinuationToken == null)
- {
- currentQueryResult = null;
- return null;
- }
-
- await ReadRowsFromTable(0);
- }
-
- if (currentRowTransactions == null)
- {
- // TODO: assert currentRowTransactionsIndex = 0?
- currentRowTransactions = DeserializeCommitRecords(currentQueryResult.Results[currentQueryResultIndex].Transactions);
- }
-
- var currentTransaction = currentRowTransactions[currentRowTransactionsIndex++];
-
- nextLogSequenceNumber = currentTransaction.LSN + 1;
-
- return currentTransaction;
- }
-
- public Task<long> GetStartRecord()
- {
- return Task.FromResult(startRecordValue);
- }
-
- public async Task UpdateStartRecord(long transactionId)
- {
- var tableOperation = TableOperation.Replace(new StartRow(this.clusterOptions, transactionId));
-
- await table.ExecuteAsync(tableOperation).ConfigureAwait(false);
-
- startRecordValue = transactionId;
- }
-
- public async Task Append(IEnumerable<CommitRecord> transactions)
- {
- var batchOperation = new TableBatchOperation();
-
- // TODO modify this to be able to use IEnumerable, fixed size array for serialization in the size of CommitRecordsPerRow, list is temporary
- var transactionList = new List<CommitRecord>(transactions);
-
- for (int nextRecord = 0; nextRecord < transactionList.Count; nextRecord += CommitRecordsPerRow)
- {
- var recordCount = Math.Min(transactionList.Count - nextRecord, CommitRecordsPerRow);
- var transactionSegment = transactionList.GetRange(nextRecord, recordCount);
- var commitRow = new CommitRow(this.clusterOptions, nextLogSequenceNumber);
-
- foreach (var transaction in transactionSegment)
- {
- transaction.LSN = nextLogSequenceNumber++;
- }
-
- commitRow.Transactions = SerializeCommitRecords(transactionSegment);
-
- batchOperation.Insert(commitRow);
-
- if (batchOperation.Count == BatchOperationLimit)
- {
- await table.ExecuteBatchAsync(batchOperation).ConfigureAwait(false);
-
- batchOperation = new TableBatchOperation();
- }
- }
-
- if (batchOperation.Count > 0)
- {
- await table.ExecuteBatchAsync(batchOperation).ConfigureAwait(false);
- }
- }
-
- public async Task<List<CommitRecord>> QueryArchivalRecords(TableQuery<ArchivalRow> query)
- {
- var continuationToken = default(TableContinuationToken);
- var deserializedResults = new List<CommitRecord>();
- do
- {
- var queryResult = await table.ExecuteQuerySegmentedAsync(query, continuationToken).ConfigureAwait(false);
- continuationToken = queryResult.ContinuationToken;
-
- if (queryResult.Results.Count > 0)
- {
- foreach (var row in queryResult)
- {
- var transactions = DeserializeCommitRecords(row.Transactions);
- deserializedResults.AddRange(transactions);
- }
- }
- } while (continuationToken != default(TableContinuationToken));
-
- return deserializedResults;
- }
-
- public async Task TruncateLog(long lsn)
- {
- var continuationToken = default(TableContinuationToken);
- var query = new TableQuery<CommitRow>().Where(TableQuery.CombineFilters(
- TableQuery.GenerateFilterCondition(PartitionKey, QueryComparisons.Equal, commitRecordPartitionKey),
- TableOperators.And,
- TableQuery.CombineFilters(
- TableQuery.GenerateFilterCondition(RowKey, QueryComparisons.LessThanOrEqual, CommitRow.MakeRowKey(lsn)),
- TableOperators.And,
- TableQuery.GenerateFilterCondition(RowKey, QueryComparisons.GreaterThanOrEqual, CommitRow.MinRowKey))));
- var batchOperation = new TableBatchOperation();
- do
- {
- var queryResult = await table.ExecuteQuerySegmentedAsync(query, continuationToken).ConfigureAwait(false);
-
- continuationToken = queryResult.ContinuationToken;
-
- if (queryResult.Results.Count > 0)
- {
-
- foreach (var row in queryResult)
- {
- var transactions = DeserializeCommitRecords(row.Transactions);
-
- if (transactions.Count > 0 && transactions[transactions.Count - 1].LSN <= lsn)
- {
- batchOperation.Delete(row);
- if (this.archiveLogOptions.ArchiveLog)
- {
- var archiveRow = new ArchivalRow(this.clusterOptions, row.Transactions, transactions.Select(tx => tx.TransactionId).Min(), transactions.Select(tx => tx.LSN).Min());
- batchOperation.Insert(archiveRow);
- }
-
- if (batchOperation.Count == BatchOperationLimit)
- {
- await table.ExecuteBatchAsync(batchOperation).ConfigureAwait(false);
- batchOperation = new TableBatchOperation();
- }
- }
- else
- {
- break;
- }
- }
- }
-
- } while (continuationToken != default(TableContinuationToken));
-
- if (batchOperation.Count > 0)
- {
- await table.ExecuteBatchAsync(batchOperation).ConfigureAwait(false);
- }
- }
-
- public static Factory<Task<ITransactionLogStorage>> Create(IServiceProvider serviceProvider)
- {
- return async () =>
- {
- AzureTransactionLogStorage storage = ActivatorUtilities.CreateInstance<AzureTransactionLogStorage>(serviceProvider, new object[0]);
- await storage.Initialize();
- return storage;
- };
- }
-
- private async Task ReadRowsFromTable(long keyLowerBound)
- {
- var query = new TableQuery<CommitRow>().Where(TableQuery.CombineFilters(
- TableQuery.GenerateFilterCondition(PartitionKey, QueryComparisons.Equal, commitRecordPartitionKey),
- TableOperators.And,
- TableQuery.CombineFilters(
- TableQuery.GenerateFilterCondition(RowKey, QueryComparisons.GreaterThanOrEqual, CommitRow.MakeRowKey(keyLowerBound)),
- TableOperators.And,
- TableQuery.GenerateFilterCondition(RowKey, QueryComparisons.LessThanOrEqual, CommitRow.MaxRowKey))));
-
- currentQueryResult = await table.ExecuteQuerySegmentedAsync(query, currentContinuationToken).ConfigureAwait(false);
-
- // Reset the indexes
- currentQueryResultIndex = 0;
- currentRowTransactionsIndex = 0;
- currentContinuationToken = currentQueryResult.ContinuationToken;
- }
-
- private byte[] SerializeCommitRecords(List<CommitRecord> commitRecords)
- {
- var serializableList = new List>>(commitRecords.Count);
-
- foreach (var commitRecord in commitRecords)
- {
- serializableList.Add(new Tuple>(commitRecord.LSN, commitRecord.TransactionId, commitRecord.Resources));
- }
-
- var streamWriter = new BinaryTokenStreamWriter();
-
- serializationManager.Serialize(serializableList, streamWriter);
-
- return streamWriter.ToByteArray();
- }
-
- private List<CommitRecord> DeserializeCommitRecords(byte[] serializerCommitRecords)
- {
- if (serializerCommitRecords == null)
- {
- return new List<CommitRecord>();
- }
-
- var streamReader = new BinaryTokenStreamReader(serializerCommitRecords);
-
- var deserializedList = serializationManager.Deserialize>>>(streamReader);
-
- var commitRecords = new List<CommitRecord>(deserializedList.Count);
-
- foreach (var item in deserializedList)
- {
- commitRecords.Add(new CommitRecord { LSN = item.Item1, TransactionId = item.Item2, Resources = item.Item3 });
- }
-
- return commitRecords;
- }
-
- public class ArchivalRow : TableEntity
- {
- public const string MinRowKey = "arch_";
- public const string MaxRowKey = "arch_~";
-
- public static string MakeRowKey(long firstTransactionId)
- {
- return $"arch_{firstTransactionId:x16}";
- }
-
- // CommitRow and ArchivalRow should share the same partition key scheme, so they
- // use the same method.
- public static string MakePartitionKey(string serviceId)
- {
- return $"tlpk_{serviceId}";
- }
-
- public ArchivalRow()
- {
- }
-
- public ArchivalRow(ClusterOptions clusterOptions, byte[] transactions, long firstTransactionId, long firstLSN)
- {
- this.Transactions = transactions;
- this.ClusterId = clusterOptions.ClusterId;
- this.FirstLSN = firstLSN;
- this.FirstTransactionId = firstTransactionId;
- PartitionKey = MakePartitionKey(clusterOptions.ServiceId);
- RowKey = MakeRowKey(firstTransactionId);
- }
-
- public string ClusterId { get; set; }
- public long FirstLSN { get; set; }
- public long FirstTransactionId { get; set; }
- public byte[] Transactions { get; set; }
- }
-
- public class CommitRow : TableEntity
- {
- public const string MaxRowKey = "crrk_~";
- public const string MinRowKey = "crrk_";
- public CommitRow(ClusterOptions clusterOptions, long firstLSN)
- {
- // All entities are in the same partition for atomic read/writes.
- PartitionKey = ArchivalRow.MakePartitionKey(clusterOptions.ServiceId);
- RowKey = MakeRowKey(firstLSN);
- }
-
- public CommitRow()
- {
- }
-
- public byte[] Transactions { get; set; }
-
- internal static string MakeRowKey(long lsn)
- {
- return $"crrk_{lsn:x16}";
- }
- }
-
- private class StartRow : TableEntity
- {
- internal const string StartRowRowKey = "srrk_";
-
- internal static string MakePartitionkey(string serviceId)
- {
- return $"srpk_{serviceId}";
- }
-
- public StartRow(ClusterOptions options, long transactionId)
- {
- // only row in the table with this partition key
- PartitionKey = MakePartitionkey(options.ServiceId);
- RowKey = StartRowRowKey;
- ETag = "*";
- AllocatedTransactionIds = transactionId;
- }
-
- public StartRow()
- {
- }
-
- public long AllocatedTransactionIds { get; set; }
- }
- }
-}
diff --git a/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/AzureTableTransactionalStateStorage.cs b/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/AzureTableTransactionalStateStorage.cs
index 5fce5a1f6d..0a072ba424 100644
--- a/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/AzureTableTransactionalStateStorage.cs
+++ b/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/AzureTableTransactionalStateStorage.cs
@@ -14,16 +14,18 @@ public class AzureTableTransactionalStateStorage : ITransactionalStateSt
{
private readonly CloudTable table;
private readonly string partition;
+ private readonly string stateName;
private readonly JsonSerializerSettings jsonSettings;
private readonly ILogger logger;
private KeyEntity key;
- private List<StateEntity> states;
+ private List<KeyValuePair<long, StateEntity>> states;
- public AzureTableTransactionalStateStorage(CloudTable table, string partition, JsonSerializerSettings JsonSettings, ILogger<AzureTableTransactionalStateStorage<TState>> logger)
+ public AzureTableTransactionalStateStorage(CloudTable table, string partition, string stateName, JsonSerializerSettings JsonSettings, ILogger<AzureTableTransactionalStateStorage<TState>> logger)
{
this.table = table;
this.partition = partition;
+ this.stateName = stateName;
this.jsonSettings = JsonSettings;
this.logger = logger;
}
@@ -32,98 +34,187 @@ public async Task> Load()
{
try
{
- Task<KeyEntity> keyTask = ReadKey();
- Task<List<StateEntity>> statesTask = ReadStates();
- this.key = await keyTask.ConfigureAwait(false);
- this.states = await statesTask.ConfigureAwait(false);
- if (string.IsNullOrEmpty(this.key.ETag))
+ var keyTask = ReadKey();
+ var statesTask = ReadStates();
+ key = await keyTask.ConfigureAwait(false);
+ states = await statesTask.ConfigureAwait(false);
+
+ if (string.IsNullOrEmpty(key.ETag))
{
+ if (logger.IsEnabled(LogLevel.Debug))
+ logger.LogDebug($"{partition} Loaded v0, fresh");
+
+ // first time load
return new TransactionalStorageLoadResponse<TState>();
}
- TState commitedState = (!string.IsNullOrEmpty(this.key.CommittedTransactionId)) ? FindState(this.key.CommittedTransactionId) : new TState();
- if (commitedState == null)
+ else
{
- this.logger.LogCritical("Transactional state non-recoverable error. Commited state for transaction {TransactionId} not found.", this.key.CommittedTransactionId);
- throw new InvalidOperationException($"Transactional state non-recoverable error. Commited state for transaction {this.key.CommittedTransactionId} not found.");
+ if (!FindState(this.key.CommittedSequenceId, out var pos))
+ {
+ var error = $"Storage state corrupted: no record for committed state";
+ logger.LogCritical(error);
+ throw new InvalidOperationException(error);
+ }
+ var committedState = states[pos].Value.GetState<TState>(this.jsonSettings);
+
+ var PrepareRecordsToRecover = new List<PendingTransactionState<TState>>();
+ for (int i = 0; i < states.Count; i++)
+ {
+ var kvp = states[i];
+
+ // pending states for already committed transactions can be ignored
+ if (kvp.Key <= key.CommittedSequenceId)
+ continue;
+
+ // upon recovery, local non-committed transactions are considered aborted
+ if (kvp.Value.TransactionManager == null)
+ break;
+
+ PrepareRecordsToRecover.Add(new PendingTransactionState<TState>()
+ {
+ SequenceId = kvp.Key,
+ State = kvp.Value.GetState<TState>(this.jsonSettings),
+ TimeStamp = kvp.Value.TransactionTimestamp,
+ TransactionId = kvp.Value.TransactionId,
+ TransactionManager = kvp.Value.TransactionManager
+ });
+ }
+
+ // clear the state strings... no longer needed, ok to GC now
+ for (int i = 0; i < states.Count; i++)
+ {
+ states[i].Value.StateJson = null;
+ }
+
+ if (logger.IsEnabled(LogLevel.Debug))
+ logger.LogDebug($"{partition} Loaded v{this.key.CommittedSequenceId} rows={string.Join(",", states.Select(s => s.Key.ToString("x16")))}");
+
+ return new TransactionalStorageLoadResponse<TState>(this.key.ETag, committedState, this.key.CommittedSequenceId, this.key.Metadata, PrepareRecordsToRecover);
}
- var pendingStates = states.Select(s => new PendingTransactionState<TState>(s.TransactionId, s.SequenceId, s.GetState<TState>(this.jsonSettings))).ToList();
- return new TransactionalStorageLoadResponse<TState>(this.key.ETag, commitedState, this.key.Metadata, pendingStates);
- } catch(Exception ex)
+ }
+ catch (Exception ex)
{
this.logger.LogError("Transactional state load failed {Exception}.", ex);
throw;
}
}
- public async Task<string> Persist(string expectedETag, string metadata, List<PendingTransactionState<TState>> statesToPrepare)
+
+ public async Task<string> Store(string expectedETag, string metadata, List<PendingTransactionState<TState>> statesToPrepare, long? commitUpTo, long? abortAfter)
{
- try
- {
- var batchOperation = new TableBatchOperation();
+ if (this.key.ETag != expectedETag)
+ throw new ArgumentException(nameof(expectedETag), "Etag does not match");
- this.key.ETag = expectedETag;
- this.key.Metadata = metadata;
- if (string.IsNullOrEmpty(this.key.ETag))
- batchOperation.Insert(this.key);
- else
- batchOperation.Replace(this.key);
+ // assemble all storage operations into a single batch
+ // these operations must commit in sequence, but not necessarily atomically
+ // so we can split this up if needed
+ var batchOperation = new BatchOperation(logger, key, table);
- // add new states
- List<Tuple<string, long>> stored = this.states.Select(s => Tuple.Create(s.TransactionId, s.SequenceId)).ToList();
- List<StateEntity> newStates = new List<StateEntity>();
- foreach (PendingTransactionState<TState> pendingState in statesToPrepare.Where(p => !stored.Contains(Tuple.Create(p.TransactionId, p.SequenceId))))
+ // first, clean up aborted records
+ if (abortAfter.HasValue && states.Count != 0)
+ {
+ while (states.Count > 0 && states[states.Count - 1].Key > abortAfter)
{
- var newState = StateEntity.Create(this.jsonSettings, this.partition, pendingState);
- newStates.Add(newState);
- batchOperation.Insert(newState);
- }
+ var entity = states[states.Count - 1].Value;
+ await batchOperation.Add(TableOperation.Delete(entity)).ConfigureAwait(false);
+ states.RemoveAt(states.Count - 1);
- if (batchOperation.Count > AzureTableConstants.MaxBatchSize)
- {
- this.logger.LogError("Too many pending states. PendingStateCount {PendingStateCount}.", batchOperation.Count);
- throw new InvalidOperationException($"Too many pending states. PendingStateCount {batchOperation.Count}");
+ if (logger.IsEnabled(LogLevel.Trace))
+ logger.LogTrace($"{partition}.{states[states.Count - 1].Key:x16} Delete {entity.TransactionId}");
}
-
- await table.ExecuteBatchAsync(batchOperation).ConfigureAwait(false);
- this.states.AddRange(newStates);
- return this.key.ETag;
}
- catch (Exception ex)
+
+ // second, persist non-obsolete prepare records
+ var obsoleteBefore = commitUpTo.HasValue ? commitUpTo.Value : key.CommittedSequenceId;
+ if (statesToPrepare != null)
+ foreach (var s in statesToPrepare)
+ if (s.SequenceId >= obsoleteBefore)
+ {
+ if (FindState(s.SequenceId, out var pos))
+ {
+ // overwrite with new pending state
+ var existing = states[pos].Value;
+ existing.TransactionId = s.TransactionId;
+ existing.TransactionTimestamp = s.TimeStamp;
+ existing.TransactionManager = s.TransactionManager;
+ existing.SetState(s.State, this.jsonSettings);
+ await batchOperation.Add(TableOperation.Replace(existing)).ConfigureAwait(false);
+ states.RemoveAt(pos);
+
+ if (logger.IsEnabled(LogLevel.Trace))
+ logger.LogTrace($"{partition}.{existing.SequenceId:x16} Update {existing.TransactionId}");
+ }
+ else
+ {
+ var entity = StateEntity.Create(this.jsonSettings, this.partition, s);
+ await batchOperation.Add(TableOperation.Insert(entity)).ConfigureAwait(false);
+ states.Insert(pos, new KeyValuePair<long, StateEntity>(s.SequenceId, entity));
+
+ if (logger.IsEnabled(LogLevel.Trace))
+ logger.LogTrace($"{partition}.{s.SequenceId:x16} Insert {entity.TransactionId}");
+ }
+ }
+
+ // third, persist metadata and commit position
+ key.Metadata = metadata;
+ if (commitUpTo.HasValue && commitUpTo.Value > key.CommittedSequenceId)
{
- this.logger.LogError("Transactional state persist failed {Exception}.", ex);
- throw;
+ key.CommittedSequenceId = commitUpTo.Value;
}
- }
+ if (string.IsNullOrEmpty(this.key.ETag))
+ {
+ await batchOperation.Add(TableOperation.Insert(this.key)).ConfigureAwait(false);
- public async Task Confirm(string expectedETag, string metadata, string transactionIdToCommit)
- {
- try
+ if (logger.IsEnabled(LogLevel.Trace))
+ logger.LogTrace($"{partition}.k Insert");
+ }
+ else
{
- // only update storage if transaction id is greater then previously committed
- if (string.Compare(transactionIdToCommit, this.key.CommittedTransactionId) <= 0)
- return this.key.ETag;
+ await batchOperation.Add(TableOperation.Replace(this.key)).ConfigureAwait(false);
+
+ if (logger.IsEnabled(LogLevel.Trace))
+ logger.LogTrace($"{partition}.k Update");
+ }
- TState state = FindState(transactionIdToCommit);
- if (state == null)
+ // fourth, remove obsolete records
+ if (states.Count > 0 && states[0].Key < obsoleteBefore)
+ {
+ FindState(obsoleteBefore, out var pos);
+ for (int i = 0; i < pos; i++)
{
- this.logger.LogCritical("Transactional state non-recoverable error. Attempting to confirm a transaction {TransactionId} for which no state exists.", transactionIdToCommit);
- throw new InvalidOperationException($"Transactional state non-recoverable error. Attempting to confirm a transaction {transactionIdToCommit} for which no state exists.");
- }
+ await batchOperation.Add(TableOperation.Delete(states[i].Value)).ConfigureAwait(false);
- this.key.ETag = expectedETag;
- this.key.Metadata = metadata;
- this.key.CommittedTransactionId = transactionIdToCommit;
- await WriteKey().ConfigureAwait(false);
- var dead = this.states.Where(p => string.Compare(p.TransactionId, transactionIdToCommit) <0).ToList();
- this.states = this.states.Where(p => string.Compare(p.TransactionId, transactionIdToCommit) >= 0).ToList();
- Cleanup(dead).Ignore();
- return this.key.ETag;
+ if (logger.IsEnabled(LogLevel.Trace))
+ logger.LogTrace($"{partition}.{states[i].Key:x16} Delete {states[i].Value.TransactionId}");
+ }
+ states.RemoveRange(0, pos);
}
- catch (Exception ex)
+
+ await batchOperation.Flush().ConfigureAwait(false);
+
+ if (logger.IsEnabled(LogLevel.Debug))
+ logger.LogDebug($"{partition} Stored v{this.key.CommittedSequenceId} eTag={key.ETag}");
+
+ return key.ETag;
+ }
+
+ private bool FindState(long sequenceId, out int pos)
+ {
+ pos = 0;
+ while (pos < states.Count)
{
- this.logger.LogError("Transactional state confirm failed {Exception}.", ex);
- throw;
+ switch (states[pos].Key.CompareTo(sequenceId))
+ {
+ case 0:
+ return true;
+ case -1:
+ pos++;
+ continue;
+ case 1:
+ return false;
+ }
}
+ return false;
}
private async Task ReadKey()
@@ -136,82 +227,100 @@ private async Task ReadKey()
: queryResult.Results[0];
}
- private async Task WriteKey()
- {
- Task write = (string.IsNullOrEmpty(this.key.ETag))
- ? this.table.ExecuteAsync(TableOperation.Insert(this.key))
- : this.table.ExecuteAsync(TableOperation.Replace(this.key));
- await write.ConfigureAwait(false);
- }
-
- private async Task<List<StateEntity>> ReadStates()
+ private async Task<List<KeyValuePair<long, StateEntity>>> ReadStates()
{
var query = new TableQuery<StateEntity>()
- .Where(AzureStorageUtils.RangeQuery(this.partition, StateEntity.RKMin, StateEntity.RKMax));
+ .Where(AzureStorageUtils.RangeQuery(this.partition, StateEntity.RK_MIN, StateEntity.RK_MAX));
TableContinuationToken continuationToken = null;
- List<StateEntity> results = new List<StateEntity>();
+ var results = new List<KeyValuePair<long, StateEntity>>();
do
{
TableQuerySegment<StateEntity> queryResult = await table.ExecuteQuerySegmentedAsync(query, continuationToken).ConfigureAwait(false);
- results.AddRange(queryResult.Results);
+ foreach (var x in queryResult.Results)
+ {
+ results.Add(new KeyValuePair<long, StateEntity>(x.SequenceId, x));
+ };
continuationToken = queryResult.ContinuationToken;
} while (continuationToken != null);
return results;
}
- private TState FindState(string transactionId)
+ private class BatchOperation
{
- StateEntity entity = this.states.FirstOrDefault(s => s.TransactionId == transactionId);
- return entity?.GetState(this.jsonSettings);
- }
+ private readonly TableBatchOperation batchOperation;
+ private readonly ILogger logger;
+ private readonly KeyEntity key;
+ private readonly CloudTable table;
- private async Task Cleanup(List deadStates)
- {
- var batchOperation = new TableBatchOperation();
- var pendingTasks = new List();
- const int MaxInFlight = 3;
- foreach (StateEntity deadState in deadStates)
- {
- batchOperation.Delete(deadState);
- // if batch is full, execute and make new batch
- if (batchOperation.Count == AzureTableConstants.MaxBatchSize)
- {
- pendingTasks.Add(table.ExecuteBatchAsync(batchOperation));
- // if we've more than MaxInFlight storage calls in flight, wait for those to execute before continuing and clear pending tasks
- if (pendingTasks.Count == MaxInFlight)
- {
- try
- {
- await Task.WhenAll(pendingTasks).ConfigureAwait(false);
- }
- catch (Exception ex)
- {
- this.logger.LogInformation("Error cleaning up transactional states {Exception}. Ignoring", ex);
- }
- pendingTasks.Clear();
- }
- batchOperation = new TableBatchOperation();
- }
- }
+ private bool batchContainsKey;
- if (batchOperation.Count != 0)
+ public BatchOperation(ILogger logger, KeyEntity key, CloudTable table)
{
- pendingTasks.Add(table.ExecuteBatchAsync(batchOperation));
- batchOperation = new TableBatchOperation();
+ this.batchOperation = new TableBatchOperation();
+ this.logger = logger;
+ this.key = key;
+ this.table = table;
}
- if (pendingTasks.Count != 0)
+ public async Task Add(TableOperation operation)
{
- try
+ batchOperation.Add(operation);
+
+ if (operation.Entity == key)
{
- await Task.WhenAll(pendingTasks).ConfigureAwait(false);
+ batchContainsKey = true;
}
- catch (Exception ex)
+
+ if (batchOperation.Count == AzureTableConstants.MaxBatchSize - (batchContainsKey ? 0 : 1))
{
- this.logger.LogInformation("Error cleaning up transactional states {Exception}. Ignoring", ex);
+ // The key row serves as a synchronizer, preventing modification by multiple grains under edge conditions
+ // such as duplicate activations or deployments. Every batch write needs to include the key,
+ // even if the key values don't change.
+
+ if (!batchContainsKey)
+ {
+ if (string.IsNullOrEmpty(key.ETag))
+ batchOperation.Insert(key);
+ else
+ batchOperation.Replace(key);
+ }
+
+ await Flush().ConfigureAwait(false);
+
+ batchOperation.Clear();
+ batchContainsKey = false;
}
- pendingTasks.Clear();
+ }
+
+ public async Task Flush()
+ {
+ if (batchOperation.Count > 0)
+ try
+ {
+ await table.ExecuteBatchAsync(batchOperation).ConfigureAwait(false);
+
+ // log the successful operations before the batch is cleared; otherwise there is nothing left to report
+ if (logger.IsEnabled(LogLevel.Trace))
+ {
+ for (int i = 0; i < batchOperation.Count; i++)
+ logger.LogTrace($"batch-op ok {i} PK={batchOperation[i].Entity.PartitionKey} RK={batchOperation[i].Entity.RowKey}");
+ }
+
+ batchOperation.Clear();
+ batchContainsKey = false;
+ }
+ catch (Exception ex)
+ {
+ if (logger.IsEnabled(LogLevel.Trace))
+ {
+ for (int i = 0; i < batchOperation.Count; i++)
+ logger.LogTrace($"batch-op failed {i} PK={batchOperation[i].Entity.PartitionKey} RK={batchOperation[i].Entity.RowKey}");
+ }
+
+ this.logger.LogError("Transactional state store failed {Exception}.", ex);
+ throw;
+ }
}
}
+
}
-}
+}
\ No newline at end of file
diff --git a/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/AzureTableTransactionalStateStorageFactory.cs b/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/AzureTableTransactionalStateStorageFactory.cs
index 32a9fd8f7f..91976480a7 100644
--- a/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/AzureTableTransactionalStateStorageFactory.cs
+++ b/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/AzureTableTransactionalStateStorageFactory.cs
@@ -11,7 +11,7 @@
using Orleans.Serialization;
using Orleans.Transactions.Abstractions;
-namespace Orleans.Transactions.AzureStorage.TransactionalState
+namespace Orleans.Transactions.AzureStorage
{
public class AzureTableTransactionalStateStorageFactory : ITransactionalStateStorageFactory, ILifecycleParticipant<ISiloLifecycle>
{
@@ -40,7 +40,7 @@ public AzureTableTransactionalStateStorageFactory(string name, AzureTableTransac
public ITransactionalStateStorage<TState> Create<TState>(string stateName, IGrainActivationContext context) where TState : class, new()
{
string partitionKey = MakePartitionKey(context, stateName);
- return ActivatorUtilities.CreateInstance<AzureTableTransactionalStateStorage<TState>>(context.ActivationServices, this.table, partitionKey, this.jsonSettings);
+ return ActivatorUtilities.CreateInstance<AzureTableTransactionalStateStorage<TState>>(context.ActivationServices, this.table, partitionKey, stateName, this.jsonSettings);
}
public void Participate(ISiloLifecycle lifecycle)
diff --git a/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/DistributedTM/AzureTableTransactionalStateStorage.cs b/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/DistributedTM/AzureTableTransactionalStateStorage.cs
deleted file mode 100644
index 764535fd07..0000000000
--- a/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/DistributedTM/AzureTableTransactionalStateStorage.cs
+++ /dev/null
@@ -1,327 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading.Tasks;
-using Microsoft.Extensions.Logging;
-using Microsoft.WindowsAzure.Storage.Table;
-using Newtonsoft.Json;
-using Orleans.Transactions.Abstractions;
-using Orleans.Transactions.AzureStorage;
-
-namespace Orleans.Transactions.DistributedTM.AzureStorage
-{
- public class AzureTableTransactionalStateStorage<TState> : ITransactionalStateStorage<TState>
- where TState : class, new()
- {
- private readonly CloudTable table;
- private readonly string partition;
- private readonly string stateName;
- private readonly JsonSerializerSettings jsonSettings;
- private readonly ILogger logger;
-
- private KeyEntity key;
- private List<KeyValuePair<long, StateEntity>> states;
-
- public AzureTableTransactionalStateStorage(CloudTable table, string partition, string stateName, JsonSerializerSettings JsonSettings, ILogger<AzureTableTransactionalStateStorage<TState>> logger)
- {
- this.table = table;
- this.partition = partition;
- this.stateName = stateName;
- this.jsonSettings = JsonSettings;
- this.logger = logger;
- }
-
- public async Task<TransactionalStorageLoadResponse<TState>> Load()
- {
- try
- {
- var keyTask = ReadKey();
- var statesTask = ReadStates();
- key = await keyTask.ConfigureAwait(false);
- states = await statesTask.ConfigureAwait(false);
-
- if (string.IsNullOrEmpty(key.ETag))
- {
- if (logger.IsEnabled(LogLevel.Debug))
- logger.LogDebug($"{partition} Loaded v0, fresh");
-
- // first time load
- return new TransactionalStorageLoadResponse<TState>();
- }
- else
- {
- if (!FindState(this.key.CommittedSequenceId, out var pos))
- {
- var error = $"Storage state corrupted: no record for committed state";
- logger.LogCritical(error);
- throw new InvalidOperationException(error);
- }
- var committedState = states[pos].Value.GetState<TState>(this.jsonSettings);
-
- var PrepareRecordsToRecover = new List<PendingTransactionState<TState>>();
- for (int i = 0; i < states.Count; i++)
- {
- var kvp = states[i];
-
- // pending states for already committed transactions can be ignored
- if (kvp.Key <= key.CommittedSequenceId)
- continue;
-
- // upon recovery, local non-committed transactions are considered aborted
- if (kvp.Value.TransactionManager == null)
- break;
-
- PrepareRecordsToRecover.Add(new PendingTransactionState<TState>()
- {
- SequenceId = kvp.Key,
- State = kvp.Value.GetState<TState>(this.jsonSettings),
- TimeStamp = kvp.Value.TransactionTimestamp,
- TransactionId = kvp.Value.TransactionId,
- TransactionManager = kvp.Value.TransactionManager
- });
- }
-
- // clear the state strings... no longer needed, ok to GC now
- for (int i = 0; i < states.Count; i++)
- {
- states[i].Value.StateJson = null;
- }
-
- if (logger.IsEnabled(LogLevel.Debug))
- logger.LogDebug($"{partition} Loaded v{this.key.CommittedSequenceId} rows={string.Join(",", states.Select(s => s.Key.ToString("x16")))}");
-
- return new TransactionalStorageLoadResponse<TState>(this.key.ETag, committedState, this.key.CommittedSequenceId, this.key.Metadata, PrepareRecordsToRecover);
- }
- }
- catch (Exception ex)
- {
- this.logger.LogError("Transactional state load failed {Exception}.", ex);
- throw;
- }
- }
-
-
- public async Task<string> Store(string expectedETag, string metadata, List<PendingTransactionState<TState>> statesToPrepare, long? commitUpTo, long? abortAfter)
- {
- if (this.key.ETag != expectedETag)
- throw new ArgumentException(nameof(expectedETag), "Etag does not match");
-
- // assemble all storage operations into a single batch
- // these operations must commit in sequence, but not necessarily atomically
- // so we can split this up if needed
- var batchOperation = new BatchOperation(logger, key, table);
-
- // first, clean up aborted records
- if (abortAfter.HasValue && states.Count != 0)
- {
- while (states.Count > 0 && states[states.Count - 1].Key > abortAfter)
- {
- var entity = states[states.Count - 1].Value;
- await batchOperation.Add(TableOperation.Delete(entity)).ConfigureAwait(false);
- states.RemoveAt(states.Count - 1);
-
- if (logger.IsEnabled(LogLevel.Trace))
- logger.LogTrace($"{partition}.{states[states.Count - 1].Key:x16} Delete {entity.TransactionId}");
- }
- }
-
- // second, persist non-obsolete prepare records
- var obsoleteBefore = commitUpTo.HasValue ? commitUpTo.Value : key.CommittedSequenceId;
- if (statesToPrepare != null)
- foreach (var s in statesToPrepare)
- if (s.SequenceId >= obsoleteBefore)
- {
- if (FindState(s.SequenceId, out var pos))
- {
- // overwrite with new pending state
- var existing = states[pos].Value;
- existing.TransactionId = s.TransactionId;
- existing.TransactionTimestamp = s.TimeStamp;
- existing.TransactionManager = s.TransactionManager;
- existing.SetState(s.State, this.jsonSettings);
- await batchOperation.Add(TableOperation.Replace(existing)).ConfigureAwait(false);
- states.RemoveAt(pos);
-
- if (logger.IsEnabled(LogLevel.Trace))
- logger.LogTrace($"{partition}.{existing.SequenceId:x16} Update {existing.TransactionId}");
- }
- else
- {
- var entity = StateEntity.Create(this.jsonSettings, this.partition, s);
- await batchOperation.Add(TableOperation.Insert(entity)).ConfigureAwait(false);
- states.Insert(pos, new KeyValuePair<long, StateEntity>(s.SequenceId, entity));
-
- if (logger.IsEnabled(LogLevel.Trace))
- logger.LogTrace($"{partition}.{s.SequenceId:x16} Insert {entity.TransactionId}");
- }
- }
-
- // third, persist metadata and commit position
- key.Metadata = metadata;
- if (commitUpTo.HasValue && commitUpTo.Value > key.CommittedSequenceId)
- {
- key.CommittedSequenceId = commitUpTo.Value;
- }
- if (string.IsNullOrEmpty(this.key.ETag))
- {
- await batchOperation.Add(TableOperation.Insert(this.key)).ConfigureAwait(false);
-
- if (logger.IsEnabled(LogLevel.Trace))
- logger.LogTrace($"{partition}.k Insert");
- }
- else
- {
- await batchOperation.Add(TableOperation.Replace(this.key)).ConfigureAwait(false);
-
- if (logger.IsEnabled(LogLevel.Trace))
- logger.LogTrace($"{partition}.k Update");
- }
-
- // fourth, remove obsolete records
- if (states.Count > 0 && states[0].Key < obsoleteBefore)
- {
- FindState(obsoleteBefore, out var pos);
- for (int i = 0; i < pos; i++)
- {
- await batchOperation.Add(TableOperation.Delete(states[i].Value)).ConfigureAwait(false);
-
- if (logger.IsEnabled(LogLevel.Trace))
- logger.LogTrace($"{partition}.{states[i].Key:x16} Delete {states[i].Value.TransactionId}");
- }
- states.RemoveRange(0, pos);
- }
-
- await batchOperation.Flush().ConfigureAwait(false);
-
- if (logger.IsEnabled(LogLevel.Debug))
- logger.LogDebug($"{partition} Stored v{this.key.CommittedSequenceId} eTag={key.ETag}");
-
- return key.ETag;
- }
-
- private bool FindState(long sequenceId, out int pos)
- {
- pos = 0;
- while (pos < states.Count)
- {
- switch (states[pos].Key.CompareTo(sequenceId))
- {
- case 0:
- return true;
- case -1:
- pos++;
- continue;
- case 1:
- break;
- }
- }
- return false;
- }
-
- private async Task<KeyEntity> ReadKey()
- {
- var query = new TableQuery<KeyEntity>()
- .Where(AzureStorageUtils.PointQuery(this.partition, KeyEntity.RK));
- TableQuerySegment<KeyEntity> queryResult = await table.ExecuteQuerySegmentedAsync(query, null).ConfigureAwait(false);
- return queryResult.Results.Count == 0
- ? new KeyEntity() { PartitionKey = this.partition }
- : queryResult.Results[0];
- }
-
- private async Task<List<KeyValuePair<long, StateEntity>>> ReadStates()
- {
- var query = new TableQuery<StateEntity>()
- .Where(AzureStorageUtils.RangeQuery(this.partition, StateEntity.RK_MIN, StateEntity.RK_MAX));
- TableContinuationToken continuationToken = null;
- var results = new List<KeyValuePair<long, StateEntity>>();
- do
- {
- TableQuerySegment<StateEntity> queryResult = await table.ExecuteQuerySegmentedAsync(query, continuationToken).ConfigureAwait(false);
- foreach (var x in queryResult.Results)
- {
- results.Add(new KeyValuePair<long, StateEntity>(x.SequenceId, x));
- };
- continuationToken = queryResult.ContinuationToken;
- } while (continuationToken != null);
- return results;
- }
-
- private class BatchOperation
- {
- private readonly TableBatchOperation batchOperation;
- private readonly ILogger logger;
- private readonly KeyEntity key;
- private readonly CloudTable table;
-
- private bool batchContainsKey;
-
- public BatchOperation(ILogger logger, KeyEntity key, CloudTable table)
- {
- this.batchOperation = new TableBatchOperation();
- this.logger = logger;
- this.key = key;
- this.table = table;
- }
-
- public async Task Add(TableOperation operation)
- {
- batchOperation.Add(operation);
-
- if (operation.Entity == key)
- {
- batchContainsKey = true;
- }
-
- if (batchOperation.Count == AzureTableConstants.MaxBatchSize - (batchContainsKey ? 0 : 1))
- {
- // the key serves as a synchronizer, to prevent modification by multiple grains under edge conditions,
- // like duplicate activations or deployments.Every batch write needs to include the key,
- // even if the key values don't change.
-
- if (!batchContainsKey)
- {
- if (string.IsNullOrEmpty(key.ETag))
- batchOperation.Insert(key);
- else
- batchOperation.Replace(key);
- }
-
- await Flush().ConfigureAwait(false);
-
- batchOperation.Clear();
- batchContainsKey = false;
- }
- }
-
- public async Task Flush()
- {
- if (batchOperation.Count > 0)
- try
- {
- await table.ExecuteBatchAsync(batchOperation).ConfigureAwait(false);
-
- batchOperation.Clear();
- batchContainsKey = false;
-
- if (logger.IsEnabled(LogLevel.Trace))
- {
- for (int i = 0; i < batchOperation.Count; i++)
- logger.LogTrace($"batch-op ok {i} PK={batchOperation[i].Entity.PartitionKey} RK={batchOperation[i].Entity.RowKey}");
- }
- }
- catch (Exception ex)
- {
- if (logger.IsEnabled(LogLevel.Trace))
- {
- for (int i = 0; i < batchOperation.Count; i++)
- logger.LogTrace($"batch-op failed {i} PK={batchOperation[i].Entity.PartitionKey} RK={batchOperation[i].Entity.RowKey}");
- }
-
- this.logger.LogError("Transactional state store failed {Exception}.", ex);
- throw;
- }
- }
- }
-
- }
-}
\ No newline at end of file
diff --git a/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/DistributedTM/AzureTableTransactionalStateStorageFactory.cs b/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/DistributedTM/AzureTableTransactionalStateStorageFactory.cs
deleted file mode 100644
index 91890f6c3a..0000000000
--- a/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/DistributedTM/AzureTableTransactionalStateStorageFactory.cs
+++ /dev/null
@@ -1,71 +0,0 @@
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Logging;
-using Microsoft.Extensions.Options;
-using Microsoft.WindowsAzure.Storage.Table;
-using Newtonsoft.Json;
-using Orleans.Configuration;
-using Orleans.Runtime;
-using Orleans.Serialization;
-using Orleans.Transactions.Abstractions;
-using Orleans.Transactions.AzureStorage;
-
-namespace Orleans.Transactions.DistributedTM.AzureStorage
-{
- public class AzureTableTransactionalStateStorageFactory : ITransactionalStateStorageFactory, ILifecycleParticipant<ISiloLifecycle>
- {
- private readonly string name;
- private readonly AzureTableTransactionalStateOptions options;
- private readonly ClusterOptions clusterOptions;
- private readonly JsonSerializerSettings jsonSettings;
- private readonly ILoggerFactory loggerFactory;
- private CloudTable table;
-
- public static ITransactionalStateStorageFactory Create(IServiceProvider services, string name)
- {
- IOptionsSnapshot<AzureTableTransactionalStateOptions> optionsSnapshot = services.GetRequiredService<IOptionsSnapshot<AzureTableTransactionalStateOptions>>();
- return ActivatorUtilities.CreateInstance<AzureTableTransactionalStateStorageFactory>(services, name, optionsSnapshot.Get(name));
- }
-
- public AzureTableTransactionalStateStorageFactory(string name, AzureTableTransactionalStateOptions options, IOptions<ClusterOptions> clusterOptions, ITypeResolver typeResolver, IGrainFactory grainFactory, ILoggerFactory loggerFactory)
- {
- this.name = name;
- this.options = options;
- this.clusterOptions = clusterOptions.Value;
- this.jsonSettings = OrleansJsonSerializer.GetDefaultSerializerSettings(typeResolver, grainFactory);
- this.loggerFactory = loggerFactory;
- }
-
- public ITransactionalStateStorage<TState> Create<TState>(string stateName, IGrainActivationContext context) where TState : class, new()
- {
- string partitionKey = MakePartitionKey(context, stateName);
- return ActivatorUtilities.CreateInstance<AzureTableTransactionalStateStorage<TState>>(context.ActivationServices, this.table, partitionKey, stateName, this.jsonSettings);
- }
-
- public void Participate(ISiloLifecycle lifecycle)
- {
- lifecycle.Subscribe(OptionFormattingUtilities.Name<AzureTableTransactionalStateStorageFactory>(this.name), this.options.InitStage, Init);
- }
-
- private string MakePartitionKey(IGrainActivationContext context, string stateName)
- {
- string grainKey = context.GrainInstance.GrainReference.ToKeyString();
- var key = $"ts_{this.clusterOptions.ServiceId}_{grainKey}_{stateName}";
- return AzureStorageUtils.SanitizeTableProperty(key);
- }
-
- private async Task CreateTable()
- {
- var tableManager = new AzureTableDataManager<TableEntity>(this.options.TableName, this.options.ConnectionString, this.loggerFactory);
- await tableManager.InitTableAsync().ConfigureAwait(false);
- this.table = tableManager.Table;
- }
-
- private Task Init(CancellationToken cancellationToken)
- {
- return CreateTable();
- }
- }
-}
diff --git a/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/DistributedTM/KeyEntity.cs b/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/DistributedTM/KeyEntity.cs
deleted file mode 100644
index e8d1f63fc0..0000000000
--- a/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/DistributedTM/KeyEntity.cs
+++ /dev/null
@@ -1,17 +0,0 @@
-using Microsoft.WindowsAzure.Storage.Table;
-
-namespace Orleans.Transactions.DistributedTM.AzureStorage
-{
- internal class KeyEntity : TableEntity
- {
- public const string RK = "k";
-
- public KeyEntity()
- {
- this.RowKey = RK;
- }
-
- public long CommittedSequenceId { get; set; }
- public string Metadata { get; set; }
- }
-}
diff --git a/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/DistributedTM/StateEntity.cs b/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/DistributedTM/StateEntity.cs
deleted file mode 100644
index 0a7034c9ba..0000000000
--- a/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/DistributedTM/StateEntity.cs
+++ /dev/null
@@ -1,54 +0,0 @@
-using Microsoft.WindowsAzure.Storage.Table;
-using Newtonsoft.Json;
-using Orleans.Transactions.Abstractions;
-using System;
-
-namespace Orleans.Transactions.DistributedTM.AzureStorage
-{
- internal class StateEntity : TableEntity
- {
- public static string MakeRowKey(long sequenceId)
- {
- return $"{RK_PREFIX}{sequenceId.ToString("x16")}";
- }
-
- public long SequenceId => long.Parse(RowKey.Substring(RK_PREFIX.Length));
-
- // row keys range from s0000000000000001 to s7fffffffffffffff
- public const string RK_PREFIX = "s";
- public const string RK_MIN = RK_PREFIX;
- public const string RK_MAX = RK_PREFIX + "~";
-
- public string TransactionId { get; set; }
-
- public DateTime TransactionTimestamp { get; set; }
-
- public string TransactionManager { get; set; }
-
- public string StateJson { get; set; }
-
- public static StateEntity Create<T>(JsonSerializerSettings JsonSettings,
- string partitionKey, PendingTransactionState<T> pendingState)
- where T : class, new()
- {
- return new StateEntity
- {
- PartitionKey = partitionKey,
- RowKey = MakeRowKey(pendingState.SequenceId),
- TransactionId = pendingState.TransactionId,
- TransactionTimestamp = pendingState.TimeStamp,
- TransactionManager = pendingState.TransactionManager,
- StateJson = JsonConvert.SerializeObject(pendingState.State, JsonSettings)
- };
- }
-
- public T GetState<T>(JsonSerializerSettings JsonSettings)
- {
- return JsonConvert.DeserializeObject<T>(this.StateJson, JsonSettings);
- }
- public void SetState<T>(T state, JsonSerializerSettings JsonSettings)
- {
- StateJson = JsonConvert.SerializeObject(state, JsonSettings);
- }
- }
-}
diff --git a/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/KeyEntity.cs b/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/KeyEntity.cs
index 3b8e17b8fc..e1d158e80e 100644
--- a/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/KeyEntity.cs
+++ b/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/KeyEntity.cs
@@ -4,14 +4,14 @@ namespace Orleans.Transactions.AzureStorage
{
internal class KeyEntity : TableEntity
{
- public const string RK = "tsk";
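+ // a single key row per grain partition; it is included in every batch write so its ETag detects concurrent writers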
+ public const string RK = "k";
public KeyEntity()
{
this.RowKey = RK;
}
- public string CommittedTransactionId { get; set; }
+ public long CommittedSequenceId { get; set; }
public string Metadata { get; set; }
}
}
diff --git a/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/StateEntity.cs b/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/StateEntity.cs
index 344b14c9cb..c7c090ff60 100644
--- a/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/StateEntity.cs
+++ b/src/Azure/Orleans.Transactions.AzureStorage/TransactionalState/StateEntity.cs
@@ -1,4 +1,5 @@
-using Microsoft.WindowsAzure.Storage.Table;
+using System;
+using Microsoft.WindowsAzure.Storage.Table;
using Newtonsoft.Json;
using Orleans.Transactions.Abstractions;
@@ -6,11 +7,24 @@ namespace Orleans.Transactions.AzureStorage
{
internal class StateEntity : TableEntity
{
- public const string RKMin = "ts_";
- public const string RKMax = "ts_~";
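+ // sequence ids are rendered as fixed-width (16 hex digit) strings so lexicographic row-key order matches numeric order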
+ public static string MakeRowKey(long sequenceId)
+ {
+ return $"{RK_PREFIX}{sequenceId.ToString("x16")}";
+ }
+
+ public long SequenceId => long.Parse(RowKey.Substring(RK_PREFIX.Length), System.Globalization.NumberStyles.HexNumber);
+
+ // row keys range from s_0000000000000001 to s_7fffffffffffffff
+ public const string RK_PREFIX = "s_";
+ public const string RK_MIN = RK_PREFIX;
+ public const string RK_MAX = RK_PREFIX + "~";
public string TransactionId { get; set; }
- public long SequenceId { get; set; }
+
+ public DateTime TransactionTimestamp { get; set; }
+
+ public string TransactionManager { get; set; }
+
public string StateJson { get; set; }
public static StateEntity Create<T>(JsonSerializerSettings JsonSettings,
@@ -20,21 +34,21 @@ public static StateEntity Create<T>(JsonSerializerSettings JsonSettings,
return new StateEntity
{
PartitionKey = partitionKey,
- RowKey = MakeRowKey(pendingState.TransactionId, pendingState.SequenceId),
+ RowKey = MakeRowKey(pendingState.SequenceId),
TransactionId = pendingState.TransactionId,
- SequenceId = pendingState.SequenceId,
+ TransactionTimestamp = pendingState.TimeStamp,
+ TransactionManager = pendingState.TransactionManager,
StateJson = JsonConvert.SerializeObject(pendingState.State, JsonSettings)
};
}
- public static string MakeRowKey(string TransactionId, long sequenceId)
- {
- return AzureStorageUtils.SanitizeTableProperty($"{RKMin}{TransactionId}_{sequenceId.ToString("x16")}");
- }
-
public T GetState<T>(JsonSerializerSettings JsonSettings)
{
return JsonConvert.DeserializeObject<T>(this.StateJson, JsonSettings);
}
+ public void SetState<T>(T state, JsonSerializerSettings JsonSettings)
+ {
+ StateJson = JsonConvert.SerializeObject(state, JsonSettings);
+ }
}
}
diff --git a/src/Orleans.Core/Properties/AssemblyInfo.cs b/src/Orleans.Core/Properties/AssemblyInfo.cs
index fb287ec22e..3287fae157 100644
--- a/src/Orleans.Core/Properties/AssemblyInfo.cs
+++ b/src/Orleans.Core/Properties/AssemblyInfo.cs
@@ -20,7 +20,6 @@
[assembly: InternalsVisibleTo("Orleans.TestingHost")]
[assembly: InternalsVisibleTo("Orleans.TestingHost.AppDomain")]
[assembly: InternalsVisibleTo("Orleans.TestingHost.Legacy")]
-[assembly: InternalsVisibleTo("Orleans.Transactions.DynamoDB")]
[assembly: InternalsVisibleTo("OrleansCounterControl")]
[assembly: InternalsVisibleTo("OrleansManager")]
[assembly: InternalsVisibleTo("OrleansProviders")]
diff --git a/src/Orleans.Core/Transactions/ITransactionAgent.cs b/src/Orleans.Core/Transactions/ITransactionAgent.cs
index f949e1e233..455c068faa 100644
--- a/src/Orleans.Core/Transactions/ITransactionAgent.cs
+++ b/src/Orleans.Core/Transactions/ITransactionAgent.cs
@@ -49,19 +49,5 @@ public interface ITransactionAgent
/// None.
/// This method is exception-free
void Abort(ITransactionInfo transactionInfo, OrleansTransactionAbortedException reason);
-
- ///
- /// Check if a transaction is known to have aborted.
- ///
- /// the id of the transaction
- /// true if the transaction is known to have aborted, false otherwise
- ///
- /// Note that the transaction could have aborted but this still returns false, if the agent
- /// did not learn about the outcome yet.
- /// This method is exception-free.
- ///
- bool IsAborted(long transactionId);
-
- long ReadOnlyTransactionId { get; }
}
}
diff --git a/src/Orleans.Core/Transactions/ITransactionInfo.cs b/src/Orleans.Core/Transactions/ITransactionInfo.cs
index 4d5f687628..e0e73b0e79 100644
--- a/src/Orleans.Core/Transactions/ITransactionInfo.cs
+++ b/src/Orleans.Core/Transactions/ITransactionInfo.cs
@@ -1,7 +1,4 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-
+
namespace Orleans.Transactions
{
///
@@ -38,6 +35,4 @@ public interface ITransactionInfo
/// true if there are no orphans, false otherwise
bool ReconcilePending(out int numberOrphans);
}
-
-
}
diff --git a/src/Orleans.Core/Transactions/ITransactionalResource.cs b/src/Orleans.Core/Transactions/ITransactionalResource.cs
index 230dcd15c5..a7f59e829e 100644
--- a/src/Orleans.Core/Transactions/ITransactionalResource.cs
+++ b/src/Orleans.Core/Transactions/ITransactionalResource.cs
@@ -1,7 +1,7 @@
-using Orleans.Concurrency;
using System;
using System.Threading.Tasks;
+using Orleans.Concurrency;
namespace Orleans.Transactions
{
diff --git a/src/Orleans.Core/Transactions/TransactionContext.cs b/src/Orleans.Core/Transactions/TransactionContext.cs
index 509e716af2..300b1f321d 100644
--- a/src/Orleans.Core/Transactions/TransactionContext.cs
+++ b/src/Orleans.Core/Transactions/TransactionContext.cs
@@ -1,7 +1,5 @@
-using System;
using System.Collections.Generic;
-using System.Linq;
using Orleans.Runtime;
namespace Orleans.Transactions
diff --git a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs
index 0ba717b4f7..684448865b 100644
--- a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs
+++ b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs
@@ -228,11 +228,10 @@ internal static void AddDefaultServices(HostBuilderContext context, IServiceColl
services.AddFromExisting();
services.AddSingleton();
services.AddFromExisting();
-
+
// Transactions
- services.TryAddSingleton<ITransactionAgent, TransactionAgent>();
+ services.TryAddSingleton<ITransactionAgent, DisabledTransactionAgent>();
services.TryAddSingleton<Factory<ITransactionAgent>>(sp => () => sp.GetRequiredService<ITransactionAgent>());
- services.TryAddSingleton<ITransactionManagerService, DisabledTransactionManagerService>();
// Application Parts
var applicationPartManager = context.GetApplicationPartManager();
diff --git a/src/Orleans.Runtime/Transactions/DisabledTransactionAgent.cs b/src/Orleans.Runtime/Transactions/DisabledTransactionAgent.cs
new file mode 100644
index 0000000000..da81d20c3b
--- /dev/null
+++ b/src/Orleans.Runtime/Transactions/DisabledTransactionAgent.cs
@@ -0,0 +1,31 @@
+using Orleans.Transactions;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Orleans.Transactions
+{
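+ // Default ITransactionAgent registration (see DefaultSiloServices); any attempt to run a transaction
+ // while transactions are not configured fails with OrleansTransactionsDisabledException.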
+ internal class DisabledTransactionAgent : ITransactionAgent
+ {
+ public void Abort(ITransactionInfo transactionInfo, OrleansTransactionAbortedException reason)
+ {
+ throw new OrleansTransactionsDisabledException();
+ }
+
+ public Task Commit(ITransactionInfo transactionInfo)
+ {
+ throw new OrleansTransactionsDisabledException();
+ }
+
+ public Task Start()
+ {
+ return Task.CompletedTask;
+ }
+
+ public Task<ITransactionInfo> StartTransaction(bool readOnly, TimeSpan timeout)
+ {
+ throw new OrleansStartTransactionFailedException(new OrleansTransactionsDisabledException());
+ }
+ }
+}
diff --git a/src/Orleans.Runtime/Transactions/DisabledTransactionManagerService.cs b/src/Orleans.Runtime/Transactions/DisabledTransactionManagerService.cs
deleted file mode 100644
index 575186542f..0000000000
--- a/src/Orleans.Runtime/Transactions/DisabledTransactionManagerService.cs
+++ /dev/null
@@ -1,25 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-using Orleans.Transactions;
-
-namespace Orleans.Transactions
-{
- internal class DisabledTransactionManagerService : ITransactionManagerService
- {
- public Task AbortTransaction(long transactionId, OrleansTransactionAbortedException reason)
- {
- throw new OrleansTransactionsDisabledException();
- }
-
- public Task<CommitTransactionsResponse> CommitTransactions(List<TransactionInfo> transactions, HashSet<long> queries)
- {
- throw new OrleansTransactionsDisabledException();
- }
-
- public Task<StartTransactionsResponse> StartTransactions(List<TimeSpan> timeouts)
- {
- throw new OrleansTransactionsDisabledException();
- }
- }
-}
diff --git a/src/Orleans.Runtime/Transactions/ITransactionManagerService.cs b/src/Orleans.Runtime/Transactions/ITransactionManagerService.cs
deleted file mode 100644
index b89612522f..0000000000
--- a/src/Orleans.Runtime/Transactions/ITransactionManagerService.cs
+++ /dev/null
@@ -1,37 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-
-namespace Orleans.Transactions
-{
- public interface ITransactionManagerService
- {
- Task<StartTransactionsResponse> StartTransactions(List<TimeSpan> timeouts);
- Task<CommitTransactionsResponse> CommitTransactions(List<TransactionInfo> transactions, HashSet<long> queries);
- Task AbortTransaction(long transactionId, OrleansTransactionAbortedException reason);
- }
-
- [Serializable]
- public struct CommitResult
- {
- public bool Success { get; set; }
-
- public OrleansTransactionAbortedException AbortingException { get; set; }
- }
-
- [Serializable]
- public class CommitTransactionsResponse
- {
- public long ReadOnlyTransactionId { get; set; }
- public long AbortLowerBound { get; set; }
- public Dictionary<long, CommitResult> CommitResult { get; set; }
- }
-
- [Serializable]
- public class StartTransactionsResponse
- {
- public long ReadOnlyTransactionId { get; set; }
- public long AbortLowerBound { get; set; }
- public List<long> TransactionId { get; set; }
- }
-}
diff --git a/src/Orleans.Runtime/Transactions/TransactionAgent.cs b/src/Orleans.Runtime/Transactions/TransactionAgent.cs
deleted file mode 100644
index ddfc8d41bf..0000000000
--- a/src/Orleans.Runtime/Transactions/TransactionAgent.cs
+++ /dev/null
@@ -1,476 +0,0 @@
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Threading.Tasks;
-using Microsoft.Extensions.Logging;
-using Orleans.Runtime;
-using Orleans.Concurrency;
-using DateTime = System.DateTime;
-using Microsoft.Extensions.Options;
-using Orleans.Configuration;
-
-namespace Orleans.Transactions
-{
- internal class TransactionAgentMetrics
- {
- //TPS in current monitor window
- private const string BatchStartTransanctionsTPS = "TransactionAgent.BatchStartTransactions.TPS";
- //avg latency in current monitor window
- private const string AvgBatchStartTransactionsLatency = "TransactionAgent.BatchStartTransactions.AvgLatency";
- private const string AvgBatchStartTransactionsBatchSize = "TransactionAgent.BatchStartTransactions.AvgBatchSize";
- private const string BatchCommitTransactionsTPS = "TransactionAgent.BatchCommitTransactions.TPS";
- private const string AvgBatchCommitTransactionsLatency = "TransactionAgent.BatchCommitTransactions.AvgLatency";
- private const string AvgBatchCommitTransactionsSize = "TransactionAgent.BatchCommitTransactions.AvgBatchSize";
- internal int BatchStartTransactionsRequestCounter { get; set; }
-
- internal int BatchCommitTransactionsRequestsCounter { get; set; }
- internal TimeSpan BatchStartTransactionsRequestLatencyCounter { get; set; } = TimeSpan.Zero;
-
- internal TimeSpan BatchCommitTransactionsRequestLatencyCounter { get; set; } = TimeSpan.Zero;
- internal int BatchStartTransactionsRequestSizeCounter { get; set; }
- internal int BatchCommitTransactionsRequestSizeCounter { get; set; }
- private DateTime lastReportTime = DateTime.UtcNow;
- private ITelemetryProducer telemetryProducer;
- private PeriodicAction periodicMonitor;
-
- public TransactionAgentMetrics(ITelemetryProducer producer, TimeSpan interval)
- {
- this.telemetryProducer = producer;
- this.periodicMonitor = new PeriodicAction(interval, this.ReportMetrics);
- }
-
- public void TryReportMetrics()
- {
- this.periodicMonitor.TryAction(DateTime.UtcNow);
- }
-
- private void ResetCounters(DateTime lastReportTimeStamp)
- {
- //record last report time stamp
- lastReportTime = lastReportTimeStamp;
- this.BatchStartTransactionsRequestCounter = 0;
- this.BatchCommitTransactionsRequestsCounter = 0;
- this.BatchStartTransactionsRequestLatencyCounter = TimeSpan.Zero;
- this.BatchCommitTransactionsRequestLatencyCounter = TimeSpan.Zero;
- this.BatchCommitTransactionsRequestSizeCounter = 0;
- this.BatchStartTransactionsRequestSizeCounter = 0;
- }
-
- private void ReportMetrics()
- {
- if (this.telemetryProducer == null)
- return;
- var now = DateTime.UtcNow;
- var timeSinceLastReportInSeconds = Math.Max(1, (now - this.lastReportTime).TotalSeconds);
- //batch start metrics
- var batchStartTransactionTPS = BatchStartTransactionsRequestCounter / timeSinceLastReportInSeconds;
- this.telemetryProducer.TrackMetric(BatchStartTransanctionsTPS, batchStartTransactionTPS);
- if (BatchStartTransactionsRequestCounter > 0)
- {
- var avgBatchStartTransactionLatency = BatchStartTransactionsRequestLatencyCounter.Divide(BatchStartTransactionsRequestCounter);
- this.telemetryProducer.TrackMetric(AvgBatchStartTransactionsLatency,
- avgBatchStartTransactionLatency);
- var avgBatchStartSize = BatchStartTransactionsRequestSizeCounter / BatchStartTransactionsRequestCounter;
- this.telemetryProducer.TrackMetric(AvgBatchStartTransactionsBatchSize, avgBatchStartSize);
- }
-
- //batch commit metrics
- var batchCommitTransactionTPS = BatchCommitTransactionsRequestsCounter / timeSinceLastReportInSeconds;
- this.telemetryProducer.TrackMetric(BatchCommitTransactionsTPS, batchCommitTransactionTPS);
-
- if (BatchCommitTransactionsRequestsCounter > 0)
- {
- var avgBatchCommitTransactionLatency =
- BatchCommitTransactionsRequestLatencyCounter.Divide(BatchCommitTransactionsRequestsCounter);
- this.telemetryProducer.TrackMetric(AvgBatchCommitTransactionsLatency,
- avgBatchCommitTransactionLatency);
- var avgBatchCommitSzie =
- BatchCommitTransactionsRequestSizeCounter / BatchCommitTransactionsRequestsCounter;
- this.telemetryProducer.TrackMetric(AvgBatchCommitTransactionsSize, avgBatchCommitSzie);
- }
-
- this.ResetCounters(now);
- }
- }
-
- [Reentrant]
- internal class TransactionAgent : SystemTarget, ITransactionAgent, ITransactionAgentSystemTarget
- {
- private readonly ITransactionManagerService tmService;
-
- //private long abortSequenceNumber;
- private long abortLowerBound;
- private readonly ConcurrentDictionary<long, long> abortedTransactions;
-
- private readonly ConcurrentQueue<Tuple<TimeSpan, TaskCompletionSource<long>>> transactionStartQueue;
- private readonly ConcurrentQueue<TransactionInfo> transactionCommitQueue;
- private readonly ConcurrentDictionary<long, TaskCompletionSource<bool>> commitCompletions;
- private readonly HashSet<long> outstandingCommits;
-
- private readonly ILogger logger;
- private readonly ILoggerFactory loggerFactory;
- private IGrainTimer requestProcessor;
- private Task startTransactionsTask = Task.CompletedTask;
- private Task commitTransactionsTask = Task.CompletedTask;
-
- public long ReadOnlyTransactionId { get; private set; }
-
- //metrics related
- private TransactionAgentMetrics metrics;
- public TransactionAgent(
- ILocalSiloDetails siloDetails,
- ITransactionManagerService tmService,
- ILoggerFactory loggerFactory,
- ITelemetryProducer telemetryProducer,
- IOptions<TransactionsOptions> options)
- : base(Constants.TransactionAgentSystemTargetId, siloDetails.SiloAddress, loggerFactory)
- {
- logger = loggerFactory.CreateLogger<TransactionAgent>();
- this.tmService = tmService;
- ReadOnlyTransactionId = 0;
- //abortSequenceNumber = 0;
- abortLowerBound = 0;
- this.loggerFactory = loggerFactory;
-
- abortedTransactions = new ConcurrentDictionary<long, long>();
- transactionStartQueue = new ConcurrentQueue<Tuple<TimeSpan, TaskCompletionSource<long>>>();
- transactionCommitQueue = new ConcurrentQueue<TransactionInfo>();
- commitCompletions = new ConcurrentDictionary<long, TaskCompletionSource<bool>>();
- outstandingCommits = new HashSet<long>();
- this.metrics = new TransactionAgentMetrics(telemetryProducer, options.Value.MetricsWritePeriod);
- }
-
- #region ITransactionAgent
-
- public async Task<ITransactionInfo> StartTransaction(bool readOnly, TimeSpan timeout)
- {
- if (readOnly)
- {
- return new TransactionInfo(ReadOnlyTransactionId, true);
- }
-
- TransactionsStatisticsGroup.OnTransactionStartRequest();
- var completion = new TaskCompletionSource<long>();
- transactionStartQueue.Enqueue(new Tuple<TimeSpan, TaskCompletionSource<long>>(timeout, completion));
-
- long id = await completion.Task;
- return new TransactionInfo(id);
- }
-
- public async Task Commit(ITransactionInfo info)
- {
- var transactionInfo = (TransactionInfo)info;
-
- TransactionsStatisticsGroup.OnTransactionCommitRequest();
-
- if (transactionInfo.IsReadOnly)
- {
- return;
- }
-
- var completion = new TaskCompletionSource<bool>();
- bool canCommit = true;
-
- List<Task<bool>> prepareTasks = new List<Task<bool>>(transactionInfo.WriteSet.Count);
- foreach (var g in transactionInfo.WriteSet.Keys)
- {
- TransactionalResourceVersion write = TransactionalResourceVersion.Create(transactionInfo.TransactionId, transactionInfo.WriteSet[g]);
- TransactionalResourceVersion? read = null;
- if (transactionInfo.ReadSet.ContainsKey(g))
- {
- read = transactionInfo.ReadSet[g];
- transactionInfo.ReadSet.Remove(g);
- }
- prepareTasks.Add(g.Prepare(transactionInfo.TransactionId, write, read));
- }
-
- foreach (var g in transactionInfo.ReadSet.Keys)
- {
- TransactionalResourceVersion read = transactionInfo.ReadSet[g];
- prepareTasks.Add(g.Prepare(transactionInfo.TransactionId, null, read));
- }
-
- await Task.WhenAll(prepareTasks);
- foreach (var t in prepareTasks)
- {
- if (!t.Result)
- {
- canCommit = false;
- }
- }
-
- if (!canCommit)
- {
- TransactionsStatisticsGroup.OnTransactionAborted();
- abortedTransactions.TryAdd(transactionInfo.TransactionId, 0);
- throw new OrleansPrepareFailedException(transactionInfo.TransactionId.ToString());
- }
- commitCompletions.TryAdd(transactionInfo.TransactionId, completion);
- transactionCommitQueue.Enqueue(transactionInfo);
- await completion.Task;
- }
-
- public void Abort(ITransactionInfo info, OrleansTransactionAbortedException reason)
- {
- var transactionInfo = (TransactionInfo)info;
-
- abortedTransactions.TryAdd(transactionInfo.TransactionId, 0);
- foreach (var g in transactionInfo.WriteSet.Keys)
- {
- g.Abort(transactionInfo.TransactionId).Ignore();
- }
-
- // TODO: should we wait for the abort tasks to complete before returning?
- // If so, how do we handle exceptions?
-
- // There is no guarantee that the WriteSet is complete and has all the grains.
- // Notify the TM of the abort as well.
- this.tmService.AbortTransaction(transactionInfo.TransactionId, reason).Ignore();
- }
-
- public bool IsAborted(long transactionId)
- {
- if (transactionId <= abortLowerBound)
- {
- return true;
- }
-
- return abortedTransactions.ContainsKey(transactionId) || transactionId < this.abortLowerBound;
- }
-
- #endregion
-
- private async Task ProcessRequests(object args)
- {
- // NOTE: This code is a bit complicated because we want to issue both start and commit requests,
- // but wait for each one separately in its own continuation. This can be significantly simplified
- // if we can register a separate timer for start and commit.
-
- List<TransactionInfo> committingTransactions = new List<TransactionInfo>();
- List<TimeSpan> startingTransactions = new List<TimeSpan>();
- List<TaskCompletionSource<long>> startCompletions = new List<TaskCompletionSource<long>>();
-
- while (transactionCommitQueue.Count > 0 || transactionStartQueue.Count > 0 || outstandingCommits.Count > 0)
- {
- this.metrics.TryReportMetrics();
- var initialAbortLowerBound = this.abortLowerBound;
-
- await Task.Yield();
- await WaitForWork();
-
- int startCount = transactionStartQueue.Count;
- while (startCount > 0 && startTransactionsTask.IsCompleted)
- {
- Tuple<TimeSpan, TaskCompletionSource<long>> elem;
- transactionStartQueue.TryDequeue(out elem);
- startingTransactions.Add(elem.Item1);
- startCompletions.Add(elem.Item2);
-
- startCount--;
- }
-
- int commitCount = transactionCommitQueue.Count;
- while (commitCount > 0 && commitTransactionsTask.IsCompleted)
- {
- TransactionInfo elem;
- transactionCommitQueue.TryDequeue(out elem);
- committingTransactions.Add(elem);
- outstandingCommits.Add(elem.TransactionId);
-
- commitCount--;
- }
-
-
- if (startingTransactions.Count > 0 && startTransactionsTask.IsCompleted)
- {
- logger.Debug(ErrorCode.Transactions_SendingTMRequest, "Calling TM to start {0} transactions", startingTransactions.Count);
-
- startTransactionsTask = this.StartTransactions(startingTransactions, startCompletions);
- }
-
- if ((committingTransactions.Count > 0 || outstandingCommits.Count > 0) && commitTransactionsTask.IsCompleted)
- {
- logger.Debug(ErrorCode.Transactions_SendingTMRequest, "Calling TM to commit {0} transactions", committingTransactions.Count);
-
- commitTransactionsTask = this.CommitTransactions(committingTransactions, outstandingCommits);
-
- // Removed transactions below the abort lower bound.
- if (this.abortLowerBound != initialAbortLowerBound)
- {
- foreach (var aborted in this.abortedTransactions)
- {
- if (aborted.Key < this.abortLowerBound)
- {
- long ignored;
- this.abortedTransactions.TryRemove(aborted.Key, out ignored);
- }
- }
- }
- }
- }
- this.metrics.TryReportMetrics();
-
- }
-
- private async Task CommitTransactions(List<TransactionInfo> committingTransactions,
- HashSet<long> outstandingCommits)
- {
- var stopWatch = Stopwatch.StartNew();
- try
- {
- metrics.BatchCommitTransactionsRequestsCounter++;
- metrics.BatchCommitTransactionsRequestSizeCounter += committingTransactions.Count;
- CommitTransactionsResponse commitResponse;
- try
- {
- commitResponse = await this.tmService.CommitTransactions(committingTransactions, outstandingCommits);
- }
- finally
- {
- stopWatch.Stop();
- metrics.BatchCommitTransactionsRequestLatencyCounter += stopWatch.Elapsed;
- }
-
- var commitResults = commitResponse.CommitResult;
-
- // reply to clients with the outcomes we received from the TM.
- foreach (var completedId in commitResults.Keys)
- {
- outstandingCommits.Remove(completedId);
-
- TaskCompletionSource<bool> completion;
- if (commitCompletions.TryRemove(completedId, out completion))
- {
- if (commitResults[completedId].Success)
- {
- TransactionsStatisticsGroup.OnTransactionCommitted();
- completion.SetResult(true);
- }
- else
- {
- if (commitResults[completedId].AbortingException != null)
- {
- TransactionsStatisticsGroup.OnTransactionAborted();
- completion.SetException(commitResults[completedId].AbortingException);
- }
- else
- {
- TransactionsStatisticsGroup.OnTransactionInDoubt();
- completion.SetException(new OrleansTransactionInDoubtException(completedId.ToString()));
- }
- }
- }
- }
-
- // Refresh cached values using new values from TM.
- this.ReadOnlyTransactionId = Math.Max(this.ReadOnlyTransactionId,
- commitResponse.ReadOnlyTransactionId);
- this.abortLowerBound = Math.Max(this.abortLowerBound, commitResponse.AbortLowerBound);
- logger.Debug(ErrorCode.Transactions_ReceivedTMResponse,
- "{0} transactions committed. readOnlyTransactionId {1}, abortLowerBound {2}",
- committingTransactions.Count, ReadOnlyTransactionId, abortLowerBound);
- }
- catch (Exception e)
- {
- logger.Error(ErrorCode.Transactions_TMError, "TM Error", e);
- // Propagate the exception to every transaction in the request.
- foreach (var tx in committingTransactions)
- {
- TransactionsStatisticsGroup.OnTransactionInDoubt();
-
- TaskCompletionSource<bool> completion;
- if (commitCompletions.TryRemove(tx.TransactionId, out completion))
- {
- outstandingCommits.Remove(tx.TransactionId);
- completion.SetException(new OrleansTransactionInDoubtException(tx.TransactionId.ToString()));
- }
- }
- }
-
- committingTransactions.Clear();
-
- }
-
- private async Task StartTransactions(List<TimeSpan> startingTransactions, List<TaskCompletionSource<long>> startCompletions)
- {
- var stopWatch = Stopwatch.StartNew();
- try
- {
- metrics.BatchStartTransactionsRequestCounter++;
- metrics.BatchStartTransactionsRequestSizeCounter += startingTransactions.Count;
- StartTransactionsResponse startResponse;
- try
- {
- startResponse = await this.tmService.StartTransactions(startingTransactions);
- }
- finally
- {
- stopWatch.Stop();
- metrics.BatchStartTransactionsRequestLatencyCounter += stopWatch.Elapsed;
- }
- List<long> startedIds = startResponse.TransactionId;
-
- // reply to clients with results
- for (int i = 0; i < startCompletions.Count; i++)
- {
- TransactionsStatisticsGroup.OnTransactionStarted();
- startCompletions[i].SetResult(startedIds[i]);
- }
-
- // Refresh cached values using new values from TM.
- this.ReadOnlyTransactionId = Math.Max(this.ReadOnlyTransactionId, startResponse.ReadOnlyTransactionId);
- this.abortLowerBound = Math.Max(this.abortLowerBound, startResponse.AbortLowerBound);
- logger.Debug(ErrorCode.Transactions_ReceivedTMResponse,
- "{0} Transactions started. readOnlyTransactionId {1}, abortLowerBound {2}",
- startingTransactions.Count, ReadOnlyTransactionId, abortLowerBound);
- }
- catch (Exception e)
- {
- logger.Error(ErrorCode.Transactions_TMError, "Transaction manager failed to start transactions.", e);
-
- foreach (var completion in startCompletions)
- {
- TransactionsStatisticsGroup.OnTransactionStartFailed();
- completion.SetException(new OrleansStartTransactionFailedException(e));
- }
- }
-
- startingTransactions.Clear();
- startCompletions.Clear();
- }
-
- private Task WaitForWork()
- {
- // Returns a task that can be waited on until the RequestProcessor has
- // actionable work. The purpose is to avoid looping indefinitely while waiting
- // for the outstanding start or commit requests to complete.
- List<Task> toWait = new List<Task>();
-
- if (transactionStartQueue.Count > 0)
- {
- toWait.Add(startTransactionsTask);
- }
-
- if (transactionCommitQueue.Count > 0)
- {
- toWait.Add(commitTransactionsTask);
- }
-
- if (toWait.Count == 0)
- {
- return Task.CompletedTask;
- }
-
- return Task.WhenAny(toWait);
- }
-
- public Task Start()
- {
- requestProcessor = GrainTimer.FromTaskCallback(this.RuntimeClient.Scheduler, this.loggerFactory.CreateLogger(), ProcessRequests, null, TimeSpan.FromMilliseconds(0), TimeSpan.FromMilliseconds(10), "TransactionAgent");
- requestProcessor.Start();
- return Task.CompletedTask;
- }
-
- }
-}
diff --git a/src/Orleans.Runtime/Transactions/TransactionInfo.cs b/src/Orleans.Runtime/Transactions/TransactionInfo.cs
deleted file mode 100644
index 9dea80ab24..0000000000
--- a/src/Orleans.Runtime/Transactions/TransactionInfo.cs
+++ /dev/null
@@ -1,199 +0,0 @@
-using Orleans.Concurrency;
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-
-namespace Orleans.Transactions
-{
- [Serializable]
- public class TransactionInfo : ITransactionInfo
- {
- public TransactionInfo()
- {
- this.joined = new ConcurrentQueue<TransactionInfo>();
- }
-
- public TransactionInfo(long id, bool readOnly = false)
- : this()
- {
- TransactionId = id;
- IsReadOnly = readOnly;
- IsAborted = false;
- PendingCalls = 0;
- ReadSet = new Dictionary<ITransactionalResource, TransactionalResourceVersion>();
- WriteSet = new Dictionary<ITransactionalResource, int>();
- DependentTransactions = new HashSet<long>();
- }
-
- ///
- /// Constructor used when TransactionInfo is transferred to a request
- ///
- ///
- public TransactionInfo(TransactionInfo other)
- : this()
- {
- TransactionId = other.TransactionId;
- IsReadOnly = other.IsReadOnly;
- IsAborted = other.IsAborted;
- PendingCalls = 0;
- ReadSet = new Dictionary<ITransactionalResource, TransactionalResourceVersion>();
- WriteSet = new Dictionary<ITransactionalResource, int>();
- DependentTransactions = new HashSet<long>();
- }
-
- public string Id => TransactionId.ToString();
-
- public long TransactionId { get; }
-
- public bool IsReadOnly { get; }
-
- public bool IsAborted { get; set; }
-
- public Dictionary<ITransactionalResource, TransactionalResourceVersion> ReadSet { get; }
- public Dictionary<ITransactionalResource, int> WriteSet { get; }
- public HashSet<long> DependentTransactions { get; }
-
- [NonSerialized]
- public int PendingCalls;
-
- [NonSerialized]
- private readonly ConcurrentQueue<TransactionInfo> joined;
-
- public ITransactionInfo Fork()
- {
- PendingCalls++;
- return new TransactionInfo(this);
- }
-
- public void Join(ITransactionInfo other)
- {
- this.joined.Enqueue((TransactionInfo)other);
- }
-
- ///
- /// Reconciles all pending calls that have join the transaction.
- ///
- /// true if there are no orphans, false otherwise
- public bool ReconcilePending(out int numberOrphans)
- {
- TransactionInfo trasactionInfo;
- while (this.joined.TryDequeue(out trasactionInfo))
- {
- Union(trasactionInfo);
- PendingCalls--;
- }
- numberOrphans = PendingCalls;
- return numberOrphans == 0;
- }
-
- private void Union(TransactionInfo other)
- {
- if (TransactionId != other.TransactionId)
- {
- IsAborted = true;
- string error = $"Attempting to perform union between different Transactions. Attempted union between Transactions {TransactionId} and {other.TransactionId}";
- throw new InvalidOperationException(error);
- }
-
- if (other.IsAborted)
- {
- IsAborted = true;
- }
-
- // Take a union of the ReadSets.
- foreach (var grain in other.ReadSet.Keys)
- {
- if (ReadSet.ContainsKey(grain))
- {
- if (ReadSet[grain] != other.ReadSet[grain])
- {
- // Conflict! Transaction must abort
- IsAborted = true;
- }
- }
- else
- {
- ReadSet.Add(grain, other.ReadSet[grain]);
- }
- }
-
- // Take a union of the WriteSets.
- foreach (var grain in other.WriteSet.Keys)
- {
- if (!WriteSet.ContainsKey(grain))
- {
- WriteSet[grain] = 0;
- }
-
- WriteSet[grain] += other.WriteSet[grain];
- }
-
- DependentTransactions.UnionWith(other.DependentTransactions);
- }
-
-
- public void RecordRead(ITransactionalResource transactionalResource, TransactionalResourceVersion readVersion, long stableVersion)
- {
- if (readVersion.TransactionId == TransactionId)
- {
- // Just reading our own write here.
- // Sanity check to see if there's a lost write.
- int resourceWriteNumber;
- if (WriteSet.TryGetValue(transactionalResource, out resourceWriteNumber)
- && resourceWriteNumber > readVersion.WriteNumber)
- {
- // Context has record of more writes than we have, some writes must be lost.
- throw new OrleansTransactionAbortedException(TransactionId.ToString(), "Lost Write");
- }
- }
- else
- {
- TransactionalResourceVersion resourceReadVersion;
- if (ReadSet.TryGetValue(transactionalResource, out resourceReadVersion)
- && resourceReadVersion != readVersion)
- {
- // Uh-oh. Read two different versions of the grain.
- throw new OrleansValidationFailedException(TransactionId.ToString());
- }
-
- ReadSet[transactionalResource] = readVersion;
-
- if (readVersion.TransactionId != TransactionId &&
- readVersion.TransactionId > stableVersion)
- {
- DependentTransactions.Add(readVersion.TransactionId);
- }
- }
- }
-
- public void RecordWrite(ITransactionalResource transactionalResource, TransactionalResourceVersion latestVersion, long stableVersion)
- {
- int writeNumber;
- WriteSet.TryGetValue(transactionalResource, out writeNumber);
- WriteSet[transactionalResource] = writeNumber + 1;
-
- if (latestVersion.TransactionId != TransactionId && latestVersion.TransactionId > stableVersion)
- {
- DependentTransactions.Add(latestVersion.TransactionId);
- }
- }
-
- ///
- /// For verbose tracing and debugging.
- ///
- public override string ToString()
- {
- return string.Join("",
- TransactionId,
- (IsReadOnly ? " RO" : ""),
- (IsAborted ? " Aborted" : ""),
- $" R{{{string.Join(",", ReadSet.Select(kvp => $"{kvp.Key.ToShortString()}.{kvp.Value}"))}}}",
- $" W{{{string.Join(",", WriteSet.Select(kvp => $"{kvp.Key.ToShortString()}.{TransactionId}#{kvp.Value}"))}}}",
- $" D{{{string.Join(",", DependentTransactions)}}}"
- );
- }
- }
-
-}
diff --git a/src/Orleans.Runtime/Transactions/TransactionsOptions.cs b/src/Orleans.Runtime/Transactions/TransactionsOptions.cs
deleted file mode 100644
index 0221ed5d03..0000000000
--- a/src/Orleans.Runtime/Transactions/TransactionsOptions.cs
+++ /dev/null
@@ -1,44 +0,0 @@
-using System;
-
-namespace Orleans.Configuration
-{
- public class TransactionsOptions
- {
- public TransactionsOptions()
- {
- UseDefaults();
- }
-
- ///
- /// The number of new Transaction Ids allocated on every write to the log.
- /// To avoid writing to log on every transaction start, transaction Ids are allocated in batches.
- ///
- public int TransactionIdAllocationBatchSize { get; set; }
- public const int DefaultTransactionIdAllocationBatchSize = 50000;
-
- ///
- /// A new batch of transaction Ids will be automatically allocated if the available ids drop below
- /// this threshold.
- ///
- public int AvailableTransactionIdThreshold { get; set; }
- public const int DefaultAvailableTransactionIdThreshold = 20000;
-
- ///
- /// How long to preserve a transaction record in the TM memory after the transaction has completed.
- /// This is used to answer queries about the outcome of the transaction.
- ///
- public TimeSpan TransactionRecordPreservationDuration { get; set; }
- public static readonly TimeSpan DefaultTransactionRecordPreservationDuration = TimeSpan.FromMinutes(1);
-
- public TimeSpan MetricsWritePeriod { get; set; }
- public static readonly TimeSpan DefaultMetricsWritePeriod = TimeSpan.FromSeconds(30);
-
- private void UseDefaults()
- {
- this.TransactionIdAllocationBatchSize = DefaultTransactionIdAllocationBatchSize;
- this.AvailableTransactionIdThreshold = DefaultAvailableTransactionIdThreshold;
- this.TransactionRecordPreservationDuration = DefaultTransactionRecordPreservationDuration;
- this.MetricsWritePeriod = DefaultMetricsWritePeriod;
- }
- }
-}
diff --git a/src/Orleans.Transactions/Abstractions/CommitRecord.cs b/src/Orleans.Transactions/Abstractions/CommitRecord.cs
deleted file mode 100644
index 935960dc88..0000000000
--- a/src/Orleans.Transactions/Abstractions/CommitRecord.cs
+++ /dev/null
@@ -1,19 +0,0 @@
-using System;
-using System.Collections.Generic;
-
-namespace Orleans.Transactions.Abstractions
-{
- [Serializable]
- public class CommitRecord
- {
- public CommitRecord()
- {
- Resources = new HashSet<ITransactionalResource>();
- }
-
- public long TransactionId { get; set; }
- public long LSN { get; set; }
-
- public HashSet<ITransactionalResource> Resources { get; set; }
- }
-}
diff --git a/src/Orleans.Transactions/DistributedTM/Protocol/TransactionParticipantExtensionExtensions.cs b/src/Orleans.Transactions/Abstractions/Extensions/TransactionParticipantExtensionExtensions.cs
similarity index 99%
rename from src/Orleans.Transactions/DistributedTM/Protocol/TransactionParticipantExtensionExtensions.cs
rename to src/Orleans.Transactions/Abstractions/Extensions/TransactionParticipantExtensionExtensions.cs
index ac5916c617..ed1971b886 100644
--- a/src/Orleans.Transactions/DistributedTM/Protocol/TransactionParticipantExtensionExtensions.cs
+++ b/src/Orleans.Transactions/Abstractions/Extensions/TransactionParticipantExtensionExtensions.cs
@@ -7,7 +7,7 @@
using Orleans.Runtime;
using Orleans.Serialization;
-namespace Orleans.Transactions.DistributedTM
+namespace Orleans.Transactions.Abstractions.Extensions
{
public static class TransactionParticipantExtensionExtensions
{
diff --git a/src/Orleans.Transactions/Abstractions/TransactionalExtensionExtensions.cs b/src/Orleans.Transactions/Abstractions/Extensions/TransactionalExtensionExtensions.cs
similarity index 97%
rename from src/Orleans.Transactions/Abstractions/TransactionalExtensionExtensions.cs
rename to src/Orleans.Transactions/Abstractions/Extensions/TransactionalExtensionExtensions.cs
index 2bfeb72352..d58ce5bff7 100644
--- a/src/Orleans.Transactions/Abstractions/TransactionalExtensionExtensions.cs
+++ b/src/Orleans.Transactions/Abstractions/Extensions/TransactionalExtensionExtensions.cs
@@ -2,7 +2,7 @@
using System.Threading.Tasks;
using Orleans.Concurrency;
-namespace Orleans.Transactions.Abstractions
+namespace Orleans.Transactions.Abstractions.Extensions
{
public static class TransactionalExtensionExtensions
{
diff --git a/src/Orleans.Transactions/Abstractions/INamedTransactionalStateStorageFactory.cs b/src/Orleans.Transactions/Abstractions/INamedTransactionalStateStorageFactory.cs
index 09f4bf0a08..174f959896 100644
--- a/src/Orleans.Transactions/Abstractions/INamedTransactionalStateStorageFactory.cs
+++ b/src/Orleans.Transactions/Abstractions/INamedTransactionalStateStorageFactory.cs
@@ -1,6 +1,4 @@
-using System;
-
namespace Orleans.Transactions.Abstractions
{
///
diff --git a/src/Orleans.Transactions/Abstractions/ITransactionLogStorage.cs b/src/Orleans.Transactions/Abstractions/ITransactionLogStorage.cs
deleted file mode 100644
index 825580cf78..0000000000
--- a/src/Orleans.Transactions/Abstractions/ITransactionLogStorage.cs
+++ /dev/null
@@ -1,60 +0,0 @@
-using System.Collections.Generic;
-using System.Threading.Tasks;
-
-namespace Orleans.Transactions.Abstractions
-{
- ///
- /// This interface provides the abstraction for various durable transaction log storages.
- ///
- public interface ITransactionLogStorage
- {
- /////
- ///// Gets the first CommitRecord in the log.
- /////
- /////
- ///// The CommitRecord with the lowest LSN in the log, or null if there is none.
- /////
- Task<CommitRecord> GetFirstCommitRecord();
-
- ///
- /// Returns the CommitRecord with LSN following the LSN of record returned by the last
- /// GetFirstcommitRecord() or GetNextCommitRecord() call.
- ///
- ///
- /// The next CommitRecord, or null if there is none.
- ///
- Task<CommitRecord> GetNextCommitRecord();
-
- ///
- /// Returns the first available transaction id for new transactions.
- ///
- ///
- /// This method helps to ensure that a given transaction id is never issued more than once.
- ///
- Task<long> GetStartRecord();
-
- ///
- /// Update the start record with the value.
- ///
- /// Id of the transaction to update the start record with.
- ///
- Task UpdateStartRecord(long transactionId);
-
- ///
- /// Append the given records to the log in order
- ///
- /// Commit Records
- ///
- /// If an exception is thrown it is possible that a prefix of the records are persisted
- /// to the log.
- ///
- Task Append(IEnumerable<CommitRecord> commitRecords);
-
- ///
- /// Truncates the transaction log from the start until the given LSN provided in the