Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch band pass unassigned #62

Merged
merged 6 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 40 additions & 42 deletions src/pytom_tm/tmjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,61 +393,59 @@ def start_job(
read_mrc(self.mask)
)

# create weighting, include missing wedge and band-pass filters
template_wedge = None
if self.tilt_angles is not None:
# convolute tomo with whitening filter and bandpass
tomo_filter = (create_gaussian_band_pass(
search_volume.shape,
self.voxel_size,
# init tomogram and template weighting
tomo_filter, template_wedge = 1, 1
# first generate bandpass filters
if not (self.low_pass is None and self.high_pass is None):
tomo_filter *= create_gaussian_band_pass(
search_volume.shape,
self.voxel_size,
self.low_pass,
self.high_pass
).astype(np.float32)
template_wedge *= create_gaussian_band_pass(
self.template_shape,
self.voxel_size,
self.low_pass,
self.high_pass
) * (profile_to_weighting(
).astype(np.float32)

# then multiply with optional whitening filters
if self.whiten_spectrum:
tomo_filter *= profile_to_weighting(
np.load(self.whitening_filter),
search_volume.shape
) if self.whiten_spectrum else 1)).astype(np.float32)

# we always apply a binary wedge (with optional band pass) to the volume to remove empty regions
search_volume = np.real(irfftn(rfftn(search_volume) * tomo_filter, s=search_volume.shape))
).astype(np.float32)
template_wedge *= profile_to_weighting(
np.load(self.whitening_filter),
self.template_shape
).astype(np.float32)

# get template wedge
template_wedge = (create_wedge(
# if tilt angles are provided we can create wedge filters
if self.tilt_angles is not None:
# for the tomogram a binary wedge is generated to explicitly set the missing wedge region to 0
tomo_filter *= create_wedge(
search_volume.shape,
self.tilt_angles,
self.voxel_size,
cut_off_radius=1.,
angles_in_degrees=True,
tilt_weighting=False
)
McHaillet marked this conversation as resolved.
Show resolved Hide resolved
# for the template a binary or per-tilt-weighted wedge is generated depending on the options
template_wedge *= create_wedge(
self.template_shape,
self.tilt_angles,
self.voxel_size,
cut_off_radius=1.,
angles_in_degrees=True,
low_pass=self.low_pass,
high_pass=self.high_pass,
tilt_weighting=self.tilt_weighting,
accumulated_dose_per_tilt=self.dose_accumulation,
ctf_params_per_tilt=self.ctf_data
) * (profile_to_weighting(
np.load(self.whitening_filter),
self.template_shape
) if self.whiten_spectrum else 1)).astype(np.float32)

if logging.root.level == logging.DEBUG:
write_mrc(
self.output_dir.joinpath('debug_tomo_filter.mrc'),
tomo_filter,
voxel_size=self.voxel_size
)
write_mrc(
self.output_dir.joinpath('debug_search_volume.mrc'),
search_volume,
voxel_size=self.voxel_size
)
write_mrc(
self.output_dir.joinpath('debug_template_convoluted.mrc'),
np.fft.irfftn(np.fft.rfftn(template) * template_wedge, s=template.shape).astype(np.float32),
voxel_size=self.voxel_size
)
write_mrc(
self.output_dir.joinpath('debug_3d_ctf.mrc'),
template_wedge,
voxel_size=self.voxel_size
)
).astype(np.float32)

# apply the optional band pass and whitening filter to the search region
search_volume = np.real(irfftn(rfftn(search_volume) * tomo_filter, s=search_volume.shape))

# load rotation search
angle_ids = list(range(
Expand Down
36 changes: 36 additions & 0 deletions tests/test_tmjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pytom_tm.tmjob import TMJob, TMJobError
from pytom_tm.io import read_mrc, write_mrc, UnequalSpacingError
from pytom_tm.extract import extract_particles
from test_weights import CTF_PARAMS, ACCUMULATED_DOSE, TILT_ANGLES
McHaillet marked this conversation as resolved.
Show resolved Hide resolved


TOMO_SHAPE = (100, 107, 59)
Expand All @@ -28,6 +29,7 @@
TEST_SCORES = TEST_DATA_DIR.joinpath('tomogram_scores.mrc')
TEST_ANGLES = TEST_DATA_DIR.joinpath('tomogram_angles.mrc')
TEST_CUSTOM_ANGULAR_SEARCH = TEST_DATA_DIR.joinpath('custom_angular_search.txt')
TEST_WHITENING_FILTER = TEST_DATA_DIR.joinpath('tomogram_whitening_filter.npy')


class TestTMJob(unittest.TestCase):
Expand Down Expand Up @@ -107,6 +109,9 @@ def tearDownClass(cls) -> None:
TEST_SCORES.unlink()
TEST_ANGLES.unlink()
TEST_CUSTOM_ANGULAR_SEARCH.unlink()
# the whitening filter might not exist if the job with spectrum whitening failed, so the unlinking needs to
# allow this (with missing_ok=True) to ensure clean up of the test directory
TEST_WHITENING_FILTER.unlink(missing_ok=True)
TEST_DATA_DIR.rmdir()

def setUp(self):
Expand Down Expand Up @@ -149,6 +154,37 @@ def test_tm_job_copy(self):
msg='Tomogram shape not correct in job, perhaps transpose issue?'
)

def test_tm_job_weighting_options(self):
# run with all options
job = TMJob('0', 10, TEST_TOMOGRAM, TEST_TEMPLATE, TEST_MASK, TEST_DATA_DIR,
angle_increment='90.00', voxel_size=1., low_pass=10, high_pass=100,
dose_accumulation=ACCUMULATED_DOSE, ctf_data=CTF_PARAMS, tilt_angles=TILT_ANGLES,
whiten_spectrum=True, tilt_weighting=True)
score, angle = job.start_job(0, return_volumes=True)
self.assertEqual(score.shape, job.tomo_shape, msg='TMJob with all options failed')

# run with only tilt weighting (in test_weights different options are tested for create_wedge)
job = TMJob('0', 10, TEST_TOMOGRAM, TEST_TEMPLATE, TEST_MASK, TEST_DATA_DIR,
angle_increment='90.00', voxel_size=1.,
dose_accumulation=ACCUMULATED_DOSE, ctf_data=CTF_PARAMS, tilt_angles=TILT_ANGLES,
tilt_weighting=True)
score, angle = job.start_job(0, return_volumes=True)
self.assertEqual(score.shape, job.tomo_shape, msg='TMJob with only wedge creation failed')

# run with only bandpass (in test_weights bandpass option are tested)
job = TMJob('0', 10, TEST_TOMOGRAM, TEST_TEMPLATE, TEST_MASK, TEST_DATA_DIR,
angle_increment='90.00', voxel_size=1., low_pass=10, high_pass=100)
score, angle = job.start_job(0, return_volumes=True)
self.assertEqual(score.shape, job.tomo_shape, msg='TMJob with only band-pass failed')

# run with only spectrum whitening (in test_weights the whitening filter is tested
job = TMJob('0', 10, TEST_TOMOGRAM, TEST_TEMPLATE, TEST_MASK, TEST_DATA_DIR,
angle_increment='90.00', voxel_size=1., whiten_spectrum=True)
score, angle = job.start_job(0, return_volumes=True)
self.assertEqual(score.shape, job.tomo_shape, msg='TMJob with only whitening filter failed')

# TMJob with none of these weighting options is tested in all other runs in this file.

def test_custom_angular_search(self):
job = TMJob('0', 10, TEST_TOMOGRAM, TEST_TEMPLATE, TEST_MASK, TEST_DATA_DIR,
angle_increment=TEST_CUSTOM_ANGULAR_SEARCH, voxel_size=1.)
Expand Down
27 changes: 18 additions & 9 deletions tests/test_weights.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@
65.895
'''
ACCUMULATED_DOSE = [float(x.strip()) for x in DOSE_FILE.split('\n') if x != '']
TILT_ANGLES = list(range(-51, 54, 3))


class TestWeights(unittest.TestCase):
def setUp(self):
self.volume_shape_even = (10, 10, 10)
self.volume_shape_uneven = (11, 11, 11)
self.volume_shape_irregular = (7, 12, 6)
self.tilt_angles = list(range(-51, 54, 3))
self.voxel_size = 3.34
self.low_pass = 10
self.high_pass = 50
Expand Down Expand Up @@ -180,23 +180,23 @@ def test_create_wedge(self):
'equal to 0'):
create_wedge(
self.volume_shape_even,
self.tilt_angles,
TILT_ANGLES,
0.
)
with self.assertRaises(ValueError, msg='Create wedge should raise ValueError if cut_off_radius is smaller or '
'equal to 0'):
create_wedge(
self.volume_shape_even,
self.tilt_angles,
TILT_ANGLES,
1.,
cut_off_radius=0.
)

# create test wedges
structured_wedge = create_wedge(self.volume_shape_even, self.tilt_angles, 1., tilt_weighting=True)
symmetric_wedge = create_wedge(self.volume_shape_even, [self.tilt_angles[0], self.tilt_angles[-1]],
structured_wedge = create_wedge(self.volume_shape_even, TILT_ANGLES, 1., tilt_weighting=True)
symmetric_wedge = create_wedge(self.volume_shape_even, [TILT_ANGLES[0], TILT_ANGLES[-1]],
1., tilt_weighting=False)
asymmetric_wedge = create_wedge(self.volume_shape_even, [self.tilt_angles[0], self.tilt_angles[-2]],
asymmetric_wedge = create_wedge(self.volume_shape_even, [TILT_ANGLES[0], TILT_ANGLES[-2]],
1., tilt_weighting=False)

self.assertEqual(structured_wedge.shape, self.reduced_even_shape_3d,
Expand All @@ -217,26 +217,35 @@ def test_create_wedge(self):
self.assertTrue(np.sum((symmetric_wedge != asymmetric_wedge) * 1) != 0,
msg='Symmetric and asymmetric wedge should be different!')

structured_wedge = create_wedge(self.volume_shape_even, self.tilt_angles, self.voxel_size, tilt_weighting=True,
structured_wedge = create_wedge(self.volume_shape_even, TILT_ANGLES, self.voxel_size, tilt_weighting=True,
cut_off_radius=1., low_pass=self.low_pass, high_pass=self.high_pass)
self.assertEqual(structured_wedge.shape, self.reduced_even_shape_3d,
msg='Wedge with band-pass does not have expected output shape')
self.assertEqual(structured_wedge.dtype, np.float32,
msg='Wedge with band-pass does not have expected dtype')

weights = create_wedge(self.volume_shape_even, self.tilt_angles, self.voxel_size * 3,
# test shapes of wedges
weights = create_wedge(self.volume_shape_even, TILT_ANGLES, self.voxel_size * 3,
tilt_weighting=True, low_pass=40,
accumulated_dose_per_tilt=ACCUMULATED_DOSE,
ctf_params_per_tilt=CTF_PARAMS)
self.assertEqual(weights.shape, self.reduced_even_shape_3d,
msg='3D CTF does not have the correct reduced fourier shape.')
weights = create_wedge(self.volume_shape_uneven, self.tilt_angles, self.voxel_size * 3,
weights = create_wedge(self.volume_shape_uneven, TILT_ANGLES, self.voxel_size * 3,
tilt_weighting=True, low_pass=40,
accumulated_dose_per_tilt=ACCUMULATED_DOSE,
ctf_params_per_tilt=CTF_PARAMS)
self.assertEqual(weights.shape, self.reduced_uneven_shape_3d,
msg='3D CTF does not have the correct reduced fourier shape.')

# test parameter flexibility of tilt_weighted wedge
weights = create_wedge(self.volume_shape_even, TILT_ANGLES, self.voxel_size * 3,
tilt_weighting=True, low_pass=self.low_pass,
accumulated_dose_per_tilt=None,
ctf_params_per_tilt=None)
self.assertEqual(weights.shape, self.reduced_even_shape_3d,
msg='Tilt weighted wedge should also work without defocus and dose info.')

def test_ctf(self):
ctf_raw = create_ctf(
self.volume_shape_even,
Expand Down