Skip to content
This repository was archived by the owner on Feb 28, 2018. It is now read-only.

Commit

Permalink
Delegate create/update to Celery on data import
Browse files Browse the repository at this point in the history
  • Loading branch information
cuducos committed Aug 26, 2017
1 parent 026ebd6 commit 12b569d
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 90 deletions.
48 changes: 10 additions & 38 deletions jarbas/core/management/commands/reimbursements.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import lzma

from django.utils.timezone import now
from reprint import output

from jarbas.core.management.commands import LoadCommand
from jarbas.core.models import Reimbursement
from jarbas.core.tasks import create_or_update_reimbursement


class Command(LoadCommand):
Expand All @@ -15,17 +15,10 @@ def handle(self, *args, **options):
self.started_at = now()
self.path = options['dataset']

existing = Reimbursement.objects.count()
print('Starting with {:,} reimbursements'.format(existing))
self.count = {'updated': 0, 'created': 0, 'skip': 0}

if options.get('drop', False):
self.drop_all(Reimbursement)

with output() as status:
status.change(self.status)
self.create_or_update(self.reimbursements, status)

self.create_or_update(self.reimbursements)
self.mark_not_updated_reimbursements()

@property
Expand Down Expand Up @@ -77,40 +70,19 @@ def serialize(self, reimbursement):
for key in floats:
reimbursement[key] = self.to_number(reimbursement[key])

reimbursement['issue_date'] = self.to_date(reimbursement['issue_date'])
issue_date = self.to_date(reimbursement['issue_date'])
reimbursement['issue_date'] = issue_date.strftime('%Y-%m-%d')

return reimbursement

def create_or_update(self, reimbursements_as_dicts, status):
for reimbursement in reimbursements_as_dicts:
document_id = reimbursement.get('document_id')

if not document_id:
self.count['skip'] += 1
status.change(self.status)
continue
def create_or_update(self, reimbursements_as_dicts):
for index, reimbursement in enumerate(reimbursements_as_dicts):
create_or_update_reimbursement.delay(reimbursement)
count = index + 1
self.print_count(Reimbursement, count=count)

_, created = Reimbursement.objects.update_or_create(
document_id=document_id,
defaults=reimbursement
)

key = 'created' if created else 'updated'
self.count[key] += 1
status.change(self.status)
print('{} reimbursements scheduled to creation/update'.format(count))

def mark_not_updated_reimbursements(self):
qs = Reimbursement.objects.filter(last_update__lt=self.started_at)
qs.update(available_in_latest_dataset=False)

@property
def status(self):
label = '{}s'.format(self.get_model_name(Reimbursement)).lower()
total = sum(map(self.count.get, self.count.keys()))
output = [
'Processed: {} lines'.format(total),
'Updated: {} {}'.format(self.count['updated'], label),
'Created: {} {}'.format(self.count['created'], label),
'Skip: {} {}'.format(self.count['skip'], label),
]
return output
13 changes: 13 additions & 0 deletions jarbas/core/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from celery import shared_task

from jarbas.core.models import Reimbursement


@shared_task
def create_or_update_reimbursement(data):
"""
:param data: (dict) reimbursement data (keys and data types must mach
Reimbursement model)
"""
kwargs = dict(document_id=data['document_id'], defaults=data)
Reimbursement.objects.update_or_create(**kwargs)
26 changes: 26 additions & 0 deletions jarbas/core/tests/test_reimbursement_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from django.forms.models import model_to_dict
from django.test import TestCase
from mixer.backend.django import mixer

from jarbas.core.models import Reimbursement
from jarbas.core.tasks import create_or_update_reimbursement


class TestCreateOrUpdateTask(TestCase):

def test_create(self):
with mixer.ctx(commit=False):
fixture = model_to_dict(mixer.blend(Reimbursement))

self.assertEqual(0, Reimbursement.objects.count())
create_or_update_reimbursement(fixture)
self.assertEqual(1, Reimbursement.objects.count())

def test_update(self):
self.assertEqual(0, Reimbursement.objects.count())

fixture = model_to_dict(mixer.blend(Reimbursement))
self.assertEqual(1, Reimbursement.objects.count())

create_or_update_reimbursement(fixture)
self.assertEqual(1, Reimbursement.objects.count())
63 changes: 11 additions & 52 deletions jarbas/core/tests/test_reimbursements_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_serializer(self):
'document_type': 7,
'document_value': 8.90,
'installment': 7,
'issue_date': date(1970, 1, 1),
'issue_date': '1970-01-01',
'leg_of_the_trip': '8',
'month': 1,
'net_values': '1.99,2.99',
Expand Down Expand Up @@ -91,45 +91,16 @@ def test_serializer(self):

class TestCreate(TestCommand):

@patch.object(Reimbursement.objects, 'update_or_create')
@patch('jarbas.core.management.commands.reimbursements.output')
def test_create_or_update(self, output, create):
status = MagicMock()
@patch.object(Command, 'print_count')
@patch('jarbas.core.management.commands.reimbursements.print')
@patch('jarbas.core.management.commands.reimbursements.create_or_update_reimbursement')
def test_create_or_update(self, create, print_, print_count):
reimbursements = (
dict(ahoy=42, document_id=1),
dict(ahoy=21, document_id=''),
dict(ahoy=84, document_id=2)
)
create.side_effect = ((True, True), (False, False))
self.command.count = dict(zip(('updated', 'created', 'skip'), [0] * 3))
self.command.create_or_update(reimbursements, status)

# assert update_or_create was called
create.assert_has_calls((
call(document_id=1, defaults=reimbursements[0]),
call(document_id=2, defaults=reimbursements[-1])
))

# assert status.change was called
self.assertEqual(3, status.change.call_count)

# assert self.count was updated
expected = {
'updated': 1,
'created': 1,
'skip': 1
}
self.assertEqual(expected, self.command.count)

def test_status(self):
expected = [
'Processed: 42 lines',
'Updated: 36 reimbursements',
'Created: 4 reimbursements',
'Skip: 2 reimbursements'
]
self.command.count = {'updated': 36, 'created': 4, 'skip': 2}
self.assertEqual(expected, self.command.status)
self.command.create_or_update(reimbursements)
create.delay.assert_has_calls((call(r) for r in reimbursements))


class TestMarkNonUpdated(TestCommand):
Expand All @@ -144,36 +115,24 @@ def test_mark_available_in_latest_dataset(self, filter_):

class TestConventionMethods(TestCommand):

@patch('jarbas.core.management.commands.reimbursements.print')
@patch('jarbas.core.management.commands.reimbursements.Command.reimbursements')
@patch('jarbas.core.management.commands.reimbursements.Command.create_or_update')
@patch('jarbas.core.management.commands.reimbursements.output')
@patch('jarbas.core.management.commands.reimbursements.Command.mark_not_updated_reimbursements')
def test_handler_without_options(self, mark, output, create, reimbursements, print_):
status = MagicMock()
output.return_value.__enter__.return_value = status
def test_handler_without_options(self, mark, create, reimbursements):
reimbursements.return_value = (1, 2, 3)
self.command.handle(dataset='reimbursements.xz')
print_.assert_called_once_with('Starting with 0 reimbursements')
create.assert_called_once_with(reimbursements, status)
output.assert_called_once_with()
status.change.assert_called_once_with(self.command.status)
create.assert_called_once_with(reimbursements)
self.assertEqual('reimbursements.xz', self.command.path)
mark.assert_called_once_with()

@patch('jarbas.core.management.commands.reimbursements.print')
@patch('jarbas.core.management.commands.reimbursements.Command.reimbursements')
@patch('jarbas.core.management.commands.reimbursements.Command.create_or_update')
@patch('jarbas.core.management.commands.reimbursements.Command.drop_all')
@patch('jarbas.core.management.commands.reimbursements.output')
@patch('jarbas.core.management.commands.reimbursements.Command.mark_not_updated_reimbursements')
def test_handler_with_options(self, mark, output, drop_all, create, reimbursements, print_):
status = MagicMock()
output.return_value.__enter__.return_value = status
def test_handler_with_options(self, mark, drop_all, create, reimbursements):
self.command.handle(dataset='reimbursements.xz', drop=True)
print_.assert_called_once_with('Starting with 0 reimbursements')
drop_all.assert_called_once_with(Reimbursement)
create.assert_called_once_with(reimbursements, status)
create.assert_called_once_with(reimbursements)
mark.assert_called_once_with()


Expand Down

0 comments on commit 12b569d

Please sign in to comment.