Skip to content

Commit

Permalink
Unify on IModel
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmarbach committed Jul 7, 2020
1 parent d0c136c commit 33b2a40
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 23 deletions.
10 changes: 5 additions & 5 deletions projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public AsyncConsumerWorkService(int concurrency) : base(concurrency)
_startNewWorkPoolFunc = model => StartNewWorkPool(model);
}

public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
public void Schedule<TWork>(IModel model, TWork work) where TWork : Work
{
/*
* rabbitmq/rabbitmq-dotnet-client#841
Expand All @@ -29,7 +29,7 @@ public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work

private WorkPool StartNewWorkPool(IModel model)
{
var newWorkPool = new WorkPool(model as ModelBase, _concurrency);
var newWorkPool = new WorkPool(model, _concurrency);
newWorkPool.Start();
return newWorkPool;
}
Expand All @@ -47,13 +47,13 @@ public Task Stop(IModel model)
class WorkPool
{
readonly Channel<Work> _channel;
readonly ModelBase _model;
readonly IModel _model;
private Task _worker;
private readonly int _concurrency;
private SemaphoreSlim _limiter;
private CancellationTokenSource _tokenSource;

public WorkPool(ModelBase model, int concurrency)
public WorkPool(IModel model, int concurrency)
{
_concurrency = concurrency;
_model = model;
Expand Down Expand Up @@ -125,7 +125,7 @@ async Task LoopWithConcurrency(CancellationToken cancellationToken)
}
}

static async Task HandleConcurrent(Work work, ModelBase model, SemaphoreSlim limiter)
static async Task HandleConcurrent(Work work, IModel model, SemaphoreSlim limiter)
{
try
{
Expand Down
9 changes: 7 additions & 2 deletions projects/RabbitMQ.Client/client/impl/BasicCancel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,25 @@ public BasicCancel(IBasicConsumer consumer, string consumerTag) : base(consumer)
_consumerTag = consumerTag;
}

protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleBasicCancel(_consumerTag).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>
{
{"consumer", consumer},
{"context", "HandleBasicCancel"}
};
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions projects/RabbitMQ.Client/client/impl/BasicCancelOk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,25 @@ public BasicCancelOk(IBasicConsumer consumer, string consumerTag) : base(consume
_consumerTag = consumerTag;
}

protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleBasicCancelOk(_consumerTag).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>()
{
{"consumer", consumer},
{"context", "HandleBasicCancelOk"}
};
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions projects/RabbitMQ.Client/client/impl/BasicConsumeOk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,25 @@ public BasicConsumeOk(IBasicConsumer consumer, string consumerTag) : base(consum
_consumerTag = consumerTag;
}

protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleBasicConsumeOk(_consumerTag).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>()
{
{"consumer", consumer},
{"context", "HandleBasicConsumeOk"}
};
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
}
}
Expand Down
21 changes: 13 additions & 8 deletions projects/RabbitMQ.Client/client/impl/BasicDeliver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ sealed class BasicDeliver : Work
readonly IBasicProperties _basicProperties;
readonly ReadOnlyMemory<byte> _body;

public BasicDeliver(IBasicConsumer consumer,
string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
public BasicDeliver(IBasicConsumer consumer,
string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties basicProperties,
ReadOnlyMemory<byte> body) : base(consumer)
{
Expand All @@ -36,7 +36,7 @@ public BasicDeliver(IBasicConsumer consumer,
_body = body;
}

protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
{
try
{
Expand All @@ -50,12 +50,17 @@ await consumer.HandleBasicDeliver(_consumerTag,
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>()
{
{"consumer", consumer},
{"context", "HandleBasicDeliver"}
};
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
finally
{
Expand Down
9 changes: 7 additions & 2 deletions projects/RabbitMQ.Client/client/impl/ModelShutdown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,25 @@ public ModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason) : base(c
_reason = reason;
}

protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleModelShutdown(model, _reason).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>()
{
{ "consumer", consumer },
{ "context", "HandleModelShutdown" }
};
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/impl/Work.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ protected Work(IBasicConsumer consumer)
_asyncConsumer = (IAsyncBasicConsumer)consumer;
}

public Task Execute(ModelBase model)
public Task Execute(IModel model)
{
return Execute(model, _asyncConsumer);
}

protected abstract Task Execute(ModelBase model, IAsyncBasicConsumer consumer);
protected abstract Task Execute(IModel model, IAsyncBasicConsumer consumer);
}
}

0 comments on commit 33b2a40

Please sign in to comment.