Gtfs schedule unzip v2 (#1696)
* gtfs loader v2 wip

* gtfs unzipper v2: semi-working WIP -- can unzip at least one zipfile

* address initial review comments

* bump calitp version

* gtfs unzipper v2: working version with required functionality

* update calitp and make the downloader run with it

* gtfs unzipper v2: get working in airflow; use logging

* rename to distinguish zipfile from extracted files within zipfile

* resolve reviewer comments

* gtfs unzipper v2: refactor to raise exceptions on unparseable zips

* gtfs unzipper: further simplify exception handling

* final tweaks -- refactor of checking for invalid zip structure, tighten up processing of valid files

* comment typos/clarifications

Co-authored-by: Andrew Vaccaro <[email protected]>
lauriemerrell and atvaccaro authored Aug 26, 2022
1 parent 60cf2bf commit 66dd13d
Showing 4 changed files with 204 additions and 3 deletions.
@@ -19,7 +19,7 @@
     get_fs,
     AirtableGTFSDataExtract,
     AirtableGTFSDataRecord,
-    GTFSFeedExtractInfo,
+    GTFSScheduleFeedExtract,
     GTFSFeedType,
     download_feed,
     ProcessingOutcome,
@@ -34,7 +34,7 @@
 
 class AirtableGTFSDataRecordProcessingOutcome(ProcessingOutcome):
     airtable_record: AirtableGTFSDataRecord
-    extract: Optional[GTFSFeedExtractInfo]
+    extract: Optional[GTFSScheduleFeedExtract]
 
 
 class DownloadFeedsResult(PartitionedGCSArtifact):
200 changes: 200 additions & 0 deletions airflow/dags/unzip_and_validate_gtfs_schedule/unzip_gtfs_schedule.py
@@ -0,0 +1,200 @@
# ---
# python_callable: airflow_unzip_extracts
# provide_context: true
# ---
import logging
import os
import pendulum
import zipfile

from io import BytesIO
from typing import ClassVar, List, Optional, Tuple

from calitp.storage import (
    fetch_all_in_partition,
    GTFSScheduleFeedExtract,
    get_fs,
    GTFSFeedType,
    PartitionedGCSArtifact,
    ProcessingOutcome,
)

SCHEDULE_UNZIPPED_BUCKET = os.environ["CALITP_BUCKET__GTFS_SCHEDULE_UNZIPPED"]
SCHEDULE_RAW_BUCKET = os.environ["CALITP_BUCKET__GTFS_SCHEDULE_RAW"]


class GTFSScheduleFeedFile(PartitionedGCSArtifact):
    bucket: ClassVar[str] = SCHEDULE_UNZIPPED_BUCKET
    partition_names: ClassVar[List[str]] = GTFSScheduleFeedExtract.partition_names
    ts: pendulum.DateTime
    base64_url: str
    zipfile_path: str
    original_filename: str

    # if you try to set table directly, you get an error because it "shadows a BaseModel attribute"
    # so set as a property instead
    @property
    def table(self) -> str:
        return self.filename

    @property
    def dt(self) -> pendulum.Date:
        return self.ts.date()


class GTFSScheduleFeedExtractUnzipOutcome(ProcessingOutcome):
    zipfile_extract_path: str
    zipfile_extract_md5hash: Optional[str]
    zipfile_files: Optional[List[str]]
    zipfile_dirs: Optional[List[str]]
    extracted_files: Optional[List[str]]


class ScheduleUnzipResult(PartitionedGCSArtifact):
    bucket: ClassVar[str] = SCHEDULE_UNZIPPED_BUCKET
    table: ClassVar[str] = "unzipping_results"
    partition_names: ClassVar[List[str]] = ["dt"]
    dt: pendulum.Date
    outcomes: List[GTFSScheduleFeedExtractUnzipOutcome]

    @property
    def successes(self) -> List[GTFSScheduleFeedExtractUnzipOutcome]:
        return [outcome for outcome in self.outcomes if outcome.success]

    @property
    def failures(self) -> List[GTFSScheduleFeedExtractUnzipOutcome]:
        return [outcome for outcome in self.outcomes if not outcome.success]

    def save(self, fs):
        self.save_content(
            fs=fs,
            content="\n".join(o.json() for o in self.outcomes).encode(),
            exclude={"outcomes"},
        )


def summarize_zip_contents(
    zip: zipfile.ZipFile, at: str = ""
) -> Tuple[List[str], List[str], bool]:
    files = []
    directories = []
    for entry in zip.namelist():
        if zipfile.Path(zip, at=entry).is_file():
            files.append(entry)
        if zipfile.Path(zip, at=entry).is_dir():
            directories.append(entry)
    logging.info(f"Found files: {files} and directories: {directories}")
    # the only valid case for any directory inside the zipfile is if there's exactly one and it's the only item at the root of the zipfile
    # (in which case we can treat that directory's contents as the feed)
    is_valid = (not directories) or (
        (len(directories) == 1)
        and ([item.is_dir() for item in zipfile.Path(zip, at="").iterdir()] == [True])
    )
    return files, directories, is_valid


def process_feed_files(
    fs,
    extract: GTFSScheduleFeedExtract,
    zip: zipfile.ZipFile,
    files: List[str],
    directories: List[str],
    is_valid: bool,
):
    zipfile_files = []
    if not is_valid:
        raise ValueError(
            "Unparseable zip: File/directory structure within zipfile cannot be unpacked"
        )

    for file in files:
        # make a proper path to access the .name attribute later
        file_path = zipfile.Path(zip, at=file)
        with zip.open(file) as f:
            file_content = f.read()
        file_extract = GTFSScheduleFeedFile(
            ts=extract.ts,
            base64_url=extract.base64_url,
            zipfile_path=extract.path,
            original_filename=file,
            # only replace slashes so that this is a mostly GCS-filepath-safe string
            # if we encounter something else, we will address: https://cloud.google.com/storage/docs/naming-objects
            filename=file_path.name.replace("/", "__"),
        )
        file_extract.save_content(content=file_content, fs=fs)
        zipfile_files.append(file_extract)
    return zipfile_files


def unzip_individual_feed(
    fs, extract: GTFSScheduleFeedExtract
) -> GTFSScheduleFeedExtractUnzipOutcome:
    logging.info(f"Processing {extract.name}")
    zipfile_md5_hash = ""
    files = []
    directories = []
    try:
        with fs.open(extract.path) as f:
            zipfile_md5_hash = f.info()["md5Hash"]
            zip = zipfile.ZipFile(BytesIO(f.read()))
        files, directories, is_valid = summarize_zip_contents(zip)
        zipfile_files = process_feed_files(
            fs, extract, zip, files, directories, is_valid
        )
    except Exception as e:
        logging.warn(f"Can't process {extract.path}: {e}")
        return GTFSScheduleFeedExtractUnzipOutcome(
            success=False,
            zipfile_extract_md5hash=zipfile_md5_hash,
            zipfile_extract_path=extract.path,
            exception=e,
            zipfile_files=files,
            zipfile_dirs=directories,
        )
    logging.info(f"Successfully unzipped {extract.path}")
    return GTFSScheduleFeedExtractUnzipOutcome(
        success=True,
        zipfile_extract_md5hash=zipfile_md5_hash,
        zipfile_extract_path=extract.path,
        zipfile_files=files,
        zipfile_dirs=directories,
        extracted_files=[file.path for file in zipfile_files],
    )


def unzip_extracts(day: pendulum.datetime):
    fs = get_fs()
    day = pendulum.instance(day).date()
    extracts = fetch_all_in_partition(
        cls=GTFSScheduleFeedExtract,
        bucket=SCHEDULE_RAW_BUCKET,
        table=GTFSFeedType.schedule,
        fs=get_fs(),
        partitions={
            "dt": day,
        },
        verbose=True,
    )

    logging.info(f"Identified {len(extracts)} records for {day}")
    outcomes = []
    for extract in extracts:
        outcome = unzip_individual_feed(fs, extract)
        outcomes.append(outcome)

    result = ScheduleUnzipResult(filename="results.jsonl", dt=day, outcomes=outcomes)
    result.save(fs)

    assert len(extracts) == len(
        result.outcomes
    ), f"ended up with {len(outcomes)} outcomes from {len(extracts)} extracts"


def airflow_unzip_extracts(task_instance, execution_date, **kwargs):
    unzip_extracts(execution_date)


if __name__ == "__main__":
    # for testing
    logging.basicConfig(level=logging.INFO)
    unzip_extracts(pendulum.datetime(1901, 1, 1))
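
For context on the layout check in summarize_zip_contents, here is a minimal, self-contained sketch of the same rule using only the standard library; the helper name and the sample feed are illustrative, not part of calitp. A zipfile is treated as parseable when it contains no directory entries, or when its only root-level item is a single directory wrapping the feed files.

import io
import zipfile


def zip_layout_is_valid(zf: zipfile.ZipFile) -> bool:
    # collect explicit directory entries and the items sitting at the zip root
    directories = [name for name in zf.namelist() if zipfile.Path(zf, at=name).is_dir()]
    root_items = list(zipfile.Path(zf, at="").iterdir())
    return not directories or (
        len(directories) == 1 and len(root_items) == 1 and root_items[0].is_dir()
    )


# build a small in-memory zip whose feed files live under one wrapping directory
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("feed/", "")  # explicit directory entry
    zf.writestr("feed/agency.txt", "agency_id,agency_name\n")
    zf.writestr("feed/stops.txt", "stop_id,stop_name\n")

print(zip_layout_is_valid(zipfile.ZipFile(buf)))  # True: one root directory holding the files

Under that rule a flat zip (agency.txt, stops.txt, etc. at the root) also passes, while nested or multiple directories make process_feed_files raise the "Unparseable zip" ValueError.

Separately, the comment on GTFSScheduleFeedFile.table ("shadows a BaseModel attribute") refers to pydantic v1 behavior: a field whose name collides with an attribute already present on a base model is rejected at class-definition time, hence the property workaround. A hypothetical illustration follows, with made-up Artifact/FeedFile classes rather than calitp's real hierarchy.

from pydantic import BaseModel


class Artifact(BaseModel):
    filename: str

    @property
    def table(self) -> str:
        return "artifacts"


# Declaring the same name as a pydantic field fails at class-definition time:
# class BadFeedFile(Artifact):
#     table: str  # NameError: Field name "table" shadows a BaseModel attribute ...


class FeedFile(Artifact):
    @property
    def table(self) -> str:  # derive the value instead of storing it as a field
        return self.filename


print(FeedFile(filename="stops.txt").table)  # stops.txt
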
1 change: 1 addition & 0 deletions airflow/docker-compose.yaml
@@ -85,6 +85,7 @@ x-airflow-common:
     CALITP_BUCKET__GTFS_RT_VALIDATION: "gs://test-calitp-gtfs-rt-validation"
     CALITP_BUCKET__GTFS_SCHEDULE_RAW: "gs://test-calitp-gtfs-schedule-raw"
     CALITP_BUCKET__GTFS_SCHEDULE_VALIDATION: "gs://test-calitp-gtfs-schedule-validation"
+    CALITP_BUCKET__GTFS_SCHEDULE_UNZIPPED: "gs://test-calitp-gtfs-schedule-unzipped"
 
     # TODO: this can be removed once we've confirmed it's no longer in Airtable
     GRAAS_SERVER_URL: $GRAAS_SERVER_URL
2 changes: 1 addition & 1 deletion airflow/requirements.txt
@@ -1,4 +1,4 @@
-calitp==2022.8.18.1
+calitp==2022.8.25
 fsspec==2022.5.0
 intake==0.6.1
 ipdb==0.13.4
