Skip to content

Commit

Permalink
Fixed #1464: Add sync methods to IBackgroundJobStore
Browse files Browse the repository at this point in the history
hikalkan committed Jul 9, 2019

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent e026dfd commit 4ce0381
Showing 9 changed files with 140 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -39,9 +39,9 @@ public override async Task InterceptAsync(IAbpMethodInvocation invocation)
await invocation.ProceedAsync();
}

protected virtual Task AuthorizeAsync(IAbpMethodInvocation invocation)
protected virtual async Task AuthorizeAsync(IAbpMethodInvocation invocation)
{
return _methodInvocationAuthorizationService.CheckAsync(
await _methodInvocationAuthorizationService.CheckAsync(
new MethodInvocationAuthorizationContext(
invocation.Method
)
Original file line number Diff line number Diff line change
@@ -36,9 +36,7 @@ protected override void DoWork()
{
var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>();

var waitingJobs = AsyncHelper.RunSync(
() => store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount)
);
var waitingJobs = store.GetWaitingJobs(WorkerOptions.MaxJobFetchCount);

if (!waitingJobs.Any())
{
@@ -64,7 +62,7 @@ protected override void DoWork()
{
jobExecuter.Execute(context);

AsyncHelper.RunSync(() => store.DeleteAsync(jobInfo.Id));
store.Delete(jobInfo.Id);
}
catch (BackgroundJobExecutionException)
{
@@ -96,7 +94,7 @@ protected virtual void TryUpdate(IBackgroundJobStore store, BackgroundJobInfo jo
{
try
{
store.UpdateAsync(jobInfo);
store.Update(jobInfo);
}
catch (Exception updateEx)
{
@@ -107,9 +105,8 @@ protected virtual void TryUpdate(IBackgroundJobStore store, BackgroundJobInfo jo
protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock)
{
var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1));
var nextTryDate = jobInfo.LastTryTime.HasValue
? jobInfo.LastTryTime.Value.AddSeconds(nextWaitDuration)
: clock.Now.AddSeconds(nextWaitDuration);
var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ??
clock.Now.AddSeconds(nextWaitDuration);

if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > WorkerOptions.DefaultTimeout)
{
Original file line number Diff line number Diff line change
@@ -9,19 +9,41 @@ namespace Volo.Abp.BackgroundJobs
/// </summary>
public interface IBackgroundJobStore
{
/// <summary>
/// Gets a BackgroundJobInfo based on the given jobId.
/// </summary>
/// <param name="jobId">The Job Unique Identifier.</param>
/// <returns>The BackgroundJobInfo object.</returns>
BackgroundJobInfo Find(Guid jobId);

/// <summary>
/// Gets a BackgroundJobInfo based on the given jobId.
/// </summary>
/// <param name="jobId">The Job Unique Identifier.</param>
/// <returns>The BackgroundJobInfo object.</returns>
Task<BackgroundJobInfo> FindAsync(Guid jobId);

/// <summary>
/// Inserts a background job.
/// </summary>
/// <param name="jobInfo">Job information.</param>
void Insert(BackgroundJobInfo jobInfo);

/// <summary>
/// Inserts a background job.
/// </summary>
/// <param name="jobInfo">Job information.</param>
Task InsertAsync(BackgroundJobInfo jobInfo);

/// <summary>
/// Gets waiting jobs. It should get jobs based on these:
/// Conditions: !IsAbandoned And NextTryTime &lt;= Clock.Now.
/// Order by: Priority DESC, TryCount ASC, NextTryTime ASC.
/// Maximum result: <paramref name="maxResultCount"/>.
/// </summary>
/// <param name="maxResultCount">Maximum result count.</param>
List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount);

/// <summary>
/// Gets waiting jobs. It should get jobs based on these:
/// Conditions: !IsAbandoned And NextTryTime &lt;= Clock.Now.
@@ -31,12 +53,24 @@ public interface IBackgroundJobStore
/// <param name="maxResultCount">Maximum result count.</param>
Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount);

/// <summary>
/// Deletes a job.
/// </summary>
/// <param name="jobId">The Job Unique Identifier.</param>
void Delete(Guid jobId);

/// <summary>
/// Deletes a job.
/// </summary>
/// <param name="jobId">The Job Unique Identifier.</param>
Task DeleteAsync(Guid jobId);

/// <summary>
/// Updates a job.
/// </summary>
/// <param name="jobInfo">Job information.</param>
void Update(BackgroundJobInfo jobInfo);

/// <summary>
/// Updates a job.
/// </summary>
Original file line number Diff line number Diff line change
@@ -23,18 +23,39 @@ public InMemoryBackgroundJobStore(IClock clock)
_jobs = new ConcurrentDictionary<Guid, BackgroundJobInfo>();
}

public BackgroundJobInfo Find(Guid jobId)
{
return _jobs.GetOrDefault(jobId);
}

public virtual Task<BackgroundJobInfo> FindAsync(Guid jobId)
{
return Task.FromResult(_jobs.GetOrDefault(jobId));
}


public void Insert(BackgroundJobInfo jobInfo)
{
_jobs[jobInfo.Id] = jobInfo;
}

public virtual Task InsertAsync(BackgroundJobInfo jobInfo)
{
_jobs[jobInfo.Id] = jobInfo;

return Task.FromResult(0);
}

public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount)
{
return _jobs.Values
.Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now)
.OrderByDescending(t => t.Priority)
.ThenBy(t => t.TryCount)
.ThenBy(t => t.NextTryTime)
.Take(maxResultCount)
.ToList();
}

public virtual Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount)
{
var waitingJobs = _jobs.Values
@@ -48,13 +69,26 @@ public virtual Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCo
return Task.FromResult(waitingJobs);
}

public void Delete(Guid jobId)
{
_jobs.TryRemove(jobId, out _);
}

public virtual Task DeleteAsync(Guid jobId)
{
_jobs.TryRemove(jobId, out _);

return Task.FromResult(0);
}

public void Update(BackgroundJobInfo jobInfo)
{
if (jobInfo.IsAbandoned)
{
DeleteAsync(jobInfo.Id);
}
}

public virtual Task UpdateAsync(BackgroundJobInfo jobInfo)
{
if (jobInfo.IsAbandoned)
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using System;
using System.Threading.Tasks;
using System.Threading.Tasks;
using Volo.Abp.Aspects;
using Volo.Abp.DependencyInjection;
using Volo.Abp.DynamicProxy;
Original file line number Diff line number Diff line change
@@ -19,32 +19,65 @@ public BackgroundJobStore(
BackgroundJobRepository = backgroundJobRepository;
}

public BackgroundJobInfo Find(Guid jobId)
{
return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>(
BackgroundJobRepository.Find(jobId)
);
}

public virtual async Task<BackgroundJobInfo> FindAsync(Guid jobId)
{
return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>(
await BackgroundJobRepository.FindAsync(jobId)
);
}

public void Insert(BackgroundJobInfo jobInfo)
{
BackgroundJobRepository.Insert(
ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo)
);
}

public virtual async Task InsertAsync(BackgroundJobInfo jobInfo)
{
await BackgroundJobRepository.InsertAsync(
ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo)
);
}

public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount)
{
return ObjectMapper.Map<List<BackgroundJobRecord>, List<BackgroundJobInfo>>(
BackgroundJobRepository.GetWaitingList(maxResultCount)
);
}

public virtual async Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount)
{
return ObjectMapper.Map<List<BackgroundJobRecord>, List<BackgroundJobInfo>>(
await BackgroundJobRepository.GetWaitingListAsync(maxResultCount)
);
}

public void Delete(Guid jobId)
{
BackgroundJobRepository.Delete(jobId);
}

public virtual async Task DeleteAsync(Guid jobId)
{
await BackgroundJobRepository.DeleteAsync(jobId);
}

public void Update(BackgroundJobInfo jobInfo)
{
BackgroundJobRepository.Update(
ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo)
);
}

public virtual async Task UpdateAsync(BackgroundJobInfo jobInfo)
{
await BackgroundJobRepository.UpdateAsync(
Original file line number Diff line number Diff line change
@@ -7,6 +7,8 @@ namespace Volo.Abp.BackgroundJobs
{
public interface IBackgroundJobRepository : IBasicRepository<BackgroundJobRecord, Guid>
{
List<BackgroundJobRecord> GetWaitingList(int maxResultCount);

Task<List<BackgroundJobRecord>> GetWaitingListAsync(int maxResultCount);
}
}
Original file line number Diff line number Diff line change
@@ -21,16 +21,27 @@ public EfCoreBackgroundJobRepository(
Clock = clock;
}

public List<BackgroundJobRecord> GetWaitingList(int maxResultCount)
{
return GetWaitingListQuery(maxResultCount)
.ToList();
}

public async Task<List<BackgroundJobRecord>> GetWaitingListAsync(int maxResultCount)
{
return await GetWaitingListQuery(maxResultCount)
.ToListAsync();
}

private IQueryable<BackgroundJobRecord> GetWaitingListQuery(int maxResultCount)
{
var now = Clock.Now;
return await DbSet
return DbSet
.Where(t => !t.IsAbandoned && t.NextTryTime <= now)
.OrderByDescending(t => t.Priority)
.ThenBy(t => t.TryCount)
.ThenBy(t => t.NextTryTime)
.Take(maxResultCount)
.ToListAsync();
.Take(maxResultCount);
}
}
}
Original file line number Diff line number Diff line change
@@ -21,16 +21,27 @@ public MongoBackgroundJobRepository(
Clock = clock;
}

public List<BackgroundJobRecord> GetWaitingList(int maxResultCount)
{
return GetWaitingListQuery(maxResultCount)
.ToList();
}

public async Task<List<BackgroundJobRecord>> GetWaitingListAsync(int maxResultCount)
{
return await GetWaitingListQuery(maxResultCount)
.ToListAsync();
}

private IMongoQueryable<BackgroundJobRecord> GetWaitingListQuery(int maxResultCount)
{
var now = Clock.Now;
return await GetMongoQueryable()
return GetMongoQueryable()
.Where(t => !t.IsAbandoned && t.NextTryTime <= now)
.OrderByDescending(t => t.Priority)
.ThenBy(t => t.TryCount)
.ThenBy(t => t.NextTryTime)
.Take(maxResultCount)
.ToListAsync();
.Take(maxResultCount);
}
}
}

0 comments on commit 4ce0381

Please sign in to comment.