-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversion script for LA Metro attachments #193
Changes from 4 commits
6ae8807
705fdb5
896da89
373bbee
3693b75
5077e3d
8741959
31ce23b
2b7f722
55c5762
b81a215
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
import os | ||
import sys | ||
import subprocess | ||
import logging | ||
import logging.config | ||
import sqlalchemy as sa | ||
import datetime | ||
import signal | ||
import os | ||
import requests | ||
import textract | ||
import tempfile | ||
|
||
from django.core.management.base import BaseCommand | ||
from django.conf import settings | ||
from django.db.models import Max | ||
|
||
from councilmatic_core.models import BillDocument | ||
|
||
logging.config.dictConfig(settings.LOGGING) | ||
logging.getLogger("requests").setLevel(logging.WARNING) | ||
logger = logging.getLogger(__name__) | ||
|
||
DB_CONN = 'postgresql://{USER}:{PASSWORD}@{HOST}:{PORT}/{NAME}' | ||
|
||
engine = sa.create_engine(DB_CONN.format(**settings.DATABASES['default']), | ||
convert_unicode=True, | ||
server_side_cursors=True) | ||
|
||
class Command(BaseCommand): | ||
help = 'Converts bill attachments into plain text' | ||
|
||
def add_arguments(self, parser): | ||
parser.add_argument( | ||
'--update_all', | ||
default=False, | ||
action='store_true', | ||
help='Add or update plain text for all bill attachments.') | ||
|
||
def handle(self, *args, **options): | ||
self.update_all = options['update_all'] | ||
self.connection = engine.connect() | ||
self.add_plain_text() | ||
|
||
def get_document_url(self): | ||
self.connection.execute("SET local timezone to '{}'".format(settings.TIME_ZONE)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do this here, instead of in the transactional context below? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do this at all? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes - this is unnecessary: we are not adding any timestamps to the instances of BillDocument. We can safely remove it. |
||
with engine.begin() as connection: | ||
# Only apply this query to most recently updated (or created) bill documents. | ||
max_updated = BillDocument.objects.all().aggregate(Max('updated_at'))['updated_at__max'] | ||
|
||
if max_updated is None or self.update_all: | ||
query = ''' | ||
SELECT id, url | ||
FROM councilmatic_core_billdocument | ||
WHERE document_type='A' | ||
AND full_text is null | ||
AND lower(url) similar to '%(.doc|.docx|.pdf)' | ||
ORDER BY updated_at DESC | ||
''' | ||
else: | ||
query = ''' | ||
SELECT id, url | ||
FROM councilmatic_core_billdocument | ||
WHERE updated_at >= :max_updated | ||
AND document_type='A' | ||
AND full_text is null | ||
AND lower(url) similar to '%(.doc|.docx|.pdf)' | ||
ORDER BY updated_at DESC | ||
''' | ||
|
||
result = connection.execution_options(stream_results=True).execute(sa.text(query), max_updated=max_updated) | ||
|
||
yield from result | ||
|
||
def convert_document(self): | ||
documents = self.get_document_url() | ||
|
||
logger.info('Converting document to plain text...') | ||
|
||
for document_data in documents: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto |
||
document_data = dict(document_data) | ||
url = document_data['url'] | ||
document_id = document_data['id'] | ||
response = requests.get(url) | ||
extension = os.path.splitext(url)[1] | ||
|
||
with tempfile.NamedTemporaryFile(suffix=extension) as tfp: | ||
tfp.write(response.content) | ||
plain_text = textract.process(tfp.name) | ||
|
||
yield {'plain_text': plain_text.decode('utf-8'), 'id': document_id} | ||
|
||
|
||
def add_plain_text(self): | ||
plain_text_results = self.convert_document() | ||
|
||
self.connection.execute("SET local timezone to '{}'".format(settings.TIME_ZONE)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we have to do this twice? Could we do it once, at the top of the |
||
query = ''' | ||
UPDATE councilmatic_core_billdocument as bill_docs | ||
SET full_text = :plain_text | ||
WHERE bill_docs.id = :id | ||
''' | ||
|
||
chunk = [] | ||
|
||
for doc_dict in plain_text_results: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It'd be a bit more readable here to do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you be a bit more descriptive with the |
||
chunk.append(doc_dict) | ||
if len(chunk) == 20: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I get that you want to do these updates in batches of 20 or less, but this control flow feels a little bit clunky. @fgregg, is there a more streamlined way to do this? If no, @reginafcompton, maybe a comment stating intention would make this a little easier to grok. :-) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm pretty sure 90% that
will do the right thing if plain_texts is a generator. |
||
with self.connection.begin() as trans: | ||
self.connection.execute(sa.text(query), chunk) | ||
|
||
chunk = [] | ||
|
||
if chunk: | ||
with self.connection.begin() as trans: | ||
self.connection.execute(sa.text(query), chunk) | ||
|
||
logger.info('SUCCESS') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Opening a connection in this way, without subsequently closing it, relies on garbage collection to close it out for you. The SQLAlchemy docs caution against this approach, notably because it's unreliable and can lead to orphaned connections to your database hanging out forever until they ultimately cause a mysterious "You have too many connections open, sorry!!" error from Postgres.
Is there a reason you assigned
connection
as a class attribute rather than using thewith engine.begin()
context manager when you require a connection?