Skip to content

Commit

Permalink
#547 stopped executing already removed scheduled jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
bugy committed Dec 4, 2022
1 parent 081b20f commit 61fa4dc
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 62 deletions.
36 changes: 22 additions & 14 deletions src/scheduling/schedule_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,25 @@

def restore_jobs(schedules_folder):
files = [file for file in os.listdir(schedules_folder) if file.endswith('.json')]
files.sort()

job_dict = {}
job_path_dict = {}
ids = [] # list of ALL ids, including broken configs

for file in files:
try:
content = file_utils.read_file(os.path.join(schedules_folder, file))
job_path = os.path.join(schedules_folder, file)
content = file_utils.read_file(job_path)
job_json = custom_json.loads(content)
ids.append(job_json['id'])

job = scheduling_job.from_dict(job_json)

job_dict[job.id] = job
job_path_dict[job_path] = job
except:
LOGGER.exception('Failed to parse schedule file: ' + file)

return job_dict, ids
return job_path_dict, ids


class ScheduleService:
Expand All @@ -60,15 +62,14 @@ def __init__(self,
self._execution_service = execution_service

(jobs, ids) = restore_jobs(self._schedules_folder)
self._scheduled_executions = jobs
self._id_generator = IdGenerator(ids)
self.stopped = False

self.scheduler = sched.scheduler(timefunc=time.time)
self._start_scheduler()

for job in jobs.values():
self.schedule_job(job)
for job_path, job in jobs.items():
self.schedule_job(job, job_path)

def create_job(self, script_name, parameter_values, incoming_schedule_config, user: User):
if user is None:
Expand All @@ -87,9 +88,9 @@ def create_job(self, script_name, parameter_values, incoming_schedule_config, us
normalized_values = dict(config_model.parameter_values)
job = SchedulingJob(id, user, schedule_config, script_name, normalized_values)

self.save_job(job)
job_path = self.save_job(job)

self.schedule_job(job)
self.schedule_job(job, job_path)

return id

Expand All @@ -103,7 +104,7 @@ def validate_script_config(config_model):
raise UnavailableScriptException(
'Script contains secure parameters (' + parameter.str_name() + '), this is not supported')

def schedule_job(self, job: SchedulingJob):
def schedule_job(self, job: SchedulingJob, job_path):
schedule = job.schedule

if not schedule.repeatable and date_utils.is_past(schedule.start_datetime):
Expand All @@ -113,11 +114,15 @@ def schedule_job(self, job: SchedulingJob):
LOGGER.info(
'Scheduling ' + job.get_log_name() + ' at ' + next_datetime.astimezone(tz=None).strftime('%H:%M, %d %B %Y'))

self.scheduler.enterabs(next_datetime.timestamp(), 1, self._execute_job, (job,))
self.scheduler.enterabs(next_datetime.timestamp(), 1, self._execute_job, (job, job_path))

def _execute_job(self, job: SchedulingJob):
def _execute_job(self, job: SchedulingJob, job_path):
LOGGER.info('Executing ' + job.get_log_name())

if not os.path.exists(job_path):
LOGGER.info(job.get_log_name() + ' was removed, skipping execution')
return

script_name = job.script_name
parameter_values = job.parameter_values
user = job.user
Expand All @@ -137,17 +142,20 @@ def cleanup():
except:
LOGGER.exception('Failed to execute ' + job.get_log_name())

self.schedule_job(job)
self.schedule_job(job, job_path)

def save_job(self, job: SchedulingJob):
user = job.user
script_name = job.script_name

filename = file_utils.to_filename('%s_%s_%s.json' % (script_name, user.get_audit_name(), job.id))
path = os.path.join(self._schedules_folder, filename)
file_utils.write_file(
os.path.join(self._schedules_folder, filename),
path,
json.dumps(job.as_serializable_dict(), indent=2))

return path

def _start_scheduler(self):
def scheduler_loop():
while not self.stopped:
Expand Down
133 changes: 85 additions & 48 deletions src/tests/scheduling/schedule_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,27 @@


class ScheduleServiceTestCase(TestCase):
def assert_schedule_calls(self, expected_job_time_pairs):
self.assertEqual(len(expected_job_time_pairs), len(self.scheduler_mock.enterabs.call_args_list))
def assert_schedule_calls(self, expected_job_path_time_tuples):
self.assertEqual(len(expected_job_path_time_tuples), len(self.scheduler_mock.enterabs.call_args_list))

for i, pair in enumerate(expected_job_time_pairs):
expected_time = date_utils.sec_to_datetime(pair[1])
expected_job = pair[0]
for i, expected_tuple in enumerate(expected_job_path_time_tuples):
expected_job = expected_tuple[0]
expected_job_path = expected_tuple[1]
expected_time = date_utils.sec_to_datetime(expected_tuple[2])

# the first item of a call_args entry is the tuple of positional arguments passed to the method
args = self.scheduler_mock.enterabs.call_args_list[i][0]

# we schedule job as enterabs(expected_time, priority, self._execute_job, (job,))
# to get the job, we need to get the last arg, and extract the first parameter from it
# we schedule job as enterabs(expected_time, priority, self._execute_job, (job, job_path))
schedule_method_args_tuple = args[3]
schedule_method_job_arg = schedule_method_args_tuple[0]
actual_job_arg = schedule_method_args_tuple[0]
actual_job_path_arg = schedule_method_args_tuple[1]
actual_time = date_utils.sec_to_datetime(args[0])

self.assertEqual(expected_job_path, actual_job_path_arg)
self.assertEqual(expected_time, actual_time)
self.assertDictEqual(expected_job.as_serializable_dict(),
schedule_method_job_arg.as_serializable_dict())
actual_job_arg.as_serializable_dict())

def mock_schedule_model_with_secure_param(self):
self.create_config('secure-config', parameters=[
Expand Down Expand Up @@ -173,13 +175,13 @@ def test_create_job_verify_scheduler_call_when_one_time(self):
job_prototype = create_job(id='1', repeatable=False, start_datetime=mocked_now + timedelta(seconds=97))
self.call_create_job(job_prototype)

self.assert_schedule_calls([(job_prototype, mocked_now_epoch + 97)])
self.assert_schedule_calls([(job_prototype, get_job_path(job_prototype), mocked_now_epoch + 97)])

def test_create_job_verify_timer_call_when_repeatable(self):
job_prototype = create_job(id='1', repeatable=True, start_datetime=mocked_now - timedelta(seconds=97))
self.call_create_job(job_prototype)

self.assert_schedule_calls([(job_prototype, mocked_now_epoch + 1468703)])
self.assert_schedule_calls([(job_prototype, get_job_path(job_prototype), mocked_now_epoch + 1468703)])

def call_create_job(self, job: SchedulingJob):
return self.schedule_service.create_job(
Expand All @@ -204,61 +206,67 @@ def verify_config_files(self, expected_jobs: Sequence[SchedulingJob]):

class TestScheduleServiceInit(ScheduleServiceTestCase):
def test_no_config_folder(self):
test_utils.cleanup()

schedule_service = ScheduleService(self.config_service, self.execution_service, test_utils.temp_folder)
self.assertEqual(schedule_service._scheduled_executions, {})

self.assertEqual('1', schedule_service._id_generator.next_id())
self.assert_schedule_calls([])

def test_restore_multiple_configs(self):
job1 = create_job(id='11')
job2 = create_job(id=9)
job3 = create_job(id=3)
self.save_job(job1)
self.save_job(job2)
self.save_job(job3)
job1 = create_job(id='11', start_datetime=mocked_now + timedelta(seconds=10), repeatable=False)
job2 = create_job(id=9, start_datetime=mocked_now + timedelta(seconds=20), repeatable=False)
job3 = create_job(id=3, start_datetime=mocked_now + timedelta(seconds=30), repeatable=False)
job1_path = save_job(job1)
job2_path = save_job(job2)
job3_path = save_job(job3)

schedule_service = ScheduleService(self.config_service, self.execution_service, test_utils.temp_folder)
self.assertSetEqual({'3', '9', '11'}, set(schedule_service._scheduled_executions.keys()))
self.assertEqual('12', schedule_service._id_generator.next_id())
self.assert_schedule_calls([
# schedule files are restored in alphabetical file-name order, so ids come as 11 -> 3 -> 9
(job1, job1_path, mocked_now_epoch + 10),
(job3, job3_path, mocked_now_epoch + 30),
(job2, job2_path, mocked_now_epoch + 20)
])

def test_restore_configs_when_one_corrupted(self):
job1 = create_job(id='11', repeatable=None)
job2 = create_job(id=3)
self.save_job(job1)
self.save_job(job2)
job2 = create_job(id=3, repeatable=False, start_datetime=mocked_now + timedelta(seconds=10))
save_job(job1)
job2_path = save_job(job2)

schedule_service = ScheduleService(self.config_service, self.execution_service, test_utils.temp_folder)
self.assertSetEqual({'3'}, set(schedule_service._scheduled_executions.keys()))
self.assertEqual('12', schedule_service._id_generator.next_id())
self.assert_schedule_calls([
(job2, job2_path, mocked_now_epoch + 10)
])

def test_schedule_on_restore_when_one_time(self):
job = create_job(id=3, repeatable=False, start_datetime=mocked_now + timedelta(minutes=3))
self.save_job(job)
job_path = save_job(job)

ScheduleService(self.config_service, self.execution_service, test_utils.temp_folder)
self.assert_schedule_calls([(job, mocked_now_epoch + 180)])
self.assert_schedule_calls([(job, job_path, mocked_now_epoch + 180)])

def test_schedule_on_restore_when_one_time_in_past(self):
job = create_job(id=3, repeatable=False, start_datetime=mocked_now - timedelta(seconds=1))
self.save_job(job)
save_job(job)

ScheduleService(self.config_service, self.execution_service, test_utils.temp_folder)
self.assert_schedule_calls([])

def test_schedule_on_restore_when_repeatable_in_future(self):
job = create_job(id=3, repeatable=True, start_datetime=mocked_now + timedelta(hours=3))
self.save_job(job)
job_path = save_job(job)

ScheduleService(self.config_service, self.execution_service, test_utils.temp_folder)
self.assert_schedule_calls([(job, mocked_now_epoch + 1479600)])
self.assert_schedule_calls([(job, job_path, mocked_now_epoch + 1479600)])

def test_schedule_on_restore_when_repeatable_in_past(self):
job = create_job(id=3, repeatable=True, start_datetime=mocked_now + timedelta(days=2))
self.save_job(job)
job_path = save_job(job)

ScheduleService(self.config_service, self.execution_service, test_utils.temp_folder)
self.assert_schedule_calls([(job, mocked_now_epoch + 1468800)])
self.assert_schedule_calls([(job, job_path, mocked_now_epoch + 1468800)])

def test_scheduler_runner(self):
original_runs_count = self.scheduler_mock.run.call_count
Expand All @@ -281,18 +289,13 @@ def test_scheduler_runner_when_stopped(self):
final_runs_count = self.scheduler_mock.run.call_count
self.assertEqual(final_runs_count, original_runs_count)

def save_job(self, job):
schedules_dir = os.path.join(test_utils.temp_folder, 'schedules')
path = os.path.join(schedules_dir, get_job_filename(job))
content = json.dumps(job.as_serializable_dict())
file_utils.write_file(path, content)


class TestScheduleServiceExecuteJob(ScheduleServiceTestCase):
def test_execute_simple_job(self):
job = create_job(id=1, repeatable=False, start_datetime=mocked_now - timedelta(seconds=1))
job_path = save_job(job)

self.schedule_service._execute_job(job)
self.schedule_service._execute_job(job, job_path)

self.execution_service.start_script.assert_called_once_with(
ANY, job.parameter_values, job.user)
Expand All @@ -305,25 +308,27 @@ def test_execute_repeatable_job(self):
start_datetime=mocked_now - timedelta(seconds=1),
repeat_unit='days',
repeat_period=1)
job_path = save_job(job)

self.schedule_service._execute_job(job)
self.schedule_service._execute_job(job, job_path)

self.execution_service.start_script.assert_called_once_with(
ANY, job.parameter_values, job.user)
self.execution_service.add_finish_listener.assert_not_called()
self.assert_schedule_calls([(job, mocked_now_epoch + 86399)])
self.assert_schedule_calls([(job, job_path, mocked_now_epoch + 86399)])

def test_execute_when_fails(self):
job = create_job(id=1,
repeatable=True,
start_datetime=mocked_now - timedelta(seconds=1),
repeat_unit='days',
repeat_period=1)
job_path = save_job(job)

self.execution_service.start_script.side_effect = Exception('Test exception')
self.schedule_service._execute_job(job)
self.schedule_service._execute_job(job, job_path)

self.assert_schedule_calls([(job, mocked_now_epoch + 86399)])
self.assert_schedule_calls([(job, job_path, mocked_now_epoch + 86399)])

def test_execute_when_not_schedulable(self):
job = create_job(id=1,
Expand All @@ -332,11 +337,12 @@ def test_execute_when_not_schedulable(self):
start_datetime=mocked_now - timedelta(seconds=1),
repeat_unit='days',
repeat_period=1)
job_path = save_job(job)

self.schedule_service._execute_job(job)
self.schedule_service._execute_job(job, job_path)

self.execution_service.start_script.assert_not_called()
self.assert_schedule_calls([(job, mocked_now_epoch + 86399)])
self.assert_schedule_calls([(job, job_path, mocked_now_epoch + 86399)])

def test_execute_when_has_secure_parameters(self):
job = create_job(id=1,
Expand All @@ -345,20 +351,37 @@ def test_execute_when_has_secure_parameters(self):
start_datetime=mocked_now - timedelta(seconds=1),
repeat_unit='days',
repeat_period=1)
job_path = save_job(job)

self.mock_schedule_model_with_secure_param()

self.schedule_service._execute_job(job)
self.schedule_service._execute_job(job, job_path)

self.execution_service.start_script.assert_not_called()
self.assert_schedule_calls([(job, job_path, mocked_now_epoch + 86399)])

def test_execute_when_deleted(self):
job = create_job(id=1,
script_name='secure-config',
repeatable=True,
start_datetime=mocked_now - timedelta(seconds=1),
repeat_unit='days',
repeat_period=1)
job_path = save_job(job)
os.remove(job_path)

self.schedule_service._execute_job(job, job_path)

self.execution_service.start_script.assert_not_called()
self.assert_schedule_calls([(job, mocked_now_epoch + 86399)])
self.assert_schedule_calls([])

def test_cleanup_execution(self):
self.create_config('script_with_cleanup', auto_cleanup=True)
job = create_job(id=1,
script_name='script_with_cleanup',
repeatable=False,
start_datetime=mocked_now - timedelta(seconds=1))
job_path = save_job(job)

finish_callback = None

Expand All @@ -369,7 +392,7 @@ def add_finish_listener(callback_param, execution_id):

self.execution_service.add_finish_listener.side_effect = add_finish_listener

self.schedule_service._execute_job(job)
self.schedule_service._execute_job(job, job_path)

self.execution_service.start_script.assert_called_once_with(
ANY, job.parameter_values, job.user)
Expand Down Expand Up @@ -416,3 +439,17 @@ def create_job(id=None,

def get_job_filename(job):
    """Return the expected schedule file name for ``job``.

    Uses the same ``<script_name>_<audit_name>_<id>.json`` template as
    ``ScheduleService.save_job``, so that the tests and the service agree on
    the name layout.

    NOTE(review): the service additionally passes the name through
    ``file_utils.to_filename``; this helper does not, so it presumably only
    matches for names that need no sanitizing - confirm against to_filename.
    """
    # '%s' formatting stringifies the id, which tests pass as int or as str
    return '%s_%s_%s.json' % (job.script_name, job.user.get_audit_name(), job.id)


def save_job(job):
    """Serialize ``job`` to JSON, write it to its schedule file, return the path."""
    job_path = get_job_path(job)
    serialized_job = json.dumps(job.as_serializable_dict())

    file_utils.write_file(job_path, serialized_job)
    return job_path


def get_job_path(job):
    """Return the schedule file path for ``job`` inside the test temp folder."""
    # schedule files live in the 'schedules' sub-folder of the temp folder
    return os.path.join(test_utils.temp_folder, 'schedules', get_job_filename(job))

0 comments on commit 61fa4dc

Please sign in to comment.