diff --git a/src/Horarium.InMemory/InMemoryRepository.cs b/src/Horarium.InMemory/InMemoryRepository.cs index 639e239..0374571 100644 --- a/src/Horarium.InMemory/InMemoryRepository.cs +++ b/src/Horarium.InMemory/InMemoryRepository.cs @@ -2,6 +2,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading.Tasks; +using Horarium.Builders; using Horarium.Repository; namespace Horarium.InMemory @@ -52,7 +53,7 @@ public Task FailedJob(string jobId, Exception error) _storage.Remove(job); job.Status = JobStatus.Failed; - job.Error = error.Message + error.StackTrace; + job.Error = error.Message + ' ' + error.StackTrace; _storage.Add(job); }); @@ -74,7 +75,7 @@ public Task RepeatJob(string jobId, DateTime startAt, Exception error) job.Status = JobStatus.RepeatJob; job.StartAt = startAt; - job.Error = error.Message + error.StackTrace; + job.Error = error.Message + ' ' + error.StackTrace; _storage.Add(job); }); @@ -114,5 +115,39 @@ public Task> GetJobStatistic() { return Task.FromResult(_storage.GetStatistics()); } + + public Task RescheduleRecurrentJob(string jobId, DateTime startAt, Exception error) + { + return _processor.Execute(() => + { + var completedJob = _storage.GetById(jobId); + if (completedJob == null) return; + + _storage.Remove(completedJob); + + if (error == null) + { + completedJob.Status = JobStatus.Ready; + completedJob.StartAt = startAt; + + _storage.Add(completedJob); + + return; + } + + completedJob.Status = JobStatus.Failed; + completedJob.Error = error.Message + ' ' + error.StackTrace; + + _storage.Add(completedJob); + + var newJob = completedJob.Copy(); + + newJob.JobId = JobBuilderHelpers.GenerateNewJobId(); + newJob.Status = JobStatus.Ready; + newJob.StartAt = startAt; + + _storage.Add(newJob); + }); + } } } \ No newline at end of file diff --git a/src/Horarium.Mongo/MongoRepository.cs b/src/Horarium.Mongo/MongoRepository.cs index c646fa1..8d69932 100644 --- a/src/Horarium.Mongo/MongoRepository.cs +++ b/src/Horarium.Mongo/MongoRepository.cs @@ -3,6 +3,7 @@ using System.Linq; using System.Reflection; using System.Threading.Tasks; +using Horarium.Builders; using Horarium.Repository; using MongoDB.Driver; @@ -27,7 +28,7 @@ public async Task GetReadyJob(string machineName, TimeSpan obsoleteTime) var filter = Builders.Filter.Where(x => (x.Status == JobStatus.Ready || x.Status == JobStatus.RepeatJob) && x.StartAt < DateTime.UtcNow || x.Status == JobStatus.Executing && x.StartedExecuting < DateTime.UtcNow - obsoleteTime); - + var update = Builders.Update .Set(x => x.Status, JobStatus.Executing) .Set(x => x.ExecutedMachine, machineName) @@ -36,7 +37,7 @@ public async Task GetReadyJob(string machineName, TimeSpan obsoleteTime) var options = new FindOneAndUpdateOptions {ReturnDocument = ReturnDocument.After}; - var result = await collection.FindOneAndUpdateAsync(filter, update, options); + var result = await collection.FindOneAndUpdateAsync(filter, update, options); return result?.ToJobDb(); } @@ -55,9 +56,8 @@ public async Task AddRecurrentJob(JobDb job) .Set(x => x.Cron, job.Cron) .Set(x => x.StartAt, job.StartAt); - var needsProperties = - _jobDbProperties.Where(x => x.Name != nameof(JobMongoModel.Cron) - && x.Name != nameof(JobMongoModel.StartAt)); + var needsProperties = _jobDbProperties.Where(x => + x.Name != nameof(JobMongoModel.Cron) && x.Name != nameof(JobMongoModel.StartAt)); //Если джоб уже существет апдейтем только 2 поля //Если нужно создать, то устанавливаем все остальные поля @@ -66,8 +66,8 @@ public async Task AddRecurrentJob(JobDb job) update = update.SetOnInsert(jobDbProperty.Name, jobDbProperty.GetValue(job)); } - await IMongoCollectionExtensions.UpdateOneAsync(collection, x => x.JobKey == job.JobKey - && (x.Status == JobStatus.Executing || x.Status == JobStatus.Ready), + await collection.UpdateOneAsync( + x => x.JobKey == job.JobKey && (x.Status == JobStatus.Executing || x.Status == JobStatus.Ready), update, new UpdateOptions { @@ -79,7 +79,8 @@ public async Task AddRecurrentJobSettings(RecurrentJobSettings settings) { var collection = _mongoClientProvider.GetCollection(); - await collection.ReplaceOneAsync(x => x.JobKey == settings.JobKey, + await collection.ReplaceOneAsync( + x => x.JobKey == settings.JobKey, RecurrentJobSettingsMongo.Create(settings), new UpdateOptions { @@ -108,6 +109,37 @@ public async Task RemoveJob(string jobId) await collection.DeleteOneAsync(x => x.JobId == jobId); } + public async Task RescheduleRecurrentJob(string jobId, DateTime startAt, Exception error) + { + var collection = _mongoClientProvider.GetCollection(); + + JobMongoModel failedJob = null; + + if (error != null) + { + failedJob = await collection + .Find(Builders.Filter.Where(x => x.JobId == jobId)) + .FirstOrDefaultAsync(); + } + + await collection.UpdateOneAsync( + x => x.JobId == jobId, + Builders.Update + .Set(x => x.StartAt, startAt) + .Set(x => x.Status, JobStatus.Ready)); + + if (error == null) + { + return; + } + + failedJob.JobId = JobBuilderHelpers.GenerateNewJobId(); + failedJob.Status = JobStatus.Failed; + failedJob.Error = error.Message + ' ' + error.StackTrace; + + await collection.InsertOneAsync(failedJob); + } + public async Task RepeatJob(string jobId, DateTime startAt, Exception error) { var collection = _mongoClientProvider.GetCollection(); @@ -115,7 +147,7 @@ public async Task RepeatJob(string jobId, DateTime startAt, Exception error) var update = Builders.Update .Set(x => x.Status, JobStatus.RepeatJob) .Set(x => x.StartAt, startAt) - .Set(x => x.Error, error.Message + error.StackTrace); + .Set(x => x.Error, error.Message + ' ' + error.StackTrace); await collection.UpdateOneAsync(x => x.JobId == jobId, update); } @@ -126,7 +158,7 @@ public async Task FailedJob(string jobId, Exception error) var update = Builders.Update .Set(x => x.Status, JobStatus.Failed) - .Set(x => x.Error, error.Message + error.StackTrace); + .Set(x => x.Error, error.Message + ' ' + error.StackTrace); await collection.UpdateOneAsync(x => x.JobId == jobId, update); } diff --git a/src/Horarium.Test/ExecutorJobTest.cs b/src/Horarium.Test/ExecutorJobTest.cs index 346be21..b73e26e 100644 --- a/src/Horarium.Test/ExecutorJobTest.cs +++ b/src/Horarium.Test/ExecutorJobTest.cs @@ -151,7 +151,7 @@ await executorJob.Execute(new JobMetadata } [Fact] - public async Task RecurrentJob_DeleteAfterRun() + public async Task RecurrentJob_RescheduleAfterRun() { var jobRepositoryMock = new Mock(); var (jobScopeFactoryMock, jobScopeMock) = CreateScopeMock(); @@ -179,7 +179,7 @@ await executorJob.Execute(new JobMetadata() Cron = cron }); - jobRepositoryMock.Verify(x => x.RemoveJob(It.IsAny())); + jobRepositoryMock.Verify(x => x.RescheduleRecurrentJob(It.IsAny(), It.IsAny(), null)); } [Fact] diff --git a/src/Horarium/Builders/JobBuilderHelpers.cs b/src/Horarium/Builders/JobBuilderHelpers.cs index f303665..4ee6ee4 100644 --- a/src/Horarium/Builders/JobBuilderHelpers.cs +++ b/src/Horarium/Builders/JobBuilderHelpers.cs @@ -17,6 +17,8 @@ public static JobMetadata GenerateNewJob(Type jobType) }; } + public static string GenerateNewJobId() => Guid.NewGuid().ToString("N"); + public static JobMetadata BuildJobsSequence(Queue jobsQueue, TimeSpan globalObsoleteInterval) { var job = jobsQueue.Dequeue(); diff --git a/src/Horarium/Builders/Recurrent/RecurrentJobBuilder.cs b/src/Horarium/Builders/Recurrent/RecurrentJobBuilder.cs index 6a3529b..27ddd13 100644 --- a/src/Horarium/Builders/Recurrent/RecurrentJobBuilder.cs +++ b/src/Horarium/Builders/Recurrent/RecurrentJobBuilder.cs @@ -1,6 +1,5 @@ using System; using System.Threading.Tasks; -using Cronos; using Horarium.Interfaces; namespace Horarium.Builders.Recurrent @@ -26,7 +25,7 @@ public IRecurrentJobBuilder WithKey(string jobKey) public override Task Schedule() { - var nextOccurence = ParseAndGetNextOccurrence(Job.Cron); + var nextOccurence = Utils.ParseAndGetNextOccurrence(Job.Cron); if (!nextOccurence.HasValue) { @@ -38,12 +37,5 @@ public override Task Schedule() return _adderJobs.AddRecurrentJob(Job); } - - private static DateTime? ParseAndGetNextOccurrence(string cron) - { - var expression = CronExpression.Parse(cron, CronFormat.IncludeSeconds); - - return expression.GetNextOccurrence(DateTime.UtcNow, TimeZoneInfo.Local); - } } } \ No newline at end of file diff --git a/src/Horarium/Handlers/ExecutorJob.cs b/src/Horarium/Handlers/ExecutorJob.cs index bab3fc1..2357de3 100644 --- a/src/Horarium/Handlers/ExecutorJob.cs +++ b/src/Horarium/Handlers/ExecutorJob.cs @@ -2,11 +2,8 @@ using System.Linq; using System.Reflection; using System.Threading.Tasks; - using Horarium.Interfaces; using Horarium.Repository; -using Newtonsoft.Json; -using Horarium.Builders.Recurrent; using Horarium.Fallbacks; namespace Horarium.Handlers @@ -14,7 +11,6 @@ namespace Horarium.Handlers public class ExecutorJob : IExecutorJob { private readonly IJobRepository _jobRepository; - private readonly IAdderJobs _adderJobs; private readonly HorariumSettings _settings; public ExecutorJob( @@ -23,7 +19,6 @@ public ExecutorJob( HorariumSettings settings) { _jobRepository = jobRepository; - _adderJobs = adderJobs; _settings = settings; } @@ -99,18 +94,14 @@ private async Task ExecuteJobRecurrent(JobMetadata jobMetadata) _settings.Logger.Debug("jobMetadata excecuted"); - await _jobRepository.RemoveJob(jobMetadata.JobId); + await ScheduleNextRecurrentIfPossible(jobMetadata, null); _settings.Logger.Debug("jobMetadata saved success"); } } catch (Exception ex) { - await _jobRepository.FailedJob(jobMetadata.JobId, ex); - } - finally - { - await ScheduleRecurrentNextTime(jobMetadata); + await ScheduleNextRecurrentIfPossible(jobMetadata, ex); } } @@ -163,13 +154,19 @@ private DateTime GetNextStartFailedJobTime(JobMetadata jobMetadata) return DateTime.UtcNow + strategy.GetNextStartInterval(jobMetadata.CountStarted); } - private async Task ScheduleRecurrentNextTime(JobMetadata metadata) + private async Task ScheduleNextRecurrentIfPossible(JobMetadata metadata, Exception error) { - var cron = await _jobRepository.GetCronForRecurrentJob(metadata.JobKey); + var newStartAt = Utils.ParseAndGetNextOccurrence(metadata.Cron); - await new RecurrentJobBuilder(_adderJobs, cron, metadata.JobType, metadata.ObsoleteInterval) - .WithKey(metadata.JobKey) - .Schedule(); + if (newStartAt.HasValue) + { + await _jobRepository.RescheduleRecurrentJob(metadata.JobId, + newStartAt.Value, error); + } + else + { + await _jobRepository.RemoveJob(metadata.JobId); + } } private async Task ScheduleNextJobIfExists(JobMetadata metadata) @@ -209,8 +206,6 @@ private Task HandleFallbackStrategy(JobMetadata metadata) return ScheduleNextJobIfExists(metadata); case FallbackStrategyTypeEnum.ScheduleFallbackJob: return ScheduleFallbackJobIfExists(metadata); - case null: - case FallbackStrategyTypeEnum.StopExecution: default: return Task.CompletedTask; } diff --git a/src/Horarium/Repository/IJobRepository.cs b/src/Horarium/Repository/IJobRepository.cs index 0597c85..8a6adb2 100644 --- a/src/Horarium/Repository/IJobRepository.cs +++ b/src/Horarium/Repository/IJobRepository.cs @@ -23,5 +23,7 @@ public interface IJobRepository Task GetCronForRecurrentJob(string jobKey); Task> GetJobStatistic(); + + Task RescheduleRecurrentJob(string jobId, DateTime startAt, Exception error); } } \ No newline at end of file diff --git a/src/Horarium/Utils.cs b/src/Horarium/Utils.cs index b5a7205..f376351 100644 --- a/src/Horarium/Utils.cs +++ b/src/Horarium/Utils.cs @@ -1,5 +1,6 @@ using System; using System.Reflection; +using Cronos; using Newtonsoft.Json; namespace Horarium @@ -21,5 +22,12 @@ public static string AssemblyQualifiedNameWithoutVersion(this Type type) string retValue = type.FullName + ", " + type.GetTypeInfo().Assembly.GetName().Name; return retValue; } + + public static DateTime? ParseAndGetNextOccurrence(string cron) + { + var expression = CronExpression.Parse(cron, CronFormat.IncludeSeconds); + + return expression.GetNextOccurrence(DateTime.UtcNow, TimeZoneInfo.Local); + } } } \ No newline at end of file