Merge pull request #98 from legend-exp/spms
add rule to build dsp pars for SiPM data
gipert authored Feb 25, 2025
Parents: 8a76eea + e4d6230 · Commit: b530386
Showing 23 changed files with 549 additions and 308 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/main.yml
@@ -59,6 +59,7 @@ jobs:
- name: Set LEGEND_METADATA variable
run: |
echo "LEGEND_METADATA=$GITHUB_WORKSPACE/inputs" >> $GITHUB_ENV
- name: Clone legend-metadata
uses: actions/checkout@v4
with:
@@ -68,6 +69,10 @@ jobs:
token: ${{ secrets.CLONE_LEGEND_METADATA }}
path: ${{ env.LEGEND_METADATA }}

- name: Recursively update legend-metadata submodules
run: |
cd "$LEGEND_METADATA" && git submodule update --recursive --remote
- name: Run data production tests
run: ./tests/runprod/run-all.sh
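For local runs outside CI, the same metadata setup can be reproduced by hand. A minimal sketch, assuming the repository is hosted at github.com/legend-exp/legend-metadata and is clonable without the CLONE_LEGEND_METADATA token:

    # mirror the CI setup: clone legend-metadata into ./inputs
    export LEGEND_METADATA="$PWD/inputs"
    git clone https://github.com/legend-exp/legend-metadata "$LEGEND_METADATA"
    # a fresh clone also needs --init, unlike the CI step above
    cd "$LEGEND_METADATA" && git submodule update --init --recursive --remote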

6 changes: 4 additions & 2 deletions dataflow-config.yaml
@@ -1,4 +1,5 @@
legend_metadata_version: main
# legend_metadata_version: main
allow_none_par: false

paths:
sandbox_path: $_/sandbox
@@ -74,8 +75,8 @@ execenv:
arg: /data2/public/prodenv/containers/legendexp_legend-base_latest_20241110203225.sif
env:
PRODENV: $PRODENV
NUMBA_CACHE_DIR: $_/.snakemake/numba-cache
LGDO_BOUNDSCHECK: "false"
# LGDO_CACHE: "false"
DSPEED_BOUNDSCHECK: "false"
PYGAMA_PARALLEL: "false"
PYGAMA_FASTMATH: "false"
Expand All @@ -86,6 +87,7 @@ execenv:
arg: --image legendexp/legend-base:latest
env:
PRODENV: $PRODENV
NUMBA_CACHE_DIR: $_/.snakemake/numba-cache
HDF5_USE_FILE_LOCKING: "false"
LGDO_BOUNDSCHECK: "false"
DSPEED_BOUNDSCHECK: "false"
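Both new config keys can also be overridden per invocation instead of editing this file. A sketch using snakemake's --config flag (the evt target is taken from the test script added later in this diff; pinning legend_metadata_version here is hypothetical):

    snakemake --config allow_none_par=true legend_metadata_version=main "all-p13-*-evt.gen"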
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -52,7 +52,7 @@ dynamic = ["version"]
dependencies = [
"colorlog",
"dbetto>=1.2",
"pygama>=2",
"pygama>=2.0.5",
"dspeed>=1.6",
"pylegendmeta>=1.2",
"legend-pydataobj>=1.11.6",
@@ -116,6 +116,7 @@ par-geds-psp-average = "legenddataflow.scripts.par.geds.psp.average:par_geds_
par-geds-raw-blindcal = "legenddataflow.scripts.par.geds.raw.blindcal:par_geds_raw_blindcal"
par-geds-raw-blindcheck = "legenddataflow.scripts.par.geds.raw.blindcheck:par_geds_raw_blindcheck"
par-geds-tcm-pulser = "legenddataflow.scripts.par.geds.tcm.pulser:par_geds_tcm_pulser"
par-spms-dsp-trg-thr = "legenddataflow.scripts.par.spms.dsp.trigger_threshold:par_spms_dsp_trg_thr"

[tool.uv.workspace]
exclude = ["generated", "inputs", "software", "workflow"]
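With the new par-spms-dsp-trg-thr entry point registered above, a hypothetical smoke test after an editable install (assuming the script exposes a conventional --help flag, which this diff does not confirm):

    pip install -e .
    par-spms-dsp-trg-thr --help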
35 changes: 35 additions & 0 deletions tests/runprod/test-argon-char-dataprod.sh
@@ -0,0 +1,35 @@
#!/usr/bin/env bash

# IMPORTANT: this script must be executed from the legend-dataflow directory

# shellcheck disable=SC1091
source "$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &> /dev/null && pwd)/conftest.sh"

rawdir="$(get_dataflow_config_value paths.tier_raw)"
mkdir -p "${rawdir}" || exit 1

function mkdir_n_touch() {
mkdir -p "$(dirname "${1}")" || return 1
touch "${1}" || return 1
}

rawfiles=(
anp/p13/r002/l200-p13-r002-anp-20241217T094846Z-tier_raw.lh5
anc/p13/r006/l200-p13-r006-anc-20241221T150249Z-tier_raw.lh5
acs/p13/r006/l200-p13-r006-acs-20241221T150307Z-tier_raw.lh5
)

(
cd "${rawdir}" || exit 1
for file in "${rawfiles[@]}"; do
mkdir_n_touch "$file"
done
)

_smk_opts=(
--touch
--config allow_none_par=true
--workflow-profile workflow/profiles/default
)

run_test_command snakemake "${_smk_opts[@]}" "all-p13-*-evt.gen" || exit 1
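Like the other runprod tests, the new script can be run standalone; per the IMPORTANT comment at its top, it must be executed from the legend-dataflow directory:

    # run the new argon-characterization test on its own
    ./tests/runprod/test-argon-char-dataprod.sh
    # or run the full suite, as CI does
    ./tests/runprod/run-all.sh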
6 changes: 2 additions & 4 deletions tests/runprod/test-evt.sh
@@ -24,9 +24,6 @@ rawfiles=(
cal/p03/r001/l200-p03-r001-cal-20230317T211819Z-tier_raw.lh5
cal/p03/r000/l200-p03-r000-cal-20230311T235840Z-tier_raw.lh5
cal/p03/r002/l200-p03-r002-cal-20230324T161401Z-tier_raw.lh5
anp/p13/r002/l200-p13-r002-anp-20241217T094846Z-tier_raw.lh5
anc/p13/r006/l200-p13-r006-anc-20241221T150249Z-tier_raw.lh5
acs/p13/r006/l200-p13-r006-acs-20241221T150307Z-tier_raw.lh5
)

(
@@ -47,4 +44,5 @@ _smk_opts=(
--workflow-profile workflow/profiles/default
)

run_test_command snakemake "${_smk_opts[@]}" "all-*-evt.gen" || exit 1
run_test_command snakemake "${_smk_opts[@]}" "all-p03-*-evt.gen" || exit 1
run_test_command snakemake "${_smk_opts[@]}" "all-p04-*-evt.gen" || exit 1
20 changes: 13 additions & 7 deletions workflow/Snakefile
@@ -35,10 +35,10 @@ basedir = workflow.basedir

time = datetime.now().strftime("%Y%m%dT%H%M%SZ")

if not Path(meta).exists():
LegendMetadata(meta).checkout(config.legend_metadata_version)

metadata = LegendMetadata(meta, lazy=True)
# NOTE: this will attempt a clone of legend-metadata, if the directory does not exist
metadata = LegendMetadata(meta)
if "legend_metadata_version" in config:
metadata.checkout(config.legend_metadata_version)

part = CalGrouping(config, Path(det_status) / "cal_groupings.yaml")

@@ -51,12 +51,14 @@ wildcard_constraints:
timestamp=r"\d{8}T\d{6}Z",


include: "rules/channel_merge.smk"
include: "rules/filelist_gen.smk"
include: "rules/chanlist_gen.smk"
include: "rules/common.smk"
include: "rules/main.smk"
include: "rules/tcm.smk"
include: "rules/dsp_pars_geds.smk"
include: "rules/dsp_pars_spms.smk"
include: "rules/dsp.smk"
include: "rules/psp_pars_geds.smk"
include: "rules/psp.smk"
@@ -79,12 +81,16 @@ localrules:

onstart:
print("INFO: starting workflow")

# Make sure some packages are initialized before we begin to avoid race conditions
# https://numba.readthedocs.io/en/stable/developer/caching.html#cache-sharing
if not workflow.touch:
for pkg in ["dspeed", "lgdo", "matplotlib"]:
shell(execenv.execenv_pyexe(config, "python") + "-c 'import " + pkg + "'")
shell(
execenv.execenv_pyexe(config, "python")
+ "-c 'import dspeed, lgdo, matplotlib, pygama'"
)

# Log parameter catalogs in validity files
# Log parameter catalogs in validity files
hit_par_cat_file = Path(utils.pars_path(config)) / "hit" / "validity.yaml"
if hit_par_cat_file.is_file():
hit_par_cat_file.unlink()
7 changes: 5 additions & 2 deletions workflow/Snakefile-build-raw
@@ -12,6 +12,7 @@ from legenddataflow import patterns as patt
from legenddataflow import utils, execenv, ParsKeyResolve
from datetime import datetime
from dbetto import AttrsDict
from legendmeta import LegendMetadata

utils.subst_vars_in_snakemake_config(workflow, config)
config = AttrsDict(config)
@@ -26,8 +27,10 @@ meta = utils.metadata_path(config)

time = datetime.now().strftime("%Y%m%dT%H%M%SZ")

if not Path(meta_path).exists():
LegendMetadata(meta_path).checkout(config.legend_metadata_version)
# NOTE: this will attempt a clone of legend-metadata, if the directory does not exist
metadata = LegendMetadata(meta_path, lazy=True)
if "legend_metadata_version" in config:
metadata.checkout(config.legend_metadata_version)


wildcard_constraints:
28 changes: 22 additions & 6 deletions workflow/rules/chanlist_gen.smk
@@ -13,20 +13,23 @@ from legenddataflow import execenv_pyexe
from legenddataflow.utils import filelist_path


def get_chanlist(setup, keypart, workflow, config, det_status, chan_maps):
# FIXME: the system argument should always be explicitly supplied
def get_chanlist(
setup, keypart, workflow, config, det_status, chan_maps, system="geds"
):
key = ChannelProcKey.parse_keypart(keypart)

flist_path = filelist_path(setup)
os.makedirs(flist_path, exist_ok=True)
output_file = os.path.join(
flist_path,
f"all-{key.experiment}-{key.period}-{key.run}-cal-{key.timestamp}-channels.chankeylist.{random.randint(0,99999):05d}",
f"all-{key.experiment}-{key.period}-{key.run}-{key.datatype}-{key.timestamp}-channels.chankeylist.{random.randint(0,99999):05d}",
)

os.system(
execenv_pyexe(config, "create-chankeylist")
+ f"--det-status {det_status} --channelmap {chan_maps} --timestamp {key.timestamp} "
f"--datatype cal --output-file {output_file}"
f"--datatype {key.datatype} --output-file {output_file} --system {system}"
)

with open(output_file) as r:
@@ -36,12 +39,25 @@ def get_chanlist(setup, keypart, workflow, config, det_status, chan_maps):


def get_par_chanlist(
setup, keypart, tier, basedir, det_status, chan_maps, name=None, extension="yaml"
setup,
keypart,
tier,
basedir,
det_status,
chan_maps,
datatype="cal",
system="geds",
name=None,
extension="yaml",
):

chan_list = get_chanlist(setup, keypart, workflow, config, det_status, chan_maps)
chan_list = get_chanlist(
setup, keypart, workflow, config, det_status, chan_maps, system
)

par_pattern = get_pattern_pars_tmp_channel(setup, tier, name, extension)
par_pattern = get_pattern_pars_tmp_channel(
setup, tier, name, datatype=datatype, extension=extension
)

filenames = ChannelProcKey.get_channel_files(keypart, par_pattern, chan_list)

60 changes: 42 additions & 18 deletions workflow/rules/channel_merge.smk
@@ -1,15 +1,10 @@
from legenddataflow.patterns import (
get_pattern_pars_tmp_channel,
get_pattern_plts_tmp_channel,
get_pattern_plts,
get_pattern_tier,
get_pattern_pars_tmp,
get_pattern_pars,
)
from legenddataflow.utils import set_last_rule_name
import inspect

from legenddataflow import patterns
from legenddataflow.utils import set_last_rule_name
from legenddataflow.execenv import execenv_pyexe


def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
if lh5_tier is None:
lh5_tier = tier
@@ -24,7 +19,7 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
chan_maps,
),
output:
get_pattern_plts(config, tier),
patterns.get_pattern_plts(config, tier),
group:
f"merge-{tier}"
shell:
@@ -47,7 +42,7 @@
extension="pkl",
),
output:
get_pattern_pars(
patterns.get_pattern_pars(
config,
tier,
name="objects",
@@ -76,7 +71,7 @@
),
output:
temp(
get_pattern_pars_tmp(
patterns.get_pattern_pars_tmp(
config,
tier,
datatype="cal",
@@ -91,6 +86,35 @@

set_last_rule_name(workflow, f"build_pars_{tier}_db")

rule:
"""Merge pars for SiPM channels in a single pars file."""
input:
lambda wildcards: get_par_chanlist(
config,
f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-{wildcards.datatype}-{wildcards.timestamp}-channels",
tier,
basedir,
det_status,
chan_maps,
datatype=wildcards.datatype,
system="spms"
),
output:
patterns.get_pattern_pars(
config,
tier,
name="spms",
datatype="{datatype}",
),
group:
f"merge-{tier}"
shell:
execenv_pyexe(config, "merge-channels") + \
"--input {input} "
"--output {output} "

set_last_rule_name(workflow, f"build_pars_spms_{tier}_db")

rule:
input:
in_files=lambda wildcards: get_par_chanlist(
@@ -102,27 +126,27 @@
chan_maps,
extension="lh5" if lh5_merge is True else inspect.signature(get_par_chanlist).parameters['extension'].default,
),
in_db=get_pattern_pars_tmp(
in_db=patterns.get_pattern_pars_tmp(
config,
tier,
datatype="cal",
) if lh5_merge is True else [],
plts=get_pattern_plts(config, tier),
objects=get_pattern_pars(
plts=patterns.get_pattern_plts(config, tier),
objects=patterns.get_pattern_pars(
config,
tier,
name="objects",
extension="dir",
check_in_cycle=check_in_cycle,
),
output:
out_file=get_pattern_pars(
out_file=patterns.get_pattern_pars(
config,
tier,
extension="lh5" if lh5_merge is True else inspect.signature(get_pattern_pars).parameters['extension'].default,
extension="lh5" if lh5_merge is True else inspect.signature(patterns.get_pattern_pars).parameters['extension'].default,
check_in_cycle=check_in_cycle,
),
out_db=get_pattern_pars(config, tier, check_in_cycle=check_in_cycle) if lh5_merge is True else [],
out_db=patterns.get_pattern_pars(config, tier, check_in_cycle=check_in_cycle) if lh5_merge is True else [],
group:
f"merge-{tier}"
run:
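For reference, the shell of the new SiPM merge rule resolves to a plain call of the merge-channels console script. Schematically, with placeholder paths rather than the literal Snakemake expansion:

    merge-channels --input <per-channel spms pars files> --output <merged spms pars file>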