Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregates Reconcile documentation #779

Merged
merged 3 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions docs/aggregates_reconcile_configurations/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# Remorph Aggregates Reconciliation


Aggregates Reconcile is a utility to streamline the reconciliation process, specific aggregate metric is compared
between source and target data residing on Databricks.

## Summary

| operation_name | sample visualisation | description | key outputs captured in the recon metrics tables |
|--------------------------|---------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **aggregates-reconcile** | [data](visualisation.md#data) | reconciles the data for each aggregate metric - ```join_columns``` are used to identify the mismatches at aggregated metric level | - **mismatch_data**(sample data with mismatches captured at aggregated metric level )<br> - **missing_in_src**(sample rows that are available in target but missing in source)<br> - **missing_in_tgt**(sample rows that are available in source but are missing in target)<br> |


* [Supported Aggregate Functions](#supported-aggregate-functions)
* [Flow Chart](#flow-chart)
* [Supported Source Systems](#supported-source-systems)
* [TABLE Config Elements](#table-config-elements)
* [Aggregate Attributes](#aggregate-attributes)
* [Key Considerations](#key-considerations)
* [Aggregates Reconciliation Example](#aggregates-reconciliation-example)
* [DataFlow Example](#dataflow-example)


## Supported Aggregate Functions


| <a href="https://docs.databricks.com/en/sql/language-manual/sql-ref-functions-builtin.html#aggregate-functions" target="_blank"> Aggregate Functions </a> |
|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| <a href="https://docs.databricks.com/en/sql/language-manual/functions/min.html" target="_blank">**min**</a> |
| <a href="https://docs.databricks.com/en/sql/language-manual/functions/max.html" target="_blank">**max**</a> |
| <a href="https://docs.databricks.com/en/sql/language-manual/functions/count.html" target="_blank">**count**</a> |
| <a href="https://docs.databricks.com/en/sql/language-manual/functions/sum.html" target="_blank">**sum**</a> |
| <a href="https://docs.databricks.com/en/sql/language-manual/functions/avg.html" target="_blank">**avg**</a> |
| <a href="https://docs.databricks.com/en/sql/language-manual/functions/mean.html" target="_blank">**mean**</a> |
| <a href="https://docs.databricks.com/en/sql/language-manual/functions/mode.html" target="_blank">**mode**</a> |
| <a href="https://docs.databricks.com/en/sql/language-manual/functions/percentile.html" target="_blank">**percentile**</a> |
| <a href="https://docs.databricks.com/en/sql/language-manual/functions/stddev.html" target="_blank">**stddev**</a> |
| <a href="https://docs.databricks.com/en/sql/language-manual/functions/variance.html" target="_blank">**variance**</a> |
| <a href="https://docs.databricks.com/en/sql/language-manual/functions/median.html" target="_blank">**median**</a> |



[[back to top](#remorph-aggregates-reconciliation)]


## Flow Chart

```mermaid
flowchart TD
Aggregates-Reconcile --> MISMATCH_ROWS
Aggregates-Reconcile --> MISSING_IN_SRC
Aggregates-Reconcile --> MISSING_IN_TGT
```


[[back to top](#remorph-aggregates-reconciliation)]


## Supported Source Systems

All [source systems](../recon_configurations/README.md#supported-source-system) supported by reconcile

[[back to top](#remorph-aggregates-reconciliation)]


### TABLE Config Elements:

<table>
<tr>
<th>Python</th>
<th>JSON</th>
</tr>
<tr>
<td>
<pre lang="python">
@dataclass
class Table:
source_name: str
target_name: str
<b>aggregates: list[Aggregate] | None = None</b>
join_columns: list[str] | None = None
jdbc_reader_options: JdbcReaderOptions | None = None
select_columns: list[str] | None = None
drop_columns: list[str] | None = None
column_mapping: list[ColumnMapping] | None = None
transformations: list[Transformation] | None = None
column_thresholds: list[ColumnThresholds] | None = None
filters: Filters | None = None
table_thresholds: list[TableThresholds] | None = None
</pre>
</td>
<td>
<pre lang="json">
{
"source_name": "&lt;SOURCE_NAME&gt",
"target_name": "&lt;TARGET_NAME&gt",
"join_columns": ["&lt;COLUMN_NAME_1&gt","&lt;COLUMN_NAME_2&gt"],
"aggregates": [{
"type": "MIN",
"agg_columns": ["&lt;COLUMN_NAME_3&gt"],
"group_by_columns": ["&lt;GROUP_COLUMN_NAME&gt"]
},
{
"type": "MAX",
"agg_columns": ["&lt;COLUMN_NAME_4&gt"],
}],
"jdbc_reader_options": null,
"select_columns": null,
"drop_columns": null,
"column_mapping": null,
"transformation": null,
"column_thresholds": null,
"filters": null,
"table_thresholds": null
}
</pre>
</td>
</tr>
</table>



### Aggregate Attributes:

| config_name | data_type | description | required/optional | example_value |
|------------------|--------------|-----------------------------------------------------------------------|------------------------|------------------------|
| type | string | [Supported Aggregate Functions](#supported-aggregate-functions) | required | MIN |
| agg_columns | list[string] | list of columns names on which aggregate function needs to be applied | required | ["product_discount"] |
| group_by_columns | list[string] | list of column names on which grouping needs to be applied | optional(default=None) | ["product_id"] or None |


[[back to top](#remorph-aggregates-reconciliation)]



### Key Considerations:

1. The aggregate column names, group by columns and type are always converted to lowercase and considered for reconciliation.
2. Currently, it doesn't support case insensitivity and does not have collation support
3. The queries with “group by” column(s) are compared based on the same group by columns.
4. The queries without “group by” column(s) are compared row-to-row.
5. Existing features like `column_mapping`, `transformations`, `JDBCReaderOptions` and `filters` are leveraged for the aggregate metric reconciliation.
6. Existing `select_columns` and `drop_columns` are not considered for the aggregate metric reconciliation.
7. Even though the user provides the `select_columns` and `drop_columns`, those are not considered.

[[back to top](#remorph-aggregates-reconciliation)]


## Aggregates Reconciliation Example

For more examples, please refer to [sample config][link].

[link]: aggregates_reconcile_config_samples.md

[[back to top](#remorph-aggregates-reconciliation)]


## DataFlow Example

Aggregates Reconcile Data [Visualisation](visualisation.md)

[[back to top](#remorph-aggregates-reconciliation)]
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Aggregates Reconcile Config

Consider the below tables that we want to reconcile:

| category | catalog | schema | table_name | schema | primary_key |
|----------|----------------|---------------|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------|-------------|
| source | source_catalog | source_schema | product_prod | p_id INT,<br>p_name STRING,<br>price NUMBER,<br>discount DECIMAL(5,3),<br>offer DOUBLE,<br>creation_date DATE<br>comment STRING<br> | p_id |
| target | target_catalog | target_schema | product | product_id INT,<br>product_name STRING,<br>price NUMBER,<br>discount DECIMAL(5,3),<br>offer DOUBLE,<br>creation_date DATE<br>comment STRING<br> | product_id |

## Aggregates-Reconcile Run with Join, Column Mappings, Transformation, Filter and JDBC ReaderOptions configs

> **Note:** Even though the user provides the `select_columns` and `drop_columns`, those are not considered.


```json
{
"source_catalog": "source_catalog",
"source_schema": "source_schema",
"target_catalog": "target_catalog",
"target_schema": "target_schema",
"tables": [
{
"aggregates": [{
"type": "MIN",
"agg_columns": ["discount"],
"group_by_columns": ["p_id"]
},
{
"type": "AVG",
"agg_columns": ["discount"],
"group_by_columns": ["p_id"]
},
{
"type": "MAX",
"agg_columns": ["p_id"],
"group_by_columns": ["creation_date"]
},
{
"type": "MAX",
"agg_columns": ["p_name"]
},
{
"type": "SUM",
"agg_columns": ["p_id"]
},
{
"type": "MAX",
"agg_columns": ["creation_date"]
},
{
"type": "MAX",
"agg_columns": ["p_id"],
"group_by_columns": ["creation_date"]
}
],
"source_name": "product_prod",
"target_name": "product",
"jdbc_reader_options": {
"number_partitions": 10,
"partition_column": "p_id",
"lower_bound": "0",
"upper_bound": "10000000"
},
"join_columns": [
"p_id"
],
"drop_columns": [
"comment"
],
"column_mapping": [
{
"source_name": "p_id",
"target_name": "product_id"
},
{
"source_name": "p_name",
"target_name": "product_name"
}
],
"transformations": [
{
"column_name": "creation_date",
"source": "creation_date",
"target": "to_date(creation_date,'yyyy-mm-dd')"
}
],
"column_thresholds": [
{
"column_name": "price",
"upper_bound": "-50",
"lower_bound": "50",
"type": "float"
}
],
"table_thresholds": [
{
"lower_bound": "0%",
"upper_bound": "5%",
"model": "mismatch"
}
],
"filters": {
"source": "p_id > 0",
"target": "product_id > 0"
}
}
]
}

```
112 changes: 112 additions & 0 deletions docs/aggregates_reconcile_configurations/visualisation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
## data
### with group by
```mermaid
flowchart TB
subgraph source
direction TB
A["id: 1<br>city: New York<br>population: 100<br>state: NY"]
B["id: 2<br>city: Yonkers<br>population: 10<br>state: NY"]
C["id: 3<br>city: Los Angeles<br>population: 300<br>state: CA"]
D["id: 4<br>city: San Francisco<br>population: 30<br>state: CA"]
E["id: 6<br>city: Washington<br>population: 600<br>state: DC"]
end

subgraph target
direction TB
F["id: 1<br>city: New York<br>population: 100<br>state: NY"]
G["id: 2<br>city: Yonkers<br>population: 10<br>state: NY"]
H["id: 3<br>city: Los Angeles<br>population: 300<br>state: CA"]
I["id: 5<br>city: San Diego<br>population: 40<br>state: CA"]
J["id: 7<br>city: Phoenix<br>population: 500<br>state: AZ"]
end

subgraph source-aggregated
direction TB
K["sum(population): 110<br>state: NY"]
L["sum(population): 330<br>state: CA"]
M["sum(population): 600<br>state: DC"]
end

subgraph target-aggregated
direction TB
N["sum(population): 110<br>state: NY"]
O["sum(population): 340<br>state: CA"]
P["sum(population): 500<br>state: AZ"]
end

subgraph missing_in_src
direction TB
Q["sum(population): 500<br>state: AZ"]
end

subgraph missing_in_tgt
direction TB
R["sum(population): 600<br>state: DC"]
end

subgraph mismatch
direction TB
S["state: CA<br>source_sum(population): 330<br>target_sum(population): 340<br>sum(population)_match: false"]
end

subgraph aggregates-reconcile
direction TB
T["aggregate: <b>SUM</b> as type<br><b>population</b> as agg-columns<br><b>state</b>as group_by_columns"]
end

source --> source-aggregated
target --> target-aggregated
source-aggregated --> aggregates-reconcile
target-aggregated --> aggregates-reconcile
aggregates-reconcile --> missing_in_src
aggregates-reconcile --> missing_in_tgt
aggregates-reconcile --> mismatch
```


### without group by
```mermaid
flowchart TB
subgraph source
direction TB
A["id: 1<br>city: New York<br>population: 100<br>state: NY"]
D["id: 4<br>city: San Francisco<br>population: 30<br>state: CA"]
E["id: 6<br>city: Washington<br>population: 600<br>state: DC"]
end

subgraph target
direction TB
F["id: 1<br>city: New York<br>population: 100<br>state: NY"]
I["id: 5<br>city: San Diego<br>population: 40<br>state: CA"]
J["id: 7<br>city: Phoenix<br>population: 500<br>state: AZ"]
end

subgraph source-aggregated
direction TB
K["min(population): 30"]
end

subgraph target-aggregated
direction TB
O["min(population): 40"]
end


subgraph mismatch
direction TB
S["source_min(population): 30<br>target_min(population): 40<br>min(population)_match: false"]
end

subgraph aggregates-reconcile
direction TB
T["aggregate: <b>MIN</b> as type<br><b>population</b> as agg-columns"]
end

source --> source-aggregated
target --> target-aggregated
source-aggregated --> aggregates-reconcile
target-aggregated --> aggregates-reconcile
aggregates-reconcile --> mismatch
```


Binary file added docs/img/aggregates-reconcile-help.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/img/aggregates-reconcile-run.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion docs/recon_configurations/reconcile_config_samples.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Consider the below tables that we want to reconcile:

| categroy | catalog | schema | table_name | schema | primary_key |
| category | catalog | schema | table_name | schema | primary_key |
|----------|----------------|---------------|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------|-------------|
| source | source_catalog | source_schema | product_prod | p_id INT,<br>p_name STRING,<br>price NUMBER,<br>discount DECIMAL(5,3),<br>offer DOUBLE,<br>creation_date DATE<br>comment STRING<br> | p_id |
| target | target_catalog | target_schema | product | product_id INT,<br>product_name STRING,<br>price NUMBER,<br>discount DECIMAL(5,3),<br>offer DOUBLE,<br>creation_date DATE<br>comment STRING<br> | product_id |
Expand Down
Loading