Skip to content
This repository has been archived by the owner on Jul 30, 2024. It is now read-only.

Commit

Permalink
feat: Functional Kafka consumer/producer
Browse files Browse the repository at this point in the history
Still too many hard coded values, but it works when
Kafka and the schema registry actually start.
  • Loading branch information
bmtcril committed Feb 12, 2024
1 parent 53b7bd5 commit e5d1121
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 15 deletions.
8 changes: 8 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 15 additions & 13 deletions tutorevent_bus_redis/patches/local-docker-compose-services
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% if RUN_KAFKA_EVENT_BUS %}
{% if RUN_KAFKA_SERVER %}
# needed by Kafka to keep track of nodes, topics, and messages.
zookeeper:
image: confluentinc/cp-zookeeper:6.2.1
Expand Down Expand Up @@ -36,6 +36,20 @@ kafka:
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

# storage layer for data schemas in Kafka
schema-registry:
image: confluentinc/cp-schema-registry:6.2.1
depends_on:
- kafka
ports:
- "18081:18081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:18081
{% endif %}

{% if RUN_KAFKA_SERVER and RUN_KAFKA_UI %}
# browser app for monitoring local Kafka cluster. This is quite memory- and CPU-intensive, so it should only be used for local Kafka debugging
kafka-control-center:
image: confluentinc/cp-enterprise-control-center:6.2.1
Expand All @@ -52,16 +66,4 @@ kafka-control-center:
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021

# storage layer for data schemas in Kafka
schema-registry:
image: confluentinc/cp-schema-registry:6.2.1
depends_on:
- kafka
ports:
- "18081:18081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:18081
{% endif %}
27 changes: 27 additions & 0 deletions tutorevent_bus_redis/patches/openedx-common-settings
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Settings for producing events
SEND_CATALOG_INFO_SIGNAL = {{ EVENT_BUS_SEND_CATALOG_INFO_SIGNAL }}


{% if EVENT_BUS_BACKEND == 'redis' %}
# redis connection url
# https://redis.readthedocs.io/en/stable/examples/ssl_connection_examples.html#Connecting-to-a-Redis-instance-via-a-URL-string
EVENT_BUS_REDIS_CONNECTION_URL = "redis://{% if REDIS_USERNAME and REDIS_PASSWORD %}{{ REDIS_USERNAME }}:{{REDIS_PASSWORD }}{% endif %}@{{ REDIS_HOST }}:{{ REDIS_PORT }}/"
EVENT_BUS_TOPIC_PREFIX = "{{ EVENT_BUS_REDIS_TOPIC_PREFIX }}"
EVENT_BUS_PRODUCER = "{{ EVENT_BUS_REDIS_PRODUCER }}"
EVENT_BUS_CONSUMER = "{{ EVENT_BUS_REDIS_CONSUMER }}"
{% if EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT %}
EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT = int("{{ EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT }}")
{% endif %}
EVENT_BUS_REDIS_CONSUMER_POLL_TIMEOUT = int("{{ EVENT_BUS_REDIS_CONSUMER_POLL_TIMEOUT }}")
EVENT_BUS_REDIS_STREAM_MAX_LEN = int("{{ EVENT_BUS_REDIS_STREAM_MAX_LEN }}")
{% endif %}

{% if EVENT_BUS_BACKEND == 'kafka' %}
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL = "{{ EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL }}"
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS = "{{ EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS }}"
EVENT_BUS_PRODUCER = "{{ EVENT_BUS_KAFKA_PRODUCER }}"
EVENT_BUS_CONSUMER = "{{ EVENT_BUS_KAFKA_CONSUMER }}"
EVENT_BUS_TOPIC_PREFIX = "{{ EVENT_BUS_KAFKA_TOPIC_PREFIX }}"
{% endif %}

EVENT_BUS_PRODUCER_CONFIG = {{ EVENT_BUS_PRODUCER_CONFIG }}
Empty file.
95 changes: 93 additions & 2 deletions tutorevent_bus_redis/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,99 @@
# Each new setting is a pair: (setting_name, default_value).
# Prefix your setting names with 'EVENT_BUS_REDIS_'.
("EVENT_BUS_REDIS_VERSION", __version__),
("RUN_KAFKA_EVENT_BUS", True),
("RUN_REDIS_EVENT_BUS", True),

# Possible values are "kafka", "redis", or None to disable the
# event bus
("EVENT_BUS_BACKEND", "kafka"),

# Settings for producing events
("EVENT_BUS_SEND_CATALOG_INFO_SIGNAL", True),
(
# FIXME: We should only install the one that's configured
"OPENEDX_EXTRA_PIP_REQUIREMENTS",
[
"edx-event-bus-redis==0.3.2",
"edx-event-bus-kafka==v5.6.0",
"confluent_kafka[avro,schema-registry]",
],
),

# These are a subset of known events, there are many more.
# FIXME: Update this to a saner config structure less likely to break
("EVENT_BUS_PRODUCER_CONFIG", """{
'org.openedx.content_authoring.xblock.published.v1': {
'content-authoring-xblock-lifecycle':
{'event_key_field': 'xblock_info.usage_key', 'enabled': False},
'content-authoring-xblock-published':
{'event_key_field': 'xblock_info.usage_key', 'enabled': False},
},
'org.openedx.content_authoring.xblock.deleted.v1': {
'content-authoring-xblock-lifecycle':
{'event_key_field': 'xblock_info.usage_key', 'enabled': False},
},
'org.openedx.learning.auth.session.login.completed.v1': {
'user-login': {'event_key_field': 'user.pii.username', 'enabled': True},
},
}
"""),

######################################
# redis backend settings
# Version of https://github.com/openedx/event-bus-redis to install
# This is what follows 'pip install' so you can use official versions
# or install from forks / branches / PRs here
("EVENT_BUS_REDIS_RELEASE", "edx-event-bus-redis=='0.3.2'"),

# If true, this will run a separate instance of redis just for the
# event bus to prevent resource conflicts with other services
# TODO: Implement this
# ("RUN_REDIS_SERVER", True),

# Prefix for topics sent over the event bus
("EVENT_BUS_REDIS_TOPIC_PREFIX", "openedx"),

# Producer class which can send events to redis streams.
("EVENT_BUS_REDIS_PRODUCER", "edx_event_bus_redis.create_producer"),

# Consumer class which can consume events from redis streams.
("EVENT_BUS_REDIS_CONSUMER", "edx_event_bus_redis.RedisEventConsumer"),

# If the consumer encounters this many consecutive errors, exit with an error. This is intended to be used in a
# context where a management system (such as Kubernetes) will relaunch the consumer automatically.
# The default of 0 is treated as unset by the settings template (equivalent to None), which means the
# consumer will never exit due to consecutive errors.
("EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT", 0),

# How long the consumer should wait for new entries in a stream.
# As we are running the consumer in a while True loop, changing this setting doesn't make much difference except
# for changing the number of monitoring messages while waiting for new events.
# https://redis.io/commands/xread/#blocking-for-data
("EVENT_BUS_REDIS_CONSUMER_POLL_TIMEOUT", 60),

# Limits stream size to approximately this number
("EVENT_BUS_REDIS_STREAM_MAX_LEN", 10_000),

######################################
# Kafka backend settings
# TODO: Move hard coded settings from local-docker-compose-services here
# Version of https://github.com/openedx/event-bus-kafka to install
# This is what follows 'pip install' so you can use official versions
# or install from forks / branches / PRs here
("EVENT_BUS_KAFKA_RELEASE", "edx-event-bus-kafka=='v5.6.0'"),

# This will run schema-registry, zookeeper and kafka. Set to False if you
# are using a 3rd party to host Kafka or managing it outside of Tutor.
("RUN_KAFKA_SERVER", True),

# This will run kafka-control-center. This consumes a lot of resources,
# you can turn it off separately from the required services. Requires
# RUN_KAFKA_SERVER to be True as well.
("RUN_KAFKA_UI", True),

("EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL", "http://schema-registry:18081"),
("EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS", "kafka:29092"),
("EVENT_BUS_KAFKA_PRODUCER", "edx_event_bus_kafka.create_producer"),
("EVENT_BUS_KAFKA_CONSUMER", "edx_event_bus_kafka.KafkaEventConsumer"),
("EVENT_BUS_KAFKA_TOPIC_PREFIX", "dev"),
]
)

Expand Down

0 comments on commit e5d1121

Please sign in to comment.