[AIRFLOW-3885] ~10x speed-up of SchedulerJobTest suite (apache#4730)

The SchedulerJobTest suite now takes ~90 seconds on my laptop, down from
~900 seconds (15 minutes) on Jenkins.

There are a few optimizations here:

1. Don't sleep() for 1 second every scheduling loop (in unit tests)
2. Don't process the example DAGs
3. Use `subdir` to process only the DAGs we need, for a couple of tests
   that actually run the scheduler
4. Only load the DagBag once instead of before each test

I've also added a few tables to the list of tables that are cleaned up
in between test runs to make the tests re-entrant.
astahlman authored and ashb committed Mar 8, 2019
1 parent 8e5eb07 commit f1ed955
Showing 3 changed files with 34 additions and 6 deletions.
9 changes: 6 additions & 3 deletions airflow/jobs.py
@@ -1633,13 +1633,16 @@ def _execute_helper(self):
self.heartbeat()
last_self_heartbeat_time = timezone.utcnow()

is_unit_test = conf.getboolean('core', 'unit_test_mode')
loop_end_time = time.time()
loop_duration = loop_end_time - loop_start_time
self.log.debug(
"Ran scheduling loop in %.2f seconds",
loop_duration)
self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
time.sleep(self._processor_poll_interval)

if not is_unit_test:
self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
time.sleep(self._processor_poll_interval)

# Exit early for a test mode, run one additional scheduler loop
# to reduce the possibility that parsed DAG was put into the queue
@@ -1652,7 +1655,7 @@ def _execute_helper(self):
" have been processed {} times".format(self.num_runs))
break

if loop_duration < 1:
if loop_duration < 1 and not is_unit_test:
sleep_length = 1 - loop_duration
self.log.debug(
"Sleeping for {0:.2f} seconds to prevent excessive logging"
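The two hunks above route both throttling sleeps through a single is_unit_test flag read from the existing 'unit_test_mode' core setting, so a test-mode scheduler loop never blocks. A stand-alone toy sketch of that pacing pattern (simplified and renamed for illustration; this is not the scheduler code itself):

    import time

    def run_loops(loop_body, num_runs, processor_poll_interval=1.0, is_unit_test=False):
        """Toy pacing loop mirroring the logic in the hunks above."""
        for _ in range(num_runs):
            loop_start_time = time.time()
            loop_body()
            loop_duration = time.time() - loop_start_time
            if not is_unit_test:
                # Normal operation: wait between file-processor polls.
                time.sleep(processor_poll_interval)
            if loop_duration < 1 and not is_unit_test:
                # Normal operation: pad fast loops to prevent excessive logging.
                time.sleep(1 - loop_duration)

With is_unit_test=True both sleeps are skipped, which is optimization 1 from the commit message.
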
4 changes: 3 additions & 1 deletion airflow/utils/dag_processing.py
@@ -276,7 +276,7 @@ def get_dag(self, dag_id):


def list_py_file_paths(directory, safe_mode=True,
include_examples=conf.getboolean('core', 'LOAD_EXAMPLES')):
include_examples=None):
"""
Traverse a directory and look for Python files.
@@ -287,6 +287,8 @@ def list_py_file_paths(directory, safe_mode=True,
:return: a list of paths to Python files in the specified directory
:rtype: list[unicode]
"""
if include_examples is None:
include_examples = conf.getboolean('core', 'LOAD_EXAMPLES')
file_paths = []
if directory is None:
return []
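The change above matters because Python evaluates default-argument expressions once, when the function is defined (i.e. when airflow.utils.dag_processing is imported). The old signature therefore froze the LOAD_EXAMPLES value at import time, and any configuration patched later (such as the getboolean mock in the test fixture below) never reached it; using None as a sentinel and reading the config inside the body defers the lookup to each call. A minimal, self-contained illustration of the difference (the SETTINGS dict here is a stand-in for the config, not Airflow code):

    SETTINGS = {'load_examples': True}

    def early_bound(include_examples=SETTINGS['load_examples']):
        # Default captured once, at function-definition time.
        return include_examples

    def late_bound(include_examples=None):
        # Default resolved on every call.
        if include_examples is None:
            include_examples = SETTINGS['load_examples']
        return include_examples

    SETTINGS['load_examples'] = False   # e.g. a test patching the config
    assert early_bound() is True        # still the definition-time value
    assert late_bound() is False        # sees the patched value
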
27 changes: 25 additions & 2 deletions tests/jobs.py
@@ -1386,12 +1386,33 @@ def test_localtaskjob_double_trigger(self):
class SchedulerJobTest(unittest.TestCase):

def setUp(self):
self.dagbag = DagBag()
with create_session() as session:
session.query(models.DagRun).delete()
session.query(models.TaskInstance).delete()
session.query(models.Pool).delete()
session.query(models.DagModel).delete()
session.query(models.SlaMiss).delete()
session.query(models.ImportError).delete()
session.commit()

@classmethod
def setUpClass(cls):
cls.dagbag = DagBag()

def getboolean(section, key):
if section.lower() == 'core' and key.lower() == 'load_examples':
return False
else:
return configuration.conf.getboolean(section, key)

cls.patcher = mock.patch('airflow.jobs.conf.getboolean')
mock_getboolean = cls.patcher.start()
mock_getboolean.side_effect = getboolean

@classmethod
def tearDownClass(cls):
cls.patcher.stop()

@staticmethod
def run_single_scheduler_loop_with_no_dags(dags_folder):
"""
@@ -2489,6 +2510,7 @@ def test_scheduler_task_start_date(self):
dag = self.dagbag.get_dag(dag_id)
dag.clear()
scheduler = SchedulerJob(dag_id,
subdir=os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py'),
num_runs=2)
scheduler.run()

@@ -2511,7 +2533,8 @@ def test_scheduler_multiprocessing(self):
dag.clear()

scheduler = SchedulerJob(dag_ids=dag_ids,
num_runs=2)
subdir=os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py'),
num_runs=1)
scheduler.run()

# zero tasks ran
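Two things in the test changes above do the heavy lifting: setUpClass builds the DagBag once per class and patches airflow.jobs.conf.getboolean so that only the ('core', 'load_examples') lookup is overridden to False while every other key is delegated to the real configuration, and the subdir=os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py') arguments point the two scheduler-running tests at a single DAG file instead of the whole test DAG folder (optimizations 2, 3 and 4 from the commit message). A self-contained sketch of the side_effect delegation pattern used in setUpClass (REAL, real_getboolean and FakeJobsModule are illustrative stand-ins, not Airflow objects):

    from unittest import mock

    REAL = {('core', 'load_examples'): True,
            ('core', 'unit_test_mode'): True}

    def real_getboolean(section, key):
        return REAL[(section.lower(), key.lower())]

    class FakeJobsModule:
        # Stand-in for the namespace whose getboolean gets patched.
        getboolean = staticmethod(real_getboolean)

    def fake_getboolean(section, key):
        # Same shape as the side_effect in SchedulerJobTest.setUpClass:
        # override one key, delegate every other lookup to the real function.
        if section.lower() == 'core' and key.lower() == 'load_examples':
            return False
        return real_getboolean(section, key)

    patcher = mock.patch.object(FakeJobsModule, 'getboolean',
                                side_effect=fake_getboolean)
    patcher.start()
    assert FakeJobsModule.getboolean('core', 'load_examples') is False  # overridden
    assert FakeJobsModule.getboolean('core', 'unit_test_mode') is True  # delegated
    patcher.stop()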
