From 33b2a404efe9c4ee76e0938d7fc7a02c011c7625 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Tue, 2 Jun 2020 12:45:27 +0200 Subject: [PATCH] Unify on IModel --- .../client/impl/AsyncConsumerWorkService.cs | 10 ++++----- .../client/impl/BasicCancel.cs | 9 ++++++-- .../client/impl/BasicCancelOk.cs | 9 ++++++-- .../client/impl/BasicConsumeOk.cs | 9 ++++++-- .../client/impl/BasicDeliver.cs | 21 ++++++++++++------- .../client/impl/ModelShutdown.cs | 9 ++++++-- projects/RabbitMQ.Client/client/impl/Work.cs | 4 ++-- 7 files changed, 48 insertions(+), 23 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs b/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs index 06ef7433f1..3a953056be 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs @@ -16,7 +16,7 @@ public AsyncConsumerWorkService(int concurrency) : base(concurrency) _startNewWorkPoolFunc = model => StartNewWorkPool(model); } - public void Schedule(ModelBase model, TWork work) where TWork : Work + public void Schedule(IModel model, TWork work) where TWork : Work { /* * rabbitmq/rabbitmq-dotnet-client#841 @@ -29,7 +29,7 @@ public void Schedule(ModelBase model, TWork work) where TWork : Work private WorkPool StartNewWorkPool(IModel model) { - var newWorkPool = new WorkPool(model as ModelBase, _concurrency); + var newWorkPool = new WorkPool(model, _concurrency); newWorkPool.Start(); return newWorkPool; } @@ -47,13 +47,13 @@ public Task Stop(IModel model) class WorkPool { readonly Channel _channel; - readonly ModelBase _model; + readonly IModel _model; private Task _worker; private readonly int _concurrency; private SemaphoreSlim _limiter; private CancellationTokenSource _tokenSource; - public WorkPool(ModelBase model, int concurrency) + public WorkPool(IModel model, int concurrency) { _concurrency = concurrency; _model = model; @@ -125,7 +125,7 @@ async Task LoopWithConcurrency(CancellationToken cancellationToken) } } - static async Task HandleConcurrent(Work work, ModelBase model, SemaphoreSlim limiter) + static async Task HandleConcurrent(Work work, IModel model, SemaphoreSlim limiter) { try { diff --git a/projects/RabbitMQ.Client/client/impl/BasicCancel.cs b/projects/RabbitMQ.Client/client/impl/BasicCancel.cs index 3839a5c91e..3917955dbc 100644 --- a/projects/RabbitMQ.Client/client/impl/BasicCancel.cs +++ b/projects/RabbitMQ.Client/client/impl/BasicCancel.cs @@ -15,7 +15,7 @@ public BasicCancel(IBasicConsumer consumer, string consumerTag) : base(consumer) _consumerTag = consumerTag; } - protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer) + protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer) { try { @@ -23,12 +23,17 @@ protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consu } catch (Exception e) { + if (!(model is ModelBase modelBase)) + { + return; + } + var details = new Dictionary { {"consumer", consumer}, {"context", "HandleBasicCancel"} }; - model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); + modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); } } } diff --git a/projects/RabbitMQ.Client/client/impl/BasicCancelOk.cs b/projects/RabbitMQ.Client/client/impl/BasicCancelOk.cs index 191c3efe63..1e38f3b6dc 100644 --- a/projects/RabbitMQ.Client/client/impl/BasicCancelOk.cs +++ b/projects/RabbitMQ.Client/client/impl/BasicCancelOk.cs @@ -15,7 +15,7 @@ public BasicCancelOk(IBasicConsumer consumer, string consumerTag) : base(consume _consumerTag = consumerTag; } - protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer) + protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer) { try { @@ -23,12 +23,17 @@ protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consu } catch (Exception e) { + if (!(model is ModelBase modelBase)) + { + return; + } + var details = new Dictionary() { {"consumer", consumer}, {"context", "HandleBasicCancelOk"} }; - model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); + modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); } } } diff --git a/projects/RabbitMQ.Client/client/impl/BasicConsumeOk.cs b/projects/RabbitMQ.Client/client/impl/BasicConsumeOk.cs index 7645221fb2..81c429db1e 100644 --- a/projects/RabbitMQ.Client/client/impl/BasicConsumeOk.cs +++ b/projects/RabbitMQ.Client/client/impl/BasicConsumeOk.cs @@ -15,7 +15,7 @@ public BasicConsumeOk(IBasicConsumer consumer, string consumerTag) : base(consum _consumerTag = consumerTag; } - protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer) + protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer) { try { @@ -23,12 +23,17 @@ protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consu } catch (Exception e) { + if (!(model is ModelBase modelBase)) + { + return; + } + var details = new Dictionary() { {"consumer", consumer}, {"context", "HandleBasicConsumeOk"} }; - model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); + modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); } } } diff --git a/projects/RabbitMQ.Client/client/impl/BasicDeliver.cs b/projects/RabbitMQ.Client/client/impl/BasicDeliver.cs index 067e6908eb..3df3e24b82 100644 --- a/projects/RabbitMQ.Client/client/impl/BasicDeliver.cs +++ b/projects/RabbitMQ.Client/client/impl/BasicDeliver.cs @@ -18,12 +18,12 @@ sealed class BasicDeliver : Work readonly IBasicProperties _basicProperties; readonly ReadOnlyMemory _body; - public BasicDeliver(IBasicConsumer consumer, - string consumerTag, - ulong deliveryTag, - bool redelivered, - string exchange, - string routingKey, + public BasicDeliver(IBasicConsumer consumer, + string consumerTag, + ulong deliveryTag, + bool redelivered, + string exchange, + string routingKey, IBasicProperties basicProperties, ReadOnlyMemory body) : base(consumer) { @@ -36,7 +36,7 @@ public BasicDeliver(IBasicConsumer consumer, _body = body; } - protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer) + protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer) { try { @@ -50,12 +50,17 @@ await consumer.HandleBasicDeliver(_consumerTag, } catch (Exception e) { + if (!(model is ModelBase modelBase)) + { + return; + } + var details = new Dictionary() { {"consumer", consumer}, {"context", "HandleBasicDeliver"} }; - model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); + modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); } finally { diff --git a/projects/RabbitMQ.Client/client/impl/ModelShutdown.cs b/projects/RabbitMQ.Client/client/impl/ModelShutdown.cs index 5d94643986..aa4b27682c 100644 --- a/projects/RabbitMQ.Client/client/impl/ModelShutdown.cs +++ b/projects/RabbitMQ.Client/client/impl/ModelShutdown.cs @@ -15,7 +15,7 @@ public ModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason) : base(c _reason = reason; } - protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer) + protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer) { try { @@ -23,12 +23,17 @@ protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consu } catch (Exception e) { + if (!(model is ModelBase modelBase)) + { + return; + } + var details = new Dictionary() { { "consumer", consumer }, { "context", "HandleModelShutdown" } }; - model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); + modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); } } } diff --git a/projects/RabbitMQ.Client/client/impl/Work.cs b/projects/RabbitMQ.Client/client/impl/Work.cs index d09b4607c2..325435e255 100644 --- a/projects/RabbitMQ.Client/client/impl/Work.cs +++ b/projects/RabbitMQ.Client/client/impl/Work.cs @@ -11,11 +11,11 @@ protected Work(IBasicConsumer consumer) _asyncConsumer = (IAsyncBasicConsumer)consumer; } - public Task Execute(ModelBase model) + public Task Execute(IModel model) { return Execute(model, _asyncConsumer); } - protected abstract Task Execute(ModelBase model, IAsyncBasicConsumer consumer); + protected abstract Task Execute(IModel model, IAsyncBasicConsumer consumer); } }