Fixes the massive data usage problem
Fixes #22
Showing 3 changed files with 150 additions and 11 deletions.
@@ -221,7 +221,7 @@ def upload_data_to_endpoint(local_stop_signal):
         with urllib.request.urlopen(req, timeout=30) as response:
             if response.status == 204:
                 for p in payload:
-                    tc.execute('UPDATE measurements SET uploaded = ?, data = NULL WHERE id = ?;', (int(time.time()), p['row_id']))
+                    tc.execute('DELETE FROM measurements WHERE id = ?;', (p['row_id'],))
         thread_conn.commit()
         upload_delta = time.time() - start_time
         logging.debug(f"Uploaded. Took {upload_delta:.2f} seconds")
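A side note on the new line, since it is easy to misread: sqlite3 binds parameters from a sequence, so a single value still needs a one-element tuple, hence (p['row_id'],). A standalone sketch, using a trimmed-down measurements table (the real schema has more columns):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    tc = conn.cursor()
    tc.execute("CREATE TABLE measurements (id INTEGER PRIMARY KEY, data BLOB)")
    tc.execute("INSERT INTO measurements (id, data) VALUES (1, x'00')")

    row_id = 1
    # The trailing comma makes this a one-element tuple; passing the bare value would fail.
    tc.execute('DELETE FROM measurements WHERE id = ?;', (row_id,))
    conn.commit()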
@@ -407,6 +407,123 @@ def check_DB(local_stop_signal, stime: SharedTime):
     thread_conn.close()


+def optimize_DB(local_stop_signal):
+    while not local_stop_signal.is_set():
+
+        sleeper(local_stop_signal, 3600) # We only need to optimize every hour
+
+        logging.debug("Starting DB optimization for power_measurements")
+
+        thread_conn = sqlite3.connect(DATABASE_FILE)
+        tc = thread_conn.cursor()
+
+        # This is for legacy systems. We just make sure that there are no values left
+        tc.execute('DELETE FROM measurements WHERE data IS NULL;')
+
+        one_week_ago = int(time.time() * 1000) - 7 * 24 * 60 * 60 * 1000 # Adjusted for milliseconds
+
+        aggregate_query = """
+            SELECT
+                strftime('%s', date(time / 1000, 'unixepoch')) * 1000 AS day_epoch,
+                SUM(combined_energy),
+                SUM(cpu_energy),
+                SUM(gpu_energy),
+                SUM(ane_energy),
+                SUM(energy_impact)
+            FROM
+                power_measurements
+            WHERE
+                time < ?
+            GROUP BY
+                day_epoch;
+        """
+        tc.execute(aggregate_query, (one_week_ago,))
+        aggregated_data = tc.fetchall()
+
+        tc.execute("""
+            CREATE TEMPORARY TABLE temp_power_measurements (
+                time INT,
+                combined_energy INT,
+                cpu_energy INT,
+                gpu_energy INT,
+                ane_energy INT,
+                energy_impact INT
+            );
+        """)
+
+        insert_temp_query = """
+            INSERT INTO temp_power_measurements (time, combined_energy, cpu_energy, gpu_energy, ane_energy, energy_impact)
+            VALUES (?, ?, ?, ?, ?, ?);
+        """
+        tc.executemany(insert_temp_query, aggregated_data)
+
+        delete_query = """
+            DELETE FROM power_measurements WHERE time < ?;
+        """
+        tc.execute(delete_query, (one_week_ago,))
+
+        insert_back_query = """
+            INSERT INTO power_measurements (time, combined_energy, cpu_energy, gpu_energy, ane_energy, energy_impact)
+            SELECT * FROM temp_power_measurements;
+        """
+        tc.execute(insert_back_query)
+
+        tc.execute("DROP TABLE temp_power_measurements;")
+
+        logging.debug("Starting DB optimization for top_processes")
+
+        # Do the same with processes
+        aggregate_query = """
+            SELECT
+                name,
+                SUM(energy_impact) AS total_energy_impact,
+                AVG(cputime_per) AS average_cputime_per
+            FROM
+                top_processes
+            WHERE
+                time < ?
+            GROUP BY
+                name;
+        """
+        tc.execute(aggregate_query, (one_week_ago,))
+        aggregated_data = tc.fetchall()
+
+        tc.execute("""
+            CREATE TEMPORARY TABLE temp_top_processes (
+                name STRING,
+                total_energy_impact INT,
+                average_cputime_per INT
+            );
+        """)
+
+        insert_temp_query = """
+            INSERT INTO temp_top_processes (name, total_energy_impact, average_cputime_per)
+            VALUES (?, ?, ?);
+        """
+        tc.executemany(insert_temp_query, aggregated_data)
+
+        tc.execute("DELETE FROM top_processes WHERE time < ?;", (one_week_ago,))
+
+        insert_back_query = """
+            INSERT INTO top_processes (time, name, energy_impact, cputime_per)
+            SELECT ?, name, total_energy_impact, average_cputime_per FROM temp_top_processes;
+        """
+        tc.execute(insert_back_query, (one_week_ago,))
+
+        # Drop the temporary table
+        tc.execute("DROP TABLE temp_top_processes;")
+
+        thread_conn.commit()
+
+        # We vacuum to actually reduce the file size. We probably don't need to vacuum this often but I would
+        # rather do it here than have another thread.
+        tc.execute("VACUUM;")
+
+        logging.debug("Ending DB optimization")
+
+        thread_conn.close()
+
+
 def is_power_logger_running():
     try:
         subprocess.check_output(['pgrep', '-f', sys.argv[0]])
@@ -550,6 +667,11 @@ def get_settings(debug = False):
     ticker_thread.start()
     logging.debug('Ticker thread started')

+    db_checker_thread = threading.Thread(target=optimize_DB, args=(stop_signal,), daemon=True)
+    db_checker_thread.start()
+    logging.debug('DB optimizer thread started')
+
     run_powermetrics(stop_signal, args.file)

     c.close()
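For readers skimming the diff, optimize_DB boils down to: aggregate rows older than one week into per-day buckets (or per-process-name buckets for top_processes), delete the raw rows, re-insert the aggregates, and VACUUM. A minimal sketch of that compaction pattern for power_measurements, using the same day-bucketing expression as the diff; the in-memory database and sample data are illustrative only:

    import sqlite3
    import time

    conn = sqlite3.connect(":memory:")
    tc = conn.cursor()
    tc.execute("CREATE TABLE power_measurements (time INT, combined_energy INT, "
               "cpu_energy INT, gpu_energy INT, ane_energy INT, energy_impact INT)")

    # Two weeks of hourly samples; the older week should collapse into day rows.
    now_ms = int(time.time() * 1000)
    hour_ms = 60 * 60 * 1000
    tc.executemany("INSERT INTO power_measurements VALUES (?, ?, ?, ?, ?, ?)",
                   [(now_ms - i * hour_ms, 10, 4, 3, 1, 2) for i in range(14 * 24)])

    one_week_ago = now_ms - 7 * 24 * hour_ms

    # Aggregate old rows into one row per UTC day: time / 1000 converts ms to
    # seconds, date(...) truncates to the day, strftime('%s', ...) * 1000 maps
    # the day back to a millisecond epoch.
    tc.execute("""
        SELECT strftime('%s', date(time / 1000, 'unixepoch')) * 1000 AS day_epoch,
               SUM(combined_energy), SUM(cpu_energy), SUM(gpu_energy),
               SUM(ane_energy), SUM(energy_impact)
        FROM power_measurements
        WHERE time < ?
        GROUP BY day_epoch
    """, (one_week_ago,))
    aggregated = tc.fetchall()

    # Drop the raw rows and put the day-level rows back.
    tc.execute("DELETE FROM power_measurements WHERE time < ?", (one_week_ago,))
    tc.executemany("INSERT INTO power_measurements VALUES (?, ?, ?, ?, ?, ?)", aggregated)
    conn.commit()

    print(tc.execute("SELECT COUNT(*) FROM power_measurements").fetchone()[0])
    # ~ 7*24 recent rows plus a handful of day rows

Note that the sketch skips the temporary tables: once fetchall() has pulled the aggregates into Python memory, they can be re-inserted directly after the DELETE, so the temp tables in the diff are arguably optional.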
ArneTR (Member) commented via email:

I think you need to do a VACUUM FULL: https://postgrespro.com/docs/postgresql/9.4/sql-vacuum.html

"Plain VACUUM (without FULL) simply reclaims space and makes it available for re-use. This form of the command can operate in parallel with normal reading and writing of the table, as an exclusive lock is not obtained. However, extra space is not returned to the operating system (in most cases); it's just kept available for re-use within the same table. VACUUM FULL rewrites the entire contents of the table into a new disk file with no extra space, allowing unused space to be returned to the operating system. This form is much slower and requires an exclusive lock on each table while it is being processed."