feat(ingest): add redshift usage source (datahub-project#3277)
chinmay-bhat authored and jgiannuzzi committed Oct 6, 2021
1 parent f6d132f commit 0daf67a
Showing 5 changed files with 384 additions and 0 deletions.
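For context, the new source plugs into the standard ingestion pipeline like any other plugin. A minimal sketch of how it might be invoked, mirroring the integration test below (all connection values are placeholders; actually running this requires a reachable Redshift cluster):

from datahub.ingestion.run.pipeline import Pipeline

# Sketch only: host_port, database, and credentials below are placeholders.
pipeline = Pipeline.create(
    {
        "run_id": "redshift-usage-example",
        "source": {
            "type": "redshift-usage",
            "config": {
                "host_port": "my-cluster.example.redshift.amazonaws.com:5439",
                "database": "dev",
                "username": "datahub",
                "password": "...",
                "email_domain": "example.com",
            },
        },
        "sink": {
            "type": "file",
            "config": {"filename": "redshift_usages.json"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()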
5 changes: 5 additions & 0 deletions metadata-ingestion/setup.py
@@ -109,6 +109,8 @@ def get_long_description():
"postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
"redash": {"redash-toolbelt"},
"redshift": sql_common | {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
"redshift-usage": sql_common
| {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
"sagemaker": aws_common,
"snowflake": sql_common | {"snowflake-sqlalchemy<=1.2.4"},
"snowflake-usage": sql_common | {"snowflake-sqlalchemy<=1.2.4"},
@@ -175,6 +177,8 @@ def get_long_description():
"datahub-kafka",
"datahub-rest",
"redash",
"redshift",
"redshift-usage"
# airflow is added below
]
for dependency in plugins[plugin]
@@ -247,6 +251,7 @@ def get_long_description():
"postgres = datahub.ingestion.source.sql.postgres:PostgresSource",
"redash = datahub.ingestion.source.redash:RedashSource",
"redshift = datahub.ingestion.source.sql.redshift:RedshiftSource",
"redshift-usage = datahub.ingestion.source.usage.redshift_usage:RedshiftUsageSource",
"snowflake = datahub.ingestion.source.sql.snowflake:SnowflakeSource",
"snowflake-usage = datahub.ingestion.source.usage.snowflake_usage:SnowflakeUsageSource",
"superset = datahub.ingestion.source.superset:SupersetSource",
225 changes: 225 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py
@@ -0,0 +1,225 @@
import collections
import dataclasses
import logging
from datetime import datetime
from typing import Dict, Iterable, List

from dateutil import parser
from pydantic import Field
from pydantic.main import BaseModel
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.redshift import RedshiftConfig
from datahub.ingestion.source.usage.usage_common import (
BaseUsageConfig,
GenericAggregatedDataset,
get_time_bucket,
)

logger = logging.getLogger(__name__)

redshift_datetime_format = "%Y-%m-%d %H:%M:%S"

# add this join to the query below for more metrics on completed queries
# LEFT JOIN svl_query_metrics_summary sqms ON ss.query = sqms.query
# Reference: https://docs.aws.amazon.com/redshift/latest/dg/r_SVL_QUERY_METRICS_SUMMARY.html

# This query joins stl_scan with table info, query text, and user info
# to derive usage stats. The inner joins on svv_table_info and
# svl_user_info limit the results to queries run by users against
# user-defined tables.
redshift_usage_sql_comment = """
SELECT DISTINCT ss.userid,
ss.query,
sui.usename,
ss.tbl,
sq.text,
sti.database,
sti.schema,
sti.table,
ss.starttime,
ss.endtime
FROM stl_scan ss
JOIN svv_table_info sti ON ss.tbl = sti.table_id
LEFT JOIN stl_querytext sq ON ss.query = sq.query
JOIN svl_user_info sui ON sq.userid = sui.usesysid
WHERE ss.starttime >= '{start_time}'
AND ss.endtime < '{end_time}'
ORDER BY ss.endtime DESC;
""".strip()


RedshiftTableRef = str
AggregatedDataset = GenericAggregatedDataset[RedshiftTableRef]


class RedshiftJoinedAccessEvent(BaseModel):
userid: int
usename: str = None # type:ignore
query: int
tbl: int
text: str = None # type:ignore
database: str = None # type:ignore
schema_: str = Field(None, alias="schema")
table: str = None # type:ignore
starttime: datetime
endtime: datetime


class RedshiftUsageConfig(RedshiftConfig, BaseUsageConfig):
env: str = builder.DEFAULT_ENV
email_domain: str
options: dict = {}

def get_sql_alchemy_url(self):
return super().get_sql_alchemy_url(uri_opts=self.options)


@dataclasses.dataclass
class RedshiftUsageSource(Source):
config: RedshiftUsageConfig
report: SourceReport = dataclasses.field(default_factory=SourceReport)

@classmethod
def create(cls, config_dict, ctx):
config = RedshiftUsageConfig.parse_obj(config_dict)
return cls(ctx, config)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
"""Gets Redshift usage stats as work units"""
access_events = self._get_redshift_history()
        # If the query result is empty, we don't want to proceed
if not access_events:
return []

joined_access_event = self._get_joined_access_event(access_events)
aggregated_info = self._aggregate_access_events(joined_access_event)

for time_bucket in aggregated_info.values():
for aggregate in time_bucket.values():
wu = self._make_usage_stat(aggregate)
self.report.report_workunit(wu)
yield wu

def _make_usage_query(self) -> str:
return redshift_usage_sql_comment.format(
start_time=self.config.start_time.strftime(redshift_datetime_format),
end_time=self.config.end_time.strftime(redshift_datetime_format),
)

def _make_sql_engine(self) -> Engine:
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url = {url}")
engine = create_engine(url, **self.config.options)
return engine

def _get_redshift_history(self):
query = self._make_usage_query()
engine = self._make_sql_engine()
results = engine.execute(query)
events = []
for row in results:
# minor type conversion
if hasattr(row, "_asdict"):
event_dict = row._asdict()
else:
event_dict = dict(row)

            # strip extra whitespace from the string values
for k, v in event_dict.items():
if isinstance(v, str):
event_dict[k] = v.strip()

if event_dict.get("starttime", None):
event_dict["starttime"] = event_dict.get("starttime").__str__()
if event_dict.get("endtime", None):
event_dict["endtime"] = event_dict.get("endtime").__str__()

logger.debug(f"event_dict: {event_dict}")
events.append(event_dict)

if events:
return events

        # The result set can be empty; once it is exhausted the SQL
        # connection closes, so there is nothing left to ingest.
        logger.info("SQL result is empty")
        return None

def _convert_str_to_datetime(self, v):
if isinstance(v, str):
isodate = parser.parse(v) # compatible with Python 3.6+
return isodate.strftime(redshift_datetime_format)

def _get_joined_access_event(self, events):
joined_access_events = []
for event_dict in events:

event_dict["starttime"] = self._convert_str_to_datetime(
event_dict.get("starttime")
)
event_dict["endtime"] = self._convert_str_to_datetime(
event_dict.get("endtime")
)

if not (
event_dict.get("database", None)
and event_dict.get("schema", None)
and event_dict.get("table", None)
):
logging.info("An access event parameter(s) is missing. Skipping ....")
continue

if not event_dict.get("usename") or event_dict["usename"] == "":
logging.info("The username parameter is missing. Skipping ....")
continue

joined_access_events.append(RedshiftJoinedAccessEvent(**event_dict))
return joined_access_events

def _aggregate_access_events(
self, events: List[RedshiftJoinedAccessEvent]
) -> Dict[datetime, Dict[RedshiftTableRef, AggregatedDataset]]:
datasets: Dict[
datetime, Dict[RedshiftTableRef, AggregatedDataset]
] = collections.defaultdict(dict)

for event in events:
floored_ts = get_time_bucket(event.starttime, self.config.bucket_duration)

resource = f"{event.database}.{event.schema_}.{event.table}"

agg_bucket = datasets[floored_ts].setdefault(
resource,
AggregatedDataset(bucket_start_time=floored_ts, resource=resource),
)

            # The usage stats UI currently needs an email to display a user,
            # so append the configured email_domain; fall back to "unknown"
            # when usename is absent.
username = f"{event.usename if event.usename else 'unknown'}@{self.config.email_domain}"
logger.info(f"username: {username}")
agg_bucket.add_read_entry(
username,
event.text,
[], # TODO: not currently supported by redshift; find column level changes
)
return datasets

def _make_usage_stat(self, agg: AggregatedDataset) -> MetadataWorkUnit:
return agg.make_usage_workunit(
self.config.bucket_duration,
lambda resource: builder.make_dataset_urn(
"redshift", resource.lower(), self.config.env
),
self.config.top_n_queries,
)

def get_report(self) -> SourceReport:
return self.report

def close(self) -> None:
pass
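To make `_make_usage_stat` concrete: the aggregated resource string is `{database}.{schema_}.{table}`, lower-cased and turned into a dataset URN. A small sketch (the example table name comes from the golden file below):

import datahub.emitter.mce_builder as builder

# resource = f"{event.database}.{event.schema_}.{event.table}".lower()
urn = builder.make_dataset_urn("redshift", "dev.public.users", "PROD")
# -> "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)"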
15 changes: 15 additions & 0 deletions metadata-ingestion/tests/integration/redshift-usage/redshift_usages_golden.json
@@ -0,0 +1,15 @@
[
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"value": "{\"timestampMillis\": 1631577600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 1, \"topSqlQueries\": [\"select userid from users\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:test-name\", \"count\": 1, \"userEmail\": \"[email protected]\"}], \"fieldCounts\": []}",
"contentType": "application/json"
},
"systemMetadata": null
}
]
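Note that the `value` field above is a JSON-encoded string rather than nested JSON. A throwaway snippet (not part of the commit) to inspect the aspect:

import json

with open("redshift_usages_golden.json") as f:
    records = json.load(f)

# The aspect payload is double-encoded, so decode it a second time.
aspect = json.loads(records[0]["aspect"]["value"])
print(json.dumps(aspect, indent=2))
# -> timestampMillis, eventGranularity, uniqueUserCount, totalSqlQueries,
#    topSqlQueries, userCounts, fieldCounts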
89 changes: 89 additions & 0 deletions metadata-ingestion/tests/integration/redshift-usage/test_redshift_usage.py
@@ -0,0 +1,89 @@
import json
import pathlib
from unittest.mock import patch

from freezegun import freeze_time

from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.usage.redshift_usage import RedshiftUsageConfig
from tests.test_helpers import mce_helpers

FROZEN_TIME = "2021-08-24 09:00:00"


def test_redshift_usage_config():
config = RedshiftUsageConfig.parse_obj(
dict(
host_port="xxxxx",
database="xxxxx",
username="xxxxx",
password="xxxxx",
email_domain="xxxxx",
include_views=True,
include_tables=True,
)
)

assert config.host_port == "xxxxx"
assert config.database == "xxxxx"
assert config.username == "xxxxx"
assert config.email_domain == "xxxxx"
assert config.include_views
assert config.include_tables


def yield_function(li):
for i in li:
yield i


@freeze_time(FROZEN_TIME)
def test_redshift_usage_source(pytestconfig, tmp_path):

test_resources_dir = pathlib.Path(
pytestconfig.rootpath / "tests/integration/redshift-usage"
)

with patch(
"datahub.ingestion.source.usage.redshift_usage.RedshiftUsageSource._get_redshift_history"
) as mock_event_history:
access_events = load_access_events(test_resources_dir)
mock_event_history.return_value = access_events

# Run ingestion
pipeline = Pipeline.create(
{
"run_id": "test-redshift-usage",
"source": {
"type": "redshift-usage",
"config": {
"host_port": "xxxxx",
"database": "xxxxx",
"username": "xxxxx",
"password": "xxxxx",
"email_domain": "acryl.io",
"include_views": True,
"include_tables": True,
},
},
"sink": {
"type": "file",
"config": {"filename": f"{tmp_path}/redshift_usages.json"},
},
},
)
pipeline.run()
pipeline.raise_from_status()

mce_helpers.check_golden_file(
pytestconfig=pytestconfig,
output_path=tmp_path / "redshift_usages.json",
golden_path=test_resources_dir / "redshift_usages_golden.json",
)


def load_access_events(test_resources_dir):
access_events_history_file = test_resources_dir / "usage_events_history.json"
with access_events_history_file.open() as access_events_json:
access_events = json.loads(access_events_json.read())
return access_events
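The mocked events come from `usage_events_history.json` (the fifth changed file, not rendered above). Judging by the fields `RedshiftJoinedAccessEvent` consumes, each record presumably looks something like this sketch (values are illustrative, chosen to line up with the golden output):

# Hypothetical shape of one record in usage_events_history.json;
# field names follow RedshiftJoinedAccessEvent, values are illustrative.
sample_access_event = {
    "userid": 100,
    "query": 1234,
    "usename": "test-name",
    "tbl": 4567,
    "text": "select userid from users",
    "database": "dev",
    "schema": "public",
    "table": "users",
    "starttime": "2021-09-14 01:00:00",
    "endtime": "2021-09-14 01:00:05",
}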