Skip to content

Commit

Permalink
Detect direct access to cloud storage and raise a deprecation warning (
Browse files Browse the repository at this point in the history
…#1506)

## Changes

Extends the PySpark linter, which already detects the use of table names
that should be migrated, so that it also detects direct access to cloud
storage. Additional deprecation advisories are issued when direct cloud
access is detected.

### Linked issues
<!-- DOC: Link issue with a keyword: close, closes, closed, fix, fixes,
fixed, resolve, resolves, resolved. See
https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword
-->

Resolves #1133  

Signed-off-by: Jim.Idle <[email protected]>
  • Loading branch information
jimidle authored Apr 24, 2024
1 parent 4aed0c9 commit cf55b87
Show file tree
Hide file tree
Showing 3 changed files with 750 additions and 28 deletions.
167 changes: 150 additions & 17 deletions src/databricks/labs/ucx/source_code/pyspark.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,43 @@
from databricks.labs.ucx.source_code.queries import FromTable


class AstHelper:
    """Helpers for turning the callee of an ``ast.Call`` into a dotted name."""

    @staticmethod
    def get_full_function_name(node):
        """Return the dotted name of the function called by ``node``.

        ``node`` is expected to be an ``ast.Call``. For ``dbutils.fs.ls(...)``
        the result is ``"dbutils.fs.ls"``; for a bare ``foo(...)`` it is
        ``"foo"``. ``None`` is returned when the callee is not a pure chain of
        names and attributes (for example when the chain starts from the
        result of another call, as in ``spark.read.format("x").load(...)``).
        """
        if isinstance(node.func, ast.Attribute):
            return AstHelper._get_value(node.func)

        if isinstance(node.func, ast.Name):
            return node.func.id

        return None

    @staticmethod
    def _get_value(node):
        """Return the dotted name for the ``ast.Attribute`` ``node``, or ``None``.

        Walks ``node.value`` recursively until it reaches an ``ast.Name``.
        Any other root (call, subscript, literal, ...) has no static dotted
        name, so ``None`` is returned for the whole chain.
        """
        if isinstance(node.value, ast.Name):
            return node.value.id + '.' + node.attr

        if isinstance(node.value, ast.Attribute):
            prefix = AstHelper._get_value(node.value)
            # The inner chain may bottom out in a non-name node; propagate
            # None instead of concatenating it with a string (TypeError).
            if prefix is None:
                return None
            return prefix + '.' + node.attr

        return None


@dataclass
class Matcher(ABC):
method_name: str
min_args: int
max_args: int
table_arg_index: int
table_arg_name: str | None = None
call_context: dict[str, set[str]] | None = None

def matches(self, node: ast.AST):
if not (isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute)):
return False
return self._get_table_arg(node) is not None
return (
isinstance(node, ast.Call)
and isinstance(node.func, ast.Attribute)
and self._get_table_arg(node) is not None
)

@abstractmethod
def lint(self, from_table: FromTable, index: MigrationIndex, node: ast.Call) -> Iterator[Advice]:
Expand All @@ -39,9 +64,26 @@ def _get_table_arg(self, node: ast.Call):
if len(node.args) > 0:
return node.args[self.table_arg_index] if self.min_args <= len(node.args) <= self.max_args else None
assert self.table_arg_name is not None
if not node.keywords:
return None
arg = next(kw for kw in node.keywords if kw.arg == self.table_arg_name)
return arg.value if arg is not None else None

def _check_call_context(self, node: ast.Call) -> bool:
    """Tell whether ``node`` is a call this matcher's ``call_context`` allows.

    ``call_context`` maps a method's attribute name (e.g. ``"ls"``) to the
    set of fully qualified dotted names (e.g. ``{"dbutils.fs.ls"}``) that
    are accepted for it. A matcher whose ``call_context`` is ``None``
    accepts every call.
    """
    assert isinstance(node.func, ast.Attribute)  # Avoid linter warning
    method = node.func.attr
    dotted_name = AstHelper.get_full_function_name(node)

    # No context configured: every call is checked.
    if self.call_context is None:
        return True

    allowed = self.call_context.get(method)
    if not allowed:
        # The method is absent from the context, or maps to an empty set.
        return False

    return dotted_name in allowed


@dataclass
class QueryMatcher(Matcher):
Expand Down Expand Up @@ -78,19 +120,8 @@ class TableNameMatcher(Matcher):

def lint(self, from_table: FromTable, index: MigrationIndex, node: ast.Call) -> Iterator[Advice]:
table_arg = self._get_table_arg(node)
if isinstance(table_arg, ast.Constant):
dst = self._find_dest(index, table_arg.value, from_table.schema)
if dst is not None:
yield Deprecation(
code='table-migrate',
message=f"Table {table_arg.value} is migrated to {dst.destination()} in Unity Catalog",
# SQLGlot does not propagate tokens yet. See https://github.com/tobymao/sqlglot/issues/3159
start_line=node.lineno,
start_col=node.col_offset,
end_line=node.end_lineno or 0,
end_col=node.end_col_offset or 0,
)
else:

if not isinstance(table_arg, ast.Constant):
assert isinstance(node.func, ast.Attribute) # always true, avoids a pylint warning
yield Advisory(
code='table-migrate',
Expand All @@ -100,6 +131,21 @@ def lint(self, from_table: FromTable, index: MigrationIndex, node: ast.Call) ->
end_line=node.end_lineno or 0,
end_col=node.end_col_offset or 0,
)
return

dst = self._find_dest(index, table_arg.value, from_table.schema)
if dst is None:
return

yield Deprecation(
code='table-migrate',
message=f"Table {table_arg.value} is migrated to {dst.destination()} in Unity Catalog",
# SQLGlot does not propagate tokens yet. See https://github.com/tobymao/sqlglot/issues/3159
start_line=node.lineno,
start_col=node.col_offset,
end_line=node.end_lineno or 0,
end_col=node.end_col_offset or 0,
)

def apply(self, from_table: FromTable, index: MigrationIndex, node: ast.Call) -> None:
table_arg = self._get_table_arg(node)
Expand Down Expand Up @@ -135,7 +181,62 @@ def lint(self, from_table: FromTable, index: MigrationIndex, node: ast.Call) ->
)

def apply(self, from_table: FromTable, index: MigrationIndex, node: ast.Call) -> None:
raise NotImplementedError("Should never get there!")
# No transformations to apply
return


@dataclass
class DirectFilesystemAccessMatcher(Matcher):
    """Matcher that flags calls whose path argument is a literal filesystem reference.

    Two kinds of literal are reported as ``direct-filesystem-access``
    deprecations: strings starting with one of ``_DIRECT_FS_REFS``, and
    strings starting with ``/`` when the call passes ``_check_call_context``.
    """

    # Prefixes that mark a string literal as a direct filesystem reference.
    _DIRECT_FS_REFS = {
        "s3a://",
        "s3n://",
        "s3://",
        "wasb://",
        "wasbs://",
        "abfs://",
        "abfss://",
        "dbfs:/",
        "hdfs://",
        "file:/",
    }

    def matches(self, node: ast.AST):
        """Return True when ``node`` is an attribute call carrying the inspected argument."""
        return (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and self._get_table_arg(node) is not None
        )

    def lint(self, from_table: FromTable, index: MigrationIndex, node: ast.Call) -> Iterator[Advice]:
        """Yield a ``Deprecation`` when the inspected argument is a direct filesystem path.

        Only string literals are examined. Non-constant expressions and
        non-string constants (numbers, ``None``, bytes, ...) yield nothing.
        """
        table_arg = self._get_table_arg(node)

        if not isinstance(table_arg, ast.Constant):
            return

        path = table_arg.value
        # Constants may hold ints, None, bytes, etc. (e.g. .option("n", 10));
        # those cannot be paths and have no compatible .startswith().
        if not isinstance(path, str):
            return

        if path.startswith(tuple(self._DIRECT_FS_REFS)):
            yield self._deprecation(node, f"The use of direct filesystem references is deprecated: {path}")
            return

        if path.startswith("/") and self._check_call_context(node):
            yield self._deprecation(node, f"The use of default dbfs: references is deprecated: {path}")

    @staticmethod
    def _deprecation(node: ast.Call, message: str) -> Deprecation:
        """Build the ``direct-filesystem-access`` deprecation spanning ``node``."""
        return Deprecation(
            code='direct-filesystem-access',
            message=message,
            start_line=node.lineno,
            start_col=node.col_offset,
            end_line=node.end_lineno or 0,
            end_col=node.end_col_offset or 0,
        )

    def apply(self, from_table: FromTable, index: MigrationIndex, node: ast.Call) -> None:
        """Do nothing: this matcher only reports, it has no transformation to apply."""
        return


class SparkMatchers:
Expand Down Expand Up @@ -193,6 +294,37 @@ def __init__(self):
TableNameMatcher("register", 1, 2, 0, "name"),
]

direct_fs_access_matchers = [
DirectFilesystemAccessMatcher("ls", 1, 1, 0, call_context={"ls": {"dbutils.fs.ls"}}),
DirectFilesystemAccessMatcher("cp", 1, 2, 0, call_context={"cp": {"dbutils.fs.cp"}}),
DirectFilesystemAccessMatcher("rm", 1, 1, 0, call_context={"rm": {"dbutils.fs.rm"}}),
DirectFilesystemAccessMatcher("head", 1, 1, 0, call_context={"head": {"dbutils.fs.head"}}),
DirectFilesystemAccessMatcher("put", 1, 2, 0, call_context={"put": {"dbutils.fs.put"}}),
DirectFilesystemAccessMatcher("mkdirs", 1, 1, 0, call_context={"mkdirs": {"dbutils.fs.mkdirs"}}),
DirectFilesystemAccessMatcher("mv", 1, 2, 0, call_context={"mv": {"dbutils.fs.mv"}}),
DirectFilesystemAccessMatcher("text", 1, 3, 0),
DirectFilesystemAccessMatcher("csv", 1, 1000, 0),
DirectFilesystemAccessMatcher("json", 1, 1000, 0),
DirectFilesystemAccessMatcher("orc", 1, 1000, 0),
DirectFilesystemAccessMatcher("parquet", 1, 1000, 0),
DirectFilesystemAccessMatcher("save", 0, 1000, -1, "path"),
DirectFilesystemAccessMatcher("load", 0, 1000, -1, "path"),
DirectFilesystemAccessMatcher("option", 1, 1000, 1), # Only .option("path", "xxx://bucket/path") will hit
DirectFilesystemAccessMatcher("addFile", 1, 3, 0),
DirectFilesystemAccessMatcher("binaryFiles", 1, 2, 0),
DirectFilesystemAccessMatcher("binaryRecords", 1, 2, 0),
DirectFilesystemAccessMatcher("dump_profiles", 1, 1, 0),
DirectFilesystemAccessMatcher("hadoopFile", 1, 8, 0),
DirectFilesystemAccessMatcher("newAPIHadoopFile", 1, 8, 0),
DirectFilesystemAccessMatcher("pickleFile", 1, 3, 0),
DirectFilesystemAccessMatcher("saveAsHadoopFile", 1, 8, 0),
DirectFilesystemAccessMatcher("saveAsNewAPIHadoopFile", 1, 7, 0),
DirectFilesystemAccessMatcher("saveAsPickleFile", 1, 2, 0),
DirectFilesystemAccessMatcher("saveAsSequenceFile", 1, 2, 0),
DirectFilesystemAccessMatcher("saveAsTextFile", 1, 2, 0),
DirectFilesystemAccessMatcher("load_from_path", 1, 1, 0),
]

# nothing to migrate in UserDefinedFunction, see https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.UserDefinedFunction.html
# nothing to migrate in UserDefinedTableFunction, see https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.UserDefinedTableFunction.html
self._matchers = {}
Expand All @@ -203,6 +335,7 @@ def __init__(self):
+ spark_dataframereader_matchers
+ spark_dataframewriter_matchers
+ spark_udtfregistration_matchers
+ direct_fs_access_matchers
):
self._matchers[matcher.method_name] = matcher

Expand Down
40 changes: 40 additions & 0 deletions tests/unit/source_code/test_notebook_linter.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@
end_line=4,
end_col=1024,
),
Deprecation(
code='direct-filesystem-access',
message='The use of default dbfs: references is deprecated: ' '/mnt/things/e/f/g',
start_line=14,
start_col=8,
end_line=14,
end_col=43,
),
Deprecation(
code='dbfs-usage',
message='Deprecated file system path in call to: /mnt/things/e/f/g',
Expand Down Expand Up @@ -82,6 +90,14 @@
""",
[
Deprecation(
code='direct-filesystem-access',
message='The use of default dbfs: references is deprecated: ' '/mnt/things/e/f/g',
start_line=5,
start_col=8,
end_line=5,
end_col=43,
),
Deprecation(
code='dbfs-usage',
message='Deprecated file system path in call to: /mnt/things/e/f/g',
Expand Down Expand Up @@ -154,6 +170,30 @@
MERGE INTO delta.`/dbfs/...` t USING source ON t.key = source.key WHEN MATCHED THEN DELETE
""",
[
Deprecation(
code='direct-filesystem-access',
message='The use of default dbfs: references is deprecated: /mnt/foo/bar',
start_line=15,
start_col=0,
end_line=15,
end_col=34,
),
Deprecation(
code='direct-filesystem-access',
message='The use of direct filesystem references is deprecated: dbfs:/mnt/foo/bar',
start_line=16,
start_col=0,
end_line=16,
end_col=39,
),
Deprecation(
code='direct-filesystem-access',
message='The use of direct filesystem references is deprecated: dbfs://mnt/foo/bar',
start_line=17,
start_col=0,
end_line=17,
end_col=40,
),
Advisory(
code='dbfs-usage',
message='Possible deprecated file system path: dbfs:/...',
Expand Down
Loading

0 comments on commit cf55b87

Please sign in to comment.