Reconcile is an automated tool designed to streamline the reconciliation process between source data and target data residing on Databricks. Currently, only Snowflake, Oracle and other Databricks tables are supported as data sources. The tool helps users efficiently identify discrepancies and variations when comparing the source with the Databricks target.
- Types of Report Supported
- Report Type-Flow Chart
- Supported Source System
- TABLE Config JSON filename
- TABLE Config Elements
- Key Considerations for Oracle JDBC Reader Options
- Reconciliation Example
- DataFlow Example
- Aggregates Reconcile
report type | sample visualisation | description | key outputs captured in the recon metrics tables |
---|---|---|---|
schema | schema | reconciles the schema of source and target - validates that the datatypes are the same or compatible | - schema_comparison - schema_difference |
row | row | reconciles the data only at row level (the hash value of the source row is matched with the hash value of the target row). Preferred when no join columns can be identified between source and target. | - missing_in_src (sample rows that are available in target but missing in source + sample rows in the target that don't match the source) - missing_in_tgt (sample rows that are available in source but missing in target + sample rows in the source that don't match the target) NOTE: this report type does not differentiate between mismatched and missing rows. |
data | data | reconciles the data at row and column level - join_columns help identify mismatches at each row and column level | - mismatch_data (sample data with mismatches captured at each column and row level) - missing_in_src (sample rows that are available in target but missing in source) - missing_in_tgt (sample rows that are available in source but missing in target) - threshold_mismatch (configured columns are reconciled based on a percentile, threshold boundary or date boundary) - mismatch_columns (consolidated list of columns that have mismatches) |
all | all | a combination of data + schema | - data + schema outputs |
flowchart TD
REPORT_TYPE --> DATA
REPORT_TYPE --> SCHEMA
REPORT_TYPE --> ROW
REPORT_TYPE --> ALL
flowchart TD
SCHEMA --> SCHEMA_VALIDATION
flowchart TD
ROW --> MISSING_IN_SRC
ROW --> MISSING_IN_TGT
flowchart TD
DATA --> MISMATCH_ROWS
DATA --> MISSING_IN_SRC
DATA --> MISSING_IN_TGT
flowchart TD
ALL --> MISMATCH_ROWS
ALL --> MISSING_IN_SRC
ALL --> MISSING_IN_TGT
ALL --> SCHEMA_VALIDATION
Source | Schema | Row | Data | All |
---|---|---|---|---|
Oracle | Yes | Yes | Yes | Yes |
Snowflake | Yes | Yes | Yes | Yes |
Databricks | Yes | Yes | Yes | Yes |
The config file must be named as `recon_config_<DATA_SOURCE>_<SOURCE_CATALOG_OR_SCHEMA>_<REPORT_TYPE>.json` and should be placed in the remorph root directory `.remorph` within the Databricks Workspace. The filename pattern remains the same for all data sources. Please find Table Recon filename examples below for the Snowflake, Oracle, and Databricks source systems.
Data Source | Reconcile Config | Table Recon filename |
---|---|---|
Snowflake | database_config: {source_catalog: sample_data, source_schema: default, ...}; metadata_config: {...}; data_source: snowflake; report_type: all | recon_config_snowflake_sample_data_all.json |
Oracle | database_config: {source_schema: orc, ...}; metadata_config: {...}; data_source: oracle; report_type: data | recon_config_oracle_orc_data.json |
Databricks (Hive MetaStore) | database_config: {source_schema: hms, ...}; metadata_config: {...}; data_source: databricks; report_type: schema | recon_config_databricks_hms_schema.json |
Note: the filename must be created with the same case as `<SOURCE_CATALOG_OR_SCHEMA>` is defined in. For example, if the source schema is defined as `ORC` in the reconcile config, the filename should be `recon_config_oracle_ORC_data.json`.
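As a quick illustration of the naming rule only (this helper is not part of the tool), the expected filename can be derived from the reconcile config values like this:

```python
# Illustrative helper (not part of Reconcile): build the expected Table Recon filename.
def recon_config_filename(data_source: str, source_catalog_or_schema: str, report_type: str) -> str:
    # <SOURCE_CATALOG_OR_SCHEMA> must keep the same case as in the reconcile config.
    return f"recon_config_{data_source}_{source_catalog_or_schema}_{report_type}.json"

print(recon_config_filename("snowflake", "sample_data", "all"))  # recon_config_snowflake_sample_data_all.json
print(recon_config_filename("oracle", "ORC", "data"))            # recon_config_oracle_ORC_data.json
```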
Python | JSON |
---|---|
@dataclass
class Table:
source_name: str
target_name: str
aggregates: list[Aggregate] | None = None
join_columns: list[str] | None = None
jdbc_reader_options: JdbcReaderOptions | None = None
select_columns: list[str] | None = None
drop_columns: list[str] | None = None
column_mapping: list[ColumnMapping] | None = None
transformations: list[Transformation] | None = None
column_thresholds: list[ColumnThresholds] | None = None
filters: Filters | None = None
table_thresholds: list[TableThresholds] | None = None |
{
"source_name": "<SOURCE_NAME>",
"target_name": "<TARGET_NAME>",
"aggregates": null,
"join_columns": ["<COLUMN_NAME_1>","<COLUMN_NAME_2>"],
"jdbc_reader_options": null,
"select_columns": null,
"drop_columns": null,
"column_mapping": null,
"transformation": null,
"column_thresholds": null,
"filters": null,
"table_thresholds": null
} |
config_name | data_type | description | required/optional | example_value |
---|---|---|---|---|
source_name | string | name of the source table | required | product |
target_name | string | name of the target table | required | product |
aggregates | list[Aggregate] | list of aggregates, refer Aggregate for more information | optional(default=None) | "aggregates": [{"type": "MAX", "agg_columns": ["<COLUMN_NAME_4>"]}], |
join_columns | list[string] | list of column names which act as the primary key to the table | optional(default=None) | ["product_id"] or ["product_id", "order_id"] |
jdbc_reader_options | JdbcReaderOptions | jdbc reader options, which help to parallelise the data read from JDBC sources based on the given configuration. For more info, see jdbc_reader_options | optional(default=None) | "jdbc_reader_options": {"number_partitions": 10,"partition_column": "s_suppkey","upper_bound": "10000000","lower_bound": "10","fetch_size":"100"}
select_columns | list[string] | list of columns to be considered for the reconciliation process | optional(default=None) | ["id", "name", "address"] |
drop_columns | list[string] | list of columns to be eliminated from the reconciliation process | optional(default=None) | ["comment"] |
column_mapping | list[ColumnMapping] | list of column_mapping entries that help in resolving column name mismatches between src and tgt, e.g., "id" in src and "emp_id" in tgt. For more info, see column_mapping | optional(default=None) | "column_mapping": [{"source_name": "id","target_name": "emp_id"}]
transformations | list[Transformation] | list of user-defined transformations that can be applied to src and tgt columns in case of incompatible data types or when an explicit transformation was applied during migration. For more info, see transformations | optional(default=None) | "transformations": [{"column_name": "s_address","source": "trim(s_address)","target": "trim(s_address)"}]
column_thresholds | list[ColumnThresholds] | list of threshold conditions that can be applied on columns to absorb minor exceptions in the data. It supports percentile, absolute, and date fields. For more info, see column_thresholds | optional(default=None) | "column_thresholds": [{"column_name": "sal", "lower_bound": "-5%", "upper_bound": "5%", "type": "int"}]
table_thresholds | list[TableThresholds] | list of table threshold conditions that can be applied on tables to absorb minor exceptions in the mismatch count. It supports percentile and absolute values. For more info, see table_thresholds | optional(default=None) | "table_thresholds": [{"lower_bound": "0%", "upper_bound": "5%", "model": "mismatch"}]
filters | Filters | filter expressions that can be used to filter the data on src and tgt based on the respective expressions | optional(default=None) | "filters": {"source": "lower(dept_name)>'it'", "target": "lower(department_name)>'it'"}
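To show how these elements fit together, here is a hedged sketch of a single table entry; the table and column names are hypothetical and only a subset of the fields above is used:

```python
import json

# A hedged, end-to-end sketch of one table entry combining the elements above.
# Table and column names are hypothetical; the structure follows the JSON schema in this section.
product_table_config = {
    "source_name": "product",
    "target_name": "product",
    "join_columns": ["product_id"],                      # source column names acting as the key
    "drop_columns": ["comment"],                         # excluded from reconciliation
    "column_mapping": [                                  # resolve a source/target name mismatch
        {"source_name": "prod_desc", "target_name": "product_description"}
    ],
    "transformations": [                                 # dialect SQL applied verbatim on each side
        {"column_name": "prod_desc", "source": "trim(prod_desc)", "target": "trim(product_description)"}
    ],
    "column_thresholds": [                               # tolerate small numeric drift
        {"column_name": "product_discount", "lower_bound": "-5%", "upper_bound": "5%", "type": "int"}
    ],
    "filters": {"source": "lower(category)='electronics'", "target": "lower(category)='electronics'"},
}

print(json.dumps(product_table_config, indent=2))
```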
Python | JSON |
---|---|
@dataclass
class JdbcReaderOptions:
number_partitions: int
partition_column: str
lower_bound: str
upper_bound: str
fetch_size: int = 100 |
"jdbc_reader_options":{
"number_partitions": "<NUMBER_PARTITIONS>",
"partition_column": "<PARTITION_COLUMN>",
"lower_bound": "<LOWER_BOUND>",
"upper_bound": "<UPPER_BOUND>",
"fetch_size": "<FETCH_SIZE>"
} |
field_name | data_type | description | required/optional | example_value |
---|---|---|---|---|
number_partitions | string | the number of partitions for reading input data in parallel | required | "200" |
partition_column | string | an int/date/timestamp column used for partitioning, typically the primary key of the source table. Note that this parameter accepts only one column, which is especially crucial when dealing with a composite primary key; in such cases, provide the column with the higher cardinality. | required | "employee_id"
upper_bound | string | an integer, date, or timestamp (without time zone) value, as a string, that should be set appropriately (usually the maximum value in the case of non-skewed data) so that the data read from the source is approximately equally distributed | required | "100000"
lower_bound | string | an integer, date, or timestamp (without time zone) value, as a string, that should be set appropriately (usually the minimum value in the case of non-skewed data) so that the data read from the source is approximately equally distributed | required | "1"
fetch_size | string | This parameter influences the number of rows fetched per round-trip between Spark and the JDBC database, optimising data retrieval performance. Adjusting this option significantly impacts the efficiency of data extraction, controlling the volume of data retrieved in each fetch operation. More details on configuring fetch size can be found here | optional(default="100") | "10000" |
For Oracle source, the following options are automatically set:
- "oracle.jdbc.mapDateToTimestamp": "False",
- "sessionInitStatement": "BEGIN dbms_session.set_nls('nls_date_format', '''YYYY-MM-DD''');dbms_session.set_nls('nls_timestamp_format', '''YYYY-MM-DD HH24:MI:SS''');END;"
Take the above options into consideration while configuring Recon for an Oracle source.
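To illustrate what these options translate to, here is a hedged PySpark sketch of the kind of parallel JDBC read they correspond to for an Oracle source; the connection details and table are placeholders, the bounds mirror the jdbc_reader_options example above, and the option names are the standard Spark JDBC options plus the Oracle-specific settings just listed:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hedged sketch: roughly the parallel JDBC read that jdbc_reader_options configures.
# Host, credentials, and table are placeholders.
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:oracle:thin:@//<HOST>:1521/<SERVICE_NAME>")
    .option("driver", "oracle.jdbc.driver.OracleDriver")
    .option("dbtable", "supplier")
    .option("user", "<USER>")
    .option("password", "<PASSWORD>")
    # Parallel read: Spark splits [lowerBound, upperBound] on partitionColumn into numPartitions ranges.
    .option("partitionColumn", "s_suppkey")
    .option("lowerBound", "10")
    .option("upperBound", "10000000")
    .option("numPartitions", 10)
    .option("fetchsize", 100)
    # Options that are set automatically for Oracle sources (see above).
    .option("oracle.jdbc.mapDateToTimestamp", "False")
    .option(
        "sessionInitStatement",
        "BEGIN dbms_session.set_nls('nls_date_format', '''YYYY-MM-DD''');"
        "dbms_session.set_nls('nls_timestamp_format', '''YYYY-MM-DD HH24:MI:SS''');END;",
    )
    .load()
)
```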
Python | JSON |
---|---|
@dataclass
class ColumnMapping:
source_name: str
target_name: str |
"column_mapping":[
{
"source_name": "<SOURCE_COLUMN_NAME>",
"target_name": "<TARGET_COLUMN_NAME>"
}
] |
field_name | data_type | description | required/optional | example_value |
---|---|---|---|---|
source_name | string | source column name | required | "dept_id" |
target_name | string | target column name | required | "department_id" |
Python | JSON |
---|---|
@dataclass
class Transformation:
column_name: str
source: str
target: str | None = None |
"transformations":[
{
"column_name": "<COLUMN_NAME>",
"source": "<TRANSFORMATION_EXPRESSION>",
"target": "<TRANSFORMATION_EXPRESSION>"
}
] |
field_name | data_type | description | required/optional | example_value |
---|---|---|---|---|
column_name | string | the column name on which the transformation is to be applied | required | "s_address"
source | string | the transformation SQL expression to be applied to the source column | required | "trim(s_address)" or "s_address"
target | string | the transformation SQL expression to be applied to the target column | required | "trim(s_address)" or "s_address"
Note: Reconciliation also accepts a UDF in the transformation expression. For example, if we have a UDF named sort_array_input() that takes an unsorted array as input and returns a sorted array, we can use it in a transformation as below:
transformations=[Transformation(column_name="array_col", source="sort_array_input(array_col)", target="sort_array_input(array_col)")]
Note: `NULL` values are defaulted to `_null_recon_` using the transformation expressions in these files: 1. expression_generator.py 2. sampling_query.py. If the user is looking for any specific behaviour, they can override these rules using transformations accordingly.
Transformation Expressions
filename | function / variable | transformation_rule | description |
---|---|---|---|
sampling_query.py | _get_join_clause | transform(coalesce, default="_null_recon_", is_string=True) | Applies the coalesce transformation function for String columns and defaults to `_null_recon_` if the column is NULL
expression_generator.py | DataType_transform_mapping | (coalesce, default='_null_recon_', is_string=True) | Default String column transformation rule for all dialects. Applies the coalesce transformation function and defaults to `_null_recon_` if the column is NULL
expression_generator.py | DataType_transform_mapping | "oracle": DataType...NCHAR: ..."NVL(TRIM(TO_CHAR..,'_null_recon_')" | Transformation rule for the Oracle dialect NCHAR datatype. Applies the TO_CHAR and TRIM transformation functions; if the column is NULL, defaults to `_null_recon_`
expression_generator.py | DataType_transform_mapping | "oracle": DataType...NVARCHAR: ..."NVL(TRIM(TO_CHAR..,'_null_recon_')" | Transformation rule for the Oracle dialect NVARCHAR datatype. Applies the TO_CHAR and TRIM transformation functions; if the column is NULL, defaults to `_null_recon_`
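If the default `_null_recon_` substitution does not fit a particular column, one hedged way to override it, per the note above, is to supply an explicit transformation for that column; the table and column names here are hypothetical:

```python
# Hedged sketch: override the default NULL handling for one column by supplying
# explicit coalesce expressions on both sides (table and column names are hypothetical).
table_config = {
    "source_name": "employee",
    "target_name": "employee",
    "join_columns": ["emp_id"],
    "transformations": [
        {
            "column_name": "middle_name",
            "source": "coalesce(trim(middle_name), 'n/a')",
            "target": "coalesce(trim(middle_name), 'n/a')",
        }
    ],
}
```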
Python | JSON |
---|---|
@dataclass
class ColumnThresholds:
column_name: str
lower_bound: str
upper_bound: str
type: str |
"column_thresholds":[
{
"column_name": "<COLUMN_NAME>",
"lower_bound": "<LOWER_BOUND>",
"upper_bound": "<UPPER_BOUND>",
"type": "<DATA_TYPE>"
}
] |
field_name | data_type | description | required/optional | example_value |
---|---|---|---|---|
column_name | string | the column that should be considered for column threshold reconciliation | required | "product_discount" |
lower_bound | string | the lower bound of the difference between the source value and the target value | required | -5% |
upper_bound | string | the upper bound of the difference between the source value and the target value | required | 5% |
type | string | The user must specify the column type. Supports SQLGLOT DataType.NUMERIC_TYPES and DataType.TEMPORAL_TYPES. | required | int |
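To make the bounds concrete, the check below is an illustrative reading of a percentage column threshold (not the reconciler's actual implementation): the relative difference between the source and target values must fall inside [lower_bound, upper_bound]:

```python
# Illustrative only: how a "-5%" / "5%" column threshold on a numeric column can be read.
def within_percentage_threshold(source_value: float, target_value: float,
                                lower_pct: float = -5.0, upper_pct: float = 5.0) -> bool:
    if source_value == 0:
        return source_value == target_value
    diff_pct = (target_value - source_value) / abs(source_value) * 100
    return lower_pct <= diff_pct <= upper_pct

print(within_percentage_threshold(100, 103))  # True: a 3% difference is within [-5%, 5%]
print(within_percentage_threshold(100, 110))  # False: a 10% difference exceeds the upper bound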
Python | JSON |
---|---|
@dataclass
class TableThresholds:
lower_bound: str
upper_bound: str
model: str |
"table_thresholds":[
{
"lower_bound": "<LOWER_BOUND>",
"upper_bound": "<UPPER_BOUND>",
"model": "<MODEL>"
}
] |
- The threshold bounds for the table must be non-negative, with the lower bound not exceeding the upper bound.
field_name | data_type | description | required/optional | example_value |
---|---|---|---|---|
lower_bound | string | the lower bound of the difference between the source mismatch and the target mismatch count | required | 0% |
upper_bound | string | the upper bound of the difference between the source mismatch and the target mismatch count | required | 5% |
model | string | The user must specify on which table model it should be applied; for now, we support only "mismatch" | required | mismatch
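For reference, here are two hedged table_thresholds examples: the first expresses the bounds as percentages (as in the JSON sample above), the second as absolute counts, which the description above says are also supported; the absolute format (plain integer strings) is an assumption here:

```python
# Hedged examples of table-level thresholds on the mismatch count; model only accepts "mismatch" for now.
table_thresholds_pct = [{"lower_bound": "0%", "upper_bound": "5%", "model": "mismatch"}]
# Absolute bounds (assumed to be plain integer strings, since absolute values are supported):
table_thresholds_abs = [{"lower_bound": "0", "upper_bound": "100", "model": "mismatch"}]
```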
Python | JSON |
---|---|
@dataclass
class Filters:
source: str | None = None
target: str | None = None |
"filters":{
"source": "<FILTER_EXPRESSION>",
"target": "<FILTER_EXPRESSION>"
} |
field_name | data_type | description | required/optional | example_value |
---|---|---|---|---|
source | string | the sql expression to filter the data from source | optional(default=None) | "lower(dept_name)='finance'" |
target | string | the sql expression to filter the data from target | optional(default=None) | "lower(dept_name)='finance'" |
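Since filter expressions are passed through verbatim, each side typically states the same predicate in its own dialect. A hedged sketch with hypothetical column names, using a Snowflake-style expression on the source and Databricks SQL on the target:

```python
# Hedged sketch: equivalent filters written in each system's own SQL dialect.
# Column names are hypothetical; expressions are applied as-is on source and target.
filters = {
    # Snowflake source: TO_VARCHAR formats the date before comparing.
    "source": "lower(dept_name) = 'finance' and to_varchar(created_at, 'YYYY-MM-DD') >= '2024-01-01'",
    # Databricks target: date_format is the equivalent formatting function.
    "target": "lower(department_name) = 'finance' and date_format(created_at, 'yyyy-MM-dd') >= '2024-01-01'",
}
```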
- The column names are always converted to lowercase and considered for reconciliation.
- Currently, it doesn't support case insensitivity and doesn't have collation support.
- If a Transformation is defined for a column but its source or target expression is not provided, the raw column value is used as-is; no default transformation is applied on top. E.g. `Transformation(column_name="address", source=None, target="trim(s_address)")`: here the source transformation is None, so the raw source value is considered for reconciliation.
- If no user transformation is provided for a given column in the configuration, then, depending on the source data type, the reconciler applies a default transformation on both source and target to get matching hash values in source and target. Please find the detailed default transformations here.
- Column references should always be source column names in all the configs, except for Transformations and Filters, as these are dialect-specific SQL expressions that are applied directly in the SQL.
- Transformations and Filters should always be written as SQL expressions in their respective dialects; the reconciler will not apply any logic on top of them.
- Download `ojdbc8.jar` from Oracle: Visit the official Oracle website to acquire the `ojdbc8.jar` JAR file. This file is crucial for establishing connectivity between Databricks and Oracle databases.
- Install the JAR file on Databricks: Upon completing the download, install the JAR file onto your Databricks cluster. Refer to this page for comprehensive instructions on uploading a JAR file, Python egg, or Python wheel to your Databricks workspace.
- Install the ojdbc8 library from Maven: Follow this guide to install the Maven library on a cluster. Refer to this document for obtaining the Maven coordinates.

This installation is a necessary step to enable seamless comparison between Oracle and Databricks, ensuring that the required Oracle JDBC functionality is readily available within the Databricks environment.
source_type | data_type | source_transformation | target_transformation | source_value_example | target_value_example | comments |
---|---|---|---|---|---|---|
Oracle | number(10,5) | trim(to_char(coalesce(<col_name>,0.0), '99990.99999')) | cast(coalesce(<col_name>,0.0) as decimal(10,5)) | 1.00 | 1.00000 | this can be used for any precision and scale by adjusting the transformation accordingly |
Snowflake | array | array_to_string(array_compact(<col_name>),',') | concat_ws(',', <col_name>) | [1,undefined,2] | [1,2] | in case of removing "undefined" during migration (converts a sparse array to a dense array) |
Snowflake | array | array_to_string(array_sort(array_compact(<col_name>), true, true),',') | concat_ws(',', <col_name>) | [2,undefined,1] | [1,2] | in case of removing "undefined" during migration and sorting the array |
Snowflake | timestamp_ntz | date_part(epoch_second,<col_name>) | unix_timestamp(<col_name>) | 2020-01-01 00:00:00.000 | 2020-01-01 00:00:00.000 | convert timestamp_ntz to epoch to get a match between Snowflake and Databricks |
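For instance, the Snowflake sparse-array example above could be wired into a table config like this hedged sketch (the table name and the `tags` column are hypothetical):

```python
# Hedged sketch: applying the Snowflake sparse-array transformation from the table above.
order_table_config = {
    "source_name": "orders",
    "target_name": "orders",
    "join_columns": ["order_id"],
    "transformations": [
        {
            "column_name": "tags",
            # Snowflake side: drop 'undefined' entries, sort, and flatten to a string.
            "source": "array_to_string(array_sort(array_compact(tags), true, true), ',')",
            # Databricks side: the array is already dense, so just flatten it.
            "target": "concat_ws(',', tags)",
        }
    ],
}
```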
For more reconciliation config examples, please refer to the sample config.
Report Types Data Visualisation
Aggregates Reconcile is a utility that streamlines the reconciliation process by comparing specific aggregate metrics between the source data and the target data residing on Databricks.
operation_name | sample visualisation | description | key outputs captured in the recon metrics tables |
---|---|---|---|
aggregates-reconcile | data | reconciles the data for each aggregate metric - join_columns are used to identify mismatches at the aggregated metric level | - mismatch_data (sample data with mismatches captured at the aggregated metric level) - missing_in_src (sample rows that are available in target but missing in source) - missing_in_tgt (sample rows that are available in source but missing in target) |
Aggregate Functions |
---|
min |
max |
count |
sum |
avg |
mean |
mode |
stddev |
variance |
median |
[back to aggregates-reconciliation]
flowchart TD
Aggregates-Reconcile --> MISMATCH_ROWS
Aggregates-Reconcile --> MISSING_IN_SRC
Aggregates-Reconcile --> MISSING_IN_TGT
[back to aggregates-reconciliation]
Python | JSON |
---|---|
@dataclass
class Aggregate:
agg_columns: list[str]
type: str
group_by_columns: list[str] | None = None |
{
"type": "MIN",
"agg_columns": ["<COLUMN_NAME_3>"],
"group_by_columns": ["<GROUP_COLUMN_NAME>"]
} |
field_name | data_type | description | required/optional | example_value |
---|---|---|---|---|
type | string | Supported Aggregate Functions | required | MIN |
agg_columns | list[string] | list of column names on which the aggregate function needs to be applied | required | ["product_discount"]
group_by_columns | list[string] | list of column names on which grouping needs to be applied | optional(default=None) | ["product_id"] or None |
[back to aggregates-reconciliation]
Please refer TABLE Config Elements for Class and JSON configs.
Python | JSON |
---|---|
Table(
source_name= "<SOURCE_NAME>",
target_name= "<TARGET_NAME>",
join_columns= ["<COLUMN_NAME_1>", "<COLUMN_NAME_2>"]
aggregates= [
Aggregate(
agg_columns=["<COLUMN_NAME_3>"],
type= "MIN",
group_by_columns= ["<GROUP_COLUMN_NAME>"]
),
Aggregate(
agg_columns=["<COLUMN_NAME_4>"],
type= "max"
)
]
) |
{
"source_name": "<SOURCE_NAME>",
"target_name": "<TARGET_NAME>",
"join_columns": ["<COLUMN_NAME_1>","<COLUMN_NAME_2>"],
"aggregates": [{
"type": "MIN",
"agg_columns": ["<COLUMN_NAME_3>"],
"group_by_columns": ["<GROUP_COLUMN_NAME>"]
},
{
"type": "MAX",
"agg_columns": ["<COLUMN_NAME_4>"],
}],
} |
- The aggregate column names, group by columns and type are always converted to lowercase and considered for reconciliation.
- Currently, it doesn't support aggregates on window functions using the OVER clause.
- It doesn't support case insensitivity and does not have collation support.
- Queries with "group by" column(s) are compared based on the same group by columns (see the illustrative sketch after this list).
- Queries without "group by" column(s) are compared row-to-row.
- Existing features like `column_mapping`, `transformations`, `JdbcReaderOptions` and `filters` are leveraged for the aggregate metric reconciliation.
- Existing `select_columns` and `drop_columns` are not considered for the aggregate metric reconciliation; even if the user provides them, they are ignored.
- If transformations are defined, they are applied to both the "aggregate columns" and the "group by columns".
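Conceptually, each configured aggregate is compared by running the equivalent aggregate query on both sides and matching the results on the group-by columns. The helper below is an illustrative sketch of that idea, not the reconciler's actual query builder:

```python
# Illustrative only: the shape of the aggregate query run on each side for
# Aggregate(type="MIN", agg_columns=["discount"], group_by_columns=["product_id"]).
def build_aggregate_query(table: str, agg_type: str, agg_columns: list[str],
                          group_by_columns: list[str] | None = None) -> str:
    agg_exprs = ", ".join(f"{agg_type}({c}) AS {agg_type.lower()}_{c}" for c in agg_columns)
    if group_by_columns:
        group_cols = ", ".join(group_by_columns)
        return f"SELECT {group_cols}, {agg_exprs} FROM {table} GROUP BY {group_cols}"
    return f"SELECT {agg_exprs} FROM {table}"

# Both sides produce one row per group; rows are then matched on the group-by columns.
print(build_aggregate_query("orders", "MIN", ["discount"], ["product_id"]))
# SELECT product_id, MIN(discount) AS min_discount FROM orders GROUP BY product_id
```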
[back to aggregates-reconciliation]
Please refer to this sample config for a detailed example.
[back to aggregates-reconciliation]
Aggregates Reconcile Data Visualisation