Skip to content

Commit

Permalink
Reorganize and add some unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanconn committed Nov 28, 2023
1 parent 1156f8f commit 238ac63
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 69 deletions.
2 changes: 1 addition & 1 deletion starcheck/src/lib/Ska/Starcheck/Obsid.pm
Original file line number Diff line number Diff line change
Expand Up @@ -2943,7 +2943,7 @@ sub proseco_args {
return \%proseco_args;
}

my $man_angle = call_python("check_ir_zone.get_obs_man_angle",
my $man_angle = call_python("state_checks.get_obs_man_angle",
[ $targ_cmd->{tstop}, $self->{backstop} ]);
$targ_cmd->{man_angle_calc} = $man_angle;

Expand Down
155 changes: 90 additions & 65 deletions starcheck/check_ir_zone.py → starcheck/state_checks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import numpy as np
from cxotime import CxoTime

from astropy.table import Table, vstack
import astropy.units as u
from functools import lru_cache
Expand All @@ -6,7 +9,7 @@
import kadi.commands as kadi_commands
import kadi.commands.states as kadi_states
from cxotime import CxoTime
from Chandra.Maneuver import duration
from chandra_maneuver import duration
from Quaternion import Quat


Expand All @@ -16,14 +19,52 @@ def make_man_table():
Calculate the time for range of manuever angles using Chandra.Maneuver
duration.
"""
angles = []
durations = []
q0 = Quat(equatorial=(0, 0, 0))
for angle in np.arange(180):

# Use a range of angles that sample the curve pretty well by eye
# The duration function is a little slow and not vectorized, so
# this is sparse.
angles = [0, 5, 10, 15, 20, 25, 35, 50, 100, 150, 180]
for angle in angles:
q1 = Quat(equatorial=(angle, 0, 0))
angles.append(angle)
durations.append(duration(q0, q1))
return Table([durations, angles], names=['duration', 'angle'])

return Table([durations, angles], names=["duration", "angle"])


def calc_pcad_state_nman_sums(states):
"""
For each NPNT state in a monotonically increasing list of states,
calculate the sum of time in NMAN before it
and interpolate into the man_table to calculate the equivalent
angle for a duration.
"""
man_table = make_man_table()

nman_sums = []
single_nman_sum = 0
for state in states:
if state["pcad_mode"] == "NMAN":
single_nman_sum += state["tstop"] - state["tstart"]
if (state["pcad_mode"] == "NPNT") & (single_nman_sum > 0):
nman_sums.append(
{
"tstart": state["tstart"],
"nman_sum": single_nman_sum.copy(),
"angle": np.interp(
single_nman_sum, man_table["duration"], man_table["angle"]
),
}
)
# Reset the sum of NMM time to 0 on any NPNT state
single_nman_sum = 0

# If in a funny state, just add a large "maneuver time" of 20ks
if state["pcad_mode"] not in ["NMAN", "NPNT"]:
single_nman_sum += 20000

return Table(nman_sums)


@lru_cache
Expand All @@ -36,39 +77,20 @@ def make_man_angles(backstop_file):
:param: backstop file
:returns: astropy time with columns 'tstart', 'nman_sum', 'angle'
"""

man_table = make_man_table()
states = get_states(backstop_file, state_keys=['pcad_mode'])
states = get_states(backstop_file, state_keys=["pcad_mode"])

# If the states begin with NPNT we're done. Otherwise, get some more states.
# 10 days is more than enough: it doesn't matter.
if states['pcad_mode'][0] != 'NPNT':
if states["pcad_mode"][0] != "NPNT":
pre_states = kadi_states.get_states(
start=CxoTime(states[0]['tstart']) - 10 * u.day, stop=states[0]['tstart'],
state_keys=['pcad_mode'], merge_identical=True)
start=CxoTime(states[0]["tstart"]) - 10 * u.day,
stop=states[0]["tstart"],
state_keys=["pcad_mode"],
merge_identical=True,
)
states = vstack([pre_states, states])

# For each NPNT start, calculate the sum of time in NMAN before it
# and interpolate into the man_table to calculate the equivalent
# angle for a duration.
nman_sums = []
single_nman_sum = 0
for state in states:
if state['pcad_mode'] == 'NMAN':
single_nman_sum += state['tstop'] - state['tstart']
if (state['pcad_mode'] == 'NPNT') & (single_nman_sum > 0):
nman_sums.append({'tstart': state['tstart'],
'nman_sum': single_nman_sum.copy(),
'angle': np.interp(single_nman_sum,
man_table['duration'],
man_table['angle'])})
single_nman_sum = 0

# If in a funny state, just add a large "maneuver" of 20ks
if state['pcad_mode'] not in ['NMAN', 'NPNT']:
single_nman_sum += 20000

return Table(nman_sums)
return calc_pcad_state_nman_sums(states)


def get_obs_man_angle(npnt_tstart, backstop_file, default_large_angle=180):
Expand All @@ -85,75 +107,78 @@ def get_obs_man_angle(npnt_tstart, backstop_file, default_large_angle=180):

man_angles = make_man_angles(backstop_file)

idx = np.argmin(np.abs(man_angles['tstart'] - npnt_tstart))
dt = man_angles['tstart'][idx] - npnt_tstart
idx = np.argmin(np.abs(man_angles["tstart"] - npnt_tstart))
dt = man_angles["tstart"][idx] - npnt_tstart

# For starcheck, if something went wrong, just
# use a large (180) angle
if np.abs(dt) > 100:
return float(default_large_angle)

# Otherwise index into the table to get the angle for this duration
return float(man_angles['angle'][idx])
return float(man_angles["angle"][idx])


def get_states(backstop_file, state_keys=None):

bs_cmds = kadi_commands.get_cmds_from_backstop(backstop_file)
bs_dates = bs_cmds['date']
ok = bs_cmds['event_type'] == 'RUNNING_LOAD_TERMINATION_TIME'
bs_dates = bs_cmds["date"]
ok = bs_cmds["event_type"] == "RUNNING_LOAD_TERMINATION_TIME"
rltt = CxoTime(bs_dates[ok][0] if np.any(ok) else bs_dates[0])

# Scheduled stop time is the end of propagation, either the explicit
# time as a pseudo-command in the loads or the last backstop command time.
ok = bs_cmds['event_type'] == 'SCHEDULED_STOP_TIME'
ok = bs_cmds["event_type"] == "SCHEDULED_STOP_TIME"
sched_stop = CxoTime(bs_dates[ok][0] if np.any(ok) else bs_dates[-1])

# Get the states for available commands. This automatically gets continuity.
return kadi_states.get_states(cmds=bs_cmds, start=rltt, stop=sched_stop,
state_keys=['pcad_mode'], merge_identical=True)
return kadi_states.get_states(
cmds=bs_cmds,
start=rltt,
stop=sched_stop,
state_keys=state_keys,
merge_identical=True,
)


def ir_zone_ok(backstop_file, out=None):
"""
Check that the high IR zone is all in NMM.
bs_cmds = kadi_commands.get_cmds_from_backstop(backstop_file)
bs_dates = bs_cmds['date']
ok = bs_cmds['event_type'] == 'RUNNING_LOAD_TERMINATION_TIME'
rltt = CxoTime(bs_dates[ok][0] if np.any(ok) else bs_dates[0])

# Scheduled stop time is the end of propagation, either the explicit
# time as a pseudo-command in the loads or the last backstop command time.
ok = bs_cmds['event_type'] == 'SCHEDULED_STOP_TIME'
sched_stop = CxoTime(bs_dates[ok][0] if np.any(ok) else bs_dates[-1])
:param backstop_file: backstop file
:param out: optional output file name (for plain text report)
:returns: True if high IR zone time is all in NMM, False otherwise
"""

# Get the states for available commands. This automatically gets continuity.
state_keys = ['pcad_mode', 'obsid']
states = kadi_commands.states.get_states(cmds=bs_cmds, start=rltt, stop=sched_stop,
state_keys=state_keys, merge_identical=True)
states = get_states(backstop_file, state_keys=["pcad_mode", "obsid"])
bs_cmds = kadi_commands.get_cmds_from_backstop(backstop_file)

perigee_cmds = bs_cmds[(bs_cmds['type'] == 'ORBPOINT')
& (bs_cmds['event_type'] == 'EPERIGEE')]
perigee_cmds = bs_cmds[
(bs_cmds["type"] == "ORBPOINT") & (bs_cmds["event_type"] == "EPERIGEE")
]

all_ok = True
out_text = ["IR ZONE CHECKING"]
for perigee_time in perigee_cmds['time']:

for perigee_time in perigee_cmds["time"]:
# Current high ir zone is from 25 minutes before to 27 minutes after
# These don't seem worth vars, but make sure to change in the text below
# too when these are updated.
ir_zone_start = perigee_time - 25 * 60
ir_zone_stop = perigee_time + 27 * 60
out_text.append(f"Checking perigee at {CxoTime(perigee_time).date}")
out_text.append(
f"Checking perigee at {CxoTime(perigee_time).date}")
out_text.append(
f" High IR Zone from {CxoTime(ir_zone_start).date} to {CxoTime(ir_zone_stop).date}")
ok = (states['tstart'] <= ir_zone_stop) & (states['tstop'] >= ir_zone_start)
f" High IR Zone from (perigee - 25m) {CxoTime(ir_zone_start).date} "
f"to (perigee + 27m) {CxoTime(ir_zone_stop).date}"
)
ok = (states["tstart"] <= ir_zone_stop) & (states["tstop"] >= ir_zone_start)
for state in states[ok]:
out_text.append(
f" state {state['datestart']} {state['datestop']} {state['pcad_mode']}")
if np.any(states[ok]['pcad_mode'] != 'NMAN'):
f" state {state['datestart']} {state['datestop']} {state['pcad_mode']}"
)
if np.any(states[ok]["pcad_mode"] != "NMAN"):
all_ok = False

if out is not None:
with open(out, 'w') as fh:
with open(out, "w") as fh:
fh.writelines("\n".join(out_text))

return all_ok
82 changes: 82 additions & 0 deletions starcheck/tests/test_state_checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import numpy as np
from pathlib import Path
import pytest

from astropy.table import Table
import chandra_maneuver
import parse_cm.tests
import Quaternion

from starcheck.state_checks import (
make_man_table,
get_obs_man_angle,
calc_pcad_state_nman_sums,
)


@pytest.mark.parametrize("angle", [0, 45, 90, 135, 180])
def test_make_man_table_durations(angle):
"""
Test that the calculated interpolated duractions in the maneuver angle duration
table are reasonable using chandra_maneuver.duration directly as a reference.
"""

man_table = make_man_table()

# Check that the interpolated durations are reasonable
dur_method = np.interp(angle, man_table["angle"], man_table["duration"])
q0 = Quaternion.Quat(equatorial=(0, 0, 0))
dur_direct = chandra_maneuver.duration(
q0, Quaternion.Quat(equatorial=(angle, 0, 0))
)
assert np.isclose(dur_method, dur_direct, rtol=0, atol=10)


@pytest.mark.parametrize(
"duration,expected_angle",
[(240, 0), (1000, 27.7), (2000, 100.5), (3000, 175.5), (4000, 180)],
)
def test_make_man_table_angles(duration, expected_angle):
"""
Test that the angles calculatd from interpolation into the maneuver angle duration table
match reference values.
"""

man_table = make_man_table()
angle_calc = np.interp(duration, man_table["duration"], man_table["angle"])
assert np.isclose(angle_calc, expected_angle, rtol=0, atol=0.1)


# These tstarts and expected angles are specific to the CR182_0803.backstop file
@pytest.mark.parametrize(
"tstart,expected_angle",
[(646940289.224, 143.4), (646947397.759, 0.22), (646843532.911, 92.6)],
)
def test_get_obs_man_angle(tstart, expected_angle):
"""
Confirm that some of the angles calculated for a reference backstop file
match reference values.
"""
# Use a backstop file already in ska test data
backstop_file = (
Path(parse_cm.tests.__file__).parent / "data" / "CR182_0803.backstop"
)
angle = get_obs_man_angle(tstart, backstop_file)
assert np.isclose(angle, expected_angle, rtol=0, atol=0.1)


def test_calc_pcad_state_nman_sums():
# Make a synthetic table of pcad_mode states
states = [
{"tstart": 0, "tstop": 1000, "pcad_mode": "NPNT"},
{"tstart": 1000, "tstop": 2000, "pcad_mode": "NMAN"},
{"tstart": 2000, "tstop": 3000, "pcad_mode": "NMAN"},
{"tstart": 3000, "tstop": 4000, "pcad_mode": "NPNT"},
{"tstart": 4000, "tstop": 5000, "pcad_mode": "NMAN"},
{"tstart": 5000, "tstop": 6000, "pcad_mode": "NPNT"},
]
states = Table(states)
nman_sums = calc_pcad_state_nman_sums(states)
# Confirm that the first NPNT after an NMAN has the appropriate sum of NMAN times before it
assert nman_sums[nman_sums["tstart"] == 3000]["nman_sum"][0] == 2000
assert nman_sums[nman_sums["tstart"] == 5000]["nman_sum"][0] == 1000
7 changes: 4 additions & 3 deletions starcheck/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from chandra_aca.dark_model import dark_temp_scale
import sparkles
from chandra_aca.transform import mag_to_count_rate, pixels_to_yagzag, yagzag_to_pixels
from kadi.commands import states
from mica.archive import aca_dark
from parse_cm import read_backstop_as_list, write_backstop
from proseco.catalog import get_aca_catalog, get_effective_t_ccd
Expand All @@ -26,10 +25,12 @@
from testr import test_helper
from cxotime import CxoTime

import kadi.commands.states as kadi_states

import starcheck
from starcheck import __version__ as version
from starcheck.calc_ccd_temps import get_ccd_temps
from starcheck.check_ir_zone import ir_zone_ok
from starcheck.state_checks import ir_zone_ok
from starcheck.plot import make_plots_for_obsid

ACQS = mica.stats.acq_stats.get_stats()
Expand Down Expand Up @@ -123,7 +124,7 @@ def get_dither_kadi_state(date):
"dither_period_pitch",
"dither_period_yaw",
]
state = states.get_continuity(date, cols)
state = kadi_states.get_continuity(date, cols)
# Cast the numpy floats as plain floats
for key in [
"dither_ampl_pitch",
Expand Down

0 comments on commit 238ac63

Please sign in to comment.