This repository has been archived by the owner on Apr 3, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Add event listener for course publish
Creates the edx-platform plugin plumbing, adds some new requirements, maps the appropriate Django Signal to push course structure to ClickHouse.
- Loading branch information
Showing
14 changed files
with
807 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
import csv | ||
import io | ||
|
||
from django.conf import settings | ||
from django.utils import timezone | ||
|
||
import requests | ||
|
||
|
||
class ClickHouseDestination:
    """
    Serializes a course's blocks and parent/child relationships and inserts
    them into ClickHouse over its HTTP interface.
    """

    # Seconds to wait on ClickHouse when the config has no "timeout_secs" entry.
    # Without any timeout a stalled server would block the worker indefinitely.
    DEFAULT_TIMEOUT_SECS = 30

    connection_overrides = None
    log = None

    def __init__(self, connection_overrides, log):
        """
        Args:
            connection_overrides: optional dict whose entries replace the
                matching keys of settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG
                for this instance. None (or an empty dict) uses the settings
                unchanged.
            log: logger used for progress and error messages.
        """
        config = dict(settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG)
        config.update(connection_overrides or {})

        self.ch_url = config["url"]
        self.ch_auth = self._resolve_auth(config)
        self.ch_timeout_secs = config.get("timeout_secs", self.DEFAULT_TIMEOUT_SECS)

        self.connection_overrides = connection_overrides
        self.log = log

    @staticmethod
    def _resolve_auth(config):
        """
        Builds the (username, password) pair used for HTTP basic auth.

        The default plugin settings name the account "username"; the older
        "user" key is still honoured so existing overrides keep working.

        Raises:
            KeyError: if neither account key, or "password", is present.
        """
        username = config["username"] if "username" in config else config["user"]
        return (username, config["password"])

    @staticmethod
    def _rows_to_csv(rows):
        """
        Renders an iterable of dicts as CSV text, one line per dict.

        Values are written in each dict's own key order; non-numeric values
        are quoted (csv.QUOTE_NONNUMERIC). No header row is written.
        """
        buffer = io.StringIO()
        writer = csv.writer(buffer, quoting=csv.QUOTE_NONNUMERIC)
        writer.writerows(row.values() for row in rows)
        return buffer.getvalue()

    @staticmethod
    def strip_branch_and_version(location):
        """
        Removes the branch and version information from a location.
        Args:
            location: an xblock's location.
        Returns: that xblock's location without branch and version information.
        """
        return location.for_branch(None)

    @staticmethod
    def get_course_last_published(course_key):
        """
        Approximately when was a course last published?
        We use the 'modified' column in the CourseOverview table as a quick and easy
        (although perhaps inexact) way of determining when a course was last
        published. This works because CourseOverview rows are re-written upon
        course publish.
        Args:
            course_key: a CourseKey
        Returns: The datetime the course was last published at, stringified.
            Uses Python's default str(...) implementation for datetimes, which
            is sortable and similar to ISO 8601:
            https://docs.python.org/3/library/datetime.html#datetime.date.__str__
        """
        # Import is placed here to avoid model import at project startup.
        from openedx.core.djangoapps.content.course_overviews.models import CourseOverview

        approx_last_published = CourseOverview.get_from_id(course_key).modified
        return str(approx_last_published)

    @staticmethod
    def serialize_item(item, index):
        """
        Args:
            item: an XBlock
            index: a number indicating where the item falls in the course hierarchy
        Returns:
            fields: a *limited* dictionary of an XBlock's field names and values
            block_type: the name of the XBlock's type (i.e. 'course'
                or 'problem')
        """
        # Import is placed here to avoid modulestore import at project startup.
        from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES

        course_key = item.scope_ids.usage_id.course_key
        block_type = item.scope_ids.block_type

        rtn_fields = {
            'org': course_key.org,
            'course_key': str(course_key),
            'course': course_key.course,
            'run': course_key.run,
            'location': str(item.location),
            # Sent verbatim: the csv writer quotes the value, so no manual
            # escaping is needed. The previous .replace("'", "\'") replaced a
            # quote with itself and had no effect.
            'display_name': item.display_name_with_default,
            'block_type': block_type,
            'detached': 1 if block_type in DETACHED_XBLOCK_TYPES else 0,
            'edited_on': str(getattr(item, 'edited_on', '')),
            'time_last_dumped': str(timezone.now()),
            'order': index,
        }

        return rtn_fields, block_type

    def serialize_course(self, course_id):
        """
        Serializes a course into node and relationship rows.
        Args:
            course_id: CourseKey of the course we want to serialize
        Returns:
            nodes: a list of field dicts, one per block in the course
            relationships: a list of dicts, one per parent/child pair
        """
        # Import is placed here to avoid model import at project startup.
        from xmodule.modulestore.django import modulestore

        items = modulestore().get_items(course_id)

        # Map each branch/version-free location to its node so the
        # relationship pass below can look up both ends of an edge.
        location_to_node = {}
        for position, item in enumerate(items, start=1):
            fields, _block_type = self.serialize_item(item, position)
            location_to_node[self.strip_branch_and_version(item.location)] = fields

        course_key_string = str(course_id)
        relationships = []
        for item in items:
            # The parent is the same for every child, so look it up once.
            parent_node = location_to_node.get(self.strip_branch_and_version(item.location))
            for index, child in enumerate(item.get_children()):
                child_node = location_to_node.get(self.strip_branch_and_version(child.location))

                if parent_node is not None and child_node is not None:
                    relationships.append({
                        'course_key': course_key_string,
                        'parent_location': str(parent_node["location"]),
                        'child_location': str(child_node["location"]),
                        'order': index,
                    })

        nodes = list(location_to_node.values())
        return nodes, relationships

    def _post_csv(self, query, rows):
        """
        Sends `rows` as CSV to ClickHouse as the body of `query`.

        Args:
            query: the INSERT ... FORMAT CSV statement, passed as the special
                "query" URL parameter.
            rows: iterable of dicts to serialize with _rows_to_csv.
        Raises:
            requests.HTTPError: when ClickHouse answers with an error status.
        """
        # Params that begin with "param_" will be used in the query replacement
        # all others are ClickHouse settings.
        params = {
            # NOTE(review): these were originally labelled "fail early on bulk
            # inserts", but the values appear to let a malformed row be
            # skipped -- confirm against the ClickHouse settings reference.
            "input_format_allow_errors_num": 1,
            "input_format_allow_errors_ratio": 0.1,
            # "query" is a special param for the query, it's the best way to
            # get the FORMAT CSV in there.
            "query": query,
        }

        response = requests.post(
            self.ch_url,
            data=self._rows_to_csv(rows),
            params=params,
            auth=self.ch_auth,
            timeout=self.ch_timeout_secs,
        )
        self.log.info(response.headers)
        self.log.info(response)
        self.log.info(response.text)
        response.raise_for_status()

    def dump(self, course_key):
        """
        Serializes the course and inserts its nodes, then its relationships,
        into ClickHouse.

        Failures while posting are logged with a traceback and not re-raised;
        if the node insert fails the relationship insert is not attempted.
        Errors raised while serializing the course propagate to the caller.

        Args:
            course_key: CourseKey of the course to dump.
        """
        nodes, relationships = self.serialize_course(course_key)
        self.log.info(
            "Now dumping %s to ClickHouse: %d nodes and %d relationships",
            course_key,
            len(nodes),
            len(relationships),
        )

        course_string = str(course_key)

        try:
            self._post_csv("INSERT INTO coursegraph.coursegraph_nodes FORMAT CSV", nodes)
            self._post_csv("INSERT INTO coursegraph.coursegraph_relationships FORMAT CSV", relationships)

            self.log.info("Completed dumping %s to ClickHouse", course_key)

        except Exception:  # pylint: disable=broad-except
            self.log.exception(
                "Error trying to dump course %s to ClickHouse!",
                course_string
            )
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
""" | ||
Default settings for the openedx_event_sink_clickhouse app. | ||
""" | ||
|
||
|
||
def plugin_settings(settings):
    """
    Installs the default ClickHouse backend configuration on `settings`.

    Sets settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG to a dict with the
    "url", "username" and "password" entries below.
    """
    # NOTE(review): the credentials below are committed in source; presumably
    # they are development placeholders that deployments override -- confirm.
    clickhouse_defaults = dict(
        # URL to a running ClickHouse server's HTTP interface.
        # ex: https://foo.openedx.org:8443/ or http://foo.openedx.org:8123/
        url="http://clickhouse:8123",
        username="ch_admin",
        password="dazimUehjqKjWdpDNhRrGOfp",
    )
    settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG = clickhouse_defaults
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
""" | ||
Signal handler functions, mapped to specific signals in apps.py. | ||
""" | ||
|
||
|
||
def receive_course_publish(sender, course_key, **kwargs):  # pylint: disable=unused-argument
    """
    Receives the course publish signal and queues a task that dumps the
    published course to ClickHouse.

    The course key is handed to the task in its string form.
    """
    # Deferred import: the signal is registered at startup, before the items
    # in tasks are able to be loaded.
    from ..tasks import dump_course_to_clickhouse

    dump_course_to_clickhouse.delay(str(course_key))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
""" | ||
This file contains a Celery task for exporting course modulestore data to ClickHouse | ||
""" | ||
|
||
import logging | ||
|
||
from celery import shared_task | ||
from edx_django_utils.cache import RequestCache | ||
from edx_django_utils.monitoring import set_code_owner_attribute | ||
from opaque_keys.edx.keys import CourseKey | ||
|
||
from destination import ClickHouseDestination | ||
|
||
log = logging.getLogger(__name__) | ||
celery_log = logging.getLogger('edx.celery.task') | ||
|
||
|
||
@shared_task
@set_code_owner_attribute
def dump_course_to_clickhouse(course_key_string, connection_overrides=None):
    """
    Serializes a course and writes it to ClickHouse.

    Arguments:
        course_key_string: course key for the course to be exported
        connection_overrides (dict): overrides to ClickHouse connection
            parameters, handed to ClickHouseDestination unchanged.
    """
    # Parse the key first so a malformed string fails before a destination is built.
    parsed_key = CourseKey.from_string(course_key_string)
    ClickHouseDestination(connection_overrides=connection_overrides, log=celery_log).dump(parsed_key)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,8 @@ | ||
# Core requirements for using this application | ||
-c constraints.txt | ||
|
||
celery # Asynchronous task execution library | ||
Django # Web application framework | ||
|
||
requests # HTTP request library | ||
edx-django-utils # Django utilities, we use caching and monitoring | ||
edx-opaque-keys # Parsing library for course and usage keys |
Oops, something went wrong.