Skip to content

Commit

Permalink
accepted user commands and alerts are now registered as events (#127)
Browse files Browse the repository at this point in the history
* accepted user commands and alerts are now registered as events

By registering these low frequency events it will be possible later to check relevant actions and alerts that have been triggered for the system

Also:
* fixed bug in multi command alerts
* CommandHandler.AlertFired has been removed, use CommandHandler.EventFired and check type == EventType.Alert instead
* fixed double firing of command events
  • Loading branch information
freddyrios authored Oct 1, 2021
1 parent e988ea9 commit 9e1fae4
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 34 deletions.
11 changes: 5 additions & 6 deletions CA_DataUploaderLib/Alerts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected override Task Command(List<string> args)

public override void OnNewVectorReceived(object sender, NewVectorReceivedArgs e)
{
var timestamp = e.GetVectorTime().ToString("yyyy.MM.dd HH:mm:ss");
var timestamp = e.GetVectorTime();
var (alertsToTrigger, noSensorAlerts) = GetAlertsToTrigger(e); // we gather alerts separately from triggering, to reduce time locking the _alerts list

foreach (var a in alertsToTrigger ?? Enumerable.Empty<IOconfAlert>())
Expand Down Expand Up @@ -104,17 +104,16 @@ public override void OnNewVectorReceived(object sender, NewVectorReceivedArgs e)
noSensorAlerts ?? Enumerable.Empty<IOconfAlert>());
}

private void TriggerAlert(IOconfAlert a, string timestamp, string message)
private void TriggerAlert(IOconfAlert a, DateTime timestamp, string message)
{
message = timestamp + message;
logger.LogError(message);
logger.LogError(timestamp.ToString("yyyy.MM.dd HH:mm:ss") + message);
if (a.Command != default)
{
foreach (var commands in a.Command.Split('|'))
ExecuteCommand(a.Command);
ExecuteCommand(commands);
}

_cmd.FireAlert(message);
_cmd.FireAlert(message, timestamp);
}

private List<T> EnsureInitialized<T>(ref List<T> list) => list = list ?? new List<T>();
Expand Down
20 changes: 13 additions & 7 deletions CA_DataUploaderLib/CommandHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public sealed class CommandHandler : IDisposable
private Lazy<VectorFilterAndMath> _fullsystemFilterAndMath;

public event EventHandler<NewVectorReceivedArgs> NewVectorReceived;
public event EventHandler<AlertFiredArgs> AlertFired;
public event EventHandler<EventFiredArgs> EventFired;
public bool IsRunning { get { return _running; } }

public CommandHandler(SerialNumberMapper mapper = null)
Expand Down Expand Up @@ -99,11 +99,12 @@ public bool AssertArgs(List<string> args, int minimumLen)

public void OnNewVectorReceived(IEnumerable<SensorSample> vector) =>
NewVectorReceived?.Invoke(this, new NewVectorReceivedArgs(vector.ToDictionary(v => v.Name, v => v.Value)));

public void FireAlert(string msg)
{
AlertFired?.Invoke(this, new AlertFiredArgs(msg));
}

public void FireAlert(string msg) => FireAlert(msg, DateTime.UtcNow);
public void FireAlert(string msg, DateTime timespan) => FireCustomEvent(msg, timespan, (byte)EventType.Alert);
/// <summary>registers a custom event (low frequency, such like user commands and alerts that have a max firing rate)</summary>
/// <remarks>preferably use values above 100 for eventType to avoid future collisions with built in event types</remarks>
public void FireCustomEvent(string msg, DateTime timespan, byte eventType) => EventFired?.Invoke(this, new EventFiredArgs(msg, eventType, timespan));
private bool Stop(List<string> args)
{
_running = false;
Expand Down Expand Up @@ -168,14 +169,18 @@ private void HandleCommand(string cmdString, bool addToCommandHistory)
private List<bool> RunCommandFunctions(string cmdString, bool addToCommandHistory, List<string> cmd, List<Func<List<string>, bool>> commandFunctions)
{
List<bool> executionResults = new List<bool>(commandFunctions.Count);
var isFirstAccepted = true;
foreach (var func in commandFunctions)
{
try
{
bool accepted;
executionResults.Add(accepted = func.Invoke(cmd));
if (accepted)
if (accepted && isFirstAccepted)
{//avoid unnecesarily trying to add the command multiple times + triggering the command's EventFired
isFirstAccepted = false;
OnCommandAccepted(cmdString, addToCommandHistory); // track it in the history if at least one execution accepted the command
}
else
break; // avoid running the command on another subsystem when it was already rejected
}
Expand Down Expand Up @@ -208,6 +213,7 @@ private void OnCommandAccepted(string cmdString, bool addToCommandHistory)
if (AcceptedCommands.LastOrDefault() != cmdString)
AcceptedCommands.Add(cmdString);
AcceptedCommandsIndex = AcceptedCommands.Count;
FireCustomEvent(cmdString, DateTime.UtcNow, (byte)EventType.Command);
}

private string GetCommand()
Expand Down
36 changes: 36 additions & 0 deletions CA_DataUploaderLib/EventFiredArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;
using System.Runtime.InteropServices;
using System.Text;

namespace CA_DataUploaderLib
{
public class EventFiredArgs : EventArgs
{
public EventFiredArgs(string data, EventType eventType, DateTime timespan) : this(data, (byte) eventType, timespan) { }
public EventFiredArgs(string data, byte eventType, DateTime timespan)
{
Data = data;
EventType = eventType;
TimeSpan = timespan;
}

public string Data { get; }
public byte EventType { get; }
public DateTime TimeSpan { get; }

public byte[] ToByteArray()
{
var timeTicks = TimeSpan.Ticks;
var encoding = Encoding.UTF8;
var dataBytesCount = encoding.GetByteCount(Data);
var bytes = new byte[1 + 1 + sizeof(long) + dataBytesCount]; //version + event type + time ticks + data bytes
bytes[0] = 0;
bytes[1] = EventType;
MemoryMarshal.Write(bytes.AsSpan()[2..], ref timeTicks);
var startOfDataIndex = 1 + 1 + sizeof(long);
if (dataBytesCount != encoding.GetBytes(Data, 0, Data.Length, bytes, startOfDataIndex))
throw new InvalidOperationException($"unexpected error getting utf8 bytes for event: {Data}");
return bytes;
}
}
}
8 changes: 8 additions & 0 deletions CA_DataUploaderLib/EventType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace CA_DataUploaderLib
{
public enum EventType : byte
{
Alert = 0,
Command = 1
}
}
41 changes: 20 additions & 21 deletions CA_DataUploaderLib/ServerUploader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Serialization;
Expand All @@ -20,7 +19,7 @@ public sealed class ServerUploader : IDisposable
private readonly PlotConnection _plot;
private readonly Signing _signing;
private readonly Queue<DataVector> _queue = new Queue<DataVector>();
private readonly Queue<string> _alertQueue = new Queue<string>();
private readonly Queue<EventFiredArgs> _eventsQueue = new Queue<EventFiredArgs>();
private readonly List<DateTime> _badPackages = new List<DateTime>();
private DateTime _lastTimestamp;
private bool _running;
Expand All @@ -47,7 +46,7 @@ public ServerUploader(VectorDescription vectorDescription, CommandHandler cmd)
_cmd = cmd;
cmd?.AddCommand("escape", Stop);
if (cmd != null)
cmd.AlertFired += SendAlert;
cmd.EventFired += SendEvent;
}
catch (Exception ex)
{
Expand All @@ -56,12 +55,11 @@ public ServerUploader(VectorDescription vectorDescription, CommandHandler cmd)
}
}

private void SendAlert(object sender, AlertFiredArgs e) => SendAlert(e.Message);
internal void SendAlert(string message)
private void SendEvent(object sender, EventFiredArgs e)
{
lock (_alertQueue)
if (_alertQueue.Count < 10000) // if sending thread can't catch up, then drop packages.
_alertQueue.Enqueue(message);
lock (_eventsQueue)
if (_eventsQueue.Count < 10000) // if sending thread can't catch up, then drop packages.
_eventsQueue.Enqueue(e);
}

public void SendVector(List<double> vector, DateTime timestamp)
Expand Down Expand Up @@ -103,11 +101,11 @@ private void LoopForever()
if (list != null)
PostVectorAsync(GetSignedVectors(list), list.First().timestamp);

var alerts = DequeueAllEntries(_alertQueue);
if (alerts != null)
var events = DequeueAllEntries(_eventsQueue);
if (events != null)
{
foreach (var alert in alerts)
PostAlertAsync(alert);
foreach (var @event in events)
PostEventAsync(@event);
}

PrintBadPackagesMessage(false);
Expand Down Expand Up @@ -164,10 +162,10 @@ private byte[] GetSignedVectors(List<DataVector> vectors)
return SignAndCompress(listLen.Concat(theData).ToArray());
}

private byte[] SignedMessage(string message)
{
var buffer = Encoding.UTF8.GetBytes(message);
return _signing.GetSignature(buffer).Concat(buffer).ToArray();
private byte[] GetSignedEvent(EventFiredArgs @event)
{ //this can be made more efficient to avoid extra allocations and/or use a memory pool, but these are for low frequency events so postponing looking at that.
var bytes = @event.ToByteArray();
return _signing.GetSignature(bytes).Concat(bytes).ToArray();
}

private static void OnError(string message, Exception ex) => CALog.LogErrorAndConsoleLn(LogID.A, message, ex);
Expand All @@ -188,19 +186,20 @@ private async void PostVectorAsync(byte[] buffer, DateTime timestamp)
}
}

private async void PostAlertAsync(string message)
private async void PostEventAsync(EventFiredArgs args)
{
try
{
await _plot.PostAlertAsync(SignedMessage(message));
await _plot.PostEventAsync(GetSignedEvent(args));
}
catch (Exception ex)
{
lock (_badPackages)
{
_badPackages.Add(DateTime.UtcNow);
}
OnError("failed posting alert: " + message, ex);

OnError($"failed posting event: {args.EventType} - {args.Data} - {args.TimeSpan}", ex);
}
}

Expand Down Expand Up @@ -252,9 +251,9 @@ public async Task PostVectorAsync(byte[] buffer, DateTime timestamp)
response.EnsureSuccessStatusCode();
}

public async Task PostAlertAsync(byte[] signedMessage)
public async Task PostEventAsync(byte[] signedMessage)
{
string query = $"api/LoopApi/AlertMessage?plotnameID={_plotID}";
string query = $"api/LoopApi/LogEvent?plotnameID={_plotID}";
var response = await _client.PutAsJsonAsync(query, signedMessage);
response.EnsureSuccessStatusCode();
}
Expand Down
24 changes: 24 additions & 0 deletions UnitTests/EventFiredArgsTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using CA_DataUploaderLib;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Linq;
using System.Text;

namespace UnitTests
{
[TestClass]
public class EventFiredArgsTests
{
[TestMethod]
public void ToByteArrayReturnsExpectedBytes()
{
const string Data = "my message";
DateTime timespan = new DateTime(2012, 1, 1, 1, 1, 1, 1, DateTimeKind.Utc);
var args = new EventFiredArgs(Data, 2, timespan);
var bytes = args.ToByteArray();
CollectionAssert.AreEqual(new byte[] { 0, 2 }, bytes.Take(2).ToList());
CollectionAssert.AreEqual(BitConverter.GetBytes(timespan.Ticks), bytes.Skip(2).Take(sizeof(long)).ToList());
CollectionAssert.AreEqual(Encoding.UTF8.GetBytes(Data), bytes.Skip(2).Skip(sizeof(long)).ToList());
}
}
}

0 comments on commit 9e1fae4

Please sign in to comment.