Commit
Change Jaé Validator GPS Capture Logic (#588)
* create 5-minute schedule

* change GPS logic

* add pagination to get_raw_db

* change GPS schedule / skip if running

* fix GPS query

* add retry to the get data db function

* fix state handler

* test project

* fix error when page_size is None

* add docstring

* fix doc

* change interval minutes

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
pixuimpou and mergify[bot] authored Dec 7, 2023
1 parent 5dec18c commit 4de9367
Showing 5 changed files with 209 additions and 56 deletions.
90 changes: 49 additions & 41 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
@@ -23,7 +23,7 @@
)


from pipelines.utils.utils import set_default_parameters
from pipelines.utils.utils import set_default_parameters, skip_if_running_handler

# SMTR Imports #

@@ -36,7 +36,12 @@

from pipelines.rj_smtr.constants import constants

from pipelines.rj_smtr.schedules import every_hour, every_minute, every_day_hour_five
from pipelines.rj_smtr.schedules import (
every_hour,
every_minute,
every_day_hour_five,
every_5_minutes,
)

# Flows #

@@ -74,7 +79,10 @@
| constants.BILHETAGEM_TRACKING_CAPTURE_PARAMS.value,
)

# bilhetagem_tracking_captura.schedule = every_minute
bilhetagem_tracking_captura.state_handlers.append(skip_if_running_handler)


bilhetagem_tracking_captura.schedule = every_5_minutes

# BILHETAGEM RESSARCIMENTO - SUBFLOW PARA RODAR DIARIAMENTE #

@@ -309,44 +317,44 @@
bilhetagem_transacao_tratamento.schedule = every_hour


with Flow(
"SMTR: Bilhetagem GPS Validador - Tratamento",
code_owners=["caio", "fernanda", "boris", "rodrigo", "rafaelpinheiro"],
) as bilhetagem_gps_tratamento:
timestamp = get_rounded_timestamp(
interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value
)

rename_flow_run = rename_current_flow_run_now_time(
prefix=bilhetagem_transacao_tratamento.name + " ",
now_time=timestamp,
)

LABELS = get_current_flow_labels()

# Recaptura GPS

run_recaptura_gps = create_flow_run(
flow_name=bilhetagem_recaptura.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
parameters=constants.BILHETAGEM_TRACKING_CAPTURE_PARAMS.value,
)

wait_recaptura_gps = wait_for_flow_run(
run_recaptura_gps,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)


bilhetagem_gps_tratamento.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_gps_tratamento.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
# bilhetagem_gps_tratamento.schedule = every_hour
# with Flow(
# "SMTR: Bilhetagem GPS Validador - Tratamento",
# code_owners=["caio", "fernanda", "boris", "rodrigo", "rafaelpinheiro"],
# ) as bilhetagem_gps_tratamento:
# timestamp = get_rounded_timestamp(
# interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value
# )

# rename_flow_run = rename_current_flow_run_now_time(
# prefix=bilhetagem_transacao_tratamento.name + " ",
# now_time=timestamp,
# )

# LABELS = get_current_flow_labels()

# # Recaptura GPS

# run_recaptura_gps = create_flow_run(
# flow_name=bilhetagem_recaptura.name,
# project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
# labels=LABELS,
# parameters=constants.BILHETAGEM_TRACKING_CAPTURE_PARAMS.value,
# )

# wait_recaptura_gps = wait_for_flow_run(
# run_recaptura_gps,
# stream_states=True,
# stream_logs=True,
# raise_final_state=True,
# )


# bilhetagem_gps_tratamento.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
# bilhetagem_gps_tratamento.run_config = KubernetesRun(
# image=emd_constants.DOCKER_IMAGE.value,
# labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
# )
# # bilhetagem_gps_tratamento.schedule = every_hour

with Flow(
"SMTR: Bilhetagem Ordem Pagamento - Captura/Tratamento",
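The flows.py change attaches skip_if_running_handler to bilhetagem_tracking_captura and moves it from the per-minute schedule to every_5_minutes, so a new run is skipped while a previous capture is still executing. The handler itself lives in pipelines/utils/utils.py and is not part of this diff; purely as an illustration of the Prefect 1.x flow state-handler shape it plugs into, a skip-if-running handler could look like the sketch below (count_other_running_runs is a hypothetical helper, e.g. a GraphQL query against the Prefect API).

# Illustrative sketch only -- the real skip_if_running_handler may differ.
from prefect import context
from prefect.engine.state import Running, Skipped, State


def skip_if_running_sketch(flow, old_state: State, new_state: State) -> State:
    """Skip this flow run if another run of the same flow is already Running."""
    if isinstance(new_state, Running):
        # count_other_running_runs is a hypothetical lookup of concurrent runs
        if count_other_running_runs(flow.name, context.get("flow_run_id")) > 0:
            return Skipped("Another run of this flow is already in progress")
    return new_state
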
7 changes: 4 additions & 3 deletions pipelines/rj_smtr/constants.py
@@ -222,12 +222,13 @@ class constants(Enum): # pylint: disable=c0103
FROM
tracking_detalhe
WHERE
data_tracking BETWEEN '{start}'
AND '{end}'
id > {last_id} AND id <= {max_id}
""",
"page_size": 1000,
"max_pages": 100,
},
"primary_key": ["id"],
"interval_minutes": 1,
"interval_minutes": 5,
}

BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS = [
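In constants.py the tracking-table capture query stops filtering data_tracking by a timestamp window and instead selects an id range, while interval_minutes goes from 1 to 5. With the page_size and max_pages set here, each run covers at most page_size * max_pages = 1000 * 100 = 100,000 ids beyond the last one captured. A minimal sketch of how the template is expected to be filled (column list and values are made up for the example; the real formatting happens in create_request_params, shown in tasks.py below):

# Illustrative values only
query_template = """
    SELECT *
    FROM tracking_detalhe
    WHERE
        id > {last_id} AND id <= {max_id}
"""
page_size, max_pages = 1000, 100
last_captured_id = 123456
max_id = last_captured_id + page_size * max_pages  # upper bound for this run
print(query_template.format(last_id=last_captured_id, max_id=max_id))
# ... WHERE id > 123456 AND id <= 223456
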
14 changes: 14 additions & 0 deletions pipelines/rj_smtr/schedules.py
@@ -38,6 +38,20 @@
]
)

every_5_minutes = Schedule(
clocks=[
IntervalClock(
interval=timedelta(minutes=5),
start_date=datetime(
2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)
),
labels=[
emd_constants.RJ_SMTR_AGENT_LABEL.value,
],
),
]
)

every_10_minutes = Schedule(
clocks=[
IntervalClock(
65 changes: 60 additions & 5 deletions pipelines/rj_smtr/tasks.py
@@ -19,6 +19,7 @@
from prefect import task
from pytz import timezone
import requests
from pandas_gbq.exceptions import GenericGBQException

from pipelines.rj_smtr.constants import constants
from pipelines.rj_smtr.utils import (
@@ -657,16 +658,70 @@ def create_request_params(
]
request_url = database["host"]

datetime_range = get_datetime_range(
timestamp=timestamp, interval=timedelta(minutes=interval_minutes)
)

request_params = {
"database": extract_params["database"],
"engine": database["engine"],
"query": extract_params["query"].format(**datetime_range),
}

if table_id == constants.BILHETAGEM_TRACKING_CAPTURE_PARAMS.value["table_id"]:
project = bq_project(kind="bigquery_staging")
log(f"project = {project}")
try:
logs_query = f"""
SELECT
timestamp_captura
FROM
`{project}.{dataset_id}_staging.{table_id}_logs`
WHERE
data <= '{timestamp.strftime("%Y-%m-%d")}'
AND sucesso = "True"
ORDER BY
timestamp_captura DESC
"""
last_success_dates = bd.read_sql(
query=logs_query, billing_project_id=project
)
last_success_dates = last_success_dates.iloc[:, 0].to_list()
for success_ts in last_success_dates:
success_ts = datetime.fromisoformat(success_ts)
last_id_query = f"""
SELECT
MAX(id)
FROM
`{project}.{dataset_id}_staging.{table_id}`
WHERE
data = '{success_ts.strftime("%Y-%m-%d")}'
and hora = "{success_ts.hour}";
"""

last_captured_id = bd.read_sql(
query=last_id_query, billing_project_id=project
)
last_captured_id = last_captured_id.iloc[0][0]
if last_captured_id is None:
print("ID is None, trying next timestamp")
else:
log(f"last_captured_id = {last_captured_id}")
break
except GenericGBQException as err:
if "404 Not found" in str(err):
log("Table Not found, returning id = 0")
last_captured_id = 0

request_params["query"] = extract_params["query"].format(
last_id=last_captured_id,
max_id=int(last_captured_id)
+ extract_params["page_size"] * extract_params["max_pages"],
)
request_params["page_size"] = extract_params["page_size"]
request_params["max_pages"] = extract_params["max_pages"]
else:
datetime_range = get_datetime_range(
timestamp=timestamp, interval=timedelta(minutes=interval_minutes)
)

request_params["query"] = extract_params["query"].format(**datetime_range)

elif dataset_id == constants.GTFS_DATASET_ID.value:
request_params = extract_params["filename"]

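create_request_params now branches on the table: for the GPS tracking table it reads the staging logs table for the most recent successful capture, takes the MAX(id) ingested in that hour as last_captured_id (falling back to 0 when the staging table does not exist yet), and builds an id-bounded, paginated request; every other table keeps the previous datetime-range query. A rough sketch of the two request_params shapes this produces (database names, engines and literal values are illustrative):

# Tracking table: id-based, paginated
request_params = {
    "database": "tracking_db",   # illustrative
    "engine": "postgresql",      # illustrative
    "query": "... WHERE id > 123456 AND id <= 223456",
    "page_size": 1000,
    "max_pages": 100,
}

# Any other table: datetime window, as before
request_params = {
    "database": "transacao_db",  # illustrative
    "engine": "postgresql",      # illustrative
    "query": "... BETWEEN '2023-12-07 10:00:00' AND '2023-12-07 10:05:00'",
}
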
89 changes: 82 additions & 7 deletions pipelines/rj_smtr/utils.py
@@ -641,8 +641,34 @@ def get_raw_data_gcs(
return error, data, filetype


def close_db_connection(connection, engine: str):
"""
Safely close a database connection
Args:
connection: the database connection
        engine (str): The database management system
"""
if engine == "postgresql":
if not connection.closed:
connection.close()
log("Database connection closed")
elif engine == "mysql":
if connection.open:
connection.close()
log("Database connection closed")
else:
raise NotImplementedError(f"Engine {engine} not supported")


def get_raw_data_db(
query: str, engine: str, host: str, secret_path: str, database: str
query: str,
engine: str,
host: str,
secret_path: str,
database: str,
page_size: int = None,
max_pages: int = None,
) -> tuple[str, str, str]:
"""
Get data from Databases
@@ -653,6 +679,9 @@
host (str): The database host
secret_path (str): Secret path to get credentials
database (str): The database to connect
        page_size (int, Optional): The maximum number of rows returned per page of the
            paginated query; if set, LIMIT and OFFSET are appended to the query
        max_pages (int, Optional): The maximum number of paginated queries to execute
Returns:
tuple[str, str, str]: Error, data and filetype
@@ -666,22 +695,68 @@
error = None
filetype = "json"

try:
credentials = get_vault_secret(secret_path)["data"]
if max_pages is None:
max_pages = 1

full_data = []
paginated_query = query
credentials = get_vault_secret(secret_path)["data"]
if page_size is not None:
paginated_query = paginated_query + f"LIMIT {page_size} OFFSET {{offset}}"

with connector_mapping[engine](
connector = connector_mapping[engine]

try:
connection = connector(
host=host,
user=credentials["user"],
password=credentials["password"],
database=database,
) as connection:
data = pd.read_sql(sql=query, con=connection).to_dict(orient="records")
)
for page in range(max_pages):
retries = 10
formatted_query = paginated_query
if page_size is not None:
formatted_query = formatted_query.format(offset=page * page_size)

for retry in range(retries):
try:
log(f"Executing query:\n{formatted_query}")
data = pd.read_sql(sql=formatted_query, con=connection).to_dict(
orient="records"
)
break
except Exception as err:
log(f"[ATTEMPT {retry}]: {err}")
close_db_connection(connection=connection, engine=engine)
connection = connector(
host=host,
user=credentials["user"],
password=credentials["password"],
database=database,
)
if retry == retries - 1:
raise err

full_data += data

log(f"Returned {len(data)} rows")

if page_size is None or len(data) < page_size:
log("Database Extraction Finished")
break

except Exception:
full_data = []
error = traceback.format_exc()
log(f"[CATCHED] Task failed with error: \n{error}", level="error")
finally:
try:
close_db_connection(connection=connection, engine=engine)
except Exception:
pass

return error, data, filetype
return error, full_data, filetype


def save_treated_local_func(
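get_raw_data_db now accepts page_size and max_pages: when page_size is set, LIMIT/OFFSET are appended to the query and pages are fetched until a short page is returned or max_pages is reached, and each page read is retried up to 10 times, reopening the connection between attempts. A minimal standalone sketch of that pattern, not the repository function itself (fetch_paginated and connect are illustrative names):

import pandas as pd


def fetch_paginated(query, connect, page_size=None, max_pages=1, retries=10):
    """Fetch rows page by page, reconnecting and retrying on failure."""
    if page_size is not None:
        query = query + f" LIMIT {page_size} OFFSET {{offset}}"
    rows = []
    conn = connect()
    for page in range(max_pages):
        sql = query.format(offset=page * page_size) if page_size is not None else query
        for attempt in range(retries):
            try:
                page_rows = pd.read_sql(sql=sql, con=conn).to_dict(orient="records")
                break
            except Exception:
                conn.close()      # reconnect before the next attempt
                conn = connect()
                if attempt == retries - 1:
                    raise
        rows += page_rows
        if page_size is None or len(page_rows) < page_size:
            break  # last (possibly partial) page reached
    conn.close()
    return rows
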
