Skip to content

Commit

Permalink
put prefix_bucket back for parse_and_validate_rt and document env var…
Browse files Browse the repository at this point in the history
… configuration
  • Loading branch information
atvaccaro committed Aug 17, 2022
1 parent 1023d17 commit 74fe5d9
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 9 deletions.
9 changes: 9 additions & 0 deletions airflow/dags/macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,18 @@ def airtable_mapping_generate_sql(table1, table2, col1, col2):
# key is alias that will be used to reference the template in DAG tasks
# value is name of function template as defined above


def prefix_bucket(bucket):
    """Return the ``gs://`` URI for *bucket*, using a test bucket in development.

    A leading ``gs://`` on *bucket* is dropped first, so both a bare bucket
    name and a full URI are accepted.

    Returns ``gs://test-<bucket>`` when ``is_development()`` is truthy and
    ``gs://<bucket>`` otherwise.
    """
    scheme = "gs://"
    # Strip only a *leading* scheme; str.replace would also remove any later
    # occurrence of "gs://" inside the name.
    # TODO: use bucket.removeprefix(scheme) once we're in python 3.9+
    if bucket.startswith(scheme):
        bucket = bucket[len(scheme):]
    return f"gs://test-{bucket}" if is_development() else f"gs://{bucket}"


# Macro registry: each keyword is the alias used to reference the macro in
# DAG task templates; each value is the callable invoked for that alias.
data_infra_macros = dict(
    sql_airtable_mapping=airtable_mapping_generate_sql,
    is_development=is_development_macro,
    image_tag=lambda: "development" if is_development() else "latest",
    env_var=lambda key: os.getenv(key),
    prefix_bucket=prefix_bucket,
)
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ arguments:
- "service_alerts"
# we run on a delay because of archiver write latency but then we process the prior _hour_
- "{{ get_bucket() }}/rt/{{ execution_date.replace(minute=0, second=0).format('YYYY-MM-DDTHH*') }}"
- "{{ env_var('CALITP_BUCKET__RT_PARSED') }}"
- "{{ prefix_bucket('rt-parsed') }}"
- "--parse"
- "--verbose"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ arguments:
- "trip_updates"
# we run on a delay because of archiver write latency but then we process the prior _hour_
- "{{ get_bucket() }}/rt/{{ execution_date.replace(minute=0, second=0).format('YYYY-MM-DDTHH*') }}"
- "{{ env_var('CALITP_BUCKET__RT_PARSED') }}"
- "{{ prefix_bucket('rt-parsed') }}"
- "--parse"
- "--verbose"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ arguments:
- "vehicle_positions"
# we run on a delay because of archiver write latency but then we process the prior _hour_
- "{{ get_bucket() }}/rt/{{ execution_date.replace(minute=0, second=0).format('YYYY-MM-DDTHH*') }}"
- "{{ env_var('CALITP_BUCKET__RT_PARSED') }}"
- "{{ prefix_bucket('rt-parsed') }}"
- "--parse"
- "--verbose"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ arguments:
- "service_alerts"
# we run on a delay because of archiver write latency but then we process the prior _hour_
- "{{ get_bucket() }}/rt/{{ execution_date.replace(minute=0, second=0).format('YYYY-MM-DDTHH*') }}"
- "{{ env_var('CALITP_BUCKET__RT_PARSED') }}"
- "{{ prefix_bucket('rt-parsed') }}"
- "--threads=2"
- "--validate"
- "--verbose"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ arguments:
- "trip_updates"
# we run on a delay because of archiver write latency but then we process the prior _hour_
- "{{ get_bucket() }}/rt/{{ execution_date.replace(minute=0, second=0).format('YYYY-MM-DDTHH*') }}"
- "{{ env_var('CALITP_BUCKET__RT_PARSED') }}"
- "{{ prefix_bucket('rt-parsed') }}"
- "--threads=2"
- "--validate"
- "--verbose"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ arguments:
- "vehicle_positions"
# we run on a delay because of archiver write latency but then we process the prior _hour_
- "{{ get_bucket() }}/rt/{{ execution_date.replace(minute=0, second=0).format('YYYY-MM-DDTHH*') }}"
- "{{ env_var('CALITP_BUCKET__RT_PARSED') }}"
- "{{ prefix_bucket('rt-parsed') }}"
- "--threads=2"
- "--validate"
- "--verbose"
Expand Down
6 changes: 3 additions & 3 deletions airflow/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ x-airflow-common:
GOOGLE_CLOUD_PROJECT: cal-itp-data-infra

CALITP_BUCKET__AIRTABLE: "gs://test-calitp-airtable"
CALITP_BUCKET__GTFS_RT_RAW: "gs://test-calitp-gtfs-rt-raw"
CALITP_BUCKET__GTFS_RT_PARSED: "gs://test-calitp-gtfs-rt-parsed"
CALITP_BUCKET__GTFS_RT_VALIDATION: "gs://test-calitp-gtfs-rt-validation"
# CALITP_BUCKET__GTFS_RT_RAW: "gs://test-calitp-gtfs-rt-raw"
# CALITP_BUCKET__GTFS_RT_PARSED: "gs://test-calitp-gtfs-rt-parsed"
# CALITP_BUCKET__GTFS_RT_VALIDATION: "gs://test-calitp-gtfs-rt-validation"
CALITP_BUCKET__GTFS_SCHEDULE_RAW: "gs://test-calitp-gtfs-schedule-raw"
CALITP_BUCKET__GTFS_SCHEDULE_VALIDATION: "gs://test-calitp-gtfs-schedule-validation"

Expand Down

0 comments on commit 74fe5d9

Please sign in to comment.