Skip to content

Commit

Permalink
just add a bunch of precondition checks
Browse files Browse the repository at this point in the history
  • Loading branch information
mookid8000 committed Nov 15, 2023
1 parent 90e4f77 commit 032ba15
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 52 deletions.
14 changes: 8 additions & 6 deletions Rebus/Auditing/Messages/AuditingHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,28 @@ class AuditingHelper

bool _didInitializeAuditQueue;

public AuditingHelper(ITransport transport, string auditQueue, IRebusTime rebusTime)
public AuditingHelper(ITransport transport, string auditQueueName, IRebusTime rebusTime)
{
AuditQueue = auditQueue;
_transport = transport;
_rebusTime = rebusTime;
AuditQueueName = auditQueueName ?? throw new ArgumentNullException(nameof(auditQueueName));
_transport = transport ?? throw new ArgumentNullException(nameof(transport));
_rebusTime = rebusTime ?? throw new ArgumentNullException(nameof(rebusTime));
}

public string AuditQueue { get; }
public string AuditQueueName { get; }

public void EnsureAuditQueueHasBeenCreated()
{
if (_didInitializeAuditQueue) return;

_transport.CreateQueue(AuditQueue);
_transport.CreateQueue(AuditQueueName);

_didInitializeAuditQueue = true;
}

public void SetCommonHeaders(TransportMessage transportMessage)
{
if (transportMessage == null) throw new ArgumentNullException(nameof(transportMessage));

var headers = transportMessage.Headers;

if (_transport.Address != null)
Expand Down
8 changes: 4 additions & 4 deletions Rebus/Auditing/Messages/IncomingAuditingStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ class IncomingAuditingStep : IIncomingStep, IInitializable
/// </summary>
public IncomingAuditingStep(AuditingHelper auditingHelper, ITransport transport, IRebusTime rebusTime)
{
_auditingHelper = auditingHelper;
_transport = transport;
_rebusTime = rebusTime;
_auditingHelper = auditingHelper ?? throw new ArgumentNullException(nameof(auditingHelper));
_transport = transport ?? throw new ArgumentNullException(nameof(transport));
_rebusTime = rebusTime ?? throw new ArgumentNullException(nameof(rebusTime));
}

public void Initialize()
Expand All @@ -48,6 +48,6 @@ public async Task Process(IncomingStepContext context, Func<Task> next)

clone.Headers[AuditHeaders.HandleTime] = begin.ToString("O");

await _transport.Send(_auditingHelper.AuditQueue, clone, transactionContext);
await _transport.Send(_auditingHelper.AuditQueueName, clone, transactionContext);
}
}
6 changes: 3 additions & 3 deletions Rebus/Auditing/Messages/OutgoingAuditingStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ class OutgoingAuditingStep : IOutgoingStep, IInitializable
/// </summary>
public OutgoingAuditingStep(AuditingHelper auditingHelper, ITransport transport)
{
_auditingHelper = auditingHelper;
_transport = transport;
_auditingHelper = auditingHelper ?? throw new ArgumentNullException(nameof(auditingHelper));
_transport = transport ?? throw new ArgumentNullException(nameof(transport));
}

public void Initialize()
Expand All @@ -42,7 +42,7 @@ public async Task Process(OutgoingStepContext context, Func<Task> next)

_auditingHelper.SetCommonHeaders(clone);

await _transport.Send(_auditingHelper.AuditQueue, clone, transactionContext);
await _transport.Send(_auditingHelper.AuditQueueName, clone, transactionContext);
}

await next();
Expand Down
7 changes: 6 additions & 1 deletion Rebus/Auditing/Sagas/LoggerSagaSnapperShotter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Rebus.Logging;
Expand All @@ -13,11 +14,15 @@ class LoggerSagaSnapperShotter : ISagaSnapshotStorage

public LoggerSagaSnapperShotter(IRebusLoggerFactory rebusLoggerFactory)
{
if (rebusLoggerFactory == null) throw new ArgumentNullException(nameof(rebusLoggerFactory));
_log = rebusLoggerFactory.GetLogger<LoggerSagaSnapperShotter>();
}

public async Task Save(ISagaData sagaData, Dictionary<string, string> sagaAuditMetadata)
{
if (sagaData == null) throw new ArgumentNullException(nameof(sagaData));
if (sagaAuditMetadata == null) throw new ArgumentNullException(nameof(sagaAuditMetadata));

var logData = new
{
Data = sagaData,
Expand Down
12 changes: 8 additions & 4 deletions Rebus/Auditing/Sagas/SagaAuditingConfigurationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,27 @@ public static void OutputToLog(this StandardConfigurer<ISagaSnapshotStorage> con
configurer.Register(c => new LoggerSagaSnapperShotter(c.Get<IRebusLoggerFactory>()));
}

static ITransport GetTransport(IResolutionContext c)
static ITransport GetTransport(IResolutionContext context)
{
if (context == null) throw new ArgumentNullException(nameof(context));

try
{
return c.Get<ITransport>();
return context.Get<ITransport>();
}
catch (Exception exception)
{
throw new RebusApplicationException(exception, @"Could not get transport - did you call 'EnableSagaAuditing' on a one-way client? (which is not capable of receiving messages, and therefore can never get to change the stage of any saga instances...)");
}
}

static ISagaSnapshotStorage GetSagaSnapshotStorage(IResolutionContext c)
static ISagaSnapshotStorage GetSagaSnapshotStorage(IResolutionContext context)
{
if (context == null) throw new ArgumentNullException(nameof(context));

try
{
return c.Get<ISagaSnapshotStorage>();
return context.Get<ISagaSnapshotStorage>();
}
catch (Exception exception)
{
Expand Down
4 changes: 2 additions & 2 deletions Rebus/Auditing/Sagas/SaveSagaDataSnapshotStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ class SaveSagaDataSnapshotStep : IIncomingStep

public SaveSagaDataSnapshotStep(ISagaSnapshotStorage sagaSnapshotStorage, ITransport transport)
{
_sagaSnapshotStorage = sagaSnapshotStorage;
_transport = transport;
_sagaSnapshotStorage = sagaSnapshotStorage ?? throw new ArgumentNullException(nameof(sagaSnapshotStorage));
_transport = transport ?? throw new ArgumentNullException(nameof(transport));
}

public async Task Process(IncomingStepContext context, Func<Task> next)
Expand Down
2 changes: 2 additions & 0 deletions Rebus/Bus/Advanced/AsyncHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ static class AsyncHelpers
/// </summary>
public static void RunSync(Func<Task> task)
{
if (task == null) throw new ArgumentNullException(nameof(task));

var currentContext = SynchronizationContext.Current;
var customContext = new CustomSynchronizationContext(task);

Expand Down
10 changes: 5 additions & 5 deletions Rebus/Bus/AdvancedRebusBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class AdvancedApi : IAdvancedApi

public AdvancedApi(RebusBus rebusBus, IRebusTime rebusTime)
{
_rebusBus = rebusBus;
_rebusTime = rebusTime;
_rebusBus = rebusBus ?? throw new ArgumentNullException(nameof(rebusBus));
_rebusTime = rebusTime ?? throw new ArgumentNullException(nameof(rebusTime));
}

public IWorkersApi Workers => new WorkersApi(_rebusBus);
Expand Down Expand Up @@ -110,7 +110,7 @@ class SyncApi : ISyncBus

public SyncApi(RebusBus rebusBus)
{
_rebusBus = rebusBus;
_rebusBus = rebusBus ?? throw new ArgumentNullException(nameof(rebusBus));
}

public void SendLocal(object commandMessage, IDictionary<string, string> optionalHeaders = null)
Expand Down Expand Up @@ -171,8 +171,8 @@ class RoutingApi : IRoutingApi

public RoutingApi(RebusBus rebusBus, IRebusTime rebusTime)
{
_rebusBus = rebusBus;
_rebusTime = rebusTime;
_rebusBus = rebusBus ?? throw new ArgumentNullException(nameof(rebusBus));
_rebusTime = rebusTime ?? throw new ArgumentNullException(nameof(rebusTime));
}

public Task Send(string destinationAddress, object explicitlyRoutedMessage, IDictionary<string, string> optionalHeaders = null)
Expand Down
31 changes: 4 additions & 27 deletions Rebus/Bus/MessageExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,13 @@ namespace Rebus.Bus;
/// </summary>
public static class MessageExtensions
{
/// <summary>
/// Gets whether the message's <see cref="Headers.ReturnAddress"/> header is set to something
/// </summary>
public static bool HasReturnAddress(this Message message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
return message.Headers.ContainsKey(Headers.ReturnAddress);
}

/// <summary>
/// Uses the transport's input queue address as the <see cref="Headers.ReturnAddress"/> on the message
/// </summary>
public static void SetReturnAddressFromTransport(this Message message, ITransport transport)
{
if (message == null) throw new ArgumentNullException(nameof(message));
if (transport == null) throw new ArgumentNullException(nameof(transport));

var returnAddress = transport.Address;

if (string.IsNullOrWhiteSpace(returnAddress))
{
throw new InvalidOperationException("Cannot set return address from the given transport because it is not capable of receiving messages");
}

message.Headers[Headers.ReturnAddress] = returnAddress;
}

/// <summary>
/// Sets the <see cref="Headers.DeferredUntil"/> header to the specified time
/// </summary>
public static void SetDeferHeaders(this Message message, DateTimeOffset approximateDeliveryTime, string destinationAddress)
{
if (message == null) throw new ArgumentNullException(nameof(message));

InnerSetDeferHeaders(approximateDeliveryTime, message.Headers, destinationAddress);
}

Expand All @@ -51,6 +26,8 @@ public static void SetDeferHeaders(this Message message, DateTimeOffset approxim
/// </summary>
public static void SetDeferHeaders(this TransportMessage message, DateTimeOffset approximateDeliveryTime, string destinationAddress)
{
if (message == null) throw new ArgumentNullException(nameof(message));

InnerSetDeferHeaders(approximateDeliveryTime, message.Headers, destinationAddress);
}

Expand Down

0 comments on commit 032ba15

Please sign in to comment.