-
Notifications
You must be signed in to change notification settings - Fork 0
/
to_sql.py
89 lines (71 loc) · 2.72 KB
/
to_sql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import argparse
import pandas as pd
import sqlite3
import yaml
from logging import getLogger,config
from pathlib import Path
def main(args):
    """Convert the parquet files of a directory into one SQLite file each.

    Logging is configured from ``./logging_config.yaml``. The ``*.parquet``
    files found directly in the input directory are sorted, optionally
    narrowed to the slice ``[start_index:end_index]``, and each remaining
    file is written to ``<output_dirname>/<parquet file stem>.db``.

    The DB-type flags are checked in the order email, poh, personae; the
    first one that is set decides the table layout:

    * ``email_freqs_db``: table ``freqs``, column ``email`` renamed to ``word``
    * ``poh_freqs_db``: table ``freqs``, column ``poh`` renamed to ``word``
    * ``personae_db``: table ``personae``, no column renamed

    Args:
        args: Parsed command-line namespace providing ``input_dirname``,
            ``output_dirname``, ``email_freqs_db``, ``poh_freqs_db``,
            ``personae_db``, ``start_index`` and ``end_index``. Either
            index may be ``None``, meaning "no bound on that side".

    Returns:
        None. When none of the DB-type flags is set, an error is logged and
        no database file is written.
    """
    # Function-scope import: keeps this block self-contained without
    # touching the module-level import list.
    from contextlib import closing

    input_dirname = args.input_dirname
    output_dirname = args.output_dirname
    email_freqs_db = args.email_freqs_db
    poh_freqs_db = args.poh_freqs_db
    personae_db = args.personae_db
    start_index = args.start_index  # int or None
    end_index = args.end_index  # int or None

    # Set up logger
    with open("./logging_config.yaml", "r", encoding="utf-8") as r:
        logging_config = yaml.safe_load(r)

    config.dictConfig(logging_config)
    logger = getLogger(__name__)
    logger.debug(args)

    # Get all parquet files in the input directory, in a stable (sorted)
    # order so that start/end indices always select the same files.
    input_files = sorted(Path(input_dirname).glob("*.parquet"))
    logger.info(f"{len(input_files)} files exist in the input directory")

    # Create output directory
    output_dir = Path(output_dirname)
    output_dir.mkdir(exist_ok=True, parents=True)

    # Create a subset of the list if either the start or the end index is
    # specified. A None bound already means "from the first" / "to the
    # last" element in a slice, so no explicit defaulting is needed.
    input_files = input_files[start_index:end_index]

    # Determine value column name and table name.
    # val_column_name stays None when no column has to be renamed.
    val_column_name = None
    if email_freqs_db:
        val_column_name = "email"
        table_name = "freqs"
    elif poh_freqs_db:
        val_column_name = "poh"
        table_name = "freqs"
    elif personae_db:
        table_name = "personae"
    else:
        logger.error("Must specify the type of DB to be created")
        return

    # Convert to sql
    logger.info("Start converting to sql...")

    for input_file in input_files:
        logger.info(f"Processing '{input_file.name}'")

        # Load input file
        df = pd.read_parquet(input_file)

        # Rename value column to "word"
        if val_column_name is not None:
            df = df.rename(columns={val_column_name: "word"})

        # Create connection and output to sql.
        # A sqlite3 connection used as a context manager only commits or
        # rolls back the transaction; it does not close the connection.
        # closing() releases the connection after every file, while the
        # inner "with conn" keeps the commit-on-success behaviour.
        db_file = output_dir / f"{input_file.stem}.db"
        with closing(sqlite3.connect(db_file)) as conn:
            with conn:
                df.to_sql(table_name, conn, if_exists="replace")

    logger.info("Finished converting to sql")
if __name__ == "__main__":
    # Command-line entry point: parse the options and hand them to main().
    parser = argparse.ArgumentParser(
        description=(
            "Convert every parquet file in a directory into its own "
            "SQLite database file."
        )
    )

    # Both directories are mandatory: main() builds Path objects from them,
    # which fails with a TypeError when the value is None.
    parser.add_argument(
        "-i", "--input-dirname", type=str, required=True,
        help="directory containing the *.parquet files to convert",
    )
    parser.add_argument(
        "-o", "--output-dirname", type=str, required=True,
        help="directory the .db files are written to (created if missing)",
    )

    # DB type flags. main() checks them in this order and uses the first
    # one that is set.
    parser.add_argument(
        "--email-freqs-db", action="store_true",
        help="write a 'freqs' table, renaming the 'email' column to 'word'",
    )
    parser.add_argument(
        "--poh-freqs-db", action="store_true",
        help="write a 'freqs' table, renaming the 'poh' column to 'word'",
    )
    parser.add_argument(
        "--personae-db", action="store_true",
        help="write a 'personae' table without renaming any column",
    )

    # Optional slice over the sorted list of input files.
    parser.add_argument(
        "--start-index", type=int,
        help="index of the first input file to process (default: first file)",
    )
    parser.add_argument(
        "--end-index", type=int,
        help="index one past the last input file to process (default: all)",
    )

    args = parser.parse_args()
    main(args)