This repository has been archived by the owner on Dec 6, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommands.py
206 lines (174 loc) · 6.46 KB
/
commands.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
"""This module defines rwrtrack.py commands."""
import code
import logging
import sys
from datetime import datetime, timedelta
from sqlalchemy.util import KeyedTuple
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy import func, distinct, text
from rwrtrack.get import get_stats
from rwrtrack.csv import load_stats_from_csv, write_stats_to_csv
from rwrtrack.db import sesh, _set_db_readonly, _set_db_writable
from rwrtrack.dbinfo import DbInfo, get_dbinfo
from rwrtrack.account import Account, get_account_by_name
from rwrtrack.record import Record, get_records_on_date
from rwrtrack.difference import Diff, difference
from rwrtrack.sum import sum_, diffsum
from rwrtrack.average import avg, diffavg
from rwrtrack.rank import rank, diffrank
from rwrtrack.filter import filter_
from rwrtrack.exceptions import NoAccountError, NoRecordError, NoCsvError
from rwrtrack.tablify import render_analysis_table
from rwrtrack.migrate import migrate
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def _prettify_int_or_float(x):
if isinstance(x, int):
return f"{x:,d}"
elif isinstance(x, float):
return f"{x:,.2f}"
else:
raise Exception("Can't pretty '{x}' as it is not an int or float!")
def process_numeric_dates(date_string):
    """Identify and convert a date or date range to int(s).

    Args:
        date_string: either a single all-digit date, or a range written as
            ``"<older>-<newer>"``.

    Returns:
        ``("single", date)`` for a single date, or
        ``("range", (newer, older))`` for a range. Note that the range tuple
        is ordered newer first, the reverse of the input order.

    Exits:
        Logs an error and calls ``sys.exit(1)`` when the string cannot be
        parsed as a date or range, or when the first date of a range is
        greater than the second.
    """
    # isdecimal() matches what int() can actually parse; isnumeric() also
    # accepts characters such as "²" or "½" that make int() raise.
    if date_string.isdecimal():
        return "single", int(date_string)

    # Handle date ranges
    parts = date_string.split("-")
    try:
        d_older, d_newer = int(parts[0]), int(parts[1])
    except (IndexError, ValueError):
        # Covers input without a "-" separator, empty halves ("2019-") and
        # non-numeric halves ("a-b"), which previously raised a traceback.
        logger.error(
            "Invalid date(s) '%s'! Use a single date or an older-newer range.",
            date_string,
        )
        sys.exit(1)

    if d_older > d_newer:
        logger.error("Dates must be older-newer!")
        sys.exit(1)
    return "range", (d_newer, d_older)
def _get(csv_hist_dir, args):
    """Call get_stats and hand the result to write_stats_to_csv.

    The page count passed to ``get_stats`` is ``int(args["<pages>"])`` when
    that argument is set, and 10 otherwise. The stats are written using
    ``csv_hist_dir`` as the destination argument.
    """
    # TODO: Rewrite to use write to db as well
    pages_arg = args["<pages>"]
    if pages_arg:
        page_count = int(pages_arg)
    else:
        page_count = 10
    write_stats_to_csv(csv_hist_dir, get_stats(page_count))
def _analyse(args):
    """Print an analysis table for a single account.

    Reads the account name from ``args["<name>"]`` and an optional date
    selection from ``args["<dates>"]``:

    * no dates: render the account's latest record;
    * a single date: render the record on that date;
    * a range: render the result of subtracting the older record from the
      newer one.

    Logs the error and exits with status 1 when the account lookup raises
    ``NoAccountError`` or when a ``NoRecordError`` is raised while fetching
    or rendering records.
    """
    name = args["<name>"]
    date_arg = args["<dates>"]

    try:
        account = get_account_by_name(name)
    except NoAccountError as err:
        logger.error(err)
        sys.exit(1)

    logger.info(f"Performing individual analysis for '{name}'...")

    try:
        if not date_arg:
            print(f"'{account.username}' on {account.latest_date}:")
            render_analysis_table(account.latest_record)
            return

        kind, value = process_numeric_dates(date_arg)
        if kind == "single":
            rec = account.on_date(value)
            print(f"'{account.username}' on {rec.date}:")
            render_analysis_table(rec)
        elif kind == "range":
            # The range tuple is ordered (newer, older).
            newer_date, older_date = value
            newer = account.on_date(newer_date)
            older = account.on_date(older_date)
            delta = newer - older
            print(f"'{account.username}' from {older.date} to {newer.date}:")
            render_analysis_table(delta)
    except NoRecordError as err:
        logger.error(err)
        sys.exit(1)
def _average(args):
    """Print the averages for a date, a date range, or the latest date.

    ``args["<dates>"]`` selects the mode: when empty, the latest date from
    ``get_dbinfo()`` is used; a single date calls ``avg``; a range calls
    ``diffavg``. ``args["--record-filters"]`` is forwarded in every mode and
    ``args["--diff-filters"]`` only for ranges.

    Logs a message and exits with status 1 when ``NoResultFound`` is raised
    in the no-dates mode.
    """
    date_arg = args["<dates>"]
    record_filters = args["--record-filters"]
    diff_filters = args["--diff-filters"]

    if date_arg:
        kind, value = process_numeric_dates(date_arg)
        if kind == "single":
            result = avg(value, record_filters=record_filters)
        elif kind == "range":
            # The range tuple is ordered (newer, older).
            newer, older = value
            result = diffavg(
                newer,
                older,
                record_filters=record_filters,
                diff_filters=diff_filters,
            )
    else:
        try:
            latest = get_dbinfo().latest_date
            result = avg(latest, record_filters=record_filters)
        except NoResultFound:
            logger.info("Empty database! Exit.")
            sys.exit(1)

    # TODO: add nice table for averages
    print(result)
def _rank(args):
    """Print a ranking of accounts by a metric.

    Reads from ``args``:

    * ``"<metric>"``: attribute name the rows are ranked by and displayed;
    * ``"--limit"``: maximum number of rows, defaulting to 5 when unset;
    * ``"<dates>"``: empty for the latest date in the database, a single
      date for ``rank``, or a range for ``diffrank``;
    * ``"--record-filters"`` / ``"--diff-filters"``: forwarded to the
      ranking functions (diff filters only for ranges).

    Each row is printed as position, username and the prettified metric
    value. Logs and exits with status 1 when the limit is not a non-zero
    integer, or when ``NoResultFound`` is raised in the no-dates mode.
    """
    metric = args["<metric>"]

    try:
        if args["--limit"]:
            # abs() is retained from the existing behaviour: a negative limit
            # is accepted and treated as its magnitude.
            limit = abs(int(args["--limit"]))
            if limit < 1:
                raise ValueError
        else:
            limit = 5
    except ValueError:
        logger.error("Limit must be an integer greater than or equal to 1")
        sys.exit(1)

    dates = args["<dates>"]
    rf, df = args["--record-filters"], args["--diff-filters"]

    if not dates:
        try:
            db_info = get_dbinfo()
            ranking = rank(db_info.latest_date, metric, record_filters=rf)
        except NoResultFound:
            logger.info("Empty database! Exit.")
            sys.exit(1)
    else:
        dt, d = process_numeric_dates(dates)
        if dt == "single":
            print(f"Ranking by '{metric}' on {d}:")
            ranking = rank(d, metric, record_filters=rf)
        elif dt == "range":
            # d is ordered (newer, older).
            print(f"Ranking by '{metric}' between {d[1]} and {d[0]}:")
            ranking = diffrank(d[0], d[1], metric, record_filters=rf, diff_filters=df)

    ranking = ranking.limit(limit)
    # Every row is rendered the same way, so no per-type branching is needed:
    # only the ``username`` and ``metric`` attributes of each row are read.
    for position, row in enumerate(ranking.all(), start=1):
        # render in the form "#12...MR. BANG...8,752,631"
        value = _prettify_int_or_float(getattr(row, metric))
        print(f"#{position:<8}{row.username:<24}{value}")
def _sum(args):
    """Print the sums for a date, a date range, or the latest date.

    With no ``args["<dates>"]`` the latest date reported by ``get_dbinfo()``
    is summed with ``sum_``; a single date is summed with ``sum_``; a range
    is summed with ``diffsum``. Record filters are always forwarded, diff
    filters only for ranges.

    Logs a message and exits with status 1 when ``NoResultFound`` is raised
    in the no-dates mode.
    """
    date_arg = args["<dates>"]
    record_filters, diff_filters = args["--record-filters"], args["--diff-filters"]

    if date_arg:
        mode, parsed = process_numeric_dates(date_arg)
        if mode == "single":
            total = sum_(parsed, record_filters=record_filters)
        elif mode == "range":
            # parsed is ordered (newer, older).
            newer, older = parsed
            total = diffsum(
                newer,
                older,
                record_filters=record_filters,
                diff_filters=diff_filters,
            )
    else:
        try:
            info = get_dbinfo()
            total = sum_(info.latest_date, record_filters=record_filters)
        except NoResultFound:
            logger.info("Empty database! Exit.")
            sys.exit(1)

    # TODO: add nice table for sums
    print(total)
def _dbinfo():
    """Print summary information about the database.

    Prints the first and latest dates from ``get_dbinfo()``, followed by the
    count of ``Account._id`` values, the count of distinct ``Record.date``
    values, and the count of ``Record.date`` values.

    Logs a message and exits with status 1 when ``get_dbinfo()`` raises
    ``NoResultFound``.
    """
    try:
        info = get_dbinfo()
    except NoResultFound:
        logger.info("Empty database! Exit.")
        sys.exit(1)

    print(f"First date: {info.first_date} Latest date: {info.latest_date}")

    account_count = sesh.query(func.count(Account._id))
    print(f"Accounts recorded: {account_count.scalar()}")

    day_count = sesh.query(func.count(distinct(Record.date)))
    print(f"Days recorded: {day_count.scalar()}")

    record_count = sesh.query(func.count(Record.date))
    print(f"Number of records: {record_count.scalar()}")
def _db_migrate_csv(csv_hist_dir):
    """Run ``migrate`` on the CSV history directory.

    Logs the error and exits with status 1 when ``migrate`` raises
    ``NoCsvError``.
    """
    try:
        migrate(csv_hist_dir)
    except NoCsvError as err:
        logger.error(err)
        sys.exit(1)
    else:
        return
def _interact():
    """Start an interactive console with this module's globals in scope."""
    print("Entering interactive mode...")
    # Passing globals() makes every name defined or imported in this module
    # available inside the console session.
    namespace = globals()
    code.interact(banner="", local=namespace, exitmsg="")