-
Notifications
You must be signed in to change notification settings - Fork 1
/
gps.py
executable file
·98 lines (79 loc) · 2.82 KB
/
gps.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import os
import logging
from sanji.core import Sanji
from sanji.core import Route
from sanji.connection.mqtt import Mqtt
from sanji.model_initiator import ModelInitiator
from voluptuous import Schema
from voluptuous import Any, Extra
# TODO: logger should be defined in sanji package?
logger = logging.getLogger()
class Gps(Sanji):
"""
A model to handle GPS configuration.
Attributes:
model: GPS' database with json format.
"""
def init(self, *args, **kwargs):
try: # pragma: no cover
bundle_env = kwargs["bundle_env"]
except KeyError:
bundle_env = os.getenv("BUNDLE_ENV", "debug")
path_root = os.path.abspath(os.path.dirname(__file__))
if bundle_env == "debug": # pragma: no cover
path_root = "%s/tests" % path_root
try:
self.load(path_root)
except:
self.stop()
raise IOError("Cannot load any configuration.")
def load(self, path):
"""
Load the configuration. If configuration is not installed yet,
initialise them with default value.
Args:
path: Path for the bundle, the configuration should be located
under "data" directory.
"""
self.model = ModelInitiator("gps", path, backup_interval=-1)
if not self.model.db:
raise IOError("Cannot load any configuration.")
self.save()
def save(self):
"""
Save and backup the configuration.
"""
self.model.save_db()
self.model.backup_db()
@Route(methods="get", resource="/system/gps")
def get(self, message, response):
return response(data=self.model.db)
put_schema = Schema({
"lat": Any(int, float),
"lon": Any(int, float),
Extra: object
}, required=True)
@Route(methods="put", resource="/system/gps", schema=put_schema)
def put(self, message, response):
# TODO: should be removed when schema worked for unittest
if not hasattr(message, "data"):
return response(code=400,
data={"message": "Invalid input."})
# TODO: should be removed when schema worked for unittest
try:
Gps.put_schema(message.data)
except Exception as e:
return response(code=400,
data={"message": "Invalid input: %s." % e})
self.model.db["lat"] = message.data["lat"]
self.model.db["lon"] = message.data["lon"]
self.save()
return response(data=self.model.db)
if __name__ == "__main__":
FORMAT = "%(asctime)s - %(levelname)s - %(lineno)s - %(message)s"
logging.basicConfig(level=0, format=FORMAT)
logger = logging.getLogger("GPS")
gps = Gps(connection=Mqtt())
gps.start()