Skip to content

Commit

Permalink
refactor: use dict of link pool
Browse files Browse the repository at this point in the history
  • Loading branch information
aneojgurhem committed Feb 2, 2024
1 parent e00387c commit 33fbecc
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 39 deletions.
74 changes: 35 additions & 39 deletions Adaptors/Amqp/src/PushQueueStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,20 @@ namespace ArmoniK.Core.Adapters.Amqp;

public class PushQueueStorage : QueueStorage, IPushQueueStorage
{
private const int MaxInternalQueuePriority = 10;
private readonly TimeSpan baseDelay_ = TimeSpan.FromMilliseconds(100);
private readonly ILogger<PushQueueStorage> logger_;
private readonly ConcurrentDictionary<string, SenderLink> senders_ = new();
private readonly ObjectPool<Session> sessionPool_;
private const int MaxInternalQueuePriority = 10;
private readonly TimeSpan baseDelay_ = TimeSpan.FromMilliseconds(100);
private readonly ILogger<PushQueueStorage> logger_;
private readonly int parallelismLimit_;
private readonly ConcurrentDictionary<string, ObjectPool<SenderLink>> senders_ = new();

public PushQueueStorage(QueueCommon.Amqp options,
IConnectionAmqp connectionAmqp,
ILogger<PushQueueStorage> logger)
: base(options,
connectionAmqp)
{
logger_ = logger;
sessionPool_ = new ObjectPool<Session>(200,
async token => new Session(await connectionAmqp.GetConnectionAsync(token)
.ConfigureAwait(false)),
(session,
_) => new ValueTask<bool>(!session.IsClosed));
parallelismLimit_ = options.ParallelismLimit;
logger_ = logger;
}

/// <inheritdoc />
Expand Down Expand Up @@ -98,7 +94,8 @@ private async Task PushMessagesAsync(IEnumerable<MessageData> messages,
internalPriority);


await messages.ParallelForEach(new ParallelTaskOptions(cancellationToken),
await messages.ParallelForEach(new ParallelTaskOptions(parallelismLimit_,
cancellationToken),
async msgData =>
{
for (var retry = 0; retry < Options.MaxRetries; retry++)
Expand All @@ -107,34 +104,33 @@ await messages.ParallelForEach(new ParallelTaskOptions(cancellationToken),
{
var address = $"{partitionId}###q{whichQueue}";

SenderLink sender;
while ((sender = senders_.GetOrAdd(address,
s => new SenderLink(sessionPool_.Get(),
Guid.NewGuid()
.ToString(),
s))).Session.IsClosed)
{
if (senders_.TryRemove(address,
out sender!))
{
await sender.CloseAsync()
.ConfigureAwait(false);
}
}
var pool = senders_.GetOrAdd(address,
s => new ObjectPool<SenderLink>(parallelismLimit_,
async token => new SenderLink(new Session(await ConnectionAmqp
.GetConnectionAsync(token)
.ConfigureAwait(false)),
Guid.NewGuid()
.ToString(),
s),
(link,
_) => new ValueTask<bool>(!link.IsClosed &&
!link.Session.IsClosed)));

await pool.WithInstanceAsync(sender => sender.SendAsync(new Message(Encoding.UTF8.GetBytes(msgData.TaskId))
{
Header = new Header
{
Priority = (byte)internalPriority,
},
Properties = new Properties
{
MessageId = Guid.NewGuid()
.ToString(),
},
}),
cancellationToken)
.ConfigureAwait(false);

await sender.SendAsync(new Message(Encoding.UTF8.GetBytes(msgData.TaskId))
{
Header = new Header
{
Priority = (byte)internalPriority,
},
Properties = new Properties
{
MessageId = Guid.NewGuid()
.ToString(),
},
})
.ConfigureAwait(false);
break;
}
catch (Exception e)
Expand Down
1 change: 1 addition & 0 deletions Adaptors/QueueCommon/src/Amqp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ public class Amqp
public bool AllowHostMismatch { get; set; }
public int MaxRetries { get; set; }
public int LinkCredit { get; set; }
public int ParallelismLimit { get; set; } = 200;
}

0 comments on commit 33fbecc

Please sign in to comment.