refactor hospital admission to use delphi_utils create_export_csv #2032

Closed · wants to merge 25 commits
59 changes: 33 additions & 26 deletions _delphi_utils_python/delphi_utils/export.py
@@ -11,7 +11,8 @@

 from .nancodes import Nans
 
-def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
+
+def filter_contradicting_missing_codes(df, sensor, metric, logger=None):
     """Find values with contradictory missingness codes, filter them, and log."""
     columns = ["val", "se", "sample_size"]
     # Get indices where the XNOR is true (i.e. both are true or both are false).
@@ -21,12 +22,8 @@ def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
     ]
     for mask in masks:
         if not logger is None and df.loc[mask].size > 0:
-            logger.info(
-                "Filtering contradictory missing code",
-                sensor=sensor,
-                metric=metric,
-                date=date.strftime(format="%Y-%m-%d"),
-            )
+            date = df.loc[mask]["timestamp"][0].strftime("%Y%m%d")
+            logger.info("Filtering contradictory missing code", sensor=sensor, metric=metric, date=date)
             df = df.loc[~mask]
         elif logger is None and df.loc[mask].size > 0:
             df = df.loc[~mask]
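
The XNOR test above flags rows whose value and missingness code disagree. A minimal standalone sketch of the masking logic (illustrative only; it assumes Nans.NOT_MISSING compares like the integer enum defined in delphi_utils.nancodes):

import pandas as pd
from delphi_utils import Nans

df = pd.DataFrame({
    "val": [1.0, None],
    "missing_val": [int(Nans.NOT_MISSING), int(Nans.NOT_MISSING)],
})
# The mask is true where "value is NaN" and "coded NOT_MISSING" are both true
# or both false -- exactly the contradictory rows.
mask = ~(df["val"].isna() ^ df["missing_val"].eq(Nans.NOT_MISSING))
print(df.loc[mask])  # only the second row (NaN value coded NOT_MISSING) is flagged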
@@ -100,6 +97,26 @@ def create_export_csv(
     else:
         dates = pd.date_range(start_date, end_date)
 
+    if remove_null_samples:
+        df = df[df["sample_size"].notnull()]
+    if sort_geos:
+        df = df.sort_values(by="geo_id")
+    if "missing_val" in df.columns:
+        df = filter_contradicting_missing_codes(df, sensor, metric, logger=logger)
+
+    expected_columns = [
+        "geo_id",
+        "val",
+        "se",
+        "sample_size",
+        "timestamp",
+        "missing_val",
+        "missing_se",
+        "missing_sample_size",
+    ]
+    df = df.filter(items=expected_columns)
+    df = df.round({"val": 7, "se": 7})
+
     for date in dates:
         if weekly_dates:
             t = Week.fromdate(pd.to_datetime(str(date)))
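
A note on df.filter(items=expected_columns) above: pandas keeps only the listed columns that actually exist, in the order given, and silently skips the rest, so indicators that never produce missingness columns still export cleanly. A quick illustration with a hypothetical frame:

import pandas as pd

df = pd.DataFrame({"geo_id": ["01000"], "val": [1.23456789], "timestamp": ["2020-02-15"]})
out = df.filter(items=["geo_id", "val", "se", "sample_size", "timestamp", "missing_val"])
print(out.columns.tolist())  # ['geo_id', 'val', 'timestamp'] -- absent columns are ignored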
@@ -111,24 +128,14 @@
         else:
             export_filename = f"{date_str}_{geo_res}_{metric}_{sensor}.csv"
         export_file = join(export_dir, export_filename)
-        expected_columns = [
-            "geo_id",
-            "val",
-            "se",
-            "sample_size",
-            "missing_val",
-            "missing_se",
-            "missing_sample_size"
-        ]
-        export_df = df[df["timestamp"] == date].filter(items=expected_columns)
-        if "missing_val" in export_df.columns:
-            export_df = filter_contradicting_missing_codes(
-                export_df, sensor, metric, date, logger=logger
-            )
-        if remove_null_samples:
-            export_df = export_df[export_df["sample_size"].notnull()]
-        export_df = export_df.round({"val": 7, "se": 7})
-        if sort_geos:
-            export_df = export_df.sort_values(by="geo_id")
+        export_df = df[df["timestamp"] == date]
+        export_df = export_df.drop("timestamp", axis=1)
         export_df.to_csv(export_file, index=False, na_rep="NA")
 
+    logger.debug(
+        "Wrote rows",
+        num_rows=df.size,
+        geo_type=geo_res,
+        num_geo_ids=export_df["geo_id"].unique().size,
+    )
     return dates
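
Taken together, the refactor moves null-sample filtering, geo sorting, contradictory-missingness filtering, column selection, and rounding out of the per-date loop, so each date only needs a timestamp mask and a column drop. A minimal end-to-end sketch of calling the refactored function (values are hypothetical; the filename pattern and 7-digit rounding follow the code above):

import pandas as pd
from pathlib import Path
from delphi_utils import create_export_csv, get_structured_logger

logger = get_structured_logger("export_demo")  # structured logger accepts keyword fields
Path("./receiving").mkdir(exist_ok=True)
df = pd.DataFrame({
    "geo_id": ["51175", "51093"],
    "timestamp": pd.to_datetime(["2020-02-15", "2020-02-15"]),
    "val": [3.12345678910, 2.1],
    "se": [0.15, 0.22],
    "sample_size": [100, 100],
})
create_export_csv(df, export_dir="./receiving", geo_res="county", sensor="test",
                  sort_geos=True, logger=logger)
# -> ./receiving/20200215_county_test.csv, geo_ids sorted, val/se rounded to 7 digits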
27 changes: 20 additions & 7 deletions _delphi_utils_python/tests/test_export.py
@@ -1,4 +1,5 @@
"""Tests for exporting CSV files."""
import logging
from datetime import datetime
from os import listdir
from os.path import join
@@ -11,6 +12,7 @@

 from delphi_utils import create_export_csv, Nans
 
+TEST_LOGGER = logging.getLogger()
 
 def _set_df_dtypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame:
     assert all(isinstance(e, type) or isinstance(e, str) for e in dtypes.values()), (
@@ -102,6 +104,7 @@ def test_export_with_metric(self, tmp_path):
metric="deaths",
geo_res="county",
sensor="test",
logger=TEST_LOGGER
)

assert set(listdir(tmp_path)) == set(
@@ -122,6 +125,7 @@ def test_export_rounding(self, tmp_path):
metric="deaths",
geo_res="county",
sensor="test",
logger=TEST_LOGGER
)
assert_frame_equal(
pd.read_csv(join(tmp_path, "20200215_county_deaths_test.csv")),
@@ -144,6 +148,7 @@ def test_export_without_metric(self, tmp_path):
             export_dir=tmp_path,
             geo_res="county",
             sensor="test",
+            logger=TEST_LOGGER
         )
 
         assert set(listdir(tmp_path)) == set(
@@ -163,6 +168,7 @@ def test_export_with_limiting_start_date(self, tmp_path):
             export_dir=tmp_path,
             geo_res="county",
             sensor="test",
+            logger=TEST_LOGGER
         )
 
         assert set(listdir(tmp_path)) == set(
@@ -182,6 +188,7 @@ def test_export_with_limiting_end_date(self, tmp_path):
             export_dir=tmp_path,
             geo_res="county",
             sensor="test",
+            logger=TEST_LOGGER
         )
 
         assert set(listdir(tmp_path)) == set(
@@ -199,6 +206,7 @@ def test_export_with_no_dates(self, tmp_path):
             export_dir=tmp_path,
             geo_res="state",
             sensor="test",
+            logger=TEST_LOGGER
         )
 
         assert set(listdir(tmp_path)) == set(
@@ -228,7 +236,8 @@ def test_export_with_null_removal(self, tmp_path):
             export_dir=tmp_path,
             geo_res="state",
             sensor="test",
-            remove_null_samples=True
+            remove_null_samples=True,
+            logger=TEST_LOGGER
         )
 
         assert set(listdir(tmp_path)) == set(
@@ -259,7 +268,8 @@ def test_export_without_null_removal(self, tmp_path):
             export_dir=tmp_path,
             geo_res="state",
             sensor="test",
-            remove_null_samples=False
+            remove_null_samples=False,
+            logger=TEST_LOGGER
         )
 
         assert set(listdir(tmp_path)) == set(
@@ -275,7 +285,7 @@ def test_export_without_null_removal(self, tmp_path):
     def test_export_df_without_missingness(self, tmp_path):
 
         create_export_csv(
-            df=self.DF.copy(), export_dir=tmp_path, geo_res="county", sensor="test"
+            df=self.DF.copy(), export_dir=tmp_path, geo_res="county", sensor="test", logger=TEST_LOGGER
         )
         df = pd.read_csv(join(tmp_path, "20200215_county_test.csv")).astype(
             {"geo_id": str, "sample_size": int}
@@ -297,6 +307,7 @@ def test_export_df_with_missingness(self, tmp_path):
             export_dir=tmp_path,
             geo_res="county",
             sensor="test",
+            logger=TEST_LOGGER
         )
         assert set(listdir(tmp_path)) == set(
             [
@@ -324,12 +335,12 @@ def test_export_df_with_missingness(self, tmp_path):
     @mock.patch("delphi_utils.logger")
     def test_export_df_with_contradictory_missingness(self, mock_logger, tmp_path):
         sensor = "test"
-        geo_res = "state"
+        geo = "state"
         create_export_csv(
             df=self.DF3.copy(),
             export_dir=tmp_path,
+            geo_res=geo,
             sensor=sensor,
-            geo_res=geo_res,
             logger=mock_logger
         )
         assert set(listdir(tmp_path)) == set(
@@ -360,7 +371,8 @@ def test_export_sort(self, tmp_path):
             unsorted_df,
             export_dir=tmp_path,
             geo_res="county",
-            sensor="test"
+            sensor="test",
+            logger=TEST_LOGGER
         )
         expected_df = pd.DataFrame({
             "geo_id": ["51175", "51093"],
@@ -376,7 +388,8 @@
             export_dir=tmp_path,
             geo_res="county",
             sensor="test",
-            sort_geos=True
+            sort_geos=True,
+            logger=TEST_LOGGER
         )
         expected_df = pd.DataFrame({
             "geo_id": ["51093", "51175"],
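
One detail worth knowing when extending these tests: TEST_LOGGER is a bare stdlib root logger, while create_export_csv now logs structlog-style keyword fields. This works because stdlib loggers check the level before touching the extra keywords -- an observation about the pattern, not part of the diff:

import logging

TEST_LOGGER = logging.getLogger()  # root logger, effective level WARNING by default
# debug() short-circuits at WARNING, so the keyword fields are never passed
# to _log() and no TypeError is raised.
TEST_LOGGER.debug("Wrote rows", num_rows=3, geo_type="county")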
3 changes: 2 additions & 1 deletion changehc/delphi_changehc/update_sensor.py
@@ -58,7 +58,8 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa
         start_date=start_date,
         end_date=end_date,
         sensor=out_name,
-        write_empty_days=True
+        write_empty_days=True,
+        logger=logger,
     )
     logger.debug("Wrote rows", num_rows=df.size, geo_type=geo_level, num_geo_ids=df["geo_id"].unique().size)
     logger.debug("Wrote files", export_dir=output_path)
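
The logger threaded into create_export_csv here is the structured logger the indicators build in run_module; a minimal sketch of creating one (the filename value is hypothetical):

from delphi_utils import get_structured_logger

logger = get_structured_logger(__name__, filename="changehc.log", log_exceptions=True)
logger.debug("Wrote files", export_dir="./receiving")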
23 changes: 15 additions & 8 deletions claims_hosp/delphi_claims_hosp/run.py
@@ -5,22 +5,20 @@
 when the module is run with `python -m delphi_claims_hosp`.
 """
 
-# standard packages
-import time
 import os
+import time
 from datetime import datetime, timedelta
 from pathlib import Path
 
-# third party
 from delphi_utils import get_structured_logger
+from delphi_utils.export import create_export_csv
 
-# first party
+from .backfill import merge_backfill_file, store_backfill_file
 from .config import Config
 from .download_claims_ftp_files import download
-from .modify_claims_drops import modify_and_write
 from .get_latest_claims_name import get_latest_filename
+from .modify_claims_drops import modify_and_write
 from .update_indicator import ClaimsHospIndicatorUpdater
-from .backfill import (store_backfill_file, merge_backfill_file)
 
 
 def run_module(params):
@@ -138,10 +136,19 @@ def run_module(params):
             signal_name,
             logger,
         )
-        updater.update_indicator(
+        output = updater.update_indicator(
            claims_file,
            params["common"]["export_dir"],
         )
+        filtered_output_df = updater.preprocess_output(output)
+        create_export_csv(
+            filtered_output_df,
+            export_dir=params["common"]["export_dir"],
+            start_date=startdate,
+            geo_res=geo,
+            sensor=signal_name,
+            logger=logger,
+        )
 
         max_dates.append(updater.output_dates[-1])
         n_csv_export.append(len(updater.output_dates))
         logger.info("Finished updating", geo_type=geo)
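
The run.py change above splits indicator computation from CSV export: update_indicator now returns its output, preprocess_output filters it, and the shared create_export_csv writes the files. A schematic helper showing the same flow (the helper itself is hypothetical; the calls mirror the diff):

from delphi_utils.export import create_export_csv

def export_geo(updater, claims_file, export_dir, startdate, geo, signal_name, logger):
    """Run the updater for one geo, then hand its filtered output to the shared writer."""
    output = updater.update_indicator(claims_file, export_dir)
    filtered_output_df = updater.preprocess_output(output)
    create_export_csv(
        filtered_output_df,
        export_dir=export_dir,
        start_date=startdate,
        geo_res=geo,
        sensor=signal_name,
        logger=logger,
    )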