Skip to content

Commit

Permalink
Resolved #2540. Resolved #2539.
Browse files Browse the repository at this point in the history
* Send IServiceProvider to DoWork method of the periodic background worker #2539
* Introduce AsyncPeriodicBackgroundWorkerBase #2540
  • Loading branch information
hikalkan committed Jan 3, 2020
1 parent d815b9c commit 401376e
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Threading;

namespace Volo.Abp.BackgroundJobs
{
Expand All @@ -21,7 +20,7 @@ public BackgroundJobExecuter(IOptions<AbpBackgroundJobOptions> options)
Logger = NullLogger<BackgroundJobExecuter>.Instance;
}

public virtual void Execute(JobExecutionContext context)
public virtual async Task ExecuteAsync(JobExecutionContext context)
{
var job = context.ServiceProvider.GetService(context.JobType);
if (job == null)
Expand All @@ -41,7 +40,7 @@ public virtual void Execute(JobExecutionContext context)
{
if (jobExecuteMethod.Name == nameof(IAsyncBackgroundJob<object>.ExecuteAsync))
{
AsyncHelper.RunSync(() => (Task) jobExecuteMethod.Invoke(job, new[] {context.JobArgs}));
await ((Task) jobExecuteMethod.Invoke(job, new[] {context.JobArgs})).ConfigureAwait(false);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
namespace Volo.Abp.BackgroundJobs
using System.Threading.Tasks;

namespace Volo.Abp.BackgroundJobs
{
public interface IBackgroundJobExecuter
{
void Execute(JobExecutionContext context);
Task ExecuteAsync(JobExecutionContext context);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.Threading;

namespace Volo.Abp.BackgroundJobs.Hangfire
{
Expand All @@ -25,7 +26,7 @@ public void Execute(TArgs args)
{
var jobType = Options.GetJob(typeof(TArgs)).JobType;
var context = new JobExecutionContext(scope.ServiceProvider, jobType, args);
JobExecuter.Execute(context);
AsyncHelper.RunSync(() => JobExecuter.ExecuteAsync(context));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Volo.Abp.RabbitMQ;
using Volo.Abp.Threading;

namespace Volo.Abp.BackgroundJobs.RabbitMQ
{
Expand Down Expand Up @@ -181,7 +182,7 @@ protected virtual void MessageReceived(object sender, BasicDeliverEventArgs ea)

try
{
JobExecuter.Execute(context);
AsyncHelper.RunSync(() => JobExecuter.ExecuteAsync(context));
ChannelAccessor.Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
catch (BackgroundJobExecutionException)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand All @@ -9,92 +10,88 @@

namespace Volo.Abp.BackgroundJobs
{
public class BackgroundJobWorker : PeriodicBackgroundWorkerBase, IBackgroundJobWorker
public class BackgroundJobWorker : AsyncPeriodicBackgroundWorkerBase, IBackgroundJobWorker
{
protected AbpBackgroundJobOptions JobOptions { get; }

protected AbpBackgroundJobWorkerOptions WorkerOptions { get; }

protected IServiceScopeFactory ServiceScopeFactory { get; }

public BackgroundJobWorker(
AbpTimer timer,
IOptions<AbpBackgroundJobOptions> jobOptions,
IOptions<AbpBackgroundJobWorkerOptions> workerOptions,
IServiceScopeFactory serviceScopeFactory)
: base(timer)
: base(
timer,
serviceScopeFactory)
{
ServiceScopeFactory = serviceScopeFactory;
WorkerOptions = workerOptions.Value;
JobOptions = jobOptions.Value;
Timer.Period = WorkerOptions.JobPollPeriod;
}

protected override void DoWork()
protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
{
using (var scope = ServiceScopeFactory.CreateScope())
{
var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>();
var store = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobStore>();

var waitingJobs = AsyncHelper.RunSync(() => store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount));
var waitingJobs = await store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount).ConfigureAwait(false);

if (!waitingJobs.Any())
{
return;
}
if (!waitingJobs.Any())
{
return;
}

var jobExecuter = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobExecuter>();
var clock = workerContext.ServiceProvider.GetRequiredService<IClock>();
var serializer = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobSerializer>();

var jobExecuter = scope.ServiceProvider.GetRequiredService<IBackgroundJobExecuter>();
var clock = scope.ServiceProvider.GetRequiredService<IClock>();
var serializer = scope.ServiceProvider.GetRequiredService<IBackgroundJobSerializer>();
foreach (var jobInfo in waitingJobs)
{
jobInfo.TryCount++;
jobInfo.LastTryTime = clock.Now;

foreach (var jobInfo in waitingJobs)
try
{
jobInfo.TryCount++;
jobInfo.LastTryTime = clock.Now;
var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
var context = new JobExecutionContext(workerContext.ServiceProvider, jobConfiguration.JobType, jobArgs);

try
{
var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs);
await jobExecuter.ExecuteAsync(context).ConfigureAwait(false);

try
{
jobExecuter.Execute(context);
await store.DeleteAsync(jobInfo.Id).ConfigureAwait(false);
}
catch (BackgroundJobExecutionException)
{
var nextTryTime = CalculateNextTryTime(jobInfo, clock);

AsyncHelper.RunSync(() => store.DeleteAsync(jobInfo.Id));
if (nextTryTime.HasValue)
{
jobInfo.NextTryTime = nextTryTime.Value;
}
catch (BackgroundJobExecutionException)
else
{
var nextTryTime = CalculateNextTryTime(jobInfo, clock);

if (nextTryTime.HasValue)
{
jobInfo.NextTryTime = nextTryTime.Value;
}
else
{
jobInfo.IsAbandoned = true;
}

TryUpdate(store, jobInfo);
jobInfo.IsAbandoned = true;
}

await TryUpdateAsync(store, jobInfo).ConfigureAwait(false);
}
catch (Exception ex)
{
Logger.LogException(ex);
jobInfo.IsAbandoned = true;
TryUpdate(store, jobInfo);
}
}
catch (Exception ex)
{
Logger.LogException(ex);
jobInfo.IsAbandoned = true;
await TryUpdateAsync(store, jobInfo).ConfigureAwait(false);
}
}
}

protected virtual void TryUpdate(IBackgroundJobStore store, BackgroundJobInfo jobInfo)
protected virtual async Task TryUpdateAsync(IBackgroundJobStore store, BackgroundJobInfo jobInfo)
{
try
{
AsyncHelper.RunSync(() => store.UpdateAsync(jobInfo));
await store.UpdateAsync(jobInfo).ConfigureAwait(false);
}
catch (Exception updateEx)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Volo.Abp.Threading;

namespace Volo.Abp.BackgroundWorkers
{
public abstract class AsyncPeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected AbpTimer Timer { get; }

protected AsyncPeriodicBackgroundWorkerBase(
AbpTimer timer,
IServiceScopeFactory serviceScopeFactory)
{
ServiceScopeFactory = serviceScopeFactory;
Timer = timer;
Timer.Elapsed += Timer_Elapsed;
}

public override async Task StartAsync(CancellationToken cancellationToken = default)
{
await base.StartAsync(cancellationToken).ConfigureAwait(false);
Timer.Start(cancellationToken);
}

public override async Task StopAsync(CancellationToken cancellationToken = default)
{
Timer.Stop(cancellationToken);
await base.StopAsync(cancellationToken).ConfigureAwait(false);
}

private void Timer_Elapsed(object sender, System.EventArgs e)
{
try
{
using (var scope = ServiceScopeFactory.CreateScope())
{
AsyncHelper.RunSync(
() => DoWorkAsync(new PeriodicBackgroundWorkerContext(scope.ServiceProvider))
);
}
}
catch (Exception ex)
{
Logger.LogException(ex);
}
}

protected abstract Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;

namespace Volo.Abp.BackgroundWorkers
{
public class PeriodicBackgroundWorkerContext
{
public IServiceProvider ServiceProvider { get; }

public PeriodicBackgroundWorkerContext(IServiceProvider serviceProvider)
{
ServiceProvider = serviceProvider;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Volo.Abp.Threading;

Expand All @@ -11,14 +12,14 @@ namespace Volo.Abp.BackgroundWorkers
/// </summary>
public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
protected readonly AbpTimer Timer;
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected AbpTimer Timer { get; }

/// <summary>
/// Initializes a new instance of the <see cref="PeriodicBackgroundWorkerBase"/> class.
/// </summary>
/// <param name="timer">A timer.</param>
protected PeriodicBackgroundWorkerBase(AbpTimer timer)
protected PeriodicBackgroundWorkerBase(
AbpTimer timer,
IServiceScopeFactory serviceScopeFactory)
{
ServiceScopeFactory = serviceScopeFactory;
Timer = timer;
Timer.Elapsed += Timer_Elapsed;
}
Expand All @@ -34,12 +35,15 @@ public override async Task StopAsync(CancellationToken cancellationToken = defau
Timer.Stop(cancellationToken);
await base.StopAsync(cancellationToken).ConfigureAwait(false);
}

private void Timer_Elapsed(object sender, System.EventArgs e)
{
try
{
DoWork();
using (var scope = ServiceScopeFactory.CreateScope())
{
DoWork(new PeriodicBackgroundWorkerContext(scope.ServiceProvider));
}
}
catch (Exception ex)
{
Expand All @@ -50,6 +54,6 @@ private void Timer_Elapsed(object sender, System.EventArgs e)
/// <summary>
/// Periodic works should be done by implementing this method.
/// </summary>
protected abstract void DoWork();
protected abstract void DoWork(PeriodicBackgroundWorkerContext workerContext);
}
}

0 comments on commit 401376e

Please sign in to comment.