diff --git a/src/databricks/labs/ucx/assessment/crawlers.py b/src/databricks/labs/ucx/assessment/crawlers.py
index 9c74388ab2..33372e25dc 100644
--- a/src/databricks/labs/ucx/assessment/crawlers.py
+++ b/src/databricks/labs/ucx/assessment/crawlers.py
@@ -1,4 +1,5 @@
 import json
+import re
 from dataclasses import dataclass
 
 from databricks.sdk import WorkspaceClient
@@ -13,6 +14,17 @@
     "spark.databricks.hive.metastore.glueCatalog.enabled",
 ]
 
+_AZURE_SP_CONF = [
+    "fs.azure.account.key",
+    "fs.azure.account.auth.type",
+    "fs.azure.account.oauth.provider.type",
+    "fs.azure.account.oauth2.client.id",
+    "fs.azure.account.oauth2.client.secret",
+    "fs.azure.account.oauth2.client.endpoint",
+]
+
+_AZURE_SP_CONF_FAILURE_MSG = "Uses azure service principal credentials config in "
+
 
 @dataclass
 class JobInfo:
@@ -32,6 +44,23 @@ class ClusterInfo:
     failures: str
 
 
+@dataclass
+class PipelineInfo:
+    pipeline_id: str
+    pipeline_name: str
+    creator_name: str
+    success: int
+    failures: str
+
+
+def _azure_sp_conf_present_check(config: dict) -> bool:
+    for key in config.keys():
+        for conf in _AZURE_SP_CONF:
+            if re.search(conf, key):
+                return True
+    return False
+
+
 def spark_version_compatibility(spark_version: str) -> str:
     first_comp_custom_rt = 3
     first_comp_custom_x = 2
@@ -51,6 +80,37 @@ def spark_version_compatibility(spark_version: str) -> str:
     return "supported"
 
 
+class PipelinesCrawler(CrawlerBase):
+    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
+        super().__init__(sbe, "hive_metastore", schema, "pipelines")
+        self._ws = ws
+
+    def _crawl(self) -> list[PipelineInfo]:
+        all_pipelines = list(self._ws.pipelines.list_pipelines())
+        return list(self._assess_pipelines(all_pipelines))
+
+    def _assess_pipelines(self, all_pipelines):
+        for pipeline in all_pipelines:
+            pipeline_info = PipelineInfo(pipeline.pipeline_id, pipeline.name, pipeline.creator_user_name, 1, "")
+            failures = []
+            pipeline_config = self._ws.pipelines.get(pipeline.pipeline_id).spec.configuration
+            if pipeline_config:
+                if _azure_sp_conf_present_check(pipeline_config):
+                    failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} pipeline.")
+
+            pipeline_info.failures = json.dumps(failures)
+            if len(failures) > 0:
+                pipeline_info.success = 0
+            yield pipeline_info
+
+    def snapshot(self) -> list[PipelineInfo]:
+        return self._snapshot(self._try_fetch, self._crawl)
+
+    def _try_fetch(self) -> list[PipelineInfo]:
+        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
+            yield PipelineInfo(*row)
+
+
 class ClustersCrawler(CrawlerBase):
     def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
         super().__init__(sbe, "hive_metastore", schema, "clusters")
@@ -78,6 +138,20 @@ def _assess_clusters(self, all_clusters):
                 for value in cluster.spark_conf.values():
                     if "dbfs:/mnt" in value or "/dbfs/mnt" in value:
                         failures.append(f"using DBFS mount in configuration: {value}")
+
+                # Checking if Azure cluster config is present in spark config
+                if _azure_sp_conf_present_check(cluster.spark_conf):
+                    failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")
+
+            # Checking if Azure cluster config is present in cluster policies
+            if cluster.policy_id:
+                policy = self._ws.cluster_policies.get(cluster.policy_id)
+                if _azure_sp_conf_present_check(json.loads(policy.definition)):
+                    failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")
+                if policy.policy_family_definition_overrides:
+                    if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)):
+                        failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")
+
             cluster_info.failures = json.dumps(failures)
             if len(failures) > 0:
                 cluster_info.success = 0
@@ -139,6 +213,20 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> list[JobI
                 for value in cluster_config.spark_conf.values():
                     if "dbfs:/mnt" in value or "/dbfs/mnt" in value:
                         job_assessment[job.job_id].add(f"using DBFS mount in configuration: {value}")
+
+                # Checking if Azure cluster config is present in spark config
+                if _azure_sp_conf_present_check(cluster_config.spark_conf):
+                    job_assessment[job.job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")
+
+            # Checking if Azure cluster config is present in cluster policies
+            if cluster_config.policy_id:
+                policy = self._ws.cluster_policies.get(cluster_config.policy_id)
+                if _azure_sp_conf_present_check(json.loads(policy.definition)):
+                    job_assessment[job.job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")
+                if policy.policy_family_definition_overrides:
+                    if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)):
+                        job_assessment[job.job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")
+
         for job_key in job_details.keys():
             job_details[job_key].failures = json.dumps(list(job_assessment[job_key]))
             if len(job_assessment[job_key]) > 0:
diff --git a/tests/unit/assessment/test_assessment.py b/tests/unit/assessment/test_assessment.py
index 6695b830de..828dd10324 100644
--- a/tests/unit/assessment/test_assessment.py
+++ b/tests/unit/assessment/test_assessment.py
@@ -2,8 +2,14 @@
 from databricks.sdk.service.compute import AutoScale, ClusterDetails, ClusterSource
 from databricks.sdk.service.jobs import BaseJob, JobSettings, NotebookTask, Task
+from databricks.sdk.service.pipelines import PipelineState, PipelineStateInfo
 
-from databricks.labs.ucx.assessment.crawlers import ClustersCrawler, JobsCrawler
+from databricks.labs.ucx.assessment.crawlers import (
+    ClustersCrawler,
+    JobsCrawler,
+    PipelineInfo,
+    PipelinesCrawler,
+)
 from databricks.labs.ucx.hive_metastore.data_objects import ExternalLocationCrawler
 from databricks.labs.ucx.hive_metastore.mounts import Mount
 from databricks.labs.ucx.mixins.sql import Row
 
@@ -111,7 +117,148 @@ def test_job_assessment():
     assert result_set[1].success == 0
 
 
-def test_cluster_assessment():
+def test_job_assessment_for_azure_spark_config():
+    sample_jobs = [
+        BaseJob(
+            created_time=1694536604319,
+            creator_user_name="anonymous@databricks.com",
+            job_id=536591785949415,
+            settings=JobSettings(
+                compute=None,
+                continuous=None,
+                tasks=[
+                    Task(
+                        task_key="Ingest",
+                        existing_cluster_id="0807-225846-avon493",
+                        notebook_task=NotebookTask(
+                            notebook_path="/Users/foo.bar@databricks.com/Customers/Example/Test/Load"
+                        ),
+                        timeout_seconds=0,
+                    )
+                ],
+                timeout_seconds=0,
+            ),
+        ),
+        BaseJob(
+            created_time=1694536604321,
+            creator_user_name="anonymous@databricks.com",
+            job_id=536591785949416,
+            settings=JobSettings(
+                compute=None,
+                continuous=None,
+                tasks=[
+                    Task(
+                        task_key="Ingest",
+                        existing_cluster_id="0810-229933-chicago99",
+                        notebook_task=NotebookTask(
+                            notebook_path="/Users/foo.bar@databricks.com/Customers/Example/Test/Load"
+                        ),
+                        timeout_seconds=0,
+                    )
+                ],
+                timeout_seconds=0,
+            ),
+        ),
+        BaseJob(
+            created_time=1694536604319,
+            creator_user_name="anonymous@databricks.com",
+            job_id=536591785949417,
+            settings=JobSettings(
+                compute=None,
+                continuous=None,
+                tasks=[
+                    Task(
+                        task_key="Ingest",
+                        existing_cluster_id="0811-929933-maine96",
+                        notebook_task=NotebookTask(
+                            notebook_path="/Users/foo.bar@databricks.com/Customers/Example/Test/Load"
+                        ),
+                        timeout_seconds=0,
+                    )
+                ],
+                timeout_seconds=0,
+            ),
+        ),
+    ]
+
+    sample_clusters = [
+        ClusterDetails(
+            autoscale=AutoScale(min_workers=1, max_workers=6),
+            spark_conf={
+                "spark.hadoop.fs.azure.account."
+                "oauth2.client.id.abcde.dfs.core.windows.net": "{{secrets/abcff/sp_app_client_id}}",
+                "spark.hadoop.fs.azure.account."
+                "oauth2.client.endpoint.abcde.dfs.core.windows.net": "https://login.microsoftonline.com/dedededede/token",
+                "spark.hadoop.fs.azure.account."
+                "oauth2.client.secret.abcde.dfs.core.windows.net": "{{secrets/abcff/sp_secret}}",
+            },
+            spark_context_id=5134472582179566666,
+            spark_env_vars=None,
+            spark_version="13.3.x-cpu-ml-scala2.12",
+            cluster_id="0807-225846-avon493",
+            cluster_source=ClusterSource.JOB,
+        ),
+        ClusterDetails(
+            autoscale=AutoScale(min_workers=1, max_workers=6),
+            spark_conf={"spark.databricks.delta.preview.enabled": "true"},
+            spark_context_id=5134472582179566666,
+            spark_env_vars=None,
+            spark_version="13.3.x-cpu-ml-scala2.12",
+            cluster_id="0810-229933-chicago99",
+            cluster_source=ClusterSource.JOB,
+        ),
+        ClusterDetails(
+            autoscale=AutoScale(min_workers=1, max_workers=6),
+            spark_conf={"spark.databricks.delta.preview.enabled": "true"},
+            spark_context_id=5134472582179566666,
+            spark_env_vars=None,
+            spark_version="13.3.x-cpu-ml-scala2.12",
+            policy_id="D96308F1BF0003A9",
+            cluster_id="0811-929933-maine96",
+            cluster_source=ClusterSource.JOB,
+        ),
+    ]
+    ws = Mock()
+    ws.cluster_policies.get().definition = (
+        '{\n "spark_conf.fs.azure.account.auth.type": {\n '
+        '"type": "fixed",\n "value": "OAuth",\n '
+        ' "hidden": true\n },\n "spark_conf.fs.azure.account.oauth.provider.type": {\n '
+        ' "type": "fixed",\n "value": '
+        '"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",\n '
+        '"hidden": true\n },\n "spark_conf.fs.azure.account.oauth2.client.id": {\n '
+        '"type": "fixed",\n "value": "fsfsfsfsffsfsf",\n "hidden": true\n },\n '
+        '"spark_conf.fs.azure.account.oauth2.client.secret": {\n "type": "fixed",\n '
+        '"value": "gfgfgfgfggfggfgfdds",\n "hidden": true\n },\n '
+        '"spark_conf.fs.azure.account.oauth2.client.endpoint": {\n '
+        '"type": "fixed",\n '
+        '"value": "https://login.microsoftonline.com/1234ededed/oauth2/token",\n '
+        '"hidden": true\n }\n}'
+    )
+    ws.cluster_policies.get().policy_family_definition_overrides = (
+        '{\n "spark_conf.fs.azure.account.auth.type": {\n '
+        '"type": "fixed",\n "value": "OAuth",\n '
+        ' "hidden": true\n },\n "spark_conf.fs.azure.account.oauth.provider.type": {\n '
+        ' "type": "fixed",\n "value": '
+        '"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",\n '
+        '"hidden": true\n },\n "spark_conf.fs.azure.account.oauth2.client.id": {\n '
+        '"type": "fixed",\n "value": "fsfsfsfsffsfsf",\n "hidden": true\n },\n '
+        '"spark_conf.fs.azure.account.oauth2.client.secret": {\n "type": "fixed",\n '
+        '"value": "gfgfgfgfggfggfgfdds",\n "hidden": true\n },\n '
+        '"spark_conf.fs.azure.account.oauth2.client.endpoint": {\n '
+        '"type": "fixed",\n '
+        '"value": "https://login.microsoftonline.com/1234ededed/oauth2/token",\n '
+        '"hidden": true\n }\n}'
+    )
+    result_set = JobsCrawler(ws, MockBackend(), "ucx")._assess_jobs(
+        sample_jobs, {c.cluster_id: c for c in sample_clusters}
+    )
+    assert len(result_set) == 3
+    assert result_set[0].success == 0
+    assert result_set[1].success == 1
+    assert result_set[2].success == 0
+
+
+def test_cluster_assessment(mocker):
     sample_clusters = [
         ClusterDetails(
             autoscale=AutoScale(min_workers=1, max_workers=6),
@@ -129,8 +276,194 @@ def test_cluster_assessment():
spark_version="9.3.x-cpu-ml-scala2.12", cluster_id="0810-225833-atlanta69", ), + ClusterDetails( + cluster_name="Tech Summit FY24 Cluster", + autoscale=AutoScale(min_workers=1, max_workers=6), + spark_conf={ + "spark.hadoop.fs.azure.account." + "oauth2.client.id.abcde.dfs.core.windows.net": "{{secrets/abcff/sp_app_client_id}}", + "spark.hadoop.fs.azure.account." + "oauth2.client.endpoint.abcde.dfs.core.windows.net": "https://login.microsoftonline.com/dedededede/token", + "spark.hadoop.fs.azure.account." + "oauth2.client.secret.abcde.dfs.core.windows.net": "{{secrets/abcff/sp_secret}}", + }, + spark_context_id=5134472582179565315, + spark_env_vars=None, + spark_version="13.3.x-cpu-ml-scala2.12", + cluster_id="0915-190044-3dqy6751", + ), + ClusterDetails( + cluster_name="Tech Summit FY24 Cluster-1", + autoscale=AutoScale(min_workers=1, max_workers=6), + spark_context_id=5134472582179565315, + spark_env_vars=None, + policy_id="D96308F1BF0003A7", + spark_version="13.3.x-cpu-ml-scala2.12", + cluster_id="0915-190044-3dqy6751", + ), ] - result_set = list(ClustersCrawler(Mock(), MockBackend(), "ucx")._assess_clusters(sample_clusters)) - assert len(result_set) == 2 + + ws = Mock() + + ws.cluster_policies.get().definition = ( + '{\n "spark_conf.fs.azure.account.auth.type": {\n ' + '"type": "fixed",\n "value": "OAuth",\n ' + ' "hidden": true\n },\n "spark_conf.fs.azure.account.oauth.provider.type": {\n ' + ' "type": "fixed",\n "value": ' + '"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",\n ' + '"hidden": true\n },\n "spark_conf.fs.azure.account.oauth2.client.id": {\n ' + '"type": "fixed",\n "value": "fsfsfsfsffsfsf",\n "hidden": true\n },\n ' + '"spark_conf.fs.azure.account.oauth2.client.secret": {\n "type": "fixed",\n ' + '"value": "gfgfgfgfggfggfgfdds",\n "hidden": true\n },\n ' + '"spark_conf.fs.azure.account.oauth2.client.endpoint": {\n ' + '"type": "fixed",\n ' + '"value": "https://login.microsoftonline.com/1234ededed/oauth2/token",\n ' + '"hidden": true\n }\n}' + ) + ws.cluster_policies.get().policy_family_definition_overrides = ( + '{\n "spark_conf.fs.azure.account.auth.type": {\n ' + '"type": "fixed",\n "value": "OAuth",\n ' + ' "hidden": true\n },\n "spark_conf.fs.azure.account.oauth.provider.type": {\n ' + ' "type": "fixed",\n "value": ' + '"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",\n ' + '"hidden": true\n },\n "spark_conf.fs.azure.account.oauth2.client.id": {\n ' + '"type": "fixed",\n "value": "fsfsfsfsffsfsf",\n "hidden": true\n },\n ' + '"spark_conf.fs.azure.account.oauth2.client.secret": {\n "type": "fixed",\n ' + '"value": "gfgfgfgfggfggfgfdds",\n "hidden": true\n },\n ' + '"spark_conf.fs.azure.account.oauth2.client.endpoint": {\n ' + '"type": "fixed",\n ' + '"value": "https://login.microsoftonline.com/1234ededed/oauth2/token",\n ' + '"hidden": true\n }\n}' + ) + + crawler = ClustersCrawler(ws, MockBackend(), "ucx")._assess_clusters(sample_clusters) + result_set = list(crawler) + + assert len(result_set) == 4 assert result_set[0].success == 1 assert result_set[1].success == 0 + assert result_set[2].success == 0 + assert result_set[3].success == 0 + + +def test_cluster_assessment_cluster_policy_no_spark_conf(mocker): + sample_clusters1 = [ + ClusterDetails( + cluster_name="Tech Summit FY24 Cluster-2", + autoscale=AutoScale(min_workers=1, max_workers=6), + spark_context_id=5134472582179565315, + spark_env_vars=None, + policy_id="D96308F1BF0003A8", + spark_version="13.3.x-cpu-ml-scala2.12", + cluster_id="0915-190044-3dqy6751", + ) + ] + ws = Mock() + 
+    ws.cluster_policies.get().definition = (
+        '{"node_type_id":{"type":"allowlist","values":["Standard_DS3_v2",'
+        '"Standard_DS4_v2","Standard_DS5_v2","Standard_NC4as_T4_v3"],"defaultValue":'
+        '"Standard_DS3_v2"},"spark_version":{"type":"unlimited","defaultValue":"auto:latest-ml"},'
+        '"runtime_engine":{"type":"fixed","value":"STANDARD","hidden":true},'
+        '"num_workers":{"type":"fixed","value":0,"hidden":true},"data_security_mode":'
+        '{"type":"allowlist","values":["SINGLE_USER","LEGACY_SINGLE_USER","LEGACY_SINGLE_USER_STANDARD"],'
+        '"defaultValue":"SINGLE_USER","hidden":true},"driver_instance_pool_id":{"type":"forbidden","hidden":true},'
+        '"cluster_type":{"type":"fixed","value":"all-purpose"},"instance_pool_id":{"type":"forbidden","hidden":true},'
+        '"azure_attributes.availability":{"type":"fixed","value":"ON_DEMAND_AZURE","hidden":true},'
+        '"spark_conf.spark.databricks.cluster.profile":{"type":"fixed","value":"singleNode","hidden":true},'
+        '"autotermination_minutes":{"type":"unlimited","defaultValue":4320,"isOptional":true}}'
+    )
+
+    ws.cluster_policies.get().policy_family_definition_overrides = (
+        '{\n "not.spark.conf": {\n '
+        '"type": "fixed",\n "value": "OAuth",\n '
+        ' "hidden": true\n },\n "not.a.type": {\n '
+        ' "type": "fixed",\n "value": '
+        '"not.a.matching.type",\n '
+        '"hidden": true\n },\n "not.a.matching.type": {\n '
+        '"type": "fixed",\n "value": "fsfsfsfsffsfsf",\n "hidden": true\n },\n '
+        '"not.a.matching.type": {\n "type": "fixed",\n '
+        '"value": "gfgfgfgfggfggfgfdds",\n "hidden": true\n },\n '
+        '"not.a.matching.type": {\n '
+        '"type": "fixed",\n '
+        '"value": "https://login.microsoftonline.com/1234ededed/oauth2/token",\n '
+        '"hidden": true\n }\n}'
+    )
+
+    crawler = ClustersCrawler(ws, MockBackend(), "ucx")._assess_clusters(sample_clusters1)
+    result_set1 = list(crawler)
+    assert len(result_set1) == 1
+    assert result_set1[0].success == 1
+
+
+def test_pipeline_assessment_with_config(mocker):
+    sample_pipelines = [
+        PipelineStateInfo(
+            cluster_id=None,
+            creator_user_name="abcde.defgh@databricks.com",
+            latest_updates=None,
+            name="New DLT Pipeline",
+            pipeline_id="0112eae7-9d11-4b40-a2b8-6c83cb3c7407",
+            run_as_user_name="abcde.defgh@databricks.com",
+            state=PipelineState.IDLE,
+        )
+    ]
+
+    ws = Mock()
+    config_dict = {
+        "spark.hadoop.fs.azure.account.auth.type.abcde.dfs.core.windows.net": "SAS",
+        "spark.hadoop.fs.azure.sas.token.provider.type.abcde.dfs."
+ "core.windows.net": "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider", + "spark.hadoop.fs.azure.sas.fixed.token.abcde.dfs.core.windows.net": "{{secrets/abcde_access/sasFixedToken}}", + } + ws.pipelines.get().spec.configuration = config_dict + + crawler = PipelinesCrawler(ws, MockBackend(), "ucx")._assess_pipelines(sample_pipelines) + result_set = list(crawler) + + assert len(result_set) == 1 + assert result_set[0].success == 0 + + +def test_pipeline_assessment_without_config(mocker): + sample_pipelines = [ + PipelineStateInfo( + cluster_id=None, + creator_user_name="abcde.defgh@databricks.com", + latest_updates=None, + name="New DLT Pipeline", + pipeline_id="0112eae7-9d11-4b40-a2b8-6c83cb3c7497", + run_as_user_name="abcde.defgh@databricks.com", + state=PipelineState.IDLE, + ) + ] + ws = Mock() + config_dict = {} + ws.pipelines.get().spec.configuration = config_dict + crawler = PipelinesCrawler(ws, MockBackend(), "ucx")._assess_pipelines(sample_pipelines) + result_set = list(crawler) + + assert len(result_set) == 1 + assert result_set[0].success == 1 + + +def test_pipeline_snapshot_with_config(): + sample_pipelines = [ + PipelineInfo( + creator_name="abcde.defgh@databricks.com", + pipeline_name="New DLT Pipeline", + pipeline_id="0112eae7-9d11-4b40-a2b8-6c83cb3c7497", + success=1, + failures="", + ) + ] + mock_ws = Mock() + + crawler = PipelinesCrawler(mock_ws, MockBackend(), "ucx") + + crawler._try_fetch = Mock(return_value=[]) + crawler._crawl = Mock(return_value=sample_pipelines) + + result_set = crawler.snapshot() + + assert len(result_set) == 1 + assert result_set[0].success == 1