
Commit

fix: merge main
saraburns1 committed Oct 2, 2024
2 parents c3b8639 + 0a00530 commit fe08281
Showing 36 changed files with 114 additions and 632 deletions.
17 changes: 12 additions & 5 deletions tutoraspects/commands_v1.py
@@ -148,9 +148,14 @@ def init_clickhouse() -> list[tuple[str, str]]:
 # Ex: "tutor local do performance-metrics "
 @click.command(context_settings={"ignore_unknown_options": True})
 @click.option(
-    "--course_key",
+    "--org",
     default="",
-    help="A course_key to apply as a filter, you must include the 'course-v1:'.",
+    help="An organization to apply as a filter.",
 )
+@click.option(
+    "--course_name",
+    default="",
+    help="A course_name to apply as a filter.",
+)
 @click.option(
     "--dashboard_slug", default="", help="Only run charts for the given dashboard."
@@ -167,13 +172,15 @@ def init_clickhouse() -> list[tuple[str, str]]:
 @click.option(
     "--fail_on_error", is_flag=True, default=False, help="Allow errors to fail the run."
 )
-def performance_metrics(
-    course_key, dashboard_slug, slice_name, print_sql, fail_on_error
+def performance_metrics(  # pylint: disable=too-many-arguments,too-many-positional-arguments
+    org, course_name, dashboard_slug, slice_name, print_sql, fail_on_error
 ) -> list[tuple[str, str]]:
     """
     Job to measure performance metrics of charts and its queries in Superset and ClickHouse.
     """
-    options = f"--course_key {course_key}" if course_key else ""
+    options = ""
+    options += f"--org '{org}' " if org else ""
+    options += f"--course_name '{course_name}' " if course_name else ""
     options += f" --dashboard_slug {dashboard_slug}" if dashboard_slug else ""
     options += f' --slice_name "{slice_name}"' if slice_name else ""
     options += " --print_sql" if print_sql else ""
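For reference, the wrapped job can then be invoked through Tutor as the comment in this file suggests; a hypothetical call exercising the new flags (values illustrative):

tutor local do performance-metrics --org "OpenedX" --course_name "Demo Course" --print_sql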
@@ -10,6 +10,8 @@
 import yaml
 from copy import deepcopy
 from pathlib import Path
+from sqlfmt.api import format_string
+from sqlfmt.mode import Mode
 from collections import defaultdict
 
 from superset import security_manager
@@ -128,6 +130,9 @@ def write_asset_to_file(asset, asset_name, folder, file_name, roles, translated_
         asset["sqlalchemy_uri"] = DATABASES.get(asset["database_name"])
     if folder in ["charts", "dashboards", "datasets"]:
         for locale in DASHBOARD_LOCALES:
+            if folder == "datasets":
+                asset["sql"] = format_string(asset["sql"], mode=Mode(dialect_name="clickhouse"))
+
             updated_asset = generate_translated_asset(
                 asset, asset_name, folder, locale, roles, translated_asset_uuids
             )
@@ -171,7 +176,6 @@ def generate_translated_asset(asset, asset_name, folder, language, roles, transl
 
     # Save parent & translated uuids in yaml file
     translated_asset_uuids[parent_uuid].add(copy['uuid'])
-
     if folder == "dashboards":
         copy["slug"] = f"{copy['slug']}-{language}"
         copy["description"] = get_translation(copy["description"], language)
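For context, the format_string/Mode calls added above are the public API of the shandy-sqlfmt package pinned in the requirements change further down. A minimal standalone sketch of that API (the SQL string is invented for illustration):

from sqlfmt.api import format_string
from sqlfmt.mode import Mode

# Same call shape as the write_asset_to_file change above; the
# "clickhouse" dialect keeps ClickHouse-specific syntax intact.
raw_sql = "select org, course_key from fact_enrollments where 1 = 1"
print(format_string(raw_sql, mode=Mode(dialect_name="clickhouse")))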
@@ -7,9 +7,6 @@
 across Superset installations.
 """
 
-from create_assets import BASE_DIR, ASSET_FOLDER_MAPPING, app
-
-import json
 import logging
 import os
 import time
@@ -20,10 +17,12 @@
 import click
 import sqlparse
 import yaml
+from create_assets import app
+
 from flask import g
 from superset import security_manager
-from superset.commands.chart.data.get_data_command import ChartDataCommand
 from superset.charts.schemas import ChartDataQueryContextSchema
+from superset.commands.chart.data.get_data_command import ChartDataCommand
 from superset.extensions import db
 from superset.models.dashboard import Dashboard
 from superset.models.slice import Slice
@@ -42,46 +41,53 @@
     "Result rows: {result_rows}\n"
     "Memory Usage (MB): {memory_usage_mb}\n"
     "Row count (superset) {rowcount:}\n"
-    "Filters: {filters}\n\n"
+    "Filters: {filters}\n"
+    "SQL:\n"
+    "{sql}\n\n\n"
 )
 
 
 @click.command()
+@click.option("--org", default="", help="An organization to apply as a filter.")
 @click.option(
-    "--course_key",
+    "--course_name",
     default="",
-    help="A course_key to apply as a filter, you must include the 'course-v1:'.")
+    help="A course_name to apply as a filter, you must include the 'course-v1:'.",
+)
 @click.option(
-    "--dashboard_slug",
-    default="",
-    help="Only run charts for the given dashboard.")
+    "--dashboard_slug", default="", help="Only run charts for the given dashboard."
+)
 @click.option(
     "--slice_name",
     default="",
     help="Only run charts for the given slice name, if the name appears in more than "
-    "one dashboard it will be run for each.")
+    "one dashboard it will be run for each.",
+)
 @click.option(
-    "--print_sql",
-    is_flag=True,
-    default=False,
-    help="Whether to print the SQL run."
+    "--print_sql", is_flag=True, default=False, help="Whether to print the SQL run."
 )
 @click.option(
     "--fail_on_error", is_flag=True, default=False, help="Allow errors to fail the run."
 )
-def performance_metrics(course_key, dashboard_slug, slice_name, print_sql,
-                        fail_on_error):
+def performance_metrics(
+    org, course_name, dashboard_slug, slice_name, print_sql, fail_on_error
+):
     """
     Measure the performance of the dashboard.
     """
     # Mock the client name to identify the queries in the clickhouse system.query_log
     # table by by the http_user_agent field.
-    if course_key:
-        extra_filters += [{"col": "course_key", "op": "==", "val": course_key}]
+    extra_filters = []
+    if course_name:
+        extra_filters += [{"col": "course_name", "op": "IN", "val": course_name}]
+    if org:
+        extra_filters += [{"col": "org", "op": "IN", "val": org}]
 
     with patch("clickhouse_connect.common.build_client_name") as mock_build_client_name:
         mock_build_client_name.return_value = RUN_ID
-        target_dashboards = [dashboard_slug] if dashboard_slug else {{SUPERSET_EMBEDDABLE_DASHBOARDS}}
+        target_dashboards = (
+            [dashboard_slug] if dashboard_slug else {{SUPERSET_EMBEDDABLE_DASHBOARDS}}
+        )
 
         dashboards = (
             db.session.query(Dashboard)
@@ -98,14 +104,13 @@ def performance_metrics(course_key, dashboard_slug, slice_name, print_sql,
             logger.info(f"Dashboard: {dashboard.slug}")
             for slice in dashboard.slices:
                 if slice_name and not slice_name == slice.slice_name:
-                    logger.info(f"{slice.slice_name} doesn't match {slice_name}, "
-                                f"skipping.")
+                    logger.info(
+                        f"{slice.slice_name} doesn't match {slice_name}, " f"skipping."
+                    )
                     continue
 
                 query_context = get_slice_query_context(
-                    slice,
-                    query_contexts,
-                    extra_filters
+                    slice, query_contexts, extra_filters
                 )
                 result = measure_chart(slice, query_context, fail_on_error)
                 if not result:
@@ -167,27 +172,32 @@ def get_slice_query_context(slice, query_contexts, extra_filters=None):
         }
     )
 
-    query_context["form_data"]["extra_form_data"] = {"filters": extra_filters}
-
+    if extra_filters:
+        for query in query_context["queries"]:
+            query["filters"] += extra_filters
 
     return query_context
 
 
-def measure_chart(slice, query_context, fail_on_error):
+def measure_chart(slice, query_context_dict, fail_on_error):
     """
     Measure the performance of a chart and return the results.
     """
     logger.info(f"Fetching slice data: {slice}")
 
     g.user = security_manager.find_user(username="{{SUPERSET_ADMIN_USERNAME}}")
-    query_context = ChartDataQueryContextSchema().load(query_context)
+    query_context = ChartDataQueryContextSchema().load(query_context_dict)
     command = ChartDataCommand(query_context)
 
-    start_time = datetime.now()
     command.validate()
+    g.form_data = query_context.form_data
     try:
+        start_time = datetime.now()
         result = command.run()
 
+        end_time = datetime.now()
+        result["time_elapsed"] = (end_time - start_time).total_seconds()
+        result["slice"] = slice
         for query in result["queries"]:
             if "error" in query and query["error"]:
                 raise query["error"]
@@ -197,11 +207,6 @@ def measure_chart(slice, query_context, fail_on_error):
             raise e
         return
 
-    end_time = datetime.now()
-
-    result["time_elapsed"] = (end_time - start_time).total_seconds()
-    result["slice"] = slice
-
     return result
 
 
@@ -227,44 +232,38 @@ def get_query_log_from_clickhouse(report, query_contexts, print_sql, fail_on_err
         parsed_sql = str(sqlparse.parse(row.pop("query"))[0])
         clickhouse_queries[parsed_sql] = row
 
-        if print_sql:
-            logger.info("ClickHouse SQL: ")
-            logger.info(parsed_sql)
-
-    # Sort report by slowest queries
-    report = sorted(report, key=lambda x: x["time_elapsed"], reverse=True)
-
-    report_str = f"\nSuperset Reports: {RUN_ID}\n\n"
-    for i, chart_result in enumerate(report):
-        report_str += (
-            report_format.format(
-                i=(i + 1),
-                dashboard=chart_result["dashboard"],
-                slice=chart_result["slice"],
-                superset_time=chart_result["time_elapsed"]
-            )
-        )
-        for i, query in enumerate(chart_result["queries"]):
+    for k, chart_result in enumerate(report):
+        for query in chart_result["queries"]:
             parsed_sql = (
                 str(sqlparse.parse(query["query"])[0]).replace(";", "")
                 + "\n FORMAT Native"
             )
+            chart_result["sql"] = parsed_sql
+            clickhouse_report = clickhouse_queries.get(parsed_sql, {})
+            chart_result.update(clickhouse_report)
+            chart_result.update(
+                {"query_duration_ms": chart_result.get("query_duration_ms", 0)}
+            )
 
-            if print_sql:
-                logger.info("Superset SQL: ")
-                logger.info(parsed_sql)
-
-            clickhouse_report = clickhouse_queries.get(parsed_sql, {})
-            report_str += (
-                query_format.format(
-                    query_duration_ms=clickhouse_report.get(
-                        "query_duration_ms", 0
-                    ) / 1000,
-                    memory_usage_mb=clickhouse_report.get("memory_usage_mb"),
-                    result_rows=clickhouse_report.get("result_rows"),
-                    rowcount=query["rowcount"],
-                    filters=query["applied_filters"],
-                )
-            )
+    # Sort report by slowest queries
+    report = sorted(report, key=lambda x: x["query_duration_ms"], reverse=True)
+
+    report_str = f"\nSuperset Reports: {RUN_ID}\n\n"
+    for k, chart_result in enumerate(report):
+        report_str += report_format.format(
+            i=(k + 1),
+            dashboard=chart_result["dashboard"],
+            slice=chart_result["slice"],
+            superset_time=chart_result["time_elapsed"],
+        )
+        for query in chart_result["queries"]:
+            report_str += query_format.format(
+                query_duration_ms=chart_result.get("query_duration_ms") / 1000,
+                memory_usage_mb=chart_result.get("memory_usage_mb"),
+                result_rows=chart_result.get("result_rows"),
+                rowcount=query["rowcount"],
+                filters=query["applied_filters"],
+                sql=chart_result["sql"] if print_sql else "",
+            )
     logger.info(report_str)

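A small self-contained sketch (values invented for illustration) of how the new --org / --course_name options become ad-hoc filters that get_slice_query_context appends to every query, per the diff above:

org, course_name = "OpenedX", "Demo Course"

extra_filters = []
if course_name:
    extra_filters += [{"col": "course_name", "op": "IN", "val": course_name}]
if org:
    extra_filters += [{"col": "org", "op": "IN", "val": org}]

# Stand-in for a chart's query context: each query gets the extra filters.
query_context = {"queries": [{"filters": []}]}
if extra_filters:
    for query in query_context["queries"]:
        query["filters"] += extra_filters

print(query_context["queries"][0]["filters"])
# [{'col': 'course_name', 'op': 'IN', 'val': 'Demo Course'},
#  {'col': 'org', 'op': 'IN', 'val': 'OpenedX'}]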
@@ -4,3 +4,4 @@ openedx-atlas
 ruamel-yaml==0.18.6
 sentry-sdk[flask]
 urllib3>=1.26.15,<2
+shandy-sqlfmt[jinjafmt]==0.21.2
@@ -2,10 +2,13 @@ with
     recent_activity as (
         select course_key, COUNT(DISTINCT actor_id) as active_last_7_days
         from {{ ASPECTS_XAPI_DATABASE }}.navigation_events
-        where emission_time >= NOW() - INTERVAL 7 DAY
+        where
+            emission_time >= NOW() - INTERVAL 7 DAY
+            {% include 'openedx-assets/queries/common_filters.sql' %}
         group by course_key
     )
 
 select fss.*, COALESCE(ra.active_last_7_days, 0) as active_within_last_7_days
 from {{ DBT_PROFILE_TARGET_DATABASE }}.fact_student_status fss
 left join recent_activity ra on fss.course_key = ra.course_key
+where 1 = 1 {% include 'openedx-assets/queries/common_filters.sql' %}
@@ -12,4 +12,6 @@ with
 select org, course_key, learners.actor_id as actor_id
 from {{ DBT_PROFILE_TARGET_DATABASE }}.fact_student_status learners
 join page_visits using (org, course_key, actor_id)
-where approving_state = 'failed' and enrollment_status = 'registered'
+where
+    approving_state = 'failed' and enrollment_status = 'registered'
+    {% include 'openedx-assets/queries/common_filters.sql' %}
@@ -4,3 +4,4 @@ join
     (
         {% include 'openedx-assets/queries/at_risk_learner_filter.sql' %}
     ) as at_risk_learners using (org, course_key, actor_id)
+where 1 = 1 {% include 'openedx-assets/queries/common_filters.sql' %}
@@ -29,3 +29,4 @@ where
     approving_state = 'failed'
     and enrollment_status = 'registered'
     and page_visits.last_visited < subtractDays(now(), 7)
+    {% include 'openedx-assets/queries/common_filters.sql' %}

This file was deleted.

This file was deleted.

@@ -12,3 +12,4 @@ left join
     {{ ASPECTS_EVENT_SINK_DATABASE }}.course_names cn
     on fes.org = cn.org
     and fes.course_key = cn.course_key
+where 1 = 1 {% include 'openedx-assets/queries/common_filters.sql' %}
@@ -4,3 +4,4 @@ join
     (
         {% include 'openedx-assets/queries/at_risk_learner_filter.sql' %}
     ) as at_risk_learners using (org, course_key, actor_id)
+where 1 = 1 {% include 'openedx-assets/queries/common_filters.sql' %}
@@ -4,3 +4,4 @@ join
     (
         {% include 'openedx-assets/queries/at_risk_learner_filter.sql' %}
     ) as at_risk_learners using (org, course_key, actor_id)
+where 1 = 1 {% include 'openedx-assets/queries/common_filters.sql' %}
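The recurring "where 1 = 1 {% include 'openedx-assets/queries/common_filters.sql' %}" pattern above gives the included fragment a complete where clause to extend; the assumption (the fragment itself is not part of this diff) is that common_filters.sql renders additional "and ..." conditions. A rough standalone illustration using jinja2, with invented template contents:

from jinja2 import DictLoader, Environment

templates = {
    # Hypothetical stand-in for common_filters.sql: emits "and ..." clauses.
    "common_filters.sql": "{% if org %}and org = '{{ org }}'{% endif %}",
    "query.sql": (
        "select * from fact_student_status\n"
        "where 1 = 1 {% include 'common_filters.sql' %}"
    ),
}

env = Environment(loader=DictLoader(templates))
print(env.get_template("query.sql").render(org="OpenedX"))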