Skip to content

Commit

Permalink
Merge pull request #757 from Picturepark/feature/PP9-12207-service-pr…
Browse files Browse the repository at this point in the history
…ovider-exchange-q-creation-responsibility

Feature/pp9 12207 service provider exchange q creation responsibility
  • Loading branch information
phoeniks-sk authored Oct 14, 2024
2 parents b4a76bb + 51bcba1 commit 9f4894a
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Newtonsoft.Json;
using System;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;

namespace Picturepark.SDK.V1.ServiceProvider
Expand Down Expand Up @@ -26,6 +27,7 @@ public Configuration()

public string ServiceProviderId { get; set; }

[Obsolete("NodeId will be removed in a future release")]
public string NodeId { get; set; }

public JsonSerializerSettings SerializerSettings { get; set; }
Expand Down
104 changes: 69 additions & 35 deletions src/Picturepark.SDK.V1.ServiceProvider/ServiceProviderClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,32 @@
using System.Reactive.Linq;
using System.Net.Security;
using System.Security.Authentication;
using RabbitMQ.Client.Exceptions;

namespace Picturepark.SDK.V1.ServiceProvider
{
public class ServiceProviderClient : IDisposable
{
private readonly Configuration _configuration;
private const string DefaultExchangeName = "pp.livestream";
private const string DefaultQueueName = "pp.livestream.messages";

private readonly IConnection _connection;
private readonly IModel _liveStreamModel;
private readonly IModel _requestMessageModel;
private readonly Configuration _configuration;
private readonly ConnectionFactory _factory;
private readonly LiveStreamBuffer _liveStreamBuffer;

private IConnection _connection;
private IModel _liveStreamModel;
private IModel _requestMessageModel;

private LiveStreamConsumer _liveStreamConsumer;

public ServiceProviderClient(Configuration configuration)
{
_configuration = configuration;

ConnectionFactory factory = new ConnectionFactory();
_factory = CreateConnectionFactory(configuration);

factory.HostName = configuration.Host;
factory.Port = int.Parse(configuration.Port);
factory.UserName = configuration.User;
factory.Password = configuration.Password;
factory.AutomaticRecoveryEnabled = true;
factory.VirtualHost = configuration.ServiceProviderId;
factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);

if (_configuration.UseSsl)
{
factory.Ssl = new SslOption()
{
Version = SslProtocols.Tls12,
Enabled = true,
ServerName = factory.HostName,
AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors
};
}

_connection = factory.CreateConnection();
_connection = _factory.CreateConnection();
_liveStreamModel = _connection.CreateModel();
_requestMessageModel = _connection.CreateModel();

Expand All @@ -68,16 +54,18 @@ public IObservable<EventPattern<EventArgsLiveStreamMessage>> GetLiveStreamObserv
// buffer
_liveStreamBuffer.BufferHoldBackTimeMilliseconds = delayMilliseconds;

// exchange
var exchangeName = "pp.livestream";
_liveStreamModel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);

var args = new Dictionary<string, object>();
args.Add("x-max-priority", _configuration.DefaultQueuePriorityMax);
#pragma warning disable CS0618 // Type or member is obsolete
var queueName = $"{DefaultExchangeName}.{_configuration.NodeId}";
#pragma warning restore CS0618 // Type or member is obsolete
var isUnprotectedProvider = TryDeclareExchangeAndBindQueue(queueName);
if (!isUnprotectedProvider)
{
_connection = _factory.CreateConnection();
_liveStreamModel = _connection.CreateModel();
_requestMessageModel = _connection.CreateModel();
queueName = DefaultQueueName;
}

// queue
var queueName = _liveStreamModel.QueueDeclare($"{exchangeName}.{_configuration.NodeId}", true, false, false, args);
_liveStreamModel.QueueBind(queueName, exchangeName, string.Empty, null);
_liveStreamModel.BasicQos(0, (ushort)bufferSize, false);

// create observable
Expand All @@ -88,7 +76,7 @@ public IObservable<EventPattern<EventArgsLiveStreamMessage>> GetLiveStreamObserv

// create consumer for RabbitMQ events
_liveStreamConsumer = new LiveStreamConsumer(_configuration, _liveStreamModel);
_liveStreamConsumer.Received += (o, e) => { _liveStreamBuffer.Enqueue(e); };
_liveStreamConsumer.Received += (_, e) => { _liveStreamBuffer.Enqueue(e); };

// consumer
var consumer = new EventingBasicConsumer(_requestMessageModel);
Expand All @@ -97,5 +85,51 @@ public IObservable<EventPattern<EventArgsLiveStreamMessage>> GetLiveStreamObserv

return result;
}

private bool TryDeclareExchangeAndBindQueue(string queueName)
{
try
{
_liveStreamModel.ExchangeDeclare(DefaultExchangeName, ExchangeType.Fanout);

var args = new Dictionary<string, object> { { "x-max-priority", _configuration.DefaultQueuePriorityMax } };

// queue
var queueDeclareOk = _liveStreamModel.QueueDeclare(queueName, true, false, false, args);
_liveStreamModel.QueueBind(queueDeclareOk, DefaultExchangeName, string.Empty, null);
return true;
}
catch (OperationInterruptedException ex) when (ex.ShutdownReason.ReplyCode == 403)
{
return false;
}
}

private ConnectionFactory CreateConnectionFactory(Configuration configuration)
{
var factory = new ConnectionFactory
{
HostName = configuration.Host,
Port = int.Parse(configuration.Port),
UserName = configuration.User,
Password = configuration.Password,
AutomaticRecoveryEnabled = true,
VirtualHost = configuration.ServiceProviderId,
NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
};

if (_configuration.UseSsl)
{
factory.Ssl = new SslOption
{
Version = SslProtocols.Tls12,
Enabled = true,
ServerName = factory.HostName,
AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors
};
}

return factory;
}
}
}

0 comments on commit 9f4894a

Please sign in to comment.