[IBCDPE-947] GX Validation Record Keeping (#135)
* adds reporter class

* updates CI command

* adds dataset report to process_dataset

* Testing out changes to return from log time wrapper

* uncomment config chunk

* change platform module name

* remove dev_config

* cleans up reporter

* cleans up gx

* conditional update_table

* updates process

* updates for gx_table config

* adds gx_table to config files

* autoflake

* adds test_reporter

* updates test_gx

* updates gx docstrings

* fix imports

* redo test process

* removes warnings

* adds missing type hint

* splits line

* change to constants.py

---------

Co-authored-by: BryanFauble <[email protected]>
BWMac and BryanFauble authored Jun 10, 2024
1 parent d1ed49f commit 4d3da56
Showing 15 changed files with 1,971 additions and 1,357 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dev.yml
@@ -68,7 +68,7 @@ jobs:
python-version: "3.9"
- run: pip install -U setuptools
- run: pip install .
- run: adt test_config.yaml --upload --platform GITHUB --token ${{secrets.SYNAPSE_PAT}}
- run: adt test_config.yaml --upload --platform GITHUB --run_id ${{ github.run_id }} --token ${{secrets.SYNAPSE_PAT}}

ghcr-publish:
needs: [build, test]
3 changes: 3 additions & 0 deletions .gitignore
@@ -136,3 +136,6 @@ staging/*

#test staging location
test_staging_dir/

# dev config file
dev_config.yaml
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -81,7 +81,7 @@ The agora-data-tools project follows the standard [trunk based development](http
adt test_config.yaml
```

If your changes have to do with the way that files are uploaded to Synapse, create a new configuration file by copying `test_config.yaml` and changing the `destination` and `gx_folder` fields to testing locations that you own. The command will change to be:
If your changes affect how files are uploaded to Synapse and/or how new records are uploaded to the ADT GX Synapse table, create a new configuration file by copying `test_config.yaml` and changing the `destination`, `gx_folder`, and `gx_table` fields to testing locations that you own. The command then becomes:

```
adt my_dev_config.yaml --upload
```
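
For illustration, a minimal dev config of that shape might begin as follows (the `syn…` IDs are placeholders for test locations you own, and the `sources` section, copied from `test_config.yaml`, is elided):

```
destination: &dest syn00000001 # placeholder: your own test upload folder
staging_path: ./staging
gx_folder: syn00000002 # placeholder: your own folder for GX report files
gx_table: syn00000003 # placeholder: your own GX validation records table
sources:
  # ... dataset definitions copied from test_config.yaml ...
```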
2,378 changes: 1,196 additions & 1,182 deletions Pipfile.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions config.yaml
@@ -1,6 +1,7 @@
destination: &dest syn12177492
staging_path: ./staging
gx_folder: syn52948668
gx_table: syn60527066
sources:
- genes_biodomains:
genes_biodomains_files: &genes_biodomains_files
2 changes: 1 addition & 1 deletion setup.cfg
@@ -35,7 +35,7 @@ packages = find:
install_requires =
pandas~=2.0.0
numpy~=1.21
setuptools~=67.0.0
setuptools~=70.0.0
synapseclient~=4.0.0
PyYAML~=6.0
pyarrow~=14.0.1
7 changes: 7 additions & 0 deletions src/agoradatatools/constants.py
@@ -0,0 +1,7 @@
from enum import Enum


class Platform(Enum):
LOCAL = "LOCAL"
GITHUB = "GITHUB"
NEXTFLOW = "NEXTFLOW"
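
This new `Platform` enum backs the `--platform GITHUB` option seen in the workflow change above. As a minimal sketch of enum-by-value lookup (the `parse_platform` helper below is hypothetical, not part of this commit):

```
from agoradatatools.constants import Platform


def parse_platform(value: str) -> Platform:
    """Hypothetical helper: map a CLI --platform string onto the enum.
    Enum lookup by value raises ValueError for unknown platforms."""
    return Platform(value.upper())


assert parse_platform("github") is Platform.GITHUB
assert Platform("LOCAL") is Platform.LOCAL  # direct lookup by value
```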
64 changes: 47 additions & 17 deletions src/agoradatatools/gx.py
@@ -1,25 +1,30 @@
import json
import logging
import os
import shutil
import json
import typing

import pandas as pd

from agoradatatools.errors import ADTDataValidationError
from typing import Optional

import great_expectations as gx
import pandas as pd
from great_expectations.checkpoint.types.checkpoint_result import CheckpointResult
from synapseclient import Activity, File, Synapse

from agoradatatools.reporter import DatasetReport

logger = logging.getLogger(__name__)
# Disable GX INFO logging
logging.getLogger("great_expectations").setLevel(logging.WARNING)


class GreatExpectationsRunner:
"""Class to run great expectations on a dataset and upload the HTML report to Synapse"""

failures: bool = False
failure_message: Optional[str] = None
report_file: Optional[str] = None
report_version: Optional[int] = None
report_link: Optional[str] = None

def __init__(
self,
syn: Synapse,
@@ -43,12 +48,12 @@ def __init__(
from expectations.expect_column_values_to_have_list_length import (
ExpectColumnValuesToHaveListLength,
)
from expectations.expect_column_values_to_have_list_members import (
ExpectColumnValuesToHaveListMembers,
)
from expectations.expect_column_values_to_have_list_length_in_range import (
ExpectColumnValuesToHaveListLengthInRange,
)
from expectations.expect_column_values_to_have_list_members import (
ExpectColumnValuesToHaveListMembers,
)
from expectations.expect_column_values_to_have_list_members_of_type import (
ExpectColumnValuesToHaveListMembersOfType,
)
@@ -74,7 +79,12 @@ def _check_if_expectation_suite_exists(self) -> bool:
return exists

def _get_results_path(self, checkpoint_result: CheckpointResult) -> str:
"""Gets the path to the most recent HTML report for a checkpoint, copies it to a Synapse-API friendly name, and returns the new path"""
"""Gets the path to the most recent HTML report for a checkpoint,
copies it to a Synapse-API friendly name, and returns the new path
Args:
checkpoint_result (CheckpointResult): CheckpointResult object from GX validation run.
"""
validation_results = checkpoint_result.list_validation_result_identifiers()
latest_validation_result = validation_results[0]

@@ -97,8 +107,13 @@ def _get_results_path(self, checkpoint_result: CheckpointResult) -> str:
return new_results_path

def _upload_results_file_to_synapse(self, results_path: str) -> None:
"""Uploads a results file to Synapse"""
self.syn.store(
"""Uploads a results file to Synapse. Assigns class attributes associated
with the report file.
Args:
results_path (str): Path to the GX report file.
"""
file = self.syn.store(
File(
results_path,
parentId=self.upload_folder,
@@ -109,12 +124,26 @@ def _upload_results_file_to_synapse(self, results_path: str) -> None:
),
forceVersion=True,
)
self.report_file = file.id
self.report_version = file.versionNumber
self.report_link = DatasetReport.format_link(
syn_id=file.id, version=file.versionNumber
)

@staticmethod
def convert_nested_columns_to_json(
df: pd.DataFrame, nested_columns: typing.List[str]
) -> pd.DataFrame:
"""Converts nested columns in a DataFrame to JSON-parseable strings"""
"""Converts nested columns in a DataFrame to JSON-parseable strings
Args:
df (pd.DataFrame): DataFrame
nested_columns (typing.List[str]): List of nested columns
Returns:
df (pd.DataFrame): DataFrame with nested columns converted to JSON-parseable strings
"""
df = df.copy()
for column in nested_columns:
df[column] = df[column].apply(json.dumps)
return df
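
To make the conversion concrete, here is a quick round-trip on hypothetical data (assuming the package and its dependencies are installed; the DataFrame below is illustrative, not from the commit):

```
import pandas as pd

from agoradatatools.gx import GreatExpectationsRunner

# Hypothetical frame with a nested (list-valued) column
df = pd.DataFrame({"gene": ["a", "b"], "biodomains": [["x", "y"], ["z"]]})
out = GreatExpectationsRunner.convert_nested_columns_to_json(df, ["biodomains"])

assert out["biodomains"].tolist() == ['["x", "y"]', '["z"]']
assert df["biodomains"][0] == ["x", "y"]  # input is left untouched (df.copy())
```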
@@ -126,7 +155,7 @@ def get_failed_expectations(self, checkpoint_result: CheckpointResult) -> str:
checkpoint_result (CheckpointResult): CheckpointResult object
Returns:
fail_message: String with information on which fields and expectations failed
fail_message (str): String with information on which fields and expectations failed
"""
fail_dict = {self.expectation_suite_name: {}}
expectation_results = checkpoint_result.list_validation_results()[0]["results"]
@@ -153,7 +182,8 @@ return fail_message
return fail_message

def run(self) -> None:
"""Run great expectations on a dataset and upload the results to Synapse"""
"""Run great expectations on a dataset and upload the results to Synapse."""

if not self._check_if_expectation_suite_exists():
return

@@ -185,5 +215,5 @@ def run(self) -> None:
self._upload_results_file_to_synapse(latest_reults_path)

if not checkpoint_result.success:
fail_message = self.get_failed_expectations(checkpoint_result)
raise ADTDataValidationError(fail_message)
self.failures = True
self.failure_message = self.get_failed_expectations(checkpoint_result)
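
Note the behavioral change at the end of `run()`: validation failures no longer raise `ADTDataValidationError` on the spot; they are recorded on `failures`/`failure_message` so the HTML report and the GX table record can still be uploaded first. A sketch of how a caller might surface the failure afterwards (the `validate_or_raise` wrapper is hypothetical; `ADTDataValidationError` comes from `agoradatatools.errors`, as in the import block above):

```
from agoradatatools.errors import ADTDataValidationError
from agoradatatools.gx import GreatExpectationsRunner


def validate_or_raise(runner: GreatExpectationsRunner) -> None:
    """Hypothetical caller-side pattern for the new failure flags."""
    runner.run()  # uploads the HTML report; sets flags instead of raising
    if runner.failures:
        raise ADTDataValidationError(
            f"{runner.failure_message} See report: {runner.report_link}"
        )
```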
9 changes: 5 additions & 4 deletions src/agoradatatools/logs.py
@@ -12,11 +12,11 @@ def format_seconds(seconds):
def time_function(func, *args, **kwargs):
"""Returns the elapsed time for a function to run."""
start_time = time.monotonic()
func(*args, **kwargs)
result = func(*args, **kwargs)
end_time = time.monotonic()
elapsed_time = end_time - start_time
elapsed_time_formatted = format_seconds(elapsed_time)
return elapsed_time_formatted
return elapsed_time_formatted, result


def log_time(func_name: str, logger: logging.Logger):
@@ -43,19 +43,20 @@ def wrapped(*args, **kwargs):
if func_name == "process_dataset":
dataset = next(iter(kwargs["dataset_obj"]))
logger.info("Now processing %s dataset", dataset)
elapsed_time_formatted = time_function(func, *args, **kwargs)
elapsed_time_formatted, result = time_function(func, *args, **kwargs)
logger.info("Processing complete for %s dataset", dataset)
string_list = [elapsed_time_formatted, dataset]
message = "Elapsed time: %s for %s dataset"

if func_name == "process_all_files":
logger.info("Agora Data Tools processing has started")
elapsed_time_formatted = time_function(func, *args, **kwargs)
elapsed_time_formatted, result = time_function(func, *args, **kwargs)
logger.info("Agora Data Tools processing has completed")
string_list = [elapsed_time_formatted]
message = "Elapsed time: %s for all data processing"

logger.info(message, *string_list)
return result

return wrapped
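
The heart of this change: `time_function` now returns an `(elapsed_time_formatted, result)` tuple and `wrapped` returns `result`, so `@log_time`-decorated functions no longer lose their return values, which is what lets `process_dataset` hand its dataset report back to the caller. A toy illustration of the pass-through; the decorated stand-in below is hypothetical, and the `@log_time(...)` call mirrors how the repo appears to apply the decorator:

```
import logging

from agoradatatools.logs import log_time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@log_time(func_name="process_all_files", logger=logger)
def process_all_files(config_path=None):
    # Stand-in body; the real function processes every dataset in the config
    return {"datasets_processed": 3}


result = process_all_files(config_path="config.yaml")
assert result == {"datasets_processed": 3}  # previously the decorator returned None
```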
