diff --git a/src/scheduling/schedule_service.py b/src/scheduling/schedule_service.py index 3386f3b6..2a80b4fb 100644 --- a/src/scheduling/schedule_service.py +++ b/src/scheduling/schedule_service.py @@ -28,23 +28,25 @@ def restore_jobs(schedules_folder): files = [file for file in os.listdir(schedules_folder) if file.endswith('.json')] + files.sort() - job_dict = {} + job_path_dict = {} ids = [] # list of ALL ids, including broken configs for file in files: try: - content = file_utils.read_file(os.path.join(schedules_folder, file)) + job_path = os.path.join(schedules_folder, file) + content = file_utils.read_file(job_path) job_json = custom_json.loads(content) ids.append(job_json['id']) job = scheduling_job.from_dict(job_json) - job_dict[job.id] = job + job_path_dict[job_path] = job except: LOGGER.exception('Failed to parse schedule file: ' + file) - return job_dict, ids + return job_path_dict, ids class ScheduleService: @@ -60,15 +62,14 @@ def __init__(self, self._execution_service = execution_service (jobs, ids) = restore_jobs(self._schedules_folder) - self._scheduled_executions = jobs self._id_generator = IdGenerator(ids) self.stopped = False self.scheduler = sched.scheduler(timefunc=time.time) self._start_scheduler() - for job in jobs.values(): - self.schedule_job(job) + for job_path, job in jobs.items(): + self.schedule_job(job, job_path) def create_job(self, script_name, parameter_values, incoming_schedule_config, user: User): if user is None: @@ -87,9 +88,9 @@ def create_job(self, script_name, parameter_values, incoming_schedule_config, us normalized_values = dict(config_model.parameter_values) job = SchedulingJob(id, user, schedule_config, script_name, normalized_values) - self.save_job(job) + job_path = self.save_job(job) - self.schedule_job(job) + self.schedule_job(job, job_path) return id @@ -103,7 +104,7 @@ def validate_script_config(config_model): raise UnavailableScriptException( 'Script contains secure parameters (' + parameter.str_name() + '), this is not supported') - def schedule_job(self, job: SchedulingJob): + def schedule_job(self, job: SchedulingJob, job_path): schedule = job.schedule if not schedule.repeatable and date_utils.is_past(schedule.start_datetime): @@ -113,11 +114,15 @@ def schedule_job(self, job: SchedulingJob): LOGGER.info( 'Scheduling ' + job.get_log_name() + ' at ' + next_datetime.astimezone(tz=None).strftime('%H:%M, %d %B %Y')) - self.scheduler.enterabs(next_datetime.timestamp(), 1, self._execute_job, (job,)) + self.scheduler.enterabs(next_datetime.timestamp(), 1, self._execute_job, (job, job_path)) - def _execute_job(self, job: SchedulingJob): + def _execute_job(self, job: SchedulingJob, job_path): LOGGER.info('Executing ' + job.get_log_name()) + if not os.path.exists(job_path): + LOGGER.info(job.get_log_name() + ' was removed, skipping execution') + return + script_name = job.script_name parameter_values = job.parameter_values user = job.user @@ -137,17 +142,20 @@ def cleanup(): except: LOGGER.exception('Failed to execute ' + job.get_log_name()) - self.schedule_job(job) + self.schedule_job(job, job_path) def save_job(self, job: SchedulingJob): user = job.user script_name = job.script_name filename = file_utils.to_filename('%s_%s_%s.json' % (script_name, user.get_audit_name(), job.id)) + path = os.path.join(self._schedules_folder, filename) file_utils.write_file( - os.path.join(self._schedules_folder, filename), + path, json.dumps(job.as_serializable_dict(), indent=2)) + return path + def _start_scheduler(self): def scheduler_loop(): while not self.stopped: diff --git a/src/tests/scheduling/schedule_service_test.py b/src/tests/scheduling/schedule_service_test.py index 970c4ab0..844acb01 100644 --- a/src/tests/scheduling/schedule_service_test.py +++ b/src/tests/scheduling/schedule_service_test.py @@ -21,25 +21,27 @@ class ScheduleServiceTestCase(TestCase): - def assert_schedule_calls(self, expected_job_time_pairs): - self.assertEqual(len(expected_job_time_pairs), len(self.scheduler_mock.enterabs.call_args_list)) + def assert_schedule_calls(self, expected_job_path_time_tuples): + self.assertEqual(len(expected_job_path_time_tuples), len(self.scheduler_mock.enterabs.call_args_list)) - for i, pair in enumerate(expected_job_time_pairs): - expected_time = date_utils.sec_to_datetime(pair[1]) - expected_job = pair[0] + for i, expected_tuple in enumerate(expected_job_path_time_tuples): + expected_job = expected_tuple[0] + expected_job_path = expected_tuple[1] + expected_time = date_utils.sec_to_datetime(expected_tuple[2]) # the first item of call_args is actual arguments, passed to the method args = self.scheduler_mock.enterabs.call_args_list[i][0] - # we schedule job as enterabs(expected_time, priority, self._execute_job, (job,)) - # to get the job, we need to get the last arg, and extract the first parameter from it + # we schedule job as enterabs(expected_time, priority, self._execute_job, (job, job_path)) schedule_method_args_tuple = args[3] - schedule_method_job_arg = schedule_method_args_tuple[0] + actual_job_arg = schedule_method_args_tuple[0] + actual_job_path_arg = schedule_method_args_tuple[1] actual_time = date_utils.sec_to_datetime(args[0]) + self.assertEqual(expected_job_path, actual_job_path_arg) self.assertEqual(expected_time, actual_time) self.assertDictEqual(expected_job.as_serializable_dict(), - schedule_method_job_arg.as_serializable_dict()) + actual_job_arg.as_serializable_dict()) def mock_schedule_model_with_secure_param(self): self.create_config('secure-config', parameters=[ @@ -173,13 +175,13 @@ def test_create_job_verify_scheduler_call_when_one_time(self): job_prototype = create_job(id='1', repeatable=False, start_datetime=mocked_now + timedelta(seconds=97)) self.call_create_job(job_prototype) - self.assert_schedule_calls([(job_prototype, mocked_now_epoch + 97)]) + self.assert_schedule_calls([(job_prototype, get_job_path(job_prototype), mocked_now_epoch + 97)]) def test_create_job_verify_timer_call_when_repeatable(self): job_prototype = create_job(id='1', repeatable=True, start_datetime=mocked_now - timedelta(seconds=97)) self.call_create_job(job_prototype) - self.assert_schedule_calls([(job_prototype, mocked_now_epoch + 1468703)]) + self.assert_schedule_calls([(job_prototype, get_job_path(job_prototype), mocked_now_epoch + 1468703)]) def call_create_job(self, job: SchedulingJob): return self.schedule_service.create_job( @@ -204,61 +206,67 @@ def verify_config_files(self, expected_jobs: Sequence[SchedulingJob]): class TestScheduleServiceInit(ScheduleServiceTestCase): def test_no_config_folder(self): - test_utils.cleanup() - schedule_service = ScheduleService(self.config_service, self.execution_service, test_utils.temp_folder) - self.assertEqual(schedule_service._scheduled_executions, {}) + self.assertEqual('1', schedule_service._id_generator.next_id()) + self.assert_schedule_calls([]) def test_restore_multiple_configs(self): - job1 = create_job(id='11') - job2 = create_job(id=9) - job3 = create_job(id=3) - self.save_job(job1) - self.save_job(job2) - self.save_job(job3) + job1 = create_job(id='11', start_datetime=mocked_now + timedelta(seconds=10), repeatable=False) + job2 = create_job(id=9, start_datetime=mocked_now + timedelta(seconds=20), repeatable=False) + job3 = create_job(id=3, start_datetime=mocked_now + timedelta(seconds=30), repeatable=False) + job1_path = save_job(job1) + job2_path = save_job(job2) + job3_path = save_job(job3) schedule_service = ScheduleService(self.config_service, self.execution_service, test_utils.temp_folder) - self.assertSetEqual({'3', '9', '11'}, set(schedule_service._scheduled_executions.keys())) self.assertEqual('12', schedule_service._id_generator.next_id()) + self.assert_schedule_calls([ + # alphabetical order, ids 11 -> 3 -> 9; + (job1, job1_path, mocked_now_epoch + 10), + (job3, job3_path, mocked_now_epoch + 30), + (job2, job2_path, mocked_now_epoch + 20) + ]) def test_restore_configs_when_one_corrupted(self): job1 = create_job(id='11', repeatable=None) - job2 = create_job(id=3) - self.save_job(job1) - self.save_job(job2) + job2 = create_job(id=3, repeatable=False, start_datetime=mocked_now + timedelta(seconds=10)) + save_job(job1) + job2_path = save_job(job2) schedule_service = ScheduleService(self.config_service, self.execution_service, test_utils.temp_folder) - self.assertSetEqual({'3'}, set(schedule_service._scheduled_executions.keys())) self.assertEqual('12', schedule_service._id_generator.next_id()) + self.assert_schedule_calls([ + (job2, job2_path, mocked_now_epoch + 10) + ]) def test_schedule_on_restore_when_one_time(self): job = create_job(id=3, repeatable=False, start_datetime=mocked_now + timedelta(minutes=3)) - self.save_job(job) + job_path = save_job(job) ScheduleService(self.config_service, self.execution_service, test_utils.temp_folder) - self.assert_schedule_calls([(job, mocked_now_epoch + 180)]) + self.assert_schedule_calls([(job, job_path, mocked_now_epoch + 180)]) def test_schedule_on_restore_when_one_time_in_past(self): job = create_job(id=3, repeatable=False, start_datetime=mocked_now - timedelta(seconds=1)) - self.save_job(job) + save_job(job) ScheduleService(self.config_service, self.execution_service, test_utils.temp_folder) self.assert_schedule_calls([]) def test_schedule_on_restore_when_repeatable_in_future(self): job = create_job(id=3, repeatable=True, start_datetime=mocked_now + timedelta(hours=3)) - self.save_job(job) + job_path = save_job(job) ScheduleService(self.config_service, self.execution_service, test_utils.temp_folder) - self.assert_schedule_calls([(job, mocked_now_epoch + 1479600)]) + self.assert_schedule_calls([(job, job_path, mocked_now_epoch + 1479600)]) def test_schedule_on_restore_when_repeatable_in_past(self): job = create_job(id=3, repeatable=True, start_datetime=mocked_now + timedelta(days=2)) - self.save_job(job) + job_path = save_job(job) ScheduleService(self.config_service, self.execution_service, test_utils.temp_folder) - self.assert_schedule_calls([(job, mocked_now_epoch + 1468800)]) + self.assert_schedule_calls([(job, job_path, mocked_now_epoch + 1468800)]) def test_scheduler_runner(self): original_runs_count = self.scheduler_mock.run.call_count @@ -281,18 +289,13 @@ def test_scheduler_runner_when_stopped(self): final_runs_count = self.scheduler_mock.run.call_count self.assertEqual(final_runs_count, original_runs_count) - def save_job(self, job): - schedules_dir = os.path.join(test_utils.temp_folder, 'schedules') - path = os.path.join(schedules_dir, get_job_filename(job)) - content = json.dumps(job.as_serializable_dict()) - file_utils.write_file(path, content) - class TestScheduleServiceExecuteJob(ScheduleServiceTestCase): def test_execute_simple_job(self): job = create_job(id=1, repeatable=False, start_datetime=mocked_now - timedelta(seconds=1)) + job_path = save_job(job) - self.schedule_service._execute_job(job) + self.schedule_service._execute_job(job, job_path) self.execution_service.start_script.assert_called_once_with( ANY, job.parameter_values, job.user) @@ -305,13 +308,14 @@ def test_execute_repeatable_job(self): start_datetime=mocked_now - timedelta(seconds=1), repeat_unit='days', repeat_period=1) + job_path = save_job(job) - self.schedule_service._execute_job(job) + self.schedule_service._execute_job(job, job_path) self.execution_service.start_script.assert_called_once_with( ANY, job.parameter_values, job.user) self.execution_service.add_finish_listener.assert_not_called() - self.assert_schedule_calls([(job, mocked_now_epoch + 86399)]) + self.assert_schedule_calls([(job, job_path, mocked_now_epoch + 86399)]) def test_execute_when_fails(self): job = create_job(id=1, @@ -319,11 +323,12 @@ def test_execute_when_fails(self): start_datetime=mocked_now - timedelta(seconds=1), repeat_unit='days', repeat_period=1) + job_path = save_job(job) self.execution_service.start_script.side_effect = Exception('Test exception') - self.schedule_service._execute_job(job) + self.schedule_service._execute_job(job, job_path) - self.assert_schedule_calls([(job, mocked_now_epoch + 86399)]) + self.assert_schedule_calls([(job, job_path, mocked_now_epoch + 86399)]) def test_execute_when_not_schedulable(self): job = create_job(id=1, @@ -332,11 +337,12 @@ def test_execute_when_not_schedulable(self): start_datetime=mocked_now - timedelta(seconds=1), repeat_unit='days', repeat_period=1) + job_path = save_job(job) - self.schedule_service._execute_job(job) + self.schedule_service._execute_job(job, job_path) self.execution_service.start_script.assert_not_called() - self.assert_schedule_calls([(job, mocked_now_epoch + 86399)]) + self.assert_schedule_calls([(job, job_path, mocked_now_epoch + 86399)]) def test_execute_when_has_secure_parameters(self): job = create_job(id=1, @@ -345,13 +351,29 @@ def test_execute_when_has_secure_parameters(self): start_datetime=mocked_now - timedelta(seconds=1), repeat_unit='days', repeat_period=1) + job_path = save_job(job) self.mock_schedule_model_with_secure_param() - self.schedule_service._execute_job(job) + self.schedule_service._execute_job(job, job_path) + + self.execution_service.start_script.assert_not_called() + self.assert_schedule_calls([(job, job_path, mocked_now_epoch + 86399)]) + + def test_execute_when_deleted(self): + job = create_job(id=1, + script_name='secure-config', + repeatable=True, + start_datetime=mocked_now - timedelta(seconds=1), + repeat_unit='days', + repeat_period=1) + job_path = save_job(job) + os.remove(job_path) + + self.schedule_service._execute_job(job, job_path) self.execution_service.start_script.assert_not_called() - self.assert_schedule_calls([(job, mocked_now_epoch + 86399)]) + self.assert_schedule_calls([]) def test_cleanup_execution(self): self.create_config('script_with_cleanup', auto_cleanup=True) @@ -359,6 +381,7 @@ def test_cleanup_execution(self): script_name='script_with_cleanup', repeatable=False, start_datetime=mocked_now - timedelta(seconds=1)) + job_path = save_job(job) finish_callback = None @@ -369,7 +392,7 @@ def add_finish_listener(callback_param, execution_id): self.execution_service.add_finish_listener.side_effect = add_finish_listener - self.schedule_service._execute_job(job) + self.schedule_service._execute_job(job, job_path) self.execution_service.start_script.assert_called_once_with( ANY, job.parameter_values, job.user) @@ -416,3 +439,17 @@ def create_job(id=None, def get_job_filename(job): return job.script_name + '_' + job.user.get_audit_name() + '_' + str(job.id) + '.json' + + +def save_job(job): + path = get_job_path(job) + content = json.dumps(job.as_serializable_dict()) + file_utils.write_file(path, content) + + return path + + +def get_job_path(job): + schedules_dir = os.path.join(test_utils.temp_folder, 'schedules') + path = os.path.join(schedules_dir, get_job_filename(job)) + return path