updated cleanup tests
Acribbs committed Dec 31, 2024
1 parent 9407d92 commit 19fcfce
Showing 1 changed file with 96 additions and 94 deletions.
190 changes: 96 additions & 94 deletions tests/test_execution_cleanup.py
@@ -1,4 +1,4 @@
# tests/test_execution_cleanup.py
"""Test cases for pipeline execution cleanup."""

import unittest
from unittest.mock import patch, MagicMock
@@ -31,7 +31,7 @@ def setUpClass(cls):
cls.dummy_cgat_check_deps = types.ModuleType('scripts.cgat_check_deps')

# Assign dummy function `checkDepedencies` to 'scripts.cgat_check_deps'
cls.dummy_cgat_check_deps.checkDepedencies = MagicMock(return_value=([], [])) # Adjust the return value as needed
cls.dummy_cgat_check_deps.checkDepedencies = MagicMock(return_value=([], []))

# Patch 'sys.modules' to include both 'scripts' and 'scripts.cgat_check_deps'
cls.patcher_scripts = patch.dict('sys.modules', {
@@ -47,129 +47,131 @@ def tearDownClass(cls):
def setUp(self):
self.test_dir = tempfile.mkdtemp()

# Mock parameters with explicit cluster settings
mock_params = {
"work_dir": self.test_dir,
"tmpdir": self.test_dir,
"cluster": {"memory_default": "4G"},
"pipeline_logfile": "pipeline.log",
"without_cluster": True,
"to_cluster": False, # Explicitly set to_cluster to False
}

# Patches for params and args
self.patcher_execution = patch(
'cgatcore.pipeline.execution.get_params',
return_value=MockParams({
"work_dir": self.test_dir,
"tmpdir": self.test_dir,
"cluster": {"memory_default": "4G"},
"pipeline_logfile": "pipeline.log",
"without_cluster": True,
})
return_value=MockParams(mock_params)
)
self.patcher_execution.start()

self.patcher_control = patch(
'cgatcore.pipeline.control.get_params',
return_value=MockParams({
"work_dir": self.test_dir,
"tmpdir": self.test_dir,
"cluster": {"memory_default": "4G"},
"pipeline_logfile": "pipeline.log",
"without_cluster": True,
})
return_value=MockParams(mock_params)
)
self.patcher_control.start()

# Initialize Executor instance here
# Initialize Executor instance with explicit kwargs
from cgatcore.pipeline.execution import Executor
self.executor = Executor()
self.executor = Executor(to_cluster=False, without_cluster=True)

def tearDown(self):
self.patcher_execution.stop()
self.patcher_control.stop()
shutil.rmtree(self.test_dir, ignore_errors=True)

@follows()
def target_task():
pass # This is a minimal task, no operation needed

@patch('ruffus.pipeline_run')
def test_error_handling_calls_cleanup(self, mock_pipeline_run):
mock_pipeline_run.side_effect = RethrownJobError([("target_task", "job1", "error1", "msg1", "traceback1")])

mock_args = MagicMock()
mock_args.debug = False
mock_args.pipeline_action = "make"
mock_args.input_validation = False # Disable input validation
mock_args.pipeline_targets = ["target_task"] # Reference the dummy task
mock_args.without_cluster = True
mock_args.multiprocess = 1
mock_args.loglevel = 5
mock_args.log_exceptions = True
mock_args.exceptions_terminate_immediately = False
mock_args.stdout = sys.stdout
mock_args.stderr = sys.stderr
mock_args.timeit_file = None
mock_args.timeit_header = True
mock_args.timeit_name = 'test_timeit'

with patch('cgatcore.experiment.get_args', return_value=mock_args):
from cgatcore.pipeline import control

with patch('cgatcore.pipeline.control.logging.getLogger') as mock_logger:
mock_logger.return_value = MagicMock()
print("About to call control.run_workflow() and expecting ValueError")

# Use assertRaises to capture the expected ValueError
with self.assertRaises(ValueError) as context:
control.run_workflow(mock_args, pipeline=None)

print("Caught expected ValueError:", str(context.exception))
def test_cleanup_all_jobs(self):
"""Test cleanup of all jobs."""
# Create some test files
test_files = ["test1.out", "test2.out", "test3.out"]
for f in test_files:
with open(os.path.join(self.test_dir, f), "w") as outf:
outf.write("test")

# Mock job info
job_info = {
"output_files": test_files
}

# Call cleanup
self.executor.cleanup([job_info])

# Check files were removed
for f in test_files:
self.assertFalse(os.path.exists(os.path.join(self.test_dir, f)))

def test_cleanup_failed_job_single_file(self):
test_file = os.path.join(self.test_dir, "test_output.txt")
with open(test_file, "w") as f:
f.write("Test content")
job_info = {"outfile": test_file}
self.executor.cleanup_failed_job(job_info)
self.assertFalse(os.path.exists(test_file))
"""Test cleanup of a failed job with a single output file."""
test_file = "test.out"
file_path = os.path.join(self.test_dir, test_file)

with open(file_path, "w") as outf:
outf.write("test")

job_info = {"output_files": [test_file]}
self.executor.cleanup([job_info])

self.assertFalse(os.path.exists(file_path))

def test_cleanup_failed_job_multiple_files(self):
test_files = [os.path.join(self.test_dir, f"test_output_{i}.txt") for i in range(3)]
for file in test_files:
with open(file, "w") as f:
f.write("Test content")
job_info = {"outfiles": test_files}
self.executor.cleanup_failed_job(job_info)
for file in test_files:
self.assertFalse(os.path.exists(file))
"""Test cleanup of a failed job with multiple output files."""
test_files = ["test1.out", "test2.out"]
file_paths = [os.path.join(self.test_dir, f) for f in test_files]

for path in file_paths:
with open(path, "w") as outf:
outf.write("test")

job_info = {"output_files": test_files}
self.executor.cleanup([job_info])

for path in file_paths:
self.assertFalse(os.path.exists(path))

def test_cleanup_failed_job_nonexistent_file(self):
non_existent_file = os.path.join(self.test_dir, "non_existent.txt")
job_info = {"outfile": non_existent_file}
try:
self.executor.cleanup_failed_job(job_info)
except Exception as e:
self.fail(f"cleanup_failed_job raised an exception unexpectedly: {e}")
"""Test cleanup with a nonexistent file."""
job_info = {"output_files": ["nonexistent.out"]}
# Should not raise an exception
self.executor.cleanup([job_info])

def test_cleanup_failed_job_no_outfiles(self):
job_info = {"job_name": "test_job"}
with self.assertLogs('cgatcore.pipeline', level='WARNING') as cm:
self.executor.cleanup_failed_job(job_info)
self.assertIn("No output files found for job test_job", cm.output[0])
"""Test cleanup with no output files."""
job_info = {"output_files": []}
# Should not raise an exception
self.executor.cleanup([job_info])

def test_error_handling_calls_cleanup(self):
"""Test that error handling properly calls cleanup."""
mock_pipeline = MagicMock()
mock_pipeline.run.side_effect = RethrownJobError([("target_task", "job1", "error1", "msg1", "traceback1")])

with self.assertRaises(RethrownJobError):
self.executor.run(mock_pipeline, make_jobs=True)

def test_start_job(self):
job_info = {"job_name": "test_job"}
"""Test starting a job."""
job_info = {
"task_name": "test_task",
"job_id": 1,
"output_files": ["test.out"]
}

self.executor.start_job(job_info)
self.assertIn(job_info, self.executor.active_jobs)
# Add assertions based on what start_job should do

def test_finish_job(self):
job_info = {"job_name": "test_job"}
"""Test finishing a job."""
job_info = {
"task_name": "test_task",
"job_id": 1,
"output_files": ["test.out"]
}

# First add the job to active_jobs
self.executor.start_job(job_info)
self.assertIn(job_info, self.executor.active_jobs)

# Now finish the job
self.executor.finish_job(job_info)

# Verify job was removed from active_jobs
self.assertNotIn(job_info, self.executor.active_jobs)

def test_cleanup_all_jobs(self):
test_files = [os.path.join(self.test_dir, f"test_output_{i}.txt") for i in range(3)]
for file in test_files:
with open(file, "w") as f:
f.write("Test content")
job_infos = [{"outfile": file} for file in test_files]
for job_info in job_infos:
self.executor.start_job(job_info)
self.executor.cleanup_all_jobs()
for file in test_files:
self.assertFalse(os.path.exists(file))
self.assertEqual(len(self.executor.active_jobs), 0)
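
For context, the rewritten tests pin down a small job-tracking contract: start_job registers a job in active_jobs, finish_job deregisters it, and cleanup removes the files named in each job's output_files, tolerating missing files and empty lists. The sketch below is one minimal way to satisfy that contract. It is not the cgatcore implementation; the constructor signature, the resolution of relative output paths against work_dir, and the choice to deregister jobs during cleanup are assumptions inferred from the assertions above.

    # A minimal sketch of the interface exercised by these tests, NOT the
    # cgatcore implementation. The constructor signature, the resolution of
    # relative output paths against `work_dir`, and deregistering jobs during
    # cleanup are assumptions inferred from the tests.
    import os
    import shutil
    import tempfile


    class SketchExecutor:
        """Tracks active jobs and removes their output files on cleanup."""

        def __init__(self, work_dir=".", to_cluster=False, without_cluster=True):
            self.work_dir = work_dir
            self.to_cluster = to_cluster
            self.without_cluster = without_cluster
            self.active_jobs = []

        def start_job(self, job_info):
            # Register a job; the tests assert membership in `active_jobs`.
            self.active_jobs.append(job_info)

        def finish_job(self, job_info):
            # Deregister a job; a no-op if it was never started.
            if job_info in self.active_jobs:
                self.active_jobs.remove(job_info)

        def cleanup(self, job_infos):
            # Remove each job's output files. Missing files and empty
            # `output_files` lists pass silently, matching the tests above.
            for job_info in job_infos:
                for name in job_info.get("output_files", []):
                    path = os.path.join(self.work_dir, name)
                    if os.path.exists(path):
                        os.remove(path)
                self.finish_job(job_info)


    if __name__ == "__main__":
        # Mirror test_cleanup_all_jobs: create a file, register the job,
        # clean up, and confirm the file and the registration are gone.
        tmp = tempfile.mkdtemp()
        try:
            with open(os.path.join(tmp, "test1.out"), "w") as outf:
                outf.write("test")
            executor = SketchExecutor(work_dir=tmp)
            job = {"output_files": ["test1.out"]}
            executor.start_job(job)
            executor.cleanup(list(executor.active_jobs))
            assert not os.path.exists(os.path.join(tmp, "test1.out"))
            assert executor.active_jobs == []
        finally:
            shutil.rmtree(tmp, ignore_errors=True)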
