diff --git a/airflow/jobs.py b/airflow/jobs.py index 1064ba48e6fc6..9b71854d4dcdf 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1633,13 +1633,16 @@ def _execute_helper(self): self.heartbeat() last_self_heartbeat_time = timezone.utcnow() + is_unit_test = conf.getboolean('core', 'unit_test_mode') loop_end_time = time.time() loop_duration = loop_end_time - loop_start_time self.log.debug( "Ran scheduling loop in %.2f seconds", loop_duration) - self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval) - time.sleep(self._processor_poll_interval) + + if not is_unit_test: + self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval) + time.sleep(self._processor_poll_interval) # Exit early for a test mode, run one additional scheduler loop # to reduce the possibility that parsed DAG was put into the queue @@ -1652,7 +1655,7 @@ def _execute_helper(self): " have been processed {} times".format(self.num_runs)) break - if loop_duration < 1: + if loop_duration < 1 and not is_unit_test: sleep_length = 1 - loop_duration self.log.debug( "Sleeping for {0:.2f} seconds to prevent excessive logging" diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index 97d2c17999b83..aceaec1598266 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -276,7 +276,7 @@ def get_dag(self, dag_id): def list_py_file_paths(directory, safe_mode=True, - include_examples=conf.getboolean('core', 'LOAD_EXAMPLES')): + include_examples=None): """ Traverse a directory and look for Python files. @@ -287,6 +287,8 @@ def list_py_file_paths(directory, safe_mode=True, :return: a list of paths to Python files in the specified directory :rtype: list[unicode] """ + if include_examples is None: + include_examples = conf.getboolean('core', 'LOAD_EXAMPLES') file_paths = [] if directory is None: return [] diff --git a/tests/jobs.py b/tests/jobs.py index e3195f86c8a8b..8cb53e948f7f7 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -1386,12 +1386,33 @@ def test_localtaskjob_double_trigger(self): class SchedulerJobTest(unittest.TestCase): def setUp(self): - self.dagbag = DagBag() with create_session() as session: session.query(models.DagRun).delete() + session.query(models.TaskInstance).delete() + session.query(models.Pool).delete() + session.query(models.DagModel).delete() + session.query(models.SlaMiss).delete() session.query(models.ImportError).delete() session.commit() + @classmethod + def setUpClass(cls): + cls.dagbag = DagBag() + + def getboolean(section, key): + if section.lower() == 'core' and key.lower() == 'load_examples': + return False + else: + return configuration.conf.getboolean(section, key) + + cls.patcher = mock.patch('airflow.jobs.conf.getboolean') + mock_getboolean = cls.patcher.start() + mock_getboolean.side_effect = getboolean + + @classmethod + def tearDownClass(cls): + cls.patcher.stop() + @staticmethod def run_single_scheduler_loop_with_no_dags(dags_folder): """ @@ -2489,6 +2510,7 @@ def test_scheduler_task_start_date(self): dag = self.dagbag.get_dag(dag_id) dag.clear() scheduler = SchedulerJob(dag_id, + subdir=os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py'), num_runs=2) scheduler.run() @@ -2511,7 +2533,8 @@ def test_scheduler_multiprocessing(self): dag.clear() scheduler = SchedulerJob(dag_ids=dag_ids, - num_runs=2) + subdir=os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py'), + num_runs=1) scheduler.run() # zero tasks ran