
Merge pull request #37 from legend-exp/dev
Updates for production cycle `v1.1`
ggmarshall authored Apr 29, 2024
2 parents a61804a + 90a38fa commit b26a39e
Showing 39 changed files with 4,633 additions and 1,087 deletions.
91 changes: 54 additions & 37 deletions Snakefile
@@ -10,7 +10,7 @@ This includes:
- the same for partition level tiers
"""

import pathlib, os, json, sys
import pathlib, os, json, sys, glob
import scripts.util as ds
from scripts.util.pars_loading import pars_catalog
from scripts.util.patterns import get_pattern_tier_raw
@@ -21,6 +21,8 @@ from scripts.util.utils import (
chan_map_path,
filelist_path,
metadata_path,
tmp_log_path,
pars_path,
)
from datetime import datetime
from collections import OrderedDict
@@ -41,68 +43,59 @@ part = ds.dataset_file(setup, os.path.join(configs, "partitions.json"))
basedir = workflow.basedir


include: "rules/common.smk"
include: "rules/main.smk"


localrules:
gen_filelist,
autogen_output,


ds.pars_key_resolve.write_par_catalog(
["-*-*-*-cal"],
os.path.join(pars_path(setup), "pht", "validity.jsonl"),
get_pattern_tier_raw(setup),
{"cal": ["par_pht"], "lar": ["par_pht"]},
)
wildcard_constraints:
experiment=r"\w+",
period=r"p\d{2}",
run=r"r\d{3}",
datatype=r"\w{3}",
timestamp=r"\d{8}T\d{6}Z",
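
These constraints pin each wildcard to one segment of the LEGEND file-key layout. As a quick sanity check, a hypothetical key (the experiment/period/run values below are made up for illustration) decomposes and matches like this:

```python
import re

# Hypothetical file key, for illustration only.
key = "l200-p03-r000-cal-20230401T000000Z"
patterns = {
    "experiment": r"\w+",
    "period": r"p\d{2}",
    "run": r"r\d{3}",
    "datatype": r"\w{3}",
    "timestamp": r"\d{8}T\d{6}Z",
}
fields = dict(zip(patterns, key.split("-")))

# Every field must fully match its wildcard constraint.
assert all(re.fullmatch(patterns[k], v) for k, v in fields.items())
```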


include: "rules/common.smk"
include: "rules/main.smk"
include: "rules/tcm.smk"
include: "rules/dsp.smk"
include: "rules/psp.smk"
include: "rules/hit.smk"
include: "rules/pht.smk"
include: "rules/evt.smk"
include: "rules/skm.smk"
include: "rules/blinding_calibration.smk"
include: "rules/qc_phy.smk"


localrules:
gen_filelist,
autogen_output,


onstart:
print("Starting workflow")
shell(f"rm {pars_path(setup)}/dsp/validity.jsonl || true")
shell(f"rm {pars_path(setup)}/hit/validity.jsonl || true")
shell(f"rm {pars_path(setup)}/pht/validity.jsonl || true")
shell(f"rm {pars_path(setup)}/raw/validity.jsonl || true")
ds.pars_key_resolve.write_par_catalog(
["-*-*-*-cal"],
os.path.join(pars_path(setup), "raw", "validity.jsonl"),
get_pattern_tier_raw(setup),
{"cal": ["par_raw"]},
)
ds.pars_key_resolve.write_par_catalog(
["-*-*-*-cal"],
os.path.join(pars_path(setup), "dsp", "validity.jsonl"),
get_pattern_tier_raw(setup),
{"cal": ["par_dsp"], "lar": ["par_dsp"]},
)
if os.path.isfile(os.path.join(pars_path(setup), "hit", "validity.jsonl")):
os.remove(os.path.join(pars_path(setup), "hit", "validity.jsonl"))


ds.pars_key_resolve.write_par_catalog(
["-*-*-*-cal"],
os.path.join(pars_path(setup), "hit", "validity.jsonl"),
get_pattern_tier_raw(setup),
{"cal": ["par_hit"], "lar": ["par_hit"]},
)

if os.path.isfile(os.path.join(pars_path(setup), "dsp", "validity.jsonl")):
os.remove(os.path.join(pars_path(setup), "dsp", "validity.jsonl"))
ds.pars_key_resolve.write_par_catalog(
["-*-*-*-cal"],
os.path.join(pars_path(setup), "pht", "validity.jsonl"),
os.path.join(pars_path(setup), "dsp", "validity.jsonl"),
get_pattern_tier_raw(setup),
{"cal": ["par_pht"], "lar": ["par_pht"]},
{"cal": ["par_dsp"], "lar": ["par_dsp"]},
)
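
The delete-then-rewrite steps above are repeated once per tier (raw, dsp, hit, ...). A minimal helper that factors out the pattern could look like the following sketch; `refresh_validity` is hypothetical and not part of this PR:

```python
def refresh_validity(setup, tier, keys):
    """Remove a stale validity.jsonl for `tier`, then regenerate it."""
    path = os.path.join(pars_path(setup), tier, "validity.jsonl")
    if os.path.isfile(path):
        os.remove(path)
    ds.pars_key_resolve.write_par_catalog(
        ["-*-*-*-cal"],
        path,
        get_pattern_tier_raw(setup),
        keys,
    )

# e.g. refresh_validity(setup, "dsp", {"cal": ["par_dsp"], "lar": ["par_dsp"]})
```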


onsuccess:
from snakemake.report import auto_report

rep_dir = f"{log_path(setup)}/report-{datetime.strftime(datetime.utcnow(), '%Y%m%dT%H%M%SZ')}"
rep_dir = f"{log_path(setup)}/report-{datetime.strftime(datetime.utcnow() , '%Y%m%dT%H%M%SZ')}"
pathlib.Path(rep_dir).mkdir(parents=True, exist_ok=True)
# auto_report(workflow.persistence.dag, f"{rep_dir}/report.html")
with open(os.path.join(rep_dir, "dag.txt"), "w") as f:
@@ -112,8 +105,32 @@ onsuccess:
f.writelines(str(workflow.persistence.dag.rule_dot()))
# shell(f"cat {rep_dir}/rg.txt | dot -Tpdf > {rep_dir}/rg.pdf")
print("Workflow finished, no error")
shell("rm *.gen || true")
shell(f"rm {filelist_path(setup)}/* || true")

# remove .gen files
files = glob.glob("*.gen")
for file in files:
if os.path.isfile(file):
os.remove(file)

# remove filelists
files = glob.glob(os.path.join(filelist_path(setup), "*"))
for file in files:
if os.path.isfile(file):
os.remove(file)
if os.path.exists(filelist_path(setup)):
os.rmdir(filelist_path(setup))

# remove logs
files = glob.glob(os.path.join(tmp_log_path(setup), "*", "*.log"))
for file in files:
if os.path.isfile(file):
os.remove(file)
dirs = glob.glob(os.path.join(tmp_log_path(setup), "*"))
for d in dirs:
if os.path.isdir(d):
os.rmdir(d)
if os.path.exists(tmp_log_path(setup)):
os.rmdir(tmp_log_path(setup))
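
The loops above remove files individually and then clear the now-empty directories with os.rmdir. If deleting the whole temporary log tree unconditionally is acceptable, shutil offers a one-call alternative (a sketch, not what this PR uses):

```python
import shutil

# Remove the temporary log tree in one call; ignore_errors avoids
# raising if the directory was never created.
shutil.rmtree(tmp_log_path(setup), ignore_errors=True)
```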


# Placeholder, can email or maybe put message in slack
8 changes: 8 additions & 0 deletions Snakefile-build-raw
@@ -40,6 +40,14 @@ meta = metadata_path(setup)
basedir = workflow.basedir


wildcard_constraints:
experiment=r"\w+",
period=r"p\d{2}",
run=r"r\d{3}",
datatype=r"\w{3}",
timestamp=r"\d{8}T\d{6}Z",


localrules:
gen_filelist,
autogen_output,
70 changes: 70 additions & 0 deletions rules/common.smk
@@ -11,6 +11,7 @@ from scripts.util.patterns import (
get_pattern_tier_raw,
get_pattern_plts_tmp_channel,
)
from scripts.util import ProcessingFileKey


def read_filelist(wildcards):
@@ -39,6 +40,15 @@ def read_filelist_cal(wildcards, tier):
return files


def read_filelist_fft(wildcards, tier):
label = f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-fft"
with checkpoints.gen_filelist.get(label=label, tier=tier, extension="file").output[
0
].open() as f:
files = f.read().splitlines()
return files
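
Like read_filelist_cal above, this helper resolves the gen_filelist checkpoint and is meant to serve as a rule input function. A hypothetical rule using it might read (the rule name and output path are invented):

```python
rule check_fft_filelist:
    input:
        lambda wildcards: read_filelist_fft(wildcards, "dsp"),
    output:
        "fft-checked-{experiment}-{period}-{run}.txt",
    shell:
        "echo {input} > {output}"
```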


def read_filelist_pars_cal_channel(wildcards, tier):
"""
Read the filelist of the channels and return a list of dsp files, one for each channel.
@@ -99,3 +109,63 @@ def get_pattern(tier):
return get_pattern_tier_daq(setup)
else:
return get_pattern_tier_raw(setup)


def set_last_rule_name(workflow, new_name):
"""Sets the name of the most recently created rule to be `new_name`.
Useful when creating rules dynamically (i.e., unnamed rules); see the sketch below.
Warning
-------
This could mess up the workflow. Use at your own risk.
"""
rules = workflow._rules
last_key = next(reversed(rules))
assert last_key == rules[last_key].name

rules[new_name] = rules.pop(last_key)
rules[new_name].name = new_name

if workflow.default_target == last_key:
workflow.default_target = new_name

if last_key in workflow._localrules:
workflow._localrules.remove(last_key)
workflow._localrules.add(new_name)

workflow.check_localrules()
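
A sketch of the intended use: create anonymous rules in a loop, naming each one right after it is defined (the tier names and file patterns here are hypothetical):

```python
for tier in ("dsp", "hit"):

    rule:
        input:
            f"pars-{tier}.json",
        output:
            f"pars-{tier}-validated.json",
        shell:
            "cp {input} {output}"

    set_last_rule_name(workflow, f"validate_pars_{tier}")
```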


def get_svm_file(wildcards, tier, name):
par_overwrite_file = os.path.join(par_overwrite_path(setup), tier, "validity.jsonl")
pars_files_overwrite = pars_catalog.get_calib_files(
par_overwrite_file, wildcards.timestamp
)
for pars_file in pars_files_overwrite:
if name in pars_file:
return os.path.join(par_overwrite_path(setup), tier, pars_file)
raise ValueError(f"Could not find model in {pars_files_overwrite}")


def get_overwrite_file(tier, wildcards=None, timestamp=None, name=None):
par_overwrite_file = os.path.join(par_overwrite_path(setup), tier, "validity.jsonl")
if timestamp is not None:
pars_files_overwrite = pars_catalog.get_calib_files(
par_overwrite_file, timestamp
)
else:
pars_files_overwrite = pars_catalog.get_calib_files(
par_overwrite_file, wildcards.timestamp
)
if name is None:
fullname = f"{tier}-overwrite.json"
else:
fullname = f"{tier}_{name}-overwrite.json"
out_files = []
for pars_file in pars_files_overwrite:
if fullname in pars_file:
out_files.append(os.path.join(par_overwrite_path(setup), tier, pars_file))
if len(out_files) == 0:
raise ValueError(f"Could not find name in {pars_files_overwrite}")
else:
return out_files
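
For example, all hit-tier overwrite files valid at a given point in time could be fetched directly by timestamp (the timestamp below is made up):

```python
# Hit-tier overwrite files valid at this (hypothetical) timestamp.
overrides = get_overwrite_file("hit", timestamp="20230401T000000Z")
```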
