From 2438c5b784b76c32f983e82e2b74b3a762cfc42a Mon Sep 17 00:00:00 2001 From: Tom Aldcroft Date: Mon, 18 Dec 2023 06:10:33 -0500 Subject: [PATCH] Factor checks out of ACAReviewTable class --- sparkles/checks.py | 469 ++++++++++++++++++++++++++++++++++ sparkles/core.py | 463 +-------------------------------- sparkles/roll_optimize.py | 6 +- sparkles/tests/test_checks.py | 66 ++--- 4 files changed, 511 insertions(+), 493 deletions(-) create mode 100644 sparkles/checks.py diff --git a/sparkles/checks.py b/sparkles/checks.py new file mode 100644 index 0000000..474b619 --- /dev/null +++ b/sparkles/checks.py @@ -0,0 +1,469 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst +from itertools import combinations + +import numpy as np +import proseco.characteristics as ACA +from chandra_aca.transform import mag_to_count_rate, snr_mag_for_t_ccd + +# Observations with man_angle_next less than or equal to CREEP_AWAY_THRESHOLD +# are considered "creep away" observations. CREEP_AWAY_THRESHOLD is in units of degrees. +CREEP_AWAY_THRESHOLD = 5.0 + + +def check_catalog(self): + """Perform all star catalog checks.""" + for entry in self: + entry_type = entry["type"] + is_guide = entry_type in ("BOT", "GUI") + is_acq = entry_type in ("BOT", "ACQ") + is_fid = entry_type == "FID" + + if is_guide or is_fid: + check_guide_fid_position_on_ccd(self, entry) + + if is_guide: + star = self.guides.get_id(entry["id"]) + check_pos_err_guide(self, star) + check_imposters_guide(self, star) + check_too_bright_guide(self, star) + check_guide_is_candidate(self, star) + + if is_guide or is_acq: + check_bad_stars(self, entry) + + if is_fid: + fid = self.fids.get_id(entry["id"]) + check_fid_spoiler_score(self, entry["idx"], fid) + + check_guide_overlap(self) + check_guide_geometry(self) + check_acq_p2(self) + check_guide_count(self) + check_dither(self) + check_fid_count(self) + check_include_exclude(self) + + +def check_guide_overlap(self): + """Check for overlapping tracked items. + + Overlap is defined as within 12 pixels. + """ + ok = np.in1d(self["type"], ("GUI", "BOT", "FID", "MON")) + idxs = np.flatnonzero(ok) + for idx1, idx2 in combinations(idxs, 2): + entry1 = self[idx1] + entry2 = self[idx2] + drow = entry1["row"] - entry2["row"] + dcol = entry1["col"] - entry2["col"] + if np.abs(drow) <= 12 and np.abs(dcol) <= 12: + msg = ( + "Overlapping track index (within 12 pix) " + f'idx [{entry1["idx"]}] and idx [{entry2["idx"]}]' + ) + self.add_message("critical", msg) + + +def check_guide_geometry(self): + """Check for guide stars too tightly clustered. + + (1) Check for any set of n_guide-2 stars within 500" of each other. + The nominal check here is a cluster of 3 stars within 500". For + ERs this check is very unlikely to fail. For catalogs with only + 4 guide stars this will flag for any 2 nearby stars. + + This check will likely need some refinement. + + (2) Check for all stars being within 2500" of each other. + + """ + ok = np.in1d(self["type"], ("GUI", "BOT")) + guide_idxs = np.flatnonzero(ok) + n_guide = len(guide_idxs) + + if n_guide < 2: + msg = "Cannot check geometry with fewer than 2 guide stars" + self.add_message("critical", msg) + return + + def dist2(g1, g2): + out = (g1["yang"] - g2["yang"]) ** 2 + (g1["zang"] - g2["zang"]) ** 2 + return out + + # First check for any set of n_guide-2 stars within 500" of each other. + min_dist = 500 + min_dist2 = min_dist**2 + for idxs in combinations(guide_idxs, n_guide - 2): + for idx0, idx1 in combinations(idxs, 2): + # If any distance in this combination exceeds min_dist then + # the combination is OK. + if dist2(self[idx0], self[idx1]) > min_dist2: + break + else: + # Every distance was too small, issue a warning. + cat_idxs = [idx + 1 for idx in idxs] + msg = f'Guide indexes {cat_idxs} clustered within {min_dist}" radius' + + if self.man_angle_next > CREEP_AWAY_THRESHOLD: + msg += f" (man_angle_next > {CREEP_AWAY_THRESHOLD})" + self.add_message("critical", msg) + else: + msg += f" (man_angle_next <= {CREEP_AWAY_THRESHOLD})" + self.add_message("warning", msg) + + # Check for all stars within 2500" of each other + min_dist = 2500 + min_dist2 = min_dist**2 + for idx0, idx1 in combinations(guide_idxs, 2): + if dist2(self[idx0], self[idx1]) > min_dist2: + break + else: + msg = f'Guide stars all clustered within {min_dist}" radius' + self.add_message("warning", msg) + + +def check_guide_fid_position_on_ccd(self, entry): + """Check position of guide stars and fid lights on CCD.""" + # Shortcuts and translate y/z to yaw/pitch + dither_guide_y = self.dither_guide.y + dither_guide_p = self.dither_guide.z + + # Set "dither" for FID to be pseudodither of 5.0 to give 1 pix margin + # Set "track phase" dither for BOT GUI to max guide dither over + # interval or 20.0 if undefined. TO DO: hand the guide guide dither + dither_track_y = 5.0 if (entry["type"] == "FID") else dither_guide_y + dither_track_p = 5.0 if (entry["type"] == "FID") else dither_guide_p + + row_lim = ACA.max_ccd_row - ACA.CCD["window_pad"] + col_lim = ACA.max_ccd_col - ACA.CCD["window_pad"] + + def sign(axis): + """Return sign of the corresponding entry value. + + Note that np.sign returns 0 if the value is 0.0, not the right thing here. + """ + return -1 if (entry[axis] < 0) else 1 + + track_lims = { + "row": (row_lim - dither_track_y * ACA.ARC_2_PIX) * sign("row"), + "col": (col_lim - dither_track_p * ACA.ARC_2_PIX) * sign("col"), + } + + for axis in ("row", "col"): + track_delta = abs(track_lims[axis]) - abs(entry[axis]) + track_delta = np.round( + track_delta, decimals=1 + ) # Official check is to 1 decimal + for delta_lim, category in ((3.0, "critical"), (5.0, "info")): + if track_delta < delta_lim: + text = ( + f"Less than {delta_lim} pix edge margin {axis} " + f"lim {track_lims[axis]:.1f} " + f"val {entry[axis]:.1f} " + f"delta {track_delta:.1f}" + ) + self.add_message(category, text, idx=entry["idx"]) + break + + +# TO DO: acq star position check: +# For acq stars, the distance to the row/col padded limits are also confirmed, +# but code to track which boundary is exceeded (row or column) is not present. +# Note from above that the pix_row_pad used for row_lim has 7 more pixels of padding +# than the pix_col_pad used to determine col_lim. +# acq_edge_delta = min((row_lim - dither_acq_y / ang_per_pix) - abs(pixel_row), +# (col_lim - dither_acq_p / ang_per_pix) - abs(pixel_col)) +# if ((entry_type =~ /BOT|ACQ/) and (acq_edge_delta < (-1 * 12))){ +# push @orange_warn, sprintf "alarm [%2d] Acq Off (padded) CCD by > 60 arcsec.\n",i +# } +# elsif ((entry_type =~ /BOT|ACQ/) and (acq_edge_delta < 0)){ +# push @{self->{fyi}}, +# sprintf "alarm [%2d] Acq Off (padded) CCD (P_ACQ should be < .5)\n",i +# } + + +def check_acq_p2(self): + """Check acquisition catalog safing probability.""" + P2 = -np.log10(self.acqs.calc_p_safe()) + P2 = np.round(P2, decimals=2) # Official check is to 2 decimals + obs_type = "OR" if self.is_OR else "ER" + P2_lim = 2.0 if self.is_OR else 3.0 + if P2 < P2_lim: + self.add_message("critical", f"P2: {P2:.2f} less than {P2_lim} for {obs_type}") + elif P2 < P2_lim + 1: + self.add_message( + "warning", f"P2: {P2:.2f} less than {P2_lim + 1} for {obs_type}" + ) + + +def check_include_exclude(self): + """Check for included or excluded guide or acq stars or fids (info)""" + call_args = self.call_args + for typ in ("acq", "guide", "fid"): + for action in ("include", "exclude"): + ids = call_args.get(f"{action}_ids_{typ}") + if ids is not None: + msg = f"{action}d {typ} ID(s): {ids}" + + # Check for halfwidths. This really only applies to + # include_halfws_acq, but having it here in the loop doesn't hurt. + halfws = call_args.get(f"{action}_halfws_{typ}") + if halfws is not None: + msg = msg + f" halfwidths(s): {halfws}" + + self.add_message("info", msg) + + +def check_guide_count(self): + """ + Check for sufficient guide star fractional count. + + Also check for multiple very-bright stars + + """ + obs_type = "ER" if self.is_ER else "OR" + count_9th_lim = 3.0 + if self.is_ER and np.round(self.guide_count_9th, decimals=2) < count_9th_lim: + # Determine the threshold 9th mag equivalent value at the effective guide t_ccd + mag9 = snr_mag_for_t_ccd(self.guides.t_ccd, 9.0, -10.9) + self.add_message( + "critical", + ( + f"{obs_type} count of 9th ({mag9:.1f} for {self.guides.t_ccd:.1f}C)" + f" mag guide stars {self.guide_count_9th:.2f} < {count_9th_lim}" + ), + ) + + # Rounded guide count + guide_count_round = np.round(self.guide_count, decimals=2) + + # Set critical guide_count threshold + # For observations with creep-away in place as a mitigation for end of observation + # roll error, we can accept a lower guide_count (3.5 instead of 4.0). + # See https://occweb.cfa.harvard.edu/twiki/bin/view/Aspect/StarWorkingGroupMeeting2023x03x15 + if self.is_OR: + count_lim = 3.5 if (self.man_angle_next <= CREEP_AWAY_THRESHOLD) else 4.0 + else: + count_lim = 6.0 + + if guide_count_round < count_lim: + self.add_message( + "critical", + f"{obs_type} count of guide stars {self.guide_count:.2f} < {count_lim}", + ) + # If in the 3.5 to 4.0 range, this probably deserves a warning. + elif count_lim == 3.5 and guide_count_round < 4.0: + self.add_message( + "warning", + f"{obs_type} count of guide stars {self.guide_count:.2f} < 4.0", + ) + + bright_cnt_lim = 1 if self.is_OR else 3 + if np.count_nonzero(self.guides["mag"] < 5.5) > bright_cnt_lim: + self.add_message( + "caution", + f"{obs_type} with more than {bright_cnt_lim} stars brighter than 5.5.", + ) + + # Requested slots for guide stars and mon windows + n_guide_or_mon_request = self.call_args["n_guide"] + + # Actual guide stars + n_guide = len(self.guides) + + # Actual mon windows. For catalogs from pickles from proseco < 5.0 + # self.mons might be initialized to a NoneType or not be an attribute so + # handle that as 0 monitor windows. + try: + n_mon = len(self.mons) + except (TypeError, AttributeError): + n_mon = 0 + + # Different number of guide stars than requested + if n_guide + n_mon != n_guide_or_mon_request: + if n_mon == 0: + # Usual case + msg = ( + f"{obs_type} with {n_guide} guides " + f"but {n_guide_or_mon_request} were requested" + ) + else: + msg = ( + f"{obs_type} with {n_guide} guides and {n_mon} monitor(s) " + f"but {n_guide_or_mon_request} guides or mon slots were requested" + ) + self.add_message("caution", msg) + + # Caution for any "unusual" guide star request + typical_n_guide = 5 if self.is_OR else 8 + if n_guide_or_mon_request != typical_n_guide: + or_mon_slots = " or mon slots" if n_mon > 0 else "" + msg = ( + f"{obs_type} with" + f" {n_guide_or_mon_request} guides{or_mon_slots} requested but" + f" {typical_n_guide} is typical" + ) + self.add_message("caution", msg) + + +# Add a check that for ORs with guide count between 3.5 and 4.0, the +# dither is 4 arcsec if dynamic background not enabled. +def check_dither(self): + """Check dither. + + This presently checks that dither is 4x4 arcsec if dynamic background is not in + use and the field has a low guide_count. + """ + + # Skip check if guide_count is 4.0 or greater + if self.guide_count >= 4.0: + return + + # Skip check if dynamic backround is enabled (inferred from dyn_bgd_n_faint) + if self.dyn_bgd_n_faint > 0: + return + + # Check that dither is <= 4x4 arcsec + if self.dither_guide.y > 4.0 or self.dither_guide.z > 4.0: + self.add_message( + "critical", + f"guide_count {self.guide_count:.2f} and dither > 4x4 arcsec", + ) + + +def check_pos_err_guide(self, star): + """Warn on stars with larger POS_ERR (warning at 1" critical at 2")""" + agasc_id = star["id"] + idx = self.get_id(agasc_id)["idx"] + # POS_ERR is in milliarcsecs in the table + pos_err = star["POS_ERR"] * 0.001 + for limit, category in ((2.0, "critical"), (1.25, "warning")): + if np.round(pos_err, decimals=2) > limit: + self.add_message( + category, + ( + f"Guide star {agasc_id} POS_ERR {pos_err:.2f}, limit" + f" {limit} arcsec" + ), + idx=idx, + ) + break + + +def check_imposters_guide(self, star): + """Warn on stars with larger imposter centroid offsets""" + + # Borrow the imposter offset method from starcheck + def imposter_offset(cand_mag, imposter_mag): + """Get imposter offset. + + For a given candidate star and the pseudomagnitude of the brightest 2x2 + imposter calculate the max offset of the imposter counts are at the edge of + the 6x6 (as if they were in one pixel). This is somewhat the inverse of + proseco.get_pixmag_for_offset. + """ + cand_counts = mag_to_count_rate(cand_mag) + spoil_counts = mag_to_count_rate(imposter_mag) + return spoil_counts * 3 * 5 / (spoil_counts + cand_counts) + + agasc_id = star["id"] + idx = self.get_id(agasc_id)["idx"] + offset = imposter_offset(star["mag"], star["imp_mag"]) + for limit, category in ((4.0, "critical"), (2.5, "warning")): + if np.round(offset, decimals=1) > limit: + self.add_message( + category, + f"Guide star imposter offset {offset:.1f}, limit {limit} arcsec", + idx=idx, + ) + break + + +def check_guide_is_candidate(self, star): + """Critical for guide star that is not a valid guide candidate. + + This can occur for a manually included guide star. In rare cases + the star may still be acceptable and ACA review can accept the warning. + """ + if not self.guides.get_candidates_mask(star): + agasc_id = star["id"] + idx = self.get_id(agasc_id)["idx"] + self.add_message( + "critical", + f"Guide star {agasc_id} does not meet guide candidate criteria", + idx=idx, + ) + + +def check_too_bright_guide(self, star): + """Warn on guide stars that may be too bright. + + - Critical if within 2 * mag_err of the hard 5.2 limit, caution within 3 * mag_err + + """ + agasc_id = star["id"] + idx = self.get_id(agasc_id)["idx"] + mag_err = star["mag_err"] + for mult, category in ((2, "critical"), (3, "caution")): + if star["mag"] - (mult * mag_err) < 5.2: + self.add_message( + category, + ( + f"Guide star {agasc_id} within {mult}*mag_err of 5.2 " + f"(mag_err={mag_err:.2f})" + ), + idx=idx, + ) + break + + +def check_bad_stars(self, entry): + """Check if entry (guide or acq) is in bad star set from proseco + + :param entry: ACAReviewTable row + :return: None + """ + if entry["id"] in ACA.bad_star_set: + msg = f'Star {entry["id"]} is in proseco bad star set' + self.add_message("critical", msg, idx=entry["idx"]) + + +def check_fid_spoiler_score(self, idx, fid): + """ + Check the spoiler warnings for fid + + :param idx: catalog index of fid entry being checked + :param fid: corresponding row of ``fids`` table + :return: None + """ + if fid["spoiler_score"] == 0: + return + + fid_id = fid["id"] + category_map = {"yellow": "warning", "red": "critical"} + + for spoiler in fid["spoilers"]: + msg = ( + f'Fid {fid_id} has {spoiler["warn"]} spoiler: star {spoiler["id"]} with' + f' mag {spoiler["mag"]:.2f}' + ) + self.add_message(category_map[spoiler["warn"]], msg, idx=idx) + + +def check_fid_count(self): + """ + Check for the correct number of fids. + + :return: None + """ + obs_type = "ER" if self.is_ER else "OR" + + if len(self.fids) != self.n_fid: + msg = f"{obs_type} has {len(self.fids)} fids but {self.n_fid} were requested" + self.add_message("critical", msg) + + # Check for "typical" number of fids for an OR / ER (3 or 0) + typical_n_fid = 3 if self.is_OR else 0 + if self.n_fid != typical_n_fid: + msg = f"{obs_type} requested {self.n_fid} fids but {typical_n_fid} is typical" + self.add_message("caution", msg) diff --git a/sparkles/core.py b/sparkles/core.py index 6f2600d..938a6ee 100644 --- a/sparkles/core.py +++ b/sparkles/core.py @@ -9,21 +9,21 @@ import pprint import re import traceback -from itertools import chain, combinations +from itertools import chain from pathlib import Path import chandra_aca import numpy as np import proseco -import proseco.characteristics as ACA from astropy.table import Column, Table from chandra_aca.star_probs import guide_count -from chandra_aca.transform import mag_to_count_rate, snr_mag_for_t_ccd, yagzag_to_pixels +from chandra_aca.transform import yagzag_to_pixels from jinja2 import Template from proseco.catalog import ACATable from proseco.core import MetaAttribute -from .roll_optimize import RollOptimizeMixin +from sparkles.checks import check_catalog +from sparkles.roll_optimize import RollOptimizeMixin CACHE = {} FILEDIR = Path(__file__).parent @@ -35,10 +35,6 @@ # dyn bgd. MIN_DYN_BGD_ANCHOR_STARS = 3 -# Observations with man_angle_next less than or equal to CREEP_AWAY_THRESHOLD -# are considered "creep away" observations. CREEP_AWAY_THRESHOLD is in units of degrees. -CREEP_AWAY_THRESHOLD = 5.0 - def get_t_ccds_bonus(mags, t_ccd, dyn_bgd_n_faint, dyn_bgd_dt_ccd): """Return array of t_ccds with dynamic background bonus applied. @@ -365,7 +361,7 @@ def _run_aca_review( aca.dyn_bgd_n_faint = dyn_bgd_n_faint aca.guides.dyn_bgd_n_faint = dyn_bgd_n_faint - aca.check_catalog() + check_catalog(aca) # Find roll options if requested if roll_level == "all" or aca.messages >= roll_level: @@ -1140,173 +1136,6 @@ def add_row_col(self): self.add_column(Column(row, name="row"), index=index) self.add_column(Column(col, name="col"), index=index + 1) - def check_catalog(self): - """Perform all star catalog checks.""" - for entry in self: - entry_type = entry["type"] - is_guide = entry_type in ("BOT", "GUI") - is_acq = entry_type in ("BOT", "ACQ") - is_fid = entry_type == "FID" - - if is_guide or is_fid: - self.check_guide_fid_position_on_ccd(entry) - - if is_guide: - star = self.guides.get_id(entry["id"]) - self.check_pos_err_guide(star) - self.check_imposters_guide(star) - self.check_too_bright_guide(star) - self.check_guide_is_candidate(star) - - if is_guide or is_acq: - self.check_bad_stars(entry) - - if is_fid: - fid = self.fids.get_id(entry["id"]) - self.check_fid_spoiler_score(entry["idx"], fid) - - self.check_guide_overlap() - self.check_guide_geometry() - self.check_acq_p2() - self.check_guide_count() - self.check_dither() - self.check_fid_count() - self.check_include_exclude() - - def check_guide_overlap(self): - """Check for overlapping tracked items. - - Overlap is defined as within 12 pixels. - """ - ok = np.in1d(self["type"], ("GUI", "BOT", "FID", "MON")) - idxs = np.flatnonzero(ok) - for idx1, idx2 in combinations(idxs, 2): - entry1 = self[idx1] - entry2 = self[idx2] - drow = entry1["row"] - entry2["row"] - dcol = entry1["col"] - entry2["col"] - if np.abs(drow) <= 12 and np.abs(dcol) <= 12: - msg = ( - "Overlapping track index (within 12 pix) " - f'idx [{entry1["idx"]}] and idx [{entry2["idx"]}]' - ) - self.add_message("critical", msg) - - def check_guide_geometry(self): - """Check for guide stars too tightly clustered. - - (1) Check for any set of n_guide-2 stars within 500" of each other. - The nominal check here is a cluster of 3 stars within 500". For - ERs this check is very unlikely to fail. For catalogs with only - 4 guide stars this will flag for any 2 nearby stars. - - This check will likely need some refinement. - - (2) Check for all stars being within 2500" of each other. - - """ - ok = np.in1d(self["type"], ("GUI", "BOT")) - guide_idxs = np.flatnonzero(ok) - n_guide = len(guide_idxs) - - if n_guide < 2: - msg = "Cannot check geometry with fewer than 2 guide stars" - self.add_message("critical", msg) - return - - def dist2(g1, g2): - out = (g1["yang"] - g2["yang"]) ** 2 + (g1["zang"] - g2["zang"]) ** 2 - return out - - # First check for any set of n_guide-2 stars within 500" of each other. - min_dist = 500 - min_dist2 = min_dist**2 - for idxs in combinations(guide_idxs, n_guide - 2): - for idx0, idx1 in combinations(idxs, 2): - # If any distance in this combination exceeds min_dist then - # the combination is OK. - if dist2(self[idx0], self[idx1]) > min_dist2: - break - else: - # Every distance was too small, issue a warning. - cat_idxs = [idx + 1 for idx in idxs] - msg = f'Guide indexes {cat_idxs} clustered within {min_dist}" radius' - - if self.man_angle_next > CREEP_AWAY_THRESHOLD: - msg += f" (man_angle_next > {CREEP_AWAY_THRESHOLD})" - self.add_message("critical", msg) - else: - msg += f" (man_angle_next <= {CREEP_AWAY_THRESHOLD})" - self.add_message("warning", msg) - - # Check for all stars within 2500" of each other - min_dist = 2500 - min_dist2 = min_dist**2 - for idx0, idx1 in combinations(guide_idxs, 2): - if dist2(self[idx0], self[idx1]) > min_dist2: - break - else: - msg = f'Guide stars all clustered within {min_dist}" radius' - self.add_message("warning", msg) - - def check_guide_fid_position_on_ccd(self, entry): - """Check position of guide stars and fid lights on CCD.""" - # Shortcuts and translate y/z to yaw/pitch - dither_guide_y = self.dither_guide.y - dither_guide_p = self.dither_guide.z - - # Set "dither" for FID to be pseudodither of 5.0 to give 1 pix margin - # Set "track phase" dither for BOT GUI to max guide dither over - # interval or 20.0 if undefined. TO DO: hand the guide guide dither - dither_track_y = 5.0 if (entry["type"] == "FID") else dither_guide_y - dither_track_p = 5.0 if (entry["type"] == "FID") else dither_guide_p - - row_lim = ACA.max_ccd_row - ACA.CCD["window_pad"] - col_lim = ACA.max_ccd_col - ACA.CCD["window_pad"] - - def sign(axis): - """Return sign of the corresponding entry value. - - Note that np.sign returns 0 if the value is 0.0, not the right thing here. - """ - return -1 if (entry[axis] < 0) else 1 - - track_lims = { - "row": (row_lim - dither_track_y * ACA.ARC_2_PIX) * sign("row"), - "col": (col_lim - dither_track_p * ACA.ARC_2_PIX) * sign("col"), - } - - for axis in ("row", "col"): - track_delta = abs(track_lims[axis]) - abs(entry[axis]) - track_delta = np.round( - track_delta, decimals=1 - ) # Official check is to 1 decimal - for delta_lim, category in ((3.0, "critical"), (5.0, "info")): - if track_delta < delta_lim: - text = ( - f"Less than {delta_lim} pix edge margin {axis} " - f"lim {track_lims[axis]:.1f} " - f"val {entry[axis]:.1f} " - f"delta {track_delta:.1f}" - ) - self.add_message(category, text, idx=entry["idx"]) - break - - # TO DO: acq star position check: - # For acq stars, the distance to the row/col padded limits are also confirmed, - # but code to track which boundary is exceeded (row or column) is not present. - # Note from above that the pix_row_pad used for row_lim has 7 more pixels of padding - # than the pix_col_pad used to determine col_lim. - # acq_edge_delta = min((row_lim - dither_acq_y / ang_per_pix) - abs(pixel_row), - # (col_lim - dither_acq_p / ang_per_pix) - abs(pixel_col)) - # if ((entry_type =~ /BOT|ACQ/) and (acq_edge_delta < (-1 * 12))){ - # push @orange_warn, sprintf "alarm [%2d] Acq Off (padded) CCD by > 60 arcsec.\n",i - # } - # elsif ((entry_type =~ /BOT|ACQ/) and (acq_edge_delta < 0)){ - # push @{self->{fyi}}, - # sprintf "alarm [%2d] Acq Off (padded) CCD (P_ACQ should be < .5)\n",i - # } - def add_message(self, category, text, **kwargs): r"""Add message to internal messages list. @@ -1319,288 +1148,6 @@ def add_message(self, category, text, **kwargs): message.update(kwargs) self.messages.append(message) - def check_acq_p2(self): - """Check acquisition catalog safing probability.""" - P2 = -np.log10(self.acqs.calc_p_safe()) - P2 = np.round(P2, decimals=2) # Official check is to 2 decimals - obs_type = "OR" if self.is_OR else "ER" - P2_lim = 2.0 if self.is_OR else 3.0 - if P2 < P2_lim: - self.add_message( - "critical", f"P2: {P2:.2f} less than {P2_lim} for {obs_type}" - ) - elif P2 < P2_lim + 1: - self.add_message( - "warning", f"P2: {P2:.2f} less than {P2_lim + 1} for {obs_type}" - ) - - def check_include_exclude(self): - """Check for included or excluded guide or acq stars or fids (info)""" - call_args = self.call_args - for typ in ("acq", "guide", "fid"): - for action in ("include", "exclude"): - ids = call_args.get(f"{action}_ids_{typ}") - if ids is not None: - msg = f"{action}d {typ} ID(s): {ids}" - - # Check for halfwidths. This really only applies to - # include_halfws_acq, but having it here in the loop doesn't hurt. - halfws = call_args.get(f"{action}_halfws_{typ}") - if halfws is not None: - msg = msg + f" halfwidths(s): {halfws}" - - self.add_message("info", msg) - - def check_guide_count(self): - """ - Check for sufficient guide star fractional count. - - Also check for multiple very-bright stars - - """ - obs_type = "ER" if self.is_ER else "OR" - count_9th_lim = 3.0 - if self.is_ER and np.round(self.guide_count_9th, decimals=2) < count_9th_lim: - # Determine the threshold 9th mag equivalent value at the effective guide t_ccd - mag9 = snr_mag_for_t_ccd(self.guides.t_ccd, 9.0, -10.9) - self.add_message( - "critical", - ( - f"{obs_type} count of 9th ({mag9:.1f} for {self.guides.t_ccd:.1f}C)" - f" mag guide stars {self.guide_count_9th:.2f} < {count_9th_lim}" - ), - ) - - # Rounded guide count - guide_count_round = np.round(self.guide_count, decimals=2) - - # Set critical guide_count threshold - # For observations with creep-away in place as a mitigation for end of observation - # roll error, we can accept a lower guide_count (3.5 instead of 4.0). - # See https://occweb.cfa.harvard.edu/twiki/bin/view/Aspect/StarWorkingGroupMeeting2023x03x15 - if self.is_OR: - count_lim = 3.5 if (self.man_angle_next <= CREEP_AWAY_THRESHOLD) else 4.0 - else: - count_lim = 6.0 - - if guide_count_round < count_lim: - self.add_message( - "critical", - f"{obs_type} count of guide stars {self.guide_count:.2f} < {count_lim}", - ) - # If in the 3.5 to 4.0 range, this probably deserves a warning. - elif count_lim == 3.5 and guide_count_round < 4.0: - self.add_message( - "warning", - f"{obs_type} count of guide stars {self.guide_count:.2f} < 4.0", - ) - - bright_cnt_lim = 1 if self.is_OR else 3 - if np.count_nonzero(self.guides["mag"] < 5.5) > bright_cnt_lim: - self.add_message( - "caution", - f"{obs_type} with more than {bright_cnt_lim} stars brighter than 5.5.", - ) - - # Requested slots for guide stars and mon windows - n_guide_or_mon_request = self.call_args["n_guide"] - - # Actual guide stars - n_guide = len(self.guides) - - # Actual mon windows. For catalogs from pickles from proseco < 5.0 - # self.mons might be initialized to a NoneType or not be an attribute so - # handle that as 0 monitor windows. - try: - n_mon = len(self.mons) - except (TypeError, AttributeError): - n_mon = 0 - - # Different number of guide stars than requested - if n_guide + n_mon != n_guide_or_mon_request: - if n_mon == 0: - # Usual case - msg = ( - f"{obs_type} with {n_guide} guides " - f"but {n_guide_or_mon_request} were requested" - ) - else: - msg = ( - f"{obs_type} with {n_guide} guides and {n_mon} monitor(s) " - f"but {n_guide_or_mon_request} guides or mon slots were requested" - ) - self.add_message("caution", msg) - - # Caution for any "unusual" guide star request - typical_n_guide = 5 if self.is_OR else 8 - if n_guide_or_mon_request != typical_n_guide: - or_mon_slots = " or mon slots" if n_mon > 0 else "" - msg = ( - f"{obs_type} with" - f" {n_guide_or_mon_request} guides{or_mon_slots} requested but" - f" {typical_n_guide} is typical" - ) - self.add_message("caution", msg) - - # Add a check that for ORs with guide count between 3.5 and 4.0, the - # dither is 4 arcsec if dynamic background not enabled. - def check_dither(self): - """Check dither. - - This presently checks that dither is 4x4 arcsec if dynamic background is not in - use and the field has a low guide_count. - """ - - # Skip check if guide_count is 4.0 or greater - if self.guide_count >= 4.0: - return - - # Skip check if dynamic backround is enabled (inferred from dyn_bgd_n_faint) - if self.dyn_bgd_n_faint > 0: - return - - # Check that dither is <= 4x4 arcsec - if self.dither_guide.y > 4.0 or self.dither_guide.z > 4.0: - self.add_message( - "critical", - f"guide_count {self.guide_count:.2f} and dither > 4x4 arcsec", - ) - - def check_pos_err_guide(self, star): - """Warn on stars with larger POS_ERR (warning at 1" critical at 2")""" - agasc_id = star["id"] - idx = self.get_id(agasc_id)["idx"] - # POS_ERR is in milliarcsecs in the table - pos_err = star["POS_ERR"] * 0.001 - for limit, category in ((2.0, "critical"), (1.25, "warning")): - if np.round(pos_err, decimals=2) > limit: - self.add_message( - category, - ( - f"Guide star {agasc_id} POS_ERR {pos_err:.2f}, limit" - f" {limit} arcsec" - ), - idx=idx, - ) - break - - def check_imposters_guide(self, star): - """Warn on stars with larger imposter centroid offsets""" - - # Borrow the imposter offset method from starcheck - def imposter_offset(cand_mag, imposter_mag): - """Get imposter offset. - - For a given candidate star and the pseudomagnitude of the brightest 2x2 - imposter calculate the max offset of the imposter counts are at the edge of - the 6x6 (as if they were in one pixel). This is somewhat the inverse of - proseco.get_pixmag_for_offset. - """ - cand_counts = mag_to_count_rate(cand_mag) - spoil_counts = mag_to_count_rate(imposter_mag) - return spoil_counts * 3 * 5 / (spoil_counts + cand_counts) - - agasc_id = star["id"] - idx = self.get_id(agasc_id)["idx"] - offset = imposter_offset(star["mag"], star["imp_mag"]) - for limit, category in ((4.0, "critical"), (2.5, "warning")): - if np.round(offset, decimals=1) > limit: - self.add_message( - category, - f"Guide star imposter offset {offset:.1f}, limit {limit} arcsec", - idx=idx, - ) - break - - def check_guide_is_candidate(self, star): - """Critical for guide star that is not a valid guide candidate. - - This can occur for a manually included guide star. In rare cases - the star may still be acceptable and ACA review can accept the warning. - """ - if not self.guides.get_candidates_mask(star): - agasc_id = star["id"] - idx = self.get_id(agasc_id)["idx"] - self.add_message( - "critical", - f"Guide star {agasc_id} does not meet guide candidate criteria", - idx=idx, - ) - - def check_too_bright_guide(self, star): - """Warn on guide stars that may be too bright. - - - Critical if within 2 * mag_err of the hard 5.2 limit, caution within 3 * mag_err - - """ - agasc_id = star["id"] - idx = self.get_id(agasc_id)["idx"] - mag_err = star["mag_err"] - for mult, category in ((2, "critical"), (3, "caution")): - if star["mag"] - (mult * mag_err) < 5.2: - self.add_message( - category, - ( - f"Guide star {agasc_id} within {mult}*mag_err of 5.2 " - f"(mag_err={mag_err:.2f})" - ), - idx=idx, - ) - break - - def check_bad_stars(self, entry): - """Check if entry (guide or acq) is in bad star set from proseco - - :param entry: ACAReviewTable row - :return: None - """ - if entry["id"] in ACA.bad_star_set: - msg = f'Star {entry["id"]} is in proseco bad star set' - self.add_message("critical", msg, idx=entry["idx"]) - - def check_fid_spoiler_score(self, idx, fid): - """ - Check the spoiler warnings for fid - - :param idx: catalog index of fid entry being checked - :param fid: corresponding row of ``fids`` table - :return: None - """ - if fid["spoiler_score"] == 0: - return - - fid_id = fid["id"] - category_map = {"yellow": "warning", "red": "critical"} - - for spoiler in fid["spoilers"]: - msg = ( - f'Fid {fid_id} has {spoiler["warn"]} spoiler: star {spoiler["id"]} with' - f' mag {spoiler["mag"]:.2f}' - ) - self.add_message(category_map[spoiler["warn"]], msg, idx=idx) - - def check_fid_count(self): - """ - Check for the correct number of fids. - - :return: None - """ - obs_type = "ER" if self.is_ER else "OR" - - if len(self.fids) != self.n_fid: - msg = ( - f"{obs_type} has {len(self.fids)} fids but {self.n_fid} were requested" - ) - self.add_message("critical", msg) - - # Check for "typical" number of fids for an OR / ER (3 or 0) - typical_n_fid = 3 if self.is_OR else 0 - if self.n_fid != typical_n_fid: - msg = ( - f"{obs_type} requested {self.n_fid} fids but {typical_n_fid} is typical" - ) - self.add_message("caution", msg) - @classmethod def from_ocat(cls, obsid, t_ccd=-5, man_angle=5, date=None, roll=None, **kwargs): """Return an AcaReviewTable object using OCAT to specify key information. diff --git a/sparkles/roll_optimize.py b/sparkles/roll_optimize.py index 5e6b836..5da210a 100644 --- a/sparkles/roll_optimize.py +++ b/sparkles/roll_optimize.py @@ -20,6 +20,8 @@ from proseco.characteristics import CCD from Quaternion import Quat +from sparkles.checks import check_catalog + def logical_intervals(vals, x=None): """Determine contiguous intervals during which ``vals`` is True. @@ -359,7 +361,7 @@ def get_roll_options( # Special case, first roll option is self but with obsid set to roll acar = deepcopy(self) - acar.check_catalog() + check_catalog(acar) acar.is_roll_option = True roll_options = [ { @@ -414,7 +416,7 @@ def get_roll_options( acar = self.__class__(aca_rolled, obsid=self.obsid, is_roll_option=True) # Do the review and set up messages attribute - acar.check_catalog() + check_catalog(acar) roll_option = { "acar": acar, diff --git a/sparkles/tests/test_checks.py b/sparkles/tests/test_checks.py index a2bc50c..5603ca3 100644 --- a/sparkles/tests/test_checks.py +++ b/sparkles/tests/test_checks.py @@ -14,7 +14,7 @@ from proseco.tests.test_common import DARK40, STD_INFO, mod_std_info from Quaternion import Quat -from sparkles import ACAReviewTable, get_t_ccds_bonus +from sparkles import ACAReviewTable, checks, get_t_ccds_bonus def test_check_slice_index(): @@ -38,7 +38,7 @@ def test_check_P2(): acar = ACAReviewTable(aca) # Check P2 for an OR (default obsid=0) - acar.check_acq_p2() + checks.check_acq_p2(acar) assert len(acar.messages) == 1 msg = acar.messages[0] assert msg["category"] == "critical" @@ -54,7 +54,7 @@ def test_check_P2(): raise_exc=True, ) acar = ACAReviewTable(aca) - acar.check_acq_p2() + checks.check_acq_p2(acar) assert len(acar.messages) == 1 msg = acar.messages[0] assert msg["category"] == "critical" @@ -73,7 +73,7 @@ def test_n_guide_check_not_enough_stars(): raise_exc=True, ) acar = ACAReviewTable(aca) - acar.check_guide_count() + checks.check_guide_count(acar) assert acar.messages == [ {"text": "OR with 4 guides but 5 were requested", "category": "caution"} ] @@ -95,7 +95,7 @@ def test_guide_is_candidate(): raise_exc=True, ) acar = ACAReviewTable(aca) - acar.check_catalog() + checks.check_catalog(acar) assert acar.messages == [ { "text": "Guide star 100 does not meet guide candidate criteria", @@ -118,7 +118,7 @@ def test_n_guide_check_atypical_request(): raise_exc=True, ) acar = ACAReviewTable(aca) - acar.check_guide_count() + checks.check_guide_count(acar) assert acar.messages == [ {"text": "OR with 4 guides requested but 5 is typical", "category": "caution"} ] @@ -139,7 +139,7 @@ def test_n_guide_mon_check_atypical_request(): raise_exc=True, ) acar = ACAReviewTable(aca) - acar.check_guide_count() + checks.check_guide_count(acar) assert acar.messages == [ { "text": "OR with 6 guides or mon slots requested but 5 is typical", @@ -195,7 +195,7 @@ def test_n_guide_too_few_guide_or_mon(): raise_exc=True, ) acar = ACAReviewTable(aca) - acar.check_guide_count() + checks.check_guide_count(acar) assert acar.messages == [ { "category": "caution", @@ -224,7 +224,7 @@ def test_guide_count_er1(): raise_exc=True, ) acar = ACAReviewTable(aca) - acar.check_guide_count() + checks.check_guide_count(acar) assert len(acar.messages) == 1 msg = acar.messages[0] assert msg["category"] == "critical" @@ -242,7 +242,7 @@ def test_guide_count_er2(): raise_exc=True, ) acar = ACAReviewTable(aca) - acar.check_guide_count() + checks.check_guide_count(acar) assert acar.messages == [ {"text": "ER count of guide stars 3.00 < 6.0", "category": "critical"}, {"text": "ER with 3 guides but 8 were requested", "category": "caution"}, @@ -261,7 +261,7 @@ def test_guide_count_er3(): raise_exc=True, ) acar = ACAReviewTable(aca) - acar.check_guide_count() + checks.check_guide_count(acar) assert acar.messages == [ {"text": "ER with 6 guides but 8 were requested", "category": "caution"} ] @@ -282,7 +282,7 @@ def test_guide_count_er4(): raise_exc=True, ) acar = ACAReviewTable(aca) - acar.check_guide_count() + checks.check_guide_count(acar) assert acar.messages == [ {"text": "ER with 6 guides but 8 were requested", "category": "caution"} ] @@ -304,7 +304,7 @@ def test_include_exclude(): raise_exc=True, ) acar = ACAReviewTable(aca) - acar.check_include_exclude() + checks.check_include_exclude(acar) assert acar.messages == [ { "category": "info", @@ -330,7 +330,7 @@ def test_guide_count_er5(): raise_exc=True, ) acar = ACAReviewTable(aca) - acar.check_guide_count() + checks.check_guide_count(acar) assert acar.messages == [ {"text": "ER with more than 3 stars brighter than 5.5.", "category": "caution"}, {"text": "ER with 6 guides but 8 were requested", "category": "caution"}, @@ -348,7 +348,7 @@ def test_guide_count_or(): raise_exc=True, ) acar = ACAReviewTable(aca) - acar.check_guide_count() + checks.check_guide_count(acar) assert acar.messages == [ {"text": "OR count of guide stars 2.00 < 4.0", "category": "critical"}, {"text": "OR with 2 guides but 5 were requested", "category": "caution"}, @@ -368,7 +368,7 @@ def test_ok_number_bright_guide_stars(): raise_exc=True, ) acar = ACAReviewTable(aca) - acar.check_guide_count() + checks.check_guide_count(acar) assert acar.messages == [ {"text": "OR with 4 guides but 5 were requested", "category": "caution"} ] @@ -388,7 +388,7 @@ def test_too_many_bright_stars(): raise_exc=True, ) acar = ACAReviewTable(aca) - acar.check_guide_count() + checks.check_guide_count(acar) assert len(acar.messages) == 1 msg = acar.messages[0] assert msg["category"] == "caution" @@ -412,7 +412,7 @@ def test_low_guide_count(): # Confirm the guide_count is in the range we want for the test to be valid assert acar.guide_count <= 4.0 and acar.guide_count > 3.5 assert acar.man_angle_next > 5 - acar.check_guide_count() + checks.check_guide_count(acar) assert acar.messages == [ {"text": "OR count of guide stars 3.65 < 4.0", "category": "critical"}, {"text": "OR with 4 guides but 5 were requested", "category": "caution"}, @@ -436,7 +436,7 @@ def test_low_guide_count_creep_away(): acar = ACAReviewTable(aca) # Confirm the guide_count is in the range we want for the test to be valid assert acar.guide_count <= 4.0 and acar.guide_count > 3.5 - acar.check_guide_count() + checks.check_guide_count(acar) assert acar.messages == [ {"text": "OR count of guide stars 3.65 < 4.0", "category": "warning"}, {"text": "OR with 4 guides but 5 were requested", "category": "caution"}, @@ -463,7 +463,7 @@ def test_reduced_dither_low_guide_count(): assert acar.guide_count <= 4.0 and acar.guide_count > 3.5 # Run the dither check - acar.check_dither() + checks.check_dither(acar) assert len(acar.messages) == 0 @@ -525,7 +525,7 @@ def test_not_reduced_dither_low_guide_count(): assert acar.guide_count <= 4.0 and acar.guide_count > 3.5 # Run the dither check - acar.check_dither() + checks.check_dither(acar) assert acar.messages == [ {"text": "guide_count 3.65 and dither > 4x4 arcsec", "category": "critical"} ] @@ -550,7 +550,7 @@ def test_not_reduced_dither_low_guide_count_dyn_bgd(): acar = ACAReviewTable(aca) # Confirm the guide_count is in the range we want for the test to be valid assert acar.guide_count <= 4.0 and acar.guide_count > 3.5 - acar.check_dither() + checks.check_dither(acar) assert len(acar.messages) == 0 @@ -583,7 +583,7 @@ def test_pos_err_on_guide(): # Run pos err checks for guide in aca.guides: - acar.check_pos_err_guide(guide) + checks.check_pos_err_guide(acar, guide) assert len(acar.messages) == 2 msg = acar.messages[0] @@ -646,7 +646,7 @@ def test_guide_edge_check(): include_ids_guide=np.arange(1, 7), ) acar = ACAReviewTable(aca) - acar.check_catalog() + checks.check_catalog(acar) assert acar.messages == [ { @@ -696,7 +696,7 @@ def test_imposters_on_guide(exp_warn): raise_exc=True, ) acar = ACAReviewTable(aca) - acar.check_imposters_guide(aca.guides.get_id(110)) + checks.check_imposters_guide(acar, aca.guides.get_id(110)) if exp_warn: assert len(acar.messages) == 1 msg = acar.messages[0] @@ -718,7 +718,7 @@ def test_bad_star_set(proseco_agasc_1p7): include_ids_guide=[bad_id], ) acar = ACAReviewTable(aca) - acar.check_catalog() + checks.check_catalog(acar) assert acar.messages == [ { "text": "Guide star 1248994952 does not meet guide candidate criteria", @@ -747,7 +747,7 @@ def test_too_bright_guide_magerr(): **mod_std_info(n_fid=0), stars=stars, dark=DARK40, raise_exc=True ) acar = ACAReviewTable(aca) - acar.check_too_bright_guide(aca.guides.get_id(100)) + checks.check_too_bright_guide(acar, aca.guides.get_id(100)) msg = acar.messages[0] assert msg["category"] == "critical" assert "2*mag_err of 5.2" in msg["text"] @@ -769,7 +769,7 @@ def test_check_fid_spoiler_score(): assert np.all(aca.fids.cand_fids["spoiler_score"] == [4, 4, 4, 4, 1, 0]) acar = ACAReviewTable(aca) - acar.check_catalog() + checks.check_catalog(acar) assert acar.messages == [ { "text": "Fid 1 has red spoiler: star 108 with mag 9.00", @@ -793,7 +793,7 @@ def test_check_fid_count1(): stars=stars, **mod_std_info(detector="HRC-S", sim_offset=40000) ) acar = ACAReviewTable(aca) - acar.check_catalog() + checks.check_catalog(acar) assert acar.messages == [ {"text": "OR has 2 fids but 3 were requested", "category": "critical"} @@ -807,7 +807,7 @@ def test_check_fid_count2(): aca = get_aca_catalog(stars=stars, **mod_std_info(detector="HRC-S", n_fid=2)) acar = ACAReviewTable(aca) - acar.check_catalog() + checks.check_catalog(acar) assert acar.messages == [ {"text": "OR requested 2 fids but 3 is typical", "category": "caution"} @@ -829,7 +829,7 @@ def test_check_guide_geometry(): aca = get_aca_catalog(**STD_INFO, stars=stars, dark=DARK40) acar = aca.get_review_table() - acar.check_guide_geometry() + checks.check_guide_geometry(acar) if fail: assert len(acar.messages) == 1 msg = acar.messages[0] @@ -847,7 +847,7 @@ def test_check_guide_geometry(): stars.add_fake_star(yang=y * size, zang=z * size, mag=7.0) aca = get_aca_catalog(**STD_INFO, stars=stars, dark=DARK40) acar = aca.get_review_table() - acar.check_guide_geometry() + checks.check_guide_geometry(acar) assert len(acar.messages) == 1 msg = acar.messages[0] @@ -866,7 +866,7 @@ def test_check_guide_geometry(): aca = get_aca_catalog(**STD_INFO, man_angle_next=5.0, stars=stars, dark=DARK40) acar = aca.get_review_table() - acar.check_guide_geometry() + checks.check_guide_geometry(acar) assert len(acar.messages) == 1 msg = acar.messages[0]