Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Handle default authentication for SQS #809

Merged
merged 3 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Adaptors/S3/src/ArmoniK.Core.Adapters.S3.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="AWSSDK.S3" Version="3.7.405" />
<!-- AWSSDK.SecurityToken is required to handle the default authentication -->
<PackageReference Include="AWSSDK.SecurityToken" Version="3.7.400.36" />
</ItemGroup>

Expand Down
25 changes: 22 additions & 3 deletions Adaptors/SQS/src/AmazonSQSClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -25,9 +26,10 @@ namespace ArmoniK.Core.Adapters.SQS;

internal static class AmazonSqsClientExt
{
public static async Task<string> GetOrCreateQueueUrlAsync(this AmazonSQSClient client,
string queueName,
CancellationToken cancellationToken)
public static async Task<string> GetOrCreateQueueUrlAsync(this AmazonSQSClient client,
string queueName,
Dictionary<string, string> tags,
CancellationToken cancellationToken)
{
try
{
Expand All @@ -40,9 +42,26 @@ public static async Task<string> GetOrCreateQueueUrlAsync(this AmazonSQSClient c
return (await client.CreateQueueAsync(new CreateQueueRequest
{
QueueName = queueName,
Tags = tags,
},
cancellationToken)
.ConfigureAwait(false)).QueueUrl;
}
}

public static string GetQueueName(this AmazonSQSClient client,
SQS options,
string? partition = null)
{
_ = client;

if (string.IsNullOrEmpty(partition))
{
partition = options.PartitionId;
}

return string.IsNullOrEmpty(options.Prefix)
? partition
: $"{options.Prefix}-{partition}";
}
}
2 changes: 2 additions & 0 deletions Adaptors/SQS/src/ArmoniK.Core.Adapters.SQS.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

<ItemGroup>
<PackageReference Include="AWSSDK.SQS" Version="3.7.400.36" />
<!-- AWSSDK.SecurityToken is required to handle the default authentication -->
<PackageReference Include="AWSSDK.SecurityToken" Version="3.7.400.36" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="8.0.1" />
</ItemGroup>

Expand Down
11 changes: 7 additions & 4 deletions Adaptors/SQS/src/PullQueueStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,19 @@ internal class PullQueueStorage : IPullQueueStorage
// ReSharper disable once NotAccessedField.Local
private readonly ILogger<PullQueueStorage> logger_;

private readonly string queueName_;
private bool isInitialized_;
private string? queueUrl_;
private readonly string queueName_;
private readonly Dictionary<string, string> tags_;
private bool isInitialized_;
private string? queueUrl_;

public PullQueueStorage(AmazonSQSClient client,
SQS options,
ILogger<PullQueueStorage> logger)
{
client_ = client;
logger_ = logger;
queueName_ = $"a{options.Prefix}-{options.PartitionId}";
queueName_ = client.GetQueueName(options);
tags_ = options.Tags;

ackDeadlinePeriod_ = options.AckDeadlinePeriod;
ackExtendDeadlineStep_ = options.AckExtendDeadlineStep;
Expand Down Expand Up @@ -95,6 +97,7 @@ public async Task Init(CancellationToken cancellationToken)
if (!isInitialized_)
{
queueUrl_ = await client_.GetOrCreateQueueUrlAsync(queueName_,
tags_,
cancellationToken)
.ConfigureAwait(false);

Expand Down
4 changes: 3 additions & 1 deletion Adaptors/SQS/src/PushQueueStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ public async Task PushMessagesAsync(IEnumerable<MessageData> messages,
throw new InvalidOperationException($"{nameof(PushQueueStorage)} should be initialized before calling this method.");
}

var queueName = $"a{options_.Prefix}-{partitionId}";
var queueName = client_.GetQueueName(options_,
partitionId);

var queueUrl = await cache_.GetOrCreateAsync(queueName,
_ => client_.GetOrCreateQueueUrlAsync(queueName,
options_.Tags,
cancellationToken))
.ConfigureAwait(false);

Expand Down
5 changes: 1 addition & 4 deletions Adaptors/SQS/src/QueueBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

using System;

using Amazon.Runtime;
using Amazon.SQS;

using ArmoniK.Core.Base;
Expand All @@ -40,9 +39,7 @@ public void Build(IServiceCollection serviceCollection,
var sqsOptions = configuration.GetSection(SQS.SettingSection)
.Get<SQS>() ?? throw new InvalidOperationException("Options not found");

var credentials = new EnvironmentVariablesAWSCredentials();
var client = new AmazonSQSClient(credentials,
new AmazonSQSConfig
var client = new AmazonSQSClient(new AmazonSQSConfig
{
ServiceURL = sqsOptions.ServiceURL,
});
Expand Down
7 changes: 7 additions & 0 deletions Adaptors/SQS/src/SQS.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System.Collections.Generic;

namespace ArmoniK.Core.Adapters.SQS;

internal class SQS
Expand All @@ -37,6 +39,11 @@ internal class SQS
/// </summary>
public string Prefix { get; set; } = string.Empty;

/// <summary>
/// AWS Tags to add to the Queues when they are created
/// </summary>
public Dictionary<string, string> Tags { get; set; } = new();
lemaitre-aneo marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Acknowledgment deadline in seconds: If a message wasn't acknowledged within this deadline, it will be
/// redelivered .
Expand Down
2 changes: 2 additions & 0 deletions terraform/modules/storage/queue/sqs/outputs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ output "generated_env_vars" {
"Components__QueueAdaptorSettings__AdapterAbsolutePath" = "/adapters/queue/sqs/ArmoniK.Core.Adapters.SQS.dll"
"SQS__ServiceURL" = "http://${var.queue_envs.host}:4566"
"SQS__PartitionId" = "TestPartition0"
"SQS__Prefix" = "armonik"
"SQS__Tags__deployment" = "docker"
"AWS_ACCESS_KEY_ID" = "localkey"
"AWS_SECRET_ACCESS_KEY" = "localsecret"
})
Expand Down
Loading