Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancements for symptoms extraction and visualization #82

Merged
merged 5 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 105 additions & 54 deletions ecg_data_manager/modules/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,63 @@ def process_ecg_data(db: Client, data: pd.DataFrame) -> pd.DataFrame:
return processed_data


def _first_coding(component: dict) -> dict:
    """
    Return the first entry of ``component["code"]["coding"]``.

    Falls back to an empty dict when ``code`` or ``coding`` is missing, ``None``, or an
    empty list, so callers can chain ``.get(...)`` without guarding against IndexError.
    """
    codings = (component.get("code") or {}).get("coding") or [{}]
    return codings[0] or {}


def fetch_symptoms_single(observation_data: dict) -> dict:
    """
    Build a symptoms summary from the ``component`` array of a single observation dictionary.

    The first component whose first coding has the code 'HKElectrocardiogram.SymptomsStatus'
    supplies the symptoms status (its ``valueString``). Only when that status is 'present'
    are the components whose code contains 'HKCategoryTypeIdentifier' collected, each
    rendered as '<display>:<valueString>' and joined with ', '.

    Args:
        observation_data: A dictionary containing observation data. The user and resource
            identifiers are read from the keys ``ColumnNames.USER_ID.value`` and
            ``ColumnNames.RESOURCE_ID.value``.

    Returns:
        dict: Always a dictionary with three keys: ``ColumnNames.USER_ID.value``,
            ``ColumnNames.RESOURCE_ID.value`` and 'Symptoms'. 'Symptoms' is the joined
            symptom list, or the literal 'No symptoms.' when the status is not 'present'
            or no symptom components were found.
    """
    # `or []` also covers an explicit None stored under "component".
    components = observation_data.get("component") or []
    user_id = observation_data.get(ColumnNames.USER_ID.value)
    resource_id = observation_data.get(ColumnNames.RESOURCE_ID.value)

    # Status comes from the first matching component only; later duplicates are ignored.
    symptoms_status = next(
        (
            comp.get("valueString")
            for comp in components
            if _first_coding(comp).get("code") == "HKElectrocardiogram.SymptomsStatus"
        ),
        None,
    )

    symptoms = []
    if symptoms_status == "present":
        for comp in components:
            coding = _first_coding(comp)
            # Substring match: the code only has to contain the identifier prefix.
            if "HKCategoryTypeIdentifier" in (coding.get("code") or ""):
                symptoms.append(f"{coding.get('display')}:{comp.get('valueString')}")

    # Single exit point: identical result shape for every branch.
    return {
        ColumnNames.USER_ID.value: user_id,
        ColumnNames.RESOURCE_ID.value: resource_id,
        "Symptoms": ", ".join(symptoms) if symptoms else "No symptoms.",
    }


def fetch_diagnosis_data( # pylint: disable=too-many-locals, too-many-branches
db: Client,
input_df: pd.DataFrame,
Expand All @@ -91,7 +148,7 @@ def fetch_diagnosis_data( # pylint: disable=too-many-locals, too-many-branches
) -> pd.DataFrame:
"""
Fetch diagnosis data from the Firestore database and extend the input DataFrame with new
columns.
columns, including a 'Symptoms' column.

Args:
db (Client): Firestore database client.
Expand All @@ -101,7 +158,7 @@ def fetch_diagnosis_data( # pylint: disable=too-many-locals, too-many-branches
ECG_DATA_SUBCOLLECTION.

Returns:
pd.DataFrame: Extended DataFrame containing the fetched diagnosis data.
pd.DataFrame: Extended DataFrame containing the fetched diagnosis data and symptoms.
"""
collection_ref = db.collection(collection_name)
resources = []
Expand All @@ -126,98 +183,92 @@ def fetch_diagnosis_data( # pylint: disable=too-many-locals, too-many-branches
)
).stream()

# Process the FHIR documents and store observation data
for doc in fhir_docs:
observation_data = doc.to_dict()
observation_data["user_id"] = user_id
observation_data["ResourceId"] = doc.id
observation_data[ColumnNames.USER_ID.value] = user_id
observation_data[ColumnNames.RESOURCE_ID.value] = doc.id

# Extract effective period start time
effective_start = observation_data.get("effectivePeriod", {}).get(
"start", ""
)
if effective_start:
observation_data["EffectiveDateTimeHHMM"] = effective_start

# Extract symptoms information HERE
symptoms_info = fetch_symptoms_single(observation_data)
if symptoms_info:
observation_data.update(symptoms_info)

# Extract diagnosis information from diagnosis subcollection
diagnosis_docs = list(
doc.reference.collection(DIAGNOSIS_DATA_SUBCOLLECTION).stream()
)

if diagnosis_docs:
physician_initials_list = [
diagnosis_doc.to_dict().get("physicianInitials")
for diagnosis_doc in diagnosis_docs
if diagnosis_doc.to_dict().get("physicianInitials")
]
observation_data["NumberOfReviewers"] = len(physician_initials_list)
observation_data["Reviewers"] = physician_initials_list
else:
observation_data["NumberOfReviewers"] = 0
observation_data["Reviewers"] = []

physician_initials_list = [
diagnosis_doc.to_dict().get("physicianInitials", "")
for diagnosis_doc in diagnosis_docs
]
observation_data["NumberOfReviewers"] = len(physician_initials_list)
observation_data["Reviewers"] = physician_initials_list
observation_data["ReviewStatus"] = (
"Incomplete review"
if observation_data["NumberOfReviewers"] < 3
else "Complete review"
)
resources.append(observation_data)

# Add new columns from diagnosis documents
for i, diagnosis_doc in enumerate(diagnosis_docs):
if diagnosis_doc:
doc_data = diagnosis_doc.to_dict()
for key, value in doc_data.items():
col_name = f"Diagnosis{i+1}_{key}"
new_columns.add(col_name)
observation_data[col_name] = value
doc_data = diagnosis_doc.to_dict()
for key, value in doc_data.items():
col_name = f"Diagnosis{i+1}_{key}"
new_columns.add(col_name)
observation_data[col_name] = value

resources.append(observation_data)

except Exception as e: # pylint: disable=broad-exception-caught
print(f"An error occurred while processing user {user_id}: {str(e)}")

fetched_df = pd.DataFrame(resources)

# Define columns for the final DataFrame
columns = [
ColumnNames.USER_ID.value,
"ResourceId",
ColumnNames.RESOURCE_ID.value,
"EffectiveDateTimeHHMM",
ColumnNames.APPLE_ELECTROCARDIOGRAM_CLASSIFICATION.value,
"NumberOfReviewers",
"Reviewers",
"ReviewStatus",
"Symptoms",
] + list(new_columns)

data = []

for resource in resources:
row_data = [
resource.get(ColumnNames.USER_ID.value, None),
resource.get("id", None),
(
resource.get("effectivePeriod", {}).get("start", None)
if resource.get("effectivePeriod")
else None
),
(
resource.get("component", [{}])[2].get("valueString", None)
if len(resource.get("component", [])) > 2
else None
),
resource.get("NumberOfReviewers", None),
resource.get("Reviewers", None),
resource.get("ReviewStatus", None),
]
for col in new_columns:
row_data.append(resource.get(col, None))

data.append(row_data)

fetched_df = pd.DataFrame(data, columns=columns)
fetched_df = fetched_df.reindex(
columns=columns, fill_value=None
) # Ensure columns are in order and filled

# Extend the input_df with new columns based on ResourceId
# Extend the input DataFrame with new columns
extended_df = input_df.copy()
additional_columns = [
"ResourceId",
ColumnNames.RESOURCE_ID.value,
"NumberOfReviewers",
"Reviewers",
"ReviewStatus",
"EffectiveDateTimeHHMM",
"Symptoms",
] + list(new_columns)

for col in additional_columns:
if col not in extended_df.columns:
extended_df[col] = None

for index, row in extended_df.iterrows():
resource_id = row["ResourceId"]
fetched_row = fetched_df[fetched_df["ResourceId"] == resource_id]
resource_id = row[ColumnNames.RESOURCE_ID.value]
fetched_row = fetched_df[
fetched_df[ColumnNames.RESOURCE_ID.value] == resource_id
]
if not fetched_row.empty:
for col in additional_columns:
if col in fetched_row.columns:
Expand Down
21 changes: 11 additions & 10 deletions ecg_data_manager/modules/visualization.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
#

"""
This module provides classes and associated functions for viewing, filtering, and
analyzing ECG data. The primary class, ECGDataViewer, allows users to interact with
ECG data through a graphical interface, enabling the review, diagnosis, and visualization
of ECG recordings. The module also includes functions for plotting single lead ECGs and
configuring the appearance of the plots.
This module provides classes and functions for viewing, filtering, and analyzing ECG data. The
primary class, ECGDataViewer, allows users to interact with ECG data through a graphical interface,
enabling the review, diagnosis, and visualization of ECG recordings.
"""

# Standard library imports
Expand Down Expand Up @@ -351,6 +349,8 @@ def plot_single_ecg(self, row): # pylint: disable=too-many-locals
else "Unknown"
)

symptoms = row.get("Symptoms", "No symptoms reported.")

group_class = row[AGE_GROUP_STRING]
user_id_html = widgets.HTML(
value=f"<b style='font-size: larger;'><span style='color: blue;'>{group_class}</span> "
Expand All @@ -360,11 +360,15 @@ def plot_single_ecg(self, row): # pylint: disable=too-many-locals
heart_rate_html = widgets.HTML(
value=f"<b style='font-size: larger;'>Average HR: {heart_rate} bpm</b>"
)

symptoms_html = widgets.HTML(
value=f"<b style='font-size: larger;'>Symptoms: {symptoms}</b>"
)

interpretation_html = widgets.HTML(
value="<b style='font-size: larger;'>Classification: "
)

# Conditional color for non-sinusRhythm classifications
if ecg_interpretation != SINUS_RHYTHM:
interpretation_html.value += (
f"<span style='color: red;'>{ecg_interpretation}</span>"
Expand All @@ -374,7 +378,7 @@ def plot_single_ecg(self, row): # pylint: disable=too-many-locals

interpretation_html.value += "</b>"

display(user_id_html, heart_rate_html, interpretation_html)
display(user_id_html, heart_rate_html, symptoms_html, interpretation_html)

# Add review status
diagnosis_collection_ref = (
Expand Down Expand Up @@ -474,7 +478,6 @@ def hide_widgets(b): # pylint: disable=unused-argument
)
)

# Hide the widgets if not all selections have been made
initials = (
self.initials_dropdown.value
if self.initials_dropdown.value != WidgetStrings.OTHER.value
Expand All @@ -489,10 +492,8 @@ def hide_widgets(b): # pylint: disable=unused-argument
tracing_quality_dropdown.layout.visibility = "hidden"
notes_textarea.layout.visibility = "hidden"

# Attach the hide_widgets function to the button's on_click event
save_button.on_click(hide_widgets)

# Display the widgets
widgets_box = widgets.VBox(
[
diagnosis_dropdown,
Expand Down
Loading