Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify on IModel #858

Merged
merged 2 commits into from
Jul 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public AsyncConsumerWorkService(int concurrency) : base(concurrency)
_startNewWorkPoolFunc = model => StartNewWorkPool(model);
}

public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
public void Schedule<TWork>(IModel model, TWork work) where TWork : Work
{
/*
* rabbitmq/rabbitmq-dotnet-client#841
Expand All @@ -29,7 +29,7 @@ public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work

private WorkPool StartNewWorkPool(IModel model)
{
var newWorkPool = new WorkPool(model as ModelBase, _concurrency);
var newWorkPool = new WorkPool(model, _concurrency);
newWorkPool.Start();
return newWorkPool;
}
Expand All @@ -47,13 +47,13 @@ public Task Stop(IModel model)
class WorkPool
{
readonly Channel<Work> _channel;
readonly ModelBase _model;
readonly IModel _model;
private Task _worker;
private readonly int _concurrency;
private SemaphoreSlim _limiter;
private CancellationTokenSource _tokenSource;

public WorkPool(ModelBase model, int concurrency)
public WorkPool(IModel model, int concurrency)
{
_concurrency = concurrency;
_model = model;
Expand Down Expand Up @@ -125,7 +125,7 @@ async Task LoopWithConcurrency(CancellationToken cancellationToken)
}
}

static async Task HandleConcurrent(Work work, ModelBase model, SemaphoreSlim limiter)
static async Task HandleConcurrent(Work work, IModel model, SemaphoreSlim limiter)
{
try
{
Expand Down
9 changes: 7 additions & 2 deletions projects/RabbitMQ.Client/client/impl/BasicCancel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,25 @@ public BasicCancel(IBasicConsumer consumer, string consumerTag) : base(consumer)
_consumerTag = consumerTag;
}

protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleBasicCancel(_consumerTag).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>
{
{"consumer", consumer},
{"context", "HandleBasicCancel"}
};
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions projects/RabbitMQ.Client/client/impl/BasicCancelOk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,25 @@ public BasicCancelOk(IBasicConsumer consumer, string consumerTag) : base(consume
_consumerTag = consumerTag;
}

protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleBasicCancelOk(_consumerTag).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>()
{
{"consumer", consumer},
{"context", "HandleBasicCancelOk"}
};
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions projects/RabbitMQ.Client/client/impl/BasicConsumeOk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,25 @@ public BasicConsumeOk(IBasicConsumer consumer, string consumerTag) : base(consum
_consumerTag = consumerTag;
}

protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleBasicConsumeOk(_consumerTag).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>()
{
{"consumer", consumer},
{"context", "HandleBasicConsumeOk"}
};
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
}
}
Expand Down
21 changes: 13 additions & 8 deletions projects/RabbitMQ.Client/client/impl/BasicDeliver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ sealed class BasicDeliver : Work
readonly IBasicProperties _basicProperties;
readonly ReadOnlyMemory<byte> _body;

public BasicDeliver(IBasicConsumer consumer,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to roll back if you want to keep the white space

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah, turning down a PR because of a single whitespace change sounds unreasonable :)

string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
public BasicDeliver(IBasicConsumer consumer,
string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties basicProperties,
ReadOnlyMemory<byte> body) : base(consumer)
{
Expand All @@ -36,7 +36,7 @@ public BasicDeliver(IBasicConsumer consumer,
_body = body;
}

protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
{
try
{
Expand All @@ -50,12 +50,17 @@ await consumer.HandleBasicDeliver(_consumerTag,
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>()
{
{"consumer", consumer},
{"context", "HandleBasicDeliver"}
};
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
finally
{
Expand Down
9 changes: 7 additions & 2 deletions projects/RabbitMQ.Client/client/impl/ModelShutdown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,25 @@ public ModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason) : base(c
_reason = reason;
}

protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleModelShutdown(model, _reason).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>()
{
{ "consumer", consumer },
{ "context", "HandleModelShutdown" }
};
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/impl/Work.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ protected Work(IBasicConsumer consumer)
_asyncConsumer = (IAsyncBasicConsumer)consumer;
}

public Task Execute(ModelBase model)
public Task Execute(IModel model)
{
return Execute(model, _asyncConsumer);
}

protected abstract Task Execute(ModelBase model, IAsyncBasicConsumer consumer);
protected abstract Task Execute(IModel model, IAsyncBasicConsumer consumer);
}
}