From 238ac63b893ef3a68a74706a6491572873aa3194 Mon Sep 17 00:00:00 2001 From: Jean Connelly Date: Tue, 28 Nov 2023 13:25:32 -0500 Subject: [PATCH] Reorganize and add some unit tests --- starcheck/src/lib/Ska/Starcheck/Obsid.pm | 2 +- .../{check_ir_zone.py => state_checks.py} | 155 ++++++++++-------- starcheck/tests/test_state_checks.py | 82 +++++++++ starcheck/utils.py | 7 +- 4 files changed, 177 insertions(+), 69 deletions(-) rename starcheck/{check_ir_zone.py => state_checks.py} (50%) create mode 100644 starcheck/tests/test_state_checks.py diff --git a/starcheck/src/lib/Ska/Starcheck/Obsid.pm b/starcheck/src/lib/Ska/Starcheck/Obsid.pm index 4be58217..27f91a24 100644 --- a/starcheck/src/lib/Ska/Starcheck/Obsid.pm +++ b/starcheck/src/lib/Ska/Starcheck/Obsid.pm @@ -2943,7 +2943,7 @@ sub proseco_args { return \%proseco_args; } - my $man_angle = call_python("check_ir_zone.get_obs_man_angle", + my $man_angle = call_python("state_checks.get_obs_man_angle", [ $targ_cmd->{tstop}, $self->{backstop} ]); $targ_cmd->{man_angle_calc} = $man_angle; diff --git a/starcheck/check_ir_zone.py b/starcheck/state_checks.py similarity index 50% rename from starcheck/check_ir_zone.py rename to starcheck/state_checks.py index f7c6de9f..1dee8bd7 100644 --- a/starcheck/check_ir_zone.py +++ b/starcheck/state_checks.py @@ -1,3 +1,6 @@ +import numpy as np +from cxotime import CxoTime + from astropy.table import Table, vstack import astropy.units as u from functools import lru_cache @@ -6,7 +9,7 @@ import kadi.commands as kadi_commands import kadi.commands.states as kadi_states from cxotime import CxoTime -from Chandra.Maneuver import duration +from chandra_maneuver import duration from Quaternion import Quat @@ -16,14 +19,52 @@ def make_man_table(): Calculate the time for range of manuever angles using Chandra.Maneuver duration. """ - angles = [] durations = [] q0 = Quat(equatorial=(0, 0, 0)) - for angle in np.arange(180): + + # Use a range of angles that sample the curve pretty well by eye + # The duration function is a little slow and not vectorized, so + # this is sparse. + angles = [0, 5, 10, 15, 20, 25, 35, 50, 100, 150, 180] + for angle in angles: q1 = Quat(equatorial=(angle, 0, 0)) - angles.append(angle) durations.append(duration(q0, q1)) - return Table([durations, angles], names=['duration', 'angle']) + + return Table([durations, angles], names=["duration", "angle"]) + + +def calc_pcad_state_nman_sums(states): + """ + For each NPNT state in a monotonically increasing list of states, + calculate the sum of time in NMAN before it + and interpolate into the man_table to calculate the equivalent + angle for a duration. + """ + man_table = make_man_table() + + nman_sums = [] + single_nman_sum = 0 + for state in states: + if state["pcad_mode"] == "NMAN": + single_nman_sum += state["tstop"] - state["tstart"] + if (state["pcad_mode"] == "NPNT") & (single_nman_sum > 0): + nman_sums.append( + { + "tstart": state["tstart"], + "nman_sum": single_nman_sum.copy(), + "angle": np.interp( + single_nman_sum, man_table["duration"], man_table["angle"] + ), + } + ) + # Reset the sum of NMM time to 0 on any NPNT state + single_nman_sum = 0 + + # If in a funny state, just add a large "maneuver time" of 20ks + if state["pcad_mode"] not in ["NMAN", "NPNT"]: + single_nman_sum += 20000 + + return Table(nman_sums) @lru_cache @@ -36,39 +77,20 @@ def make_man_angles(backstop_file): :param: backstop file :returns: astropy time with columns 'tstart', 'nman_sum', 'angle' """ - - man_table = make_man_table() - states = get_states(backstop_file, state_keys=['pcad_mode']) + states = get_states(backstop_file, state_keys=["pcad_mode"]) # If the states begin with NPNT we're done. Otherwise, get some more states. # 10 days is more than enough: it doesn't matter. - if states['pcad_mode'][0] != 'NPNT': + if states["pcad_mode"][0] != "NPNT": pre_states = kadi_states.get_states( - start=CxoTime(states[0]['tstart']) - 10 * u.day, stop=states[0]['tstart'], - state_keys=['pcad_mode'], merge_identical=True) + start=CxoTime(states[0]["tstart"]) - 10 * u.day, + stop=states[0]["tstart"], + state_keys=["pcad_mode"], + merge_identical=True, + ) states = vstack([pre_states, states]) - # For each NPNT start, calculate the sum of time in NMAN before it - # and interpolate into the man_table to calculate the equivalent - # angle for a duration. - nman_sums = [] - single_nman_sum = 0 - for state in states: - if state['pcad_mode'] == 'NMAN': - single_nman_sum += state['tstop'] - state['tstart'] - if (state['pcad_mode'] == 'NPNT') & (single_nman_sum > 0): - nman_sums.append({'tstart': state['tstart'], - 'nman_sum': single_nman_sum.copy(), - 'angle': np.interp(single_nman_sum, - man_table['duration'], - man_table['angle'])}) - single_nman_sum = 0 - - # If in a funny state, just add a large "maneuver" of 20ks - if state['pcad_mode'] not in ['NMAN', 'NPNT']: - single_nman_sum += 20000 - - return Table(nman_sums) + return calc_pcad_state_nman_sums(states) def get_obs_man_angle(npnt_tstart, backstop_file, default_large_angle=180): @@ -85,8 +107,8 @@ def get_obs_man_angle(npnt_tstart, backstop_file, default_large_angle=180): man_angles = make_man_angles(backstop_file) - idx = np.argmin(np.abs(man_angles['tstart'] - npnt_tstart)) - dt = man_angles['tstart'][idx] - npnt_tstart + idx = np.argmin(np.abs(man_angles["tstart"] - npnt_tstart)) + dt = man_angles["tstart"][idx] - npnt_tstart # For starcheck, if something went wrong, just # use a large (180) angle @@ -94,66 +116,69 @@ def get_obs_man_angle(npnt_tstart, backstop_file, default_large_angle=180): return float(default_large_angle) # Otherwise index into the table to get the angle for this duration - return float(man_angles['angle'][idx]) + return float(man_angles["angle"][idx]) def get_states(backstop_file, state_keys=None): - bs_cmds = kadi_commands.get_cmds_from_backstop(backstop_file) - bs_dates = bs_cmds['date'] - ok = bs_cmds['event_type'] == 'RUNNING_LOAD_TERMINATION_TIME' + bs_dates = bs_cmds["date"] + ok = bs_cmds["event_type"] == "RUNNING_LOAD_TERMINATION_TIME" rltt = CxoTime(bs_dates[ok][0] if np.any(ok) else bs_dates[0]) # Scheduled stop time is the end of propagation, either the explicit # time as a pseudo-command in the loads or the last backstop command time. - ok = bs_cmds['event_type'] == 'SCHEDULED_STOP_TIME' + ok = bs_cmds["event_type"] == "SCHEDULED_STOP_TIME" sched_stop = CxoTime(bs_dates[ok][0] if np.any(ok) else bs_dates[-1]) # Get the states for available commands. This automatically gets continuity. - return kadi_states.get_states(cmds=bs_cmds, start=rltt, stop=sched_stop, - state_keys=['pcad_mode'], merge_identical=True) + return kadi_states.get_states( + cmds=bs_cmds, + start=rltt, + stop=sched_stop, + state_keys=state_keys, + merge_identical=True, + ) def ir_zone_ok(backstop_file, out=None): + """ + Check that the high IR zone is all in NMM. - bs_cmds = kadi_commands.get_cmds_from_backstop(backstop_file) - bs_dates = bs_cmds['date'] - ok = bs_cmds['event_type'] == 'RUNNING_LOAD_TERMINATION_TIME' - rltt = CxoTime(bs_dates[ok][0] if np.any(ok) else bs_dates[0]) - - # Scheduled stop time is the end of propagation, either the explicit - # time as a pseudo-command in the loads or the last backstop command time. - ok = bs_cmds['event_type'] == 'SCHEDULED_STOP_TIME' - sched_stop = CxoTime(bs_dates[ok][0] if np.any(ok) else bs_dates[-1]) + :param backstop_file: backstop file + :param out: optional output file name (for plain text report) + :returns: True if high IR zone time is all in NMM, False otherwise + """ - # Get the states for available commands. This automatically gets continuity. - state_keys = ['pcad_mode', 'obsid'] - states = kadi_commands.states.get_states(cmds=bs_cmds, start=rltt, stop=sched_stop, - state_keys=state_keys, merge_identical=True) + states = get_states(backstop_file, state_keys=["pcad_mode", "obsid"]) + bs_cmds = kadi_commands.get_cmds_from_backstop(backstop_file) - perigee_cmds = bs_cmds[(bs_cmds['type'] == 'ORBPOINT') - & (bs_cmds['event_type'] == 'EPERIGEE')] + perigee_cmds = bs_cmds[ + (bs_cmds["type"] == "ORBPOINT") & (bs_cmds["event_type"] == "EPERIGEE") + ] all_ok = True out_text = ["IR ZONE CHECKING"] - for perigee_time in perigee_cmds['time']: - + for perigee_time in perigee_cmds["time"]: # Current high ir zone is from 25 minutes before to 27 minutes after + # These don't seem worth vars, but make sure to change in the text below + # too when these are updated. ir_zone_start = perigee_time - 25 * 60 ir_zone_stop = perigee_time + 27 * 60 + out_text.append(f"Checking perigee at {CxoTime(perigee_time).date}") out_text.append( - f"Checking perigee at {CxoTime(perigee_time).date}") - out_text.append( - f" High IR Zone from {CxoTime(ir_zone_start).date} to {CxoTime(ir_zone_stop).date}") - ok = (states['tstart'] <= ir_zone_stop) & (states['tstop'] >= ir_zone_start) + f" High IR Zone from (perigee - 25m) {CxoTime(ir_zone_start).date} " + f"to (perigee + 27m) {CxoTime(ir_zone_stop).date}" + ) + ok = (states["tstart"] <= ir_zone_stop) & (states["tstop"] >= ir_zone_start) for state in states[ok]: out_text.append( - f" state {state['datestart']} {state['datestop']} {state['pcad_mode']}") - if np.any(states[ok]['pcad_mode'] != 'NMAN'): + f" state {state['datestart']} {state['datestop']} {state['pcad_mode']}" + ) + if np.any(states[ok]["pcad_mode"] != "NMAN"): all_ok = False if out is not None: - with open(out, 'w') as fh: + with open(out, "w") as fh: fh.writelines("\n".join(out_text)) return all_ok diff --git a/starcheck/tests/test_state_checks.py b/starcheck/tests/test_state_checks.py new file mode 100644 index 00000000..7fc49bc3 --- /dev/null +++ b/starcheck/tests/test_state_checks.py @@ -0,0 +1,82 @@ +import numpy as np +from pathlib import Path +import pytest + +from astropy.table import Table +import chandra_maneuver +import parse_cm.tests +import Quaternion + +from starcheck.state_checks import ( + make_man_table, + get_obs_man_angle, + calc_pcad_state_nman_sums, +) + + +@pytest.mark.parametrize("angle", [0, 45, 90, 135, 180]) +def test_make_man_table_durations(angle): + """ + Test that the calculated interpolated duractions in the maneuver angle duration + table are reasonable using chandra_maneuver.duration directly as a reference. + """ + + man_table = make_man_table() + + # Check that the interpolated durations are reasonable + dur_method = np.interp(angle, man_table["angle"], man_table["duration"]) + q0 = Quaternion.Quat(equatorial=(0, 0, 0)) + dur_direct = chandra_maneuver.duration( + q0, Quaternion.Quat(equatorial=(angle, 0, 0)) + ) + assert np.isclose(dur_method, dur_direct, rtol=0, atol=10) + + +@pytest.mark.parametrize( + "duration,expected_angle", + [(240, 0), (1000, 27.7), (2000, 100.5), (3000, 175.5), (4000, 180)], +) +def test_make_man_table_angles(duration, expected_angle): + """ + Test that the angles calculatd from interpolation into the maneuver angle duration table + match reference values. + """ + + man_table = make_man_table() + angle_calc = np.interp(duration, man_table["duration"], man_table["angle"]) + assert np.isclose(angle_calc, expected_angle, rtol=0, atol=0.1) + + +# These tstarts and expected angles are specific to the CR182_0803.backstop file +@pytest.mark.parametrize( + "tstart,expected_angle", + [(646940289.224, 143.4), (646947397.759, 0.22), (646843532.911, 92.6)], +) +def test_get_obs_man_angle(tstart, expected_angle): + """ + Confirm that some of the angles calculated for a reference backstop file + match reference values. + """ + # Use a backstop file already in ska test data + backstop_file = ( + Path(parse_cm.tests.__file__).parent / "data" / "CR182_0803.backstop" + ) + angle = get_obs_man_angle(tstart, backstop_file) + assert np.isclose(angle, expected_angle, rtol=0, atol=0.1) + + +def test_calc_pcad_state_nman_sums(): + # Make a synthetic table of pcad_mode states + states = [ + {"tstart": 0, "tstop": 1000, "pcad_mode": "NPNT"}, + {"tstart": 1000, "tstop": 2000, "pcad_mode": "NMAN"}, + {"tstart": 2000, "tstop": 3000, "pcad_mode": "NMAN"}, + {"tstart": 3000, "tstop": 4000, "pcad_mode": "NPNT"}, + {"tstart": 4000, "tstop": 5000, "pcad_mode": "NMAN"}, + {"tstart": 5000, "tstop": 6000, "pcad_mode": "NPNT"}, + ] + states = Table(states) + nman_sums = calc_pcad_state_nman_sums(states) + # Confirm that the first NPNT after an NMAN has the appropriate sum of NMAN times before it + assert nman_sums[nman_sums["tstart"] == 3000]["nman_sum"][0] == 2000 + assert nman_sums[nman_sums["tstart"] == 5000]["nman_sum"][0] == 1000 diff --git a/starcheck/utils.py b/starcheck/utils.py index 45bb2e92..61734336 100644 --- a/starcheck/utils.py +++ b/starcheck/utils.py @@ -16,7 +16,6 @@ from chandra_aca.dark_model import dark_temp_scale import sparkles from chandra_aca.transform import mag_to_count_rate, pixels_to_yagzag, yagzag_to_pixels -from kadi.commands import states from mica.archive import aca_dark from parse_cm import read_backstop_as_list, write_backstop from proseco.catalog import get_aca_catalog, get_effective_t_ccd @@ -26,10 +25,12 @@ from testr import test_helper from cxotime import CxoTime +import kadi.commands.states as kadi_states + import starcheck from starcheck import __version__ as version from starcheck.calc_ccd_temps import get_ccd_temps -from starcheck.check_ir_zone import ir_zone_ok +from starcheck.state_checks import ir_zone_ok from starcheck.plot import make_plots_for_obsid ACQS = mica.stats.acq_stats.get_stats() @@ -123,7 +124,7 @@ def get_dither_kadi_state(date): "dither_period_pitch", "dither_period_yaw", ] - state = states.get_continuity(date, cols) + state = kadi_states.get_continuity(date, cols) # Cast the numpy floats as plain floats for key in [ "dither_ampl_pitch",