-
Notifications
You must be signed in to change notification settings - Fork 0
/
db_utils.py
118 lines (94 loc) · 3.73 KB
/
db_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import logging
import sys
import traceback
from mysql.connector import errorcode
import mysql.connector
class DatabaseInconsistentError(Exception):
pass
class InvalidFilenameError(Exception):
pass
class DataInvariantException(Exception):
pass
class DbUtils:
def __init__(self, database_user, database_password, database_port, database_host, database_name):
self.logger = logging.getLogger(f'Client.DbUtils') # must hardcode to see base class name
self.database_user = database_user
self.database_password = database_password
self.database_port = database_port
self.database_host = database_host
self.database_name = database_name
self.cnx = None
def reset_connection(self):
self.logger.info(f"Resetting connection to {self.database_host}")
if self.cnx:
try:
self.cnx.close()
except Exception:
pass
self.cnx = None
self.connect()
def connect(self):
if self.cnx is None:
self.logger.debug(f"Connecting to db {self.database_host}...")
try:
self.cnx = mysql.connector.connect(user=self.database_user,
password=self.database_password,
host=self.database_host,
port=self.database_port,
database=self.database_name)
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
self.logger.error(f"Starting client...")
self.logger.error("SQL: Access denied")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
self.logger.error("Database does not exist")
else:
self.logger.error(err)
sys.exit(1)
except Exception as err:
self.logger.error(f"Unknown exception: {err}")
sys.exit(1)
self.logger.info("Db connected")
else:
try:
self.cnx.ping(reconnect=True)
except Exception as e:
self.logger.warning(f"connection not responsive with Exception as {e}, "
f"attempting to reset connection")
self.reset_connection()
# self.logger.debug(f"Already connected db {self.database_host}...")
# added buffered = true so will work properly with forloops
def get_one_record(self, sql):
cursor = self.get_cursor(buffered=True)
try:
cursor.execute(sql)
retval = cursor.fetchone()
except Exception as e:
print(f"Exception thrown while processing sql: {sql}\n{e}\n", file=sys.stderr, flush=True)
self.logger.error(traceback.format_exc())
retval = None
if retval is None:
self.logger.info(f"Info: No results from: \n\n{sql}\n")
else:
retval = retval[0]
cursor.close()
return retval
def get_records(self, query):
cursor = self.get_cursor()
cursor.execute(query)
record_list = list(cursor.fetchall())
self.logger.debug(f"get records SQL: {query}")
cursor.close()
return record_list
def get_cursor(self, buffered=False):
self.connect()
cursor = self.cnx.cursor(buffered=buffered)
return cursor
def execute(self, sql):
cursor = self.get_cursor()
self.logger.debug(f"SQL: {sql}")
cursor.execute(sql)
self.cnx.commit()
cursor.close()
def commit(self):
self.cnx.commit()