Gtfs schedule unzip v2 #1696

Merged (13 commits) on Aug 26, 2022
@@ -19,7 +19,7 @@
get_fs,
AirtableGTFSDataExtract,
AirtableGTFSDataRecord,
-    GTFSFeedExtractInfo,
+    GTFSScheduleFeedExtract,
GTFSFeedType,
download_feed,
ProcessingOutcome,
@@ -34,7 +34,7 @@

class AirtableGTFSDataRecordProcessingOutcome(ProcessingOutcome):
airtable_record: AirtableGTFSDataRecord
-    extract: Optional[GTFSFeedExtractInfo]
+    extract: Optional[GTFSScheduleFeedExtract]


class DownloadFeedsResult(PartitionedGCSArtifact):
200 changes: 200 additions & 0 deletions airflow/dags/unzip_and_validate_gtfs_schedule/unzip_gtfs_schedule.py
@@ -0,0 +1,200 @@
# ---
# python_callable: airflow_unzip_extracts
# provide_context: true
# ---
import logging
import os
import pendulum
import zipfile

from io import BytesIO
from typing import ClassVar, List, Optional, Tuple

from calitp.storage import (
fetch_all_in_partition,
GTFSScheduleFeedExtract,
get_fs,
GTFSFeedType,
PartitionedGCSArtifact,
ProcessingOutcome,
)

SCHEDULE_UNZIPPED_BUCKET = os.environ["CALITP_BUCKET__GTFS_SCHEDULE_UNZIPPED"]
SCHEDULE_RAW_BUCKET = os.environ["CALITP_BUCKET__GTFS_SCHEDULE_RAW"]


class GTFSScheduleFeedFile(PartitionedGCSArtifact):
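    # A single file extracted from a raw schedule zipfile, saved to the unzipped
    # bucket and partitioned the same way as its parent GTFSScheduleFeedExtract.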
bucket: ClassVar[str] = SCHEDULE_UNZIPPED_BUCKET
partition_names: ClassVar[List[str]] = GTFSScheduleFeedExtract.partition_names
ts: pendulum.DateTime
base64_url: str
zipfile_path: str
original_filename: str

# if you try to set table directly, you get an error because it "shadows a BaseModel attribute"
# so set as a property instead
@property
def table(self) -> str:
return self.filename

@property
def dt(self) -> pendulum.Date:
return self.ts.date()


class GTFSScheduleFeedExtractUnzipOutcome(ProcessingOutcome):
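    # Per-zipfile unzip result; records the zip's md5 hash and entry listing
    # even when unzipping fails.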
zipfile_extract_path: str
zipfile_extract_md5hash: Optional[str]
zipfile_files: Optional[List[str]]
zipfile_dirs: Optional[List[str]]
extracted_files: Optional[List[str]]


class ScheduleUnzipResult(PartitionedGCSArtifact):
bucket: ClassVar[str] = SCHEDULE_UNZIPPED_BUCKET
table: ClassVar[str] = "unzipping_results"
partition_names: ClassVar[List[str]] = ["dt"]
dt: pendulum.Date
outcomes: List[GTFSScheduleFeedExtractUnzipOutcome]

@property
def successes(self) -> List[GTFSScheduleFeedExtractUnzipOutcome]:
return [outcome for outcome in self.outcomes if outcome.success]

@property
def failures(self) -> List[GTFSScheduleFeedExtractUnzipOutcome]:
return [outcome for outcome in self.outcomes if not outcome.success]

def save(self, fs):
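        # Write one outcome per line (JSONL); the outcomes list itself is
        # excluded from the artifact's serialized fields.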
self.save_content(
fs=fs,
content="\n".join(o.json() for o in self.outcomes).encode(),
exclude={"outcomes"},
)


def summarize_zip_contents(
zip: zipfile.ZipFile, at: str = ""
) -> Tuple[List[str], List[str], bool]:
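    # Classify each archive entry as a file or directory, then decide whether
    # the layout is one we can unpack.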
files = []
directories = []
for entry in zip.namelist():
if zipfile.Path(zip, at=entry).is_file():
files.append(entry)
if zipfile.Path(zip, at=entry).is_dir():
directories.append(entry)
logging.info(f"Found files: {files} and directories: {directories}")
# the only valid case for any directory inside the zipfile is if there's exactly one and it's the only item at the root of the zipfile
# (in which case we can treat that directory's contents as the feed)
is_valid = (not directories) or (
(len(directories) == 1)
and ([item.is_dir() for item in zipfile.Path(zip, at="").iterdir()] == [True])
)
return files, directories, is_valid


def process_feed_files(
fs,
extract: GTFSScheduleFeedExtract,
zip: zipfile.ZipFile,
files: List[str],
directories: List[str],
is_valid: bool,
):
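    # Write each file in the archive out as an individual GTFSScheduleFeedFile
    # in the unzipped bucket.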
zipfile_files = []
if not is_valid:
raise ValueError(
"Unparseable zip: File/directory structure within zipfile cannot be unpacked"
)

for file in files:
# make a proper path to access the .name attribute later
file_path = zipfile.Path(zip, at=file)
with zip.open(file) as f:
file_content = f.read()
file_extract = GTFSScheduleFeedFile(
ts=extract.ts,
base64_url=extract.base64_url,
zipfile_path=extract.path,
original_filename=file,
# only replace slashes so that this is a mostly GCS-filepath-safe string
# if we encounter something else, we will address: https://cloud.google.com/storage/docs/naming-objects
filename=file_path.name.replace("/", "__"),
)
file_extract.save_content(content=file_content, fs=fs)
zipfile_files.append(file_extract)
return zipfile_files


def unzip_individual_feed(
fs, extract: GTFSScheduleFeedExtract
) -> GTFSScheduleFeedExtractUnzipOutcome:
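    # Read one raw zipfile from GCS and attempt to unpack it; any exception is
    # captured as a failure outcome rather than raised.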
logging.info(f"Processing {extract.name}")
zipfile_md5_hash = ""
files = []
directories = []
try:
with fs.open(extract.path) as f:
zipfile_md5_hash = f.info()["md5Hash"]
zip = zipfile.ZipFile(BytesIO(f.read()))
files, directories, is_valid = summarize_zip_contents(zip)
zipfile_files = process_feed_files(
fs, extract, zip, files, directories, is_valid
)
except Exception as e:
        logging.warning(f"Can't process {extract.path}: {e}")
return GTFSScheduleFeedExtractUnzipOutcome(
success=False,
zipfile_extract_md5hash=zipfile_md5_hash,
zipfile_extract_path=extract.path,
exception=e,
zipfile_files=files,
zipfile_dirs=directories,
)
logging.info(f"Successfully unzipped {extract.path}")
return GTFSScheduleFeedExtractUnzipOutcome(
success=True,
zipfile_extract_md5hash=zipfile_md5_hash,
zipfile_extract_path=extract.path,
zipfile_files=files,
zipfile_dirs=directories,
extracted_files=[file.path for file in zipfile_files],
)


def unzip_extracts(day: pendulum.DateTime):
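    # List all raw schedule extracts for the given day, unzip each, and save a
    # per-day JSONL results artifact.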
fs = get_fs()
day = pendulum.instance(day).date()
extracts = fetch_all_in_partition(
cls=GTFSScheduleFeedExtract,
bucket=SCHEDULE_RAW_BUCKET,
table=GTFSFeedType.schedule,
        fs=fs,
partitions={
"dt": day,
},
verbose=True,
)

logging.info(f"Identified {len(extracts)} records for {day}")
outcomes = []
for extract in extracts:
outcome = unzip_individual_feed(fs, extract)
outcomes.append(outcome)

result = ScheduleUnzipResult(filename="results.jsonl", dt=day, outcomes=outcomes)
result.save(fs)

assert len(extracts) == len(
result.outcomes
), f"ended up with {len(outcomes)} outcomes from {len(extracts)} extracts"


def airflow_unzip_extracts(task_instance, execution_date, **kwargs):
unzip_extracts(execution_date)


if __name__ == "__main__":
# for testing
logging.basicConfig(level=logging.INFO)
unzip_extracts(pendulum.datetime(1901, 1, 1))
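
Note: the layout rule enforced by summarize_zip_contents above can be exercised with a short stdlib-only sketch; the archive contents below are hypothetical and not part of this PR.

import io
import zipfile

def is_valid_layout(zf: zipfile.ZipFile) -> bool:
    # Mirrors the check above: no directory entries at all, or exactly one
    # directory that is also the only item at the archive root.
    dirs = [e for e in zf.namelist() if zipfile.Path(zf, at=e).is_dir()]
    root = [item.is_dir() for item in zipfile.Path(zf, at="").iterdir()]
    return not dirs or (len(dirs) == 1 and root == [True])

buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("feed/", "")  # explicit directory entry at the root
    zf.writestr("feed/stops.txt", "stop_id\n1\n")  # hypothetical feed file
with zipfile.ZipFile(buf) as zf:
    print(is_valid_layout(zf))  # True: a single directory wraps the feed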
1 change: 1 addition & 0 deletions airflow/docker-compose.yaml
@@ -85,6 +85,7 @@ x-airflow-common:
# CALITP_BUCKET__GTFS_RT_VALIDATION: "gs://test-calitp-gtfs-rt-validation"
CALITP_BUCKET__GTFS_SCHEDULE_RAW: "gs://test-calitp-gtfs-schedule-raw"
CALITP_BUCKET__GTFS_SCHEDULE_VALIDATION: "gs://test-calitp-gtfs-schedule-validation"
+      CALITP_BUCKET__GTFS_SCHEDULE_UNZIPPED: "gs://test-calitp-gtfs-schedule-unzipped"

# TODO: this can be removed once we've confirmed it's no longer in Airtable
GRAAS_SERVER_URL: $GRAAS_SERVER_URL
2 changes: 1 addition & 1 deletion airflow/requirements.txt
@@ -1,4 +1,4 @@
-calitp==2022.8.18.1
+calitp==2022.8.25
fsspec==2022.5.0
intake==0.6.1
ipdb==0.13.4