-
Notifications
You must be signed in to change notification settings - Fork 5
Add Course Published event listener and plugin plumbing #1
Changes from all commits
990aeeb
7747b55
185c12c
e2a0e08
db42bc7
d6af144
61a1c6e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ | |
""" | ||
|
||
from django.apps import AppConfig | ||
from edx_django_utils.plugins import PluginSettings, PluginSignals | ||
|
||
|
||
class EventSinkClickhouseConfig(AppConfig): | ||
|
@@ -11,3 +12,39 @@ class EventSinkClickhouseConfig(AppConfig): | |
""" | ||
|
||
name = 'event_sink_clickhouse' | ||
verbose_name = "Event Sink ClickHouse" | ||
|
||
plugin_app = { | ||
PluginSettings.CONFIG: { | ||
'lms.djangoapp': { | ||
'production': {PluginSettings.RELATIVE_PATH: 'settings.production'}, | ||
'common': {PluginSettings.RELATIVE_PATH: 'settings.common'}, | ||
}, | ||
'cms.djangoapp': { | ||
'production': {PluginSettings.RELATIVE_PATH: 'settings.production'}, | ||
'common': {PluginSettings.RELATIVE_PATH: 'settings.common'}, | ||
} | ||
}, | ||
# Configuration setting for Plugin Signals for this app. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can remove these inline comments since the configurations are pretty self-explanatory |
||
PluginSignals.CONFIG: { | ||
# Configure the Plugin Signals for each Project Type, as needed. | ||
'cms.djangoapp': { | ||
# List of all plugin Signal receivers for this app and project type. | ||
PluginSignals.RECEIVERS: [{ | ||
# The name of the app's signal receiver function. | ||
PluginSignals.RECEIVER_FUNC_NAME: 'receive_course_publish', | ||
|
||
# The full path to the module where the signal is defined. | ||
PluginSignals.SIGNAL_PATH: 'xmodule.modulestore.django.COURSE_PUBLISHED', | ||
}], | ||
} | ||
}, | ||
} | ||
|
||
def ready(self): | ||
""" | ||
Import our Celery tasks for initialization. | ||
""" | ||
super().ready() | ||
|
||
from . import tasks # pylint: disable=import-outside-toplevel, unused-import | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we use absolute imports across the project? |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
""" | ||
Default settings for the openedx_event_sink_clickhouse app. | ||
""" | ||
|
||
|
||
def plugin_settings(settings): | ||
""" | ||
Adds default settings | ||
""" | ||
settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG = { | ||
# URL to a running ClickHouse server's HTTP interface. ex: https://foo.openedx.org:8443/ or | ||
# http://foo.openedx.org:8123/ . Note that we only support the ClickHouse HTTP interface | ||
# to avoid pulling in more dependencies to the platform than necessary. | ||
"url": "http://clickhouse:8123", | ||
"username": "changeme", | ||
"password": "changeme", | ||
"database": "event_sink", | ||
"timeout_secs": 3, | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
""" | ||
Production settings for the openedx_event_sink_clickhouse app. | ||
""" | ||
|
||
|
||
def plugin_settings(settings): | ||
""" | ||
Override the default app settings with production settings. | ||
""" | ||
settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG = settings.ENV_TOKENS.get( | ||
'EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG', | ||
settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
""" | ||
Signal handler functions, mapped to specific signals in apps.py. | ||
""" | ||
|
||
|
||
def receive_course_publish(sender, course_key, **kwargs): # pylint: disable=unused-argument | ||
""" | ||
Receives COURSE_PUBLISHED signal and queues the dump job. | ||
""" | ||
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded | ||
from .tasks import dump_course_to_clickhouse # pylint: disable=import-outside-toplevel | ||
|
||
dump_course_to_clickhouse.delay(str(course_key)) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
""" | ||
Base classes for event sinks | ||
""" | ||
from collections import namedtuple | ||
|
||
import requests | ||
from django.conf import settings | ||
|
||
ClickHouseAuth = namedtuple("ClickHouseAuth", ["username", "password"]) | ||
|
||
|
||
class BaseSink: | ||
""" | ||
Base class for ClickHouse event sink, allows overwriting of default settings | ||
""" | ||
def __init__(self, connection_overrides, log): | ||
self.log = log | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need the log to be part of the class? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The next PR will be to add a management command that will call into here, so I'm using the pattern established in Coursegraph that passes in the log so that it can go to the celery log or normal IDA log based on how it's being run. |
||
self.ch_url = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["url"] | ||
self.ch_auth = ClickHouseAuth(settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["username"], | ||
settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["password"]) | ||
self.ch_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["database"] | ||
self.ch_timeout_secs = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["timeout_secs"] | ||
|
||
# If any overrides to the ClickHouse connection | ||
if connection_overrides: | ||
self.ch_url = connection_overrides.get("url", self.ch_url) | ||
self.ch_auth = ClickHouseAuth(connection_overrides.get("username", self.ch_auth.username), | ||
connection_overrides.get("password", self.ch_auth.password)) | ||
self.ch_database = connection_overrides.get("database", self.ch_database) | ||
self.ch_timeout_secs = connection_overrides.get("timeout_secs", self.ch_timeout_secs) | ||
|
||
def _send_clickhouse_request(self, request): | ||
""" | ||
Perform the actual HTTP requests to ClickHouse. | ||
""" | ||
session = requests.Session() | ||
prepared_request = request.prepare() | ||
|
||
try: | ||
response = session.send(prepared_request, timeout=self.ch_timeout_secs) | ||
response.raise_for_status() | ||
except requests.exceptions.HTTPError as e: | ||
self.log.error(str(e)) | ||
self.log.error(e.response.headers) | ||
self.log.error(e.response) | ||
self.log.error(e.response.text) | ||
raise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does
sink
mean in this context? [curious]There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just a message receiver that, in the context of this code, is just saving the data elsewhere. It's not performing any meaningful work or operating in the transactional environment of the service.