diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 629e40974f5ff4..224066a599aa38 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -109,6 +109,8 @@ def get_long_description():
     "postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
     "redash": {"redash-toolbelt"},
     "redshift": sql_common | {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
+    "redshift-usage": sql_common
+    | {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
     "sagemaker": aws_common,
     "snowflake": sql_common | {"snowflake-sqlalchemy<=1.2.4"},
     "snowflake-usage": sql_common | {"snowflake-sqlalchemy<=1.2.4"},
@@ -175,6 +177,8 @@ def get_long_description():
         "datahub-kafka",
         "datahub-rest",
         "redash",
+        "redshift",
+        "redshift-usage",
         # airflow is added below
     ]
     for dependency in plugins[plugin]
@@ -247,6 +251,7 @@ def get_long_description():
         "postgres = datahub.ingestion.source.sql.postgres:PostgresSource",
         "redash = datahub.ingestion.source.redash:RedashSource",
         "redshift = datahub.ingestion.source.sql.redshift:RedshiftSource",
+        "redshift-usage = datahub.ingestion.source.usage.redshift_usage:RedshiftUsageSource",
         "snowflake = datahub.ingestion.source.sql.snowflake:SnowflakeSource",
         "snowflake-usage = datahub.ingestion.source.usage.snowflake_usage:SnowflakeUsageSource",
         "superset = datahub.ingestion.source.superset:SupersetSource",
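With the extra and the entry point registered above, the source installs via `pip install 'acryl-datahub[redshift-usage]'` and is addressed as type `redshift-usage` in a recipe. A minimal sketch of a programmatic run, mirroring the `Pipeline.create` call used in the integration test further below; all connection values are placeholders:

```python
# Minimal sketch of running the new source; mirrors the integration test's
# Pipeline.create usage. Connection values below are placeholders.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "run_id": "redshift-usage-example",
        "source": {
            "type": "redshift-usage",
            "config": {
                "host_port": "my-cluster.example.amazonaws.com:5439",  # placeholder
                "database": "dev",
                "username": "datahub",
                "password": "...",  # placeholder
                "email_domain": "example.com",
            },
        },
        "sink": {
            "type": "file",
            "config": {"filename": "./redshift_usages.json"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```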
diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py
new file mode 100644
index 00000000000000..93c2bcad071899
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py
@@ -0,0 +1,225 @@
+import collections
+import dataclasses
+import logging
+from datetime import datetime
+from typing import Dict, Iterable, List, Optional
+
+from dateutil import parser
+from pydantic import Field
+from pydantic.main import BaseModel
+from sqlalchemy import create_engine
+from sqlalchemy.engine import Engine
+
+import datahub.emitter.mce_builder as builder
+from datahub.ingestion.api.source import Source, SourceReport
+from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.ingestion.source.sql.redshift import RedshiftConfig
+from datahub.ingestion.source.usage.usage_common import (
+    BaseUsageConfig,
+    GenericAggregatedDataset,
+    get_time_bucket,
+)
+
+logger = logging.getLogger(__name__)
+
+redshift_datetime_format = "%Y-%m-%d %H:%M:%S"
+
+# Add this join to the query below for more metrics on completed queries:
+#   LEFT JOIN svl_query_metrics_summary sqms ON ss.query = sqms.query
+# Reference: https://docs.aws.amazon.com/redshift/latest/dg/r_SVL_QUERY_METRICS_SUMMARY.html
+
+# This query joins stl_scan against table info, query text, and user info to
+# get usage stats. The inner (non-LEFT) joins limit the results to queries
+# run by users on user-defined tables.
+redshift_usage_sql_comment = """
+SELECT DISTINCT ss.userid,
+       ss.query,
+       sui.usename,
+       ss.tbl,
+       sq.text,
+       sti.database,
+       sti.schema,
+       sti.table,
+       ss.starttime,
+       ss.endtime
+FROM stl_scan ss
+  JOIN svv_table_info sti ON ss.tbl = sti.table_id
+  LEFT JOIN stl_querytext sq ON ss.query = sq.query
+  JOIN svl_user_info sui ON sq.userid = sui.usesysid
+WHERE ss.starttime >= '{start_time}'
+AND ss.endtime < '{end_time}'
+ORDER BY ss.endtime DESC;
+""".strip()
+
+
+RedshiftTableRef = str
+AggregatedDataset = GenericAggregatedDataset[RedshiftTableRef]
+
+
+class RedshiftJoinedAccessEvent(BaseModel):
+    userid: int
+    usename: Optional[str] = None
+    query: int
+    tbl: int
+    text: Optional[str] = None
+    database: Optional[str] = None
+    schema_: Optional[str] = Field(None, alias="schema")
+    table: Optional[str] = None
+    starttime: datetime
+    endtime: datetime
+
+
+class RedshiftUsageConfig(RedshiftConfig, BaseUsageConfig):
+    env: str = builder.DEFAULT_ENV
+    email_domain: str
+    options: dict = {}
+
+    def get_sql_alchemy_url(self):
+        return super().get_sql_alchemy_url(uri_opts=self.options)
+
+
+@dataclasses.dataclass
+class RedshiftUsageSource(Source):
+    config: RedshiftUsageConfig
+    report: SourceReport = dataclasses.field(default_factory=SourceReport)
+
+    @classmethod
+    def create(cls, config_dict, ctx):
+        config = RedshiftUsageConfig.parse_obj(config_dict)
+        return cls(ctx, config)
+
+    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
+        """Gets Redshift usage stats as work units."""
+        access_events = self._get_redshift_history()
+        # If the query returned no rows, there is nothing to ingest.
+        if not access_events:
+            return []
+
+        joined_access_event = self._get_joined_access_event(access_events)
+        aggregated_info = self._aggregate_access_events(joined_access_event)
+
+        for time_bucket in aggregated_info.values():
+            for aggregate in time_bucket.values():
+                wu = self._make_usage_stat(aggregate)
+                self.report.report_workunit(wu)
+                yield wu
+
+    def _make_usage_query(self) -> str:
+        return redshift_usage_sql_comment.format(
+            start_time=self.config.start_time.strftime(redshift_datetime_format),
+            end_time=self.config.end_time.strftime(redshift_datetime_format),
+        )
+
+    def _make_sql_engine(self) -> Engine:
+        url = self.config.get_sql_alchemy_url()
+        logger.debug(f"sql_alchemy_url = {url}")
+        engine = create_engine(url, **self.config.options)
+        return engine
+
+    def _get_redshift_history(self):
+        query = self._make_usage_query()
+        engine = self._make_sql_engine()
+        results = engine.execute(query)
+        events = []
+        for row in results:
+            # minor type conversion
+            if hasattr(row, "_asdict"):
+                event_dict = row._asdict()
+            else:
+                event_dict = dict(row)
+
+            # strip the padding that the _asdict() conversion above leaves on strings
+            for k, v in event_dict.items():
+                if isinstance(v, str):
+                    event_dict[k] = v.strip()
+
+            if event_dict.get("starttime", None):
+                event_dict["starttime"] = str(event_dict["starttime"])
+            if event_dict.get("endtime", None):
+                event_dict["endtime"] = str(event_dict["endtime"])
+
+            logger.debug(f"event_dict: {event_dict}")
+            events.append(event_dict)
+
+        if events:
+            return events
+
+        # The SQL result set can be empty, in which case the connection is
+        # closed and ingestion should not proceed.
+        logger.info("SQL result is empty")
+        return None
+
+    def _convert_str_to_datetime(self, v):
+        # Normalize a datetime string to redshift_datetime_format; pydantic
+        # re-parses it into a datetime when the event model is constructed.
+        if isinstance(v, str):
+            isodate = parser.parse(v)  # compatible with Python 3.6+
+            return isodate.strftime(redshift_datetime_format)
+
+    def _get_joined_access_event(self, events):
+        joined_access_events = []
+        for event_dict in events:
+            event_dict["starttime"] = self._convert_str_to_datetime(
+                event_dict.get("starttime")
+            )
+            event_dict["endtime"] = self._convert_str_to_datetime(
+                event_dict.get("endtime")
+            )
+
+            if not (
+                event_dict.get("database", None)
+                and event_dict.get("schema", None)
+                and event_dict.get("table", None)
+            ):
+                logger.info("One or more access event fields are missing. Skipping ....")
+                continue
+
+            if not event_dict.get("usename"):
+                logger.info("The usename field is missing. Skipping ....")
+                continue
+
+            joined_access_events.append(RedshiftJoinedAccessEvent(**event_dict))
+        return joined_access_events
+
+    def _aggregate_access_events(
+        self, events: List[RedshiftJoinedAccessEvent]
+    ) -> Dict[datetime, Dict[RedshiftTableRef, AggregatedDataset]]:
+        datasets: Dict[
+            datetime, Dict[RedshiftTableRef, AggregatedDataset]
+        ] = collections.defaultdict(dict)
+
+        for event in events:
+            floored_ts = get_time_bucket(event.starttime, self.config.bucket_duration)
+
+            resource = f"{event.database}.{event.schema_}.{event.table}"
+
+            agg_bucket = datasets[floored_ts].setdefault(
+                resource,
+                AggregatedDataset(bucket_start_time=floored_ts, resource=resource),
+            )
+
+            # The usage stats UI currently requires an email to display users,
+            # so append the configured email domain ('unknown' if no username).
+            username = f"{event.usename if event.usename else 'unknown'}@{self.config.email_domain}"
+            logger.debug(f"username: {username}")
+            agg_bucket.add_read_entry(
+                username,
+                event.text,
+                [],  # TODO: not currently supported by Redshift; find column-level changes
+            )
+        return datasets
+
+    def _make_usage_stat(self, agg: AggregatedDataset) -> MetadataWorkUnit:
+        return agg.make_usage_workunit(
+            self.config.bucket_duration,
+            lambda resource: builder.make_dataset_urn(
+                "redshift", resource.lower(), self.config.env
+            ),
+            self.config.top_n_queries,
+        )
+
+    def get_report(self) -> SourceReport:
+        return self.report
+
+    def close(self) -> None:
+        pass
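The aggregation above floors each event's `starttime` to a bucket start via `get_time_bucket` and groups reads per fully-qualified table. A small sketch of the bucketing that yields the `timestampMillis` in the golden file below; note the `BucketDuration` import location is an assumption, only `get_time_bucket` is imported by the source itself:

```python
# Sketch of the time bucketing used by _aggregate_access_events. The
# BucketDuration import path is assumed; get_time_bucket is the same helper
# the source imports from usage_common.
from datetime import datetime

from datahub.ingestion.source.usage.usage_common import BucketDuration, get_time_bucket

ts = datetime(2021, 9, 14, 13, 45)  # an event's starttime
floored = get_time_bucket(ts, BucketDuration.DAY)
print(floored)  # 2021-09-14 00:00:00
# 2021-09-14T00:00:00Z is 1631577600000 ms, the timestampMillis recorded in
# the golden file below.
```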
diff --git a/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_golden.json b/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_golden.json
new file mode 100644
index 00000000000000..154eb006f6c00b
--- /dev/null
+++ b/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_golden.json
@@ -0,0 +1,15 @@
+[
+{
+    "auditHeader": null,
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)",
+    "entityKeyAspect": null,
+    "changeType": "UPSERT",
+    "aspectName": "datasetUsageStatistics",
+    "aspect": {
+        "value": "{\"timestampMillis\": 1631577600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 1, \"topSqlQueries\": [\"select userid from users\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:test-name\", \"count\": 1, \"userEmail\": \"test-name@acryl.io\"}], \"fieldCounts\": []}",
+        "contentType": "application/json"
+    },
+    "systemMetadata": null
+}
+]
\ No newline at end of file
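The `aspect.value` above is itself a JSON document serialized into a string. A quick sketch for inspecting it when regenerating the golden file; the path is assumed to be relative to the test resources directory:

```python
# Decode the stringified datasetUsageStatistics aspect in the golden file.
import json

with open("redshift_usages_golden.json") as f:
    for record in json.load(f):
        stats = json.loads(record["aspect"]["value"])
        print(record["entityUrn"])
        print(stats["totalSqlQueries"], stats["userCounts"])
```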
diff --git a/metadata-ingestion/tests/integration/redshift-usage/test_redshift_usage.py b/metadata-ingestion/tests/integration/redshift-usage/test_redshift_usage.py
new file mode 100644
index 00000000000000..6f9c9fac28d83e
--- /dev/null
+++ b/metadata-ingestion/tests/integration/redshift-usage/test_redshift_usage.py
@@ -0,0 +1,89 @@
+import json
+import pathlib
+from unittest.mock import patch
+
+from freezegun import freeze_time
+
+from datahub.ingestion.run.pipeline import Pipeline
+from datahub.ingestion.source.usage.redshift_usage import RedshiftUsageConfig
+from tests.test_helpers import mce_helpers
+
+FROZEN_TIME = "2021-08-24 09:00:00"
+
+
+def test_redshift_usage_config():
+    config = RedshiftUsageConfig.parse_obj(
+        dict(
+            host_port="xxxxx",
+            database="xxxxx",
+            username="xxxxx",
+            password="xxxxx",
+            email_domain="xxxxx",
+            include_views=True,
+            include_tables=True,
+        )
+    )
+
+    assert config.host_port == "xxxxx"
+    assert config.database == "xxxxx"
+    assert config.username == "xxxxx"
+    assert config.email_domain == "xxxxx"
+    assert config.include_views
+    assert config.include_tables
+
+
+def yield_function(li):
+    for i in li:
+        yield i
+
+
+@freeze_time(FROZEN_TIME)
+def test_redshift_usage_source(pytestconfig, tmp_path):
+    test_resources_dir = pathlib.Path(
+        pytestconfig.rootpath / "tests/integration/redshift-usage"
+    )
+
+    with patch(
+        "datahub.ingestion.source.usage.redshift_usage.RedshiftUsageSource._get_redshift_history"
+    ) as mock_event_history:
+        access_events = load_access_events(test_resources_dir)
+        mock_event_history.return_value = access_events
+
+        # Run ingestion
+        pipeline = Pipeline.create(
+            {
+                "run_id": "test-redshift-usage",
+                "source": {
+                    "type": "redshift-usage",
+                    "config": {
+                        "host_port": "xxxxx",
+                        "database": "xxxxx",
+                        "username": "xxxxx",
+                        "password": "xxxxx",
+                        "email_domain": "acryl.io",
+                        "include_views": True,
+                        "include_tables": True,
+                    },
+                },
+                "sink": {
+                    "type": "file",
+                    "config": {"filename": f"{tmp_path}/redshift_usages.json"},
+                },
+            },
+        )
+        pipeline.run()
+        pipeline.raise_from_status()
+
+    mce_helpers.check_golden_file(
+        pytestconfig=pytestconfig,
+        output_path=tmp_path / "redshift_usages.json",
+        golden_path=test_resources_dir / "redshift_usages_golden.json",
+    )
+
+
+def load_access_events(test_resources_dir):
+    access_events_history_file = test_resources_dir / "usage_events_history.json"
+    with access_events_history_file.open() as access_events_json:
+        access_events = json.load(access_events_json)
+    return access_events
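The test patches `_get_redshift_history`, so no Redshift connection is needed; the fixture below stands in for the SQL results. Only the first of its four events survives `_get_joined_access_event`, which is why the golden file holds a single `dev.public.users` record. A sketch of that filter applied to the fixture's key fields:

```python
# Why only one fixture event reaches aggregation: events missing
# database/schema/table or usename are dropped by _get_joined_access_event.
events = [
    {"usename": "test-name", "database": "dev", "schema": "public", "table": "users"},
    {"usename": None, "database": "dev", "schema": "public", "table": "category"},
    {"usename": "chinmay", "database": None, "schema": None, "table": "category"},
    {"usename": "shirshanka", "database": "db1", "schema": "schema1", "table": None},
]
kept = [
    e
    for e in events
    if e["database"] and e["schema"] and e["table"] and e["usename"]
]
print(len(kept))  # 1 -> the single dev.public.users record in the golden file
```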
"text": "select catid from category", + "database": "db1", + "schema": "schema1", + "table": null, + "starttime": "2021-09-14 00:00:00", + "endtime": "2021-09-15 00:00:00" + } +] \ No newline at end of file