Added crawler for Azure service principals used for direct storage access #305

Merged
11 commits merged on Sep 28, 2023
Changes from 5 commits
46 changes: 46 additions & 0 deletions src/databricks/labs/ucx/assessment/crawlers.py
@@ -1,4 +1,5 @@
import json
import re
from dataclasses import dataclass

from databricks.sdk import WorkspaceClient
@@ -13,6 +14,15 @@
"spark.databricks.hive.metastore.glueCatalog.enabled",
]

_AZURE_SP_CLUSTER_CONF = [
"fs.azure.account.key",
"fs.azure.account.auth.type",
"fs.azure.account.oauth.provider.type",
"fs.azure.account.oauth2.client.id",
"fs.azure.account.oauth2.client.secret",
"fs.azure.account.oauth2.client.endpoint",
]


@dataclass
class JobInfo:
@@ -60,13 +70,23 @@ def _crawl(self) -> list[ClusterInfo]:
all_clusters = list(self._ws.clusters.list())
return list(self._assess_clusters(all_clusters))

def _get_cluster_policy_definition(self, policy_id):
return str(self._ws.cluster_policies.get(policy_id).definition)

def _get_cluster_family_definition(self, policy_id):
return str(self._ws.cluster_policies.get(policy_id).policy_family_definition_overrides)

def _assess_clusters(self, all_clusters):
for cluster in all_clusters:
if cluster.cluster_source == ClusterSource.JOB:
continue
cluster_info = ClusterInfo(cluster.cluster_id, cluster.cluster_name, cluster.creator_user_name, 1, "")
support_status = spark_version_compatibility(cluster.spark_version)
failures = []
conf_list = []
cluster_dict = {}
conf_policy_list = []
cluster_policy_dict = {}
if support_status != "supported":
failures.append(f"not supported DBR: {cluster.spark_version}")

@@ -78,6 +98,32 @@ def _assess_clusters(self, all_clusters):
for value in cluster.spark_conf.values():
if "dbfs:/mnt" in value or "/dbfs/mnt" in value:
failures.append(f"using DBFS mount in configuration: {value}")

# Checking if Azure cluster config is present in spark config
for key, value in cluster.spark_conf.items():
for conf in _AZURE_SP_CLUSTER_CONF:
if re.search(conf, key):
conf_list += [{key: value}]
if conf_list:
cluster_dict["clusterName"] = cluster.cluster_name
cluster_dict["clusterSparkConf"] = conf_list
failures.append(json.dumps(cluster_dict))

# Checking if Azure cluster config is present in cluster policies
if cluster.policy_id:
cluster_policy_definition = self._get_cluster_policy_definition(cluster.policy_id)
cluster_family_definition = self._get_cluster_family_definition(cluster.policy_id)
for pol in _AZURE_SP_CLUSTER_CONF:
if re.search("spark_conf." + pol, cluster_policy_definition):
conf_policy_list += [{"definition": cluster_policy_definition}]
if re.search("spark_conf." + pol, cluster_family_definition):
conf_policy_list += [{"policy_family_definition_overrides": cluster_family_definition}]

if conf_policy_list:
cluster_policy_dict["clusterName"] = cluster.cluster_name
cluster_policy_dict["clusterPolicy"] = conf_policy_list
failures.append(json.dumps(cluster_policy_dict))

cluster_info.failures = json.dumps(failures)
if len(failures) > 0:
cluster_info.success = 0
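For context, the spark-conf check added above can be read in isolation. Below is a minimal sketch of that detection logic under stated assumptions: the helper name flag_azure_sp_conf is hypothetical, and it takes a plain dict instead of the SDK's ClusterDetails object. Note that the entries of _AZURE_SP_CLUSTER_CONF are handed to re.search as patterns, so their unescaped dots match any character; re.escape(conf) would make the match literal.

import json
import re

# Same list as in the diff above.
_AZURE_SP_CLUSTER_CONF = [
    "fs.azure.account.key",
    "fs.azure.account.auth.type",
    "fs.azure.account.oauth.provider.type",
    "fs.azure.account.oauth2.client.id",
    "fs.azure.account.oauth2.client.secret",
    "fs.azure.account.oauth2.client.endpoint",
]


def flag_azure_sp_conf(cluster_name, spark_conf):
    # Collect every spark_conf entry whose key matches one of the Azure SP
    # patterns; re.search scans anywhere in the key, so prefixed keys such as
    # "spark.hadoop.fs.azure.account.oauth2.client.id.<storage>" also match.
    matches = [
        {key: value}
        for key, value in spark_conf.items()
        if any(re.search(conf, key) for conf in _AZURE_SP_CLUSTER_CONF)
    ]
    if not matches:
        return None
    # The crawler records each finding as a JSON string inside the failures list.
    return json.dumps({"clusterName": cluster_name, "clusterSparkConf": matches})


print(flag_azure_sp_conf(
    "demo-cluster",
    {"spark.hadoop.fs.azure.account.oauth2.client.id.abc.dfs.core.windows.net": "{{secrets/scope/app_id}}"},
))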
94 changes: 91 additions & 3 deletions tests/unit/assessment/test_assessment.py
@@ -111,7 +111,7 @@ def test_job_assessment():
assert result_set[1].success == 0


def test_cluster_assessment():
def test_cluster_assessment(mocker):
sample_clusters = [
ClusterDetails(
autoscale=AutoScale(min_workers=1, max_workers=6),
@@ -129,8 +129,96 @@ def test_cluster_assessment():
spark_version="9.3.x-cpu-ml-scala2.12",
cluster_id="0810-225833-atlanta69",
),
ClusterDetails(
cluster_name="Tech Summit FY24 Cluster",
autoscale=AutoScale(min_workers=1, max_workers=6),
spark_conf={
"spark.hadoop.fs.azure.account."
"oauth2.client.id.abcde.dfs.core.windows.net": "{{secrets/abcff/sp_app_client_id}}",
"spark.hadoop.fs.azure.account."
"oauth2.client.endpoint.abcde.dfs.core.windows.net": "https://login.microsoftonline.com/dedededede/token",
"spark.hadoop.fs.azure.account."
"oauth2.client.secret.abcde.dfs.core.windows.net": "{{secrets/abcff/sp_secret}}",
},
spark_context_id=5134472582179565315,
spark_env_vars=None,
spark_version="13.3.x-cpu-ml-scala2.12",
cluster_id="0915-190044-3dqy6751",
),
ClusterDetails(
cluster_name="Tech Summit FY24 Cluster-1",
autoscale=AutoScale(min_workers=1, max_workers=6),
spark_context_id=5134472582179565315,
spark_env_vars=None,
policy_id="D96308F1BF0003A7",
spark_version="13.3.x-cpu-ml-scala2.12",
cluster_id="0915-190044-3dqy6751",
),
]
result_set = list(ClustersCrawler(Mock(), MockBackend(), "ucx")._assess_clusters(sample_clusters))
assert len(result_set) == 2

ws = Mock()
ws.cluster_policies.get.return_value.definition = (
'{\n "spark_conf.fs.azure.account.auth.type": {\n '
'"type": "fixed",\n "value": "OAuth",\n '
' "hidden": true\n },\n "spark_conf.fs.azure.account.oauth.provider.type": {\n '
' "type": "fixed",\n "value": '
'"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",\n '
'"hidden": true\n },\n "spark_conf.fs.azure.account.oauth2.client.id": {\n '
'"type": "fixed",\n "value": "fsfsfsfsffsfsf",\n "hidden": true\n },\n '
'"spark_conf.fs.azure.account.oauth2.client.secret": {\n "type": "fixed",\n '
'"value": "gfgfgfgfggfggfgfdds",\n "hidden": true\n },\n '
'"spark_conf.fs.azure.account.oauth2.client.endpoint": {\n '
'"type": "fixed",\n '
'"value": "https://login.microsoftonline.com/1234ededed/oauth2/token",\n '
'"hidden": true\n }\n}'
)
ws.cluster_policies.get.return_value.policy_family_definition_overrides = (
'{\n "spark_conf.fs.azure.account.auth.type": {\n '
'"type": "fixed",\n "value": "OAuth",\n '
' "hidden": true\n },\n "spark_conf.fs.azure.account.oauth.provider.type": {\n '
' "type": "fixed",\n "value": '
'"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",\n '
'"hidden": true\n },\n "spark_conf.fs.azure.account.oauth2.client.id": {\n '
'"type": "fixed",\n "value": "fsfsfsfsffsfsf",\n "hidden": true\n },\n '
'"spark_conf.fs.azure.account.oauth2.client.secret": {\n "type": "fixed",\n '
'"value": "gfgfgfgfggfggfgfdds",\n "hidden": true\n },\n '
'"spark_conf.fs.azure.account.oauth2.client.endpoint": {\n '
'"type": "fixed",\n '
'"value": "https://login.microsoftonline.com/1234ededed/oauth2/token",\n '
'"hidden": true\n }\n}'
)

crawler = ClustersCrawler(ws, MockBackend(), "ucx")._assess_clusters(sample_clusters)
result_set = list(crawler)

assert len(result_set) == 4
assert result_set[0].success == 1
assert result_set[1].success == 0
assert result_set[2].success == 0
assert result_set[3].success == 0


def test_cluster_assessment_cluster_policy_no_spark_conf(mocker):
sample_clusters1 = [
ClusterDetails(
cluster_name="Tech Summit FY24 Cluster-2",
autoscale=AutoScale(min_workers=1, max_workers=6),
spark_context_id=5134472582179565315,
spark_env_vars=None,
policy_id="D96308F1BF0003A8",
spark_version="13.3.x-cpu-ml-scala2.12",
cluster_id="0915-190044-3dqy6751",
)
]
ws = Mock()
ws.cluster_policies.get.return_value.definition = {
"definition": '{"node_type_id":{"type":"allowlist",'
'"values":["Standard_DS3_v2","Standard_DS4_v2","Standard_DS5_v2","Standard_NC4as_T4_v3"],'
'"defaultValue":"Standard_DS3_v2"}'
}
ws.cluster_policies.get.return_value.policy_family_definition_overrides = "family_definition"

crawler = ClustersCrawler(ws, MockBackend(), "ucx")._assess_clusters(sample_clusters1)
result_set1 = list(crawler)
assert len(result_set1) == 1
assert result_set1[0].success == 1
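
As a possible follow-up (not part of this PR), the failure payload produced for the Azure SP cluster could be asserted directly instead of only via the success flag. This sketch assumes the cluster ordering of test_cluster_assessment above, where result_set[2] is "Tech Summit FY24 Cluster"; the locals failures and azure_failure are hypothetical.

import json

# cluster_info.failures is a JSON list; each Azure SP finding inside it is
# itself a JSON-encoded dict, so it needs a second json.loads.
failures = json.loads(result_set[2].failures)
azure_failure = json.loads(failures[0])
assert azure_failure["clusterName"] == "Tech Summit FY24 Cluster"
assert any(
    "oauth2.client.id" in key
    for entry in azure_failure["clusterSparkConf"]
    for key in entry
)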