
Add datetime-range filtering options to tarball-dump script.
Also change the default tarball split size to avoid unreasonable RAM requirements,
and document typical RAM usage.
timstaley committed Sep 15, 2016
1 parent 1b28993 commit 8ef265b
Showing 1 changed file with 93 additions and 17 deletions.
110 changes: 93 additions & 17 deletions voeventdb/server/bin/voeventdb_dump_tarball.py
@@ -1,16 +1,20 @@
 #!/usr/bin/env python
 
-import sys
-import os
 import argparse
+import datetime
+import gc
 import logging
 import glob
+import os
+import sys
+import textwrap
 
+import iso8601
+import pytz
+import voeventdb.server.database.config as dbconfig
+from argparse import RawTextHelpFormatter
 from sqlalchemy import create_engine
 from sqlalchemy.orm import Session
 
 from voeventdb.server.database import db_utils
-import voeventdb.server.database.config as dbconfig
 from voeventdb.server.database.models import Voevent
 from voeventdb.server.utils.filestore import write_tarball

@@ -20,6 +24,11 @@
 logger = logging.getLogger("voeventdb-dump")
 
 
+class MyFormatter(argparse.ArgumentDefaultsHelpFormatter,
+                  argparse.RawDescriptionHelpFormatter):
+    pass
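The combined formatter class is needed because argparse accepts only a single formatter_class: ArgumentDefaultsHelpFormatter appends "(default: ...)" to each option's help text, while RawDescriptionHelpFormatter stops argparse from re-wrapping the dedent-ed description. Inheriting from both is a standard argparse idiom; a minimal standalone sketch (the demo names are illustrative, not part of this script):

import argparse
import textwrap

class DemoFormatter(argparse.ArgumentDefaultsHelpFormatter,
                    argparse.RawDescriptionHelpFormatter):
    pass

demo = argparse.ArgumentParser(
    formatter_class=DemoFormatter,
    description=textwrap.dedent("""\
        Line one of the description.
        Line two keeps its own line, thanks to RawDescriptionHelpFormatter."""))
demo.add_argument('-n', type=int, default=5000,
                  help='help text, shown with "(default: 5000)" appended')
demo.print_help()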


 def handle_args():
     """
     Default values are defined here.
@@ -28,20 +37,63 @@ def handle_args():
     default_database_name = dbconfig.testdb_corpus_url.database
     parser = argparse.ArgumentParser(
         prog=os.path.basename(__file__),
-        formatter_class=argparse.ArgumentDefaultsHelpFormatter, )
+        # formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+        formatter_class=MyFormatter,
+        description=textwrap.dedent("""\
+            Dump the raw VOEvents as XML files, collected into bzip2'd tarballs.
+            If start or end times are specified, then the range is start-inclusive,
+            end-exclusive, i.e.
+                start <= author_datetime < end
+            NB when writing compressed tarballs in Python, the entire file is
+            composed in memory before writing to file. This means that setting
+            `nsplit` too large will result in very high memory usage! The default
+            value of 5000 seems to peak at <250MB of RAM (though this varies
+            according to the size of the VOEvent packets, and assumes
+            `prefetch=False`). Some quick tests suggest typical RAM usage
+            ~= 40MB + 30MB*(nsplit/1000).
+            """),
+
+    )
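The description's rule of thumb is easy to sanity-check. A tiny sketch of the arithmetic (the formula is the author's empirical estimate, not a guarantee, and assumes `prefetch=False`):

def approx_peak_ram_mb(nsplit):
    # Empirical rule of thumb from the docstring above: ~40MB base plus
    # ~30MB per thousand packets buffered before the tarball is written out.
    return 40 + 30 * (nsplit / 1000.0)

print(approx_peak_ram_mb(5000))    # 190.0 -- consistent with the quoted <250MB peak
print(approx_peak_ram_mb(100000))  # 3040.0 -- why an unbounded batch size was risky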
     parser.add_argument('tarfile_pathstem',
-                        help='Path to tarball to create. '
+                        help='Path to tarball to create, e.g. `foobar`. '
                              'Suffix ``.tar.bz2`` will be appended.'
                         )
 
     parser.add_argument('-d', '--dbname', nargs='?',
                         default=str(default_database_name),
                         help='Database name')
 
-    parser.add_argument('-s', '--split',
+    parser.add_argument('-n', '--nsplit',
                         type=int,
-                        help="Output multiple files, `SPLIT` packets per tarball."
-                             "Naming convention is `<stem>.01.tar.bz2, <stem>.02.tar.bz2, ...`")
+                        default=5000,
+                        help="Output multiple files, `NSPLIT` packets per tarball. "
+                             "Naming convention is `<stem>.001.tar.bz2, <stem>.002.tar.bz2, ...`"
+                        )
+    parser.add_argument('-s', '--start',
+                        type=iso8601.parse_date,
+                        default=None,
+                        help="Filter events by author_date>=`START`")
+    parser.add_argument('-e', '--end',
+                        type=iso8601.parse_date,
+                        default=datetime.datetime.now(tz=pytz.UTC),
+                        help="Filter events by author_date<`END`")
+    parser.add_argument('-p', '--prefetch', action='store_true', default=False,
+                        help="Bulk-fetch XML packets from DB (~3x faster, but "
+                             "uses considerably more RAM, depending on value of "
+                             "`nsplit`.)"
+                        )
+    parser.add_argument('-a', '--all', action='store_true', default=False,
+                        help="Ignore any datetime filters, dump **all** VOEvents. "
+                             "(This option is provided to allow dumping of VOEvents "
+                             "with author_datetime=Null, which are otherwise ignored.)"
+                        )
     return parser.parse_args()
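Both --start and --end are parsed by iso8601.parse_date, which returns timezone-aware datetimes (UTC is assumed when the string carries no offset), so they compare cleanly against the pytz.UTC default used for --end. A quick sketch of the start-inclusive, end-exclusive semantics described above (the dates are made up):

import iso8601

start = iso8601.parse_date('2016-01-01')  # tz-aware; UTC assumed when no offset given
end = iso8601.parse_date('2016-02-01T00:00:00Z')
event = iso8601.parse_date('2016-01-15T12:34:56Z')

# The dump keeps an event when start <= author_datetime < end:
print(start <= event < end)  # True
print(start <= end < end)    # False -- the end bound itself is excluded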


@@ -55,23 +107,47 @@ def main():
     n_packets_written = 0
 
     def get_tarfile_path():
-        if args.split:
-            suffix = '.{0:02d}.tar.bz2'.format(filecount)
+        if args.nsplit:
+            suffix = '.{0:03d}.tar.bz2'.format(filecount)
         else:
             suffix = '.tar.bz2'
         return args.tarfile_pathstem + suffix
 
     session = Session(bind=create_engine(dburl))
-    qry = session.query(Voevent).order_by(Voevent.id)
+    if args.prefetch:
+        qry = session.query(Voevent.ivorn, Voevent.xml)
+    else:
+        qry = session.query(Voevent)
+
+    if args.all:
+        logger.info("Dumping **all** packets currently in database")
+    else:
+        qry = qry.filter(Voevent.author_datetime < args.end)
+        if args.start is not None:
+            qry = qry.filter(Voevent.author_datetime >= args.start)
+            logger.info("Fetching packets from {}".format(args.start))
+        else:
+            logger.info("Fetching packets from beginning of time")
+        logger.info("...until: {}".format(args.end))
+    qry = qry.order_by(Voevent.id)
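Note what the --prefetch branch changes: querying the (ivorn, xml) column pair makes SQLAlchemy return lightweight tuples with both values already loaded, whereas querying the mapped Voevent class returns full ORM instances, with the potentially large xml payload presumably loaded per row as it is accessed — which would account for the ~3x difference quoted in the help text. A self-contained sketch using a hypothetical Packet model and an in-memory SQLite database (not voeventdb's actual schema):

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session

Base = declarative_base()

class Packet(Base):
    # Hypothetical stand-in for the Voevent model.
    __tablename__ = 'packet'
    id = Column(Integer, primary_key=True)
    ivorn = Column(String)
    xml = Column(String)

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
session = Session(bind=engine)
session.add(Packet(ivorn='ivo://example/1', xml='<xml/>'))
session.commit()

print(session.query(Packet.ivorn, Packet.xml).first())  # ('ivo://example/1', '<xml/>')
print(session.query(Packet).first())                     # a full Packet instance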

     n_matching = qry.count()
     logger.info("Dumping {} packets".format(n_matching))
 
+    start_time = datetime.datetime.now()
     while n_packets_written < n_matching:
-        voevents = qry.limit(args.split).offset(n_packets_written).all()
+        logger.debug("Fetching batch of up to {} packets".format(args.nsplit))
+        voevents = qry.limit(args.nsplit).offset(n_packets_written).all()
         ivorn_xml_tuples_gen = ((v.ivorn, v.xml) for v in voevents)
-        n_packets_written += write_tarball(ivorn_xml_tuples_gen, get_tarfile_path())
+        n_packets_written += write_tarball(ivorn_xml_tuples_gen,
+                                           get_tarfile_path())
+        elapsed = (datetime.datetime.now() - start_time).total_seconds()
+        logger.info(
+            "{} packets dumped so far, in {:.1f}s ({:.0f} packet/s)".format(
+                n_packets_written,
+                elapsed,
+                n_packets_written / elapsed
+            ))
         filecount += 1
 
     session.close()
     logger.info("Wrote {} packets".format(n_packets_written))
     return 0
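The LIMIT/OFFSET loop above is a general paging pattern; here is a sketch of it factored into a reusable generator (illustrative only, not part of voeventdb). The stable order_by(Voevent.id) applied earlier is what makes it safe: without a deterministic ordering, successive OFFSET windows could skip or repeat rows.

def batched(qry, batch_size):
    # Yield successive lists of rows from a SQLAlchemy query via LIMIT/OFFSET.
    # Assumes qry carries a stable ORDER BY (e.g. on the primary key).
    offset = 0
    while True:
        rows = qry.limit(batch_size).offset(offset).all()
        if not rows:
            return
        yield rows
        offset += len(rows)

# Usage, mirroring the dump loop in main() above:
#     for voevents in batched(qry, args.nsplit):
#         write_tarball(((v.ivorn, v.xml) for v in voevents), get_tarfile_path())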
