nssp patching code #2000

Merged
merged 56 commits, Feb 7, 2025
Changes from 41 commits

Commits (56)
c78ae21
nssp patching code
minhkhul Jul 23, 2024
7694c0a
lint
minhkhul Jul 23, 2024
a3ed4c2
add test
minhkhul Jul 24, 2024
1628d34
add test
minhkhul Jul 24, 2024
2536b94
Add patching how-to to readme
minhkhul Jul 24, 2024
e4d45e5
adjust current_issue_dir name for weekly data instead of daily.
minhkhul Jul 25, 2024
db906fc
lint
minhkhul Jul 25, 2024
8f0bb32
adjust test for more cases
minhkhul Jul 25, 2024
7f151f5
add custom_run flag
minhkhul Aug 6, 2024
c093349
handle custom flag on but bad config
minhkhul Aug 6, 2024
a967416
make patch config check readable
minhkhul Aug 6, 2024
c020da6
make good_patch_config check comprehensive
minhkhul Aug 6, 2024
9a6130b
rewrite good_patch_config for clarity
minhkhul Aug 7, 2024
b8a2177
add unit tests for good_patch_config check
minhkhul Aug 7, 2024
a7d9443
add test_pull unit test for patching case + cleanup format
minhkhul Aug 7, 2024
e29e07e
split test cases + move to pytest
minhkhul Aug 8, 2024
0a4bfb6
add test for multi-week patching
minhkhul Aug 9, 2024
6c0abad
rename tests for clarity + restructure test_patch tests to clarify pu…
minhkhul Aug 15, 2024
7078dd0
make expected issue dates explicit in test_patch_confirm_dir_structur…
minhkhul Aug 15, 2024
d435bf0
add log to distinguish grabbing records from socrata vs locally store…
minhkhul Aug 15, 2024
8734daa
Update nssp/README.md
minhkhul Aug 28, 2024
5a6f8b6
Update nssp/README.md
minhkhul Aug 28, 2024
4356494
Add auto-download source backup data + update docs + test
minhkhul Aug 29, 2024
ca427a4
adjust custom_run flag to leave room for non-patch custom runs
minhkhul Aug 30, 2024
f58b068
move pull logic from run.py into pull.py
minhkhul Aug 30, 2024
5e93175
logger to static
minhkhul Aug 30, 2024
2d8670d
adjust unit tests
minhkhul Aug 30, 2024
f0335f6
more unit test adjustment
minhkhul Aug 31, 2024
7e06f94
move get_source_data to pull.py + make get_source_data run when sourc…
minhkhul Sep 3, 2024
e678ce6
auto-remove source_dir content after finish patch run
minhkhul Sep 3, 2024
bc1d7a7
lint happy
minhkhul Sep 3, 2024
84cba84
Update pull.py
minhkhul Sep 10, 2024
742737b
Update pull.py - remove stat debug
minhkhul Sep 10, 2024
e13d3db
add progress log for source file download
minhkhul Sep 10, 2024
9cec6ff
lint
minhkhul Sep 10, 2024
5450d8b
lint
minhkhul Sep 10, 2024
0a8da1e
Merge remote-tracking branch 'origin/main' into nssp_patching
minhkhul Dec 19, 2024
ab4b542
rewrite get_source_data to get source data from prod backup_dir + add…
minhkhul Jan 3, 2025
14b4e6f
various bug fixes
minhkhul Jan 5, 2025
d9e1f74
remove source data from local after patch run
minhkhul Jan 6, 2025
ca5294b
fix tests in test_pull
minhkhul Jan 6, 2025
49c4f67
Add new + fix current patch tests + fix duplicated data
minhkhul Jan 7, 2025
fb3fbf2
use set comprehension in get_patch_dates
minhkhul Jan 7, 2025
1f5a4dc
lint
minhkhul Jan 8, 2025
32f550e
make user param optional when no source data download needed
minhkhul Jan 8, 2025
2214f87
linter
minhkhul Jan 8, 2025
9ce6cab
Merge branch 'main' into nssp_patching
minhkhul Feb 6, 2025
633399b
lint
minhkhul Feb 6, 2025
83e5a8c
adjust readme
minhkhul Feb 6, 2025
552d036
add test for pull source data + remove misc secondary trace
minhkhul Feb 6, 2025
c4dd3b2
darker linting
nmdefries Feb 6, 2025
a4c4aec
add tests for remote source download vs local
minhkhul Feb 7, 2025
7c77be5
Merge branch 'nssp_patching' of https://github.com/cmu-delphi/covidca…
minhkhul Feb 7, 2025
5f383a6
change source host to params + add if logger before use in create_bac…
minhkhul Feb 7, 2025
d70d6e1
lint
minhkhul Feb 7, 2025
2a67c27
fix test
minhkhul Feb 7, 2025
7 changes: 7 additions & 0 deletions nssp/README.md
@@ -78,3 +78,10 @@ with the percentage of code covered by the tests.

None of the linting or unit tests should fail, and the code lines that are not covered by unit tests should be small and
should not include critical sub-routines.

## Running Patches

A daily backup of the source data, in the form of CSV files, can be found on `bigchunk-dev-02.delphi.cmu.edu` under `/common/source_backup/nssp`. This data is needed to create patches. Talk to your sysadmin for access.

Once your credentials for the server are working, you can create patch data for a specific date range in batch-issue format: adjust `params.json` following the instructions in `patch.py` (a sample configuration is sketched after the command below), then run
```
env/bin/python -m delphi_nssp.patch
```
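
For illustration, a minimal `params.json` sketch matching the layout documented in `patch.py` (directory paths, username, and issue dates are placeholders, and other indicator parameters are omitted; `common.backup_dir` here is the remote backup location read by `get_source_data`):

```
{
    "common": {
        "custom_run": true,
        "log_filename": "patch.log",
        "backup_dir": "/common/source_backup/nssp"
    },
    "patch": {
        "source_dir": "./source_data",
        "user": "your-username",
        "patch_dir": "./patch_output",
        "start_issue": "2024-04-20",
        "end_issue": "2024-04-21"
    }
}
```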
153 changes: 153 additions & 0 deletions nssp/delphi_nssp/patch.py
@@ -0,0 +1,153 @@
"""
This module is used for patching data in the delphi_nssp package.

The code assumes the user can use key-based auth to access the prod server
where historical source data is stored.

To use this module, configure params.json like so:

{
"common": {
"custom_run": true,
...
},
"validation": {
...
},
"patch": {
"source_dir": "delphi/covidcast-indicators/nssp/source_data",
"user": "username",
"patch_dir": "delphi/covidcast-indicators/nssp/AprilPatch",
"start_issue": "2024-04-20",
"end_issue": "2024-04-21",
}
}

In this params.json, we
- Turn on the "custom_run" flag under "common"
- Add the "patch" section, which contains:
+ "source_dir": the local directory where source data is downloaded to
+ "user": the username used to log in to the remote server where source data is backed up
+ "patch_dir": the local directory where all patch issue output is written
+ "start_issue": str, YYYY-MM-DD format, first issue date
+ "end_issue": str, YYYY-MM-DD format, last issue date

If "source_dir" doesn't exist locally or has no files in it, we download source data to source_dir;
otherwise, we assume all needed source files are already in source_dir.

This module will generate data for that range of issue dates, and store them in batch issue format in the patch_dir:
[patch_dir]/issue_[issue-date]/nssp/actual_data_file.csv
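For example (hypothetical dates): patching issues 2024-04-20 through 2024-04-26, assuming source files exist
for those dates, would write weekly batches such as [patch_dir]/issue_20240414/nssp/ and
[patch_dir]/issue_20240421/nssp/, since each issue directory is keyed to the Sunday starting the epiweek
that the reporting date falls in (see patch() below).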
"""

import sys
from datetime import datetime, timedelta
from os import listdir, makedirs, path
from shutil import rmtree

from delphi_utils import get_structured_logger, read_params
from epiweeks import Week

from .pull import get_source_data
from .run import run_module


def good_patch_config(params, logger):
"""
Check if the params.json file is correctly configured for patching.

params: Dict[str, Any]
Nested dictionary of parameters, typically loaded from params.json file.
logger: Logger object
Logger object to log messages.
"""
valid_config = True
custom_run = params["common"].get("custom_run", False)
if not custom_run:
logger.error("Calling patch.py without custom_run flag set true.")
valid_config = False

patch_config = params.get("patch", {})
if patch_config == {}:
logger.error("Custom flag is on, but patch section is missing.")
valid_config = False
else:
required_patch_keys = ["start_issue", "end_issue", "patch_dir", "source_dir", "user"]
missing_keys = [key for key in required_patch_keys if key not in patch_config]
if missing_keys:
logger.error("Patch section is missing required key(s)", missing_keys=missing_keys)
valid_config = False
else:
try: # issue dates validity check
start_issue = datetime.strptime(patch_config["start_issue"], "%Y-%m-%d")
end_issue = datetime.strptime(patch_config["end_issue"], "%Y-%m-%d")
if start_issue > end_issue:
logger.error("Start issue date is after end issue date.")
valid_config = False
except ValueError:
logger.error("Issue dates must be in YYYY-MM-DD format.")
valid_config = False

if valid_config:
logger.info("Good patch configuration.")
return True
logger.info("Bad patch configuration.")
return False


def patch():
"""
Run the nssp indicator for a range of issue dates.
"""
params = read_params()
logger = get_structured_logger("delphi_nssp.patch", filename=params["common"]["log_filename"])
if not good_patch_config(params, logger):
sys.exit(1)

source_dir = params["patch"]["source_dir"]
download_source = False
if not path.isdir(source_dir) or not listdir(source_dir): # no source dir or empty source dir
download_source = True
get_source_data(params, logger)
else:
logger.info("Source data already exists locally.")

start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")

logger.info(start_issue=start_issue.strftime("%Y-%m-%d"))
logger.info(end_issue=end_issue.strftime("%Y-%m-%d"))
logger.info(source_dir=source_dir)
logger.info(patch_dir=params["patch"]["patch_dir"])

makedirs(params["patch"]["patch_dir"], exist_ok=True)

current_issue = start_issue
while current_issue <= end_issue:
logger.info("patching issue", issue_date=current_issue.strftime("%Y%m%d"))

current_issue_source_csv = f"""{source_dir}/{current_issue.strftime("%Y%m%d")}.csv.gz"""
if not path.isfile(current_issue_source_csv):
logger.info("No source data at this path", current_issue_source_csv=current_issue_source_csv)
current_issue += timedelta(days=1)
continue

params["patch"]["current_issue"] = current_issue.strftime("%Y%m%d")

# current_issue_date can be different from params["patch"]["current_issue"]
# due to the weekly cadence of nssp data. For weekly sources, issue dates in our
# db match the first date of the epiweek that the reporting date falls in,
# rather than the reporting date itself.
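# For example (hypothetical date): with the default CDC epiweek system,
# Week.fromdate(datetime(2024, 4, 24)).startdate() is 2024-04-21 (that week's Sunday),
# so a Wednesday issue is written into the issue_20240421 directory.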
current_issue_date = Week.fromdate(current_issue).startdate()
current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_date.strftime("%Y%m%d")}/nssp"""
makedirs(f"{current_issue_dir}", exist_ok=True)
params["common"]["export_dir"] = f"""{current_issue_dir}"""

run_module(params, logger)
current_issue += timedelta(days=1)

if download_source:
rmtree(source_dir)


if __name__ == "__main__":
patch()
126 changes: 117 additions & 9 deletions nssp/delphi_nssp/pull.py
@@ -1,10 +1,15 @@
# -*- coding: utf-8 -*-
"""Functions for pulling NSSP ER data."""

import functools
import logging
import sys
import textwrap
from os import makedirs, path
from typing import Optional

import pandas as pd
import paramiko
from delphi_utils import create_backup_csv
from sodapy import Socrata

@@ -20,6 +25,70 @@
)


def print_callback(remote_file_name, logger, bytes_so_far, bytes_total, progress_chunks):
"""Print the callback information."""
rough_percent_transferred = int(100 * (bytes_so_far / bytes_total))
if rough_percent_transferred in progress_chunks:
logger.info("Transfer in progress", remote_file_name=remote_file_name, percent=rough_percent_transferred)
# Remove progress chunk, so it is not logged again
progress_chunks.remove(rough_percent_transferred)


def get_source_data(params, logger):
"""
Download historical source data from a backup server.
This function is typically used for patching only. Normal runs grab the latest data from the SODA API.

This function uses the "user" configuration under the "patch" section of params.json to specify
a username with key-based access to the server where backup nssp source data is stored.
It uses the "backup_dir" config under the "common" section to locate backup files on the remote server.
It then searches for CSV files that match the inclusive range of issue dates
specified by the "start_issue" and "end_issue" configs.

These CSV files are then downloaded and stored in the local "source_dir" directory.
"""
makedirs(params["patch"]["source_dir"], exist_ok=True)
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
host = "delphi-master-prod-01.delphi.cmu.edu"
user = params["patch"]["user"]
ssh.connect(host, username=user)

# Generate file names of source files to download
dates = pd.date_range(start=params["patch"]["start_issue"], end=params["patch"]["end_issue"])
primary_source_files = [f"{date.strftime('%Y%m%d')}.csv.gz" for date in dates]
secondary_source_files = [f"{date.strftime('%Y%m%d')}_secondary.csv.gz" for date in dates]
remote_source_files = primary_source_files + secondary_source_files

# Download source files
sftp = ssh.open_sftp()
try:
sftp.stat(params["common"]["backup_dir"])
except IOError:
logger.error("Source backup directory does not exist on the remote server.")

sftp.chdir(params["common"]["backup_dir"])

num_files_transferred = 0
for remote_file_name in remote_source_files:
callback_for_filename = functools.partial(print_callback, remote_file_name, logger, progress_chunks=[0, 50])
local_file_path = path.join(params["patch"]["source_dir"], remote_file_name)
try:
sftp.stat(remote_file_name)
except IOError:
logger.warning(
"Source backup for this date does not exist on the remote server.", missing_filename=remote_file_name
)
continue
sftp.get(remote_file_name, local_file_path, callback=callback_for_filename)
logger.info("Transfer finished", remote_file_name=remote_file_name, local_file_path=local_file_path)
num_files_transferred += 1
ssh.close()
Contributor comment:

suggestion (optional): You could wrap this section in a `with ssh.open_sftp() as sftp:` block to avoid this explicit close(). It won't change the behavior, but it might make maintenance less error-prone.
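
A minimal sketch of that suggestion (assuming paramiko's `SFTPClient` context-manager support, available in recent versions):

```
# Sketch only: the stat checks and progress callbacks above would move inside this block.
with ssh.open_sftp() as sftp:
    sftp.chdir(params["common"]["backup_dir"])
    for remote_file_name in remote_source_files:
        local_file_path = path.join(params["patch"]["source_dir"], remote_file_name)
        sftp.get(remote_file_name, local_file_path)
# The SFTP channel closes automatically here; ssh.close() still closes the SSH connection.
ssh.close()
```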


if num_files_transferred == 0:
logger.error("No source data was transferred. Check the source backup server for potential issues.")
sys.exit(1)

def warn_string(df, type_dict):
"""Format the warning string."""
warn = textwrap.dedent(
Expand Down Expand Up @@ -48,10 +117,12 @@ def pull_with_socrata_api(socrata_token: str, dataset_id: str):
dataset_id: str
The dataset id to pull data from


Returns
-------
list of dictionaries, each representing a row in the dataset
"""

client = Socrata("data.cdc.gov", socrata_token)
results = []
offset = 0
@@ -65,8 +136,14 @@
return results


def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
"""Pull the latest NSSP ER visits primary dataset.
def pull_nssp_data(
socrata_token: str,
backup_dir: str,
custom_run: bool,
issue_date: Optional[str] = None,
logger: Optional[logging.Logger] = None,
):
"""Pull the NSSP ER visits primary dataset.

https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview

@@ -80,9 +157,22 @@ def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger
pd.DataFrame
Dataframe as described above.
"""
socrata_results = pull_with_socrata_api(socrata_token, "rdmq-nq56")
df_ervisits = pd.DataFrame.from_records(socrata_results)
create_backup_csv(df_ervisits, backup_dir, custom_run, logger=logger)
if not custom_run:
socrata_results = pull_with_socrata_api(socrata_token, "rdmq-nq56")
df_ervisits = pd.DataFrame.from_records(socrata_results)
create_backup_csv(df_ervisits, backup_dir, custom_run, logger=logger)
logger.info("Number of records grabbed", num_records=len(df_ervisits), source="Socrata API")
elif custom_run and logger.name == "delphi_nssp.patch":
if issue_date is None:
raise ValueError("Issue date is required for patching")
source_filename = f"{backup_dir}/{issue_date}.csv.gz"
df_ervisits = pd.read_csv(source_filename)
logger.info(
"Number of records grabbed",
num_records=len(df_ervisits),
source=source_filename,
)

df_ervisits = df_ervisits.rename(columns={"week_end": "timestamp"})
df_ervisits = df_ervisits.rename(columns=SIGNALS_MAP)

@@ -99,7 +189,11 @@ def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger


def secondary_pull_nssp_data(
socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None
socrata_token: str,
backup_dir: str,
custom_run: bool,
issue_date: Optional[str] = None,
logger: Optional[logging.Logger] = None,
):
"""Pull the latest NSSP ER visits secondary dataset.

@@ -119,9 +213,23 @@
pd.DataFrame
Dataframe as described above.
"""
socrata_results = pull_with_socrata_api(socrata_token, "7mra-9cq9")
df_ervisits = pd.DataFrame.from_records(socrata_results)
create_backup_csv(df_ervisits, backup_dir, custom_run, sensor="secondary", logger=logger)
if not custom_run:
socrata_results = pull_with_socrata_api(socrata_token, "7mra-9cq9")
df_ervisits = pd.DataFrame.from_records(socrata_results)
create_backup_csv(df_ervisits, backup_dir, custom_run, sensor="secondary", logger=logger)
logger.info("Number of records grabbed", num_records=len(df_ervisits), source="secondary Socrata API")

elif custom_run and logger.name == "delphi_nssp.patch":
if issue_date is None:
raise ValueError("Issue date is required for patching")
source_filename = f"{backup_dir}/{issue_date}_secondary.csv.gz"
df_ervisits = pd.read_csv(source_filename)
logger.info(
"Number of records grabbed",
num_records=len(df_ervisits),
source=source_filename,
)

df_ervisits = df_ervisits.rename(columns=SECONDARY_COLS_MAP)

# geo_type is not provided in the dataset, so we infer it from the geo_value