Dagster cli wrapper #2272
Conversation
I haven't figured out how to select tables to create using the config files. The API method
Looks good! A couple questions/suggestions that you should feel free to incorporate or not.
src/pudl/cli.py
Outdated
check_foreign_keys=not args.ignore_foreign_key_constraints,
check_types=not args.ignore_type_constraints,
check_values=not args.ignore_value_constraints,
etl_job = etl.defs.get_job_def("etl_full")
Non-blocking - I think it might be nice to redefine the job here instead of pulling in an existing job def. This claims "etl_full" even if we are using the etl_fast.yml, which could get confusing. Here's one kind of janky way to get the right label - I'm not totally happy with it but also this is the least invasive way I could think of to do it. Maybe someone who knows more about the Dagster API has a better shot :)
diff --git a/src/pudl/cli.py b/src/pudl/cli.py
index afee5d1e..a2dead3b 100644
--- a/src/pudl/cli.py
+++ b/src/pudl/cli.py
@@ -11,8 +11,11 @@ The output SQLite and Parquet files will be stored in ``PUDL_OUT`` in directorie
directories see ``pudl_setup --help``.
"""
import argparse
+from pathlib import Path
+import re
import sys
+from dagster import Definitions, define_asset_job
from dotenv import load_dotenv
import pudl
@@ -69,13 +72,20 @@ def main():
pudl.logging_helpers.configure_root_logger(
logfile=args.logfile, loglevel=args.loglevel
)
+ settings_path = Path(args.settings_file)
+
+ etl_job_name = re.sub(r"\W+", "_", f"etl_custom_{settings_path.stem}")
+ etl_job = Definitions(
+ assets=etl.default_assets,
+ resources=etl.default_resources,
+ jobs=[define_asset_job(etl_job_name)],
+ ).get_job_def(etl_job_name)
- etl_job = etl.defs.get_job_def("etl_full")
etl_job.execute_in_process(
run_config={
"resources": {
"dataset_settings": {
- "config": EtlSettings.from_yaml(args.settings_file).datasets.dict(
+ "config": EtlSettings.from_yaml(settings_path).datasets.dict(
exclude={
"ferc1": {"tables"},
"eia": {
diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py
index 3acb075a..258eb979 100644
--- a/src/pudl/etl/__init__.py
+++ b/src/pudl/etl/__init__.py
@@ -24,7 +24,7 @@ from . import ( # noqa: F401
logger = pudl.logging_helpers.get_logger(__name__)
-assets = (
+default_assets = (
*load_assets_from_modules([output_assets], group_name="output_tables"),
*load_assets_from_modules([epacems_assets], group_name="epacems"),
*load_assets_from_modules([eia_api_assets], group_name="eia_api"),
@@ -36,17 +36,19 @@ assets = (
*load_assets_from_modules([pudl.transform.ferc1], group_name="ferc_assets"),
)
+default_resources = {
+ "datastore": datastore,
+ "pudl_sqlite_io_manager": pudl_sqlite_io_manager,
+ "ferc1_dbf_sqlite_io_manager": ferc1_dbf_sqlite_io_manager,
+ "ferc1_xbrl_sqlite_io_manager": ferc1_xbrl_sqlite_io_manager,
+ "dataset_settings": dataset_settings,
+ "ferc_to_sqlite_settings": ferc_to_sqlite_settings,
+ "epacems_io_manager": epacems_io_manager,
+}
+
defs = Definitions(
- assets=assets,
- resources={
- "datastore": datastore,
- "pudl_sqlite_io_manager": pudl_sqlite_io_manager,
- "ferc1_dbf_sqlite_io_manager": ferc1_dbf_sqlite_io_manager,
- "ferc1_xbrl_sqlite_io_manager": ferc1_xbrl_sqlite_io_manager,
- "dataset_settings": dataset_settings,
- "ferc_to_sqlite_settings": ferc_to_sqlite_settings,
- "epacems_io_manager": epacems_io_manager,
- },
+ assets=default_assets,
+ resources=default_resources,
jobs=[
define_asset_job(name="etl_full"),
define_asset_job(
Good idea! I've started creating new jobs and using them with execute_job to get concurrency.
    args.settings_file
).ferc_to_sqlite_settings.dict(
    exclude={
        "ferc1_dbf_to_sqlite_settings": {"tables"},
Wish there was a less annoying way to do this... seems brittle when we add new tables. I guess we could potentially export the dict, then do some sort of recursive cleaning function to remove all "tables" keys... shrug. We can deal with it next time we add a FERC data source.
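A minimal sketch of what that recursive cleaner could look like (the function name and the assumption that the settings export as plain nested dicts and lists are mine, not from this PR):

def drop_keys(obj, key="tables"):
    """Recursively remove every occurrence of ``key`` from nested dicts/lists."""
    if isinstance(obj, dict):
        return {k: drop_keys(v, key) for k, v in obj.items() if k != key}
    if isinstance(obj, list):
        return [drop_keys(v, key) for v in obj]
    return obj

# Hypothetical usage, replacing the hand-maintained exclude={...} mapping:
# config = drop_keys(EtlSettings.from_yaml(args.settings_file).datasets.dict())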
I think if we want to remove the functionality to select a subset of tables, as @bendnorman mentioned below, we can just remove the tables from the settings and not have to worry about it at all. I'd like others' thoughts before just completely removing them though.
I'm ok with it! Though maybe we should deprecate now & then give it some time before we remove the settings completely?
I like the idea of deprecating the table selection before we remove the settings property from the settings object.
@@ -113,4 +113,3 @@ datasets:
# so if you're loading CEMS data for a particular year, you should
# also load the EIA 860 data for that year if possible
states: [ID, ME]
-years: [2019, 2020, 2021]
Was this an intentional deletion? It's making my etl_fast run take as long as it did to review the rest of this PR 😅
Not intentional! My bad, should be back to normal now!
Were the years added back?
I'll do some poking around to see if this is possible. I think dropping this functionality is probably fine.
It looks like selecting upstream assets works using this notation: ["*plants_steam_ferc1"]. However, dagster fails to reload the code when the upstream assets are produced by a multi_asset :/ I'll open an issue in the dagster repo.
For now I say we deprecate selecting subsets of tables.
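For reference, a sketch of how that selection string could be attached to a job definition (the job name here is invented for illustration):

from dagster import define_asset_job

# "*plants_steam_ferc1" selects plants_steam_ferc1 plus everything upstream of it.
steam_subset_job = define_asset_job(
    name="plants_steam_ferc1_job",
    selection="*plants_steam_ferc1",
)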
src/pudl/cli.py
Outdated
check_types=not args.ignore_type_constraints,
check_values=not args.ignore_value_constraints,
etl_job = etl.defs.get_job_def("etl_full")
etl_job.execute_in_process(
execute_in_process processes all assets in the same process so we aren't getting any of that sweet sweet parallelism. I think we need to use the execute_job function:
This API represents dagster’s python entrypoint for out-of-process execution. For most testing purposes, execute_in_process() will be more suitable, but when wanting to run execution using an out-of-process executor (such as dagster.multiprocess_executor), then execute_job is suitable.
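A rough sketch of the switch, assuming a module-level job factory (the names here are illustrative, not the PR's final code):

from dagster import DagsterInstance, execute_job, reconstructable

from pudl import etl

def get_etl_job():
    # reconstructable() needs a module-level function that returns the job.
    return etl.defs.get_job_def("etl_full")

run_config = {"resources": {}}  # same resource config execute_in_process received

result = execute_job(
    reconstructable(get_etl_job),
    instance=DagsterInstance.get(),
    run_config=run_config,
)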
Switched to using this!
"resources": { | ||
"dataset_settings": { | ||
"config": EtlSettings.from_yaml(args.settings_file).datasets.dict( | ||
exclude={ |
If we aren't letting people select tables do we need to use exclude? I agree with @jdangerx it might be cleaner to recursively remove the table keys like pudl.settings.create_dagster_config(). Eventually, should we just remove the table attribute from settings objects or convert them to class variables?
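On the class-variable idea: pydantic ignores ClassVar annotations when building model fields, so a tables class variable would drop out of .dict() automatically. A hypothetical sketch (class and field values invented for illustration):

from typing import ClassVar

from pydantic import BaseModel

class ExampleFercSettings(BaseModel):
    years: list[int] = [2020, 2021]
    # As a ClassVar this is not a pydantic field: it never appears in .dict()
    # output, so no exclude={...} bookkeeping is needed.
    tables: ClassVar[list[str]] = ["f1_steam", "f1_fuel"]

assert "tables" not in ExampleFercSettings().dict()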
Yeah I think maybe we just remove them entirely, because I don't think there's any use for them if they're not being used for selecting subsets of tables. I can also add a method as suggested though if we want to keep the tables for some reason
src/pudl/cli.py
Outdated
check_foreign_keys=not args.ignore_foreign_key_constraints,
check_types=not args.ignore_type_constraints,
check_values=not args.ignore_value_constraints,
etl_job = etl.defs.get_job_def("etl_full")
What if we just get the job name from the settings file?
etl_settings = EtlSettings.from_yaml(args.settings_file)
etl_job = etl.defs.get_job_def(etl_settings.name)
I could imagine someone making their own etl settings file that doesn't match the name of etl_full or etl_fast, which would be a kind of confusing error. But maybe nobody does that and people just edit their etl_fast to whatever they want 🤷
I played around with this using build_reconstructable_job to dynamically create jobs with different names, but dagster has pretty strict requirements for names (i.e. no - characters allowed), so it might be more trouble than it's worth, as many custom names might generate errors.
Couple of thoughts. Also curious how people feel about how to handle tables.
@@ -60,6 +67,15 @@ def parse_command_line(argv):
    return arguments


def get_etl_job():
reconstructable used below requires a module-level function that generates a job. I played around with build_reconstructable_job for more dynamic job creation, but found it's not too useful right now.
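Something along these lines, sketched on the assumption that pudl.etl exposes the default_assets and default_resources names from the earlier diff:

from dagster import Definitions, define_asset_job

from pudl import etl

def get_etl_job():
    """Module-level job factory, as reconstructable() requires."""
    return Definitions(
        assets=etl.default_assets,
        resources=etl.default_resources,
        jobs=[define_asset_job("etl_job")],
    ).get_job_def("etl_job")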
@@ -8,6 +8,7 @@
import argparse
Should we move this file to the ferc_to_sqlite module? Could be something like src/pudl/ferc_to_sqlite/cli.py
I think that's a less surprising place for it, yeah.
Should we move src/pudl/cli.py to src/pudl/etl/cli.py?
Looks good to me!
I moved the files as suggested.
This is looking good! I'm glad the multiprocessing is working. What would removing the tables attribute look like?
Also, I misinterpreted the multi_asset selection error. We can actually select subsets of assets produced by a multi_asset. This might add more complexity than it's worth though.
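For the record, subsetting a multi_asset works by opting in with can_subset=True and checking which outputs were selected at runtime; a minimal sketch with invented asset names:

from dagster import AssetOut, Output, multi_asset

@multi_asset(
    outs={
        "table_a": AssetOut(is_required=False),
        "table_b": AssetOut(is_required=False),
    },
    can_subset=True,
)
def two_tables(context):
    # Yield only the outputs actually selected for this run.
    if "table_a" in context.selected_output_names:
        yield Output("a", output_name="table_a")
    if "table_b" in context.selected_output_names:
        yield Output("b", output_name="table_b")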
PR Overview
This PR wraps the new dagster ETL with the old CLI. This will allow the interface to remain largely unchanged to help users transition to the dagster update.