From aa015a75d23ae9146145068432f4d70a39f8058f Mon Sep 17 00:00:00 2001 From: Gutemberg Ribeiro Date: Sat, 13 Aug 2016 21:45:30 -0300 Subject: [PATCH 1/3] AWS DynamoDB Membership Provider --- .../Membership/DynamoDBGatewayListProvider.cs | 76 ++++ .../Membership/DynamoDBMembershipTable.cs | 350 ++++++++++++++++++ .../Membership/SiloInstanceRecord.cs | 210 +++++++++++ src/OrleansAWSUtils/OrleansAWSUtils.csproj | 6 + .../DynamoDBClientMetricsPublisher.cs | 95 +++++ .../DynamoDBSiloMetricsPublisher.cs | 118 ++++++ .../Statistics/DynamoDBStatisticsPublisher.cs | 138 +++++++ .../IPersistenceTestGrains.cs | 2 +- test/Tester/MembershipTests/LivenessTests.cs | 49 +++ test/Tester/Tester.csproj | 4 + .../DynamoDBMembershipTableTest.cs | 80 ++++ .../MembershipTableTestsBase.cs | 176 ++++++--- .../DynamoDBSiloInstanceManagerTests.cs | 239 ++++++++++++ test/TesterInternal/TesterInternal.csproj | 1 + 14 files changed, 1484 insertions(+), 60 deletions(-) create mode 100644 src/OrleansAWSUtils/Membership/DynamoDBGatewayListProvider.cs create mode 100644 src/OrleansAWSUtils/Membership/DynamoDBMembershipTable.cs create mode 100644 src/OrleansAWSUtils/Membership/SiloInstanceRecord.cs create mode 100644 src/OrleansAWSUtils/Statistics/DynamoDBClientMetricsPublisher.cs create mode 100644 src/OrleansAWSUtils/Statistics/DynamoDBSiloMetricsPublisher.cs create mode 100644 src/OrleansAWSUtils/Statistics/DynamoDBStatisticsPublisher.cs create mode 100644 test/TesterInternal/MembershipTests/DynamoDBMembershipTableTest.cs create mode 100644 test/TesterInternal/StorageTests/AWSUtils/DynamoDBSiloInstanceManagerTests.cs diff --git a/src/OrleansAWSUtils/Membership/DynamoDBGatewayListProvider.cs b/src/OrleansAWSUtils/Membership/DynamoDBGatewayListProvider.cs new file mode 100644 index 0000000000..7d8e890b58 --- /dev/null +++ b/src/OrleansAWSUtils/Membership/DynamoDBGatewayListProvider.cs @@ -0,0 +1,76 @@ +using Amazon.DynamoDBv2; +using Amazon.DynamoDBv2.Model; +using Orleans.Messaging; +using Orleans.Runtime.Configuration; +using OrleansAWSUtils.Storage; +using System; +using System.Collections.Generic; +using System.Net; +using System.Threading.Tasks; + +namespace Orleans.Runtime.MembershipService +{ + internal class DynamoDBGatewayListProvider : IGatewayListProvider + { + private const string TABLE_NAME_DEFAULT_VALUE = "OrleansSiloInstances"; + + private DynamoDBStorage storage; + private TimeSpan gatewayListRefreshPeriod; + private string deploymentId; + private readonly string INSTANCE_STATUS_ACTIVE = ((int)SiloStatus.Active).ToString(); + + #region Implementation of IGatewayListProvider + + public Task InitializeGatewayListProvider(ClientConfiguration conf, Logger logger) + { + gatewayListRefreshPeriod = conf.GatewayListRefreshPeriod; + deploymentId = conf.DeploymentId; + + storage = new DynamoDBStorage(conf.DataConnectionString, logger); + return storage.InitializeTable(TABLE_NAME_DEFAULT_VALUE, + new List + { + new KeySchemaElement { AttributeName = SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME, KeyType = KeyType.HASH }, + new KeySchemaElement { AttributeName = SiloInstanceRecord.SILO_IDENTITY_PROPERTY_NAME, KeyType = KeyType.RANGE } + }, + new List + { + new AttributeDefinition { AttributeName = SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME, AttributeType = ScalarAttributeType.S }, + new AttributeDefinition { AttributeName = SiloInstanceRecord.SILO_IDENTITY_PROPERTY_NAME, AttributeType = ScalarAttributeType.S } + }); + } + + public async Task> GetGateways() + { + var expressionValues = new Dictionary + { + { $":{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}", new AttributeValue(deploymentId) }, + { $":{SiloInstanceRecord.STATUS_PROPERTY_NAME}", new AttributeValue { N = INSTANCE_STATUS_ACTIVE } } + }; + var expression = $"{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME} = :{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME} AND {SiloInstanceRecord.STATUS_PROPERTY_NAME} = :{SiloInstanceRecord.STATUS_PROPERTY_NAME}"; + var records = await storage.ScanAsync(TABLE_NAME_DEFAULT_VALUE, expressionValues, + expression, gateway => + { + return SiloAddress.New( + new IPEndPoint( + IPAddress.Parse(gateway[SiloInstanceRecord.ADDRESS_PROPERTY_NAME].S), + int.Parse(gateway[SiloInstanceRecord.PROXY_PORT_PROPERTY_NAME].N)), + int.Parse(gateway[SiloInstanceRecord.GENERATION_PROPERTY_NAME].N)).ToGatewayUri(); + }); + + return records; + } + + public TimeSpan MaxStaleness + { + get { return gatewayListRefreshPeriod; } + } + + public bool IsUpdatable + { + get { return true; } + } + + #endregion + } +} diff --git a/src/OrleansAWSUtils/Membership/DynamoDBMembershipTable.cs b/src/OrleansAWSUtils/Membership/DynamoDBMembershipTable.cs new file mode 100644 index 0000000000..14af17c8a2 --- /dev/null +++ b/src/OrleansAWSUtils/Membership/DynamoDBMembershipTable.cs @@ -0,0 +1,350 @@ +using Amazon.DynamoDBv2; +using Amazon.DynamoDBv2.Model; +using Orleans.Runtime.Configuration; +using OrleansAWSUtils.Storage; +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; +using System.Threading.Tasks; + +namespace Orleans.Runtime.MembershipService +{ + internal class DynamoDBMembershipTable : IMembershipTable + { + //DynamoDB does not support the extended Membership Protocol and will always return the same table version information + private readonly TableVersion _tableVersion = new TableVersion(0, "0"); + + private const string TABLE_NAME_DEFAULT_VALUE = "OrleansSiloInstances"; + private const string CURRENT_ETAG_ALIAS = ":currentETag"; + + private Logger logger; + private DynamoDBStorage storage; + private string deploymentId; + + public Task InitializeMembershipTable(GlobalConfiguration config, bool tryInitTableVersion, Logger log) + { + logger = log; + deploymentId = config.DeploymentId; + storage = new DynamoDBStorage(config.DataConnectionString, log); + logger.Info(ErrorCode.MembershipBase, "Initializing AWS DynamoDB Membership Table"); + return storage.InitializeTable(TABLE_NAME_DEFAULT_VALUE, + new List + { + new KeySchemaElement { AttributeName = SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME, KeyType = KeyType.HASH }, + new KeySchemaElement { AttributeName = SiloInstanceRecord.SILO_IDENTITY_PROPERTY_NAME, KeyType = KeyType.RANGE } + }, + new List + { + new AttributeDefinition { AttributeName = SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME, AttributeType = ScalarAttributeType.S }, + new AttributeDefinition { AttributeName = SiloInstanceRecord.SILO_IDENTITY_PROPERTY_NAME, AttributeType = ScalarAttributeType.S } + }); + } + + public async Task DeleteMembershipTableEntries(string deploymentId) + { + try + { + var keys = new Dictionary { { $":{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}", new AttributeValue(deploymentId) } }; + var records = await storage.QueryAsync(TABLE_NAME_DEFAULT_VALUE, keys, $"{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME} = :{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}", item => new SiloInstanceRecord(item)); + + var toDelete = new List>(); + foreach (var record in records) + { + toDelete.Add(record.GetKeys()); + } + + if (records.Count <= 25) + { + await storage.DeleteEntriesAsync(TABLE_NAME_DEFAULT_VALUE, toDelete); + } + else + { + List tasks = new List(); + foreach (var batch in toDelete.BatchIEnumerable(25)) + { + tasks.Add(storage.DeleteEntriesAsync(TABLE_NAME_DEFAULT_VALUE, batch)); + } + await Task.WhenAll(tasks); + } + } + catch (Exception exc) + { + logger.Error(ErrorCode.MembershipBase, string.Format("Unable to delete membership records on table {0} for deploymentId {1}: Exception={2}", + TABLE_NAME_DEFAULT_VALUE, deploymentId, exc)); + throw; + } + } + + public async Task ReadRow(SiloAddress siloAddress) + { + try + { + var keys = new Dictionary + { + { $"{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}", new AttributeValue(deploymentId) }, + { $"{SiloInstanceRecord.SILO_IDENTITY_PROPERTY_NAME}", new AttributeValue(SiloInstanceRecord.ConstructSiloIdentity(siloAddress)) } + }; + var entry = await storage.ReadSingleEntryAsync(TABLE_NAME_DEFAULT_VALUE, keys, fields => new SiloInstanceRecord(fields)); + MembershipTableData data = entry != null ? Convert(new List { entry }) : new MembershipTableData(_tableVersion); + if (logger.IsVerbose2) logger.Verbose2("Read my entry {0} Table=" + Environment.NewLine + "{1}", siloAddress.ToLongString(), data.ToString()); + return data; + } + catch (Exception exc) + { + logger.Warn(ErrorCode.MembershipBase, + $"Intermediate error reading silo entry for key {siloAddress.ToLongString()} from the table {TABLE_NAME_DEFAULT_VALUE}.", exc); + throw; + } + } + + public async Task ReadAll() + { + try + { + var keys = new Dictionary { { $":{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}", new AttributeValue(deploymentId) } }; + var records = await storage.QueryAsync(TABLE_NAME_DEFAULT_VALUE, keys, $"{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME} = :{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}", item => new SiloInstanceRecord(item)); + + MembershipTableData data = Convert(records); + if (logger.IsVerbose2) logger.Verbose2("ReadAll Table=" + Environment.NewLine + "{0}", data.ToString()); + + return data; + } + catch (Exception exc) + { + logger.Warn(ErrorCode.MembershipBase, + $"Intermediate error reading all silo entries {TABLE_NAME_DEFAULT_VALUE}.", exc); + throw; + } + } + + public async Task InsertRow(MembershipEntry entry, TableVersion tableVersion) + { + try + { + if (logger.IsVerbose) logger.Verbose("InsertRow entry = {0}", entry.ToFullString()); + var tableEntry = Convert(entry); + + bool result; + + try + { + var expression = $"attribute_not_exists({SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}) AND attribute_not_exists({SiloInstanceRecord.SILO_IDENTITY_PROPERTY_NAME})"; + + await storage.PutEntryAsync(TABLE_NAME_DEFAULT_VALUE, tableEntry.GetFields(true), expression); + result = true; + } + catch (ConditionalCheckFailedException) + { + result = false; + } + + if (result == false) + logger.Warn(ErrorCode.MembershipBase, + $"Insert failed due to contention on the table. Will retry. Entry {entry.ToFullString()}"); + return result; + } + catch (Exception exc) + { + logger.Warn(ErrorCode.MembershipBase, + $"Intermediate error inserting entry {entry.ToFullString()} to the table {TABLE_NAME_DEFAULT_VALUE}.", exc); + throw; + } + } + + public async Task UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion) + { + try + { + if (logger.IsVerbose) logger.Verbose("UpdateRow entry = {0}, etag = {1}", entry.ToFullString(), etag); + var siloEntry = Convert(entry); + int currentEtag = 0; + int.TryParse(etag, out currentEtag); + siloEntry.ETag = currentEtag; + siloEntry.ETag++; + + bool result; + + try + { + var conditionalValues = new Dictionary { { CURRENT_ETAG_ALIAS, new AttributeValue { N = etag } } }; + var expression = $"{SiloInstanceRecord.ETAG_PROPERTY_NAME} = {CURRENT_ETAG_ALIAS}"; + await storage.UpsertEntryAsync(TABLE_NAME_DEFAULT_VALUE, siloEntry.GetKeys(), + siloEntry.GetFields(), expression, conditionalValues); + + result = true; + } + catch (ConditionalCheckFailedException) + { + result = false; + } + + if (result == false) + logger.Warn(ErrorCode.MembershipBase, + $"Update failed due to contention on the table. Will retry. Entry {entry.ToFullString()}, eTag {etag}"); + return result; + } + catch (Exception exc) + { + logger.Warn(ErrorCode.MembershipBase, + $"Intermediate error updating entry {entry.ToFullString()} to the table {TABLE_NAME_DEFAULT_VALUE}.", exc); + throw; + } + } + + public async Task UpdateIAmAlive(MembershipEntry entry) + { + try + { + if (logger.IsVerbose) logger.Verbose("Merge entry = {0}", entry.ToFullString()); + var siloEntry = ConvertPartial(entry); + siloEntry.ETag++; + var expression = $"attribute_exists({SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}) AND attribute_exists({SiloInstanceRecord.SILO_IDENTITY_PROPERTY_NAME})"; + await storage.UpsertEntryAsync(TABLE_NAME_DEFAULT_VALUE, siloEntry.GetKeys(), siloEntry.GetFields(), expression); + } + catch (Exception exc) + { + logger.Warn(ErrorCode.MembershipBase, + $"Intermediate error updating IAmAlive field for entry {entry.ToFullString()} to the table {TABLE_NAME_DEFAULT_VALUE}.", exc); + throw; + } + } + + private MembershipTableData Convert(List entries) + { + try + { + var memEntries = new List>(); + + foreach (var tableEntry in entries) + { + try + { + MembershipEntry membershipEntry = Parse(tableEntry); + memEntries.Add(new Tuple(membershipEntry, tableEntry.ETag.ToString())); + } + catch (Exception exc) + { + logger.Error(ErrorCode.MembershipBase, + $"Intermediate error parsing SiloInstanceTableEntry to MembershipTableData: {tableEntry}. Ignoring this entry.", exc); + } + } + var data = new MembershipTableData(memEntries, _tableVersion); + return data; + } + catch (Exception exc) + { + logger.Error(ErrorCode.AzureTable_60, + $"Intermediate error parsing SiloInstanceTableEntry to MembershipTableData: {Utils.EnumerableToString(entries, e => e.ToString())}.", exc); + throw; + } + } + + private MembershipEntry Parse(SiloInstanceRecord tableEntry) + { + var parse = new MembershipEntry + { + HostName = tableEntry.HostName, + Status = (SiloStatus)tableEntry.Status + }; + + parse.ProxyPort = tableEntry.ProxyPort; + + parse.SiloAddress = SiloAddress.New(new IPEndPoint(IPAddress.Parse(tableEntry.Address), tableEntry.Port), tableEntry.Generation); + + if (!string.IsNullOrEmpty(tableEntry.SiloName)) + { + parse.SiloName = tableEntry.SiloName; + } + + parse.StartTime = !string.IsNullOrEmpty(tableEntry.StartTime) ? + LogFormatter.ParseDate(tableEntry.StartTime) : default(DateTime); + + parse.IAmAliveTime = !string.IsNullOrEmpty(tableEntry.IAmAliveTime) ? + LogFormatter.ParseDate(tableEntry.IAmAliveTime) : default(DateTime); + + var suspectingSilos = new List(); + var suspectingTimes = new List(); + + if (!string.IsNullOrEmpty(tableEntry.SuspectingSilos)) + { + string[] silos = tableEntry.SuspectingSilos.Split('|'); + foreach (string silo in silos) + { + suspectingSilos.Add(SiloAddress.FromParsableString(silo)); + } + } + + if (!string.IsNullOrEmpty(tableEntry.SuspectingTimes)) + { + string[] times = tableEntry.SuspectingTimes.Split('|'); + foreach (string time in times) + suspectingTimes.Add(LogFormatter.ParseDate(time)); + } + + if (suspectingSilos.Count != suspectingTimes.Count) + throw new OrleansException(String.Format("SuspectingSilos.Length of {0} as read from Azure table is not eqaul to SuspectingTimes.Length of {1}", suspectingSilos.Count, suspectingTimes.Count)); + + for (int i = 0; i < suspectingSilos.Count; i++) + parse.AddSuspector(suspectingSilos[i], suspectingTimes[i]); + + return parse; + } + + private SiloInstanceRecord Convert(MembershipEntry memEntry) + { + var tableEntry = new SiloInstanceRecord + { + DeploymentId = deploymentId, + Address = memEntry.SiloAddress.Endpoint.Address.ToString(), + Port = memEntry.SiloAddress.Endpoint.Port, + Generation = memEntry.SiloAddress.Generation, + HostName = memEntry.HostName, + Status = (int)memEntry.Status, + ProxyPort = memEntry.ProxyPort, + SiloName = memEntry.SiloName, + StartTime = LogFormatter.PrintDate(memEntry.StartTime), + IAmAliveTime = LogFormatter.PrintDate(memEntry.IAmAliveTime), + SiloIdentity = SiloInstanceRecord.ConstructSiloIdentity(memEntry.SiloAddress) + }; + + if (memEntry.SuspectTimes != null) + { + var siloList = new StringBuilder(); + var timeList = new StringBuilder(); + bool first = true; + foreach (var tuple in memEntry.SuspectTimes) + { + if (!first) + { + siloList.Append('|'); + timeList.Append('|'); + } + siloList.Append(tuple.Item1.ToParsableString()); + timeList.Append(LogFormatter.PrintDate(tuple.Item2)); + first = false; + } + + tableEntry.SuspectingSilos = siloList.ToString(); + tableEntry.SuspectingTimes = timeList.ToString(); + } + else + { + tableEntry.SuspectingSilos = string.Empty; + tableEntry.SuspectingTimes = string.Empty; + } + + return tableEntry; + } + + private SiloInstanceRecord ConvertPartial(MembershipEntry memEntry) + { + return new SiloInstanceRecord + { + DeploymentId = deploymentId, + IAmAliveTime = LogFormatter.PrintDate(memEntry.IAmAliveTime), + SiloIdentity = SiloInstanceRecord.ConstructSiloIdentity(memEntry.SiloAddress) + }; + } + } +} diff --git a/src/OrleansAWSUtils/Membership/SiloInstanceRecord.cs b/src/OrleansAWSUtils/Membership/SiloInstanceRecord.cs new file mode 100644 index 0000000000..a6f06795b2 --- /dev/null +++ b/src/OrleansAWSUtils/Membership/SiloInstanceRecord.cs @@ -0,0 +1,210 @@ +using Amazon.DynamoDBv2.Model; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Net; +using System.Text; + +namespace Orleans.Runtime.MembershipService +{ + internal class SiloInstanceRecord + { + public const string DEPLOYMENT_ID_PROPERTY_NAME = "DeploymentId"; + public const string SILO_IDENTITY_PROPERTY_NAME = "SiloIdentity"; + public const string ETAG_PROPERTY_NAME = "ETag"; + public const string ADDRESS_PROPERTY_NAME = "Address"; + public const string PORT_PROPERTY_NAME = "Port"; + public const string GENERATION_PROPERTY_NAME = "Generation"; + public const string HOSTNAME_PROPERTY_NAME = "HostName"; + public const string STATUS_PROPERTY_NAME = "SiloStatus"; + public const string PROXY_PORT_PROPERTY_NAME = "ProxyPort"; + public const string SILO_NAME_PROPERTY_NAME = "SiloName"; + public const string INSTANCE_NAME_PROPERTY_NAME = "InstanceName"; + public const string SUSPECTING_SILOS_PROPERTY_NAME = "SuspectingSilos"; + public const string SUSPECTING_TIMES_PROPERTY_NAME = "SuspectingTimes"; + public const string START_TIME_PROPERTY_NAME = "StartTime"; + public const string I_AM_ALIVE_TIME_PROPERTY_NAME = "IAmAliveTime"; + internal const char Seperator = '-'; + + public SiloInstanceRecord() { } + + public SiloInstanceRecord(Dictionary fields) + { + if (fields.ContainsKey(DEPLOYMENT_ID_PROPERTY_NAME)) + DeploymentId = fields[DEPLOYMENT_ID_PROPERTY_NAME].S; + + if (fields.ContainsKey(SILO_IDENTITY_PROPERTY_NAME)) + SiloIdentity = fields[SILO_IDENTITY_PROPERTY_NAME].S; + + if (fields.ContainsKey(ADDRESS_PROPERTY_NAME)) + Address = fields[ADDRESS_PROPERTY_NAME].S; + + int port; + if (fields.ContainsKey(PORT_PROPERTY_NAME) && + int.TryParse(fields[PORT_PROPERTY_NAME].N, out port)) + Port = port; + + int generation; + if (fields.ContainsKey(GENERATION_PROPERTY_NAME) && + int.TryParse(fields[GENERATION_PROPERTY_NAME].N, out generation)) + Generation = generation; + + if (fields.ContainsKey(HOSTNAME_PROPERTY_NAME)) + HostName = fields[HOSTNAME_PROPERTY_NAME].S; + + int status; + if (fields.ContainsKey(STATUS_PROPERTY_NAME) && + int.TryParse(fields[STATUS_PROPERTY_NAME].N, out status)) + Status = status; + + int proxyPort; + if (fields.ContainsKey(PROXY_PORT_PROPERTY_NAME) && + int.TryParse(fields[PROXY_PORT_PROPERTY_NAME].N, out proxyPort)) + ProxyPort = proxyPort; + + if (fields.ContainsKey(SILO_NAME_PROPERTY_NAME)) + SiloName = fields[SILO_NAME_PROPERTY_NAME].S; + + if (fields.ContainsKey(SUSPECTING_SILOS_PROPERTY_NAME)) + SuspectingSilos = fields[SUSPECTING_SILOS_PROPERTY_NAME].S; + + if (fields.ContainsKey(SUSPECTING_TIMES_PROPERTY_NAME)) + SuspectingTimes = fields[SUSPECTING_TIMES_PROPERTY_NAME].S; + + if (fields.ContainsKey(START_TIME_PROPERTY_NAME)) + StartTime = fields[START_TIME_PROPERTY_NAME].S; + + if (fields.ContainsKey(I_AM_ALIVE_TIME_PROPERTY_NAME)) + IAmAliveTime = fields[I_AM_ALIVE_TIME_PROPERTY_NAME].S; + + int etag; + if (fields.ContainsKey(ETAG_PROPERTY_NAME) && + int.TryParse(fields[ETAG_PROPERTY_NAME].N, out etag)) + ETag = etag; + } + + public string DeploymentId { get; set; } + public string SiloIdentity { get; set; } + public string Address { get; set; } + public int Port { get; set; } + public int Generation { get; set; } + public string HostName { get; set; } + public int Status { get; set; } + public int ProxyPort { get; set; } + public string SiloName { get; set; } + public string SuspectingSilos { get; set; } + public string SuspectingTimes { get; set; } + public string StartTime { get; set; } + public string IAmAliveTime { get; set; } + public int ETag { get; set; } + + internal static SiloAddress UnpackRowKey(string rowKey) + { + var debugInfo = "UnpackRowKey"; + try + { +#if DEBUG + debugInfo = String.Format("UnpackRowKey: RowKey={0}", rowKey); + Trace.TraceInformation(debugInfo); +#endif + int idx1 = rowKey.IndexOf(Seperator); + int idx2 = rowKey.LastIndexOf(Seperator); +#if DEBUG + debugInfo = String.Format("UnpackRowKey: RowKey={0} Idx1={1} Idx2={2}", rowKey, idx1, idx2); +#endif + var addressStr = rowKey.Substring(0, idx1); + var portStr = rowKey.Substring(idx1 + 1, idx2 - idx1 - 1); + var genStr = rowKey.Substring(idx2 + 1); +#if DEBUG + debugInfo = String.Format("UnpackRowKey: RowKey={0} -> Address={1} Port={2} Generation={3}", rowKey, addressStr, portStr, genStr); + Trace.TraceInformation(debugInfo); +#endif + IPAddress address = IPAddress.Parse(addressStr); + int port = Int32.Parse(portStr); + int generation = Int32.Parse(genStr); + return SiloAddress.New(new IPEndPoint(address, port), generation); + } + catch (Exception exc) + { + throw new AggregateException("Error from " + debugInfo, exc); + } + } + + public override string ToString() + { + var sb = new StringBuilder(); + sb.Append("OrleansSilo ["); + sb.Append(" Deployment=").Append(DeploymentId); + sb.Append(" LocalEndpoint=").Append(Address); + sb.Append(" LocalPort=").Append(Port); + sb.Append(" Generation=").Append(Generation); + + sb.Append(" Host=").Append(HostName); + sb.Append(" Status=").Append(Status); + sb.Append(" ProxyPort=").Append(ProxyPort); + + sb.Append(" SiloName=").Append(SiloName); + + if (!string.IsNullOrEmpty(SuspectingSilos)) sb.Append(" SuspectingSilos=").Append(SuspectingSilos); + if (!string.IsNullOrEmpty(SuspectingTimes)) sb.Append(" SuspectingTimes=").Append(SuspectingTimes); + sb.Append(" StartTime=").Append(StartTime); + sb.Append(" IAmAliveTime=").Append(IAmAliveTime); + sb.Append("]"); + return sb.ToString(); + } + + public static string ConstructSiloIdentity(SiloAddress silo) + { + return string.Format("{0}-{1}-{2}", silo.Endpoint.Address, silo.Endpoint.Port, silo.Generation); + } + + public Dictionary GetKeys() + { + var keys = new Dictionary(); + keys.Add(DEPLOYMENT_ID_PROPERTY_NAME, new AttributeValue(DeploymentId)); + keys.Add(SILO_IDENTITY_PROPERTY_NAME, new AttributeValue($"{Address}-{Port}-{Generation}")); + return keys; + } + + public Dictionary GetFields(bool includeKeys = false) + { + var fields = new Dictionary(); + + if (includeKeys) + { + fields.Add(DEPLOYMENT_ID_PROPERTY_NAME, new AttributeValue(DeploymentId)); + fields.Add(SILO_IDENTITY_PROPERTY_NAME, new AttributeValue($"{Address}-{Port}-{Generation}")); + } + + if (!string.IsNullOrWhiteSpace(Address)) + fields.Add(ADDRESS_PROPERTY_NAME, new AttributeValue(Address)); + + fields.Add(PORT_PROPERTY_NAME, new AttributeValue { N = Port.ToString() }); + fields.Add(GENERATION_PROPERTY_NAME, new AttributeValue { N = Generation.ToString() }); + + if (!string.IsNullOrWhiteSpace(HostName)) + fields.Add(HOSTNAME_PROPERTY_NAME, new AttributeValue(HostName)); + + fields.Add(STATUS_PROPERTY_NAME, new AttributeValue { N = Status.ToString() }); + fields.Add(PROXY_PORT_PROPERTY_NAME, new AttributeValue { N = ProxyPort.ToString() }); + + if (!string.IsNullOrWhiteSpace(SiloName)) + fields.Add(SILO_NAME_PROPERTY_NAME, new AttributeValue(SiloName)); + + if (!string.IsNullOrWhiteSpace(SuspectingSilos)) + fields.Add(SUSPECTING_SILOS_PROPERTY_NAME, new AttributeValue(SuspectingSilos)); + + if (!string.IsNullOrWhiteSpace(SuspectingTimes)) + fields.Add(SUSPECTING_TIMES_PROPERTY_NAME, new AttributeValue(SuspectingTimes)); + + if (!string.IsNullOrWhiteSpace(StartTime)) + fields.Add(START_TIME_PROPERTY_NAME, new AttributeValue(StartTime)); + + if (!string.IsNullOrWhiteSpace(IAmAliveTime)) + fields.Add(I_AM_ALIVE_TIME_PROPERTY_NAME, new AttributeValue(IAmAliveTime)); + + fields.Add(ETAG_PROPERTY_NAME, new AttributeValue { N = ETag.ToString() }); + return fields; + } + } +} diff --git a/src/OrleansAWSUtils/OrleansAWSUtils.csproj b/src/OrleansAWSUtils/OrleansAWSUtils.csproj index 583153f9aa..baf681e60a 100644 --- a/src/OrleansAWSUtils/OrleansAWSUtils.csproj +++ b/src/OrleansAWSUtils/OrleansAWSUtils.csproj @@ -47,6 +47,12 @@ + + + + + + diff --git a/src/OrleansAWSUtils/Statistics/DynamoDBClientMetricsPublisher.cs b/src/OrleansAWSUtils/Statistics/DynamoDBClientMetricsPublisher.cs new file mode 100644 index 0000000000..d48ce6643e --- /dev/null +++ b/src/OrleansAWSUtils/Statistics/DynamoDBClientMetricsPublisher.cs @@ -0,0 +1,95 @@ +using Orleans.Runtime; +using OrleansAWSUtils.Storage; +using System.Collections.Generic; +using System.Threading.Tasks; +using Orleans.Runtime.Configuration; +using System.Net; +using Amazon.DynamoDBv2.Model; +using Amazon.DynamoDBv2; + +namespace Orleans.Providers +{ + public class DynamoDBClientMetricsPublisher : IClientMetricsDataPublisher + { + private const string TABLE_NAME_DEFAULT_VALUE = "OrleansClientMetrics"; + private string deploymentId; + private string clientId; + private IPAddress address; + private string hostName; + + private DynamoDBStorage storage; + private Logger logger; + + private const string DEPLOYMENT_ID_PROPERTY_NAME = "DeploymentId"; + private const string ADDRESS_PROPERTY_NAME = "Address"; + private const string CLIENT_ID_PROPERTY_NAME = "ClientId"; + private const string HOSTNAME_PROPERTY_NAME = "HostName"; + private const string CPU_USAGE_PROPERTY_NAME = "CPUUsage"; + private const string MEMORY_USAGE_PROPERTY_NAME = "MemoryUsage"; + private const string SEND_QUEUE_LENGTH_PROPERTY_NAME = "SendQueueLength"; + private const string RECEIVE_QUEUE_LENGTH_PROPERTY_NAME = "ReceiveQueueLength"; + private const string SENT_MESSAGES_PROPERTY_NAME = "SentMessages"; + private const string RECEIVED_MESSAGES_PROPERTY_NAME = "ReceivedMessages"; + private const string CONNECTED_GATEWAY_COUNT_PROPERTY_NAME = "ConnectedGatewayCount"; + + private readonly Dictionary metrics = new Dictionary + { + { DEPLOYMENT_ID_PROPERTY_NAME, new AttributeValue() }, + { ADDRESS_PROPERTY_NAME, new AttributeValue() }, + { CLIENT_ID_PROPERTY_NAME, new AttributeValue() }, + { HOSTNAME_PROPERTY_NAME, new AttributeValue() }, + { CPU_USAGE_PROPERTY_NAME, new AttributeValue() }, + { MEMORY_USAGE_PROPERTY_NAME, new AttributeValue() }, + { SEND_QUEUE_LENGTH_PROPERTY_NAME, new AttributeValue() }, + { RECEIVE_QUEUE_LENGTH_PROPERTY_NAME, new AttributeValue() }, + { SENT_MESSAGES_PROPERTY_NAME, new AttributeValue() }, + { RECEIVED_MESSAGES_PROPERTY_NAME, new AttributeValue() }, + { CONNECTED_GATEWAY_COUNT_PROPERTY_NAME, new AttributeValue() }, + }; + + public DynamoDBClientMetricsPublisher() + { + logger = LogManager.GetLogger(this.GetType().Name, LoggerType.Runtime); + } + + public Task Init(ClientConfiguration config, IPAddress address, string clientId) + { + deploymentId = config.DeploymentId; + this.clientId = clientId; + this.address = address; + hostName = config.DNSHostName; + storage = new DynamoDBStorage(config.DataConnectionString, logger); + + return storage.InitializeTable(TABLE_NAME_DEFAULT_VALUE, + new List + { + new KeySchemaElement { AttributeName = DEPLOYMENT_ID_PROPERTY_NAME, KeyType = KeyType.HASH }, + new KeySchemaElement { AttributeName = CLIENT_ID_PROPERTY_NAME, KeyType = KeyType.RANGE } + }, + new List + { + new AttributeDefinition { AttributeName = DEPLOYMENT_ID_PROPERTY_NAME, AttributeType = ScalarAttributeType.S }, + new AttributeDefinition { AttributeName = CLIENT_ID_PROPERTY_NAME, AttributeType = ScalarAttributeType.S } + }); + } + + public Task ReportMetrics(IClientPerformanceMetrics metricsData) + { + metrics[DEPLOYMENT_ID_PROPERTY_NAME].S = deploymentId; + metrics[ADDRESS_PROPERTY_NAME].S = address.ToString(); + metrics[CLIENT_ID_PROPERTY_NAME].S = clientId; + metrics[HOSTNAME_PROPERTY_NAME].S = hostName; + metrics[CPU_USAGE_PROPERTY_NAME].N = metricsData.CpuUsage.ToString(); + metrics[MEMORY_USAGE_PROPERTY_NAME].N = metricsData.MemoryUsage.ToString(); + metrics[SEND_QUEUE_LENGTH_PROPERTY_NAME].N = metricsData.SendQueueLength.ToString(); + metrics[RECEIVE_QUEUE_LENGTH_PROPERTY_NAME].N = metricsData.ReceiveQueueLength.ToString(); + metrics[SENT_MESSAGES_PROPERTY_NAME].N = metricsData.SentMessages.ToString(); + metrics[RECEIVED_MESSAGES_PROPERTY_NAME].N = metricsData.ReceivedMessages.ToString(); + metrics[CONNECTED_GATEWAY_COUNT_PROPERTY_NAME].N = metricsData.ConnectedGatewayCount.ToString(); + + if (logger.IsVerbose) logger.Verbose("Updated client metrics table entry: {0}", Utils.DictionaryToString(metrics)); + + return storage.PutEntryAsync(TABLE_NAME_DEFAULT_VALUE, metrics); + } + } +} diff --git a/src/OrleansAWSUtils/Statistics/DynamoDBSiloMetricsPublisher.cs b/src/OrleansAWSUtils/Statistics/DynamoDBSiloMetricsPublisher.cs new file mode 100644 index 0000000000..c436bff3cc --- /dev/null +++ b/src/OrleansAWSUtils/Statistics/DynamoDBSiloMetricsPublisher.cs @@ -0,0 +1,118 @@ +using Orleans.Runtime; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using System.Net; +using OrleansAWSUtils.Storage; +using Amazon.DynamoDBv2.Model; +using Amazon.DynamoDBv2; + +namespace Orleans.Providers +{ + public class DynamoDBSiloMetricsPublisher : ISiloMetricsDataPublisher + { + private string deploymentId; + private SiloAddress siloAddress; + private string siloName; + private IPEndPoint gateway; + private string hostName; + + private DynamoDBStorage storage; + private Logger logger; + + private const string DEPLOYMENT_ID_PROPERTY_NAME = "DeploymentId"; + private const string ADDRESS_PROPERTY_NAME = "Address"; + private const string SILO_NAME_PROPERTY_NAME = "SiloName"; + private const string GATEWAY_ADDRESS_PROPERTY_NAME = "GatewayAddress"; + private const string HOSTNAME_PROPERTY_NAME = "HostName"; + private const string CPU_USAGE_PROPERTY_NAME = "CPUUsage"; + private const string MEMORY_USAGE_PROPERTY_NAME = "MemoryUsage"; + private const string ACTIVATIONS_PROPERTY_NAME = "Activations"; + private const string RECENTLY_USED_ACTIVATIONS_PROPERTY_NAME = "RecentlyUsedActivations"; + private const string SEND_QUEUE_LENGTH_PROPERTY_NAME = "SendQueueLength"; + private const string RECEIVE_QUEUE_LENGTH_PROPERTY_NAME = "ReceiveQueueLength"; + private const string REQUEST_QUEUE_LENGTH_PROPERTY_NAME = "RequestQueueLength"; + private const string SENT_MESSAGES_PROPERTY_NAME = "SentMessages"; + private const string RECEIVED_MESSAGES_PROPERTY_NAME = "ReceivedMessages"; + private const string LOAD_SHEDDING_PROPERTY_NAME = "LoadShedding"; + private const string CLIENT_COUNT_PROPERTY_NAME = "ClientCount"; + private const string TIMESTAMP_PROPERTY_NAME = "Timestamp"; + private const string TABLE_NAME_DEFAULT_VALUE = "OrleansSiloMetrics"; + + private readonly Dictionary metrics = new Dictionary + { + { DEPLOYMENT_ID_PROPERTY_NAME, new AttributeValue() }, + { ADDRESS_PROPERTY_NAME, new AttributeValue() }, + { SILO_NAME_PROPERTY_NAME, new AttributeValue() }, + { HOSTNAME_PROPERTY_NAME, new AttributeValue() }, + { GATEWAY_ADDRESS_PROPERTY_NAME, new AttributeValue() }, + { MEMORY_USAGE_PROPERTY_NAME, new AttributeValue() }, + { SEND_QUEUE_LENGTH_PROPERTY_NAME, new AttributeValue() }, + { RECEIVE_QUEUE_LENGTH_PROPERTY_NAME, new AttributeValue() }, + { SENT_MESSAGES_PROPERTY_NAME, new AttributeValue() }, + { RECEIVED_MESSAGES_PROPERTY_NAME, new AttributeValue() }, + { RECENTLY_USED_ACTIVATIONS_PROPERTY_NAME, new AttributeValue() }, + { CPU_USAGE_PROPERTY_NAME, new AttributeValue() }, + { ACTIVATIONS_PROPERTY_NAME, new AttributeValue() }, + { REQUEST_QUEUE_LENGTH_PROPERTY_NAME, new AttributeValue() }, + { LOAD_SHEDDING_PROPERTY_NAME, new AttributeValue() }, + { CLIENT_COUNT_PROPERTY_NAME, new AttributeValue() }, + { TIMESTAMP_PROPERTY_NAME, new AttributeValue() }, + }; + + public DynamoDBSiloMetricsPublisher() + { + logger = LogManager.GetLogger(this.GetType().Name, LoggerType.Runtime); + } + + public Task Init(string deploymentId, string storageConnectionString, SiloAddress siloAddress, string siloName, IPEndPoint gateway, string hostName) + { + this.deploymentId = deploymentId; + this.siloAddress = siloAddress; + this.siloName = siloName; + this.gateway = gateway; + this.hostName = hostName; + storage = new DynamoDBStorage(storageConnectionString, logger); + + return storage.InitializeTable(TABLE_NAME_DEFAULT_VALUE, + new List + { + new KeySchemaElement { AttributeName = DEPLOYMENT_ID_PROPERTY_NAME, KeyType = KeyType.HASH }, + new KeySchemaElement { AttributeName = SILO_NAME_PROPERTY_NAME, KeyType = KeyType.RANGE } + }, + new List + { + new AttributeDefinition { AttributeName = DEPLOYMENT_ID_PROPERTY_NAME, AttributeType = ScalarAttributeType.S }, + new AttributeDefinition { AttributeName = SILO_NAME_PROPERTY_NAME, AttributeType = ScalarAttributeType.S } + }); + } + + public Task ReportMetrics(ISiloPerformanceMetrics metricsData) + { + metrics[DEPLOYMENT_ID_PROPERTY_NAME].S = deploymentId; + metrics[ADDRESS_PROPERTY_NAME].S = siloAddress.ToString(); + metrics[HOSTNAME_PROPERTY_NAME].S = hostName; + metrics[SILO_NAME_PROPERTY_NAME].S = siloName; + if (gateway != null) + { + metrics[GATEWAY_ADDRESS_PROPERTY_NAME].S = gateway.ToString(); + } + metrics[CPU_USAGE_PROPERTY_NAME].N = metricsData.CpuUsage.ToString(); + metrics[MEMORY_USAGE_PROPERTY_NAME].N = metricsData.MemoryUsage.ToString(); + metrics[SEND_QUEUE_LENGTH_PROPERTY_NAME].N = metricsData.SendQueueLength.ToString(); + metrics[RECEIVE_QUEUE_LENGTH_PROPERTY_NAME].N = metricsData.ReceiveQueueLength.ToString(); + metrics[RECENTLY_USED_ACTIVATIONS_PROPERTY_NAME].N = metricsData.RecentlyUsedActivationCount.ToString(); + metrics[SENT_MESSAGES_PROPERTY_NAME].N = metricsData.SentMessages.ToString(); + metrics[RECEIVED_MESSAGES_PROPERTY_NAME].N = metricsData.ReceivedMessages.ToString(); + metrics[ACTIVATIONS_PROPERTY_NAME].N = metricsData.ActivationCount.ToString(); + metrics[REQUEST_QUEUE_LENGTH_PROPERTY_NAME].N = metricsData.RequestQueueLength.ToString(); + metrics[LOAD_SHEDDING_PROPERTY_NAME].BOOL = metricsData.IsOverloaded; + metrics[CLIENT_COUNT_PROPERTY_NAME].N = metricsData.ClientCount.ToString(); + metrics[TIMESTAMP_PROPERTY_NAME].S = DateTime.UtcNow.ToString(); + + if (logger.IsVerbose) logger.Verbose("Updated silo metrics table entry: {0}", Utils.DictionaryToString(metrics)); + + return storage.PutEntryAsync(TABLE_NAME_DEFAULT_VALUE, metrics); + } + } +} diff --git a/src/OrleansAWSUtils/Statistics/DynamoDBStatisticsPublisher.cs b/src/OrleansAWSUtils/Statistics/DynamoDBStatisticsPublisher.cs new file mode 100644 index 0000000000..d7e886a95f --- /dev/null +++ b/src/OrleansAWSUtils/Statistics/DynamoDBStatisticsPublisher.cs @@ -0,0 +1,138 @@ +using Amazon.DynamoDBv2; +using Amazon.DynamoDBv2.Model; +using Orleans.Runtime; +using OrleansAWSUtils.Storage; +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Threading.Tasks; + +namespace Orleans.Providers +{ + public class DynamoDBStatisticsPublisher : IStatisticsPublisher + { + private const string PARTITION_KEY_PROPERTY_NAME = "PartitionKey"; + private const string ROW_KEY_PROPERTY_NAME = "RowKey"; + private const string DEPLOYMENT_ID_PROPERTY_NAME = "DeploymentId"; + private const string TIME_PROPERTY_NAME = "Time"; + private const string ADDRESS_PROPERTY_NAME = "Address"; + private const string NAME_PROPERTY_NAME = "Name"; + private const string HOSTNAME_PROPERTY_NAME = "HostName"; + private const string STATISTIC_PROPERTY_NAME = "Statistic"; + private const string STATVALUE_PROPERTY_NAME = "StatValue"; + private const string ISDELTA_PROPERTY_NAME = "IsDelta"; + private const string DATE_TIME_FORMAT = "yyyy-MM-dd-" + "HH:mm:ss.fff 'GMT'"; + + private string deploymentId; + private string address; + private string name; + private bool isSilo; + private long clientEpoch; + private int counter; + private string hostName; + private string tableName; + + private DynamoDBStorage storage; + private readonly Logger logger; + + public DynamoDBStatisticsPublisher() + { + logger = LogManager.GetLogger(this.GetType().Name, LoggerType.Runtime); + } + + public Task Init(bool isSilo, string storageConnectionString, string deploymentId, string address, string siloName, string hostName) + { + this.deploymentId = deploymentId; + this.address = address; + name = isSilo ? siloName : hostName; + this.hostName = hostName; + this.isSilo = isSilo; + if (!this.isSilo) + { + clientEpoch = SiloAddress.AllocateNewGeneration(); + } + counter = 0; + tableName = isSilo ? "OrleansSiloStatistics" : "OrleansClientStatistics"; + + storage = new DynamoDBStorage(storageConnectionString, logger); + + return storage.InitializeTable(tableName, + new List + { + new KeySchemaElement { AttributeName = PARTITION_KEY_PROPERTY_NAME, KeyType = KeyType.HASH }, + new KeySchemaElement { AttributeName = ROW_KEY_PROPERTY_NAME, KeyType = KeyType.RANGE } + }, + new List + { + new AttributeDefinition { AttributeName = PARTITION_KEY_PROPERTY_NAME, AttributeType = ScalarAttributeType.S }, + new AttributeDefinition { AttributeName = ROW_KEY_PROPERTY_NAME, AttributeType = ScalarAttributeType.S } + }); + } + + public Task ReportStats(List statsCounters) + { + try + { + var toWrite = new List>(); + foreach (var counter in statsCounters) + { + var fields = ParseCounter(counter); + if (fields == null) + continue; + + toWrite.Add(fields); + } + + if (toWrite.Count <= 25) + { + return storage.DeleteEntriesAsync(tableName, toWrite); + } + else + { + var tasks = new List(); + foreach (var batch in toWrite.BatchIEnumerable(25)) + { + tasks.Add(storage.DeleteEntriesAsync(tableName, batch)); + } + return Task.WhenAll(tasks); + } + } + catch (Exception exc) + { + logger.Error(ErrorCode.PerfStatistics, string.Format("Unable to write statistics records on table {0} for deploymentId {1}: Exception={2}", + tableName, deploymentId, exc)); + throw; + } + } + + private Dictionary ParseCounter(ICounter statsCounter) + { + string statValue = statsCounter.IsValueDelta ? statsCounter.GetDeltaString() : statsCounter.GetValueString(); + if ("0".Equals(statValue)) + { + return null; + } + + counter++; + var now = DateTime.UtcNow; + var ticks = DateTime.MaxValue.Ticks - now.Ticks; + + return new Dictionary + { + // PartitionKey: DeploymentId$ReverseTimestampToTheNearestHour + // RowKey: ReverseTimestampToTheNearestSecond$Name$counter + // As defined in http://dotnet.github.io/orleans/Runtime-Implementation-Details/Runtime-Tables + { PARTITION_KEY_PROPERTY_NAME, new AttributeValue(string.Join("$", deploymentId, string.Format("{0:d19}", ticks - ticks % TimeSpan.TicksPerHour))) }, + { ROW_KEY_PROPERTY_NAME, new AttributeValue(string.Join("$", string.Format("{0:d19}", ticks), name, string.Format("{0:000000}", counter))) }, + { DEPLOYMENT_ID_PROPERTY_NAME, new AttributeValue(deploymentId) }, + { TIME_PROPERTY_NAME, new AttributeValue(now.ToString(DATE_TIME_FORMAT, CultureInfo.InvariantCulture)) }, + { ADDRESS_PROPERTY_NAME, new AttributeValue(address)}, + { NAME_PROPERTY_NAME, new AttributeValue(name)}, + { HOSTNAME_PROPERTY_NAME, new AttributeValue(hostName)}, + { STATISTIC_PROPERTY_NAME, new AttributeValue(statsCounter.Name)}, + { ISDELTA_PROPERTY_NAME, new AttributeValue { BOOL = statsCounter.IsValueDelta } }, + { STATVALUE_PROPERTY_NAME, new AttributeValue { N = statValue } } + }; + } + } +} diff --git a/test/TestGrainInterfaces/IPersistenceTestGrains.cs b/test/TestGrainInterfaces/IPersistenceTestGrains.cs index de90547e0c..6e2e4c7cb6 100644 --- a/test/TestGrainInterfaces/IPersistenceTestGrains.cs +++ b/test/TestGrainInterfaces/IPersistenceTestGrains.cs @@ -19,7 +19,7 @@ public interface IPersistenceTestGrain : IGrainWithGuidKey } - public interface IPersistenceTestGenericGrain : IPersistenceTestGrain // IGrainWithGuidKey + public interface IPersistenceTestGenericGrain : IPersistenceTestGrain // IGrainWithGuidKeyaws { } // Task CheckStateInit(); // Task CheckProviderType(); diff --git a/test/Tester/MembershipTests/LivenessTests.cs b/test/Tester/MembershipTests/LivenessTests.cs index c732697607..edf0b62af0 100644 --- a/test/Tester/MembershipTests/LivenessTests.cs +++ b/test/Tester/MembershipTests/LivenessTests.cs @@ -273,6 +273,55 @@ public async Task Liveness_Azure_5_Kill_Silo_1_With_Timers() } } + public class LivenessTests_DynamoDB : LivenessTestsBase + { + public LivenessTests_DynamoDB(ITestOutputHelper output) : base(output) + { + } + + public override TestCluster CreateTestCluster() + { + var options = new TestClusterOptions(2); + options.ClusterConfiguration.Globals.DataConnectionString = "Service=http://localhost:8000;"; ; + options.ClusterConfiguration.Globals.LivenessType = GlobalConfiguration.LivenessProviderType.Custom; + options.ClusterConfiguration.Globals.MembershipTableAssembly = "OrleansAWSUtils"; + options.ClusterConfiguration.Globals.ReminderServiceType = GlobalConfiguration.ReminderServiceProviderType.Disabled; + options.ClusterConfiguration.PrimaryNode = null; + options.ClusterConfiguration.Globals.SeedNodes.Clear(); + return new TestCluster(options); + } + + [Fact, TestCategory("Functional"), TestCategory("Membership"), TestCategory("AWS")] + public async Task Liveness_AWS_DynamoDB_1() + { + await Do_Liveness_OracleTest_1(); + } + + [Fact, TestCategory("Functional"), TestCategory("Membership"), TestCategory("AWS")] + public async Task Liveness_AWS_DynamoDB_2_Restart_Primary() + { + await Do_Liveness_OracleTest_2(0); + } + + [Fact, TestCategory("Functional"), TestCategory("Membership"), TestCategory("AWS")] + public async Task Liveness_AWS_DynamoDB_3_Restart_GW() + { + await Do_Liveness_OracleTest_2(1); + } + + [Fact, TestCategory("Functional"), TestCategory("Membership"), TestCategory("AWS")] + public async Task Liveness_AWS_DynamoDB_4_Restart_Silo_1() + { + await Do_Liveness_OracleTest_2(2); + } + + [Fact, TestCategory("Functional"), TestCategory("Membership"), TestCategory("AWS")] + public async Task Liveness_AWS_DynamoDB_5_Kill_Silo_1_With_Timers() + { + await Do_Liveness_OracleTest_2(2, false, true); + } + } + public class LivenessTests_ZK : LivenessTestsBase { public LivenessTests_ZK(ITestOutputHelper output) : base(output) diff --git a/test/Tester/Tester.csproj b/test/Tester/Tester.csproj index 4dbeeb1029..2a0e8a9b63 100644 --- a/test/Tester/Tester.csproj +++ b/test/Tester/Tester.csproj @@ -198,6 +198,10 @@ {e782dd19-51f7-4f66-8217-bacac33767e4} ClientGenerator + + {67738e6c-f292-46a2-994d-5b52e745205b} + OrleansAWSUtils + {792818ef-b3f8-4ce2-9886-4808713b15c4} OrleansAzureUtils diff --git a/test/TesterInternal/MembershipTests/DynamoDBMembershipTableTest.cs b/test/TesterInternal/MembershipTests/DynamoDBMembershipTableTest.cs new file mode 100644 index 0000000000..02390a1146 --- /dev/null +++ b/test/TesterInternal/MembershipTests/DynamoDBMembershipTableTest.cs @@ -0,0 +1,80 @@ +using Orleans; +using Orleans.Messaging; +using Orleans.Runtime; +using Orleans.Runtime.MembershipService; +using System.Threading.Tasks; +using UnitTests.StorageTests.AWSUtils; +using Xunit; + +namespace UnitTests.MembershipTests +{ + /// + /// Tests for operation of Orleans Membership Table using AWS DynamoDB - Requires access to external DynamoDB storage + /// + public class DynamoDBMembershipTableTest : MembershipTableTestsBase, IClassFixture + { + public DynamoDBMembershipTableTest(ConnectionStringFixture fixture) : base(fixture) + { + LogManager.AddTraceLevelOverride("DynamoDBDataManager", Severity.Verbose3); + LogManager.AddTraceLevelOverride("OrleansSiloInstanceManager", Severity.Verbose3); + LogManager.AddTraceLevelOverride("Storage", Severity.Verbose3); + } + + protected override IMembershipTable CreateMembershipTable(Logger logger) + { + return new DynamoDBMembershipTable(); + } + + protected override IGatewayListProvider CreateGatewayListProvider(Logger logger) + { + return new DynamoDBGatewayListProvider(); + } + + protected override string GetConnectionString() + { + return "Service=http://localhost:8000;"; + } + + [Fact, TestCategory("Functional"), TestCategory("Membership"), TestCategory("AWS")] + public async Task MembershipTable_DynamoDB_GetGateways() + { + await MembershipTable_GetGateways(false); + } + + [Fact, TestCategory("Functional"), TestCategory("Membership"), TestCategory("AWS")] + public async Task MembershipTable_DynamoDB_ReadAll_EmptyTable() + { + await MembershipTable_ReadAll_EmptyTable(false); + } + + [Fact, TestCategory("Functional"), TestCategory("Membership"), TestCategory("AWS")] + public async Task MembershipTable_DynamoDB_InsertRow() + { + await MembershipTable_InsertRow(false); + } + + [Fact, TestCategory("Functional"), TestCategory("Membership"), TestCategory("AWS")] + public async Task MembershipTable_DynamoDB_ReadRow_Insert_Read() + { + await MembershipTable_ReadRow_Insert_Read(false); + } + + [Fact, TestCategory("Functional"), TestCategory("Membership"), TestCategory("AWS")] + public async Task MembershipTable_DynamoDB_ReadAll_Insert_ReadAll() + { + await MembershipTable_ReadAll_Insert_ReadAll(false); + } + + [Fact, TestCategory("Functional"), TestCategory("Membership"), TestCategory("AWS")] + public async Task MembershipTable_DynamoDB_UpdateRow() + { + await MembershipTable_UpdateRow(false); + } + + [Fact, TestCategory("Membership"), TestCategory("AWS")] + public async Task MembershipTable_DynamoDB_UpdateRowInParallel() + { + await MembershipTable_UpdateRowInParallel(false); + } + } +} diff --git a/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs b/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs index 8d3ddb4ba5..69d70b8b66 100644 --- a/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs +++ b/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs @@ -64,7 +64,7 @@ public void Dispose() membershipTable.DeleteMembershipTableEntries(deploymentId).Wait(); } } - + protected abstract IGatewayListProvider CreateGatewayListProvider(Logger logger); protected abstract IMembershipTable CreateMembershipTable(Logger logger); protected abstract string GetConnectionString(); @@ -74,7 +74,7 @@ protected virtual string GetAdoInvariant() return null; } - protected async Task MembershipTable_GetGateways() + protected async Task MembershipTable_GetGateways(bool extendedProtocol = true) { var membershipEntries = Enumerable.Range(0, 10).Select(i => CreateMembershipEntryForTest()).ToArray(); @@ -91,9 +91,10 @@ protected async Task MembershipTable_GetGateways() foreach (var membershipEntry in membershipEntries) { Assert.True(await membershipTable.InsertRow(membershipEntry, version)); - version = (await membershipTable.ReadRow(membershipEntry.SiloAddress)).Version; + if (extendedProtocol) + version = (await membershipTable.ReadRow(membershipEntry.SiloAddress)).Version; } - + var gateways = await gatewayListProvider.GetGateways(); var entries = new List(gateways.Select(g => g.ToString())); @@ -102,19 +103,22 @@ protected async Task MembershipTable_GetGateways() Assert.True(entries.Contains(membershipEntries[9].SiloAddress.ToGatewayUri().ToString())); } - protected async Task MembershipTable_ReadAll_EmptyTable() + protected async Task MembershipTable_ReadAll_EmptyTable(bool extendedProtocol = true) { var data = await membershipTable.ReadAll(); Assert.NotNull(data); - logger.Info("Membership.ReadAll returned VableVersion={0} Data={1}", data.Version, data); + if (extendedProtocol) + { + logger.Info("Membership.ReadAll returned VableVersion={0} Data={1}", data.Version, data); - Assert.Equal(0, data.Members.Count); - Assert.NotNull(data.Version.VersionEtag); - Assert.Equal(0, data.Version.Version); + Assert.Equal(0, data.Members.Count); + Assert.NotNull(data.Version.VersionEtag); + Assert.Equal(0, data.Version.Version); + } } - protected async Task MembershipTable_InsertRow() + protected async Task MembershipTable_InsertRow(bool extendedProtocol = true) { var membershipEntry = CreateMembershipEntryForTest(); @@ -122,15 +126,22 @@ protected async Task MembershipTable_InsertRow() Assert.NotNull(data); Assert.Equal(0, data.Members.Count); - bool ok = await membershipTable.InsertRow(membershipEntry, data.Version.Next()); + TableVersion nextTableVersion = null; + if (extendedProtocol) + nextTableVersion = data.Version.Next(); + + bool ok = await membershipTable.InsertRow(membershipEntry, nextTableVersion); Assert.True(ok, "InsertRow failed"); data = await membershipTable.ReadAll(); - Assert.Equal(1, data.Version.Version); + + if (extendedProtocol) + Assert.Equal(1, data.Version.Version); + Assert.Equal(1, data.Members.Count); } - protected async Task MembershipTable_ReadRow_Insert_Read() + protected async Task MembershipTable_ReadRow_Insert_Read(bool extendedProtocol = true) { MembershipTableData data = await membershipTable.ReadAll(); @@ -138,7 +149,10 @@ protected async Task MembershipTable_ReadRow_Insert_Read() Assert.Equal(0, data.Members.Count); - TableVersion newTableVersion = data.Version.Next(); + TableVersion newTableVersion = null; + if (extendedProtocol) + newTableVersion = data.Version.Next(); + MembershipEntry newEntry = CreateMembershipEntryForTest(); bool ok = await membershipTable.InsertRow(newEntry, newTableVersion); @@ -147,13 +161,22 @@ protected async Task MembershipTable_ReadRow_Insert_Read() ok = await membershipTable.InsertRow(newEntry, newTableVersion); Assert.False(ok, "InsertRow should have failed - same entry, old table version"); - ok = await membershipTable.InsertRow(CreateMembershipEntryForTest(), newTableVersion); - Assert.False(ok, "InsertRow should have failed - new entry, old table version"); - + if (extendedProtocol) + { + ok = await membershipTable.InsertRow(CreateMembershipEntryForTest(), newTableVersion); + Assert.False(ok, "InsertRow should have failed - new entry, old table version"); + } + data = await membershipTable.ReadAll(); - Assert.Equal(1, data.Version.Version); - var nextTableVersion = data.Version.Next(); + if (extendedProtocol) + { + Assert.Equal(1, data.Version.Version); + } + + TableVersion nextTableVersion = null; + if (extendedProtocol) + nextTableVersion = data.Version.Next(); ok = await membershipTable.InsertRow(newEntry, nextTableVersion); Assert.False(ok, "InsertRow should have failed - duplicate entry"); @@ -163,16 +186,20 @@ protected async Task MembershipTable_ReadRow_Insert_Read() Assert.Equal(1, data.Members.Count); data = await membershipTable.ReadRow(newEntry.SiloAddress); - Assert.Equal(newTableVersion.Version, data.Version.Version); + if (extendedProtocol) + Assert.Equal(newTableVersion.Version, data.Version.Version); logger.Info("Membership.ReadRow returned VableVersion={0} Data={1}", data.Version, data); Assert.Equal(1, data.Members.Count); - Assert.NotNull(data.Version.VersionEtag); - Assert.NotEqual(newTableVersion.VersionEtag, data.Version.VersionEtag); - Assert.Equal(newTableVersion.Version, data.Version.Version); + if (extendedProtocol) + { + Assert.NotNull(data.Version.VersionEtag); + Assert.NotEqual(newTableVersion.VersionEtag, data.Version.VersionEtag); + Assert.Equal(newTableVersion.Version, data.Version.Version); + } var membershipEntry = data.Members[0].Item1; string eTag = data.Members[0].Item2; logger.Info("Membership.ReadRow returned MembershipEntry ETag={0} Entry={1}", eTag, membershipEntry); @@ -181,14 +208,17 @@ protected async Task MembershipTable_ReadRow_Insert_Read() Assert.NotNull(membershipEntry); } - protected async Task MembershipTable_ReadAll_Insert_ReadAll() + protected async Task MembershipTable_ReadAll_Insert_ReadAll(bool extendedProtocol = true) { MembershipTableData data = await membershipTable.ReadAll(); logger.Info("Membership.ReadAll returned VableVersion={0} Data={1}", data.Version, data); Assert.Equal(0, data.Members.Count); - TableVersion newTableVersion = data.Version.Next(); + TableVersion newTableVersion = null; + if (extendedProtocol) + newTableVersion = data.Version.Next(); + MembershipEntry newEntry = CreateMembershipEntryForTest(); bool ok = await membershipTable.InsertRow(newEntry, newTableVersion); @@ -199,9 +229,12 @@ protected async Task MembershipTable_ReadAll_Insert_ReadAll() Assert.Equal(1, data.Members.Count); - Assert.NotNull(data.Version.VersionEtag); - Assert.NotEqual(newTableVersion.VersionEtag, data.Version.VersionEtag); - Assert.Equal(newTableVersion.Version, data.Version.Version); + if (extendedProtocol) + { + Assert.NotNull(data.Version.VersionEtag); + Assert.NotEqual(newTableVersion.VersionEtag, data.Version.VersionEtag); + Assert.Equal(newTableVersion.Version, data.Version.Version); + } var membershipEntry = data.Members[0].Item1; string eTag = data.Members[0].Item2; @@ -211,12 +244,15 @@ protected async Task MembershipTable_ReadAll_Insert_ReadAll() Assert.NotNull(membershipEntry); } - protected async Task MembershipTable_UpdateRow() + protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true) { var tableData = await membershipTable.ReadAll(); - Assert.NotNull(tableData.Version); - Assert.Equal(0, tableData.Version.Version); + if (extendedProtocol) + { + Assert.NotNull(tableData.Version); + Assert.Equal(0, tableData.Version.Version); + } Assert.Equal(0, tableData.Members.Count); for (int i = 1; i < 10; i++) @@ -230,7 +266,10 @@ protected async Task MembershipTable_UpdateRow() new Tuple(CreateSiloAddressForTest(), GetUtcNowWithSecondsResolution().AddSeconds(2)) }; - TableVersion tableVersion = tableData.Version.Next(); + TableVersion tableVersion = null; + if (extendedProtocol) + tableVersion = tableData.Version.Next(); + logger.Info("Calling InsertRow with Entry = {0} TableVersion = {1}", siloEntry, tableVersion); bool ok = await membershipTable.InsertRow(siloEntry, tableVersion); @@ -243,43 +282,50 @@ protected async Task MembershipTable_UpdateRow() Assert.NotNull(etagBefore); logger.Info("Calling UpdateRow with Entry = {0} correct eTag = {1} old version={2}", siloEntry, - etagBefore, tableVersion); - - ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion); - - Assert.False(ok, $"row update should have failed - Table Data = {tableData}"); + etagBefore, tableVersion != null ? tableVersion.ToString() : "null"); + if (extendedProtocol) + { + ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion); + Assert.False(ok, $"row update should have failed - Table Data = {tableData}"); + } tableData = await membershipTable.ReadAll(); - tableVersion = tableData.Version.Next(); + if (extendedProtocol) + tableVersion = tableData.Version.Next(); logger.Info("Calling UpdateRow with Entry = {0} correct eTag = {1} correct version={2}", siloEntry, - etagBefore, tableVersion); + etagBefore, tableVersion != null ? tableVersion.ToString() : "null"); + ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion); Assert.True(ok, $"UpdateRow failed - Table Data = {tableData}"); - logger.Info("Calling UpdateRow with Entry = {0} old eTag = {1} old version={2}", siloEntry, - etagBefore, tableVersion); + if (extendedProtocol) + { + logger.Info("Calling UpdateRow with Entry = {0} old eTag = {1} old version={2}", siloEntry, + etagBefore, tableVersion != null ? tableVersion.ToString() : "null"); + ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion); + Assert.False(ok, $"row update should have failed - Table Data = {tableData}"); + } - ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion); - - Assert.False(ok, $"row update should have failed - Table Data = {tableData}"); - tableData = await membershipTable.ReadAll(); - + var tuple = tableData.Get(siloEntry.SiloAddress); Assert.Equal(tuple.Item1.ToFullString(true), siloEntry.ToFullString(true)); var etagAfter = tuple.Item2; - logger.Info("Calling UpdateRow with Entry = {0} correct eTag = {1} old version={2}", siloEntry, - etagAfter, tableVersion); + if (extendedProtocol) + { + logger.Info("Calling UpdateRow with Entry = {0} correct eTag = {1} old version={2}", siloEntry, + etagAfter, tableVersion != null ? tableVersion.ToString() : "null"); - ok = await membershipTable.UpdateRow(siloEntry, etagAfter, tableVersion); + ok = await membershipTable.UpdateRow(siloEntry, etagAfter, tableVersion); - Assert.False(ok, $"row update should have failed - Table Data = {tableData}"); + Assert.False(ok, $"row update should have failed - Table Data = {tableData}"); + } //var nextTableVersion = tableData.Version.Next(); @@ -297,19 +343,25 @@ protected async Task MembershipTable_UpdateRow() etagAfter = tableData.Get(siloEntry.SiloAddress).Item2; Assert.Equal(etagBefore, etagAfter); - Assert.NotNull(tableData.Version); - Assert.Equal(tableVersion.Version, tableData.Version.Version); + if (extendedProtocol) + { + Assert.NotNull(tableData.Version); + Assert.Equal(tableVersion.Version, tableData.Version.Version); + } Assert.Equal(i, tableData.Members.Count); } } - protected async Task MembershipTable_UpdateRowInParallel() + protected async Task MembershipTable_UpdateRowInParallel(bool extendedProtocol = true) { var tableData = await membershipTable.ReadAll(); var data = CreateMembershipEntryForTest(); - var newTableVer = tableData.Version.Next(); + TableVersion newTableVer = null; + + if (extendedProtocol) + tableData.Version.Next(); var insertions = Task.WhenAll(Enumerable.Range(1, 20).Select(i => membershipTable.InsertRow(data, newTableVer))); @@ -322,7 +374,11 @@ await Task.WhenAll(Enumerable.Range(1, 19).Select(async i => { var updatedTableData = await membershipTable.ReadAll(); var updatedRow = updatedTableData.Get(data.SiloAddress); - var tableVersion = updatedTableData.Version.Next(); + TableVersion tableVersion = null; + + if (extendedProtocol) + tableVersion = updatedTableData.Version.Next(); + await Task.Delay(10); done = await membershipTable.UpdateRow(updatedRow.Item1, updatedRow.Item2, tableVersion); } while (!done); @@ -330,19 +386,21 @@ await Task.WhenAll(Enumerable.Range(1, 19).Select(async i => tableData = await membershipTable.ReadAll(); - Assert.NotNull(tableData.Version); - Assert.Equal(20, tableData.Version.Version); + if (extendedProtocol) + { + Assert.NotNull(tableData.Version); + Assert.Equal(20, tableData.Version.Version); + } Assert.Equal(1, tableData.Members.Count); } - private static int generation; // Utility methods private static MembershipEntry CreateMembershipEntryForTest() { SiloAddress siloAddress = CreateSiloAddressForTest(); - + var membershipEntry = new MembershipEntry { SiloAddress = siloAddress, diff --git a/test/TesterInternal/StorageTests/AWSUtils/DynamoDBSiloInstanceManagerTests.cs b/test/TesterInternal/StorageTests/AWSUtils/DynamoDBSiloInstanceManagerTests.cs new file mode 100644 index 0000000000..ea847aea84 --- /dev/null +++ b/test/TesterInternal/StorageTests/AWSUtils/DynamoDBSiloInstanceManagerTests.cs @@ -0,0 +1,239 @@ +using Orleans.Runtime; +using Orleans.Runtime.Configuration; +using OrleansAWSUtils.Providers.Membership; +using System; +using System.Globalization; +using System.Linq; +using System.Net; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace UnitTests.StorageTests.AWSUtils +{ + public class DynamoDBSiloInstanceManagerTests : IClassFixture, IDisposable + { + public class Fixture + { + public Fixture() + { + LogManager.Initialize(new NodeConfiguration()); + } + } + + private string deploymentId; + private int generation; + private SiloAddress siloAddress; + private SiloInstanceTableEntry myEntry; + private OrleansSiloInstanceManager manager; + private readonly Logger logger; + private readonly ITestOutputHelper output; + + public DynamoDBSiloInstanceManagerTests(ITestOutputHelper output) + { + this.output = output; + logger = LogManager.GetLogger("SiloInstanceTableManagerTests", LoggerType.Application); + + deploymentId = "test-" + Guid.NewGuid(); + generation = SiloAddress.AllocateNewGeneration(); + siloAddress = SiloAddress.NewLocalAddress(generation); + + logger.Info("DeploymentId={0} Generation={1}", deploymentId, generation); + + logger.Info("Initializing SiloInstanceManager"); + manager = OrleansSiloInstanceManager.GetManager(deploymentId, "OrleansSiloInstances", "", "", "http://localhost:8000").Result; + } + + // Use TestCleanup to run code after each test has run + public void Dispose() + { + if (manager != null && SiloInstanceTableTestConstants.DeleteEntriesAfterTest) + { + TimeSpan timeout = SiloInstanceTableTestConstants.Timeout; + + logger.Info("TestCleanup Timeout={0}", timeout); + + //manager.DeleteTableEntries(deploymentId).Wait(); + + logger.Info("TestCleanup - Finished"); + manager = null; + } + } + + [Fact, TestCategory("Functional"), TestCategory("AWS"), TestCategory("Storage")] + public void SiloInstanceTable_Op_RegisterSiloInstance() + { + RegisterSiloInstance(); + } + + [Fact, TestCategory("Functional"), TestCategory("AWS"), TestCategory("Storage")] + public void SiloInstanceTable_Op_ActivateSiloInstance() + { + RegisterSiloInstance(); + + manager.ActivateSiloInstance(myEntry); + } + + [Fact, TestCategory("Functional"), TestCategory("AWS"), TestCategory("Storage")] + public void SiloInstanceTable_Op_UnregisterSiloInstance() + { + RegisterSiloInstance(); + + manager.UnregisterSiloInstance(myEntry); + } + + [Fact, TestCategory("Functional"), TestCategory("AWS"), TestCategory("Storage")] + public async Task SiloInstanceTable_Register_CheckData() + { + const string testName = "SiloInstanceTable_Register_CheckData"; + logger.Info("Start {0}", testName); + + RegisterSiloInstance(); + + var data = await FindSiloEntry(siloAddress); + + Assert.NotNull(data); // SiloInstanceTableEntry should not be null + Assert.True(data.ETag != 0); + + Assert.Equal(SiloInstanceTableTestConstants.INSTANCE_STATUS_CREATED, data.Status); + + CheckSiloInstanceTableEntry(myEntry, data); + logger.Info("End {0}", testName); + } + + [Fact, TestCategory("Functional"), TestCategory("AWS"), TestCategory("Storage")] + public async Task SiloInstanceTable_Activate_CheckData() + { + RegisterSiloInstance(); + + manager.ActivateSiloInstance(myEntry); + + var data = await FindSiloEntry(siloAddress); + + Assert.NotNull(data); // SiloInstanceTableEntry should not be null + Assert.True(data.ETag != 0); + + Assert.Equal(SiloInstanceTableTestConstants.INSTANCE_STATUS_ACTIVE, data.Status); + + CheckSiloInstanceTableEntry(myEntry, data); + } + + [Fact, TestCategory("Functional"), TestCategory("AWS"), TestCategory("Storage")] + public async Task SiloInstanceTable_Unregister_CheckData() + { + RegisterSiloInstance(); + + manager.UnregisterSiloInstance(myEntry); + + var data = await FindSiloEntry(siloAddress); + + Assert.NotNull(data); // SiloInstanceTableEntry should not be null + Assert.True(data.ETag != 0); + + Assert.Equal(SiloInstanceTableTestConstants.INSTANCE_STATUS_DEAD, data.Status); + + CheckSiloInstanceTableEntry(myEntry, data); + } + + [Fact, TestCategory("Functional"), TestCategory("AWS"), TestCategory("Storage")] + public async Task SiloInstanceTable_FindAllGatewayProxyEndpoints() + { + RegisterSiloInstance(); + + var gateways = await manager.FindAllGatewayProxyEndpoints(); + Assert.Equal(0, gateways.Count); // "Number of gateways before Silo.Activate" + + manager.ActivateSiloInstance(myEntry); + + gateways = await manager.FindAllGatewayProxyEndpoints(); + Assert.Equal(1, gateways.Count); // "Number of gateways after Silo.Activate" + + Uri myGateway = gateways.First(); + Assert.Equal(myEntry.Address, myGateway.Host.ToString()); // "Gateway address" + Assert.Equal(myEntry.ProxyPort, myGateway.Port.ToString(CultureInfo.InvariantCulture)); // "Gateway port" + } + + [Fact, TestCategory("Functional"), TestCategory("AWS"), TestCategory("Storage")] + public void SiloAddress_ToFrom_RowKey() + { + string ipAddress = "1.2.3.4"; + int port = 5555; + int generation = 6666; + + IPAddress address = IPAddress.Parse(ipAddress); + IPEndPoint endpoint = new IPEndPoint(address, port); + SiloAddress siloAddress = SiloAddress.New(endpoint, generation); + + string MembershipRowKey = SiloInstanceTableEntry.ConstructRowKey(siloAddress); + + output.WriteLine("SiloAddress = {0} Row Key string = {1}", siloAddress, MembershipRowKey); + + SiloAddress fromRowKey = SiloInstanceTableEntry.UnpackRowKey(MembershipRowKey); + + output.WriteLine("SiloAddress result = {0} From Row Key string = {1}", fromRowKey, MembershipRowKey); + + Assert.Equal(siloAddress, fromRowKey); + Assert.Equal(SiloInstanceTableEntry.ConstructRowKey(siloAddress), SiloInstanceTableEntry.ConstructRowKey(fromRowKey)); + } + + private void RegisterSiloInstance() + { + string partitionKey = deploymentId; + string rowKey = SiloInstanceTableEntry.ConstructRowKey(siloAddress); + + IPEndPoint myEndpoint = siloAddress.Endpoint; + + myEntry = new SiloInstanceTableEntry + { + PartitionKey = partitionKey, + RowKey = rowKey, + + DeploymentId = deploymentId, + Address = myEndpoint.Address.ToString(), + Port = myEndpoint.Port.ToString(CultureInfo.InvariantCulture), + Generation = generation.ToString(CultureInfo.InvariantCulture), + + HostName = myEndpoint.Address.ToString(), + ProxyPort = "30000", + + SiloName = "MyInstance", + StartTime = LogFormatter.PrintDate(DateTime.UtcNow), + }; + + logger.Info("MyEntry={0}", myEntry); + + manager.RegisterSiloInstance(myEntry); + } + + private async Task FindSiloEntry(SiloAddress siloAddr) + { + string partitionKey = deploymentId; + string rowKey = SiloInstanceTableEntry.ConstructRowKey(siloAddr); + + logger.Info("FindSiloEntry for SiloAddress={0} PartitionKey={1} RowKey={2}", siloAddr, partitionKey, rowKey); + + SiloInstanceTableEntry data = await manager.ReadSingleTableEntryAsync(partitionKey, rowKey); + + logger.Info("FindSiloEntry returning Data={0}", data); + return data; + } + + private void CheckSiloInstanceTableEntry(SiloInstanceTableEntry referenceEntry, SiloInstanceTableEntry entry) + { + Assert.Equal(referenceEntry.DeploymentId, entry.DeploymentId); + Assert.Equal(referenceEntry.Address, entry.Address); + Assert.Equal(referenceEntry.Port, entry.Port); + Assert.Equal(referenceEntry.Generation, entry.Generation); + Assert.Equal(referenceEntry.HostName, entry.HostName); + //Assert.Equal(referenceEntry.Status, entry.Status); + Assert.Equal(referenceEntry.ProxyPort, entry.ProxyPort); + Assert.Equal(referenceEntry.SiloName, entry.SiloName); + Assert.Equal(referenceEntry.StartTime, entry.StartTime); + Assert.Equal(referenceEntry.IAmAliveTime, entry.IAmAliveTime); + Assert.Equal(referenceEntry.MembershipVersion, entry.MembershipVersion); + + Assert.Equal(referenceEntry.SuspectingTimes, entry.SuspectingTimes); + Assert.Equal(referenceEntry.SuspectingSilos, entry.SuspectingSilos); + } + } +} diff --git a/test/TesterInternal/TesterInternal.csproj b/test/TesterInternal/TesterInternal.csproj index 69333b41dd..f2d8bbbf20 100644 --- a/test/TesterInternal/TesterInternal.csproj +++ b/test/TesterInternal/TesterInternal.csproj @@ -55,6 +55,7 @@ + From 25aae2f466c5af6f364e8c026183232d67d062ef Mon Sep 17 00:00:00 2001 From: Gutemberg Ribeiro Date: Sun, 14 Aug 2016 15:11:17 -0300 Subject: [PATCH 2/3] Changes to address feedback. --- .../Membership/DynamoDBMembershipTable.cs | 45 +++--- .../Membership/SiloInstanceRecord.cs | 45 ++---- src/OrleansAWSUtils/OrleansAWSUtils.csproj | 3 - .../DynamoDBClientMetricsPublisher.cs | 95 ------------ .../DynamoDBSiloMetricsPublisher.cs | 118 --------------- .../Statistics/DynamoDBStatisticsPublisher.cs | 138 ------------------ .../IPersistenceTestGrains.cs | 2 +- .../DynamoDBMembershipTableTest.cs | 4 +- .../MembershipTableTestsBase.cs | 74 +++------- 9 files changed, 62 insertions(+), 462 deletions(-) delete mode 100644 src/OrleansAWSUtils/Statistics/DynamoDBClientMetricsPublisher.cs delete mode 100644 src/OrleansAWSUtils/Statistics/DynamoDBSiloMetricsPublisher.cs delete mode 100644 src/OrleansAWSUtils/Statistics/DynamoDBStatisticsPublisher.cs diff --git a/src/OrleansAWSUtils/Membership/DynamoDBMembershipTable.cs b/src/OrleansAWSUtils/Membership/DynamoDBMembershipTable.cs index 14af17c8a2..71635ffe56 100644 --- a/src/OrleansAWSUtils/Membership/DynamoDBMembershipTable.cs +++ b/src/OrleansAWSUtils/Membership/DynamoDBMembershipTable.cs @@ -17,6 +17,7 @@ internal class DynamoDBMembershipTable : IMembershipTable private const string TABLE_NAME_DEFAULT_VALUE = "OrleansSiloInstances"; private const string CURRENT_ETAG_ALIAS = ":currentETag"; + private const int MAX_BATCH_SIZE = 25; private Logger logger; private DynamoDBStorage storage; @@ -54,19 +55,12 @@ public async Task DeleteMembershipTableEntries(string deploymentId) toDelete.Add(record.GetKeys()); } - if (records.Count <= 25) + List tasks = new List(); + foreach (var batch in toDelete.BatchIEnumerable(MAX_BATCH_SIZE)) { - await storage.DeleteEntriesAsync(TABLE_NAME_DEFAULT_VALUE, toDelete); - } - else - { - List tasks = new List(); - foreach (var batch in toDelete.BatchIEnumerable(25)) - { - tasks.Add(storage.DeleteEntriesAsync(TABLE_NAME_DEFAULT_VALUE, batch)); - } - await Task.WhenAll(tasks); + tasks.Add(storage.DeleteEntriesAsync(TABLE_NAME_DEFAULT_VALUE, batch)); } + await Task.WhenAll(tasks); } catch (Exception exc) { @@ -137,11 +131,10 @@ public async Task InsertRow(MembershipEntry entry, TableVersion tableVersi catch (ConditionalCheckFailedException) { result = false; - } - - if (result == false) logger.Warn(ErrorCode.MembershipBase, $"Insert failed due to contention on the table. Will retry. Entry {entry.ToFullString()}"); + } + return result; } catch (Exception exc) @@ -159,29 +152,33 @@ public async Task UpdateRow(MembershipEntry entry, string etag, TableVersi if (logger.IsVerbose) logger.Verbose("UpdateRow entry = {0}, etag = {1}", entry.ToFullString(), etag); var siloEntry = Convert(entry); int currentEtag = 0; - int.TryParse(etag, out currentEtag); - siloEntry.ETag = currentEtag; - siloEntry.ETag++; + if (!int.TryParse(etag, out currentEtag)) + { + logger.Warn(ErrorCode.MembershipBase, + $"Update failed. Invalid ETag value. Will retry. Entry {entry.ToFullString()}, eTag {etag}"); + return false; + } + + siloEntry.ETag = currentEtag + 1; bool result; try { var conditionalValues = new Dictionary { { CURRENT_ETAG_ALIAS, new AttributeValue { N = etag } } }; - var expression = $"{SiloInstanceRecord.ETAG_PROPERTY_NAME} = {CURRENT_ETAG_ALIAS}"; + var etagConditionalExpression = $"{SiloInstanceRecord.ETAG_PROPERTY_NAME} = {CURRENT_ETAG_ALIAS}"; await storage.UpsertEntryAsync(TABLE_NAME_DEFAULT_VALUE, siloEntry.GetKeys(), - siloEntry.GetFields(), expression, conditionalValues); + siloEntry.GetFields(), etagConditionalExpression, conditionalValues); result = true; } catch (ConditionalCheckFailedException) { result = false; - } - - if (result == false) logger.Warn(ErrorCode.MembershipBase, $"Update failed due to contention on the table. Will retry. Entry {entry.ToFullString()}, eTag {etag}"); + } + return result; } catch (Exception exc) @@ -198,9 +195,9 @@ public async Task UpdateIAmAlive(MembershipEntry entry) { if (logger.IsVerbose) logger.Verbose("Merge entry = {0}", entry.ToFullString()); var siloEntry = ConvertPartial(entry); - siloEntry.ETag++; + var fields = new Dictionary { { SiloInstanceRecord.I_AM_ALIVE_TIME_PROPERTY_NAME, new AttributeValue(siloEntry.IAmAliveTime) } }; var expression = $"attribute_exists({SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}) AND attribute_exists({SiloInstanceRecord.SILO_IDENTITY_PROPERTY_NAME})"; - await storage.UpsertEntryAsync(TABLE_NAME_DEFAULT_VALUE, siloEntry.GetKeys(), siloEntry.GetFields(), expression); + await storage.UpsertEntryAsync(TABLE_NAME_DEFAULT_VALUE, siloEntry.GetKeys(),fields, expression); } catch (Exception exc) { diff --git a/src/OrleansAWSUtils/Membership/SiloInstanceRecord.cs b/src/OrleansAWSUtils/Membership/SiloInstanceRecord.cs index a6f06795b2..a976e0bce0 100644 --- a/src/OrleansAWSUtils/Membership/SiloInstanceRecord.cs +++ b/src/OrleansAWSUtils/Membership/SiloInstanceRecord.cs @@ -1,7 +1,6 @@ using Amazon.DynamoDBv2.Model; using System; using System.Collections.Generic; -using System.Diagnostics; using System.Net; using System.Text; @@ -26,6 +25,21 @@ internal class SiloInstanceRecord public const string I_AM_ALIVE_TIME_PROPERTY_NAME = "IAmAliveTime"; internal const char Seperator = '-'; + public string DeploymentId { get; set; } + public string SiloIdentity { get; set; } + public string Address { get; set; } + public int Port { get; set; } + public int Generation { get; set; } + public string HostName { get; set; } + public int Status { get; set; } + public int ProxyPort { get; set; } + public string SiloName { get; set; } + public string SuspectingSilos { get; set; } + public string SuspectingTimes { get; set; } + public string StartTime { get; set; } + public string IAmAliveTime { get; set; } + public int ETag { get; set; } + public SiloInstanceRecord() { } public SiloInstanceRecord(Dictionary fields) @@ -82,43 +96,16 @@ public SiloInstanceRecord(Dictionary fields) int.TryParse(fields[ETAG_PROPERTY_NAME].N, out etag)) ETag = etag; } - - public string DeploymentId { get; set; } - public string SiloIdentity { get; set; } - public string Address { get; set; } - public int Port { get; set; } - public int Generation { get; set; } - public string HostName { get; set; } - public int Status { get; set; } - public int ProxyPort { get; set; } - public string SiloName { get; set; } - public string SuspectingSilos { get; set; } - public string SuspectingTimes { get; set; } - public string StartTime { get; set; } - public string IAmAliveTime { get; set; } - public int ETag { get; set; } internal static SiloAddress UnpackRowKey(string rowKey) { - var debugInfo = "UnpackRowKey"; try { -#if DEBUG - debugInfo = String.Format("UnpackRowKey: RowKey={0}", rowKey); - Trace.TraceInformation(debugInfo); -#endif int idx1 = rowKey.IndexOf(Seperator); int idx2 = rowKey.LastIndexOf(Seperator); -#if DEBUG - debugInfo = String.Format("UnpackRowKey: RowKey={0} Idx1={1} Idx2={2}", rowKey, idx1, idx2); -#endif var addressStr = rowKey.Substring(0, idx1); var portStr = rowKey.Substring(idx1 + 1, idx2 - idx1 - 1); var genStr = rowKey.Substring(idx2 + 1); -#if DEBUG - debugInfo = String.Format("UnpackRowKey: RowKey={0} -> Address={1} Port={2} Generation={3}", rowKey, addressStr, portStr, genStr); - Trace.TraceInformation(debugInfo); -#endif IPAddress address = IPAddress.Parse(addressStr); int port = Int32.Parse(portStr); int generation = Int32.Parse(genStr); @@ -126,7 +113,7 @@ internal static SiloAddress UnpackRowKey(string rowKey) } catch (Exception exc) { - throw new AggregateException("Error from " + debugInfo, exc); + throw new AggregateException("Error from UnpackRowKey", exc); } } diff --git a/src/OrleansAWSUtils/OrleansAWSUtils.csproj b/src/OrleansAWSUtils/OrleansAWSUtils.csproj index baf681e60a..73412113a5 100644 --- a/src/OrleansAWSUtils/OrleansAWSUtils.csproj +++ b/src/OrleansAWSUtils/OrleansAWSUtils.csproj @@ -50,9 +50,6 @@ - - - diff --git a/src/OrleansAWSUtils/Statistics/DynamoDBClientMetricsPublisher.cs b/src/OrleansAWSUtils/Statistics/DynamoDBClientMetricsPublisher.cs deleted file mode 100644 index d48ce6643e..0000000000 --- a/src/OrleansAWSUtils/Statistics/DynamoDBClientMetricsPublisher.cs +++ /dev/null @@ -1,95 +0,0 @@ -using Orleans.Runtime; -using OrleansAWSUtils.Storage; -using System.Collections.Generic; -using System.Threading.Tasks; -using Orleans.Runtime.Configuration; -using System.Net; -using Amazon.DynamoDBv2.Model; -using Amazon.DynamoDBv2; - -namespace Orleans.Providers -{ - public class DynamoDBClientMetricsPublisher : IClientMetricsDataPublisher - { - private const string TABLE_NAME_DEFAULT_VALUE = "OrleansClientMetrics"; - private string deploymentId; - private string clientId; - private IPAddress address; - private string hostName; - - private DynamoDBStorage storage; - private Logger logger; - - private const string DEPLOYMENT_ID_PROPERTY_NAME = "DeploymentId"; - private const string ADDRESS_PROPERTY_NAME = "Address"; - private const string CLIENT_ID_PROPERTY_NAME = "ClientId"; - private const string HOSTNAME_PROPERTY_NAME = "HostName"; - private const string CPU_USAGE_PROPERTY_NAME = "CPUUsage"; - private const string MEMORY_USAGE_PROPERTY_NAME = "MemoryUsage"; - private const string SEND_QUEUE_LENGTH_PROPERTY_NAME = "SendQueueLength"; - private const string RECEIVE_QUEUE_LENGTH_PROPERTY_NAME = "ReceiveQueueLength"; - private const string SENT_MESSAGES_PROPERTY_NAME = "SentMessages"; - private const string RECEIVED_MESSAGES_PROPERTY_NAME = "ReceivedMessages"; - private const string CONNECTED_GATEWAY_COUNT_PROPERTY_NAME = "ConnectedGatewayCount"; - - private readonly Dictionary metrics = new Dictionary - { - { DEPLOYMENT_ID_PROPERTY_NAME, new AttributeValue() }, - { ADDRESS_PROPERTY_NAME, new AttributeValue() }, - { CLIENT_ID_PROPERTY_NAME, new AttributeValue() }, - { HOSTNAME_PROPERTY_NAME, new AttributeValue() }, - { CPU_USAGE_PROPERTY_NAME, new AttributeValue() }, - { MEMORY_USAGE_PROPERTY_NAME, new AttributeValue() }, - { SEND_QUEUE_LENGTH_PROPERTY_NAME, new AttributeValue() }, - { RECEIVE_QUEUE_LENGTH_PROPERTY_NAME, new AttributeValue() }, - { SENT_MESSAGES_PROPERTY_NAME, new AttributeValue() }, - { RECEIVED_MESSAGES_PROPERTY_NAME, new AttributeValue() }, - { CONNECTED_GATEWAY_COUNT_PROPERTY_NAME, new AttributeValue() }, - }; - - public DynamoDBClientMetricsPublisher() - { - logger = LogManager.GetLogger(this.GetType().Name, LoggerType.Runtime); - } - - public Task Init(ClientConfiguration config, IPAddress address, string clientId) - { - deploymentId = config.DeploymentId; - this.clientId = clientId; - this.address = address; - hostName = config.DNSHostName; - storage = new DynamoDBStorage(config.DataConnectionString, logger); - - return storage.InitializeTable(TABLE_NAME_DEFAULT_VALUE, - new List - { - new KeySchemaElement { AttributeName = DEPLOYMENT_ID_PROPERTY_NAME, KeyType = KeyType.HASH }, - new KeySchemaElement { AttributeName = CLIENT_ID_PROPERTY_NAME, KeyType = KeyType.RANGE } - }, - new List - { - new AttributeDefinition { AttributeName = DEPLOYMENT_ID_PROPERTY_NAME, AttributeType = ScalarAttributeType.S }, - new AttributeDefinition { AttributeName = CLIENT_ID_PROPERTY_NAME, AttributeType = ScalarAttributeType.S } - }); - } - - public Task ReportMetrics(IClientPerformanceMetrics metricsData) - { - metrics[DEPLOYMENT_ID_PROPERTY_NAME].S = deploymentId; - metrics[ADDRESS_PROPERTY_NAME].S = address.ToString(); - metrics[CLIENT_ID_PROPERTY_NAME].S = clientId; - metrics[HOSTNAME_PROPERTY_NAME].S = hostName; - metrics[CPU_USAGE_PROPERTY_NAME].N = metricsData.CpuUsage.ToString(); - metrics[MEMORY_USAGE_PROPERTY_NAME].N = metricsData.MemoryUsage.ToString(); - metrics[SEND_QUEUE_LENGTH_PROPERTY_NAME].N = metricsData.SendQueueLength.ToString(); - metrics[RECEIVE_QUEUE_LENGTH_PROPERTY_NAME].N = metricsData.ReceiveQueueLength.ToString(); - metrics[SENT_MESSAGES_PROPERTY_NAME].N = metricsData.SentMessages.ToString(); - metrics[RECEIVED_MESSAGES_PROPERTY_NAME].N = metricsData.ReceivedMessages.ToString(); - metrics[CONNECTED_GATEWAY_COUNT_PROPERTY_NAME].N = metricsData.ConnectedGatewayCount.ToString(); - - if (logger.IsVerbose) logger.Verbose("Updated client metrics table entry: {0}", Utils.DictionaryToString(metrics)); - - return storage.PutEntryAsync(TABLE_NAME_DEFAULT_VALUE, metrics); - } - } -} diff --git a/src/OrleansAWSUtils/Statistics/DynamoDBSiloMetricsPublisher.cs b/src/OrleansAWSUtils/Statistics/DynamoDBSiloMetricsPublisher.cs deleted file mode 100644 index c436bff3cc..0000000000 --- a/src/OrleansAWSUtils/Statistics/DynamoDBSiloMetricsPublisher.cs +++ /dev/null @@ -1,118 +0,0 @@ -using Orleans.Runtime; -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using System.Net; -using OrleansAWSUtils.Storage; -using Amazon.DynamoDBv2.Model; -using Amazon.DynamoDBv2; - -namespace Orleans.Providers -{ - public class DynamoDBSiloMetricsPublisher : ISiloMetricsDataPublisher - { - private string deploymentId; - private SiloAddress siloAddress; - private string siloName; - private IPEndPoint gateway; - private string hostName; - - private DynamoDBStorage storage; - private Logger logger; - - private const string DEPLOYMENT_ID_PROPERTY_NAME = "DeploymentId"; - private const string ADDRESS_PROPERTY_NAME = "Address"; - private const string SILO_NAME_PROPERTY_NAME = "SiloName"; - private const string GATEWAY_ADDRESS_PROPERTY_NAME = "GatewayAddress"; - private const string HOSTNAME_PROPERTY_NAME = "HostName"; - private const string CPU_USAGE_PROPERTY_NAME = "CPUUsage"; - private const string MEMORY_USAGE_PROPERTY_NAME = "MemoryUsage"; - private const string ACTIVATIONS_PROPERTY_NAME = "Activations"; - private const string RECENTLY_USED_ACTIVATIONS_PROPERTY_NAME = "RecentlyUsedActivations"; - private const string SEND_QUEUE_LENGTH_PROPERTY_NAME = "SendQueueLength"; - private const string RECEIVE_QUEUE_LENGTH_PROPERTY_NAME = "ReceiveQueueLength"; - private const string REQUEST_QUEUE_LENGTH_PROPERTY_NAME = "RequestQueueLength"; - private const string SENT_MESSAGES_PROPERTY_NAME = "SentMessages"; - private const string RECEIVED_MESSAGES_PROPERTY_NAME = "ReceivedMessages"; - private const string LOAD_SHEDDING_PROPERTY_NAME = "LoadShedding"; - private const string CLIENT_COUNT_PROPERTY_NAME = "ClientCount"; - private const string TIMESTAMP_PROPERTY_NAME = "Timestamp"; - private const string TABLE_NAME_DEFAULT_VALUE = "OrleansSiloMetrics"; - - private readonly Dictionary metrics = new Dictionary - { - { DEPLOYMENT_ID_PROPERTY_NAME, new AttributeValue() }, - { ADDRESS_PROPERTY_NAME, new AttributeValue() }, - { SILO_NAME_PROPERTY_NAME, new AttributeValue() }, - { HOSTNAME_PROPERTY_NAME, new AttributeValue() }, - { GATEWAY_ADDRESS_PROPERTY_NAME, new AttributeValue() }, - { MEMORY_USAGE_PROPERTY_NAME, new AttributeValue() }, - { SEND_QUEUE_LENGTH_PROPERTY_NAME, new AttributeValue() }, - { RECEIVE_QUEUE_LENGTH_PROPERTY_NAME, new AttributeValue() }, - { SENT_MESSAGES_PROPERTY_NAME, new AttributeValue() }, - { RECEIVED_MESSAGES_PROPERTY_NAME, new AttributeValue() }, - { RECENTLY_USED_ACTIVATIONS_PROPERTY_NAME, new AttributeValue() }, - { CPU_USAGE_PROPERTY_NAME, new AttributeValue() }, - { ACTIVATIONS_PROPERTY_NAME, new AttributeValue() }, - { REQUEST_QUEUE_LENGTH_PROPERTY_NAME, new AttributeValue() }, - { LOAD_SHEDDING_PROPERTY_NAME, new AttributeValue() }, - { CLIENT_COUNT_PROPERTY_NAME, new AttributeValue() }, - { TIMESTAMP_PROPERTY_NAME, new AttributeValue() }, - }; - - public DynamoDBSiloMetricsPublisher() - { - logger = LogManager.GetLogger(this.GetType().Name, LoggerType.Runtime); - } - - public Task Init(string deploymentId, string storageConnectionString, SiloAddress siloAddress, string siloName, IPEndPoint gateway, string hostName) - { - this.deploymentId = deploymentId; - this.siloAddress = siloAddress; - this.siloName = siloName; - this.gateway = gateway; - this.hostName = hostName; - storage = new DynamoDBStorage(storageConnectionString, logger); - - return storage.InitializeTable(TABLE_NAME_DEFAULT_VALUE, - new List - { - new KeySchemaElement { AttributeName = DEPLOYMENT_ID_PROPERTY_NAME, KeyType = KeyType.HASH }, - new KeySchemaElement { AttributeName = SILO_NAME_PROPERTY_NAME, KeyType = KeyType.RANGE } - }, - new List - { - new AttributeDefinition { AttributeName = DEPLOYMENT_ID_PROPERTY_NAME, AttributeType = ScalarAttributeType.S }, - new AttributeDefinition { AttributeName = SILO_NAME_PROPERTY_NAME, AttributeType = ScalarAttributeType.S } - }); - } - - public Task ReportMetrics(ISiloPerformanceMetrics metricsData) - { - metrics[DEPLOYMENT_ID_PROPERTY_NAME].S = deploymentId; - metrics[ADDRESS_PROPERTY_NAME].S = siloAddress.ToString(); - metrics[HOSTNAME_PROPERTY_NAME].S = hostName; - metrics[SILO_NAME_PROPERTY_NAME].S = siloName; - if (gateway != null) - { - metrics[GATEWAY_ADDRESS_PROPERTY_NAME].S = gateway.ToString(); - } - metrics[CPU_USAGE_PROPERTY_NAME].N = metricsData.CpuUsage.ToString(); - metrics[MEMORY_USAGE_PROPERTY_NAME].N = metricsData.MemoryUsage.ToString(); - metrics[SEND_QUEUE_LENGTH_PROPERTY_NAME].N = metricsData.SendQueueLength.ToString(); - metrics[RECEIVE_QUEUE_LENGTH_PROPERTY_NAME].N = metricsData.ReceiveQueueLength.ToString(); - metrics[RECENTLY_USED_ACTIVATIONS_PROPERTY_NAME].N = metricsData.RecentlyUsedActivationCount.ToString(); - metrics[SENT_MESSAGES_PROPERTY_NAME].N = metricsData.SentMessages.ToString(); - metrics[RECEIVED_MESSAGES_PROPERTY_NAME].N = metricsData.ReceivedMessages.ToString(); - metrics[ACTIVATIONS_PROPERTY_NAME].N = metricsData.ActivationCount.ToString(); - metrics[REQUEST_QUEUE_LENGTH_PROPERTY_NAME].N = metricsData.RequestQueueLength.ToString(); - metrics[LOAD_SHEDDING_PROPERTY_NAME].BOOL = metricsData.IsOverloaded; - metrics[CLIENT_COUNT_PROPERTY_NAME].N = metricsData.ClientCount.ToString(); - metrics[TIMESTAMP_PROPERTY_NAME].S = DateTime.UtcNow.ToString(); - - if (logger.IsVerbose) logger.Verbose("Updated silo metrics table entry: {0}", Utils.DictionaryToString(metrics)); - - return storage.PutEntryAsync(TABLE_NAME_DEFAULT_VALUE, metrics); - } - } -} diff --git a/src/OrleansAWSUtils/Statistics/DynamoDBStatisticsPublisher.cs b/src/OrleansAWSUtils/Statistics/DynamoDBStatisticsPublisher.cs deleted file mode 100644 index d7e886a95f..0000000000 --- a/src/OrleansAWSUtils/Statistics/DynamoDBStatisticsPublisher.cs +++ /dev/null @@ -1,138 +0,0 @@ -using Amazon.DynamoDBv2; -using Amazon.DynamoDBv2.Model; -using Orleans.Runtime; -using OrleansAWSUtils.Storage; -using System; -using System.Collections.Generic; -using System.Globalization; -using System.Threading.Tasks; - -namespace Orleans.Providers -{ - public class DynamoDBStatisticsPublisher : IStatisticsPublisher - { - private const string PARTITION_KEY_PROPERTY_NAME = "PartitionKey"; - private const string ROW_KEY_PROPERTY_NAME = "RowKey"; - private const string DEPLOYMENT_ID_PROPERTY_NAME = "DeploymentId"; - private const string TIME_PROPERTY_NAME = "Time"; - private const string ADDRESS_PROPERTY_NAME = "Address"; - private const string NAME_PROPERTY_NAME = "Name"; - private const string HOSTNAME_PROPERTY_NAME = "HostName"; - private const string STATISTIC_PROPERTY_NAME = "Statistic"; - private const string STATVALUE_PROPERTY_NAME = "StatValue"; - private const string ISDELTA_PROPERTY_NAME = "IsDelta"; - private const string DATE_TIME_FORMAT = "yyyy-MM-dd-" + "HH:mm:ss.fff 'GMT'"; - - private string deploymentId; - private string address; - private string name; - private bool isSilo; - private long clientEpoch; - private int counter; - private string hostName; - private string tableName; - - private DynamoDBStorage storage; - private readonly Logger logger; - - public DynamoDBStatisticsPublisher() - { - logger = LogManager.GetLogger(this.GetType().Name, LoggerType.Runtime); - } - - public Task Init(bool isSilo, string storageConnectionString, string deploymentId, string address, string siloName, string hostName) - { - this.deploymentId = deploymentId; - this.address = address; - name = isSilo ? siloName : hostName; - this.hostName = hostName; - this.isSilo = isSilo; - if (!this.isSilo) - { - clientEpoch = SiloAddress.AllocateNewGeneration(); - } - counter = 0; - tableName = isSilo ? "OrleansSiloStatistics" : "OrleansClientStatistics"; - - storage = new DynamoDBStorage(storageConnectionString, logger); - - return storage.InitializeTable(tableName, - new List - { - new KeySchemaElement { AttributeName = PARTITION_KEY_PROPERTY_NAME, KeyType = KeyType.HASH }, - new KeySchemaElement { AttributeName = ROW_KEY_PROPERTY_NAME, KeyType = KeyType.RANGE } - }, - new List - { - new AttributeDefinition { AttributeName = PARTITION_KEY_PROPERTY_NAME, AttributeType = ScalarAttributeType.S }, - new AttributeDefinition { AttributeName = ROW_KEY_PROPERTY_NAME, AttributeType = ScalarAttributeType.S } - }); - } - - public Task ReportStats(List statsCounters) - { - try - { - var toWrite = new List>(); - foreach (var counter in statsCounters) - { - var fields = ParseCounter(counter); - if (fields == null) - continue; - - toWrite.Add(fields); - } - - if (toWrite.Count <= 25) - { - return storage.DeleteEntriesAsync(tableName, toWrite); - } - else - { - var tasks = new List(); - foreach (var batch in toWrite.BatchIEnumerable(25)) - { - tasks.Add(storage.DeleteEntriesAsync(tableName, batch)); - } - return Task.WhenAll(tasks); - } - } - catch (Exception exc) - { - logger.Error(ErrorCode.PerfStatistics, string.Format("Unable to write statistics records on table {0} for deploymentId {1}: Exception={2}", - tableName, deploymentId, exc)); - throw; - } - } - - private Dictionary ParseCounter(ICounter statsCounter) - { - string statValue = statsCounter.IsValueDelta ? statsCounter.GetDeltaString() : statsCounter.GetValueString(); - if ("0".Equals(statValue)) - { - return null; - } - - counter++; - var now = DateTime.UtcNow; - var ticks = DateTime.MaxValue.Ticks - now.Ticks; - - return new Dictionary - { - // PartitionKey: DeploymentId$ReverseTimestampToTheNearestHour - // RowKey: ReverseTimestampToTheNearestSecond$Name$counter - // As defined in http://dotnet.github.io/orleans/Runtime-Implementation-Details/Runtime-Tables - { PARTITION_KEY_PROPERTY_NAME, new AttributeValue(string.Join("$", deploymentId, string.Format("{0:d19}", ticks - ticks % TimeSpan.TicksPerHour))) }, - { ROW_KEY_PROPERTY_NAME, new AttributeValue(string.Join("$", string.Format("{0:d19}", ticks), name, string.Format("{0:000000}", counter))) }, - { DEPLOYMENT_ID_PROPERTY_NAME, new AttributeValue(deploymentId) }, - { TIME_PROPERTY_NAME, new AttributeValue(now.ToString(DATE_TIME_FORMAT, CultureInfo.InvariantCulture)) }, - { ADDRESS_PROPERTY_NAME, new AttributeValue(address)}, - { NAME_PROPERTY_NAME, new AttributeValue(name)}, - { HOSTNAME_PROPERTY_NAME, new AttributeValue(hostName)}, - { STATISTIC_PROPERTY_NAME, new AttributeValue(statsCounter.Name)}, - { ISDELTA_PROPERTY_NAME, new AttributeValue { BOOL = statsCounter.IsValueDelta } }, - { STATVALUE_PROPERTY_NAME, new AttributeValue { N = statValue } } - }; - } - } -} diff --git a/test/TestGrainInterfaces/IPersistenceTestGrains.cs b/test/TestGrainInterfaces/IPersistenceTestGrains.cs index 6e2e4c7cb6..de90547e0c 100644 --- a/test/TestGrainInterfaces/IPersistenceTestGrains.cs +++ b/test/TestGrainInterfaces/IPersistenceTestGrains.cs @@ -19,7 +19,7 @@ public interface IPersistenceTestGrain : IGrainWithGuidKey } - public interface IPersistenceTestGenericGrain : IPersistenceTestGrain // IGrainWithGuidKeyaws + public interface IPersistenceTestGenericGrain : IPersistenceTestGrain // IGrainWithGuidKey { } // Task CheckStateInit(); // Task CheckProviderType(); diff --git a/test/TesterInternal/MembershipTests/DynamoDBMembershipTableTest.cs b/test/TesterInternal/MembershipTests/DynamoDBMembershipTableTest.cs index 02390a1146..2f215f610d 100644 --- a/test/TesterInternal/MembershipTests/DynamoDBMembershipTableTest.cs +++ b/test/TesterInternal/MembershipTests/DynamoDBMembershipTableTest.cs @@ -38,13 +38,13 @@ protected override string GetConnectionString() [Fact, TestCategory("Functional"), TestCategory("Membership"), TestCategory("AWS")] public async Task MembershipTable_DynamoDB_GetGateways() { - await MembershipTable_GetGateways(false); + await MembershipTable_GetGateways(); } [Fact, TestCategory("Functional"), TestCategory("Membership"), TestCategory("AWS")] public async Task MembershipTable_DynamoDB_ReadAll_EmptyTable() { - await MembershipTable_ReadAll_EmptyTable(false); + await MembershipTable_ReadAll_EmptyTable(); } [Fact, TestCategory("Functional"), TestCategory("Membership"), TestCategory("AWS")] diff --git a/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs b/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs index 69d70b8b66..c54185a644 100644 --- a/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs +++ b/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs @@ -74,7 +74,7 @@ protected virtual string GetAdoInvariant() return null; } - protected async Task MembershipTable_GetGateways(bool extendedProtocol = true) + protected async Task MembershipTable_GetGateways() { var membershipEntries = Enumerable.Range(0, 10).Select(i => CreateMembershipEntryForTest()).ToArray(); @@ -91,8 +91,7 @@ protected async Task MembershipTable_GetGateways(bool extendedProtocol = true) foreach (var membershipEntry in membershipEntries) { Assert.True(await membershipTable.InsertRow(membershipEntry, version)); - if (extendedProtocol) - version = (await membershipTable.ReadRow(membershipEntry.SiloAddress)).Version; + version = (await membershipTable.ReadRow(membershipEntry.SiloAddress)).Version; } var gateways = await gatewayListProvider.GetGateways(); @@ -103,19 +102,16 @@ protected async Task MembershipTable_GetGateways(bool extendedProtocol = true) Assert.True(entries.Contains(membershipEntries[9].SiloAddress.ToGatewayUri().ToString())); } - protected async Task MembershipTable_ReadAll_EmptyTable(bool extendedProtocol = true) + protected async Task MembershipTable_ReadAll_EmptyTable() { var data = await membershipTable.ReadAll(); Assert.NotNull(data); - if (extendedProtocol) - { - logger.Info("Membership.ReadAll returned VableVersion={0} Data={1}", data.Version, data); + logger.Info("Membership.ReadAll returned VableVersion={0} Data={1}", data.Version, data); - Assert.Equal(0, data.Members.Count); - Assert.NotNull(data.Version.VersionEtag); - Assert.Equal(0, data.Version.Version); - } + Assert.Equal(0, data.Members.Count); + Assert.NotNull(data.Version.VersionEtag); + Assert.Equal(0, data.Version.Version); } protected async Task MembershipTable_InsertRow(bool extendedProtocol = true) @@ -126,9 +122,7 @@ protected async Task MembershipTable_InsertRow(bool extendedProtocol = true) Assert.NotNull(data); Assert.Equal(0, data.Members.Count); - TableVersion nextTableVersion = null; - if (extendedProtocol) - nextTableVersion = data.Version.Next(); + TableVersion nextTableVersion = data.Version.Next(); bool ok = await membershipTable.InsertRow(membershipEntry, nextTableVersion); Assert.True(ok, "InsertRow failed"); @@ -149,9 +143,7 @@ protected async Task MembershipTable_ReadRow_Insert_Read(bool extendedProtocol = Assert.Equal(0, data.Members.Count); - TableVersion newTableVersion = null; - if (extendedProtocol) - newTableVersion = data.Version.Next(); + TableVersion newTableVersion = data.Version.Next(); MembershipEntry newEntry = CreateMembershipEntryForTest(); bool ok = await membershipTable.InsertRow(newEntry, newTableVersion); @@ -174,9 +166,7 @@ protected async Task MembershipTable_ReadRow_Insert_Read(bool extendedProtocol = Assert.Equal(1, data.Version.Version); } - TableVersion nextTableVersion = null; - if (extendedProtocol) - nextTableVersion = data.Version.Next(); + TableVersion nextTableVersion = data.Version.Next(); ok = await membershipTable.InsertRow(newEntry, nextTableVersion); Assert.False(ok, "InsertRow should have failed - duplicate entry"); @@ -215,9 +205,7 @@ protected async Task MembershipTable_ReadAll_Insert_ReadAll(bool extendedProtoco Assert.Equal(0, data.Members.Count); - TableVersion newTableVersion = null; - if (extendedProtocol) - newTableVersion = data.Version.Next(); + TableVersion newTableVersion = data.Version.Next(); MembershipEntry newEntry = CreateMembershipEntryForTest(); bool ok = await membershipTable.InsertRow(newEntry, newTableVersion); @@ -228,10 +216,10 @@ protected async Task MembershipTable_ReadAll_Insert_ReadAll(bool extendedProtoco logger.Info("Membership.ReadAll returned VableVersion={0} Data={1}", data.Version, data); Assert.Equal(1, data.Members.Count); + Assert.NotNull(data.Version.VersionEtag); if (extendedProtocol) { - Assert.NotNull(data.Version.VersionEtag); Assert.NotEqual(newTableVersion.VersionEtag, data.Version.VersionEtag); Assert.Equal(newTableVersion.Version, data.Version.Version); } @@ -247,10 +235,10 @@ protected async Task MembershipTable_ReadAll_Insert_ReadAll(bool extendedProtoco protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true) { var tableData = await membershipTable.ReadAll(); + Assert.NotNull(tableData.Version); if (extendedProtocol) { - Assert.NotNull(tableData.Version); Assert.Equal(0, tableData.Version.Version); } Assert.Equal(0, tableData.Members.Count); @@ -266,9 +254,7 @@ protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true) new Tuple(CreateSiloAddressForTest(), GetUtcNowWithSecondsResolution().AddSeconds(2)) }; - TableVersion tableVersion = null; - if (extendedProtocol) - tableVersion = tableData.Version.Next(); + TableVersion tableVersion = tableData.Version.Next(); logger.Info("Calling InsertRow with Entry = {0} TableVersion = {1}", siloEntry, tableVersion); bool ok = await membershipTable.InsertRow(siloEntry, tableVersion); @@ -291,8 +277,7 @@ protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true) } tableData = await membershipTable.ReadAll(); - if (extendedProtocol) - tableVersion = tableData.Version.Next(); + tableVersion = tableData.Version.Next(); logger.Info("Calling UpdateRow with Entry = {0} correct eTag = {1} correct version={2}", siloEntry, etagBefore, tableVersion != null ? tableVersion.ToString() : "null"); @@ -327,15 +312,6 @@ protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true) Assert.False(ok, $"row update should have failed - Table Data = {tableData}"); } - //var nextTableVersion = tableData.Version.Next(); - - //logger.Info("Calling UpdateRow with Entry = {0} old eTag = {1} correct version={2}", siloEntry, - // etagBefore, nextTableVersion); - - //ok = await membershipTable.UpdateRow(siloEntry, etagBefore, nextTableVersion); - - //Assert.False(ok, $"row update should have failed - Table Data = {tableData}"); - tableData = await membershipTable.ReadAll(); etagBefore = etagAfter; @@ -343,11 +319,10 @@ protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true) etagAfter = tableData.Get(siloEntry.SiloAddress).Item2; Assert.Equal(etagBefore, etagAfter); + Assert.NotNull(tableData.Version); if (extendedProtocol) - { - Assert.NotNull(tableData.Version); Assert.Equal(tableVersion.Version, tableData.Version.Version); - } + Assert.Equal(i, tableData.Members.Count); } } @@ -358,10 +333,7 @@ protected async Task MembershipTable_UpdateRowInParallel(bool extendedProtocol = var data = CreateMembershipEntryForTest(); - TableVersion newTableVer = null; - - if (extendedProtocol) - tableData.Version.Next(); + TableVersion newTableVer = tableData.Version.Next(); var insertions = Task.WhenAll(Enumerable.Range(1, 20).Select(i => membershipTable.InsertRow(data, newTableVer))); @@ -374,10 +346,8 @@ await Task.WhenAll(Enumerable.Range(1, 19).Select(async i => { var updatedTableData = await membershipTable.ReadAll(); var updatedRow = updatedTableData.Get(data.SiloAddress); - TableVersion tableVersion = null; - if (extendedProtocol) - tableVersion = updatedTableData.Version.Next(); + TableVersion tableVersion = updatedTableData.Version.Next(); await Task.Delay(10); done = await membershipTable.UpdateRow(updatedRow.Item1, updatedRow.Item2, tableVersion); @@ -386,11 +356,11 @@ await Task.WhenAll(Enumerable.Range(1, 19).Select(async i => tableData = await membershipTable.ReadAll(); + Assert.NotNull(tableData.Version); + if (extendedProtocol) - { - Assert.NotNull(tableData.Version); Assert.Equal(20, tableData.Version.Version); - } + Assert.Equal(1, tableData.Members.Count); } From 6f5ae6cf2fc71a1396c8866b47db41bd7eab5525 Mon Sep 17 00:00:00 2001 From: Gutemberg Ribeiro Date: Mon, 15 Aug 2016 13:46:40 -0300 Subject: [PATCH 3/3] Fixes on tests --- src/Orleans.sln | 8 ++--- .../MembershipTableTestsBase.cs | 29 ++++++------------- 2 files changed, 13 insertions(+), 24 deletions(-) diff --git a/src/Orleans.sln b/src/Orleans.sln index d1321fb975..fe06546075 100644 --- a/src/Orleans.sln +++ b/src/Orleans.sln @@ -312,14 +312,14 @@ Global {B99C744A-7F62-430C-9255-E64875D39486}.Debug|Any CPU.Build.0 = Debug|Any CPU {B99C744A-7F62-430C-9255-E64875D39486}.Release|Any CPU.ActiveCfg = Release|Any CPU {B99C744A-7F62-430C-9255-E64875D39486}.Release|Any CPU.Build.0 = Release|Any CPU - {67738E6C-F292-46A2-994D-5B52E745205B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {67738E6C-F292-46A2-994D-5B52E745205B}.Debug|Any CPU.Build.0 = Debug|Any CPU - {67738E6C-F292-46A2-994D-5B52E745205B}.Release|Any CPU.ActiveCfg = Release|Any CPU - {67738E6C-F292-46A2-994D-5B52E745205B}.Release|Any CPU.Build.0 = Release|Any CPU {6AD37425-7CB4-4D23-80C3-A9D143329A66}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {6AD37425-7CB4-4D23-80C3-A9D143329A66}.Debug|Any CPU.Build.0 = Debug|Any CPU {6AD37425-7CB4-4D23-80C3-A9D143329A66}.Release|Any CPU.ActiveCfg = Release|Any CPU {6AD37425-7CB4-4D23-80C3-A9D143329A66}.Release|Any CPU.Build.0 = Release|Any CPU + {67738E6C-F292-46A2-994D-5B52E745205B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {67738E6C-F292-46A2-994D-5B52E745205B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {67738E6C-F292-46A2-994D-5B52E745205B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {67738E6C-F292-46A2-994D-5B52E745205B}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs b/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs index c54185a644..0de00b248c 100644 --- a/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs +++ b/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs @@ -162,9 +162,7 @@ protected async Task MembershipTable_ReadRow_Insert_Read(bool extendedProtocol = data = await membershipTable.ReadAll(); if (extendedProtocol) - { Assert.Equal(1, data.Version.Version); - } TableVersion nextTableVersion = data.Version.Next(); @@ -182,11 +180,9 @@ protected async Task MembershipTable_ReadRow_Insert_Read(bool extendedProtocol = logger.Info("Membership.ReadRow returned VableVersion={0} Data={1}", data.Version, data); Assert.Equal(1, data.Members.Count); - + Assert.NotNull(data.Version.VersionEtag); if (extendedProtocol) { - Assert.NotNull(data.Version.VersionEtag); - Assert.NotEqual(newTableVersion.VersionEtag, data.Version.VersionEtag); Assert.Equal(newTableVersion.Version, data.Version.Version); } @@ -237,10 +233,7 @@ protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true) var tableData = await membershipTable.ReadAll(); Assert.NotNull(tableData.Version); - if (extendedProtocol) - { - Assert.Equal(0, tableData.Version.Version); - } + Assert.Equal(0, tableData.Version.Version); Assert.Equal(0, tableData.Members.Count); for (int i = 1; i < 10; i++) @@ -267,15 +260,14 @@ protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true) Assert.NotNull(etagBefore); - logger.Info("Calling UpdateRow with Entry = {0} correct eTag = {1} old version={2}", siloEntry, - etagBefore, tableVersion != null ? tableVersion.ToString() : "null"); - if (extendedProtocol) { + logger.Info("Calling UpdateRow with Entry = {0} correct eTag = {1} old version={2}", siloEntry, + etagBefore, tableVersion != null ? tableVersion.ToString() : "null"); ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion); Assert.False(ok, $"row update should have failed - Table Data = {tableData}"); + tableData = await membershipTable.ReadAll(); } - tableData = await membershipTable.ReadAll(); tableVersion = tableData.Version.Next(); @@ -286,13 +278,10 @@ protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true) Assert.True(ok, $"UpdateRow failed - Table Data = {tableData}"); - if (extendedProtocol) - { - logger.Info("Calling UpdateRow with Entry = {0} old eTag = {1} old version={2}", siloEntry, - etagBefore, tableVersion != null ? tableVersion.ToString() : "null"); - ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion); - Assert.False(ok, $"row update should have failed - Table Data = {tableData}"); - } + logger.Info("Calling UpdateRow with Entry = {0} old eTag = {1} old version={2}", siloEntry, + etagBefore, tableVersion != null ? tableVersion.ToString() : "null"); + ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion); + Assert.False(ok, $"row update should have failed - Table Data = {tableData}"); tableData = await membershipTable.ReadAll();