Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PROD] Added env. var + progress bar #14

Merged
merged 5 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ RUN python -m pip install --upgrade pip
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

# Generate the .env file (SERVER_URL is read at runtime via load_dotenv/os.getenv)
RUN echo "SERVER_URL=https://economia.awesomeapi.com.br" > .env

COPY . .

CMD [ "python", "./etl/main.py" ]
12 changes: 9 additions & 3 deletions etl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import requests
from etl.utils.constants import ENDPOINT_LIST_AVALIABLE_PARITYS
from etl.utils.logs import loggingWarn
from etl.jobs.ExtractApiData.ApiToParquetFile import extraction
from dotenv import load_dotenv
import os

load_dotenv()

SRV_URL = str(os.getenv("SERVER_URL"))
""" Reference for Server URL from enviroment variable """

mdName = "extract_prepare"

Expand Down Expand Up @@ -52,7 +58,7 @@ def ValidParamsForCall(self) -> list:

"""
valParams = []
AvaliableList = requests.get(ENDPOINT_LIST_AVALIABLE_PARITYS).json()
AvaliableList = requests.get(SRV_URL + '/json/available').json()

for param in self.params:
if param in AvaliableList:
Expand All @@ -64,7 +70,7 @@ def ValidParamsForCall(self) -> list:
return valParams
else:
raise KeyError(
f"The informed params: {self.params} are not available for extract, see available list in: {ENDPOINT_LIST_AVALIABLE_PARITYS}"
f"The informed params: {self.params} are not avaliable for extract, see available list in: {SRV_URL + '/json/available'}"
)

def pipelineExecute(self, ValidParameters: list):
Expand Down
54 changes: 23 additions & 31 deletions etl/jobs/ExtractApiData/ApiToParquetFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@
,DefaultOutputFolder
,DefaultTimestampStr
,DefaultUTCDatetime
,ENDPOINT_QUOTES_AWESOME_API, WORK_DIR
,SRV_URL
,WORK_DIR
)

import concurrent.futures
import threading
import time
from tqdm import tqdm
import os

counter = 0

counter = 0
retry_time = 2
class extraction:
def __init__(self, ValidParams: list) -> None:
"""
Expand All @@ -37,42 +40,35 @@ def PipelineRun(self, params: list) -> list:
list: A list of extracted file paths.
"""
## extract Data
maked_endpoint = ENDPOINT_QUOTES_AWESOME_API + ','.join(params)
loggingInfo(f"Sending request: {maked_endpoint}", WORK_DIR)
maked_endpoint = SRV_URL + '/last/' + ','.join(params)
loggingInfo(f"Sending request to: {SRV_URL + '/last/'} :: 1 of 3", WORK_DIR)
response = requests.get(maked_endpoint)

for tryNumber in range(3):
try:
if response.ok:
loggingInfo(f"Request finished", WORK_DIR)
json_data = response.json()
break
else:
raise ConnectionError(f"endpoint connection: {ENDPOINT_QUOTES_AWESOME_API}.status_code: {response.status_code}")
except ConnectionError as e:
if response.ok:
loggingInfo(f"Request finished with status {response.status_code}", WORK_DIR)
json_data = response.json()
break
else:
if tryNumber <2:
loggingWarn(f"{e}, retrying again in 5 seconds...", WORK_DIR)
time.sleep(5)
loggingWarn(f"response error, status_code {response.status_code}. Retrying in {retry_time} seconds...", WORK_DIR)
for _ in tqdm(range(100), total=100, desc=f"loading"):
time.sleep(retry_time / 100)
loggingInfo(f"Sending request to: {SRV_URL + '/last/'} :: {tryNumber + 2} of 3", WORK_DIR)
else:
raise e
loggingWarn("Attempt limits exceeded", WORK_DIR)
raise ConnectionError(f"Could not connect to the server after 3 attempts. Please try again later. Response status code: {response.status_code}")

output_path = DefaultOutputFolder()
insert_timestamp = DefaultTimestampStr()
extracted_files = []
totalParams = len(params)

def process_param(args):
global counter

index, param = args
dic = json_data[param.replace("-", "")]

with threading.Lock():
thread_num = counter
counter += 1

loggingInfo(f"{index + 1} of {totalParams} - {param} - Transforming using thread: {thread_num}", WORK_DIR)

# Convert 'dic' to a Pandas DataFrame
df = pd.DataFrame([dic])

Expand All @@ -82,21 +78,17 @@ def process_param(args):
# Add two columns with the current date and time
df["extracted_at"] = DefaultUTCDatetime()

loggingInfo(f"{index + 1} of {totalParams} - {param} - Loading using thread: {thread_num}", WORK_DIR)

# Write the DataFrame to a Parquet file
df.to_parquet(f"{output_path}{param}-{insert_timestamp}.parquet")

# Append list with the file path
extracted_files.append(f"{output_path}{param}-{insert_timestamp}.parquet")

loggingInfo(f"{index + 1} of {totalParams} - {param} - saved file using thread: {thread_num}", WORK_DIR)

## Parallel Processing data
with concurrent.futures.ThreadPoolExecutor(4) as executor:
list(executor.map(process_param, enumerate(params)))
with concurrent.futures.ThreadPoolExecutor(os.cpu_count()) as executor:
list(tqdm(executor.map(process_param, enumerate(params)), total=totalParams, desc="Processing files"))

loggingInfo(f"All files extracted in: {output_path}", WORK_DIR)
loggingInfo(f"{totalParams} files extracted in: {output_path}", WORK_DIR)

return extracted_files

Expand Down
8 changes: 7 additions & 1 deletion etl/jobs/ExtractApiData/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
# Custom Logs
from etl.utils.logs import loggingInfo, loggingError, loggingWarn
from etl.utils.common import DefaultTimestampStr, DefaultOutputFolder, DefaultUTCDatetime
from etl.utils.constants import ENDPOINT_QUOTES_AWESOME_API
import pandas
import os
from dotenv import load_dotenv

load_dotenv()

SRV_URL = str(os.getenv("SERVER_URL"))
""" Reference for Server URL from enviroment variable """


current_dir = os.path.dirname(os.path.relpath(__file__))
WORK_DIR = current_dir.split("/")[-1:][0]
16 changes: 5 additions & 11 deletions etl/logs/ExtractApiData.log
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
2024-04-26 09:35:00 :: INFO :: Sending request: https://economia.awesomeapi.com.br/last/USD-BIF,USD-BND
2024-04-26 09:35:00 :: DEBUG :: Starting new HTTPS connection (1): economia.awesomeapi.com.br:443
2024-04-26 09:35:02 :: DEBUG :: https://economia.awesomeapi.com.br:443 "GET /last/USD-BIF,USD-BND HTTP/1.1" 200 None
2024-04-26 09:35:02 :: INFO :: Request finished
2024-04-26 09:35:02 :: INFO :: 1 of 2 - USD-BIF - Transforming using thread: 0
2024-04-26 09:35:02 :: INFO :: 2 of 2 - USD-BND - Transforming using thread: 1
2024-04-26 09:35:02 :: INFO :: 1 of 2 - USD-BIF - Loading using thread: 0
2024-04-26 09:35:02 :: INFO :: 2 of 2 - USD-BND - Loading using thread: 1
2024-04-26 09:35:02 :: INFO :: 1 of 2 - USD-BIF - saved file using thread: 0
2024-04-26 09:35:02 :: INFO :: 2 of 2 - USD-BND - saved file using thread: 1
2024-04-26 09:35:02 :: INFO :: All files extracted in: /Users/ivbarauna/repos/ETL-awesome-api/etl/utils/../../data/
2024-05-09 12:37:34 :: INFO :: Sending request to: https://economia.awesomeapi.com.br/last/ :: 1 of 3
2024-05-09 12:37:34 :: DEBUG :: Starting new HTTPS connection (1): economia.awesomeapi.com.br:443
2024-05-09 12:37:37 :: DEBUG :: https://economia.awesomeapi.com.br:443 "GET /last/USD-JOD,USD-KWD,USD-HKD,USD-SAR,USD-INR,USD-KRW,FJD-USD,GHS-USD,KYD-USD,SGD-USD,USD-ALL,USD-AMD,USD-ANG,USD-ARS,USD-AUD,USD-BBD,USD-BDT,USD-BGN,USD-BHD,USD-BIF,USD-BND,USD-BOB,USD-BSD,USD-BWP,USD-BZD,USD-CLP,USD-CNY,USD-COP,USD-CRC,USD-CUP,USD-DJF,USD-DOP,USD-DZD,USD-EGP,USD-ETB,USD-FJD,USD-GBP,USD-GEL,USD-GHS,USD-GMD,USD-GNF,USD-GTQ,USD-HNL,USD-HRK,USD-HTG,USD-IDR,USD-IQD,USD-IRR,USD-ISK,USD-JMD,USD-KES,USD-KHR,USD-KMF,USD-KZT,USD-LAK,USD-LBP,USD-LKR,USD-LSL,USD-LYD,USD-MAD,USD-MDL,USD-MGA,USD-MKD,USD-MMK,USD-MOP,USD-MRO,USD-MUR,USD-MVR,USD-MWK,USD-MYR,USD-NAD,USD-NGN,USD-NIO,USD-NPR,USD-NZD,USD-OMR,USD-PAB,USD-PEN,USD-PGK,USD-PHP,USD-PKR,USD-PYG,USD-QAR,USD-RON,USD-RSD,USD-RWF,USD-SCR,USD-SDG,USD-SOS,USD-STD,USD-SVC,USD-SYP,USD-SZL,USD-TND,USD-TTD,USD-TWD,USD-TZS,USD-UAH,USD-UGX,USD-UYU,USD-UZS,USD-VEF,USD-VND,USD-VUV,USD-XAF,USD-XCD,USD-XOF,USD-XPF,USD-YER,USD-ZMK,AED-USD,DKK-USD,HKD-USD,MXN-USD,NOK-USD,PLN-USD,RUB-USD,SAR-USD,SEK-USD,TRY-USD,TWD-USD,VEF-USD,ZAR-USD,UYU-USD,PYG-USD,CLP-USD,COP-USD,PEN-USD,NIO-USD,BOB-USD,KRW-USD,EGP-USD,USD-BYN,USD-MZN,INR-USD,JOD-USD,KWD-USD,USD-AZN,USD-CNH,USD-KGS,USD-TJS,USD-RUB,MYR-USD,UAH-USD,HUF-USD,IDR-USD,USD-AOA,VND-USD,BYN-USD,XBR-USD,THB-USD,PHP-USD,USD-TMT,XAGG-USD,USD-MNT,USD-AFN,AFN-USD,SYP-USD,IRR-USD,IQD-USD,USD-NGNI,USD-ZWL,BRL-ARS,BRL-AUD,BRL-CAD,BRL-CHF,BRL-CLP,BRL-DKK,BRL-HKD,BRL-JPY,BRL-MXN,BRL-SGD,SGD-BRL,AED-BRL,BRL-AED,BRL-BBD,BRL-BHD,BRL-CNY,BRL-CZK,BRL-EGP,BRL-GBP,BRL-HUF,BRL-IDR,BRL-ILS,BRL-INR,BRL-ISK,BRL-JMD,BRL-JOD,BRL-KES,BRL-KRW,BRL-LBP,BRL-LKR,BRL-MAD,BRL-MYR,BRL-NAD,BRL-NOK,BRL-NPR,BRL-NZD,BRL-OMR,BRL-PAB,BRL-PHP,BRL-PKR,BRL-PLN,BRL-QAR,BRL-RON,BRL-RUB,BRL-SAR,BRL-SEK,BRL-THB,BRL-TRY,BRL-VEF,BRL-XAF,BRL-XCD,BRL-XOF,BRL-ZAR,BRL-TWD,DKK-BRL,HKD-BRL,MXN-BRL,NOK-BRL,NZD-BRL,PLN-BRL,SAR-BRL,SEK-BRL,THB-BRL,TRY-BRL,TWD-BRL,VEF-BRL,ZAR-BRL,BRL-PYG,BRL-UYU,BRL-COP,BRL-PEN,BRL-BOB,CLP-BRL,PYG-BRL,UYU-BRL,COP-BRL,PEN-BRL,BOB-
BRL,RUB-BRL,INR-BRL,EUR-GBP,EUR-JPY,EUR-CHF,EUR-AUD,EUR-CAD,EUR-NOK,EUR-DKK,EUR-PLN,EUR-NZD,EUR-SEK,EUR-ILS,EUR-TRY,EUR-THB,EUR-ZAR,EUR-MXN,EUR-SGD,EUR-HUF,EUR-HKD,EUR-CZK,EUR-KRW,BHD-EUR,EUR-AED,EUR-AFN,EUR-ALL,EUR-ANG,EUR-ARS,EUR-BAM,EUR-BBD,EUR-BDT,EUR-BGN,EUR-BHD,EUR-BIF,EUR-BND,EUR-BOB,EUR-BSD,EUR-BWP,EUR-BYN,EUR-BZD,EUR-CLP,EUR-CNY,EUR-COP,EUR-CRC,EUR-CUP,EUR-CVE,EUR-DJF,EUR-DOP,EUR-DZD,EUR-EGP,EUR-ETB,EUR-FJD,EUR-GHS,EUR-GMD,EUR-GNF,EUR-GTQ,EUR-HNL,EUR-HRK,EUR-HTG,EUR-IDR HTTP/1.1" 200 None
2024-05-09 12:37:37 :: INFO :: Request finished with status 200
2024-05-09 12:37:37 :: INFO :: 300 files extracted in: /Users/ivsouza/repos/personal_repos/ETL-awesome-api/etl/utils/../../data/
15 changes: 7 additions & 8 deletions etl/main.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import time
import sys
import os
import requests
import random
from dotenv import load_dotenv

load_dotenv()

SRV_URL = str(os.getenv("SERVER_URL"))

WORK_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.dirname(WORK_DIR))

start_time = time.time()

from etl import ExecutePipeline
from etl.utils.constants import ENDPOINT_LIST_AVALIABLE_PARITYS

def GenerateRandomParams(ParamsQty: int) -> list:
"""
Expand All @@ -22,13 +23,11 @@ def GenerateRandomParams(ParamsQty: int) -> list:
Returns:
list: A list of randomly generated parameters.
"""
AvaliableList = list(requests.get(ENDPOINT_LIST_AVALIABLE_PARITYS).json())
AvaliableList = list(requests.get(SRV_URL + '/json/available').json())
min = random.randint(0, len(AvaliableList) - ParamsQty)
max = min + ParamsQty
return AvaliableList[min:max]

if __name__ == "__main__":
NewExec = ExecutePipeline(*GenerateRandomParams(2))
NewExec = ExecutePipeline(*GenerateRandomParams(300))

end_time = time.time()
print(f"Execution time: {end_time - start_time} seconds")
Loading