Set RT validator version as metadata and fix a bug (#1732)
* set rt validator version as metadata

* add validator version in metadata and put extract under a key

* fix schedule data exception string representation and assert after outcomes upload

* fix poetry in docker, lock

* use export and install

* update typer

* fix schedule downloading... also add url filter to cli

* get latest validator from github just in case, and keep name

* rename this here too

* address PR comments
atvaccaro authored Sep 2, 2022
1 parent 592921d commit 308b541
Showing 5 changed files with 204 additions and 150 deletions.
12 changes: 6 additions & 6 deletions jobs/gtfs-rt-parser-v2/Dockerfile
@@ -3,16 +3,16 @@ FROM openjdk:11
LABEL org.opencontainers.image.source https://github.com/cal-itp/data-infra

ENV GTFS_RT_VALIDATOR_JAR=/gtfs-realtime-validator.jar
- ENV GTFS_VALIDATOR_VERSION=v1.0.0
+ ENV GTFS_RT_VALIDATOR_VERSION=v1.0.0

RUN apt-get update -y \
- && apt-get install -y python3 python3-pip
+ && apt-get install -y python3 python3-pip python3-venv

- RUN curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | python3 -
- ENV PATH="${PATH}:/root/.poetry/bin"
+ RUN curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/install-poetry.py | python3 -
+ ENV PATH="/root/.local/bin:${PATH}"

- # formerly the "1.0.0-SNAPSHOT" from S3
- COPY ./rt-validator-v1.0.0_cli.jar ${GTFS_RT_VALIDATOR_JAR}
+ # from https://github.com/MobilityData/gtfs-realtime-validator/packages/1268973
+ COPY ./gtfs-realtime-validator-lib-1.0.0-20220223.003525-2.jar ${GTFS_RT_VALIDATOR_JAR}

WORKDIR /app

Binary file not shown.
51 changes: 36 additions & 15 deletions jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py
@@ -57,6 +57,7 @@

RT_PARSED_BUCKET = os.environ["CALITP_BUCKET__GTFS_RT_PARSED"]
RT_VALIDATION_BUCKET = os.environ["CALITP_BUCKET__GTFS_RT_VALIDATION"]
+ GTFS_RT_VALIDATOR_VERSION = os.environ["GTFS_RT_VALIDATOR_VERSION"]


def make_dict_bq_safe(d: Dict[str, Any]) -> Dict[str, Any]:
@@ -212,7 +213,7 @@ def table(self):
if self.step == RTProcessingStep.parse:
return self.feed_type
if self.step == RTProcessingStep.validate:
return f"{self.feed_type}_validations"
return f"{self.feed_type}_validation_notices"
raise RuntimeError("we should not be here")
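A quick illustration of what the rename above does to downstream table names; validation outcomes now land in tables suffixed _validation_notices instead of _validations (the feed_type value here is an assumed example):

    # Hedged sketch; "vehicle_positions" is an assumed feed_type value.
    feed_type = "vehicle_positions"
    old_table = f"{feed_type}_validations"         # vehicle_positions_validations
    new_table = f"{feed_type}_validation_notices"  # vehicle_positions_validation_notices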

@property
@@ -309,20 +310,19 @@ def download_gtfs_schedule_zip(
pbar=None,
) -> str:
# fetch and zip gtfs schedule
- actual_dst_path = "/".join([dst_path, schedule_extract.filename])
log(
f"Fetching gtfs schedule data from {schedule_extract.path} to {actual_dst_path}",
f"Fetching gtfs schedule data from {schedule_extract.path} to {dst_path}",
pbar=pbar,
)
- get_with_retry(fs, schedule_extract.path, actual_dst_path, recursive=True)
+ get_with_retry(fs, schedule_extract.path, dst_path, recursive=True)

# https://github.com/MobilityData/gtfs-realtime-validator/issues/92
# try:
# os.remove(os.path.join(dst_path, "areas.txt"))
# except FileNotFoundError:
# pass

- return actual_dst_path
+ return "/".join([dst_path, schedule_extract.filename])


@@ -346,7 +346,8 @@ def execute_rt_validator(
subprocess.run(
args,
capture_output=True,
- ).check_returncode()
+ check=True,
+ )
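Both the old and new forms raise subprocess.CalledProcessError on a non-zero exit; check=True simply raises it at the run() call instead of via a separate .check_returncode(). Because capture_output=True is set, the raised exception carries the validator's stderr, which the new exception handling further down in this diff logs. A small standalone sketch of that behavior (the failing command is made up):

    import subprocess

    try:
        subprocess.run(
            ["python3", "-c", "import sys; sys.stderr.write('boom'); sys.exit(1)"],
            capture_output=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        print(e.returncode)  # 1
        print(e.stderr)      # b'boom' - captured stderr, available for logging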


@@ -367,7 +368,7 @@ def validate_and_upload(
pbar=pbar,
)
except FileNotFoundError:
raise ScheduleDataNotFound(f"no schedule data found for {first_extract}")
raise ScheduleDataNotFound(f"no schedule data found for {first_extract.path}")

execute_rt_validator(
gtfs_zip,
@@ -407,8 +408,10 @@ def validate_and_upload(
records_to_upload.extend(
[
{
- # back and forth so we can use pydantic serialization
- "metadata": make_pydantic_model_bq_safe(extract),
+ "metadata": {
+ "gtfs_validator_version": GTFS_RT_VALIDATOR_VERSION,
+ "extract_path": extract.path,
+ },
**record,
}
for record in records
@@ -501,7 +504,9 @@ def parse_and_upload(
{
"header": parsed["header"],
# back and forth so we use pydantic serialization
"metadata": make_pydantic_model_bq_safe(extract),
"metadata": {
"extract_path": extract.path,
},
**copy.deepcopy(record),
}
)
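Taken together, the two metadata changes in this diff replace the serialized extract model with a small explicit dict: validation records now carry the validator version plus the extract path, and parse records carry the extract path alone. A rough sketch of the resulting row shapes (all values invented):

    validation_row = {
        "metadata": {
            "gtfs_validator_version": "v1.0.0",
            "extract_path": "gs://rt-bucket/vehicle_positions/hour=.../extract.pb",
        },
        # ...plus the validator's notice fields merged in via **record
    }

    parse_row = {
        "header": {"timestamp": 1662076800},
        "metadata": {
            "extract_path": "gs://rt-bucket/vehicle_positions/hour=.../extract.pb",
        },
        # ...plus the parsed feed entity fields merged in via **record
    }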
@@ -583,6 +588,12 @@ def parse_and_validate(
fg=typer.colors.RED,
pbar=pbar,
)
+ if isinstance(e, subprocess.CalledProcessError):
+ log(
+ e.stderr,
+ fg=typer.colors.YELLOW,
+ pbar=pbar,
+ )

return [
RTFileProcessingOutcome(
@@ -619,6 +630,7 @@ def main(
threads: int = 4,
jar_path: Path = JAR_DEFAULT,
verbose: bool = False,
+ base64url: str = None,
):
pendulum_hour = pendulum.instance(hour, tz="Etc/UTC")
files: List[GTFSRTFeedExtract] = fetch_all_in_partition(
@@ -646,17 +658,25 @@
filename=f"{feed_type}{JSONL_GZIP_EXTENSION}",
feed_type=feed_type,
hour=hour,
- base64_url=url,
+ base64_url=base64url,
extracts=files,
)
- for (hour, url), files in rt_aggs.items()
+ for (hour, base64url), files in rt_aggs.items()
]

typer.secho(
f"found {len(files)} {feed_type} files in {len(aggregations_to_process)} aggregations to process",
fg=typer.colors.MAGENTA,
)

+ if base64url:
+ typer.secho(
+ f"url filter applied, only processing {base64url}", fg=typer.colors.YELLOW
+ )
+ aggregations_to_process = [
+ agg for agg in aggregations_to_process if agg.base64_url == base64url
+ ]
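The new base64url option narrows processing to a single feed's aggregation by comparing against each aggregation's base64_url. Elsewhere in this codebase that field appears to be a URL-safe base64 encoding of the feed URL; assuming that scheme, a filter value could be derived like this (the feed URL is invented):

    import base64

    feed_url = "https://example.com/gtfs-rt/vehicle-positions"  # invented feed URL
    value = base64.urlsafe_b64encode(feed_url.encode()).decode()
    # pass this as the base64url option (e.g. --base64url <value>) to process only that feed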

if limit:
typer.secho(f"limit of {limit} feeds was set", fg=typer.colors.YELLOW)
aggregations_to_process = list(
@@ -708,9 +728,6 @@ def main(
if pbar:
del pbar

- assert len(outcomes) == len(
- files
- ), f"we ended up with {len(outcomes)} outcomes from {len(files)}"
result = GTFSRTJobResult(
# TODO: these seem weird...
hour=aggregations_to_process[0].hour,
@@ -721,6 +738,10 @@
)
save_job_result(get_fs(), result)

+ assert len(outcomes) == len(
+ files
+ ), f"we ended up with {len(outcomes)} outcomes from {len(files)}"

if exceptions:
exc_str = "\n".join(str(tup) for tup in exceptions)
msg = f"got {len(exceptions)} exceptions from processing {len(aggregations_to_process)} feeds:\n{exc_str}"
