-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathAzureQueueService.cs
82 lines (76 loc) · 3.71 KB
/
AzureQueueService.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
using Akka;
using Akka.Streams.Azure.StorageQueue;
using Akka.Streams.Dsl;
using Azure.Storage.Queues;
using Microsoft.Extensions.Logging;
using Snd.Sdk.Tasks;
using System;
using System.Threading.Tasks;
using Snd.Sdk.Storage.Base;
using Snd.Sdk.Storage.Models;
namespace Snd.Sdk.Storage.Azure
{
    /// <summary>
    /// Queue Service implementation for Azure.
    /// Every operation resolves a per-queue client from the injected <see cref="QueueServiceClient"/>
    /// and maps the SDK response onto the storage models used by <see cref="IQueueService"/>.
    /// </summary>
    public class AzureQueueService : IQueueService
    {
        private readonly QueueServiceClient queueServiceClient;
        private readonly ILogger<AzureQueueService> logger;

        /// <summary>
        /// Creates an instance of <see cref="AzureQueueService"/>.
        /// </summary>
        /// <param name="queueServiceClient">Account-level client used to obtain a client for each queue by name.</param>
        /// <param name="logger">Logger that receives the debug traces emitted by each operation.</param>
        /// <exception cref="ArgumentNullException">
        /// Thrown when <paramref name="queueServiceClient"/> or <paramref name="logger"/> is null.
        /// </exception>
        public AzureQueueService(QueueServiceClient queueServiceClient, ILogger<AzureQueueService> logger)
        {
            // Fail at construction time instead of on the first queue operation.
            this.queueServiceClient = queueServiceClient ?? throw new ArgumentNullException(nameof(queueServiceClient));
            this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
        }

        /// <inheritdoc />
        public Source<QueueElement, NotUsed> GetQueueMessages(string queueName, TimeSpan visibilityTimeout, int prefetchCount, TimeSpan pollInterval)
        {
            this.logger.LogDebug("Creating a stream from queue: {queueName}, using visibility timeout {visibilityTimeout}", queueName, visibilityTimeout);

            // prefetchCount, visibilityTimeout and pollInterval are handed to QueueSource unchanged.
            return QueueSource.Create(queue: this.queueServiceClient.GetQueueClient(queueName),
                    prefetchCount: prefetchCount,
                    options: new GetRequestOptions(visibilityTimeout),
                    pollInterval: pollInterval)
                // Translate each SDK message into the provider-agnostic QueueElement model.
                .Select(qm => new QueueElement
                {
                    Content = qm.Body,
                    ElementId = qm.MessageId,
                    DeleteHandle = qm.PopReceipt,
                    DequeueCount = qm.DequeueCount
                });
        }

        /// <inheritdoc />
        public Task<QueueReleaseResponse> ReleaseMessage(string queueName, string receiptId, string messageId)
        {
            this.logger.LogDebug("Changing visibility of {messageId} from {queueName} of account {queueAccount}", messageId, queueName, this.queueServiceClient.AccountName);

            // The message is updated with a zero visibility timeout; the response carries a new
            // pop receipt, which is returned as the delete handle to use from now on.
            return this.queueServiceClient.GetQueueClient(queueName)
                .UpdateMessageAsync(messageId: messageId, popReceipt: receiptId, visibilityTimeout: TimeSpan.FromSeconds(0))
                .Map(result => new QueueReleaseResponse
                {
                    MessageId = messageId,
                    VisibleAt = result.Value.NextVisibleOn,
                    DeleteHandle = result.Value.PopReceipt
                });
        }

        /// <inheritdoc />
        public Task<bool> RemoveQueueMessage(string queueName, string receiptId, string messageId)
        {
            this.logger.LogDebug("Removing {messageId} from {queueName} of account {queueAccount}", messageId, queueName, this.queueServiceClient.AccountName);

            // The Queue service reports a successful delete with 204 (No Content), so comparing
            // against 200 alone never matched. Any 2xx status is treated as success.
            return this.queueServiceClient.GetQueueClient(queueName)
                .DeleteMessageAsync(messageId, receiptId)
                .Map(result => result.Status >= 200 && result.Status < 300);
        }

        /// <inheritdoc />
        public Task<QueueSendResponse> SendQueueMessage(string queueName, string messageText)
        {
            // NOTE(review): the full message text is written to the debug log; confirm that payloads
            // never carry sensitive data before enabling debug logging in production.
            this.logger.LogDebug("Sending {messageText} to {queueName} of account {queueAccount}", messageText, queueName, this.queueServiceClient.AccountName);

            return this.queueServiceClient.GetQueueClient(queueName)
                .SendMessageAsync(messageText)
                .Map(result => new QueueSendResponse
                {
                    MessageId = result.Value.MessageId,
                    DeleteHandle = result.Value.PopReceipt,
                    InsertedAt = result.Value.InsertionTime
                });
        }
    }
}