
fix(snowflake): support snowflake allow/deny pattern for lineage and usage.
varunbharill committed Dec 15, 2021
1 parent 2e2ed34 commit 22f6d6b
Showing 3 changed files with 92 additions and 24 deletions.
29 changes: 16 additions & 13 deletions metadata-ingestion/source_docs/snowflake.md
@@ -139,20 +139,23 @@ sink:

Note that a `.` is used to denote nested fields in the YAML recipe.

-| Field                        | Required | Default                                                        | Description                                                     |
-| ---------------------------- | -------- | -------------------------------------------------------------- | --------------------------------------------------------------- |
-| `username`                   |          |                                                                | Snowflake username.                                             |
-| `password`                   |          |                                                                | Snowflake password.                                             |
-| `host_port`                  | ✅       |                                                                | Snowflake host URL.                                             |
-| `warehouse`                  |          |                                                                | Snowflake warehouse.                                            |
-| `role`                       |          |                                                                | Snowflake role.                                                 |
-| `env`                        |          | `"PROD"`                                                       | Environment to use in namespace when constructing URNs.         |
-| `bucket_duration`            |          | `"DAY"`                                                        | Duration to bucket usage events by. Can be `"DAY"` or `"HOUR"`. |
-| `start_time`                 |          | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage logs to consider.                        |
-| `end_time`                   |          | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage logs to consider.                          |
-| `top_n_queries`              |          | `10`                                                           | Number of top queries to save to each table.                    |
-| `apply_view_usage_to_tables` |          | False                                                          | Attribute usage of views to the underlying table.               |

+| Field              | Required | Default                                                             | Description                                                       |
+| ------------------ | -------- | ------------------------------------------------------------------- | ----------------------------------------------------------------- |
+| `username`         |          |                                                                     | Snowflake username.                                               |
+| `password`         |          |                                                                     | Snowflake password.                                               |
+| `host_port`        | ✅       |                                                                     | Snowflake host URL.                                               |
+| `warehouse`        |          |                                                                     | Snowflake warehouse.                                              |
+| `role`             |          |                                                                     | Snowflake role.                                                   |
+| `env`              |          | `"PROD"`                                                            | Environment to use in namespace when constructing URNs.           |
+| `bucket_duration`  |          | `"DAY"`                                                             | Duration to bucket usage events by. Can be `"DAY"` or `"HOUR"`.   |
+| `start_time`       |          | Last full day in UTC (or hour, depending on `bucket_duration`)      | Earliest date of usage logs to consider.                          |
+| `end_time`         |          | Last full day in UTC (or hour, depending on `bucket_duration`)      | Latest date of usage logs to consider.                            |
+| `top_n_queries`    |          | `10`                                                                | Number of top queries to save to each table.                      |
+| `database_pattern` |          | `"^UTIL_DB$"`<br />`"^SNOWFLAKE$"`<br />`"^SNOWFLAKE_SAMPLE_DATA$"` | Allow/deny patterns for the database in Snowflake dataset names.  |
+| `schema_pattern`   |          |                                                                     | Allow/deny patterns for the schema in Snowflake dataset names.    |
+| `view_pattern`     |          |                                                                     | Allow/deny patterns for views in Snowflake dataset names.         |
+| `table_pattern`    |          |                                                                     | Allow/deny patterns for tables in Snowflake dataset names.        |
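
As a sketch of how the new pattern fields might be combined in a recipe (the host, credentials, and `ANALYTICS` schema below are placeholders; the `database_pattern` deny list simply restates the defaults):

```yml
source:
  type: snowflake-usage
  config:
    host_port: example.snowflakecomputing.com
    username: user
    password: pass
    # Deny the built-in databases; these three entries are also the defaults.
    database_pattern:
      deny:
        - "^UTIL_DB$"
        - "^SNOWFLAKE$"
        - "^SNOWFLAKE_SAMPLE_DATA$"
    # Only report usage for datasets in the ANALYTICS schema.
    schema_pattern:
      allow:
        - "^ANALYTICS$"
```

Deny patterns take precedence over allow patterns when both match a name.
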
### Compatibility

Coming soon!
22 changes: 20 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py
@@ -203,6 +203,8 @@ def _get_upstream_lineage_info(
        for lineage_entry in lineage:
            # Update the table-lineage
            upstream_table_name = lineage_entry[0]
            if not self._is_dataset_allowed(upstream_table_name):
                continue
            upstream_table = UpstreamClass(
                dataset=builder.make_dataset_urn(
                    self.platform, upstream_table_name, self.config.env
@@ -229,8 +231,9 @@ def _get_upstream_lineage_info(
            )
            column_lineage[column_lineage_key] = column_lineage_value
            logger.debug(f"{column_lineage_key}:{column_lineage_value}")

-        return UpstreamLineage(upstreams=upstream_tables), column_lineage
+        if upstream_tables:
+            return UpstreamLineage(upstreams=upstream_tables), column_lineage
+        return None

    # Override the base class method.
    def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
@@ -288,3 +291,18 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:

            # Emit the work unit from super.
            yield wu

    def _is_dataset_allowed(self, dataset_name: Optional[str]) -> bool:
        # View lineage is not supported. Add an allow/deny pattern for views when it is.
        if dataset_name is None:
            return True
        dataset_params = dataset_name.split(".")
        if len(dataset_params) != 3:
            return True
        if (
            not self.config.database_pattern.allowed(dataset_params[0])
            or not self.config.schema_pattern.allowed(dataset_params[1])
            or not self.config.table_pattern.allowed(dataset_params[2])
        ):
            return False
        return True
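
To make the lineage-side filtering concrete, here is a minimal self-contained sketch. The `AllowDenyPattern` class below is a simplified stand-in for `datahub.configuration.common.AllowDenyPattern` (assumed semantics: a value passes only if it matches an allow regex and no deny regex), and `is_dataset_allowed` mirrors the `_is_dataset_allowed` method above:

```python
import re
from dataclasses import dataclass, field
from typing import List


@dataclass
class AllowDenyPattern:
    # Simplified stand-in for datahub.configuration.common.AllowDenyPattern.
    allow: List[str] = field(default_factory=lambda: [".*"])
    deny: List[str] = field(default_factory=list)

    def allowed(self, string: str) -> bool:
        # Deny patterns take precedence; otherwise any allow match passes.
        if any(re.match(pattern, string) for pattern in self.deny):
            return False
        return any(re.match(pattern, string) for pattern in self.allow)


def is_dataset_allowed(
    dataset_name: str,
    database_pattern: AllowDenyPattern,
    schema_pattern: AllowDenyPattern,
    table_pattern: AllowDenyPattern,
) -> bool:
    # Mirrors _is_dataset_allowed: names that are not three-part pass through.
    parts = dataset_name.split(".")
    if len(parts) != 3:
        return True
    return (
        database_pattern.allowed(parts[0])
        and schema_pattern.allowed(parts[1])
        and table_pattern.allowed(parts[2])
    )


db_pattern = AllowDenyPattern(
    deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"]
)
allow_all = AllowDenyPattern()

# Lineage entries in denied databases are skipped; everything else passes.
assert not is_dataset_allowed(
    "SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS", db_pattern, allow_all, allow_all
)
assert is_dataset_allowed("MY_DB.PUBLIC.ORDERS", db_pattern, allow_all, allow_all)
```
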
65 changes: 56 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/usage/snowflake_usage.py
@@ -12,6 +12,7 @@
from sqlalchemy.engine import Engine

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import get_time_bucket
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -95,6 +96,12 @@ class SnowflakeJoinedAccessEvent(PermissiveModel):
class SnowflakeUsageConfig(BaseSnowflakeConfig, BaseUsageConfig):
    env: str = builder.DEFAULT_ENV
    options: dict = {}
    database_pattern: AllowDenyPattern = AllowDenyPattern(
        deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"]
    )
    schema_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
    table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
    view_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
    apply_view_usage_to_tables: bool = False

    @pydantic.validator("role", always=True)
@@ -167,15 +174,57 @@ def is_unsupported_object_accessed(obj: Dict[str, Any]) -> bool:
            unsupported_keys = ["locations"]
            return any([obj.get(key) is not None for key in unsupported_keys])

        def is_dataset_pattern_allowed(
            dataset_name: Optional[Any], dataset_type: Optional[Any]
        ) -> bool:
            # TODO: support table/view patterns for usage logs by pulling that
            # information as well from the usage query.
            if not dataset_type or not dataset_name:
                return True

            # TODO: test domain type "external_table" and then add support for it.
            table_or_view_pattern: Optional[AllowDenyPattern] = (
                self.config.table_pattern
                if dataset_type.lower() in {"table"}
                else (
                    self.config.view_pattern
                    if dataset_type.lower() in {"view", "materialized_view"}
                    else None
                )
            )
            if table_or_view_pattern is None:
                return True

            dataset_params = dataset_name.split(".")
            # Usage logs are expected to contain fully qualified (db.schema.table) names.
            assert len(dataset_params) == 3
            if (
                not self.config.database_pattern.allowed(dataset_params[0])
                or not self.config.schema_pattern.allowed(dataset_params[1])
                or not table_or_view_pattern.allowed(dataset_params[2])
            ):
                return False
            return True

        def is_object_valid(obj: Dict[str, Any]) -> bool:
            if is_unsupported_object_accessed(
                obj
            ) or not is_dataset_pattern_allowed(
                obj.get("objectName"), obj.get("objectDomain")
            ):
                return False
            return True

event_dict["base_objects_accessed"] = [
obj
for obj in json.loads(event_dict["base_objects_accessed"])
if not is_unsupported_object_accessed(obj)
if is_object_valid(obj)
]
event_dict["direct_objects_accessed"] = [
obj
for obj in json.loads(event_dict["direct_objects_accessed"])
if not is_unsupported_object_accessed(obj)
if is_object_valid(obj)
]
event_dict["query_start_time"] = (
event_dict["query_start_time"]
@@ -202,15 +251,13 @@ def _aggregate_access_events(
                event.query_start_time, self.config.bucket_duration
            )

-            accessed_data = []
-            if self.config.apply_view_usage_to_tables:
-                accessed_data = event.base_objects_accessed
-            else:
-                accessed_data = event.direct_objects_accessed
-
+            accessed_data = (
+                event.base_objects_accessed
+                if self.config.apply_view_usage_to_tables
+                else event.direct_objects_accessed
+            )
            for object in accessed_data:
                resource = object.objectName

                agg_bucket = datasets[floored_ts].setdefault(
                    resource,
                    AggregatedDataset(bucket_start_time=floored_ts, resource=resource),
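
To round out the usage side, a compact sketch of the event filtering above: objects with unsupported keys are dropped, and the rest pass through the dataset pattern check. The object names and the hard-coded deny of the `SNOWFLAKE` database below are illustrative only, standing in for the configurable `is_dataset_pattern_allowed` check:

```python
import json
from typing import Any, Dict

# Illustrative access-history rows; objectDomain values follow Snowflake's
# ACCESS_HISTORY ("Table", "View", ...), and the names are hypothetical.
raw_objects_accessed = json.dumps(
    [
        {"objectName": "MY_DB.PUBLIC.ORDERS", "objectDomain": "Table"},
        {"objectName": "SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY", "objectDomain": "View"},
        {"objectName": "s3://bucket/path", "locations": ["s3://bucket/path"]},
    ]
)


def is_unsupported_object_accessed(obj: Dict[str, Any]) -> bool:
    # External locations (the "locations" key) are not supported for usage.
    return obj.get("locations") is not None


def is_object_valid(obj: Dict[str, Any]) -> bool:
    if is_unsupported_object_accessed(obj):
        return False
    # Stand-in for is_dataset_pattern_allowed: deny the SNOWFLAKE database.
    name = obj.get("objectName") or ""
    return not name.startswith("SNOWFLAKE.")


kept = [obj for obj in json.loads(raw_objects_accessed) if is_object_valid(obj)]
assert [obj["objectName"] for obj in kept] == ["MY_DB.PUBLIC.ORDERS"]
```
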
