Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/pp9 12207 service provider exchange q creation responsibility #757

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Newtonsoft.Json;
using System;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;

namespace Picturepark.SDK.V1.ServiceProvider
Expand Down Expand Up @@ -26,6 +27,7 @@ public Configuration()

public string ServiceProviderId { get; set; }

[Obsolete("NodeId will be removed in a future release")]
public string NodeId { get; set; }

public JsonSerializerSettings SerializerSettings { get; set; }
Expand Down
104 changes: 69 additions & 35 deletions src/Picturepark.SDK.V1.ServiceProvider/ServiceProviderClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,32 @@
using System.Reactive.Linq;
using System.Net.Security;
using System.Security.Authentication;
using RabbitMQ.Client.Exceptions;

namespace Picturepark.SDK.V1.ServiceProvider
{
public class ServiceProviderClient : IDisposable
{
private readonly Configuration _configuration;
private const string DefaultExchangeName = "pp.livestream";
private const string DefaultQueueName = "pp.livestream.messages";

private readonly IConnection _connection;
private readonly IModel _liveStreamModel;
private readonly IModel _requestMessageModel;
private readonly Configuration _configuration;
private readonly ConnectionFactory _factory;
private readonly LiveStreamBuffer _liveStreamBuffer;

private IConnection _connection;
private IModel _liveStreamModel;
private IModel _requestMessageModel;

private LiveStreamConsumer _liveStreamConsumer;

public ServiceProviderClient(Configuration configuration)
{
_configuration = configuration;

ConnectionFactory factory = new ConnectionFactory();
_factory = CreateConnectionFactory(configuration);

factory.HostName = configuration.Host;
factory.Port = int.Parse(configuration.Port);
factory.UserName = configuration.User;
factory.Password = configuration.Password;
factory.AutomaticRecoveryEnabled = true;
factory.VirtualHost = configuration.ServiceProviderId;
factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);

if (_configuration.UseSsl)
{
factory.Ssl = new SslOption()
{
Version = SslProtocols.Tls12,
Enabled = true,
ServerName = factory.HostName,
AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors
};
}

_connection = factory.CreateConnection();
_connection = _factory.CreateConnection();
_liveStreamModel = _connection.CreateModel();
_requestMessageModel = _connection.CreateModel();

Expand All @@ -68,16 +54,18 @@ public IObservable<EventPattern<EventArgsLiveStreamMessage>> GetLiveStreamObserv
// buffer
_liveStreamBuffer.BufferHoldBackTimeMilliseconds = delayMilliseconds;

// exchange
var exchangeName = "pp.livestream";
_liveStreamModel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);

var args = new Dictionary<string, object>();
args.Add("x-max-priority", _configuration.DefaultQueuePriorityMax);
#pragma warning disable CS0618 // Type or member is obsolete
var queueName = $"{DefaultExchangeName}.{_configuration.NodeId}";
#pragma warning restore CS0618 // Type or member is obsolete
var isUnprotectedProvider = TryDeclareExchangeAndBindQueue(queueName);
if (!isUnprotectedProvider)
{
_connection = _factory.CreateConnection();
_liveStreamModel = _connection.CreateModel();
_requestMessageModel = _connection.CreateModel();
queueName = DefaultQueueName;
}

// queue
var queueName = _liveStreamModel.QueueDeclare($"{exchangeName}.{_configuration.NodeId}", true, false, false, args);
_liveStreamModel.QueueBind(queueName, exchangeName, string.Empty, null);
_liveStreamModel.BasicQos(0, (ushort)bufferSize, false);

// create observable
Expand All @@ -88,7 +76,7 @@ public IObservable<EventPattern<EventArgsLiveStreamMessage>> GetLiveStreamObserv

// create consumer for RabbitMQ events
_liveStreamConsumer = new LiveStreamConsumer(_configuration, _liveStreamModel);
_liveStreamConsumer.Received += (o, e) => { _liveStreamBuffer.Enqueue(e); };
_liveStreamConsumer.Received += (_, e) => { _liveStreamBuffer.Enqueue(e); };

// consumer
var consumer = new EventingBasicConsumer(_requestMessageModel);
Expand All @@ -97,5 +85,51 @@ public IObservable<EventPattern<EventArgsLiveStreamMessage>> GetLiveStreamObserv

return result;
}

private bool TryDeclareExchangeAndBindQueue(string queueName)
{
try
{
_liveStreamModel.ExchangeDeclare(DefaultExchangeName, ExchangeType.Fanout);

var args = new Dictionary<string, object> { { "x-max-priority", _configuration.DefaultQueuePriorityMax } };

// queue
var queueDeclareOk = _liveStreamModel.QueueDeclare(queueName, true, false, false, args);
_liveStreamModel.QueueBind(queueDeclareOk, DefaultExchangeName, string.Empty, null);
return true;
}
catch (OperationInterruptedException ex) when (ex.ShutdownReason.ReplyCode == 403)
{
return false;
}
}

private ConnectionFactory CreateConnectionFactory(Configuration configuration)
{
var factory = new ConnectionFactory
{
HostName = configuration.Host,
Port = int.Parse(configuration.Port),
UserName = configuration.User,
Password = configuration.Password,
AutomaticRecoveryEnabled = true,
VirtualHost = configuration.ServiceProviderId,
NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
};

if (_configuration.UseSsl)
{
factory.Ssl = new SslOption
{
Version = SslProtocols.Tls12,
Enabled = true,
ServerName = factory.HostName,
AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors
};
}

return factory;
}
}
}