-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathInfluxDB.py
71 lines (56 loc) · 2.14 KB
/
InfluxDB.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from influxdb import InfluxDBClient, DataFrameClient, exceptions
class InfluxDB:
    """Thin wrapper around the ``influxdb`` clients for one fixed database.

    Two clients are created against the same server and database:
    ``clientNormal`` (an ``InfluxDBClient``) is used for writes and plain
    queries, ``clientDF`` (a ``DataFrameClient``) is used for queries issued
    through the DataFrame client.
    """

    def __init__(self, host='localhost', port=8086) -> None:
        """Validate the endpoint, build both clients and prepare the database.

        Args:
            host (str, optional): Server host name. Defaults to 'localhost'.
            port (int | str, optional): Server port, given either as an int
                or as a digit-only string. Defaults to 8086.

        Raises:
            TypeError: If ``host`` is not a string, or ``port`` is neither an
                int nor a digit-only string.
            Exception: If the library raises ``InfluxDBClientError`` while
                the clients, database or retention policy are set up.
            Exception: If the library raises ``InfluxDBServerError`` during
                the same setup.
        """
        # Credentials and database name are fixed for this wrapper.
        self.user = 'root'
        self.password = 'root'
        self.dbname = 'datadb'

        # Every argument has to be valid (not just any one of them), and the
        # digit test is only meaningful for strings: calling ``isdigit`` on
        # an int would raise AttributeError instead of the TypeError below.
        port_ok = isinstance(port, int) or (
            isinstance(port, str) and port.isdigit()
        )
        if not (isinstance(host, str) and port_ok):
            raise TypeError("Invalid Argument.")

        try:
            self.clientNormal = InfluxDBClient(
                host, port, self.user, self.password, self.dbname
            )
            self.clientDF = DataFrameClient(
                host, port, self.user, self.password, self.dbname
            )
            self.clientNormal.create_database(self.dbname)
            # Policy 'awesome_policy': duration '1d', replication 3, and made
            # the default policy of the client's database.
            self.clientNormal.create_retention_policy(
                'awesome_policy', '1d', 3, default=True
            )
        except exceptions.InfluxDBClientError as err:
            # Chain the library error so the underlying cause stays visible.
            raise Exception("Can't connect to InluxDB. Request Error") from err
        except exceptions.InfluxDBServerError as err:
            raise Exception("Server Error") from err

    def write_db(self, data):
        """Write points through ``clientNormal``.

        The points are sent with ``time_precision='ms'`` and the JSON
        protocol.

        Args:
            data: Points to write, passed unchanged to ``write_points``.
                Presumably a list of point dicts, as the JSON protocol
                expects; confirm against callers.
        """
        self.clientNormal.write_points(data, time_precision='ms', protocol='json')

    def readDF_db(self, querry):
        """Run a query through the DataFrame client.

        Args:
            querry: Query passed unchanged to ``clientDF.query``.

        Returns:
            Whatever ``clientDF.query`` returns (a DataFrame-based result).
        """
        return self.clientDF.query(querry)

    def read_db(self, querry):
        """Run a query through the plain client.

        Args:
            querry: Query passed unchanged to ``clientNormal.query``.

        Returns:
            Whatever ``clientNormal.query`` returns.
        """
        return self.clientNormal.query(querry)