Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(ingest/redshift): limit copy lineage #11662

Merged
merged 4 commits into from
Nov 20, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
# We use 290 instead instead of the standard 320, because escape characters can add to the length.
_QUERY_SEQUENCE_LIMIT = 290

_MAX_COPY_ENTRIES_PER_TABLE = 20


class RedshiftCommonQuery:
CREATE_TEMP_TABLE_CLAUSE = "create temp table"
Expand Down Expand Up @@ -293,28 +295,37 @@ def alter_table_rename_query(
def list_copy_commands_sql(
db_name: str, start_time: datetime, end_time: datetime
) -> str:
return """
select
distinct
"schema" as target_schema,
"table" as target_table,
c.file_name as filename
from
SYS_QUERY_DETAIL as si
join SYS_LOAD_DETAIL as c on
si.query_id = c.query_id
join SVV_TABLE_INFO sti on
sti.table_id = si.table_id
where
database = '{db_name}'
and si.start_time >= '{start_time}'
and si.start_time < '{end_time}'
order by target_schema, target_table, si.start_time asc
""".format(
return """\
SELECT DISTINCT
target_schema,
target_table,
filename
FROM (
SELECT
sti."schema" AS target_schema,
sti."table" AS target_table,
c.file_name AS filename,
ROW_NUMBER() OVER (
PARTITION BY sti."schema", sti."table"
ORDER BY si.start_time ASC
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use DESC in favour of getting latest rows ?

) AS rn
FROM
SYS_QUERY_DETAIL AS si
JOIN SYS_LOAD_DETAIL AS c ON si.query_id = c.query_id
JOIN SVV_TABLE_INFO sti ON sti.table_id = si.table_id
WHERE
sti.database = '{db_name}'
AND si.start_time >= '{start_time}'
AND si.start_time < '{end_time}'
) subquery
WHERE rn <= {_MAX_COPY_ENTRIES_PER_TABLE}
ORDER BY target_schema, target_table, filename
""".format(
# We need the original database name for filtering
db_name=db_name,
start_time=start_time.strftime(redshift_datetime_format),
end_time=end_time.strftime(redshift_datetime_format),
_MAX_COPY_ENTRIES_PER_TABLE=_MAX_COPY_ENTRIES_PER_TABLE,
)

@staticmethod
Expand Down
Loading