From ac4ad8bb099172741e52cb75e72939f96bba2b73 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 20 May 2020 15:23:25 -0700 Subject: [PATCH] Refactor other use of ConcurrentDictionary GetOrAdd --- .../client/impl/AsyncConsumerWorkService.cs | 16 +++++++--------- .../client/impl/ConsumerWorkService.cs | 12 ++++++++++-- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs b/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs index 9bcecad2eb..232c383d88 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs @@ -8,14 +8,8 @@ namespace RabbitMQ.Client.Impl { internal sealed class AsyncConsumerWorkService : ConsumerWorkService { - private readonly ConcurrentDictionary _workPools; - private readonly Func _startNewWorkPoolFunc; - - public AsyncConsumerWorkService() - { - _workPools = new ConcurrentDictionary(); - _startNewWorkPoolFunc = model => StartNewWorkPool(model); - } + private readonly ConcurrentDictionary _workPools = new ConcurrentDictionary(); + private readonly Func _startNewWorkPoolFunc = model => StartNewWorkPool(model); public void Schedule(ModelBase model, TWork work) where TWork : Work { @@ -25,10 +19,14 @@ public void Schedule(ModelBase model, TWork work) where TWork : Work * * The lock is necessary because calling the value delegate is not atomic. */ + WorkPool workPool; + lock (_workPools) { - _workPools.GetOrAdd(model, _startNewWorkPoolFunc).Enqueue(work); + workPool = _workPools.GetOrAdd(model, _startNewWorkPoolFunc); } + + workPool.Enqueue(work); } private static WorkPool StartNewWorkPool(IModel model) diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerWorkService.cs b/projects/RabbitMQ.Client/client/impl/ConsumerWorkService.cs index 9fc732e3a0..b37d5e9257 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerWorkService.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerWorkService.cs @@ -8,13 +8,21 @@ namespace RabbitMQ.Client.Impl class ConsumerWorkService { private readonly ConcurrentDictionary _workPools = new ConcurrentDictionary(); + private readonly Func _startNewWorkPoolFunc = model => StartNewWorkPool(model); public void AddWork(IModel model, Action fn) { - _workPools.GetOrAdd(model, StartNewWorkPool).Enqueue(fn); + WorkPool workPool; + + lock (_workPools) + { + workPool = _workPools.GetOrAdd(model, _startNewWorkPoolFunc); + } + + workPool.Enqueue(fn); } - private WorkPool StartNewWorkPool(IModel model) + private static WorkPool StartNewWorkPool(IModel model) { var newWorkPool = new WorkPool(); newWorkPool.Start();