-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtasks.py
61 lines (49 loc) · 1.34 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
"""
File to run the pipeline.
Author: Daniel Borek, July 2023
"""
from pathlib import Path
import invoke
VARS = "PYDEVD_DISABLE_FILE_VALIDATION=1"
ENV = "3.12.1"
VENV = "brainlat-3.12.1"
PYTHON = f"~/.pyenv/versions/{ENV}/envs/{VENV}/bin/python"
def run_notebook(c, notebook):
"""Runs a Jupyter notebook."""
if Path(notebook).is_file():
c.run(
f"{VARS} {PYTHON} -m jupyter nbconvert \\"
f"--to notebook --execute {notebook} --inplace"
)
else:
print(f"Notebook {notebook} does not exist.")
@invoke.task
def downlnoad_data(c):
"""
Downloads data from Synapse.
"""
c.run(f"{VARS} {PYTHON} scripts/01_download-data.py")
@invoke.task
def dataset_overview(c):
"""
Loads and converts PSD data to dataarray form from via Jupyter notebook.
"""
if Path("/Volumes/T7/BrainLat").exists():
notebook = "notebooks/00_Dataset_overview.ipynb"
run_notebook(c, notebook)
@invoke.task
def check_subject(c):
"""
Loads and converts PSD data to dataarray form from via Jupyter notebook.
"""
if Path("/Volumes/T7/BrainLat").exists():
notebook = "notebooks/01_Plot_subject.ipynb"
run_notebook(c, notebook)
# Entire pipeline
@invoke.task
def run_all(c):
"""
Runs the entire pipeline.
"""
dataset_overview(c)
check_subject(c)