Bump v1.0 -> v1.1
- Remove the synchronous method for uploading data to the cloud
- Add threading via `concurrent.futures` for the query and upload
  function calls (pattern sketched below)
- Change `conf.json` to match the new configuration
Shantanoo Desai committed Oct 31, 2019
1 parent 6457f63 commit ce97e56
Showing 3 changed files with 107 additions and 153 deletions.
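
The threading bullet above boils down to the pattern below, shown as a minimal, self-contained sketch. The `poll_measurement` worker and the two stub configs are illustrative stand-ins for `query_and_upload` and the measurement blocks in `conf.json`; they are not part of the commit.

import concurrent.futures
import time

STOP = False  # module-level flag, same role as in uploader.py

def poll_measurement(conf):
    # Illustrative stand-in for query_and_upload: poll until STOP flips.
    while not STOP:
        print('polling {}'.format(conf['measurement']))
        time.sleep(1.0)
    return 'stopped: {}'.format(conf['measurement'])

if __name__ == '__main__':
    confs = [{'measurement': 'gps'}, {'measurement': 'imu'}]
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(poll_measurement, c) for c in confs]
        time.sleep(3.0)
        STOP = True  # workers see the flag on their next loop and return
        for future in concurrent.futures.as_completed(futures):
            print(future.result())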
34 changes: 17 additions & 17 deletions conf.json
@@ -1,55 +1,55 @@
 {
-    "batch_size": 1000,
+    "batch_size": 165,
     "local": {
         "host": "localhost",
         "port": 8086,
-        "database": "RIT"
+        "database": "edgeDB"
     },
     "cloud": {
-        "host": "db5.ips.biba.uni-bremen.de",
+        "host": "test.server.influxdb.com",
         "port": 8086,
         "secure": true,
-        "username": "des",
-        "password": "lincoln",
-        "database": "ritTest"
+        "username": "username",
+        "password": "password",
+        "database": "finalDB"
     },
     "gps": {
         "fields": ["lat", "lon", "sog", "cog"],
         "measurement": "gps",
         "tags": {
             "type": "RMC"
         },
-        "limit": 100
+        "limit": 5
     },
     "ciss": {
         "fields": ["accX", "accY", "accZ", "gyroX", "gyroY", "gyroZ", "magX", "magY", "magZ"],
         "measurement": "ciss",
         "tags": {
             "node": "CISS1"
         },
-        "limit": 300
+        "limit": 50
     },
     "imu": {
         "fields": ["yaw", "pitch", "roll", "laX", "laY", "laZ", "gX", "gY", "gZ"],
         "measurement": "imu",
         "tags": {
             "node":"BNO055"
         },
-        "limit": 300
+        "limit": 50
     },
     "button": {
         "fields": ["pressed"],
         "measurement": "button",
-        "limit": 10
+        "limit": 5
     },
-    "n2k_1": {
-        "fields": ["v"],
-        "measurement": "\"env-temp\"",
-        "limit": 50
+    "EnvironmentalParameters": {
+        "fields": ["Atmospheric Pressure", "Temperature"],
+        "measurement": "EnvironmentalParameters",
+        "limit": 5
     },
-    "n2k_2": {
-        "fields": ["v"],
-        "measurement": "\"env-humid\"",
+    "Rudder": {
+        "fields": ["Position"],
+        "measurement": "Rudder",
         "limit": 50
     }
 }
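
For reference, this is how uploader.py consumes the reworked file after this commit: every top-level key other than local, cloud and batch_size is treated as one measurement block to poll. A small sketch (reading the same CONF_PATH the uploader uses; the list comprehension is equivalent to the conf_keys.remove() calls in the diff below):

import json

# Same path the uploader reads (CONF_PATH in uploader/uploader.py).
with open('/etc/umg/upload/conf.json') as cFile:
    conf = json.load(cFile)

# Everything except the two DB blocks and batch_size names a measurement.
measurement_keys = [k for k in conf if k not in ('local', 'cloud', 'batch_size')]
print(measurement_keys)
# -> ['gps', 'ciss', 'imu', 'button', 'EnvironmentalParameters', 'Rudder']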
2 changes: 1 addition & 1 deletion setup.py
@@ -1,7 +1,7 @@
 from setuptools import setup
 
 setup(name='uploader',
-      version=1.0,
+      version=1.1,
       description='Migrate values from local InfluxDB instance to InfluxDB Server/Cloud',
       url='https://gitlab.ips.biba.uni-bremen.de/usg/umg-uploader',
       author='Shantanoo Desai',
224 changes: 89 additions & 135 deletions uploader/uploader.py
@@ -1,166 +1,120 @@
-import sys
+#!/usr/bin/env python3
 import json
 import pprint
 import time
 import logging
 
+import concurrent.futures
 from influxdb import InfluxDBClient
 from influxdb.client import InfluxDBClientError
 from requests import ConnectionError
+# import urllib3
+# urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+import requests.packages.urllib3 as urllib3
 
-logging.basicConfig(stream=sys.stdout, level=logging.INFO)
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 
-handler = logging.FileHandler("/var/log/uploader.log")
-handler.setLevel(logging.ERROR)
+CONF_PATH = '/etc/umg/upload/conf.json'
+CONF = dict()
 
-formatter = logging.Formatter('%(asctime)s-%(name)s-%(message)s')
-handler.setFormatter(formatter)
-logger.addHandler(handler)
+STOP = False
 
 
-CONF_PATH = '/etc/umg/upload/conf.json'
-CONF = dict()
-LOCAL_DB = None
-CLOUD_DB = None
-BATCH = []
-BATCH_SIZE = 0
+with open(CONF_PATH) as cFile:
+    CONF = json.load(cFile)
 
-def read_conf_file(path):
-    with open(path) as cFile:
-        _conf = json.load(cFile)
-    return _conf
+local_db_conf = CONF['local']
+cloud_db_conf = CONF['cloud']
 
+conf_keys = list(CONF.keys())
+conf_keys.remove('local')
+conf_keys.remove('cloud')
+conf_keys.remove('batch_size')
 
-def upload_batch(datapoints):
-    for point in datapoints:
-        for field in point['fields']:
-            if point['measurement'] != 'button':
-                point['fields'][field] = float(point['fields'][field])
-            else:
-                point['fields'][field] = int(point['fields'][field])
-        point['fields']['status'] = int(1)
-    try:
-        logger.info('sending data to cloud')
-        if connected_to_cloud():
-            if CLOUD_DB.write_points(datapoints, time_precision='ms'):
-                logger.info('UPLOAD Completed')
-
-                try:
-                    logger.info('re-writing batch to Local InfluxDB')
-                    if LOCAL_DB.write_points(datapoints, time_precision='ms'):
-                        logger.info('LOCAL DB Updated with status field = 1')
-                        global BATCH
-                        BATCH = []
-                except InfluxDBClientError as e:
-                    logger.exception('Exception during re-writing batch to Local InfluxDB')
-                    logger.exception(e)
-            else:
-                logger.error('UPLOAD Failed')
-    except Exception as e:
-        logger.exception('exception during complete upload_batch function')
-        logger.exception(e)
-        pass
+LOCAL_DB = InfluxDBClient(host=local_db_conf['host'], port=local_db_conf['port'], database=local_db_conf['database'])
+CLOUD_DB = InfluxDBClient(host=cloud_db_conf['host'], port=cloud_db_conf['port'], ssl=True, verify_ssl=False, username=cloud_db_conf['username'], password=cloud_db_conf['password'], database=cloud_db_conf['database'])
 
-def get_points(conf):
-    QUERY = 'SELECT "{}" FROM {} WHERE "status"=0 LIMIT {}'.format(
-        '","'.join(conf['fields']),
-        conf['measurement'],
-        conf['limit'])
-    logger.debug(QUERY)
-    global LOCAL_DB
-    try:
-        query_results = LOCAL_DB.query(QUERY, epoch='ms')
-    except InfluxDBClientError as e:
-        logger.exception('exception during querying of: {}'.format(conf['measurement']))
-        logger.exception(e)
-        pass
-    if len(list(query_results)) == 0:
-        logger.info('No Results for QUERY')
-    else:
-        global BATCH
-        for points in list(query_results)[0]:
-            upload_json_body = {
-                'measurement': conf['measurement'],
-                'fields': {}
-            }
-            if 'tags' in conf:
-                upload_json_body['tags'] = conf['tags']
-            upload_json_body['time'] = points['time']
-            upload_json_body['fields'] = points
-            del points['time']
-            BATCH.append(upload_json_body)
-            logger.debug('current batch length: {}'.format(len(BATCH)))
-        global BATCH_SIZE
-        if len(BATCH) >= BATCH_SIZE:
-            logger.info('Uploading Batch Now since limit reached')
-            upload_batch(BATCH)
 
 def connected_to_cloud():
     cloud_connected = False
     while not cloud_connected:
         try:
             cloud_version = CLOUD_DB.ping()
             logger.debug('connected to InfluxDB v: {}'.format(cloud_version))
             print('Cloud Version %s' %cloud_version)
             cloud_connected = True
             return cloud_connected
         except ConnectionError as e:
             logger.info('cannot connect to Cloud Server')
             logger.exception(e)
             logger.info('Trying again after 1.0 Minute')
         except Exception as e:
             print(e)
         time.sleep(60.0)
 
-def main():
-    logger.info('reading Configuration file')
-    global CONF
-    CONF = read_conf_file(CONF_PATH)
-    logger.debug('Config File: {}'.format(CONF))
-    global BATCH_SIZE
-    BATCH_SIZE = CONF['batch_size']
-    local_hostname = CONF['local']['host']
-    local_port = CONF['local']['port']
-    local_db = CONF['local']['database']
-
-    cloud_hostname = CONF['cloud']['host']
-    cloud_port = CONF['cloud']['port']
-    cloud_db = CONF['cloud']['database']
-    cloud_username = CONF['cloud']['username']
-    cloud_password = CONF['cloud']['password']
+def query_and_upload(conf):
+    while not STOP:
+        print("For conf: {}".format(conf['measurement']))
+        QUERY = 'SELECT "{}" FROM {} WHERE "status"=0 LIMIT {}'.format('","'.join(conf['fields']), conf['measurement'], conf['limit'])
+        print(QUERY)
+        query_results = LOCAL_DB.query(QUERY, epoch='ms')
+        if len(list(query_results)) == 0:
+            print('No Results')
+        else:
+            batch = []
+            for points in list(query_results)[0]:
+
+                upload_json_body = {'measurement': conf['measurement'], 'fields': {'status': 1.0}}
+                if 'tags' in conf:
+                    upload_json_body['tags'] = conf['tags']
+                upload_json_body['time'] = points['time']
+                # del points['time']
+                for field in conf['fields']:
+                    upload_json_body['fields'][field] = points[field]
+                batch.append(upload_json_body)
+            # print(batch)
+            print('Length of Batch: {}'.format(len(batch)))
+            for point in batch:
+                for field in point['fields']:
+                    if point['fields'][field] is not None:
+                        if isinstance(point['fields'][field], int):
+                            point['fields'][field] = float("%.3f" % point['fields'][field])
+                        else:
+                            point['fields'][field] = float(point['fields'][field])
+                    else:
+                        break
+            if connected_to_cloud():
+                print('Cloud Connected')
+                try:
+                    if CLOUD_DB.write_points(batch, time_precision='ms'):
+                        print('UPLOAD DONE')
+                        if LOCAL_DB.write_points(batch, time_precision='ms'):
+                            print('LOCAL UPDATE DONE')
+
+                    else:
+                        print('FAILED')
+                        return 'Fail'
+                except Exception as e:
+                    print(e)
+        time.sleep(1.0)
 
-    global LOCAL_DB
-    LOCAL_DB = InfluxDBClient(host=local_hostname, port=local_port, database=local_db)
-
-    global CLOUD_DB
-    CLOUD_DB = InfluxDBClient(host=cloud_hostname,
-                              port=cloud_port,
-                              ssl=True,
-                              verify_ssl=False,
-                              username=cloud_username,
-                              password=cloud_password,
-                              database=cloud_db)
+def main():
 
     try:
         _ = LOCAL_DB.ping()
 
     except ConnectionError as e:
         logger.exception('Cannot connect to Local Server')
     except Exception as e:
         raise(e)
 
-    if connected_to_cloud():
-        try:
-            while True:
-                for config in CONF.keys():
-                    if (config != 'local') and (config != 'cloud') and (config != 'batch_size'):
-                        print(config)
-                        get_points(CONF[config])
-                time.sleep(5.0)
-        except KeyboardInterrupt:
-            LOCAL_DB.close()
-            CLOUD_DB.close()
-            sys.exit(0)
+    try:
+        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
+            future_to_upload = [executor.submit(query_and_upload, CONF[conf]) for conf in conf_keys]
+            for future in concurrent.futures.as_completed(future_to_upload):
+
+                try:
+                    data = future.result()
+                except Exception as e:
+                    print(e)
+                else:
+                    print(data)
+            while (not future.done() for future in future_to_upload):
+                time.sleep(1.0)
+    except KeyboardInterrupt:
+        global STOP
+        STOP = True
+        LOCAL_DB.close()
+        CLOUD_DB.close()
 
 
 
 if __name__ == "__main__":
-    main()
+    main()
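
As a worked example of what each worker asks the local InfluxDB: with the gps block from conf.json above, the query string built in query_and_upload renders as follows (the snippet reuses the exact format string from the diff; the inline conf dict just mirrors the gps entry):

conf = {'fields': ['lat', 'lon', 'sog', 'cog'], 'measurement': 'gps', 'limit': 5}

QUERY = 'SELECT "{}" FROM {} WHERE "status"=0 LIMIT {}'.format(
    '","'.join(conf['fields']), conf['measurement'], conf['limit'])
print(QUERY)
# -> SELECT "lat","lon","sog","cog" FROM gps WHERE "status"=0 LIMIT 5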
