-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathparquet_loader.py
43 lines (32 loc) · 1.17 KB
/
parquet_loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import pandas as pd
from etl.config.log_file import log_file_name
from etl.common.utils.logs import CustomLogger
from etl.common.utils.common import (
default_timestamp_str,
default_output_folder,
default_utc_datetime,
)
# Module-level logger used by ParquetLoader. log_file_name(file=__file__)
# presumably derives a per-module log file name from this module's path —
# TODO confirm against etl.config.log_file.
logger = CustomLogger(log_file_name(file=__file__))
class ParquetLoader:
    """Persist a single extracted record as a one-row Parquet file."""

    @staticmethod
    def run(dic: dict) -> list:
        """Write ``dic`` as a one-row Parquet file in the default output folder.

        Before writing, the record is enriched with three extra columns:
        ``symbol`` (``dic["code"] + "-" + dic["codein"]``), ``extracted_at``
        (the value of ``default_utc_datetime()``) and ``id``
        (``"<symbol>-<timestamp>"``).

        Args:
            dic: Flat record to persist. Must contain the keys ``"code"`` and
                ``"codein"``.

        Returns:
            A list holding the name of the written file
            (``"<symbol>-<timestamp>.parquet"``, relative to
            ``default_output_folder()``), or an empty list if pandas rejected
            the write with a ``ValueError`` (the error is logged).

        Raises:
            ValueError: If ``dic`` is empty.
            KeyError: If ``dic`` lacks ``"code"`` or ``"codein"``.
        """
        extracted_files = []

        # Validate before first use. A DataFrame built from ``[dic]`` always
        # has one row, so checking ``df.empty`` after construction could only
        # fire for an empty dict, and the key lookups below would already have
        # rejected that with a KeyError.
        if not dic:
            logger.error("DataFrame is empty")
            raise ValueError("DataFrame is empty")

        symbol = dic["code"] + "-" + dic["codein"]
        ts = default_timestamp_str()
        file_name = f"{symbol}-{ts}.parquet"

        df = pd.DataFrame([dic])
        df["symbol"] = symbol
        df["extracted_at"] = default_utc_datetime()
        df["id"] = f"{symbol}-{ts}"

        try:
            # The folder is concatenated directly, so default_output_folder()
            # presumably returns a path ending in a separator — TODO confirm.
            df.to_parquet(f"{default_output_folder()}{file_name}")
        except ValueError as e:
            logger.error(f"Error writing parquet file: {e}")
            # Best-effort: do not crash, but do not report a file that was
            # never written either.
            return extracted_files

        extracted_files.append(file_name)
        return extracted_files