Skip to content

Commit

Permalink
Added delete command for dead letters (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
orjan authored Apr 16, 2024
2 parents 9213a86 + a79a729 commit 9630603
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 26 deletions.
31 changes: 31 additions & 0 deletions src/AzOps.Sb/Commands/DeadLetterDeleteCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using AzOps.Sb.Requests;
using AzOps.Sb.Requests.Filters;
using MediatR;
using Spectre.Console;
using Spectre.Console.Cli;

namespace AzOps.Sb.Commands;

public class DeadLetterDeleteCommand : AsyncCommand<DeadLetterSettings>
{
private readonly IAnsiConsole _ansiConsole;
private readonly IMediator _mediator;

public DeadLetterDeleteCommand(IAnsiConsole ansiConsole, IMediator mediator)
{
_ansiConsole = ansiConsole;
_mediator = mediator;
}
public override async Task<int> ExecuteAsync(CommandContext context, DeadLetterSettings settings)
{
var messageFilter = MessageFilter.Builder
.Create(settings.Limit)
.Build();

var deadLetters = _mediator.CreateStream(new DeadLetterDeleteRequest(settings.MapDeadLetterId(), messageFilter));

await _ansiConsole.RenderMessages(deadLetters);

return 0;
}
}
26 changes: 1 addition & 25 deletions src/AzOps.Sb/Commands/DeadLetterListCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,8 @@ public override async Task<int> ExecuteAsync(CommandContext context, DeadLetterS

var deadLetters = _mediator.CreateStream(new DeadLetterListRequest(settings.MapDeadLetterId(), messageFilter));

await Render(_ansiConsole, deadLetters);
await _ansiConsole.RenderMessages(deadLetters);

return 0;
}
private async static Task Render(IAnsiConsole console, IAsyncEnumerable<ServiceBusReceivedMessage> deadLetters)
{
await foreach (var message in deadLetters)
{
var root = new Tree(new Markup("[blue]Dead Letter Properties[/]"))
{
Guide = TreeGuide.Line,
};
root.AddNode("Message Id").AddNode(message.MessageId);
root.AddNode("Sequence Number").AddNode(message.SequenceNumber.ToString());
root.AddNode("Enqueued").AddNode(message.EnqueuedTime.ToString("O"));
root.AddNode("Dead Letter Reason").AddNode(message.DeadLetterReason);
var applicationPropertiesNode = root.AddNode("Application Properties");

foreach (var property in message.ApplicationProperties)
{
applicationPropertiesNode.AddNode(property.Key).AddNode(property.Value?.ToString() ?? "-");
}
console.Write(root);
console.MarkupLine("[blue]Message Body[/]");
console.Write(new Spectre.Console.Json.JsonText(message.Body.ToString()));
console.WriteLine();
}
}
}
2 changes: 1 addition & 1 deletion src/AzOps.Sb/Commands/DeadLetterSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace AzOps.Sb.Commands;

public sealed class DeadLetterSettings : CommandSettings
public class DeadLetterSettings : CommandSettings
{
[Description("ServiceBus namespace")]
[CommandOption("-n|--namespace")]
Expand Down
32 changes: 32 additions & 0 deletions src/AzOps.Sb/Commands/RenderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using Azure.Messaging.ServiceBus;
using Spectre.Console;

namespace AzOps.Sb.Commands;

public static class RenderExtensions
{
public async static Task RenderMessages(this IAnsiConsole console, IAsyncEnumerable<ServiceBusReceivedMessage> deadLetters)
{
await foreach (var message in deadLetters)
{
var root = new Tree(new Markup("[blue]Dead Letter Properties[/]"))
{
Guide = TreeGuide.Line,
};
root.AddNode("Message Id").AddNode(message.MessageId);
root.AddNode("Sequence Number").AddNode(message.SequenceNumber.ToString());
root.AddNode("Enqueued").AddNode(message.EnqueuedTime.ToString("O"));
root.AddNode("Dead Letter Reason").AddNode(message.DeadLetterReason);
var applicationPropertiesNode = root.AddNode("Application Properties");

foreach (var property in message.ApplicationProperties)
{
applicationPropertiesNode.AddNode(property.Key).AddNode(property.Value?.ToString() ?? "-");
}
console.Write(root);
console.MarkupLine("[blue]Message Body[/]");
console.Write(new Spectre.Console.Json.JsonText(message.Body.ToString()));
console.WriteLine();
}
}
}
1 change: 1 addition & 0 deletions src/AzOps.Sb/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public static int Main(string[] args)
{
add.AddCommand<DeadLetterListCommand>("list");
add.AddCommand<DeadLetterRequeueCommand>("requeue");
add.AddCommand<DeadLetterDeleteCommand>("delete");
});
#if DEBUG
appConfig.PropagateExceptions();
Expand Down
69 changes: 69 additions & 0 deletions src/AzOps.Sb/Requests/DeadLetterRequeueDelete.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
using System.Runtime.CompilerServices;
using AzOps.Sb.Requests.Filters;
using Azure.Messaging.ServiceBus;
using MediatR;

namespace AzOps.Sb.Requests;

public record DeadLetterDeleteRequest(DeadLetterId DeadLetterId, MessageFilter Filters) : IStreamRequest<ServiceBusReceivedMessage>
{
}

public class
DeadLetterDeleteRequestHandler : IStreamRequestHandler<DeadLetterDeleteRequest, ServiceBusReceivedMessage>
{
private readonly ServiceBusClientFactory _serviceBusClientFactory;

public DeadLetterDeleteRequestHandler(ServiceBusClientFactory serviceBusClientFactory)
{
_serviceBusClientFactory = serviceBusClientFactory ?? throw new ArgumentNullException(nameof(serviceBusClientFactory));
}

public async IAsyncEnumerable<ServiceBusReceivedMessage> Handle(
DeadLetterDeleteRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
var filter = new ServiceBusMessageFilter(request.Filters);
await using var serviceBusClient = _serviceBusClientFactory(request.DeadLetterId);

var serviceBusReceiver = serviceBusClient.CreateReceiver(request.DeadLetterId.Topic, request.DeadLetterId.Subscription,
new ServiceBusReceiverOptions
{
ReceiveMode = ServiceBusReceiveMode.PeekLock,
SubQueue = SubQueue.DeadLetter
});

while (true)
{
cancellationToken.ThrowIfCancellationRequested();

var message = await serviceBusReceiver.ReceiveMessageAsync(
maxWaitTime: TimeSpan.FromSeconds(3),
cancellationToken: cancellationToken);

if (message == null)
{
yield break;
}

if (filter.IsValidMessage(message))
{
try
{
await serviceBusReceiver.CompleteMessageAsync(message, cancellationToken);
}
catch (Exception)
{
await serviceBusReceiver.AbandonMessageAsync(message, cancellationToken: cancellationToken);
throw;
}
yield return message;
}
else
{
await serviceBusReceiver.AbandonMessageAsync(message, cancellationToken: cancellationToken);
yield break;
}
}
}
}

0 comments on commit 9630603

Please sign in to comment.