From 26eac120bc5c48be31676fb3dfbc1969cd265880 Mon Sep 17 00:00:00 2001 From: Boris Marinho Date: Fri, 16 Feb 2024 18:16:52 -0300 Subject: [PATCH 1/2] acrescenta quantidade de veiculos com problema de acordo com servico no tooltip --- src/tasks.py | 109 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 86 insertions(+), 23 deletions(-) diff --git a/src/tasks.py b/src/tasks.py index 48e682c..c1d555d 100644 --- a/src/tasks.py +++ b/src/tasks.py @@ -8,12 +8,12 @@ import os import geopandas as gpd +import pandas as pd import traceback import folium import branca.colormap as cm import folium import folium.plugins as plugins -import geopandas as gpd import pytz @@ -84,7 +84,7 @@ def create_map(data=None): "horario_leitura_estacao", "acumulado_chuva_15_min", "acumulado_chuva_1_h", - "servico", + "servicos", "indicador_veiculo_parado_10_min", "indicador_veiculo_parado_30_min", "indicador_veiculo_parado_1_hora", @@ -457,28 +457,90 @@ def main(): print(f"df columns are:\n{df.columns}") # Calcula os indicadores de cada tile - df_tile_indicators = ( - df - .loc[df.groupby(["tile_id", "servico", "id_veiculo"]).timestamp_gps.idxmax()] - .groupby(["tile_id", "tile", "horario_leitura_estacao"]).agg( - { - "acumulado_chuva_15_min": "max", - "acumulado_chuva_1_h": "max", - "acumulado_chuva_4_h": "max", - "id_veiculo": "count", - "servico": lambda x: ", ".join(list(set(x))), - "indicador_veiculo_parado_10_min": "sum", - "indicador_veiculo_fora_rota_10_min": "sum", - "indicador_veiculo_parado_30_min": "sum", - "indicador_veiculo_fora_rota_30_min": "sum", - "indicador_veiculo_parado_1_hora": "sum", - "indicador_veiculo_fora_rota_1_hora": "sum", - } - ).reset_index() - ) + df_tile_indicators = df.loc[df.groupby(["tile_id", "servico", "id_veiculo"]).timestamp_gps.idxmax()].reset_index() + df_tile_indicators = df_tile_indicators.loc[df_tile_indicators.groupby(["tile_id", "servico", "id_veiculo"]).horario_leitura_estacao.idxmax()].reset_index() + df_tile_indicators = df_tile_indicators.groupby(["tile_id", "tile", "servico"]).agg( + { + "horario_leitura_estacao": "max", + "acumulado_chuva_15_min": "max", + "acumulado_chuva_1_h": "max", + "acumulado_chuva_4_h": "max", + "indicador_veiculo_parado_10_min": "sum", + "indicador_veiculo_fora_rota_10_min": "sum", + "indicador_veiculo_parado_30_min": "sum", + "indicador_veiculo_fora_rota_30_min": "sum", + "indicador_veiculo_parado_1_hora": "sum", + "indicador_veiculo_fora_rota_1_hora": "sum", + } + ).reset_index() + + df_tile_indicators['total_veiculo_problema'] = ( + df_tile_indicators.indicador_veiculo_parado_10_min + + df_tile_indicators.indicador_veiculo_parado_30_min + + df_tile_indicators.indicador_veiculo_parado_1_hora + + df_tile_indicators.indicador_veiculo_fora_rota_10_min + + df_tile_indicators.indicador_veiculo_fora_rota_30_min + + df_tile_indicators.indicador_veiculo_fora_rota_1_hora) + + servicos_dict = {} + + for i, row in df_tile_indicators.iterrows(): + if row['tile_id'] not in servicos_dict: + servicos_dict[row['tile_id']] = [] + servicos_dict[row['tile_id']].append((row['servico'], row['total_veiculo_problema'])) + + for tile_id in servicos_dict: + lista = servicos_dict[tile_id] + lista.sort(key=lambda x : x[1], reverse=True) + s = '' + for servico, total in lista: + s += f'{servico}: {total}, ' + servicos_dict[tile_id] = s + + + servicos_df = pd.DataFrame(servicos_dict.items(), columns=['tile_id', 'servicos']) + df_tile_indicators = df_tile_indicators.merge(servicos_df, how='inner', on='tile_id') + # display(servicos_df) + df_tile_indicators = df_tile_indicators.groupby(["tile_id", "tile"]).agg( + { + "servicos": "max", + "horario_leitura_estacao": "max", + "acumulado_chuva_15_min": "max", + "acumulado_chuva_1_h": "max", + "acumulado_chuva_4_h": "max", + "indicador_veiculo_parado_10_min": "sum", + "indicador_veiculo_fora_rota_10_min": "sum", + "indicador_veiculo_parado_30_min": "sum", + "indicador_veiculo_fora_rota_30_min": "sum", + "indicador_veiculo_parado_1_hora": "sum", + "indicador_veiculo_fora_rota_1_hora": "sum", + } + ).reset_index() + + + + # df_tile_indicators = ( + # df + # .loc[df.groupby(["tile_id", "servico", "id_veiculo"]).timestamp_gps.idxmax()] + # .groupby(["tile_id", "tile", "horario_leitura_estacao"]).agg( + # { + # "acumulado_chuva_15_min": "max", + # "acumulado_chuva_1_h": "max", + # "acumulado_chuva_4_h": "max", + # "id_veiculo": "count", + # "servico": lambda x: ", ".join(list(set(x))), + # "indicador_veiculo_parado_10_min": "sum", + # "indicador_veiculo_fora_rota_10_min": "sum", + # "indicador_veiculo_parado_30_min": "sum", + # "indicador_veiculo_fora_rota_30_min": "sum", + # "indicador_veiculo_parado_1_hora": "sum", + # "indicador_veiculo_fora_rota_1_hora": "sum", + # } + # ).reset_index() + # ) # Filtra a última medida da estacao - df_tile_indicators = df_tile_indicators.loc[df_tile_indicators.groupby(["tile_id"]).horario_leitura_estacao.idxmax()] + # df_tile_indicators = df_tile_indicators.loc[df_tile_indicators.groupby(["tile_id"]).horario_leitura_estacao.idxmax()] # df_tile_indicators["horario_leitura_estacao"] = df_tile_indicators.horario_leitura_estacao.astype(str) df_tile_indicators["horario_leitura_estacao"] = df_tile_indicators.horario_leitura_estacao.dt.total_seconds().apply(lambda s: f'{s // 3600:02.0f}:{(s % 3600) // 60:02.0f}') df_tile_indicators['geometry'] = df_tile_indicators['tile'].dropna().astype(str).apply(loads) @@ -512,4 +574,5 @@ def main(): redis.set('last_crash', last_crash) if __name__ == '__main__': - main() \ No newline at end of file + main() + cache_mapa() From 9f084c76a53978033de3cfe7624649a3a04e1b27 Mon Sep 17 00:00:00 2001 From: Boris Marinho Date: Tue, 20 Aug 2024 15:17:40 -0300 Subject: [PATCH 2/2] aponta pra tabela gps_sppo_15_minutos do projeto de producao --- src/tasks.py | 670 +++++++++++++++++++++++++++------------------------ 1 file changed, 354 insertions(+), 316 deletions(-) diff --git a/src/tasks.py b/src/tasks.py index c1d555d..a38d1b3 100644 --- a/src/tasks.py +++ b/src/tasks.py @@ -17,172 +17,181 @@ import pytz -app = Celery('main', broker=os.getenv('REDIS_CELERY')) -app.conf.timezone = 'UTC' +app = Celery("main", broker=os.getenv("REDIS_CELERY")) +app.conf.timezone = "UTC" + @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): - sender.add_periodic_task(180.0, main.s(), name='Update data every 3 minutes') - sender.add_periodic_task(180.0, cache_mapa.s(), name='Update map every 3 minutes') + sender.add_periodic_task(180.0, main.s(), name="Update data every 3 minutes") + sender.add_periodic_task(180.0, cache_mapa.s(), name="Update map every 3 minutes") + # V1: loading shapes in every update was very slow # New shapes should be parsed beforehand to geojson format with # the function below # def load_shapes(): - # # Carrega dados de rotas (shapes) - # shapes = pd.read_csv("src/data/shapes.txt", dtype={ - # 'shape_id': 'str', - # 'shape_pt_lat': 'float', - # 'shape_pt_lon': 'float', - # 'shape_pt_sequence': 'Int64', - # 'shape_dist_traveled': 'float', - # }) - # shapes = gpd.GeoDataFrame(shapes, - # geometry=gpd.points_from_xy(shapes.shape_pt_lon, shapes.shape_pt_lat) - # ).set_crs(epsg=4326) - # shapes.sort_values(['shape_id','shape_pt_sequence'], inplace=True) - # shapes = ( - # shapes[["shape_id", "shape_pt_lat", "shape_pt_lon"]] - # .groupby("shape_id") - # .agg(list) - # .apply(lambda x: LineString(zip(x[1], x[0])), axis=1) - # ) - - # shapes = gpd.GeoDataFrame( - # data=shapes.index, - # geometry = shapes.values, - # crs=4326 - # ) - # shapes['shape_id'] = shapes.shape_id.astype(str) - # shapes.to_json() - # return shapes +# # Carrega dados de rotas (shapes) +# shapes = pd.read_csv("src/data/shapes.txt", dtype={ +# 'shape_id': 'str', +# 'shape_pt_lat': 'float', +# 'shape_pt_lon': 'float', +# 'shape_pt_sequence': 'Int64', +# 'shape_dist_traveled': 'float', +# }) +# shapes = gpd.GeoDataFrame(shapes, +# geometry=gpd.points_from_xy(shapes.shape_pt_lon, shapes.shape_pt_lat) +# ).set_crs(epsg=4326) +# shapes.sort_values(['shape_id','shape_pt_sequence'], inplace=True) +# shapes = ( +# shapes[["shape_id", "shape_pt_lat", "shape_pt_lon"]] +# .groupby("shape_id") +# .agg(list) +# .apply(lambda x: LineString(zip(x[1], x[0])), axis=1) +# ) + +# shapes = gpd.GeoDataFrame( +# data=shapes.index, +# geometry = shapes.values, +# crs=4326 +# ) +# shapes['shape_id'] = shapes.shape_id.astype(str) +# shapes.to_json() +# return shapes + def load_shapes(): - return gpd.read_file('data/shapes.geojson') + return gpd.read_file("data/shapes.geojson") + def create_map(data=None): - df_geo = data + df_geo = data # df_geo = gpd.read_file('dataframe.geojson') # Instancia o mapa - shapes = load_shapes() - m = folium.Map(location=[-22.917690, -43.413861], zoom_start=11) - - - # Adiciona dados de indice de chuva dos tiles - colormap = cm.LinearColormap( - ["green", "yellow", "red"], - vmin=0, - vmax=100, - caption="Acumulado de chuva na última hora (mm)" - ) - colormap.add_to(m) - colorscale_dict = df_geo.set_index("tile_id")["acumulado_chuva_1_h"].sort_values() - - - popup = folium.GeoJsonPopup( - fields=[ - "horario_leitura_estacao", - "acumulado_chuva_15_min", - "acumulado_chuva_1_h", - "servicos", - "indicador_veiculo_parado_10_min", - "indicador_veiculo_parado_30_min", - "indicador_veiculo_parado_1_hora", - "indicador_veiculo_fora_rota_10_min", - "indicador_veiculo_fora_rota_30_min", - "indicador_veiculo_fora_rota_1_hora", - "tile_id" - ], - aliases=[ - "🌧️ Hora da leitura:", - "🌧️ Acumulado 15min:", - "🌧️ Acumulado 1h:", - "🚍 Serviços em operação: ", - "🛑 Veículos parados 10 min:", - "🛑 Veículos parados 30 min:", - "🛑 Veículos parados 1 hora:", - "⤴️ Veículos desviados 10 min:", - "⤴️ Veículos desviados 30 min:", - "⤴️ Veículos desviados 1 hora:", - "📍 ID do polígono: " - ], - localize=True, - labels=True, - # style="background-color: yellow;", - ) - - folium.GeoJson( - df_geo, - style_function=lambda feature: { - "fillColor": colormap(colorscale_dict[feature["properties"]["tile_id"]]), - "color": "black", - "weight": .2, - "fillOpacity": 0.7, - }, - color='acumulado_chuva_15_min', - weight=2.5, - opacity=1, - popup=popup - ).add_to(m) - - df_geo['total_veiculo_problema'] = ( - df_geo.indicador_veiculo_parado_10_min + - df_geo.indicador_veiculo_parado_30_min + - df_geo.indicador_veiculo_parado_1_hora + - df_geo.indicador_veiculo_fora_rota_10_min + - df_geo.indicador_veiculo_fora_rota_30_min + - df_geo.indicador_veiculo_fora_rota_1_hora) - - # Adiciona icones de qtd de veiculos parados/fora da rota - for i in range(0, len(df_geo)): - - pin_count = df_geo.iloc[i].total_veiculo_problema - - pin = False - - if pin_count > 10: - pin = True - pin_color = "#085d73" - - elif pin_count > 5: - pin = True - pin_color = "#1f97b5" - - elif pin_count > 0: - pin = True - pin_color = "#5cdafa" - - if pin == True: - folium.Marker( - location=[df_geo.iloc[i].geometry.centroid.y, df_geo.iloc[i].geometry.centroid.x], - icon=plugins.BeautifyIcon( - icon="arrow-down", - icon_shape="marker", - number=str(pin_count), - border_color=pin_color, - background_color=pin_color - ) - ).add_to(m) - - # Adiciona rotas ao mapa - folium.GeoJson(shapes['geometry'], color='gray', weight=1.5, opacity=.8).add_to(m) - - # Ajusta camadas - folium.TileLayer('cartodbpositron').add_to(m) - folium.LayerControl().add_to(m) - return m + shapes = load_shapes() + m = folium.Map(location=[-22.917690, -43.413861], zoom_start=11) + + # Adiciona dados de indice de chuva dos tiles + colormap = cm.LinearColormap( + ["green", "yellow", "red"], + vmin=0, + vmax=100, + caption="Acumulado de chuva na última hora (mm)", + ) + colormap.add_to(m) + colorscale_dict = df_geo.set_index("tile_id")["acumulado_chuva_1_h"].sort_values() + + popup = folium.GeoJsonPopup( + fields=[ + "horario_leitura_estacao", + "acumulado_chuva_15_min", + "acumulado_chuva_1_h", + "servicos", + "indicador_veiculo_parado_10_min", + "indicador_veiculo_parado_30_min", + "indicador_veiculo_parado_1_hora", + "indicador_veiculo_fora_rota_10_min", + "indicador_veiculo_fora_rota_30_min", + "indicador_veiculo_fora_rota_1_hora", + "tile_id", + ], + aliases=[ + "🌧️ Hora da leitura:", + "🌧️ Acumulado 15min:", + "🌧️ Acumulado 1h:", + "🚍 Serviços em operação: ", + "🛑 Veículos parados 10 min:", + "🛑 Veículos parados 30 min:", + "🛑 Veículos parados 1 hora:", + "⤴️ Veículos desviados 10 min:", + "⤴️ Veículos desviados 30 min:", + "⤴️ Veículos desviados 1 hora:", + "📍 ID do polígono: ", + ], + localize=True, + labels=True, + # style="background-color: yellow;", + ) + + folium.GeoJson( + df_geo, + style_function=lambda feature: { + "fillColor": colormap(colorscale_dict[feature["properties"]["tile_id"]]), + "color": "black", + "weight": 0.2, + "fillOpacity": 0.7, + }, + color="acumulado_chuva_15_min", + weight=2.5, + opacity=1, + popup=popup, + ).add_to(m) + + df_geo["total_veiculo_problema"] = ( + df_geo.indicador_veiculo_parado_10_min + + df_geo.indicador_veiculo_parado_30_min + + df_geo.indicador_veiculo_parado_1_hora + + df_geo.indicador_veiculo_fora_rota_10_min + + df_geo.indicador_veiculo_fora_rota_30_min + + df_geo.indicador_veiculo_fora_rota_1_hora + ) + + # Adiciona icones de qtd de veiculos parados/fora da rota + for i in range(0, len(df_geo)): + + pin_count = df_geo.iloc[i].total_veiculo_problema + + pin = False + + if pin_count > 10: + pin = True + pin_color = "#085d73" + + elif pin_count > 5: + pin = True + pin_color = "#1f97b5" + + elif pin_count > 0: + pin = True + pin_color = "#5cdafa" + + if pin == True: + folium.Marker( + location=[ + df_geo.iloc[i].geometry.centroid.y, + df_geo.iloc[i].geometry.centroid.x, + ], + icon=plugins.BeautifyIcon( + icon="arrow-down", + icon_shape="marker", + number=str(pin_count), + border_color=pin_color, + background_color=pin_color, + ), + ).add_to(m) + + # Adiciona rotas ao mapa + folium.GeoJson(shapes["geometry"], color="gray", weight=1.5, opacity=0.8).add_to(m) + + # Ajusta camadas + folium.TileLayer("cartodbpositron").add_to(m) + folium.LayerControl().add_to(m) + return m + @app.task def cache_mapa(): - datahora = datetime.now(tz=pytz.timezone('America/Sao_Paulo')).replace(second=0, microsecond=0, tzinfo=None) - redis = RedisSR.from_url(os.getenv('CACHE_OPERACAO_CHUVA')) - try: - data = redis.get('data') - mapa = create_map(data=data) - redis.set('last_map', mapa) - redis.set('last_map_timestamp', str(datahora)) - except: - pass + datahora = datetime.now(tz=pytz.timezone("America/Sao_Paulo")).replace( + second=0, microsecond=0, tzinfo=None + ) + redis = RedisSR.from_url(os.getenv("CACHE_OPERACAO_CHUVA")) + try: + data = redis.get("data") + mapa = create_map(data=data) + redis.set("last_map", mapa) + redis.set("last_map_timestamp", str(datahora)) + except: + pass def load_gps(datahora, data_versao_gtfs): @@ -205,7 +214,7 @@ def load_gps(datahora, data_versao_gtfs): END AS indicador_veiculo_parado FROM - `rj-smtr-dev.br_rj_riodejaneiro_veiculos.gps_sppo_15_minutos` + `rj-smtr.br_rj_riodejaneiro_veiculos.gps_sppo_15_minutos` WHERE DATA = "{datahora.date()}" AND timestamp_gps BETWEEN "{datahora - timedelta(hours=1)}" @@ -306,11 +315,12 @@ def load_gps(datahora, data_versao_gtfs): * FROM gps_acumulado """ - client = bigquery.Client(project='rj-smtr-dev') + client = bigquery.Client(project="rj-smtr") return client.query(gps).to_dataframe() + def load_tiles(datahora): - + geo_tiles = f""" -- 5. Puxa camada de hexagonos que cobrem a cidade with geometria AS ( @@ -381,18 +391,22 @@ def load_tiles(datahora): FROM geo_precipitacao_acumulada """ - client = bigquery.Client(project='rj-smtr-dev') + client = bigquery.Client(project="rj-smtr") return client.query(geo_tiles).to_dataframe() -def get_gps_data_last_update(): - query = """ + +def get_gps_data_last_update(datahora): + query = f""" SELECT MAX(timestamp_gps) FROM - `rj-smtr-dev.br_rj_riodejaneiro_veiculos.gps_sppo_15_minutos` + `rj-smtr.br_rj_riodejaneiro_veiculos.gps_sppo_15_minutos` + WHERE + data >= "{(datahora - timedelta(hours=24)).date()}" """ - client = bigquery.Client(project='rj-smtr-dev') - return client.query(query=query).to_dataframe().iloc[0,0] + client = bigquery.Client(project="rj-smtr") + + return client.query(query=query).to_dataframe().iloc[0, 0] def get_rain_data_last_update(datahora): @@ -405,174 +419,198 @@ def get_rain_data_last_update(datahora): data_particao = "{datahora.date()}" AND horario between "{(datahora - timedelta(hours=1)).time()}" and "{datahora.time()}" """ - client = bigquery.Client(project='rj-smtr-dev') - return client.query(query=query).to_dataframe().iloc[0,0] + client = bigquery.Client(project="rj-smtr") + return client.query(query=query).to_dataframe().iloc[0, 0] + @app.task def main(): try: - redis = RedisSR.from_url(os.getenv('CACHE_OPERACAO_CHUVA')) - # Carrega dados da operação - data_versao_gtfs = "2024-01-02" # TODO: atualizar para jan/24 - datahora_atual = datetime.now(tz=pytz.timezone('America/Sao_Paulo')).replace(second=0, microsecond=0, tzinfo=None) - minutos_arredondados = datahora_atual.minute - (datahora_atual.minute % 15) - datahora_arredondada = datahora_atual.replace( - minute=minutos_arredondados, second=0, microsecond=0 - ) - - if datahora_arredondada > datahora_atual - timedelta(minutes=6): - datahora = datahora_arredondada - timedelta(minutes=15) - else: - datahora = datahora_arredondada - - # datahora -= timedelta(hours=3) - - print(">>> Loading gps:", datetime.now()) - df_gps = load_gps(datahora=datahora, data_versao_gtfs=data_versao_gtfs) - gps_data_last_update = get_gps_data_last_update() - df_gps.posicao_veiculo = df_gps.posicao_veiculo.astype(str).apply(loads) - df_gps_geo = gpd.GeoDataFrame( - data=df_gps, - geometry=df_gps.posicao_veiculo, - crs=4326 - ) - if len(df_gps) != 0: - redis.set('last_df_gps', df_gps) - redis.set('last_df_gps_timestamp', gps_data_last_update.strftime("%d/%m/%Y %H:%M")) - print(f'Built gps geo!\nColumns:{df_gps_geo.columns}\nSize:{len(df_gps_geo)}') - print('Loading tiles') - df_tiles=load_tiles(datahora=datahora) - df_tiles.tile = df_tiles.tile.astype(str).apply(loads) - df_tiles.horario_leitura_estacao = df_tiles.horario_leitura_estacao.astype("timedelta64[ns]") - df_tiles_geo = gpd.GeoDataFrame( - data=df_tiles, - geometry=df_tiles.tile, - crs=4326 - ) - print(f'Built tiles geo!\nColumns:{df_tiles_geo.columns}\nSize:{len(df_tiles_geo)}') - df = df_gps_geo.sjoin(df_tiles_geo, how='left', predicate='intersects') - df.tile = df.tile.astype(str) - df.posicao_veiculo = df.posicao_veiculo.astype(str) - print(f'Joined gps and tiles, got data:\n{df.head(10)}\n df size is {len(df)}') - print(f"df columns are:\n{df.columns}") - - # Calcula os indicadores de cada tile - df_tile_indicators = df.loc[df.groupby(["tile_id", "servico", "id_veiculo"]).timestamp_gps.idxmax()].reset_index() - df_tile_indicators = df_tile_indicators.loc[df_tile_indicators.groupby(["tile_id", "servico", "id_veiculo"]).horario_leitura_estacao.idxmax()].reset_index() - df_tile_indicators = df_tile_indicators.groupby(["tile_id", "tile", "servico"]).agg( - { - "horario_leitura_estacao": "max", - "acumulado_chuva_15_min": "max", - "acumulado_chuva_1_h": "max", - "acumulado_chuva_4_h": "max", - "indicador_veiculo_parado_10_min": "sum", - "indicador_veiculo_fora_rota_10_min": "sum", - "indicador_veiculo_parado_30_min": "sum", - "indicador_veiculo_fora_rota_30_min": "sum", - "indicador_veiculo_parado_1_hora": "sum", - "indicador_veiculo_fora_rota_1_hora": "sum", - } - ).reset_index() - - df_tile_indicators['total_veiculo_problema'] = ( - df_tile_indicators.indicador_veiculo_parado_10_min + - df_tile_indicators.indicador_veiculo_parado_30_min + - df_tile_indicators.indicador_veiculo_parado_1_hora + - df_tile_indicators.indicador_veiculo_fora_rota_10_min + - df_tile_indicators.indicador_veiculo_fora_rota_30_min + - df_tile_indicators.indicador_veiculo_fora_rota_1_hora) - - servicos_dict = {} - - for i, row in df_tile_indicators.iterrows(): - if row['tile_id'] not in servicos_dict: - servicos_dict[row['tile_id']] = [] - servicos_dict[row['tile_id']].append((row['servico'], row['total_veiculo_problema'])) - - for tile_id in servicos_dict: + redis = RedisSR.from_url(os.getenv("CACHE_OPERACAO_CHUVA")) + # Carrega dados da operação + data_versao_gtfs = "2024-01-02" # TODO: atualizar para jan/24 + datahora_atual = datetime.now(tz=pytz.timezone("America/Sao_Paulo")).replace( + second=0, microsecond=0, tzinfo=None + ) + minutos_arredondados = datahora_atual.minute - (datahora_atual.minute % 15) + datahora_arredondada = datahora_atual.replace( + minute=minutos_arredondados, second=0, microsecond=0 + ) + + if datahora_arredondada > datahora_atual - timedelta(minutes=6): + datahora = datahora_arredondada - timedelta(minutes=15) + else: + datahora = datahora_arredondada + + # datahora -= timedelta(hours=3) + + print(">>> Loading gps:", datetime.now()) + df_gps = load_gps(datahora=datahora, data_versao_gtfs=data_versao_gtfs) + gps_data_last_update = get_gps_data_last_update(datahora) + df_gps.posicao_veiculo = df_gps.posicao_veiculo.astype(str).apply(loads) + df_gps_geo = gpd.GeoDataFrame( + data=df_gps, geometry=df_gps.posicao_veiculo, crs=4326 + ) + if len(df_gps) != 0: + redis.set("last_df_gps", df_gps) + redis.set( + "last_df_gps_timestamp", gps_data_last_update.strftime("%d/%m/%Y %H:%M") + ) + print(f"Built gps geo!\nColumns:{df_gps_geo.columns}\nSize:{len(df_gps_geo)}") + print("Loading tiles") + df_tiles = load_tiles(datahora=datahora) + df_tiles.tile = df_tiles.tile.astype(str).apply(loads) + df_tiles.horario_leitura_estacao = df_tiles.horario_leitura_estacao.astype( + "timedelta64[ns]" + ) + df_tiles_geo = gpd.GeoDataFrame(data=df_tiles, geometry=df_tiles.tile, crs=4326) + print( + f"Built tiles geo!\nColumns:{df_tiles_geo.columns}\nSize:{len(df_tiles_geo)}" + ) + df = df_gps_geo.sjoin(df_tiles_geo, how="left", predicate="intersects") + df.tile = df.tile.astype(str) + df.posicao_veiculo = df.posicao_veiculo.astype(str) + print(f"Joined gps and tiles, got data:\n{df.head(10)}\n df size is {len(df)}") + print(f"df columns are:\n{df.columns}") + + # Calcula os indicadores de cada tile + df_tile_indicators = df.loc[ + df.groupby(["tile_id", "servico", "id_veiculo"]).timestamp_gps.idxmax() + ].reset_index() + df_tile_indicators = df_tile_indicators.loc[ + df_tile_indicators.groupby( + ["tile_id", "servico", "id_veiculo"] + ).horario_leitura_estacao.idxmax() + ].reset_index() + df_tile_indicators = ( + df_tile_indicators.groupby(["tile_id", "tile", "servico"]) + .agg( + { + "horario_leitura_estacao": "max", + "acumulado_chuva_15_min": "max", + "acumulado_chuva_1_h": "max", + "acumulado_chuva_4_h": "max", + "indicador_veiculo_parado_10_min": "sum", + "indicador_veiculo_fora_rota_10_min": "sum", + "indicador_veiculo_parado_30_min": "sum", + "indicador_veiculo_fora_rota_30_min": "sum", + "indicador_veiculo_parado_1_hora": "sum", + "indicador_veiculo_fora_rota_1_hora": "sum", + } + ) + .reset_index() + ) + + df_tile_indicators["total_veiculo_problema"] = ( + df_tile_indicators.indicador_veiculo_parado_10_min + + df_tile_indicators.indicador_veiculo_parado_30_min + + df_tile_indicators.indicador_veiculo_parado_1_hora + + df_tile_indicators.indicador_veiculo_fora_rota_10_min + + df_tile_indicators.indicador_veiculo_fora_rota_30_min + + df_tile_indicators.indicador_veiculo_fora_rota_1_hora + ) + + servicos_dict = {} + + for i, row in df_tile_indicators.iterrows(): + if row["tile_id"] not in servicos_dict: + servicos_dict[row["tile_id"]] = [] + servicos_dict[row["tile_id"]].append( + (row["servico"], row["total_veiculo_problema"]) + ) + + for tile_id in servicos_dict: lista = servicos_dict[tile_id] - lista.sort(key=lambda x : x[1], reverse=True) - s = '' + lista.sort(key=lambda x: x[1], reverse=True) + s = "" for servico, total in lista: - s += f'{servico}: {total}, ' + s += f"{servico}: {total}, " servicos_dict[tile_id] = s - - servicos_df = pd.DataFrame(servicos_dict.items(), columns=['tile_id', 'servicos']) - df_tile_indicators = df_tile_indicators.merge(servicos_df, how='inner', on='tile_id') - # display(servicos_df) - df_tile_indicators = df_tile_indicators.groupby(["tile_id", "tile"]).agg( - { - "servicos": "max", - "horario_leitura_estacao": "max", - "acumulado_chuva_15_min": "max", - "acumulado_chuva_1_h": "max", - "acumulado_chuva_4_h": "max", - "indicador_veiculo_parado_10_min": "sum", - "indicador_veiculo_fora_rota_10_min": "sum", - "indicador_veiculo_parado_30_min": "sum", - "indicador_veiculo_fora_rota_30_min": "sum", - "indicador_veiculo_parado_1_hora": "sum", - "indicador_veiculo_fora_rota_1_hora": "sum", - } - ).reset_index() - - - - # df_tile_indicators = ( - # df - # .loc[df.groupby(["tile_id", "servico", "id_veiculo"]).timestamp_gps.idxmax()] - # .groupby(["tile_id", "tile", "horario_leitura_estacao"]).agg( - # { - # "acumulado_chuva_15_min": "max", - # "acumulado_chuva_1_h": "max", - # "acumulado_chuva_4_h": "max", - # "id_veiculo": "count", - # "servico": lambda x: ", ".join(list(set(x))), - # "indicador_veiculo_parado_10_min": "sum", - # "indicador_veiculo_fora_rota_10_min": "sum", - # "indicador_veiculo_parado_30_min": "sum", - # "indicador_veiculo_fora_rota_30_min": "sum", - # "indicador_veiculo_parado_1_hora": "sum", - # "indicador_veiculo_fora_rota_1_hora": "sum", - # } - # ).reset_index() - # ) - - # Filtra a última medida da estacao - # df_tile_indicators = df_tile_indicators.loc[df_tile_indicators.groupby(["tile_id"]).horario_leitura_estacao.idxmax()] - # df_tile_indicators["horario_leitura_estacao"] = df_tile_indicators.horario_leitura_estacao.astype(str) - df_tile_indicators["horario_leitura_estacao"] = df_tile_indicators.horario_leitura_estacao.dt.total_seconds().apply(lambda s: f'{s // 3600:02.0f}:{(s % 3600) // 60:02.0f}') - df_tile_indicators['geometry'] = df_tile_indicators['tile'].dropna().astype(str).apply(loads) - - df_geo = gpd.GeoDataFrame( - data=df_tile_indicators, - geometry=df_tile_indicators.geometry, - crs=4326 - ).drop(columns=["tile"]) - - - - if len(df_geo) == 0: - redis.set('last_empty_data', str(datahora)) - - else: - redis.set('data', df_geo) - redis.set('last_update', gps_data_last_update.strftime("%d/%m/%Y %H:%M")) - redis.set('last_rain_update', datahora.strftime("%d/%m/%Y") + ' ' + str(get_rain_data_last_update(datahora))) - - - - - - except Exception: - + servicos_df = pd.DataFrame( + servicos_dict.items(), columns=["tile_id", "servicos"] + ) + df_tile_indicators = df_tile_indicators.merge( + servicos_df, how="inner", on="tile_id" + ) + # display(servicos_df) + df_tile_indicators = ( + df_tile_indicators.groupby(["tile_id", "tile"]) + .agg( + { + "servicos": "max", + "horario_leitura_estacao": "max", + "acumulado_chuva_15_min": "max", + "acumulado_chuva_1_h": "max", + "acumulado_chuva_4_h": "max", + "indicador_veiculo_parado_10_min": "sum", + "indicador_veiculo_fora_rota_10_min": "sum", + "indicador_veiculo_parado_30_min": "sum", + "indicador_veiculo_fora_rota_30_min": "sum", + "indicador_veiculo_parado_1_hora": "sum", + "indicador_veiculo_fora_rota_1_hora": "sum", + } + ) + .reset_index() + ) + + # df_tile_indicators = ( + # df + # .loc[df.groupby(["tile_id", "servico", "id_veiculo"]).timestamp_gps.idxmax()] + # .groupby(["tile_id", "tile", "horario_leitura_estacao"]).agg( + # { + # "acumulado_chuva_15_min": "max", + # "acumulado_chuva_1_h": "max", + # "acumulado_chuva_4_h": "max", + # "id_veiculo": "count", + # "servico": lambda x: ", ".join(list(set(x))), + # "indicador_veiculo_parado_10_min": "sum", + # "indicador_veiculo_fora_rota_10_min": "sum", + # "indicador_veiculo_parado_30_min": "sum", + # "indicador_veiculo_fora_rota_30_min": "sum", + # "indicador_veiculo_parado_1_hora": "sum", + # "indicador_veiculo_fora_rota_1_hora": "sum", + # } + # ).reset_index() + # ) + + # Filtra a última medida da estacao + # df_tile_indicators = df_tile_indicators.loc[df_tile_indicators.groupby(["tile_id"]).horario_leitura_estacao.idxmax()] + # df_tile_indicators["horario_leitura_estacao"] = df_tile_indicators.horario_leitura_estacao.astype(str) + df_tile_indicators[ + "horario_leitura_estacao" + ] = df_tile_indicators.horario_leitura_estacao.dt.total_seconds().apply( + lambda s: f"{s // 3600:02.0f}:{(s % 3600) // 60:02.0f}" + ) + df_tile_indicators["geometry"] = ( + df_tile_indicators["tile"].dropna().astype(str).apply(loads) + ) + + df_geo = gpd.GeoDataFrame( + data=df_tile_indicators, geometry=df_tile_indicators.geometry, crs=4326 + ).drop(columns=["tile"]) + + if len(df_geo) == 0: + redis.set("last_empty_data", str(datahora)) + + else: + redis.set("data", df_geo) + redis.set("last_update", gps_data_last_update.strftime("%d/%m/%Y %H:%M")) + redis.set( + "last_rain_update", + datahora.strftime("%d/%m/%Y") + + " " + + str(get_rain_data_last_update(datahora)), + ) + + except Exception as e: now = str(datahora) stack_trace = traceback.format_exc() last_crash = {now: stack_trace} - redis.set('last_crash', last_crash) + redis.set("last_crash", last_crash) + -if __name__ == '__main__': - main() - cache_mapa() +if __name__ == "__main__": + main() + cache_mapa()