Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/value updater #440

Merged
merged 12 commits into from
Nov 3, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Added

- Added new service UpdateValues which propagates changes (e.g. ECHI mapping changes) throughout the deployed database tables.
- ConsensusRule for combining 2+ other rules e.g. SocketRules (See IsIdentifiable Readme.md for more details)
- Added runtime and total failures count to IsIdentifiable logs
- Added NoSuffixProjectPathResolver which generates anonymous image path names that do not contain "-an" (which is the default behaviour).
Expand Down
20 changes: 19 additions & 1 deletion SmiServices.sln
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29411.108
Expand Down Expand Up @@ -79,6 +78,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microservices.FileCopier",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microservices.FileCopier.Tests", "tests\microservices\Microservices.FileCopier.Tests\Microservices.FileCopier.Tests.csproj", "{D61F6BF9-E857-457C-B745-40489A8CFE65}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Updating", "Updating", "{758C7C8C-683A-42E8-A8C5-C32AED1741BE}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microservices.UpdateValues", "src\microservices\Updating\Microservices.UpdateValues\Microservices.UpdateValues.csproj", "{C710F837-3E9A-4D0B-8ADE-A76A87187BB0}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microservices.UpdateValues.Tests", "tests\microservices\Microservices.UpdateValues.Tests\Microservices.UpdateValues.Tests.csproj", "{1B27F8A1-1A0F-4EFB-8A0F-4DB461E8B739}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|x64 = Debug|x64
Expand Down Expand Up @@ -193,6 +198,16 @@ Global
{D61F6BF9-E857-457C-B745-40489A8CFE65}.Debug|x64.Build.0 = Debug|x64
{D61F6BF9-E857-457C-B745-40489A8CFE65}.Release|x64.ActiveCfg = Release|x64
{D61F6BF9-E857-457C-B745-40489A8CFE65}.Release|x64.Build.0 = Release|x64
{C710F837-3E9A-4D0B-8ADE-A76A87187BB0}.Debug|x64.ActiveCfg = Debug|x64
{C710F837-3E9A-4D0B-8ADE-A76A87187BB0}.Debug|x64.Build.0 = Debug|x64
{C710F837-3E9A-4D0B-8ADE-A76A87187BB0}.Release|x64.ActiveCfg = Release|x64
{C710F837-3E9A-4D0B-8ADE-A76A87187BB0}.Release|x64.Build.0 = Release|x64
{1B27F8A1-1A0F-4EFB-8A0F-4DB461E8B739}.Debug|x64.ActiveCfg = Debug|x64
{1B27F8A1-1A0F-4EFB-8A0F-4DB461E8B739}.Debug|x64.Build.0 = Debug|x64
{1B27F8A1-1A0F-4EFB-8A0F-4DB461E8B739}.Release|x64.ActiveCfg = Release|x64{C710F837-3E9A-4D0B-8ADE-A76A87187BB0}
{1B27F8A1-1A0F-4EFB-8A0F-4DB461E8B739}.Release|x64.Build.0 = Release|x64{C710F837-3E9A-4D0B-8ADE-A76A87187BB0}
{1B27F8A1-1A0F-4EFB-8A0F-4DB461E8B739}.Release|x64.ActiveCfg = Release|x64
{1B27F8A1-1A0F-4EFB-8A0F-4DB461E8B739}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -225,6 +240,9 @@ Global
{C2031E86-81B4-405A-A923-9B82E0CE196F} = {8B943F2C-835B-484A-86D2-3F1462970605}
{D4E52707-FFF7-41E6-8057-C6DB344B8CD7} = {421CCD37-3817-4748-B184-A134E19DD75C}
{D61F6BF9-E857-457C-B745-40489A8CFE65} = {421CCD37-3817-4748-B184-A134E19DD75C}
{758C7C8C-683A-42E8-A8C5-C32AED1741BE} = {421CCD37-3817-4748-B184-A134E19DD75C}
{C710F837-3E9A-4D0B-8ADE-A76A87187BB0} = {758C7C8C-683A-42E8-A8C5-C32AED1741BE}
{1B27F8A1-1A0F-4EFB-8A0F-4DB461E8B739} = {758C7C8C-683A-42E8-A8C5-C32AED1741BE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {11CDEA53-71E8-4A9B-BC0D-74F4EB54F740}
Expand Down
7 changes: 7 additions & 0 deletions data/microserviceConfigs/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ DicomRelationalMapperOptions:
RetryDelayInSeconds: 60
RunChecks: true

UpdateValuesOptions:
QueueName: 'TEST.UpdateValuesQueue'
QoSPrefetchCount: 10000
AutoAck: false
#TableInfosToUpdate: 1,2,3 <-set this to limit which tables get updated when no explicit table is listed in the message
UpdateTimeout: 500 # number of seconds to wait for each UPDATE statement

CohortExtractorOptions:
QueueName: 'TEST.RequestQueue'
QoSPrefetchCount: 10000
Expand Down
106 changes: 106 additions & 0 deletions src/common/Smi.Common/Messages/Updating/UpdateValuesMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Smi.Common.Messages.Updating
{
/// <summary>
/// Requests to update the values in the fields <see cref="WriteIntoFields"/> to <see cref="Values"/> where the value in <see cref="WhereFields"/> match <see cref="HaveValues"/>
/// </summary>
public class UpdateValuesMessage : IMessage
{
/// <summary>
/// Optional Sql operators e.g. "=", "<" etc to use in WHERE Sql when looking for <see cref="HaveValues"/> in <see cref="WhereFields"/>. If null or empty "=" is assumed for all WHERE comparisons
/// </summary>
public string[] Operators {get;set;} = null;

/// <summary>
/// The field(s) to search the database for (this should be the human readable name without qualifiers as it may match multiple tables e.g. ECHI)
/// </summary>
public string[] WhereFields {get;set;} = new string[0];

/// <summary>
/// The values to search for when deciding which records to update
/// </summary>
public string[] HaveValues {get;set;} = new string[0];

/// <summary>
/// The field(s) which should be updated, may be the same as the <see cref="WhereFields"/>
/// </summary>
public string[] WriteIntoFields {get;set;} = new string[0];

/// <summary>
/// The values to write into matching records (see <see cref="WriteIntoFields"/>). Null elements in this array should be treated as <see cref="DBNull.Value"/>
/// </summary>
public string[] Values {get;set;} = new string[0];

/// <summary>
/// Optional. Where present indicates the tables which should be updated. If empty then all tables matching the fields should be updated
/// </summary>
public int[] ExplicitTableInfo {get;set; } = new int[0];

public void Validate()
{
if (WhereFields.Length != HaveValues.Length)
throw new Exception($"{nameof(WhereFields)} length must match {nameof(HaveValues)} length");

if (WriteIntoFields.Length != Values.Length)
throw new Exception($"{nameof(WriteIntoFields)} length must match {nameof(Values)} length");

// If operators are specified then the WHERE column count must match the operator count
if(Operators != null && Operators.Length != 0)
if (Operators.Length != WhereFields.Length)
throw new Exception($"{nameof(WhereFields)} length must match {nameof(Operators)} length");

if(WhereFields.Length == 0)
throw new Exception("There must be at least one search field for WHERE section. Otherwise this would update entire tables");

if(WriteIntoFields.Length == 0)
throw new Exception("There must be at least one value to write");

}

/// <summary>
/// Describes the message in terms of fields that are updated and checked in WHERE logic (but not values)
/// </summary>
/// <returns></returns>
public override string ToString()
{
return
$"{nameof(UpdateValuesMessage)}: {nameof(WhereFields)}={string.Join(",",WhereFields)} {nameof(WriteIntoFields)}={string.Join(",",WriteIntoFields)}";
}

/// <summary>
/// Checks whether two messages update the same fields using the same where logic
/// </summary>
/// <param name="obj"></param>
/// <returns></returns>
public override bool Equals(object obj)
{
return obj is UpdateValuesMessage message &&
Enumerable.SequenceEqual(Operators ?? new string[0], message.Operators?? new string[0]) &&
Enumerable.SequenceEqual(WhereFields ?? new string[0], message.WhereFields?? new string[0]) &&
Enumerable.SequenceEqual(HaveValues?? new string[0], message.HaveValues?? new string[0]) &&
Enumerable.SequenceEqual(WriteIntoFields?? new string[0], message.WriteIntoFields?? new string[0]) &&
Enumerable.SequenceEqual(Values?? new string[0], message.Values?? new string[0]) &&
Enumerable.SequenceEqual(ExplicitTableInfo?? new int[0], message.ExplicitTableInfo?? new int[0]);
}

/// <summary>
/// Returns a hashcode based on the sizes of arrays (ok so most of our messages would end up in the same hash bucket but that's probably fine).
/// </summary>
/// <returns></returns>
public override int GetHashCode()
{
int hashCode = -1341392600;
hashCode = hashCode * -1521134295 + EqualityComparer<int>.Default.GetHashCode(Operators?.Length ?? 0);
hashCode = hashCode * -1521134295 + EqualityComparer<int>.Default.GetHashCode(WhereFields?.Length ?? 0);
hashCode = hashCode * -1521134295 + EqualityComparer<int>.Default.GetHashCode(HaveValues?.Length ?? 0);
hashCode = hashCode * -1521134295 + EqualityComparer<int>.Default.GetHashCode(WriteIntoFields?.Length ?? 0);
hashCode = hashCode * -1521134295 + EqualityComparer<int>.Default.GetHashCode(Values?.Length ?? 0);
hashCode = hashCode * -1521134295 + EqualityComparer<int>.Default.GetHashCode(ExplicitTableInfo?.Length ?? 0);
return hashCode;
}
}
}
15 changes: 15 additions & 0 deletions src/common/Smi.Common/Options/GlobalOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class GlobalOptions : IOptions
public RDMPOptions RDMPOptions { get; set; }
public MongoDatabases MongoDatabases { get; set; }
public DicomRelationalMapperOptions DicomRelationalMapperOptions { get; set; }
public UpdateValuesOptions UpdateValuesOptions {get;set;}
public CohortExtractorOptions CohortExtractorOptions { get; set; }
public CohortPackagerOptions CohortPackagerOptions { get; set; }
public DicomReprocessorOptions DicomReprocessorOptions { get; set; }
Expand Down Expand Up @@ -384,6 +385,20 @@ public void Validate()

}
}

[UsedImplicitly]
public class UpdateValuesOptions: ConsumerOptions
{
/// <summary>
/// Number of seconds the updater will wait when running a single value UPDATE on the live table e.g. ECHI A needs to be replaced with ECHI B
/// </summary>
public int UpdateTimeout {get;set;} = 5000;

/// <summary>
/// IDs of TableInfos that should be updated
/// </summary>
public int[] TableInfosToUpdate {get;set;} = new int[0];
}

[UsedImplicitly]
public class DicomRelationalMapperOptions : ConsumerOptions
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using Smi.Common.Messages.Updating;

namespace Microservices.UpdateValues.Execution
{
public interface IUpdater
{
/// <summary>
/// Update one or more database tables to fully propagate <paramref name="message"/> to all relevant tables
/// </summary>
/// <param name="message">What should be updated</param>
/// <returns>total number of rows updated in the database(s)</returns>
int HandleUpdate(UpdateValuesMessage message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using JetBrains.Annotations;
using Rdmp.Core.Repositories;
using Smi.Common;
using Smi.Common.Execution;
using Smi.Common.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace Microservices.UpdateValues.Execution
{
public class UpdateValuesHost : MicroserviceHost
{
UpdateValuesQueueConsumer Consumer {get;set;}

public UpdateValuesHost([NotNull] GlobalOptions globals, IRabbitMqAdapter rabbitMqAdapter = null, bool loadSmiLogConfig = true, bool threaded = false) : base(globals, rabbitMqAdapter, loadSmiLogConfig, threaded)
{
}

public override void Start()
{

IRDMPPlatformRepositoryServiceLocator repositoryLocator = Globals.RDMPOptions.GetRepositoryProvider();
Consumer = new UpdateValuesQueueConsumer(Globals.UpdateValuesOptions,repositoryLocator.CatalogueRepository);

RabbitMqAdapter.StartConsumer(Globals.UpdateValuesOptions, Consumer, isSolo: false);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using Rdmp.Core.Repositories;
using Smi.Common.Messages;
using Smi.Common.Messages.Updating;
using Smi.Common.Messaging;
using Smi.Common.Options;

namespace Microservices.UpdateValues.Execution
{
public class UpdateValuesQueueConsumer : Consumer<UpdateValuesMessage>
{
private Updater _updater;

public UpdateValuesQueueConsumer(UpdateValuesOptions opts,ICatalogueRepository repo)
{
_updater = new Updater(repo);
_updater.UpdateTimeout = opts.UpdateTimeout;
_updater.TableInfosToUpdate = opts.TableInfosToUpdate;
}
protected override void ProcessMessageImpl(IMessageHeader header, UpdateValuesMessage message, ulong tag)
{
_updater.HandleUpdate(message);

Ack(header,tag);
}
}
}
Loading