
Fields difference #167

Merged · 13 commits · Oct 7, 2019
1 change: 1 addition & 0 deletions CHANGES.md
@@ -14,6 +14,7 @@ Note that the top-most release is changes in the unreleased master branch on Git
### Added
- **Anomalies** to see significant deviations in fields coverage across multiple jobs, #138
- Support to **Bitbucket API**, in order to access files from private repositories, #71
- **Fields Difference** rule to find the difference between field values of two jobs. Supports normalization, nested fields, and full access to the data, #167
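A minimal sketch of the "full access to the data" part of the entry above, assuming `df` and `target_df` are the two jobs' dataframes and `name` is a shared field (both are placeholders); the raw same/new/missing series stay reachable on the result:

import arche.rules.compare

result = arche.rules.compare.fields(df, target_df, ["name"])
stats = result.more_stats["name"]   # {"same": ..., "new": ..., "missing": ...}, each a pd.Series
print(stats["missing"].unique())    # raw values present in the target job but absent from the source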


## [0.3.6] (2019-07-12)
1 change: 1 addition & 0 deletions Pipfile
@@ -35,6 +35,7 @@ recommonmark = "*"
sphinxcontrib-golangdomain = {git = "https://bitbucket.org/ymotongpoo/sphinxcontrib-golangdomain"}
sphinx-autoapi = {git = "https://github.com/rtfd/sphinx-autoapi"}
nbsphinx = "*"
sphinx_bootstrap_theme = "*"
memory-profiler = "*"
jupyter-console = "*"
matplotlib = "*"
26 changes: 26 additions & 0 deletions docs/source/nbs/Rules.ipynb
@@ -200,6 +200,32 @@
"arche.rules.category.get_difference(df, target_df, [\"category\"]).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Compare\n",
"### Fields"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"help(arche.rules.compare.fields)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"arche.rules.compare.fields(df, target_df, [\"part_number\", \"name\", \"uom\"]).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
1 change: 1 addition & 0 deletions src/arche/__init__.py
@@ -1,4 +1,5 @@
import logging
from typing import * # noqa

__version__ = "0.3.6"
SH_URL = "https://app.scrapinghub.com/p" # noqa
9 changes: 9 additions & 0 deletions src/arche/arche.py
@@ -7,6 +7,7 @@
from arche.readers.schema import Schema, SchemaSource
from arche.report import Report
import arche.rules.category as category_rules
import arche.rules.compare as compare
import arche.rules.coverage as coverage_rules
import arche.rules.duplicates as duplicate_rules
import arche.rules.json_schema as schema_rules
@@ -256,3 +257,11 @@ def compare_with_customized_rules(self, source_items, target_items, tagged_field
            price_rules.compare_prices_for_same_names,
        ]:
            self.save_result(r(source_items.df, target_items.df, tagged_fields))
        self.save_result(
            compare.tagged_fields(
                source_items.df,
                target_items.df,
                tagged_fields,
                ["product_url_field", "name_field"],
            )
        )
4 changes: 2 additions & 2 deletions src/arche/readers/items.py
@@ -8,7 +8,7 @@
import pandas as pd
from scrapinghub import ScrapinghubClient
from scrapinghub.client.jobs import Job
from tqdm import tqdm_notebook
from tqdm.notebook import tqdm

RawItems = Iterable[Dict[str, Any]]

@@ -33,7 +33,7 @@ def categorize(df: pd.DataFrame) -> pd.DataFrame:
"""Cast columns with repeating values to `category` type to save memory"""
if len(df) < 100:
return
for c in tqdm_notebook(df.columns, desc="Categorizing"):
for c in tqdm(df.columns, desc="Categorizing"):
try:
if df[c].nunique(dropna=False) <= 10:
df[c] = df[c].astype("category")
4 changes: 2 additions & 2 deletions src/arche/rules/category.py
@@ -2,7 +2,7 @@

from arche.rules.result import Outcome, Result
import pandas as pd
from tqdm import tqdm_notebook
from tqdm.notebook import tqdm


def get_difference(
@@ -97,7 +97,7 @@ def get_categories(df: pd.DataFrame, max_uniques: int = 10) -> Result:
    columns = find_likely_cats(df, max_uniques)
    result.stats = [
        value_counts
        for value_counts in tqdm_notebook(
        for value_counts in tqdm(
            map(lambda c: df[c].value_counts(dropna=False), columns),
            desc="Finding categories",
            total=len(columns),
98 changes: 98 additions & 0 deletions src/arche/rules/compare.py
@@ -0,0 +1,98 @@
from typing import Tuple

from arche.readers.schema import TaggedFields
from arche.rules.result import *


MAX_MISSING_VALUES = 6


def fields(
    source_df: pd.DataFrame,
    target_df: pd.DataFrame,
    names: List[str],
    normalize: bool = False,
    err_thr: float = 0.25,
) -> Result:
"""Finds fields values difference between dataframes.

Args:
names - a list of field names
normalize - if set, all fields converted to str and processed with lower() and strip()
err_thr - sets the failure threshold for missing values

Returns:
Result with same, missing and new values.
"""

    def get_difference(
        left: pd.Series, right: pd.Series
    ) -> Tuple[pd.Series, pd.Series, pd.Series]:
        return (
            left[left.isin(right)],
            left[~(left.isin(right))],
            right[~(right.isin(left))],
        )

    result = Result("Fields Difference")
    for field in names:
        source = source_df[field].dropna()
        target = target_df[field].dropna()
        if normalize:
            source = source.astype(str).str.lower().str.strip()
            target = target.astype(str).str.lower().str.strip()
        try:
            same, new, missing = get_difference(source, target)
        except SystemError:
            # Unhashable (e.g. nested) values break `isin`; fall back to
            # comparing string representations.
            source = source.astype(str)
            target = target.astype(str)
            same, new, missing = get_difference(source, target)

        same.name, new.name, missing.name = (None, None, None)
        result.more_stats.update(
            {f"{field}": {"same": same, "new": new, "missing": missing}}
        )
        result.add_info(
            f"{len(source)} `non NaN {field}s` - {len(new)} new, {len(same)} same"
        )
        if len(missing) == 0:
            continue

        if len(missing) < MAX_MISSING_VALUES:
            msg = ", ".join(missing.unique().astype(str))
        else:
            msg = f"{', '.join(missing.unique()[:5].astype(str))}..."
        msg = f"{msg} `{field}s` are missing"
        if len(missing) / len(target_df) >= err_thr:
Review comment (Contributor): It's unclear what `err_thr` does. I think it would be a good idea to document what the variable does in the docstring of the function.
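A hypothetical walk-through of the threshold check above, with the default err_thr=0.25 (the dataframes below are made up):

import pandas as pd

from arche.rules.compare import fields

source = pd.DataFrame({"sku": ["a", "b", "c"]})
target = pd.DataFrame({"sku": ["a", "x", "y", "z"]})

# 3 of 4 target values are absent from the source: 3 / 4 = 0.75 >= 0.25,
# so the rule reports them as an error rather than a plain info message.
fields(source, target, ["sku"], err_thr=0.25).show()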

            result.add_error(
                f"{len(missing)} `{field}s` are missing",
                errors={msg: set(missing.index)},
            )
        else:
            result.add_info(
                f"{len(missing)} `{field}s` are missing",
                errors={msg: set(missing.index)},
            )
    return result


def tagged_fields(
    source_df: pd.DataFrame,
    target_df: pd.DataFrame,
    tagged_fields: TaggedFields,
    tags: List[str],
) -> Result:
    """Compare fields tagged with `tags` between two dataframes."""
    name = f"{', '.join(tags)} Fields Difference"
    result = Result(name)
    fields_names: List[str] = list()
    for tag in tags:
        tag_fields = tagged_fields.get(tag)
        if tag_fields:
            fields_names.extend(tag_fields)
    if not fields_names:
        result.add_info(Outcome.SKIPPED)
        return result
    result = fields(source_df, target_df, fields_names)
    result.name = name
    return result
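A minimal usage sketch for the two new entry points (the dataframes, field name and tag below are made-up placeholders, not part of the PR):

import pandas as pd

import arche.rules.compare as compare

source_df = pd.DataFrame({"name": ["ACME Widget ", "Gadget"]})
target_df = pd.DataFrame({"name": ["acme widget", "Doohickey"]})

# Direct comparison; normalize=True casts values to str, lowercases and strips
# them, so "ACME Widget " and "acme widget" count as the same value.
compare.fields(source_df, target_df, ["name"], normalize=True).show()

# Tag-driven comparison: tags are resolved to column names through the schema's
# TaggedFields mapping, then delegated to `fields`.
compare.tagged_fields(source_df, target_df, {"name_field": ["name"]}, ["name_field"]).show()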
6 changes: 2 additions & 4 deletions src/arche/rules/others.py
@@ -5,7 +5,7 @@
from arche.rules.result import Outcome, Result
import numpy as np
import pandas as pd
from tqdm import tqdm_notebook
from tqdm.notebook import tqdm


def compare_boolean_fields(
@@ -94,9 +94,7 @@ def garbage_symbols(df: pd.DataFrame) -> Result:
    row_keys: Set = set()
    rule_result = Result("Garbage Symbols", items_count=len(df))

    for column in tqdm_notebook(
        df.select_dtypes([np.object]).columns, desc="Garbage Symbols"
    ):
    for column in tqdm(df.select_dtypes([np.object]).columns, desc="Garbage Symbols"):
        matches = df[column].apply(str).str.extractall(garbage, flags=re.IGNORECASE)
        if not matches.empty:
            error_keys = df.loc[matches.unstack().index.values].index
65 changes: 11 additions & 54 deletions src/arche/rules/price.py
@@ -67,14 +67,13 @@ def compare_was_now(df: pd.DataFrame, tagged_fields: TaggedFields):

def compare_prices_for_same_urls(
    source_df: pd.DataFrame, target_df: pd.DataFrame, tagged_fields: TaggedFields
):
) -> Result:
    """For each pair of items that have the same `product_url_field` tagged field,
    compare the `product_price_field` field.

    Returns:
        A result containing pairs of items with same `product_url_field`
        from `source_df` and `target_df` which `product_price_field` differ,
        missing and new `product_url_field` tagged fields.
        A result containing pairs of items from `source_df` and `target_df`
        whose `product_price_field` values differ.
    """
    result = Result("Compare Prices For Same Urls")
    url_field_list: Optional[List[str]] = tagged_fields.get("product_url_field")
@@ -90,31 +89,12 @@ def compare_prices_for_same_urls(
    same_urls = source_df[(source_df[url_field].isin(target_df[url_field].values))][
        url_field
    ]
    new_urls = source_df[~(source_df[url_field].isin(target_df[url_field].values))][
        url_field
    ]
    missing_urls = target_df[(~target_df[url_field].isin(source_df[url_field].values))][
        url_field
    ]

    errors = {}
    for url, group in missing_urls.groupby(missing_urls):
        errors[f"Missing {url}"] = set(group.index)

    if not missing_urls.empty:
        result.add_info(
            f"{len(missing_urls)} urls missing from the tested job", errors=errors
        )
    if not new_urls.empty:
        result.add_info(f"{len(new_urls)} new urls in the tested job")
    result.add_info(f"{len(same_urls)} same urls in both jobs")

    diff_prices_count = 0
    price_field_tag = tagged_fields.get("product_price_field")
    if not price_field_tag:
    price_fields = tagged_fields.get("product_price_field")
    if not price_fields:
        result.add_info("product_price_field tag is not set")
    else:
        price_field = price_field_tag[0]
        price_field = price_fields[0]
        detailed_messages = []
        for url in same_urls:
            if url.strip() != "nan":
@@ -130,7 +110,6 @@
                    and is_number(target_price)
                    and ratio_diff(source_price, target_price) > 0.1
                ):
                    diff_prices_count += 1
                    source_key = source_df[source_df[url_field] == url].index[0]
                    target_key = target_df[target_df[url_field] == url].index[0]
                    msg = (
@@ -139,7 +118,7 @@
                    )
                    detailed_messages.append(msg)

        res = f"{len(same_urls)} checked, {diff_prices_count} errors"
        res = f"{len(same_urls)} checked, {len(detailed_messages)} errors"
        if detailed_messages:
            result.add_error(res, detailed="\n".join(detailed_messages))
        else:
@@ -214,33 +193,12 @@ def compare_prices_for_same_names(
    same_names = source_df[(source_df[name_field].isin(target_df[name_field].values))][
        name_field
    ]
    new_names = source_df[~(source_df[name_field].isin(target_df[name_field].values))][
        name_field
    ]
    missing_names = target_df[
        ~(target_df[name_field].isin(source_df[name_field].values))
    ][name_field]

    errors = {}
    for name, group in missing_names.groupby(missing_names):
        errors[f"Missing {name}"] = set(group.index)

    if not missing_names.empty:
        result.add_info(
            f"{len(missing_names)} names missing from the tested job", errors=errors
        )
    if not new_names.empty:
        result.add_info(f"{len(new_names)} new names in the tested job")
    result.add_info(f"{len(same_names)} same names in both jobs")

    price_tag = "product_price_field"
    price_field_tag = tagged_fields.get(price_tag)
    if not price_field_tag:
    price_fields = tagged_fields.get("product_price_field")
    if not price_fields:
        result.add_info("product_price_field tag is not set")
        return result

    price_field = price_field_tag[0]
    count = 0
    price_field = price_fields[0]

    detailed_messages = []
    for name in same_names:
@@ -249,7 +207,6 @@
        target_price = target_df[target_df[name_field] == name][price_field].iloc[0]
        if is_number(source_price) and is_number(target_price):
            if ratio_diff(source_price, target_price) > 0.1:
                count += 1
                source_key = source_df[source_df[name_field] == name].index[0]
                target_key = target_df[target_df[name_field] == name].index[0]
                msg = (
@@ -258,7 +215,7 @@
                )
                detailed_messages.append(msg)

    result_msg = f"{len(same_names)} checked, {count} errors"
    result_msg = f"{len(same_names)} checked, {len(detailed_messages)} errors"
    if detailed_messages:
        result.add_error(result_msg, detailed="\n".join(detailed_messages))
    else:
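The removed branches above (new and missing url/name reporting) are now covered by the Fields Difference rule, and the error count comes straight from len(detailed_messages). A sketch of the remaining 10% threshold, assuming ratio_diff is a relative-difference helper (hypothetical stand-in; the real helper lives elsewhere in arche):

def ratio_diff(a: float, b: float) -> float:
    # Hypothetical stand-in for arche's helper: relative difference of two numbers.
    return abs(a - b) / max(abs(a), abs(b))

assert ratio_diff(100.0, 105.0) <= 0.1  # within 10%: prices considered the same
assert ratio_diff(100.0, 120.0) > 0.1   # more than 10% apart: reported as an error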
33 changes: 5 additions & 28 deletions src/arche/rules/result.py
@@ -65,35 +65,12 @@ class Result:

    name: str
    messages: Dict[Level, List[Message]] = field(default_factory=dict)
    _stats: Optional[List[Stat]] = field(default_factory=list)
    items_count: Optional[int] = 0
    _stats: List[Stat] = field(default_factory=list)
    more_stats: Dict[str, Dict] = field(default_factory=dict)
    items_count: int = 0
    _err_keys: Set[Union[str, int]] = field(default_factory=set)
    _err_items_count: Optional[int] = 0
    _figures: Optional[List[go.FigureWidget]] = field(default_factory=list)

    def __eq__(self, other):
        for left, right in zip(self.stats, other.stats):
            if not self.tensors_equal(left, right):
                return False

        return (
            self.name == other.name
            and self.messages == other.messages
            and self.items_count == other.items_count
            and self.err_items_count == other.err_items_count
            and len(self.stats) == len(other.stats)
        )

    @staticmethod
    def tensors_equal(left: Stat, right: Stat):
        try:
            if isinstance(left, pd.DataFrame):
                pd.testing.assert_frame_equal(left, right)
            else:
                pd.testing.assert_series_equal(left, right)
            return True
        except AssertionError:
            return False
    _err_items_count: int = 0
    _figures: List[go.FigureWidget] = field(default_factory=list)

    @property
    def info(self):