Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RT archiver v3 #1614

Merged
merged 55 commits into from
Jul 27, 2022
Merged
Show file tree
Hide file tree
Changes from 53 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
be35482
get new calitp version
atvaccaro Jul 13, 2022
c290e10
import gcs models from calitp-py!
atvaccaro Jul 13, 2022
0d5865d
missed a couple
atvaccaro Jul 13, 2022
c966e03
get us going!
atvaccaro Jul 13, 2022
9dea837
fix: make airtable gcs operator use timestamps rather than time string
lauriemerrell Jul 14, 2022
cb48278
fix(timestamp partitions): update calitp version to get schedule part…
lauriemerrell Jul 14, 2022
0792b07
fix(timestamp partitions): explicitly use isoformat string
lauriemerrell Jul 14, 2022
f2303b9
use new calitp version
atvaccaro Jul 14, 2022
3b85740
start experimenting with task queue options and metrics
atvaccaro Jul 5, 2022
3596151
get this working and test performance with greenlets
atvaccaro Jul 5, 2022
4f60f24
couple more metrics
atvaccaro Jul 5, 2022
97e904c
wip testing with multiple consumers at high volume
atvaccaro Jul 5, 2022
d514dae
start optimizing for lots of small tasks; have to make redis interact…
atvaccaro Jul 6, 2022
d0a055a
fix key str format
atvaccaro Jul 6, 2022
6591c69
couple more libs
atvaccaro Jul 6, 2022
4e8f294
wip
atvaccaro Jul 14, 2022
f38bff9
wip on discussed changes
atvaccaro Jul 15, 2022
a4c1c04
get the keys from environ for now
atvaccaro Jul 18, 2022
33a2617
use new calitp py
atvaccaro Jul 18, 2022
4558b5b
print a bit more
atvaccaro Jul 19, 2022
ee59332
we are just gonna get stuff in the env
atvaccaro Jul 19, 2022
6a7cb85
commit this before I break anything
atvaccaro Jul 21, 2022
9a7c2fc
fmt
atvaccaro Jul 21, 2022
0eaf976
bump calitp-py
atvaccaro Jul 22, 2022
65bf7b1
lint
atvaccaro Jul 22, 2022
d678519
rename v2 to v3 since 2.X tags already exist
atvaccaro Jul 23, 2022
e726967
kinda make this runnable
atvaccaro Jul 23, 2022
5fe6979
new node pool just dropped
atvaccaro Jul 23, 2022
1667434
get running in docker compose to kick the tires
atvaccaro Jul 23, 2022
3637ea6
start on RT v3 k8s
atvaccaro Jul 23, 2022
692ba13
get the consumer working mostly?
atvaccaro Jul 23, 2022
012850c
label redis pod appropriately
atvaccaro Jul 23, 2022
be7e4a1
tell consumer about temp rt secrets
atvaccaro Jul 23, 2022
c584cd0
that was dumb
atvaccaro Jul 23, 2022
2e9a213
ticker k8s!
atvaccaro Jul 23, 2022
bb7c8e1
set expire time on the huey instance
atvaccaro Jul 23, 2022
5824e80
point consumer at svc account json
atvaccaro Jul 23, 2022
168e2b9
avoid pulling the stacktrace in
atvaccaro Jul 23, 2022
3571b58
scrape on 9102
atvaccaro Jul 23, 2022
73d64b7
bump to 16 workers per consumer
atvaccaro Jul 23, 2022
6b8d79a
bring in new calitp and fix tick rounding
atvaccaro Jul 25, 2022
45e863b
improve metrics and labels
atvaccaro Jul 25, 2022
5a6ad80
clean up labels
atvaccaro Jul 25, 2022
8a18743
get secrets from secret manager sdk before the consumer starts...
atvaccaro Jul 25, 2022
2a4bc47
missed this
atvaccaro Jul 25, 2022
cd41e69
fix secrets volume and adjust affinities
atvaccaro Jul 25, 2022
aa8d4c4
add content type header to bytes
atvaccaro Jul 25, 2022
67ba932
ugh whitespace
atvaccaro Jul 25, 2022
5527518
remove celery and gevent from pyproject deps
atvaccaro Jul 27, 2022
4d529e1
we might as well specify huey app name by env as well just in case we…
atvaccaro Jul 27, 2022
161f326
write to the prod bucket!
atvaccaro Jul 27, 2022
cfece5e
create a preprod version and deploy it
atvaccaro Jul 27, 2022
7f49141
run fewer workers in preprod
atvaccaro Jul 27, 2022
3594f55
move pull policies to patches, and only run 1 dev consumer
atvaccaro Jul 27, 2022
a2cdc49
add redis considerations to readme
atvaccaro Jul 27, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM apache/airflow:2.1.4
FROM apache/airflow:2.1.4-python3.8

# install gcloud as root, then switch back to airflow
USER root
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from pydantic.tools import parse_obj_as
from requests import Session
from typing import List, Dict
from calitp.storage import get_fs
from utils import (
from calitp.storage import (
get_fs,
AirtableGTFSDataExtract,
AirtableGTFSDataRecord,
AirtableGTFSDataRecordProcessingOutcome,
Expand Down Expand Up @@ -72,7 +72,7 @@ def download_feed(
config=record,
response_code=resp.status_code,
response_headers=resp.headers,
download_time=pendulum.now(),
ts=pendulum.now(),
)

extract.save_content(fs=get_fs(), content=resp.content)
Expand Down Expand Up @@ -135,8 +135,8 @@ def download_all(task_instance, execution_date, **kwargs):
), f"we somehow ended up with {len(outcomes)} outcomes from {len(records)} records"

result = DownloadFeedsResult(
start_time=start,
end_time=pendulum.now(),
ts=start,
end=pendulum.now(),
outcomes=outcomes,
filename="results.jsonl",
)
Expand Down
2 changes: 1 addition & 1 deletion airflow/dags/macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pandas as pd
from calitp.config import is_development
from utils import prefix_bucket
from calitp.storage import prefix_bucket

# To add a macro, add its definition in the appropriate section
# And then add it to the dictionary at the bottom of this file
Expand Down
5 changes: 2 additions & 3 deletions airflow/plugins/operators/airtable_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
from pyairtable import Table
from pydantic import BaseModel
from typing import Optional
from utils import make_name_bq_safe
from calitp.storage import get_fs
from calitp.storage import get_fs, make_name_bq_safe

from airflow.models import BaseOperator

Expand Down Expand Up @@ -98,7 +97,7 @@ def make_hive_path(self, bucket: str):
bucket,
f"{self.air_base_name}__{safe_air_table_name}",
f"dt={self.extract_time.to_date_string()}",
f"time={self.extract_time.to_time_string()}",
f"ts={self.extract_time.to_iso8601_string()}",
f"{safe_air_table_name}.jsonl.gz",
)

Expand Down
Loading