Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-1294 Add support to FIFO #3437

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docker-compose-localstack.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
version: '3'

services:
localstack:
image: localstack/localstack
environment:
# LocalStack configuration: https://docs.localstack.cloud/references/configuration/
- "SERVICES=s3,sqs,sns,dynamodb"
ports:
- "4566:4566" # LocalStack Gateway
- "4510-4559:4510-4559" # External services port range
volumes:
- "/var/run/docker.sock:/var/run/docker.sock"
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#region Licence

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Overall Code Complexity
This module has a mean cyclomatic complexity of 5.92 across 13 functions. The mean complexity threshold is 4

Suppress

/* The MIT License (MIT)
Copyright © 2022 Ian Cooper <[email protected]>

Expand All @@ -25,6 +26,7 @@ THE SOFTWARE. */
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.Runtime.Internal.Transform;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using Microsoft.Extensions.Logging;
Expand All @@ -46,43 +48,73 @@ public AWSMessagingGateway(AWSMessagingGatewayConnection awsConnection)
_awsClientFactory = new AWSClientFactory(awsConnection);
}

protected async Task<string> EnsureTopicAsync(RoutingKey topic, SnsAttributes attributes, TopicFindBy topicFindBy, OnMissingChannel makeTopic)
protected async Task<string> EnsureTopicAsync(RoutingKey topic, SnsAttributes attributes,
TopicFindBy topicFindBy, OnMissingChannel makeTopic, SnsSqsType snsSqsType, bool deduplication)
{
//on validate or assume, turn a routing key into a topicARN
if ((makeTopic == OnMissingChannel.Assume) || (makeTopic == OnMissingChannel.Validate))
await ValidateTopicAsync(topic, topicFindBy, makeTopic);
else if (makeTopic == OnMissingChannel.Create) CreateTopic(topic, attributes);
if (makeTopic is OnMissingChannel.Assume or OnMissingChannel.Validate)
{
await ValidateTopicAsync(topic, topicFindBy, makeTopic, snsSqsType);
}
else if (makeTopic == OnMissingChannel.Create)
{
CreateTopic(topic, attributes, snsSqsType, deduplication);
}
return ChannelTopicArn;
}

private void CreateTopic(RoutingKey topicName, SnsAttributes snsAttributes)
private void CreateTopic(RoutingKey topicName,
SnsAttributes snsAttributes,
SnsSqsType snsSqsType,
bool deduplication)
{
using var snsClient = _awsClientFactory.CreateSnsClient();
var attributes = new Dictionary<string, string>();
if (snsAttributes != null)
{
if (!string.IsNullOrEmpty(snsAttributes.DeliveryPolicy)) attributes.Add("DeliveryPolicy", snsAttributes.DeliveryPolicy);
if (!string.IsNullOrEmpty(snsAttributes.Policy)) attributes.Add("Policy", snsAttributes.Policy);
if (!string.IsNullOrEmpty(snsAttributes.DeliveryPolicy))
{
attributes.Add("DeliveryPolicy", snsAttributes.DeliveryPolicy);
}

if (!string.IsNullOrEmpty(snsAttributes.Policy))
{
attributes.Add("Policy", snsAttributes.Policy);
}
}

var createTopicRequest = new CreateTopicRequest(topicName)
string name = topicName;
if (snsSqsType == SnsSqsType.Fifo)
{
Attributes = attributes,
Tags = new List<Tag> {new Tag {Key = "Source", Value = "Brighter"}}
};
name += ".fifo";

attributes.Add("FifoTopic", "true");
if (deduplication)
{
attributes.Add("ContentBasedDeduplication", "true");
}
}

var createTopicRequest = new CreateTopicRequest(name)
{
Attributes = attributes,
Tags = [new Tag { Key = "Source", Value = "Brighter" }]
};

//create topic is idempotent, so safe to call even if topic already exists
var createTopic = snsClient.CreateTopicAsync(createTopicRequest).Result;

if (!string.IsNullOrEmpty(createTopic.TopicArn))
ChannelTopicArn = createTopic.TopicArn;
else
throw new InvalidOperationException($"Could not create Topic topic: {topicName} on {_awsConnection.Region}");
throw new InvalidOperationException(
$"Could not create Topic topic: {name} on {_awsConnection.Region}");
}

private async Task ValidateTopicAsync(RoutingKey topic, TopicFindBy findTopicBy, OnMissingChannel onMissingChannel)
private async Task ValidateTopicAsync(RoutingKey topic, TopicFindBy findTopicBy,
OnMissingChannel onMissingChannel, SnsSqsType snsSqsType)
{
IValidateTopic topicValidationStrategy = GetTopicValidationStrategy(findTopicBy);
IValidateTopic topicValidationStrategy = GetTopicValidationStrategy(findTopicBy, snsSqsType);
(bool exists, string topicArn) = await topicValidationStrategy.ValidateAsync(topic);
if (exists)
ChannelTopicArn = topicArn;
Expand All @@ -91,16 +123,19 @@ private async Task ValidateTopicAsync(RoutingKey topic, TopicFindBy findTopicBy,
$"Topic validation error: could not find topic {topic}. Did you want Brighter to create infrastructure?");
}

private IValidateTopic GetTopicValidationStrategy(TopicFindBy findTopicBy)
private IValidateTopic GetTopicValidationStrategy(TopicFindBy findTopicBy, SnsSqsType type)
{
switch (findTopicBy)
{
case TopicFindBy.Arn:
return new ValidateTopicByArn(_awsConnection.Credentials, _awsConnection.Region, _awsConnection.ClientConfigAction);
return new ValidateTopicByArn(_awsConnection.Credentials, _awsConnection.Region,
_awsConnection.ClientConfigAction);
case TopicFindBy.Convention:
return new ValidateTopicByArnConvention(_awsConnection.Credentials, _awsConnection.Region, _awsConnection.ClientConfigAction);
return new ValidateTopicByArnConvention(_awsConnection.Credentials, _awsConnection.Region,
_awsConnection.ClientConfigAction, type);
case TopicFindBy.Name:
return new ValidateTopicByName(_awsConnection.Credentials, _awsConnection.Region, _awsConnection.ClientConfigAction);
return new ValidateTopicByName(_awsConnection.Credentials, _awsConnection.Region,
_awsConnection.ClientConfigAction, type);
default:
throw new ConfigurationException("Unknown TopicFindBy used to determine how to read RoutingKey");
}
Expand Down
Loading