feat: add script to create kafka topics #10080
@@ -61,6 +61,7 @@ per-file-ignores =
./ee/management/commands/migrate_clickhouse.py: T001
./ee/management/commands/run_async_migrations.py: T001
./ee/management/commands/backfill_persons_and_groups_on_events.py: T001
./ee/management/commands/create_kafka_topics.py: T001
We shouldn't be adding this here. There are always better solutions than print.
All commands seem to use print as you see here hence the choice
)

if len(missing_topics) > 0:
    print("Some topics are missing! PostHog may not work correctly.")
Use logger instead of print.
    print(f"Topic {topic.name} already exists. Skipping creation...\n")
    existing_topics.append(topic.name)
except Exception as e:
    print(f"Could not create topic {topic.name} with error: {e.__str__()}")
Use logger instead of print, passing the error as a separate arg. Decent logging platforms will be able to take advantage of that.
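One way to read this suggestion, as a minimal sketch using the standard library `logging` module (the project may well use a different logging library). The `create_topic` callable and the logger name are hypothetical stand-ins, not code from this PR:

```python
import logging

# Hypothetical logger name; the real command would pick its own.
logger = logging.getLogger("create_kafka_topics")


def try_create_topic(create_topic, topic_name):
    """Call create_topic(topic_name) and log failures instead of printing them.

    create_topic is a hypothetical callable standing in for the real
    admin-client call. Returns True on success, False on failure.
    """
    try:
        create_topic(topic_name)
    except Exception as error:
        # The topic name and the exception are passed as separate arguments
        # rather than pre-formatted into the message, so handlers can see the
        # raw values. exc_info=True attaches the current traceback.
        logger.error("Could not create topic %s: %s", topic_name, error, exc_info=True)
        return False
    return True
```

Because the exception object travels on the log record itself, a handler can group or filter on it without parsing the message string.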
created_topics_str = ", ".join(created_topics)
existing_topics_str = ", ".join(existing_topics)
missing_topics_str = ", ".join(missing_topics)
print(
Use logger instead of print, passing each array as separate (and not joining them).
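A possible shape for this, sketched with the standard library `logging` module and its `extra` parameter. The function name, logger name, and field names are assumptions for illustration only:

```python
import logging

# Hypothetical logger name; the real command would pick its own.
logger = logging.getLogger("create_kafka_topics")


def log_topic_summary(created_topics, existing_topics, missing_topics):
    """Log the outcome, keeping each list as its own field on the record."""
    # `extra` attaches the un-joined lists to the log record as attributes,
    # so a structured handler or formatter can emit them as real arrays.
    logger.info(
        "Kafka topic creation finished",
        extra={
            "created_topics": created_topics,
            "existing_topics": existing_topics,
            "missing_topics": missing_topics,
        },
    )
    if missing_topics:
        logger.warning("Some topics are missing! PostHog may not work correctly.")
```

With the default logging configuration the `info` record is filtered out, so the command would need the logger level set to `INFO` for the summary to appear.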
help = "Set up databases for non-Python tests that depend on the Django server"

def handle(self, *args, **options):
    admin_client = KafkaAdminClient(
Problem: We have ee/kafka_client/client.py. Now any change with authentication will need to be synced across the two.
Suggestion: Centralize.
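As a rough illustration of what centralizing could look like: one helper builds the connection arguments, and both the existing client and the new admin client consume its result. `KafkaSettings` and `kafka_connection_kwargs` are hypothetical names, and the keyword names are assumed to match what kafka-python's client constructors accept; this is a sketch, not the contents of ee/kafka_client/client.py.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class KafkaSettings:
    """Hypothetical container for the settings every Kafka client needs."""

    hosts: str
    security_protocol: str = "PLAINTEXT"
    sasl_mechanism: Optional[str] = None
    sasl_user: Optional[str] = None
    sasl_password: Optional[str] = None


def kafka_connection_kwargs(settings: KafkaSettings) -> Dict[str, Any]:
    """Build connection keyword arguments in exactly one place.

    Producer, consumer, and admin client would all be constructed with
    **kafka_connection_kwargs(settings), so an authentication change is
    made once.
    """
    kwargs: Dict[str, Any] = {
        "bootstrap_servers": settings.hosts,
        "security_protocol": settings.security_protocol,
    }
    # Only add SASL arguments when credentials are actually configured.
    if settings.sasl_user is not None:
        kwargs["sasl_mechanism"] = settings.sasl_mechanism
        kwargs["sasl_plain_username"] = settings.sasl_user
        kwargs["sasl_plain_password"] = settings.sasl_password
    return kwargs
```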
@@ -126,6 +126,8 @@
KAFKA_SASL_USER = os.getenv("KAFKA_SASL_USER", None)
KAFKA_SASL_PASSWORD = os.getenv("KAFKA_SASL_PASSWORD", None)

KAFKA_DEFAULT_TOPIC_REPLICATION_FACTOR = os.getenv("KAFKA_DEFAULT_TOPIC_REPLICATION_FACTOR", 1)
This feels fundamentally dangerous to specify this way - it is likely to lead to installations with insufficient replication factors.
How else should this be specified? Currently all topics are autocreated and I believe with a replication factor of 1.
Somewhere this needs to be a config option, even if not here
Maybe but not as-is. Current solution is pretty raw and geared towards our cloud setups, both smaller and larger setups would be broken with the solution outlined here. We should not be controlling any of partition counts, replication factors and retention in python code for them, this is chart-level important setup!
I'm worried about making this a django command - this leaves us a maintenance burden and makes it more likely we'll mess this up somehow. Rather than make this an "official" command can we e.g. chunk it in posthog/oneoffs or similar? This gives a clear indication this is only intended to work as we merge this.
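On the replication-factor thread above, one detail worth noting: `os.getenv` returns a string whenever the variable is set, so the setting as written is the integer 1 by default but a string such as "3" when configured. A minimal sketch of one alternative, which parses the value and returns `None` when unset so that no default is baked into Python code. The function name `read_replication_factor` is hypothetical, and this is only one reading of the discussion, not a decision made in the PR:

```python
from typing import Mapping, Optional


def read_replication_factor(
    env: Mapping[str, str],
    name: str = "KAFKA_DEFAULT_TOPIC_REPLICATION_FACTOR",
) -> Optional[int]:
    """Parse the replication factor from an environment-like mapping.

    Returns None when the variable is unset or empty, leaving the choice to
    the caller (for example the deployment chart) instead of silently
    defaulting to 1.
    """
    raw = env.get(name)
    if raw is None or raw.strip() == "":
        return None
    value = int(raw)  # raises ValueError on non-numeric input
    if value < 1:
        raise ValueError(f"{name} must be at least 1, got {value}")
    return value
```

In a settings module this would be called as `read_replication_factor(os.environ)`.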
This PR hasn't seen activity in a week! Should it be merged, closed, or further worked on? If you want to keep it open, post a comment or remove the
This PR was closed due to 2 weeks of inactivity. Feel free to reopen it if still relevant.
To do
Problem
This gets us closer to solving two problems:
For problem 2, I initially considered spinning up some Kafka Manager UI solution, but I actually quite like defining topics in code like this, as we would define other schemas.
Changes
Add a script to create Kafka topics according to their definitions in code.
Note that this should be added as a command at the create/upgrade/migrate step (e.g. after migrate_clickhouse), but currently it isn't being added anymore.
Also note that this will be a no-op for all existing deployments. Topics already exist and so nothing will happen.
For new deployments though, topics will be created at this step rather than autocreated when a producer/consumer is set up.
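The no-op behaviour for existing deployments can be sketched as a pure planning step that compares the topics defined in code against the names the cluster already reports. `TopicDefinition`, `plan_topic_creation`, and the topic names used with them are hypothetical and not taken from this PR:

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple


@dataclass(frozen=True)
class TopicDefinition:
    """Hypothetical in-code description of a Kafka topic."""

    name: str
    num_partitions: int
    replication_factor: int


def plan_topic_creation(
    defined: Iterable[TopicDefinition],
    existing_names: Iterable[str],
) -> Tuple[List[TopicDefinition], List[str]]:
    """Split the defined topics into those to create and those already present.

    When every defined topic already exists, the first list is empty and the
    command has nothing to do.
    """
    existing = set(existing_names)
    to_create: List[TopicDefinition] = []
    already_present: List[str] = []
    for topic in defined:
        if topic.name in existing:
            already_present.append(topic.name)
        else:
            to_create.append(topic)
    return to_create, already_present
```

On an existing deployment where all defined topics are present, `to_create` comes back empty; on a fresh deployment it contains every definition.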
How did you test this code?
So far only manually