This repository has been archived by the owner on Dec 6, 2022. It is now read-only.

Commit: Rework migration for clarity and speed
david-wm-sanders committed Feb 16, 2020
1 parent 7987104 commit 59a9fe3
Showing 3 changed files with 150 additions and 110 deletions.
commands.py (72 changes: 3 additions & 69 deletions)
@@ -18,10 +18,10 @@
from rwrtrack.average import avg, diffavg
from rwrtrack.rank import rank, diffrank
from rwrtrack.filter import filter_
from rwrtrack.util import process_numeric_dates, update_db_from_stats
from rwrtrack.util import process_numeric_dates
from rwrtrack.exceptions import NoAccountError, NoRecordError
from rwrtrack.tablify import render_analysis_table
from rwrtrack.logging import _configure_logging
from rwrtrack.migrate import migrate


logger = logging.getLogger(__name__)
@@ -162,73 +162,7 @@ def _dbinfo():


def _db_migrate_csv(csv_hist_path):
# Modify the logger level to INFO for FileHandler(s) to avoid debug logging every new record insertion
for handler in logging.getLogger().handlers:
if isinstance(handler, logging.FileHandler):
handler.setLevel(logging.INFO)

logger.info("Migrating CSV to database...")
# Put the db in writable mode
_set_db_writable()
# Get all CSV files and filter
logger.info("Finding CSV files for migration...")
csv_file_paths = sorted(list(csv_hist_path.glob("*.csv")))
# Filter out CSV files that are not being migrated (for reasons...)
csv_file_paths = filter(lambda x: "2017" not in x.stem and "2018" not in x.stem,
csv_file_paths)
# TODO: Rework, just get the filter as list and peek instead...
try:
# Attempt to access the DbInfo
db_info = get_dbinfo()
except NoResultFound:
# If no DbInfo, db is blank, initialise from origin CSV file
logger.info("Blank database found - beginning full migration...")
csv_file_path = next(csv_file_paths)
logger.info(f"Processing '{csv_file_path}'...")
# Fix dates
d = datetime.strptime(csv_file_path.stem, "%Y-%m-%d").date()
d = d - timedelta(days=1)
d = int(d.strftime("%Y%m%d"))
# Populate _dbinfo table with the initial CSV in migration
db_info = DbInfo(date=d)
sesh.add(db_info)
# Add stats from the first file in the filter generator
stats = load_stats_from_csv(csv_file_path)
update_db_from_stats(stats, d)
else:
logger.info("Existing database found - continuing migration...")
# Step the csv_file_paths filter until we are at the first new file
while True:
try:
csv_file_path = next(csv_file_paths)
except StopIteration:
logger.info("No new CSV files to migrate")
sys.exit(1)
d = datetime.strptime(csv_file_path.stem, "%Y-%m-%d").date()
d = d - timedelta(days=1)
d = int(d.strftime("%Y%m%d"))
if (d > db_info.latest_date):
# Update latest_date in _dbinfo table
db_info.latest_date = d
# Add stats from the first new file in the filter generator
logger.info(f"Processing '{csv_file_path}'...")
stats = load_stats_from_csv(csv_file_path)
update_db_from_stats(stats, d)
break
finally:
for csv_file_path in csv_file_paths:
logger.info(f"Processing '{csv_file_path}'...")
stats = load_stats_from_csv(csv_file_path)
# Fix dates
d = datetime.strptime(csv_file_path.stem, "%Y-%m-%d").date()
d = d - timedelta(days=1)
d = int(d.strftime("%Y%m%d"))
# Update latest_date in _dbinfo table
db_info.latest_date = d
update_db_from_stats(stats, d)

# Return the db to readonly mode
_set_db_readonly()
migrate(csv_hist_path)


def _interact():
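
With this change the _db_migrate_csv command handler becomes a thin wrapper: the logging-handler tweaks, the read/write toggling of the database and the CSV iteration all move into the new rwrtrack.migrate module. A minimal sketch of the resulting call, assuming the CSV history lives in a csv_historical/ directory (the directory name is illustrative only):

from pathlib import Path
from rwrtrack.migrate import migrate

# Hypothetical invocation mirroring what _db_migrate_csv now does;
# "csv_historical" is an assumed directory of YYYY-MM-DD.csv exports.
csv_hist_path = Path("csv_historical")
migrate(csv_hist_path)
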
rwrtrack/migrate.py (147 changes: 147 additions & 0 deletions)
@@ -0,0 +1,147 @@
import csv
import logging
import sys
import time
from datetime import datetime, timedelta

from .csv import load_stats_from_csv
from .db import engine, sesh, _set_db_readonly, _set_db_writable
from .dbinfo import DbInfo, get_dbinfo
from .account import Account
from .record import Record


logger = logging.getLogger(__name__)
# Configure blacklist for troublesome usernames
USERNAME_BLACKLIST = {"RAIOORIGINAL"}


def _mod_logging_handlers(handler_type, handler_level):
for handler in logging.getLogger().handlers:
if isinstance(handler, handler_type):
logger.info(f"Setting {handler} to {logging.getLevelName(handler_level)}")
handler.setLevel(handler_level)


def _find_csv_files(csv_hist_dir):
# Find CSV files in csv_hist_dir
csv_paths = sorted(csv_hist_dir.glob("*.csv"))
# Filter out CSV files that are never being migrated (for reasons...)
csv_paths = filter(lambda x: "2017" not in x.stem and "2018" not in x.stem, csv_paths)
return list(csv_paths)


def _fix_csv_date(csv_path):
# Construct a datetime from the csv_path stem (name sans extension)
d = datetime.strptime(csv_path.stem, "%Y-%m-%d").date()
# Subtract a day because the CSV files are named for the date of capture not the date of the data
d = d - timedelta(days=1)
# Return the adjusted dt mangled into the format used by the database
return int(d.strftime("%Y%m%d"))


def _increment(i):
while True: yield i; i += 1


def migrate(csv_hist_dir):
t0 = time.time()

logger.info("Starting database migration...")
# Modify FileHandler(s) to log at INFO to avoid debug logging every new record insertion
_mod_logging_handlers(logging.FileHandler, logging.INFO)
# Set the database to writable mode
_set_db_writable()

logger.info(f"Finding CSV files in '{csv_hist_dir}'...")
csv_paths = _find_csv_files(csv_hist_dir)

logger.info(f"Inspecting database at '{engine.url}'...")
dbinfo = get_dbinfo(error=False)
if not dbinfo:
# Database doesn't exist
logger.info(f"Blank database found, starting new migration...")
# Create dbinfo using the fixed date of the first of the CSV files
dbinfo = DbInfo(date=_fix_csv_date(csv_paths[0]))
sesh.add(dbinfo)
# Create an empty account_map and instantiate an _increment generator, starting at 1
account_map, account_id_gen = {}, _increment(1)
else:
# Database exists
logger.info(f"Populated database found, continuing existing migration...")
# Filter csv_paths for path where the fixed date is greater than the latest_date in dbinfo
latest_date = dbinfo.latest_date
csv_paths = [p for p in csv_paths if _fix_csv_date(p) > latest_date]
# Load accounts from db and populate an account_map from them
account_map = {}
accounts_in_db = sesh.query(Account).all()
for a in accounts_in_db:
account_map[a.username] = a._id
last_account_id = a._id
# Instantiate an _increment generator, starting at last_account_id + 1
account_id_gen = _increment(last_account_id + 1)

if not csv_paths:
logger.info(f"No new CSV files to migrate... Exiting.")
sys.exit(1)

logger.info(f"Migrating '{csv_paths[0].name}' -> '{csv_paths[-1].name}'...")

for csv_path in csv_paths:
t1 = time.time()
record_date = _fix_csv_date(csv_path)
logger.info(f"Processing '{csv_path.name}' as '{record_date}'...")

new_accounts, updated_accounts, new_records = [], [], []
with csv_path.open("r", encoding="utf-8") as csv_file:
csv_reader = csv.DictReader(csv_file)
for r in csv_reader:
username = r["username"]
# Skip row if username is in the blacklist
if username in USERNAME_BLACKLIST:
continue
if username not in account_map:
# Get the next account_id from the generator and map username: account_id in account_map
account_id = next(account_id_gen)
account_map[username] = account_id
# Create the new account dict for bulk_insert_mappings of Accounts
new_account = {"_id": account_id, "username": username,
"first_date": record_date, "latest_date": record_date}
new_accounts.append(new_account)
else:
# Get the existing account_id for the username from account_map
account_id = account_map[username]
# Create the updated account dict for bulk_update_mappings of Accounts
updated_account = {"_id": account_id, "latest_date": record_date}
updated_accounts.append(updated_account)

# Create a new_record from the row in the CSV file
new_record = dict(date=record_date, account_id=account_id, username=username, xp=r["xp"],
time_played=r["time_played"], kills=r["kills"], deaths=r["deaths"],
kill_streak=r["kill_streak"], targets_destroyed=r["targets_destroyed"],
vehicles_destroyed=r["vehicles_destroyed"], soldiers_healed=r["soldiers_healed"],
team_kills=r["team_kills"], distance_moved=r["distance_moved"],
shots_fired=r["shots_fired"], throwables_thrown=r["throwables_thrown"])
new_records.append(new_record)

na, ua, nr = len(new_accounts), len(updated_accounts), len(new_records)
logger.info(f"Discovered {na}/{ua} new/updated accounts across {nr} records in {(time.time() - t1):.2f}s")

t2 = time.time()
# Update dbinfo latest_date
dbinfo.latest_date = record_date
# Bulk insert new accounts
sesh.bulk_insert_mappings(Account, new_accounts)
# Bulk update existing accounts
sesh.bulk_update_mappings(Account, updated_accounts)
# Bulk insert new records
sesh.bulk_insert_mappings(Record, new_records)
# Commit all changes to the database atomically
sesh.commit()

logger.info(f"Entered mappings into database in {(time.time() - t2):.2f}s")
logger.info(f"Migrated '{csv_path.name}' in {(time.time() - t1):.2f}s")

_set_db_readonly()
migration_time = time.time() - t0
logger.info(f"Migration took {migration_time:.2f} seconds")
rwrtrack/util.py (41 changes: 0 additions & 41 deletions)
@@ -8,10 +8,6 @@

logger = logging.getLogger(__name__)

# Configure blacklist for troublesome usernames
username_blacklist = set()
username_blacklist.add("RAIOORIGINAL")


def process_numeric_dates(date_string):
if date_string.isnumeric():
@@ -26,40 +22,3 @@ def process_numeric_dates(date_string):
sys.exit(1)
# return "range", (d_older, d_newer)
return "range", (d_newer, d_older)


def update_db_from_stats(stats, d):
t0 = time.time()

account_usernames = set()
usernames = sesh.query(Account.username).all()
for u in usernames:
account_usernames.add(u[0])

recs = []
for s in stats:
# If username in blacklist, skip...
if s.username in username_blacklist:
continue
if s.username not in account_usernames:
account_usernames.add(s.username)
# Create a new Account for the username
account = Account(username=s.username, date=d)
sesh.add(account)
# Need to flush so that account._id is populated
sesh.flush()
else:
# Update Account for the username
account = sesh.query(Account).filter_by(username=s.username).one()
account.latest_date = d
# Create a history entry for this stat record
rec = dict(date=d, account_id=account._id, username=s.username, xp=s.xp, time_played=s.time_played,
kills=s.kills, deaths=s.deaths, kill_streak=s.kill_streak,
targets_destroyed=s.targets_destroyed, vehicles_destroyed=s.vehicles_destroyed,
soldiers_healed=s.soldiers_healed, team_kills=s.team_kills, distance_moved=s.distance_moved,
shots_fired=s.shots_fired, throwables_thrown=s.throwables_thrown)
recs.append(rec)

sesh.bulk_insert_mappings(Record, recs)
sesh.commit()
logger.info(f"db update for {d} took {(time.time() - t0):.2f} seconds")
