From 41af70ab2754fce054c62a7a12b10cf9aad77a80 Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Wed, 24 Aug 2022 14:45:28 -0400 Subject: [PATCH 01/12] wip getting multipart upload to ckan working --- .../_gtfs_schedule_latest_only.yml | 36 ++++---- .../gtfs_schedule_latest_only/stop_times.sql | 1 + warehouse/scripts/publish.py | 85 ++++++++++++++++--- 3 files changed, 93 insertions(+), 29 deletions(-) diff --git a/warehouse/models/gtfs_schedule_latest_only/_gtfs_schedule_latest_only.yml b/warehouse/models/gtfs_schedule_latest_only/_gtfs_schedule_latest_only.yml index 813acd254b..9535b105a7 100644 --- a/warehouse/models/gtfs_schedule_latest_only/_gtfs_schedule_latest_only.yml +++ b/warehouse/models/gtfs_schedule_latest_only/_gtfs_schedule_latest_only.yml @@ -756,24 +756,24 @@ exposures: - type: ckan bucket: gs://calitp-publish format: csv - url: https://data.ca.gov/api/3/action/resource_update + url: https://data.ca.gov ids: - agency: e8f9d49e-2bb6-400b-b01f-28bc2e0e7df2 - routes: c6bbb637-988f-431c-8444-aef7277297f8 +# agency: e8f9d49e-2bb6-400b-b01f-28bc2e0e7df2 + # routes: c6bbb637-988f-431c-8444-aef7277297f8 # TODO: add stop_times back in once size limit lifted - # stop_times: d31eef2f-e223-4ca4-a86b-170acc6b2590 - stops: 8c876204-e12b-48a2-8299-10f6ae3d4f2b - trips: 0e4da89e-9330-43f8-8de9-305cb7d4918f - attributions: 038b7354-06e8-4082-a4a1-40debd3110d5 - calendar: a79f10b8-b322-43f3-b3f4-ba46a8dbe9ab - calendar_dates: 06a21a8e-dba3-4e7e-8726-f2e992cc1a80 - fare_attributes: 9db51dfa-fd6d-481b-b655-96e8af722ab5 - fare_rules: 4fb1fa39-0ef9-457d-97b1-bb7e9e848312 - feed_info: 50d12559-635e-4222-ac25-3706c066902d - frequencies: 48542c8f-8ce1-43e3-a965-6c68771d6fe5 - levels: 288a08cd-7929-479e-aa88-08b677a08510 - pathways: a01484af-c460-40a4-ac8a-896b0196e8c2 + stop_times: d31eef2f-e223-4ca4-a86b-170acc6b2590 + # stops: 8c876204-e12b-48a2-8299-10f6ae3d4f2b + # trips: 0e4da89e-9330-43f8-8de9-305cb7d4918f + # attributions: 038b7354-06e8-4082-a4a1-40debd3110d5 + # calendar: a79f10b8-b322-43f3-b3f4-ba46a8dbe9ab + # calendar_dates: 06a21a8e-dba3-4e7e-8726-f2e992cc1a80 + # fare_attributes: 9db51dfa-fd6d-481b-b655-96e8af722ab5 + # fare_rules: 4fb1fa39-0ef9-457d-97b1-bb7e9e848312 + # feed_info: 50d12559-635e-4222-ac25-3706c066902d + # frequencies: 48542c8f-8ce1-43e3-a965-6c68771d6fe5 + # levels: 288a08cd-7929-479e-aa88-08b677a08510 + # pathways: a01484af-c460-40a4-ac8a-896b0196e8c2 # TODO: add shapes when the size limit has been lifted -# shapes: 2f5e7bdb-33e8-4633-b163-6bab42ad0951 - transfers: f8dcda5d-0c6d-4c70-b5f5-6716adcf6ffc - translations: 7abe9256-6cd2-4c1f-9b6a-72108022a382 + # shapes: 2f5e7bdb-33e8-4633-b163-6bab42ad0951 + # transfers: f8dcda5d-0c6d-4c70-b5f5-6716adcf6ffc + # translations: 7abe9256-6cd2-4c1f-9b6a-72108022a382 diff --git a/warehouse/models/gtfs_schedule_latest_only/stop_times.sql b/warehouse/models/gtfs_schedule_latest_only/stop_times.sql index 0804f76723..55623ec23f 100644 --- a/warehouse/models/gtfs_schedule_latest_only/stop_times.sql +++ b/warehouse/models/gtfs_schedule_latest_only/stop_times.sql @@ -10,3 +10,4 @@ stop_times AS ( ) SELECT * FROM stop_times +LIMIT 1740900 diff --git a/warehouse/scripts/publish.py b/warehouse/scripts/publish.py index 907b10fccd..cdbc0c7456 100644 --- a/warehouse/scripts/publish.py +++ b/warehouse/scripts/publish.py @@ -1,13 +1,14 @@ """ Publishes various dbt models to various sources. 
""" +import functools from datetime import timedelta import csv import pendulum from google.cloud import bigquery -from typing import Optional, Literal, List, Dict +from typing import Optional, Literal, List, Dict, BinaryIO, TextIO from pathlib import Path @@ -66,6 +67,64 @@ def make_linestring(x): return pts[0] +CHUNK_SIZE = 64 * 1024 * 1024 + + +def upload_to_ckan(url: str, fname: str, fsize: int, file: TextIO, resource_id: str, api_key: str): + typer.secho(f"uploading {humanize.naturalsize(fsize)} to {resource_id}") + if fsize <= CHUNK_SIZE: + requests.post( + f"{url}/api/3/action/resource_update", + data={"id": resource_id}, + headers={"Authorization": API_KEY}, + files={"upload": file}, + ).raise_for_status() + else: + initiate_response = requests.post( + f"{url}/api/3/action/cloudstorage_initiate_multipart", + json={ + "id": resource_id, + "name": fname, + "size": fsize, + }, + headers={"Authorization": API_KEY}, + ) + initiate_response.raise_for_status() + upload_id = initiate_response.json()["result"]["id"] + + # https://stackoverflow.com/a/54989668 + chunker = functools.partial(file.read, CHUNK_SIZE) + for i, chunk in enumerate(iter(chunker, ""), start=1): + try: + typer.secho(f"uploading part {i} to multipart upload {upload_id}") + requests.post( + f"{url}/api/3/action/cloudstorage_upload_multipart", + json={ + "id": resource_id, + "uploadId": upload_id, + "partNumber": str(i), # the server throws a 500 if this isn't a string + "upload": (fname, chunk, "text/plain") + }, + headers={"Authorization": API_KEY}, + ).raise_for_status() + except Exception: + requests.post( + f"{url}/api/3/action/cloudstorage_abort_multipart", + data={ + "id": resource_id, + "uploadId": upload_id, + } + ).raise_for_status() + raise + requests.post( + f"{url}/api/3/action/cloudstorage_finish_multipart", + data={ + "uploadId": upload_id, + "save_action": "go-metadata", + } + ).raise_for_status() + + def _publish_exposure( project: str, bucket: str, exposure: Exposure, dry_run: bool, deploy: bool ): @@ -79,8 +138,8 @@ def _publish_exposure( ) # TODO: this should probably be driven by the depends_on nodes - for model_name, ckan_id in destination.ids.items(): - typer.secho(f"handling {model_name} {ckan_id}") + for model_name, resource_id in destination.ids.items(): + typer.secho(f"handling {model_name} {resource_id}") node = BaseNode._instances[f"model.calitp_warehouse.{model_name}"] fpath = os.path.join(tmpdir, destination.filename(model_name)) @@ -117,7 +176,7 @@ def _publish_exposure( hive_path = destination.hive_path(exposure, model_name, bucket) write_msg = f"writing {model_name} to {hive_path}" - upload_msg = f"uploading to {destination.url} {ckan_id}" + upload_msg = f"uploading to {destination.url} {resource_id}" if dry_run: typer.secho( f"would be {write_msg} and {upload_msg}", @@ -127,16 +186,20 @@ def _publish_exposure( typer.secho(write_msg, fg=typer.colors.GREEN) fs = gcsfs.GCSFileSystem(token="google_default") fs.put(fpath, hive_path) + fname=destination.filename(model_name) + fsize =os.path.getsize(fpath) if deploy: typer.secho(upload_msg, fg=typer.colors.GREEN) - with open(fpath, "rb") as fp: - requests.post( - destination.url, - data={"id": ckan_id}, - headers={"Authorization": API_KEY}, - files={"upload": fp}, - ).raise_for_status() + with open(fpath, "r") as fp: + upload_to_ckan( + url=destination.url, + fname=fname, + fsize=fsize, + file=fp, + resource_id=resource_id, + api_key=API_KEY, + ) else: typer.secho( f"would be {upload_msg} if --deploy", From 
60a517abdc90ad232680697f2fa58ec233259bd5 Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Wed, 24 Aug 2022 16:49:22 -0400 Subject: [PATCH 02/12] remove before I forget --- warehouse/models/gtfs_schedule_latest_only/stop_times.sql | 1 - 1 file changed, 1 deletion(-) diff --git a/warehouse/models/gtfs_schedule_latest_only/stop_times.sql b/warehouse/models/gtfs_schedule_latest_only/stop_times.sql index 55623ec23f..0804f76723 100644 --- a/warehouse/models/gtfs_schedule_latest_only/stop_times.sql +++ b/warehouse/models/gtfs_schedule_latest_only/stop_times.sql @@ -10,4 +10,3 @@ stop_times AS ( ) SELECT * FROM stop_times -LIMIT 1740900 From a89e3b75f86641d0315ff60d350b9544bbf4eed4 Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Thu, 25 Aug 2022 19:12:10 -0400 Subject: [PATCH 03/12] mirror the example script... we get 500s with too many chunks --- warehouse/scripts/publish.py | 82 +++++++++++++++++++++++++----------- 1 file changed, 57 insertions(+), 25 deletions(-) diff --git a/warehouse/scripts/publish.py b/warehouse/scripts/publish.py index cdbc0c7456..b8257e0ad7 100644 --- a/warehouse/scripts/publish.py +++ b/warehouse/scripts/publish.py @@ -8,7 +8,7 @@ import pendulum from google.cloud import bigquery -from typing import Optional, Literal, List, Dict, BinaryIO, TextIO +from typing import Optional, Literal, List, Dict, BinaryIO from pathlib import Path @@ -29,6 +29,8 @@ import requests import typer from pydantic import BaseModel +from requests import Response +from requests_toolbelt import MultipartEncoder from dbt_artifacts import ( BaseNode, @@ -70,58 +72,88 @@ def make_linestring(x): CHUNK_SIZE = 64 * 1024 * 1024 -def upload_to_ckan(url: str, fname: str, fsize: int, file: TextIO, resource_id: str, api_key: str): - typer.secho(f"uploading {humanize.naturalsize(fsize)} to {resource_id}") +def upload_to_ckan( + url: str, fname: str, fsize: int, file: BinaryIO, resource_id: str, api_key: str +): + def ckan_request(action: str, data: Dict) -> Response: + encoder = MultipartEncoder(fields=data) + return requests.post( + f"{url}/api/action/{action}", + data=encoder, + headers={"Content-Type": encoder.content_type, "X-CKAN-API-Key": API_KEY}, + ) + if fsize <= CHUNK_SIZE: + typer.secho(f"uploading {humanize.naturalsize(fsize)} to {resource_id}") requests.post( - f"{url}/api/3/action/resource_update", + f"{url}/api/action/resource_update", data={"id": resource_id}, headers={"Authorization": API_KEY}, files={"upload": file}, ).raise_for_status() + elif fsize / CHUNK_SIZE > 4: + raise RuntimeError("we probably cannot upload more than 4 chunks to CKAN") else: - initiate_response = requests.post( - f"{url}/api/3/action/cloudstorage_initiate_multipart", - json={ + typer.secho( + f"uploading {humanize.naturalsize(fsize)} to {resource_id} in {humanize.naturalsize(CHUNK_SIZE)} chunks" + ) + initiate_response = ckan_request( + action="cloudstorage_initiate_multipart", + data={ "id": resource_id, "name": fname, - "size": fsize, + "size": str(fsize), }, - headers={"Authorization": API_KEY}, ) initiate_response.raise_for_status() upload_id = initiate_response.json()["result"]["id"] # https://stackoverflow.com/a/54989668 chunker = functools.partial(file.read, CHUNK_SIZE) - for i, chunk in enumerate(iter(chunker, ""), start=1): + for i, chunk in enumerate(iter(chunker, b""), start=1): + if i > 100: + raise RuntimeError( + "stopping after 100 chunks, this should be re-considered" + ) try: typer.secho(f"uploading part {i} to multipart upload {upload_id}") - requests.post( - 
f"{url}/api/3/action/cloudstorage_upload_multipart", - json={ + ckan_request( + action="cloudstorage_upload_multipart", + data={ "id": resource_id, "uploadId": upload_id, - "partNumber": str(i), # the server throws a 500 if this isn't a string - "upload": (fname, chunk, "text/plain") + "partNumber": str( + i + ), # the server throws a 500 if this isn't a string + "upload": (fname, chunk, "text/plain"), }, - headers={"Authorization": API_KEY}, ).raise_for_status() except Exception: - requests.post( - f"{url}/api/3/action/cloudstorage_abort_multipart", + ckan_request( + action="cloudstorage_abort_multipart", data={ "id": resource_id, "uploadId": upload_id, - } + }, ).raise_for_status() raise - requests.post( - f"{url}/api/3/action/cloudstorage_finish_multipart", + ckan_request( + action="cloudstorage_finish_multipart", data={ "uploadId": upload_id, + "id": resource_id, "save_action": "go-metadata", - } + }, + ).raise_for_status() + ckan_request( + action="resource_patch", + data={ + "id": resource_id, + "multipart_name": fname, + "url": fname, + "size": str(fsize), + "url_type": "upload", + }, ).raise_for_status() @@ -186,12 +218,12 @@ def _publish_exposure( typer.secho(write_msg, fg=typer.colors.GREEN) fs = gcsfs.GCSFileSystem(token="google_default") fs.put(fpath, hive_path) - fname=destination.filename(model_name) - fsize =os.path.getsize(fpath) + fname = destination.filename(model_name) + fsize = os.path.getsize(fpath) if deploy: typer.secho(upload_msg, fg=typer.colors.GREEN) - with open(fpath, "r") as fp: + with open(fpath, "rb") as fp: upload_to_ckan( url=destination.url, fname=fname, From b3bcfe974cc2d3ec38942b55d42be9104da5b6fd Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Fri, 26 Aug 2022 11:55:53 -0400 Subject: [PATCH 04/12] commit this while it is working --- .../_gtfs_schedule_latest_only.yml | 36 +++++++++---------- .../gtfs_schedule_latest_only/stop_times.sql | 1 + warehouse/scripts/publish.py | 23 ++++++++++-- warehouse/scripts/run_and_upload.py | 3 +- 4 files changed, 41 insertions(+), 22 deletions(-) diff --git a/warehouse/models/gtfs_schedule_latest_only/_gtfs_schedule_latest_only.yml b/warehouse/models/gtfs_schedule_latest_only/_gtfs_schedule_latest_only.yml index 9535b105a7..d7f563b668 100644 --- a/warehouse/models/gtfs_schedule_latest_only/_gtfs_schedule_latest_only.yml +++ b/warehouse/models/gtfs_schedule_latest_only/_gtfs_schedule_latest_only.yml @@ -758,22 +758,20 @@ exposures: format: csv url: https://data.ca.gov ids: -# agency: e8f9d49e-2bb6-400b-b01f-28bc2e0e7df2 - # routes: c6bbb637-988f-431c-8444-aef7277297f8 - # TODO: add stop_times back in once size limit lifted - stop_times: d31eef2f-e223-4ca4-a86b-170acc6b2590 - # stops: 8c876204-e12b-48a2-8299-10f6ae3d4f2b - # trips: 0e4da89e-9330-43f8-8de9-305cb7d4918f - # attributions: 038b7354-06e8-4082-a4a1-40debd3110d5 - # calendar: a79f10b8-b322-43f3-b3f4-ba46a8dbe9ab - # calendar_dates: 06a21a8e-dba3-4e7e-8726-f2e992cc1a80 - # fare_attributes: 9db51dfa-fd6d-481b-b655-96e8af722ab5 - # fare_rules: 4fb1fa39-0ef9-457d-97b1-bb7e9e848312 - # feed_info: 50d12559-635e-4222-ac25-3706c066902d - # frequencies: 48542c8f-8ce1-43e3-a965-6c68771d6fe5 - # levels: 288a08cd-7929-479e-aa88-08b677a08510 - # pathways: a01484af-c460-40a4-ac8a-896b0196e8c2 - # TODO: add shapes when the size limit has been lifted - # shapes: 2f5e7bdb-33e8-4633-b163-6bab42ad0951 - # transfers: f8dcda5d-0c6d-4c70-b5f5-6716adcf6ffc - # translations: 7abe9256-6cd2-4c1f-9b6a-72108022a382 + agency: e8f9d49e-2bb6-400b-b01f-28bc2e0e7df2 + routes: 
c6bbb637-988f-431c-8444-aef7277297f8 + # stop_times: d31eef2f-e223-4ca4-a86b-170acc6b2590 + stops: 8c876204-e12b-48a2-8299-10f6ae3d4f2b + trips: 0e4da89e-9330-43f8-8de9-305cb7d4918f + attributions: 038b7354-06e8-4082-a4a1-40debd3110d5 + calendar: a79f10b8-b322-43f3-b3f4-ba46a8dbe9ab + calendar_dates: 06a21a8e-dba3-4e7e-8726-f2e992cc1a80 + fare_attributes: 9db51dfa-fd6d-481b-b655-96e8af722ab5 + fare_rules: 4fb1fa39-0ef9-457d-97b1-bb7e9e848312 + feed_info: 50d12559-635e-4222-ac25-3706c066902d + frequencies: 48542c8f-8ce1-43e3-a965-6c68771d6fe5 + levels: 288a08cd-7929-479e-aa88-08b677a08510 + pathways: a01484af-c460-40a4-ac8a-896b0196e8c2 + shapes: 2f5e7bdb-33e8-4633-b163-6bab42ad0951 + transfers: f8dcda5d-0c6d-4c70-b5f5-6716adcf6ffc + translations: 7abe9256-6cd2-4c1f-9b6a-72108022a382 diff --git a/warehouse/models/gtfs_schedule_latest_only/stop_times.sql b/warehouse/models/gtfs_schedule_latest_only/stop_times.sql index 0804f76723..cfa0452ef7 100644 --- a/warehouse/models/gtfs_schedule_latest_only/stop_times.sql +++ b/warehouse/models/gtfs_schedule_latest_only/stop_times.sql @@ -10,3 +10,4 @@ stop_times AS ( ) SELECT * FROM stop_times +LIMIT 1040900 diff --git a/warehouse/scripts/publish.py b/warehouse/scripts/publish.py index b8257e0ad7..a571e25d28 100644 --- a/warehouse/scripts/publish.py +++ b/warehouse/scripts/publish.py @@ -91,8 +91,6 @@ def ckan_request(action: str, data: Dict) -> Response: headers={"Authorization": API_KEY}, files={"upload": file}, ).raise_for_status() - elif fsize / CHUNK_SIZE > 4: - raise RuntimeError("we probably cannot upload more than 4 chunks to CKAN") else: typer.secho( f"uploading {humanize.naturalsize(fsize)} to {resource_id} in {humanize.naturalsize(CHUNK_SIZE)} chunks" @@ -145,6 +143,9 @@ def ckan_request(action: str, data: Dict) -> Response: "save_action": "go-metadata", }, ).raise_for_status() + typer.secho( + f"finished multipart upload_id {upload_id}", fg=typer.colors.MAGENTA + ) ckan_request( action="resource_patch", data={ @@ -155,6 +156,7 @@ def ckan_request(action: str, data: Dict) -> Response: "url_type": "upload", }, ).raise_for_status() + typer.secho(f"patched resource {resource_id}", fg=typer.colors.MAGENTA) def _publish_exposure( @@ -538,5 +540,22 @@ def publish_exposure( ) +@app.command() +def multipart_ckan_upload( + resource_id: str, + fpath: Path, + url: str = "https://data.ca.gov", +): + with open(fpath, "rb") as f: + upload_to_ckan( + url=url, + fname=fpath.name, + fsize=os.path.getsize(fpath), + file=f, + resource_id=resource_id, + api_key=API_KEY, + ) + + if __name__ == "__main__": app() diff --git a/warehouse/scripts/run_and_upload.py b/warehouse/scripts/run_and_upload.py index d6495f3a66..0d9ad9fed6 100755 --- a/warehouse/scripts/run_and_upload.py +++ b/warehouse/scripts/run_and_upload.py @@ -79,7 +79,8 @@ def get_command(*args) -> List[str]: os.mkdir("docs/") fs = gcsfs.GCSFileSystem( - project="cal-itp-data-infra", token=os.getenv("BIGQUERY_KEYFILE_LOCATION") + project="cal-itp-data-infra", + token=os.getenv("BIGQUERY_KEYFILE_LOCATION"), ) for artifact in artifacts: From 95ea51643c056632afeb9915673584ecfb8dd080 Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Fri, 26 Aug 2022 12:59:57 -0400 Subject: [PATCH 05/12] add this back --- .../gtfs_schedule_latest_only/_gtfs_schedule_latest_only.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warehouse/models/gtfs_schedule_latest_only/_gtfs_schedule_latest_only.yml b/warehouse/models/gtfs_schedule_latest_only/_gtfs_schedule_latest_only.yml index d7f563b668..7aae1ea2eb 
100644 --- a/warehouse/models/gtfs_schedule_latest_only/_gtfs_schedule_latest_only.yml +++ b/warehouse/models/gtfs_schedule_latest_only/_gtfs_schedule_latest_only.yml @@ -760,7 +760,7 @@ exposures: ids: agency: e8f9d49e-2bb6-400b-b01f-28bc2e0e7df2 routes: c6bbb637-988f-431c-8444-aef7277297f8 - # stop_times: d31eef2f-e223-4ca4-a86b-170acc6b2590 + stop_times: d31eef2f-e223-4ca4-a86b-170acc6b2590 stops: 8c876204-e12b-48a2-8299-10f6ae3d4f2b trips: 0e4da89e-9330-43f8-8de9-305cb7d4918f attributions: 038b7354-06e8-4082-a4a1-40debd3110d5 From e9a1f1014ac1daf4d9fe49697152509bd1aafbb9 Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Fri, 26 Aug 2022 13:00:51 -0400 Subject: [PATCH 06/12] allow env var to control target and bucket --- airflow/dags/macros.py | 2 +- .../dags/transform_warehouse/dbt_run_and_upload_artifacts.yml | 4 +++- airflow/docker-compose.yaml | 3 +++ warehouse/scripts/run_and_upload.py | 4 ++-- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/airflow/dags/macros.py b/airflow/dags/macros.py index 5fa77057b8..b414a2d076 100644 --- a/airflow/dags/macros.py +++ b/airflow/dags/macros.py @@ -145,6 +145,6 @@ def prefix_bucket(bucket): "sql_airtable_mapping": airtable_mapping_generate_sql, "is_development": is_development_macro, "image_tag": lambda: "development" if is_development() else "latest", - "env_var": lambda key: os.getenv(key), + "env_var": os.getenv, "prefix_bucket": prefix_bucket, } diff --git a/airflow/dags/transform_warehouse/dbt_run_and_upload_artifacts.yml b/airflow/dags/transform_warehouse/dbt_run_and_upload_artifacts.yml index 29cd97f901..5b7008fe47 100644 --- a/airflow/dags/transform_warehouse/dbt_run_and_upload_artifacts.yml +++ b/airflow/dags/transform_warehouse/dbt_run_and_upload_artifacts.yml @@ -19,11 +19,13 @@ cluster_name: data-infra-apps namespace: airflow-jobs env_vars: + CALITP_BUCKET__DBT_ARTIFACTS: "{{ env_var('CALITP_BUCKET__DBT_ARTIFACTS') }}" BIGQUERY_KEYFILE_LOCATION: /secrets/jobs-data/service_account.json DBT_PROJECT_DIR: /app DBT_PROFILE_DIR: /app - DBT_TARGET: prod_service_account + DBT_TARGET: "{{ env_var('DBT_TARGET') }}" NETLIFY_SITE_ID: cal-itp-dbt-docs + secrets: - deploy_type: volume deploy_target: /secrets/jobs-data/ diff --git a/airflow/docker-compose.yaml b/airflow/docker-compose.yaml index f24da35290..fc3eeecfa4 100644 --- a/airflow/docker-compose.yaml +++ b/airflow/docker-compose.yaml @@ -80,6 +80,7 @@ x-airflow-common: GOOGLE_CLOUD_PROJECT: cal-itp-data-infra CALITP_BUCKET__AIRTABLE: "gs://test-calitp-airtable" + CALITP_BUCKET__DBT_ARTIFACTS: "gs://test-calitp-dbt-artifacts" CALITP_BUCKET__GTFS_RT_RAW: "gs://test-calitp-gtfs-rt-raw" CALITP_BUCKET__GTFS_RT_PARSED: "gs://test-calitp-gtfs-rt-parsed" CALITP_BUCKET__GTFS_RT_VALIDATION: "gs://test-calitp-gtfs-rt-validation" @@ -87,6 +88,8 @@ x-airflow-common: CALITP_BUCKET__GTFS_SCHEDULE_VALIDATION: "gs://test-calitp-gtfs-schedule-validation" CALITP_BUCKET__GTFS_SCHEDULE_UNZIPPED: "gs://test-calitp-gtfs-schedule-unzipped" + DBT_TARGET: staging_service_account + # TODO: this can be removed once we've confirmed it's no longer in Airtable GRAAS_SERVER_URL: $GRAAS_SERVER_URL diff --git a/warehouse/scripts/run_and_upload.py b/warehouse/scripts/run_and_upload.py index 0d9ad9fed6..0ba975fe1e 100755 --- a/warehouse/scripts/run_and_upload.py +++ b/warehouse/scripts/run_and_upload.py @@ -8,7 +8,7 @@ import gcsfs import typer -BUCKET = "calitp-dbt-artifacts" +BUCKET = os.environ["CALITP_BUCKET__DBT_ARTIFACTS"] artifacts = map( Path, ["index.html", "catalog.json", "manifest.json", 
"run_results.json"] @@ -87,7 +87,7 @@ def get_command(*args) -> List[str]: _from = str(project_dir / Path("target") / artifact) if save_artifacts: - _to = f"gs://{BUCKET}/latest/{artifact}" + _to = f"{BUCKET}/latest/{artifact}" typer.echo(f"writing {_from} to {_to}") fs.put(lpath=_from, rpath=_to) else: From 8cab9d9df6d3d77c6718b00471d5b083684c2fb8 Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Fri, 26 Aug 2022 13:12:25 -0400 Subject: [PATCH 07/12] create weekly task to run publish california_open_data --- airflow/dags/publish_open_data/METADATA.yml | 19 +++++++ .../publish_california_open_data.yml | 50 +++++++++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 airflow/dags/publish_open_data/METADATA.yml create mode 100644 airflow/dags/publish_open_data/publish_california_open_data.yml diff --git a/airflow/dags/publish_open_data/METADATA.yml b/airflow/dags/publish_open_data/METADATA.yml new file mode 100644 index 0000000000..3b7935356c --- /dev/null +++ b/airflow/dags/publish_open_data/METADATA.yml @@ -0,0 +1,19 @@ +description: "Publishes data to various open data portals" +schedule_interval: "0 0 * * 1" +tags: + - all_gusty_features +default_args: + owner: airflow + depends_on_past: False + start_date: !days_ago 1 + email: + - "andrew.v@jarv.us" + - "eric.dasmalchi@dot.ca.gov" + - "laurie.m@jarv.us" + email_on_failure: True + email_on_retry: False + retries: 1 + retry_delay: !timedelta 'minutes: 2' + concurrency: 50 + #sla: !timedelta 'hours: 2' +latest_only: True diff --git a/airflow/dags/publish_open_data/publish_california_open_data.yml b/airflow/dags/publish_open_data/publish_california_open_data.yml new file mode 100644 index 0000000000..f8297336f7 --- /dev/null +++ b/airflow/dags/publish_open_data/publish_california_open_data.yml @@ -0,0 +1,50 @@ +operator: 'operators.PodOperator' +name: 'publish-california-open-data' +image: 'ghcr.io/cal-itp/data-infra/warehouse:{{ image_tag() }}' + +cmds: + - python3 +arguments: + - '/app/scripts/run_and_upload.py' + - 'publish-exposure' + - 'california_open_data' + - '--deploy' + - '--sync-metabase' + - '--project' + - 'cal-itp-data-infra' + - '--bucket' + - 'gs://calitp-publish/' + - '--manifest' + - "{{ env_var('CALITP_BUCKET__DBT_ARTIFACTS') }}/latest/manifest.json" + +is_delete_operator_pod: true +get_logs: true +is_gke: true +pod_location: us-west1 +cluster_name: data-infra-apps +namespace: airflow-jobs + +env_vars: + BIGQUERY_KEYFILE_LOCATION: /secrets/jobs-data/service_account.json + CALITP_BUCKET__DBT_ARTIFACTS: "{{ env_var('CALITP_BUCKET__DBT_ARTIFACTS') }}" + +secrets: + - deploy_type: volume + deploy_target: /secrets/jobs-data/ + secret: jobs-data + key: service-account.json + +tolerations: + - key: pod-role + operator: Equal + value: computetask + effect: NoSchedule +affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: pod-role + operator: In + values: + - computetask From 0f57fd69f2cf31fe900103aae29cce785a517298 Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Fri, 26 Aug 2022 13:34:34 -0400 Subject: [PATCH 08/12] allow manifest to be in gcs --- warehouse/scripts/publish.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/warehouse/scripts/publish.py b/warehouse/scripts/publish.py index a571e25d28..d266849aa4 100644 --- a/warehouse/scripts/publish.py +++ b/warehouse/scripts/publish.py @@ -511,6 +511,7 @@ def publish_exposure( bucket: str = typer.Option( "gs://test-calitp-publish", help="The bucket in 
which artifacts are persisted." ), + manifest: str = "./target/manifest.json", dry_run: bool = typer.Option(False, help="If True, skips writing out any data."), deploy: bool = typer.Option( False, help="If True, actually deploy to external systems." @@ -526,10 +527,17 @@ def publish_exposure( ), "cannot deploy from the staging project!" assert not bucket.startswith("gs://test-"), "cannot deploy with a test bucket!" - with open("./target/manifest.json") as f: - manifest = Manifest(**json.load(f)) + if manifest.startswith("gs://"): + typer.secho(f"fetching manifest from {manifest}", fg=typer.colors.GREEN) + fs = gcsfs.GCSFileSystem() + with fs.open(manifest) as f: + actual_manifest = Manifest(**json.load(f)) - exposure = manifest.exposures[f"exposure.calitp_warehouse.{exposure.value}"] + else: + with open(manifest) as f: + actual_manifest = Manifest(**json.load(f)) + + exposure = actual_manifest.exposures[f"exposure.calitp_warehouse.{exposure.value}"] _publish_exposure( project=project, From e85c714ad063cb8e4ef8dd9e837970bc3032e490 Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Fri, 26 Aug 2022 14:52:37 -0400 Subject: [PATCH 09/12] get this actually working... --- .../publish_california_open_data.yml | 12 +++++------- airflow/docker-compose.yaml | 1 + .../models/gtfs_schedule_latest_only/stop_times.sql | 1 - warehouse/scripts/publish.py | 2 +- 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/airflow/dags/publish_open_data/publish_california_open_data.yml b/airflow/dags/publish_open_data/publish_california_open_data.yml index f8297336f7..b3b267f1cc 100644 --- a/airflow/dags/publish_open_data/publish_california_open_data.yml +++ b/airflow/dags/publish_open_data/publish_california_open_data.yml @@ -5,15 +5,14 @@ image: 'ghcr.io/cal-itp/data-infra/warehouse:{{ image_tag() }}' cmds: - python3 arguments: - - '/app/scripts/run_and_upload.py' + - '/app/scripts/publish.py' - 'publish-exposure' - 'california_open_data' - - '--deploy' - - '--sync-metabase' + - '{% if is_development() %}--no-deploy{% else %}--deploy{% endif %}' - '--project' - - 'cal-itp-data-infra' + - '{{ get_project_id() }}' - '--bucket' - - 'gs://calitp-publish/' + - "{{ env_var('CALITP_BUCKET__PUBLISH') }}" - '--manifest' - "{{ env_var('CALITP_BUCKET__DBT_ARTIFACTS') }}/latest/manifest.json" @@ -25,8 +24,7 @@ cluster_name: data-infra-apps namespace: airflow-jobs env_vars: - BIGQUERY_KEYFILE_LOCATION: /secrets/jobs-data/service_account.json - CALITP_BUCKET__DBT_ARTIFACTS: "{{ env_var('CALITP_BUCKET__DBT_ARTIFACTS') }}" + GOOGLE_APPLICATION_CREDENTIALS: /secrets/jobs-data/service_account.json secrets: - deploy_type: volume diff --git a/airflow/docker-compose.yaml b/airflow/docker-compose.yaml index fc3eeecfa4..9f6d6471f0 100644 --- a/airflow/docker-compose.yaml +++ b/airflow/docker-compose.yaml @@ -87,6 +87,7 @@ x-airflow-common: CALITP_BUCKET__GTFS_SCHEDULE_RAW: "gs://test-calitp-gtfs-schedule-raw" CALITP_BUCKET__GTFS_SCHEDULE_VALIDATION: "gs://test-calitp-gtfs-schedule-validation" CALITP_BUCKET__GTFS_SCHEDULE_UNZIPPED: "gs://test-calitp-gtfs-schedule-unzipped" + CALITP_BUCKET__PUBLISH: "gs://test-calitp-publish" DBT_TARGET: staging_service_account diff --git a/warehouse/models/gtfs_schedule_latest_only/stop_times.sql b/warehouse/models/gtfs_schedule_latest_only/stop_times.sql index cfa0452ef7..0804f76723 100644 --- a/warehouse/models/gtfs_schedule_latest_only/stop_times.sql +++ b/warehouse/models/gtfs_schedule_latest_only/stop_times.sql @@ -10,4 +10,3 @@ stop_times AS ( ) SELECT * FROM stop_times -LIMIT 1040900 
diff --git a/warehouse/scripts/publish.py b/warehouse/scripts/publish.py index d266849aa4..224d2b5bcf 100644 --- a/warehouse/scripts/publish.py +++ b/warehouse/scripts/publish.py @@ -529,7 +529,7 @@ def publish_exposure( if manifest.startswith("gs://"): typer.secho(f"fetching manifest from {manifest}", fg=typer.colors.GREEN) - fs = gcsfs.GCSFileSystem() + fs = gcsfs.GCSFileSystem(project=project) with fs.open(manifest) as f: actual_manifest = Manifest(**json.load(f)) From cc1c584c084781cc37a114fb06d251489c4219ba Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Fri, 26 Aug 2022 15:36:46 -0400 Subject: [PATCH 10/12] dockerignore --- warehouse/.dockerignore | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 warehouse/.dockerignore diff --git a/warehouse/.dockerignore b/warehouse/.dockerignore new file mode 100644 index 0000000000..09a3884c6d --- /dev/null +++ b/warehouse/.dockerignore @@ -0,0 +1,5 @@ +.user.yml + +target/ +dbt_packages/ +logs/ From 68e336245c62216bf34e4311d7957426d869b3ae Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Fri, 26 Aug 2022 15:50:24 -0400 Subject: [PATCH 11/12] clean up names, add resource requests, make work in pod operator --- .../publish_california_open_data.yml | 9 ++-- .../dbt_run_and_upload_artifacts.yml | 6 +++ airflow/dags/transform_warehouse/dbt_test.yml | 6 +++ warehouse/scripts/publish.py | 48 ++++++------------- 4 files changed, 33 insertions(+), 36 deletions(-) diff --git a/airflow/dags/publish_open_data/publish_california_open_data.yml b/airflow/dags/publish_open_data/publish_california_open_data.yml index b3b267f1cc..ed0c0ae389 100644 --- a/airflow/dags/publish_open_data/publish_california_open_data.yml +++ b/airflow/dags/publish_open_data/publish_california_open_data.yml @@ -8,9 +8,7 @@ arguments: - '/app/scripts/publish.py' - 'publish-exposure' - 'california_open_data' - - '{% if is_development() %}--no-deploy{% else %}--deploy{% endif %}' - - '--project' - - '{{ get_project_id() }}' + - '{% if is_development() %}--no-publish{% else %}--publish{% endif %}' - '--bucket' - "{{ env_var('CALITP_BUCKET__PUBLISH') }}" - '--manifest' @@ -32,11 +30,16 @@ secrets: secret: jobs-data key: service-account.json +resources: + request_memory: 2.0Gi + request_cpu: 1 + tolerations: - key: pod-role operator: Equal value: computetask effect: NoSchedule + affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: diff --git a/airflow/dags/transform_warehouse/dbt_run_and_upload_artifacts.yml b/airflow/dags/transform_warehouse/dbt_run_and_upload_artifacts.yml index 5b7008fe47..2e02b575a9 100644 --- a/airflow/dags/transform_warehouse/dbt_run_and_upload_artifacts.yml +++ b/airflow/dags/transform_warehouse/dbt_run_and_upload_artifacts.yml @@ -6,6 +6,7 @@ cmds: - python3 arguments: - '/app/scripts/run_and_upload.py' + - '--no-dbt-run' - '--dbt-docs' - '--save-artifacts' - '--deploy-docs' @@ -44,11 +45,16 @@ secrets: secret: jobs-data key: netlify-auth-token +resources: + request_memory: 2.0Gi + request_cpu: 1 + tolerations: - key: pod-role operator: Equal value: computetask effect: NoSchedule + affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: diff --git a/airflow/dags/transform_warehouse/dbt_test.yml b/airflow/dags/transform_warehouse/dbt_test.yml index 36b4a09fe6..c197b5a145 100644 --- a/airflow/dags/transform_warehouse/dbt_test.yml +++ b/airflow/dags/transform_warehouse/dbt_test.yml @@ -25,17 +25,23 @@ env_vars: DBT_PROJECT_DIR: /app DBT_PROFILE_DIR: /app DBT_TARGET: prod_service_account + secrets: - deploy_type: 
volume deploy_target: /secrets/jobs-data/ secret: jobs-data key: service-account.json +resources: + request_memory: 2.0Gi + request_cpu: 1 + tolerations: - key: pod-role operator: Equal value: computetask effect: NoSchedule + affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: diff --git a/warehouse/scripts/publish.py b/warehouse/scripts/publish.py index 224d2b5bcf..73e5584197 100644 --- a/warehouse/scripts/publish.py +++ b/warehouse/scripts/publish.py @@ -159,9 +159,7 @@ def ckan_request(action: str, data: Dict) -> Response: typer.secho(f"patched resource {resource_id}", fg=typer.colors.MAGENTA) -def _publish_exposure( - project: str, bucket: str, exposure: Exposure, dry_run: bool, deploy: bool -): +def _publish_exposure(bucket: str, exposure: Exposure, dry_run: bool, publish: bool): for destination in exposure.meta.destinations: with tempfile.TemporaryDirectory() as tmpdir: if isinstance(destination, CkanDestination): @@ -223,7 +221,7 @@ def _publish_exposure( fname = destination.filename(model_name) fsize = os.path.getsize(fpath) - if deploy: + if publish: typer.secho(upload_msg, fg=typer.colors.GREEN) with open(fpath, "rb") as fp: upload_to_ckan( @@ -236,7 +234,7 @@ def _publish_exposure( ) else: typer.secho( - f"would be {upload_msg} if --deploy", + f"would be {upload_msg} if --publish", fg=typer.colors.MAGENTA, ) @@ -247,7 +245,7 @@ def _publish_exposure( geojsonl_fpath = os.path.join(tmpdir, f"{node.name}.geojsonl") - client = bigquery.Client(project=project) + client = bigquery.Client() typer.secho(f"querying {node.schema_table}") # TODO: this is not great but we have to work around how BigQuery removes overlapping line segments df = client.query( @@ -318,16 +316,6 @@ def _publish_exposure( raise NotImplementedError -with open("./target/manifest.json") as f: - ExistingExposure = enum.Enum( - "ExistingExposure", - { - exposure.name: exposure.name - for exposure in Manifest(**json.load(f)).exposures.values() - }, - ) - - # once https://github.com/samuelcolvin/pydantic/pull/2745 is merged, we don't need this class ListOfStrings(BaseModel): __root__: List[str] @@ -403,14 +391,14 @@ class DictionaryRow(BaseModel): @app.command() def generate_exposure_documentation( - exposure: ExistingExposure, + exposure: str, metadata_output: Path = "./metadata.csv", dictionary_output: Path = "./dictionary.csv", ) -> None: with open("./target/manifest.json") as f: manifest = Manifest(**json.load(f)) - exposure = manifest.exposures[f"exposure.calitp_warehouse.{exposure.value}"] + exposure = manifest.exposures[f"exposure.calitp_warehouse.{exposure}"] typer.secho( f"writing out {metadata_output} and {dictionary_output}", @@ -506,45 +494,39 @@ def generate_exposure_documentation( @app.command() def publish_exposure( - exposure: ExistingExposure, - project: str = typer.Option("cal-itp-data-infra-staging"), + exposure: str, bucket: str = typer.Option( "gs://test-calitp-publish", help="The bucket in which artifacts are persisted." ), manifest: str = "./target/manifest.json", dry_run: bool = typer.Option(False, help="If True, skips writing out any data."), - deploy: bool = typer.Option( - False, help="If True, actually deploy to external systems." + publish: bool = typer.Option( + False, help="If True, actually publish to external systems." ), ) -> None: """ Only publish one exposure, by name. """ - if deploy: - assert not dry_run, "cannot deploy during a dry run!" - assert not project.endswith( - "-staging" - ), "cannot deploy from the staging project!" 
- assert not bucket.startswith("gs://test-"), "cannot deploy with a test bucket!" + if publish: + assert not dry_run, "cannot publish during a dry run!" + assert not bucket.startswith("gs://test-"), "cannot publish with a test bucket!" if manifest.startswith("gs://"): typer.secho(f"fetching manifest from {manifest}", fg=typer.colors.GREEN) - fs = gcsfs.GCSFileSystem(project=project) + fs = gcsfs.GCSFileSystem() with fs.open(manifest) as f: actual_manifest = Manifest(**json.load(f)) - else: with open(manifest) as f: actual_manifest = Manifest(**json.load(f)) - exposure = actual_manifest.exposures[f"exposure.calitp_warehouse.{exposure.value}"] + exposure = actual_manifest.exposures[f"exposure.calitp_warehouse.{exposure}"] _publish_exposure( - project=project, bucket=bucket, exposure=exposure, dry_run=dry_run, - deploy=deploy, + publish=publish, ) From 194ad1e1253d18fca71edbf3770ec8366cc91223 Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Mon, 29 Aug 2022 10:44:16 -0400 Subject: [PATCH 12/12] address PR comments --- .../dbt_run_and_upload_artifacts.yml | 1 - airflow/dags/transform_warehouse/dbt_test.yml | 2 +- .../dbt_run_and_upload_artifacts.yml | 2 +- .../dbt_test.yml | 2 +- docs/publishing/sections/8_ckan.md | 25 +++++++++++-------- 5 files changed, 18 insertions(+), 14 deletions(-) diff --git a/airflow/dags/transform_warehouse/dbt_run_and_upload_artifacts.yml b/airflow/dags/transform_warehouse/dbt_run_and_upload_artifacts.yml index 2e02b575a9..2c8c8c9709 100644 --- a/airflow/dags/transform_warehouse/dbt_run_and_upload_artifacts.yml +++ b/airflow/dags/transform_warehouse/dbt_run_and_upload_artifacts.yml @@ -6,7 +6,6 @@ cmds: - python3 arguments: - '/app/scripts/run_and_upload.py' - - '--no-dbt-run' - '--dbt-docs' - '--save-artifacts' - '--deploy-docs' diff --git a/airflow/dags/transform_warehouse/dbt_test.yml b/airflow/dags/transform_warehouse/dbt_test.yml index c197b5a145..9b8f7392af 100644 --- a/airflow/dags/transform_warehouse/dbt_test.yml +++ b/airflow/dags/transform_warehouse/dbt_test.yml @@ -24,7 +24,7 @@ env_vars: BIGQUERY_KEYFILE_LOCATION: /secrets/jobs-data/service_account.json DBT_PROJECT_DIR: /app DBT_PROFILE_DIR: /app - DBT_TARGET: prod_service_account + DBT_TARGET: "{{ env_var('DBT_TARGET') }}" secrets: - deploy_type: volume diff --git a/airflow/dags/transform_warehouse_full_refresh/dbt_run_and_upload_artifacts.yml b/airflow/dags/transform_warehouse_full_refresh/dbt_run_and_upload_artifacts.yml index 0e27046bd5..84fd530796 100644 --- a/airflow/dags/transform_warehouse_full_refresh/dbt_run_and_upload_artifacts.yml +++ b/airflow/dags/transform_warehouse_full_refresh/dbt_run_and_upload_artifacts.yml @@ -23,7 +23,7 @@ env_vars: BIGQUERY_KEYFILE_LOCATION: /secrets/jobs-data/service_account.json DBT_PROJECT_DIR: /app DBT_PROFILE_DIR: /app - DBT_TARGET: prod_service_account + DBT_TARGET: "{{ env_var('DBT_TARGET') }}" NETLIFY_SITE_ID: cal-itp-dbt-docs secrets: - deploy_type: volume diff --git a/airflow/dags/transform_warehouse_full_refresh/dbt_test.yml b/airflow/dags/transform_warehouse_full_refresh/dbt_test.yml index 7596819254..fd3a2c28a2 100644 --- a/airflow/dags/transform_warehouse_full_refresh/dbt_test.yml +++ b/airflow/dags/transform_warehouse_full_refresh/dbt_test.yml @@ -23,7 +23,7 @@ env_vars: BIGQUERY_KEYFILE_LOCATION: /secrets/jobs-data/service_account.json DBT_PROJECT_DIR: /app DBT_PROFILE_DIR: /app - DBT_TARGET: prod_service_account + DBT_TARGET: "{{ env_var('DBT_TARGET') }}" secrets: - deploy_type: volume deploy_target: /secrets/jobs-data/ diff --git 
a/docs/publishing/sections/8_ckan.md b/docs/publishing/sections/8_ckan.md index cb60a68219..1261caa0bd 100644 --- a/docs/publishing/sections/8_ckan.md +++ b/docs/publishing/sections/8_ckan.md @@ -56,15 +56,20 @@ update the `meta` field to map the dbt models to the appropriate UUIDs. An example from the latest-only GTFS data exposure. ```yaml -meta: - destinations: - - type: ckan - bucket: gs://calitp-publish - format: csv - url: https://data.ca.gov/api/3/action/resource_update - ids: - agency: e8f9d49e-2bb6-400b-b01f-28bc2e0e7df2 - routes: c6bbb637-988f-431c-8444-aef7277297f8 + meta: + methodology: | + Cal-ITP collects the GTFS feeds from a statewide list [link] every night and aggegrates it into a statewide table + for analysis purposes only. Do not use for trip planner ingestation, rather is meant to be used for statewide + analytics and other use cases. Note: These data may or may or may not have passed GTFS-Validation. + coordinate_system_espg: "EPSG:4326" + destinations: + - type: ckan + bucket: gs://calitp-publish + format: csv + url: https://data.ca.gov + ids: + agency: e8f9d49e-2bb6-400b-b01f-28bc2e0e7df2 + routes: c6bbb637-988f-431c-8444-aef7277297f8 ``` ### Publish the data! @@ -79,7 +84,7 @@ poetry run python scripts/publish.py publish-exposure california_open_data --dry Example production deployment: ```bash -poetry run python scripts/publish.py publish-exposure california_open_data --project=cal-itp-data-infra --bucket="gs://calitp-publish" --deploy +poetry run python scripts/publish.py publish-exposure california_open_data --project=cal-itp-data-infra --bucket="gs://calitp-publish" --publish ```
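
For orientation, below is a condensed sketch of the chunked multipart flow that the new `upload_to_ckan` helper in `warehouse/scripts/publish.py` implements. The `cloudstorage_*` action names, the field layout, and the string-typed `partNumber` quirk come directly from the patches above; the URL, resource id, API key, and file path in the sketch are placeholders rather than real values, and the sketch is illustrative, not a drop-in replacement for the script.

```python
# Condensed sketch of the chunked CKAN multipart upload flow used by upload_to_ckan.
# URL, RESOURCE_ID, API_KEY, and the uploaded file path are placeholders, not repo values.
import functools
import os

import requests
from requests_toolbelt import MultipartEncoder

CHUNK_SIZE = 64 * 1024 * 1024  # 64 MiB, as in the patch

URL = "https://data.ca.gov"  # CKAN instance (placeholder)
RESOURCE_ID = "00000000-0000-0000-0000-000000000000"  # placeholder
API_KEY = "changeme"  # placeholder


def ckan_request(action: str, fields: dict) -> requests.Response:
    """POST multipart/form-data to a CKAN action endpoint and fail loudly."""
    encoder = MultipartEncoder(fields=fields)
    resp = requests.post(
        f"{URL}/api/action/{action}",
        data=encoder,
        headers={"Content-Type": encoder.content_type, "X-CKAN-API-Key": API_KEY},
    )
    resp.raise_for_status()
    return resp


def multipart_upload(fpath: str) -> None:
    fname = os.path.basename(fpath)
    fsize = os.path.getsize(fpath)

    # 1. Open a multipart upload via the ckanext-cloudstorage actions.
    upload_id = ckan_request(
        "cloudstorage_initiate_multipart",
        {"id": RESOURCE_ID, "name": fname, "size": str(fsize)},
    ).json()["result"]["id"]

    try:
        with open(fpath, "rb") as f:
            # 2. Send fixed-size chunks; iter() stops when read() returns b"".
            chunker = functools.partial(f.read, CHUNK_SIZE)
            for part_number, chunk in enumerate(iter(chunker, b""), start=1):
                ckan_request(
                    "cloudstorage_upload_multipart",
                    {
                        "id": RESOURCE_ID,
                        "uploadId": upload_id,
                        "partNumber": str(part_number),  # server 500s on a non-string
                        "upload": (fname, chunk, "text/plain"),
                    },
                )
    except Exception:
        # 3. Abort the half-finished upload before re-raising.
        ckan_request(
            "cloudstorage_abort_multipart",
            {"id": RESOURCE_ID, "uploadId": upload_id},
        )
        raise

    # 4. Finish the upload, then patch the resource so it points at the stored file.
    ckan_request(
        "cloudstorage_finish_multipart",
        {"uploadId": upload_id, "id": RESOURCE_ID, "save_action": "go-metadata"},
    )
    ckan_request(
        "resource_patch",
        {
            "id": RESOURCE_ID,
            "multipart_name": fname,
            "url": fname,
            "size": str(fsize),
            "url_type": "upload",
        },
    )
```

Reading with `iter(functools.partial(f.read, CHUNK_SIZE), b"")` stops as soon as `read()` returns an empty bytes object, so the final short chunk is handled without any special casing.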