
Commit

clean up names, add resource requests, make work in pod operator
atvaccaro committed Aug 26, 2022
1 parent cc1c584 commit 68e3362
Showing 4 changed files with 33 additions and 36 deletions.
@@ -8,9 +8,7 @@ arguments:
- '/app/scripts/publish.py'
- 'publish-exposure'
- 'california_open_data'
- '{% if is_development() %}--no-deploy{% else %}--deploy{% endif %}'
- '--project'
- '{{ get_project_id() }}'
- '{% if is_development() %}--no-publish{% else %}--publish{% endif %}'
- '--bucket'
- "{{ env_var('CALITP_BUCKET__PUBLISH') }}"
- '--manifest'
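
For reference, a minimal sketch of how the renamed flag renders at runtime; is_development() and env_var() are the Jinja helpers these configs rely on, and the stand-in implementation below (including the AIRFLOW_ENV check) is purely hypothetical:

import os

from jinja2 import Environment

def is_development() -> bool:
    # hypothetical stand-in: assume some environment flag marks dev vs prod
    return os.environ.get("AIRFLOW_ENV", "development") == "development"

env = Environment()
env.globals["is_development"] = is_development

flag = env.from_string(
    "{% if is_development() %}--no-publish{% else %}--publish{% endif %}"
).render()
print(flag)  # "--no-publish" in development, "--publish" otherwise
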
@@ -32,11 +30,16 @@ secrets:
secret: jobs-data
key: service-account.json

resources:
request_memory: 2.0Gi
request_cpu: 1

tolerations:
- key: pod-role
operator: Equal
value: computetask
effect: NoSchedule

affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
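
The resources, tolerations, and affinity keys added across these DAG configs presumably end up on the KubernetesPodOperator that runs each task. A minimal sketch of the equivalent operator call, assuming a recent cncf-kubernetes provider where requests are passed as container_resources; the repo's actual YAML-to-operator translation isn't shown in this diff, and the task id, name, and image below are placeholders:

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s

# inside a `with DAG(...):` block in a real DAG file
publish_open_data = KubernetesPodOperator(
    task_id="publish_california_open_data",  # placeholder
    name="publish-california-open-data",  # placeholder
    image="calitp-warehouse:latest",  # placeholder
    cmds=["python3"],
    arguments=["/app/scripts/publish.py", "publish-exposure", "california_open_data"],
    # the resource requests added in this commit
    container_resources=k8s.V1ResourceRequirements(
        requests={"memory": "2.0Gi", "cpu": "1"},
    ),
    # tolerate the NoSchedule taint on the compute node pool...
    tolerations=[
        k8s.V1Toleration(
            key="pod-role", operator="Equal", value="computetask", effect="NoSchedule"
        )
    ],
    # ...and require scheduling onto it (the nodeAffinity block is truncated above)
    affinity=k8s.V1Affinity(node_affinity=k8s.V1NodeAffinity()),
)
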
@@ -6,6 +6,7 @@ cmds:
- python3
arguments:
- '/app/scripts/run_and_upload.py'
- '--no-dbt-run'
- '--dbt-docs'
- '--save-artifacts'
- '--deploy-docs'
@@ -44,11 +45,16 @@ secrets:
secret: jobs-data
key: netlify-auth-token

resources:
request_memory: 2.0Gi
request_cpu: 1

tolerations:
- key: pod-role
operator: Equal
value: computetask
effect: NoSchedule

affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
6 changes: 6 additions & 0 deletions airflow/dags/transform_warehouse/dbt_test.yml
@@ -25,17 +25,23 @@ env_vars:
DBT_PROJECT_DIR: /app
DBT_PROFILE_DIR: /app
DBT_TARGET: prod_service_account

secrets:
- deploy_type: volume
deploy_target: /secrets/jobs-data/
secret: jobs-data
key: service-account.json

resources:
request_memory: 2.0Gi
request_cpu: 1

tolerations:
- key: pod-role
operator: Equal
value: computetask
effect: NoSchedule

affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
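
Worth noting about the toleration/affinity pair that now appears in each of these configs: a toleration only permits the pod to land on nodes tainted pod-role=computetask:NoSchedule, it does not steer it there; that is what the (truncated) requiredDuringSchedulingIgnoredDuringExecution node affinity is for. Sketched with the Kubernetes client models, purely to show the matching fields; the node-side taint is illustrative:

from kubernetes.client import models as k8s

# the taint assumed to be on the compute node pool (illustrative)
taint = k8s.V1Taint(key="pod-role", value="computetask", effect="NoSchedule")

# the toleration added to the DAG configs matches it field-for-field
toleration = k8s.V1Toleration(
    key="pod-role", operator="Equal", value="computetask", effect="NoSchedule"
)
assert (taint.key, taint.value, taint.effect) == (
    toleration.key,
    toleration.value,
    toleration.effect,
)
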
48 changes: 15 additions & 33 deletions warehouse/scripts/publish.py
@@ -159,9 +159,7 @@ def ckan_request(action: str, data: Dict) -> Response:
typer.secho(f"patched resource {resource_id}", fg=typer.colors.MAGENTA)


def _publish_exposure(
project: str, bucket: str, exposure: Exposure, dry_run: bool, deploy: bool
):
def _publish_exposure(bucket: str, exposure: Exposure, dry_run: bool, publish: bool):
for destination in exposure.meta.destinations:
with tempfile.TemporaryDirectory() as tmpdir:
if isinstance(destination, CkanDestination):
@@ -223,7 +221,7 @@ def _publish_exposure(
fname = destination.filename(model_name)
fsize = os.path.getsize(fpath)

if deploy:
if publish:
typer.secho(upload_msg, fg=typer.colors.GREEN)
with open(fpath, "rb") as fp:
upload_to_ckan(
@@ -236,7 +234,7 @@
)
else:
typer.secho(
f"would be {upload_msg} if --deploy",
f"would be {upload_msg} if --publish",
fg=typer.colors.MAGENTA,
)

@@ -247,7 +245,7 @@

geojsonl_fpath = os.path.join(tmpdir, f"{node.name}.geojsonl")

client = bigquery.Client(project=project)
client = bigquery.Client()
typer.secho(f"querying {node.schema_table}")
# TODO: this is not great but we have to work around how BigQuery removes overlapping line segments
df = client.query(
@@ -318,16 +316,6 @@ def _publish_exposure(
raise NotImplementedError


with open("./target/manifest.json") as f:
ExistingExposure = enum.Enum(
"ExistingExposure",
{
exposure.name: exposure.name
for exposure in Manifest(**json.load(f)).exposures.values()
},
)


# once https://github.com/samuelcolvin/pydantic/pull/2745 is merged, we don't need this
class ListOfStrings(BaseModel):
__root__: List[str]
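
The deleted block above is more than a naming cleanup: it built the ExistingExposure enum by reading ./target/manifest.json at import time, so the CLI could not even be imported in an environment without local dbt artifacts (such as the pod operator, where the manifest may instead come from GCS, as handled below). The replacement pattern, sketched as a hypothetical helper:

import json

def resolve_exposure(manifest_path: str, exposure_name: str) -> dict:
    # hypothetical helper mirroring what the commands now do inline:
    # accept a plain string and defer reading the manifest to call time,
    # instead of failing at import when ./target/manifest.json is missing
    with open(manifest_path) as f:
        manifest = json.load(f)
    return manifest["exposures"][f"exposure.calitp_warehouse.{exposure_name}"]
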
@@ -403,14 +391,14 @@ class DictionaryRow(BaseModel):

@app.command()
def generate_exposure_documentation(
exposure: ExistingExposure,
exposure: str,
metadata_output: Path = "./metadata.csv",
dictionary_output: Path = "./dictionary.csv",
) -> None:
with open("./target/manifest.json") as f:
manifest = Manifest(**json.load(f))

exposure = manifest.exposures[f"exposure.calitp_warehouse.{exposure.value}"]
exposure = manifest.exposures[f"exposure.calitp_warehouse.{exposure}"]

typer.secho(
f"writing out {metadata_output} and {dictionary_output}",
@@ -506,45 +494,39 @@ def generate_exposure_documentation(

@app.command()
def publish_exposure(
exposure: ExistingExposure,
project: str = typer.Option("cal-itp-data-infra-staging"),
exposure: str,
bucket: str = typer.Option(
"gs://test-calitp-publish", help="The bucket in which artifacts are persisted."
),
manifest: str = "./target/manifest.json",
dry_run: bool = typer.Option(False, help="If True, skips writing out any data."),
deploy: bool = typer.Option(
False, help="If True, actually deploy to external systems."
publish: bool = typer.Option(
False, help="If True, actually publish to external systems."
),
) -> None:
"""
Only publish one exposure, by name.
"""
if deploy:
assert not dry_run, "cannot deploy during a dry run!"
assert not project.endswith(
"-staging"
), "cannot deploy from the staging project!"
assert not bucket.startswith("gs://test-"), "cannot deploy with a test bucket!"
if publish:
assert not dry_run, "cannot publish during a dry run!"
assert not bucket.startswith("gs://test-"), "cannot publish with a test bucket!"

if manifest.startswith("gs://"):
typer.secho(f"fetching manifest from {manifest}", fg=typer.colors.GREEN)
fs = gcsfs.GCSFileSystem(project=project)
fs = gcsfs.GCSFileSystem()
with fs.open(manifest) as f:
actual_manifest = Manifest(**json.load(f))

else:
with open(manifest) as f:
actual_manifest = Manifest(**json.load(f))

exposure = actual_manifest.exposures[f"exposure.calitp_warehouse.{exposure.value}"]
exposure = actual_manifest.exposures[f"exposure.calitp_warehouse.{exposure}"]

_publish_exposure(
project=project,
bucket=bucket,
exposure=exposure,
dry_run=dry_run,
deploy=deploy,
publish=publish,
)


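
Two things tie the publish.py changes together: dropping the explicit project means both bigquery.Client() and gcsfs.GCSFileSystem() now infer the project from the ambient credentials, and the deploy to publish rename keeps typer's boolean-option pattern, which exposes paired --publish / --no-publish flags that line up with the Jinja conditional in the DAG arguments above. A stripped-down sketch of just the flag behaviour and guards, separate from the real command body:

import typer

app = typer.Typer()

@app.command()
def publish_exposure(
    exposure: str,
    bucket: str = typer.Option("gs://test-calitp-publish"),
    dry_run: bool = typer.Option(False, help="If True, skips writing out any data."),
    publish: bool = typer.Option(
        False, help="If True, actually publish to external systems."
    ),
) -> None:
    # a default-False bool option renders as paired flags: --publish / --no-publish
    if publish:
        assert not dry_run, "cannot publish during a dry run!"
        assert not bucket.startswith("gs://test-"), "cannot publish with a test bucket!"
    typer.echo(f"exposure={exposure} publish={publish} dry_run={dry_run}")

if __name__ == "__main__":
    app()
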
