Aggregates Reconcile CLI Implementation (databrickslabs#770)
vijaypavann-db authored Aug 12, 2024
1 parent ecf795e commit 2e7d906
Showing 8 changed files with 232 additions and 11 deletions.
2 changes: 2 additions & 0 deletions labs.yml
@@ -40,6 +40,8 @@ commands:
{{end}}
- name: reconcile
description: Reconcile is a utility to streamline the reconciliation process between source data and target data residing on Databricks.
- name: aggregates-reconcile
description: Aggregates Reconcile is a utility to streamline the reconciliation process; it compares specific aggregate metrics between source data and target data residing on Databricks.
- name: generate-lineage
description: Utility to generate a lineage of the SQL files
flags:
20 changes: 18 additions & 2 deletions src/databricks/labs/remorph/cli.py
@@ -1,7 +1,6 @@
import json
import os


from databricks.labs.blueprint.cli import App
from databricks.labs.blueprint.entrypoint import get_logger
from databricks.labs.remorph.config import SQLGLOT_DIALECTS, MorphConfig
@@ -10,6 +9,8 @@
from databricks.labs.remorph.reconcile.runner import ReconcileRunner
from databricks.labs.remorph.lineage import lineage_generator
from databricks.labs.remorph.transpiler.execute import morph
from databricks.labs.remorph.reconcile.execute import RECONCILE_OPERATION_NAME, AGG_RECONCILE_OPERATION_NAME

from databricks.sdk import WorkspaceClient

remorph = App(__file__)
@@ -105,7 +106,22 @@ def reconcile(w: WorkspaceClient):
ctx.install_state,
ctx.prompts,
)
recon_runner.run()
recon_runner.run(operation_name=RECONCILE_OPERATION_NAME)


@remorph.command
def aggregates_reconcile(w: WorkspaceClient):
"""[EXPERIMENTAL] Reconciles Aggregated source to Databricks datasets"""
ctx = ApplicationContext(w)
logger.info(f"User: {ctx.current_user}")
recon_runner = ReconcileRunner(
ctx.workspace_client,
ctx.installation,
ctx.install_state,
ctx.prompts,
)

recon_runner.run(operation_name=AGG_RECONCILE_OPERATION_NAME)


@remorph.command
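For orientation, here is a minimal sketch of driving the new command programmatically; this is essentially what the updated unit test in tests/unit/test_cli.py does. It assumes workspace credentials are available to WorkspaceClient(); in normal use the command is reached through the `aggregates-reconcile` entry declared in labs.yml.

# Sketch only: assumes a configured workspace for WorkspaceClient().
from databricks.sdk import WorkspaceClient
from databricks.labs.remorph import cli

# Triggers the aggregates reconcile job, roughly equivalent to invoking the
# labs command `databricks labs remorph aggregates-reconcile`.
cli.aggregates_reconcile(WorkspaceClient())
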
6 changes: 4 additions & 2 deletions src/databricks/labs/remorph/deployment/job.py
@@ -9,14 +9,13 @@
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import InvalidParameterValue
from databricks.sdk.service import compute
from databricks.sdk.service.jobs import Task, PythonWheelTask, JobCluster, JobSettings
from databricks.sdk.service.jobs import Task, PythonWheelTask, JobCluster, JobSettings, JobParameterDefinition

from databricks.labs.remorph.config import ReconcileConfig
from databricks.labs.remorph.reconcile.constants import ReconSourceType

logger = logging.getLogger(__name__)


_TEST_JOBS_PURGE_TIMEOUT = timedelta(hours=1, minutes=15)


@@ -103,6 +102,8 @@ def _recon_job_settings(
recon_config,
),
],
"max_concurrent_runs": 2,
"parameters": [JobParameterDefinition(name="operation_name", default="reconcile")],
}

def _job_recon_task(self, jobs_task: Task, recon_config: ReconcileConfig) -> Task:
@@ -125,6 +126,7 @@ def _job_recon_task(self, jobs_task: Task, recon_config: ReconcileConfig) -> Task:
python_wheel_task=PythonWheelTask(
package_name="databricks_labs_remorph",
entry_point="reconcile",
parameters=["{{job.parameters.[operation_name]}}"],
),
)

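To make the new wiring concrete, below is a self-contained sketch (not part of the commit) of the pieces added here: a job-level `operation_name` parameter defaulting to "reconcile", and a wheel task that receives it through the `{{job.parameters.[operation_name]}}` dynamic value reference. The job name and task key are illustrative assumptions; the actual deployment code assembles these settings from the reconcile configuration, and cluster configuration is omitted here.

# Sketch only; the job name and task key below are assumed, not taken from this diff.
from databricks.sdk.service.jobs import (
    JobParameterDefinition,
    JobSettings,
    PythonWheelTask,
    Task,
)

settings = JobSettings(
    name="Remorph_Reconciliation_Job",  # assumed name
    max_concurrent_runs=2,
    # Job-level parameter; run_now can override the default per run.
    parameters=[JobParameterDefinition(name="operation_name", default="reconcile")],
    tasks=[
        Task(
            task_key="run_reconcile",  # assumed task key; cluster config omitted
            python_wheel_task=PythonWheelTask(
                package_name="databricks_labs_remorph",
                entry_point="reconcile",
                # Resolved at run time from the job-level parameter above.
                parameters=["{{job.parameters.[operation_name]}}"],
            ),
        )
    ],
)
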
56 changes: 56 additions & 0 deletions src/databricks/labs/remorph/reconcile/execute.py
@@ -76,8 +76,17 @@ def validate_input(input_value: str, list_of_value: set, message: str):


def main(*argv) -> None:

logger.debug(f"Arguments received: {argv}")

assert len(sys.argv) == 2, f"Invalid number of arguments: {len(sys.argv)}. Operation name must be specified."
operation_name = sys.argv[1]

assert operation_name in {
RECONCILE_OPERATION_NAME,
AGG_RECONCILE_OPERATION_NAME,
}, f"Invalid option: {operation_name}"

w = WorkspaceClient()

installation = Installation.assume_user_home(w, "remorph")
@@ -95,6 +104,17 @@ def main(*argv) -> None:

table_recon = installation.load(type_ref=TableRecon, filename=filename)

if operation_name == AGG_RECONCILE_OPERATION_NAME:
return _trigger_reconcile_aggregates(w, table_recon, reconcile_config)

return _trigger_recon(w, table_recon, reconcile_config)


def _trigger_recon(
w: WorkspaceClient,
table_recon: TableRecon,
reconcile_config: ReconcileConfig,
):
try:
recon_output = recon(
ws=w,
@@ -109,6 +129,42 @@ def main(*argv) -> None:
raise e


def _trigger_reconcile_aggregates(
ws: WorkspaceClient,
table_recon: TableRecon,
reconcile_config: ReconcileConfig,
):
"""
Triggers the reconciliation process for aggregated data between source and target tables.
Supported Aggregate functions: MIN, MAX, COUNT, SUM, AVG, MEAN, MODE, PERCENTILE, STDDEV, VARIANCE, MEDIAN
This function attempts to reconcile aggregate data based on the configurations provided. It logs the outcome
of the reconciliation process, including any errors encountered during execution.
Parameters:
- ws (WorkspaceClient): The workspace client used to interact with Databricks workspaces.
- table_recon (TableRecon): Configuration for the table reconciliation process, including source and target details.
- reconcile_config (ReconcileConfig): General configuration for the reconciliation process,
including database and table settings.
Raises:
- ReconciliationException: If an error occurs during the reconciliation process, it is caught and re-raised
after logging the error details.
"""
try:
recon_output = reconcile_aggregates(
ws=ws,
spark=DatabricksSession.builder.getOrCreate(),
table_recon=table_recon,
reconcile_config=reconcile_config,
)
logger.info(f"recon_output: {recon_output}")
logger.info(f"recon_id: {recon_output.recon_id}")
except ReconciliationException as e:
logger.error(f"Error while running aggregate reconcile: {str(e)}")
raise e


def recon(
ws: WorkspaceClient,
spark: SparkSession,
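Because the wheel task now passes the resolved operation name as the single command-line argument, the entry point can be exercised directly with a small sketch like the one below (it mirrors the argv handling added to `main` and the monkeypatched unit test further down). A real run also needs a reachable workspace and an existing remorph installation.

# Sketch only: requires workspace credentials and a remorph installation to exist.
import sys

from databricks.labs.remorph.reconcile.execute import AGG_RECONCILE_OPERATION_NAME, main

# The job supplies exactly one argument: the operation name.
sys.argv = ["databricks_labs_remorph", AGG_RECONCILE_OPERATION_NAME]
main()
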
9 changes: 6 additions & 3 deletions src/databricks/labs/remorph/reconcile/runner.py
@@ -10,6 +10,7 @@

from databricks.labs.remorph.config import ReconcileConfig, TableRecon
from databricks.labs.remorph.deployment.recon import RECON_JOB_NAME
from databricks.labs.remorph.reconcile.execute import RECONCILE_OPERATION_NAME

logger = logging.getLogger(__name__)

@@ -29,16 +30,18 @@ def __init__(
self._install_state = install_state
self._prompts = prompts

def run(self):
def run(self, operation_name=RECONCILE_OPERATION_NAME):
reconcile_config = self._get_verified_recon_config()
job_id = self._get_recon_job_id(reconcile_config)
logger.info(f"Triggering the reconcile job with job_id: `{job_id}`")
wait = self._ws.jobs.run_now(job_id)
wait = self._ws.jobs.run_now(job_id, job_parameters={"operation_name": operation_name})
if not wait.run_id:
raise SystemExit(f"Job {job_id} execution failed. Please check the job logs for more details.")

job_run_url = f"{self._ws.config.host}/jobs/{job_id}/runs/{wait.run_id}"
logger.info(f"Reconcile job started. Please check the job_url `{job_run_url}` for the current status.")
logger.info(
f"'{operation_name.upper()}' job started. Please check the job_url `{job_run_url}` for the current status."
)
if self._prompts.confirm(f"Would you like to open the job run URL `{job_run_url}` in the browser?"):
webbrowser.open(job_run_url)

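The net effect on the Jobs API call is pinned down by the updated unit tests: the runner now always forwards an `operation_name` job parameter to run_now. A sketch of the two resulting calls (the job id is illustrative; the runner resolves the real one from install state or config):

# Sketch only: mirrors the run_now calls asserted in tests/unit/reconcile/test_runner.py.
from databricks.sdk import WorkspaceClient

ws = WorkspaceClient()
job_id = 1234  # illustrative value, matching the tests
ws.jobs.run_now(job_id, job_parameters={"operation_name": "reconcile"})
ws.jobs.run_now(job_id, job_parameters={"operation_name": "aggregates-reconcile"})
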
59 changes: 58 additions & 1 deletion tests/unit/reconcile/test_aggregates_reconcile.py
@@ -1,3 +1,5 @@
import sys

from dataclasses import dataclass
from pathlib import Path

@@ -9,7 +11,7 @@

from databricks.labs.remorph.config import DatabaseConfig, ReconcileMetadataConfig, get_dialect
from databricks.labs.remorph.reconcile.connectors.data_source import MockDataSource
from databricks.labs.remorph.reconcile.execute import Reconciliation
from databricks.labs.remorph.reconcile.execute import Reconciliation, main
from databricks.labs.remorph.reconcile.recon_config import (
Aggregate,
AggregateQueryOutput,
@@ -340,3 +342,58 @@ def test_reconcile_aggregate_data_mismatch_and_missing_records(
_compare_reconcile_output(
actual.reconcile_output, expected_reconcile_output_dict(mock_spark).get(actual.rule.agg_type)
)


def test_run_with_invalid_operation_name(monkeypatch):
test_args = ["databricks_labs_remorph", "invalid-operation"]
monkeypatch.setattr(sys, 'argv', test_args)
with pytest.raises(AssertionError, match="Invalid option:"):
main()


def test_aggregates_reconcile_invalid_aggregates():
invalid_agg_type_message = "Invalid aggregate type: std, only .* are supported."
with pytest.raises(AssertionError, match=invalid_agg_type_message):
Aggregate(agg_columns=["discount"], group_by_columns=["p_id"], type="STD")


def test_aggregates_reconcile_aggregate_columns():
agg = Aggregate(agg_columns=["discount", "price"], group_by_columns=["p_dept_id", "p_sub_dept"], type="STDDEV")

assert agg.get_agg_type() == "stddev"
assert agg.group_by_columns_as_str == "p_dept_id+__+p_sub_dept"
assert agg.agg_columns_as_str == "discount+__+price"

agg1 = Aggregate(agg_columns=["discount"], type="MAX")
assert agg1.get_agg_type() == "max"
assert agg1.group_by_columns_as_str == "NA"
assert agg1.agg_columns_as_str == "discount"


def test_aggregates_reconcile_aggregate_rule():
agg_rule = AggregateRule(
agg_column="discount",
group_by_columns=["p_dept_id", "p_sub_dept"],
group_by_columns_as_str="p_dept_id+__+p_sub_dept",
agg_type="stddev",
)

assert agg_rule.column_from_rule == "stddev_discount_p_dept_id+__+p_sub_dept"
assert agg_rule.group_by_columns_as_table_column == "\"p_dept_id, p_sub_dept\""
expected_rule_query = """ SELECT 1234 as rule_id, 'AGGREGATE' as rule_type, map( 'agg_type', 'stddev',
'agg_column', 'discount',
'group_by_columns', "p_dept_id, p_sub_dept"
)
as rule_info """
assert agg_rule.get_rule_query(1234) == expected_rule_query


agg_rule1 = AggregateRule(agg_column="discount", group_by_columns=None, group_by_columns_as_str="NA", agg_type="max")
assert agg_rule1.column_from_rule == "max_discount_NA"
assert agg_rule1.group_by_columns_as_table_column == "NULL"
expected_rule1_query = """ SELECT 1234 as rule_id, 'AGGREGATE' as rule_type, map( 'agg_type', 'max',
'agg_column', 'discount',
'group_by_columns', NULL
)
as rule_info """
assert agg_rule1.get_rule_query(1234) == expected_rule1_query
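The tests above also document the shape of the Aggregate configuration model. For reference, a small sketch of declaring aggregates for a table, using only the constructor arguments exercised by these tests (`agg_columns`, `group_by_columns`, `type`):

# Sketch only: uses the Aggregate model exactly as the tests above do.
from databricks.labs.remorph.reconcile.recon_config import Aggregate

aggregates = [
    # Compared per group of p_dept_id values.
    Aggregate(agg_columns=["discount"], group_by_columns=["p_dept_id"], type="SUM"),
    # No group_by_columns: the aggregate is compared over the whole table.
    Aggregate(agg_columns=["p_id"], type="MAX"),
]
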
86 changes: 83 additions & 3 deletions tests/unit/reconcile/test_runner.py
@@ -245,7 +245,7 @@ def test_run_with_job_id_in_config():

recon_runner = ReconcileRunner(ws, installation, install_state, prompts)
recon_runner.run()
ws.jobs.run_now.assert_called_once_with(1234)
ws.jobs.run_now.assert_called_once_with(1234, job_parameters={'operation_name': 'reconcile'})


def test_run_with_job_id_in_state(monkeypatch):
Expand Down Expand Up @@ -310,7 +310,7 @@ def test_run_with_job_id_in_state(monkeypatch):

recon_runner = ReconcileRunner(ws, installation, install_state, prompts)
recon_runner.run()
ws.jobs.run_now.assert_called_once_with(1234)
ws.jobs.run_now.assert_called_once_with(1234, job_parameters={'operation_name': 'reconcile'})


def test_run_with_failed_execution():
Expand Down Expand Up @@ -371,4 +371,84 @@ def test_run_with_failed_execution():
recon_runner = ReconcileRunner(ws, installation, install_state, prompts)
with pytest.raises(SystemExit):
recon_runner.run()
ws.jobs.run_now.assert_called_once_with(1234)
ws.jobs.run_now.assert_called_once_with(1234, job_parameters={'operation_name': 'reconcile'})


def test_aggregates_reconcile_run_with_job_id_in_state(monkeypatch):
monkeypatch.setattr("webbrowser.open", lambda url: None)
ws = create_autospec(WorkspaceClient)
prompts = MockPrompts(
{
r"Would you like to open the job run URL .*": "yes",
}
)
state = {
"resources": {"jobs": {RECON_JOB_NAME: "1234"}},
"version": 1,
}

reconcile = {
"data_source": "snowflake",
"database_config": {
"source_catalog": "abc",
"source_schema": "def",
"target_catalog": "tgt",
"target_schema": "sch",
},
"report_type": "all",
"secret_scope": "remorph",
"tables": {
"filter_type": "all",
"tables_list": ["*"],
},
"metadata_config": {
"catalog": "remorph",
"schema": "reconcile",
"volume": "reconcile_volume",
},
"version": 1,
}

sf_recon_config = {
"source_catalog": "abc",
"source_schema": "def",
"tables": [
{
"aggregates": [
{"type": "MIN", "agg_columns": ["discount"], "group_by_columns": ["p_id"]},
{"type": "AVG", "agg_columns": ["discount"], "group_by_columns": ["p_id"]},
{"type": "MAX", "agg_columns": ["p_id"], "group_by_columns": ["creation_date"]},
{"type": "MAX", "agg_columns": ["p_name"]},
{"type": "SUM", "agg_columns": ["p_id"]},
{"type": "MAX", "agg_columns": ["creation_date"]},
{"type": "MAX", "agg_columns": ["p_id"], "group_by_columns": ["creation_date"]},
],
"column_mapping": [
{"source_name": "p_id", "target_name": "product_id"},
{"source_name": "p_name", "target_name": "product_name"},
],
"join_columns": ["p_id"],
"select_columns": ["p_id", "p_name"],
"source_name": "product",
"target_name": "product_delta",
}
],
"target_catalog": "tgt",
"target_schema": "sch",
}

installation = MockInstallation(
{
"state.json": state,
"reconcile.yml": reconcile,
"recon_config_snowflake_abc_all.json": sf_recon_config,
}
)
install_state = InstallState.from_installation(installation)
wait = Mock()
wait.run_id = "rid"
ws.jobs.run_now.return_value = wait

recon_runner = ReconcileRunner(ws, installation, install_state, prompts)
recon_runner.run(operation_name="aggregates-reconcile")
ws.jobs.run_now.assert_called_once_with(1234, job_parameters={'operation_name': 'aggregates-reconcile'})
5 changes: 5 additions & 0 deletions tests/unit/test_cli.py
@@ -560,3 +560,8 @@ def test_cli_configure_secrets_config(mock_workspace_client):
def test_cli_reconcile(mock_workspace_client):
with patch("databricks.labs.remorph.reconcile.runner.ReconcileRunner.run", return_value=True):
cli.reconcile(mock_workspace_client)


def test_cli_aggregates_reconcile(mock_workspace_client):
with patch("databricks.labs.remorph.reconcile.runner.ReconcileRunner.run", return_value=True):
cli.aggregates_reconcile(mock_workspace_client)
