Skip to content

Commit

Permalink
Fields difference (#167)
Browse files Browse the repository at this point in the history
* Add compare_fields

* Ignore * warnings

* Refactor price

* Support nested structures

* Add normalization

* Add more_stats to easily access all data, replace Result class eq with assert

* Update rules to new assert

* Fix tqdm warning
  • Loading branch information
manycoding authored Oct 7, 2019
1 parent 0538719 commit 77075db
Show file tree
Hide file tree
Showing 25 changed files with 449 additions and 278 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Note that the top-most release is changes in the unreleased master branch on Git
### Added
- **Anomalies** to see significant deviations in fields coverage across multiple jobs, #138
- Support to **Bitbucket API**, in order to access files from private repositories, #71
- **Fields Difference** rule to find the difference between field values of two jobs. Supports normalization, nested fields, full access to the data, #167


## [0.3.6] (2019-07-12)
Expand Down
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ recommonmark = "*"
sphinxcontrib-golangdomain = {git = "https://bitbucket.org/ymotongpoo/sphinxcontrib-golangdomain"}
sphinx-autoapi = {git = "https://github.com/rtfd/sphinx-autoapi"}
nbsphinx = "*"
sphinx_bootstrap_theme = "*"
memory-profiler = "*"
jupyter-console = "*"
matplotlib = "*"
Expand Down
26 changes: 26 additions & 0 deletions docs/source/nbs/Rules.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,32 @@
"arche.rules.category.get_difference(df, target_df, [\"category\"]).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Compare\n",
"### Fields"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"help(arche.rules.compare.fields)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"arche.rules.compare.fields(df, target_df, [\"part_number\", \"name\", \"uom\"]).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
Expand Down
1 change: 1 addition & 0 deletions src/arche/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from typing import * # noqa

__version__ = "0.3.6"
SH_URL = "https://app.scrapinghub.com/p" # noqa
Expand Down
9 changes: 9 additions & 0 deletions src/arche/arche.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from arche.readers.schema import Schema, SchemaSource
from arche.report import Report
import arche.rules.category as category_rules
import arche.rules.compare as compare
import arche.rules.coverage as coverage_rules
import arche.rules.duplicates as duplicate_rules
import arche.rules.json_schema as schema_rules
Expand Down Expand Up @@ -256,3 +257,11 @@ def compare_with_customized_rules(self, source_items, target_items, tagged_field
price_rules.compare_prices_for_same_names,
]:
self.save_result(r(source_items.df, target_items.df, tagged_fields))
self.save_result(
compare.tagged_fields(
source_items.df,
target_items.df,
tagged_fields,
["product_url_field", "name_field"],
)
)
4 changes: 2 additions & 2 deletions src/arche/readers/items.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pandas as pd
from scrapinghub import ScrapinghubClient
from scrapinghub.client.jobs import Job
from tqdm import tqdm_notebook
from tqdm.notebook import tqdm

RawItems = Iterable[Dict[str, Any]]

Expand All @@ -33,7 +33,7 @@ def categorize(df: pd.DataFrame) -> pd.DataFrame:
"""Cast columns with repeating values to `category` type to save memory"""
if len(df) < 100:
return
for c in tqdm_notebook(df.columns, desc="Categorizing"):
for c in tqdm(df.columns, desc="Categorizing"):
try:
if df[c].nunique(dropna=False) <= 10:
df[c] = df[c].astype("category")
Expand Down
4 changes: 2 additions & 2 deletions src/arche/rules/category.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from arche.rules.result import Outcome, Result
import pandas as pd
from tqdm import tqdm_notebook
from tqdm.notebook import tqdm


def get_difference(
Expand Down Expand Up @@ -97,7 +97,7 @@ def get_categories(df: pd.DataFrame, max_uniques: int = 10) -> Result:
columns = find_likely_cats(df, max_uniques)
result.stats = [
value_counts
for value_counts in tqdm_notebook(
for value_counts in tqdm(
map(lambda c: df[c].value_counts(dropna=False), columns),
desc="Finding categories",
total=len(columns),
Expand Down
98 changes: 98 additions & 0 deletions src/arche/rules/compare.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from typing import Tuple

from arche.readers.schema import TaggedFields
from arche.rules.result import *


MAX_MISSING_VALUES = 6


def fields(
    source_df: pd.DataFrame,
    target_df: pd.DataFrame,
    names: List[str],
    normalize: bool = False,
    err_thr: float = 0.25,
) -> Result:
    """Find the difference between field values of two dataframes.

    Args:
        source_df: the dataframe being checked
        target_df: the dataframe compared against
        names: a list of field (column) names present in both dataframes
        normalize: if set, all values are cast to str, lowercased and
            stripped before comparison
        err_thr: failure threshold - the ratio of missing values to
            `len(target_df)` at or above which an error is reported
            instead of an info message

    Returns:
        Result with same, new and missing values. The full per-field
        series are stored in `result.more_stats` keyed by field name.
    """

    def get_difference(
        left: pd.Series, right: pd.Series
    ) -> Tuple[pd.Series, pd.Series, pd.Series]:
        # (values in both, values only in left, values only in right)
        return (
            left[left.isin(right)],
            left[~(left.isin(right))],
            right[~(right.isin(left))],
        )

    result = Result("Fields Difference")
    for field in names:
        source = source_df[field].dropna()
        target = target_df[field].dropna()
        if normalize:
            source = source.astype(str).str.lower().str.strip()
            target = target.astype(str).str.lower().str.strip()
        try:
            same, new, missing = get_difference(source, target)
        except SystemError:
            # NOTE(review): `isin` can raise SystemError on nested
            # (unhashable) values - fall back to comparing the values'
            # string representations; confirm against pandas version in use
            source = source.astype(str)
            target = target.astype(str)
            same, new, missing = get_difference(source, target)

        # Drop series names so stats render without the column label
        same.name, new.name, missing.name = (None, None, None)
        result.more_stats.update(
            {f"{field}": {"same": same, "new": new, "missing": missing}}
        )
        result.add_info(
            f"{len(source)} `non NaN {field}s` - {len(new)} new, {len(same)} same"
        )
        if len(missing) == 0:
            continue

        if len(missing) < MAX_MISSING_VALUES:
            msg = ", ".join(missing.unique().astype(str))
        else:
            # Derive the preview size from the constant instead of a
            # hardcoded 5, so the threshold and preview cannot drift apart
            shown = missing.unique()[: MAX_MISSING_VALUES - 1].astype(str)
            msg = f"{', '.join(shown)}..."
        msg = f"{msg} `{field}s` are missing"
        if len(missing) / len(target_df) >= err_thr:
            result.add_error(
                f"{len(missing)} `{field}s` are missing",
                errors={msg: set(missing.index)},
            )
        else:
            result.add_info(
                f"{len(missing)} `{field}s` are missing",
                errors={msg: set(missing.index)},
            )
    return result


def tagged_fields(
    source_df: pd.DataFrame,
    target_df: pd.DataFrame,
    tagged_fields: TaggedFields,
    tags: List[str],
) -> Result:
    """Compare fields tagged with `tags` between two dataframes."""
    name = f"{', '.join(tags)} Fields Difference"
    # Gather every field name registered under any of the requested tags
    fields_names: List[str] = [
        field_name for tag in tags for field_name in (tagged_fields.get(tag) or [])
    ]
    if not fields_names:
        skipped = Result(name)
        skipped.add_info(Outcome.SKIPPED)
        return skipped
    result = fields(source_df, target_df, fields_names)
    result.name = name
    return result
6 changes: 2 additions & 4 deletions src/arche/rules/others.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from arche.rules.result import Outcome, Result
import numpy as np
import pandas as pd
from tqdm import tqdm_notebook
from tqdm.notebook import tqdm


def compare_boolean_fields(
Expand Down Expand Up @@ -94,9 +94,7 @@ def garbage_symbols(df: pd.DataFrame) -> Result:
row_keys: Set = set()
rule_result = Result("Garbage Symbols", items_count=len(df))

for column in tqdm_notebook(
df.select_dtypes([np.object]).columns, desc="Garbage Symbols"
):
for column in tqdm(df.select_dtypes([np.object]).columns, desc="Garbage Symbols"):
matches = df[column].apply(str).str.extractall(garbage, flags=re.IGNORECASE)
if not matches.empty:
error_keys = df.loc[matches.unstack().index.values].index
Expand Down
65 changes: 11 additions & 54 deletions src/arche/rules/price.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,13 @@ def compare_was_now(df: pd.DataFrame, tagged_fields: TaggedFields):

def compare_prices_for_same_urls(
source_df: pd.DataFrame, target_df: pd.DataFrame, tagged_fields: TaggedFields
):
) -> Result:
"""For each pair of items that have the same `product_url_field` tagged field,
compare `product_price_field` field
Returns:
A result containing pairs of items with same `product_url_field`
from `source_df` and `target_df` which `product_price_field` differ,
missing and new `product_url_field` tagged fields.
A result containing pairs of items from `source_df` and `target_df`
which `product_price_field` differ.
"""
result = Result("Compare Prices For Same Urls")
url_field_list: Optional[List[str]] = tagged_fields.get("product_url_field")
Expand All @@ -90,31 +89,12 @@ def compare_prices_for_same_urls(
same_urls = source_df[(source_df[url_field].isin(target_df[url_field].values))][
url_field
]
new_urls = source_df[~(source_df[url_field].isin(target_df[url_field].values))][
url_field
]
missing_urls = target_df[(~target_df[url_field].isin(source_df[url_field].values))][
url_field
]

errors = {}
for url, group in missing_urls.groupby(missing_urls):
errors[f"Missing {url}"] = set(group.index)

if not missing_urls.empty:
result.add_info(
f"{len(missing_urls)} urls missing from the tested job", errors=errors
)
if not new_urls.empty:
result.add_info(f"{len(new_urls)} new urls in the tested job")
result.add_info(f"{len(same_urls)} same urls in both jobs")

diff_prices_count = 0
price_field_tag = tagged_fields.get("product_price_field")
if not price_field_tag:
price_fields = tagged_fields.get("product_price_field")
if not price_fields:
result.add_info("product_price_field tag is not set")
else:
price_field = price_field_tag[0]
price_field = price_fields[0]
detailed_messages = []
for url in same_urls:
if url.strip() != "nan":
Expand All @@ -130,7 +110,6 @@ def compare_prices_for_same_urls(
and is_number(target_price)
and ratio_diff(source_price, target_price) > 0.1
):
diff_prices_count += 1
source_key = source_df[source_df[url_field] == url].index[0]
target_key = target_df[target_df[url_field] == url].index[0]
msg = (
Expand All @@ -139,7 +118,7 @@ def compare_prices_for_same_urls(
)
detailed_messages.append(msg)

res = f"{len(same_urls)} checked, {diff_prices_count} errors"
res = f"{len(same_urls)} checked, {len(detailed_messages)} errors"
if detailed_messages:
result.add_error(res, detailed="\n".join(detailed_messages))
else:
Expand Down Expand Up @@ -214,33 +193,12 @@ def compare_prices_for_same_names(
same_names = source_df[(source_df[name_field].isin(target_df[name_field].values))][
name_field
]
new_names = source_df[~(source_df[name_field].isin(target_df[name_field].values))][
name_field
]
missing_names = target_df[
~(target_df[name_field].isin(source_df[name_field].values))
][name_field]

errors = {}
for name, group in missing_names.groupby(missing_names):
errors[f"Missing {name}"] = set(group.index)

if not missing_names.empty:
result.add_info(
f"{len(missing_names)} names missing from the tested job", errors=errors
)
if not new_names.empty:
result.add_info(f"{len(new_names)} new names in the tested job")
result.add_info(f"{len(same_names)} same names in both jobs")

price_tag = "product_price_field"
price_field_tag = tagged_fields.get(price_tag)
if not price_field_tag:
price_fields = tagged_fields.get("product_price_field")
if not price_fields:
result.add_info("product_price_field tag is not set")
return result

price_field = price_field_tag[0]
count = 0
price_field = price_fields[0]

detailed_messages = []
for name in same_names:
Expand All @@ -249,7 +207,6 @@ def compare_prices_for_same_names(
target_price = target_df[target_df[name_field] == name][price_field].iloc[0]
if is_number(source_price) and is_number(target_price):
if ratio_diff(source_price, target_price) > 0.1:
count += 1
source_key = source_df[source_df[name_field] == name].index[0]
target_key = target_df[target_df[name_field] == name].index[0]
msg = (
Expand All @@ -258,7 +215,7 @@ def compare_prices_for_same_names(
)
detailed_messages.append(msg)

result_msg = f"{len(same_names)} checked, {count} errors"
result_msg = f"{len(same_names)} checked, {len(detailed_messages)} errors"
if detailed_messages:
result.add_error(result_msg, detailed="\n".join(detailed_messages))
else:
Expand Down
33 changes: 5 additions & 28 deletions src/arche/rules/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,35 +65,12 @@ class Result:

name: str
messages: Dict[Level, List[Message]] = field(default_factory=dict)
_stats: Optional[List[Stat]] = field(default_factory=list)
items_count: Optional[int] = 0
_stats: List[Stat] = field(default_factory=list)
more_stats: Dict[str, Dict] = field(default_factory=dict)
items_count: int = 0
_err_keys: Set[Union[str, int]] = field(default_factory=set)
_err_items_count: Optional[int] = 0
_figures: Optional[List[go.FigureWidget]] = field(default_factory=list)

def __eq__(self, other):
for left, right in zip(self.stats, other.stats):
if not self.tensors_equal(left, right):
return False

return (
self.name == other.name
and self.messages == other.messages
and self.items_count == other.items_count
and self.err_items_count == other.err_items_count
and len(self.stats) == len(other.stats)
)

@staticmethod
def tensors_equal(left: Stat, right: Stat):
try:
if isinstance(left, pd.DataFrame):
pd.testing.assert_frame_equal(left, right)
else:
pd.testing.assert_series_equal(left, right)
return True
except AssertionError:
return False
_err_items_count: int = 0
_figures: List[go.FigureWidget] = field(default_factory=list)

@property
def info(self):
Expand Down
Loading

0 comments on commit 77075db

Please sign in to comment.