diff --git a/dcpy/connectors/edm/recipes.py b/dcpy/connectors/edm/recipes.py index 5f7420966..211234e83 100644 --- a/dcpy/connectors/edm/recipes.py +++ b/dcpy/connectors/edm/recipes.py @@ -4,6 +4,7 @@ import os import pandas as pd from pathlib import Path +from pyarrow import parquet import shutil from tempfile import TemporaryDirectory from typing import Callable @@ -137,6 +138,16 @@ def get_config(name: str, version="latest") -> library.Config | ingest.Config: return ingest.Config(**config) +def get_parquet_metadata(id: str, version="latest") -> parquet.FileMetaData: + s3_fs = s3.pyarrow_fs() + ds = parquet.ParquetDataset( + f"{BUCKET}/{DATASET_FOLDER}/{id}/{version}/{id}.parquet", filesystem=s3_fs + ) + + assert len(ds.fragments) == 1, "recipes does not support multi-fragment datasets" + return ds.fragments[0].metadata + + def get_latest_version(name: str) -> str: """Retrieve a recipe config from s3.""" return get_config(name).dataset.version diff --git a/dcpy/data/compare.py b/dcpy/data/compare.py index c854e8b8c..5b8274a1c 100644 --- a/dcpy/data/compare.py +++ b/dcpy/data/compare.py @@ -150,7 +150,9 @@ def spatial_query(column: str) -> str: SELECT {left_keys}, st_orderingequals({lc}, {rc}) AS "ordering_equal", - st_equals({lc}, {rc}) AS "spatially_equal" + st_equals({lc}, {rc}) AS "spatially_equal", + st_geometrytype({lc}) AS "left_geom_type", + st_geometrytype({rc}) AS "right_geom_type" FROM {left} AS "left" INNER JOIN {right} AS "right" ON {on} @@ -165,7 +167,14 @@ def spatial_query(column: str) -> str: if (column in left_geom_columns) and (column in right_geom_columns): comp_df = client.execute_select_query(spatial_query(column)) comp_df = comp_df.set_index(key_columns) - comp_df.columns = pd.Index(["ordering_equal", "spatially_equal"]) + comp_df.columns = pd.Index( + [ + "ordering_equal", + "spatially_equal", + "left_geom_type", + "right_geom_type", + ] + ) elif (column not in left_geom_columns) and (column not in right_geom_columns): comp_df = client.execute_select_query(query(column)) diff --git a/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_housing.yml b/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_housing.yml index 881a5bb94..246844885 100644 --- a/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_housing.yml +++ b/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_housing.yml @@ -27,33 +27,3 @@ ingestion: insert_behavior: error missing_key_behavior: error mode: update_column - -library_dataset: - name: dcp_pop_acs2010_housing - version: "" - acl: public-read - source: - script: - name: excel - path: https://nyc3.digitaloceanspaces.com/edm-recipes/inbox/dcp_pop_acs2010/{{ version }}/dcp_pop_acs.xlsx - sheet_name: Housing0610 - geometry: - SRS: null - type: NONE - - destination: - geometry: - SRS: null - type: NONE - fields: [] - sql: null - - info: - description: | - ## 2010 ACS file from Population - This file is produced internally by the Population division. 2010 version is used as a reference dataset - for the latest ACS data, and occasionally is modified so these different subsections are archived as their - own recipe datasets so that they can easily be updated individually - - url: null - dependents: [] diff --git a/dcpy/lifecycle/scripts/validate_ingest.py b/dcpy/lifecycle/scripts/validate_ingest.py index 90828ae98..3992a3e7b 100644 --- a/dcpy/lifecycle/scripts/validate_ingest.py +++ b/dcpy/lifecycle/scripts/validate_ingest.py @@ -3,10 +3,12 @@ import shutil import typer from typing import Literal +import yaml from dcpy.utils import postgres from dcpy.utils.collections import indented_report from dcpy.models.data import comparison +from dcpy.models.base import SortedSerializedBase, YamlWriter from dcpy.data import compare from dcpy.connectors.edm import recipes from dcpy.lifecycle.ingest.run import TMP_DIR, run as run_ingest @@ -17,6 +19,67 @@ SCHEMA = build_metadata.build_name(os.environ.get("BUILD_NAME")) +class Converter(SortedSerializedBase, YamlWriter): + _exclude_falsey_values: bool = False + _exclude_none: bool = False + _head_sort_order: list[str] = [ + "id", + "acl", + "ingestion", + "columns", + "library_dataset", + ] + + id: str + acl: str + ingestion: dict + columns: list = [] + library_dataset: dict + + +def convert_template(dataset: str): + library_path = ( + Path(__file__).parent.parent.parent / "library" / "templates" / f"{dataset}.yml" + ) + ingest_path = ( + Path(__file__).parent.parent.parent + / "lifecycle" + / "ingest" + / "templates" + / f"{dataset}.yml" + ) + with open(library_path) as library_file: + library_template = yaml.safe_load(library_file) + converter = Converter( + id=library_template["dataset"]["name"], + acl=library_template["dataset"]["acl"], + ingestion={ + "source": { + "one_of": [ + {"type": "local_file", "path": ""}, + {"type": "file_download", "url": ""}, + {"type": "api", "endpoint": "", "format": ""}, + {"type": "s3", "bucket": "", "key": ""}, + { + "type": "socrata", + "org": "", + "uid": "", + "format": "", + }, + {"type": "edm_publishing_gis_dataset", "name": ""}, + ] + }, + "file_format": { + "type": "csv, json, xlsx, shapefile, geojson, geodatabase", + "crs": "EPSG:2263", + }, + "processing_steps": [], + }, + library_dataset=library_template["dataset"], + ) + converter.write_to_yaml(ingest_path) + + def library_archive(dataset: str, version: str | None = None, file_type="pgdump"): # BEWARE: once you import library, parquet file writing fails # Something to do with gdal's interaction with parquet file driver @@ -104,6 +167,11 @@ def compare_recipes_in_postgres( app = typer.Typer() +@app.command("convert") +def _convert(dataset: str = typer.Argument()): + convert_template(dataset) + + @app.command("run_single") def run_single( tool: str = typer.Argument(), diff --git a/dcpy/models/base.py b/dcpy/models/base.py index e85256656..64485cb2d 100644 --- a/dcpy/models/base.py +++ b/dcpy/models/base.py @@ -80,6 +80,8 @@ def _model_dump_ordered(self, handler): class YamlWriter(BaseModel): + _exclude_none: bool = True + class _YamlTopLevelSpacesDumper(yaml.SafeDumper): """YAML serializer that will insert lines between top-level entries, which is nice in longer files.""" @@ -106,7 +108,7 @@ def str_presenter(dumper, data): with open(path, "w", encoding="utf8") as f: f.write( yaml.dump( - self.model_dump(exclude_none=True), + self.model_dump(exclude_none=self._exclude_none), sort_keys=False, default_flow_style=False, Dumper=YamlWriter._YamlTopLevelSpacesDumper, diff --git a/dcpy/models/lifecycle/ingest.py b/dcpy/models/lifecycle/ingest.py index 9eeea4da2..ecff52135 100644 --- a/dcpy/models/lifecycle/ingest.py +++ b/dcpy/models/lifecycle/ingest.py @@ -7,7 +7,7 @@ from dcpy.utils.metadata import RunDetails from dcpy.models.connectors.edm import recipes, publishing from dcpy.models.connectors import web, socrata -from dcpy.models import library, file +from dcpy.models import file from dcpy.models.base import SortedSerializedBase @@ -88,9 +88,6 @@ class Template(BaseModel, extra="forbid"): ingestion: Ingestion columns: list[Column] = [] - ## this is the original library template, included just for reference while we build out our new templates - library_dataset: library.DatasetDefinition | None = None - @property def has_geom(self): match self.ingestion.file_format: diff --git a/dcpy/utils/s3.py b/dcpy/utils/s3.py index f627606a1..69fec72a7 100644 --- a/dcpy/utils/s3.py +++ b/dcpy/utils/s3.py @@ -6,6 +6,7 @@ from io import BytesIO import os from pathlib import Path +from pyarrow import fs import pytz from rich.progress import ( BarColumn, @@ -502,6 +503,14 @@ def get_file_as_text(bucket: str, path: str) -> str: raise Exception(f"No body found for file '{path}' in bucket '{bucket}'.") +def pyarrow_fs() -> fs.S3FileSystem: + return fs.S3FileSystem( + access_key=os.environ["AWS_ACCESS_KEY_ID"], + secret_key=os.environ["AWS_SECRET_ACCESS_KEY"], + endpoint_override=os.environ.get("AWS_S3_ENDPOINT"), + ) + + app = typer.Typer(add_completion=False)