Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/temp-cache-last-processed-file-to-avoid-repeat-processing #5856

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion datahub/company/tasks/contact.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import environ
import requests

from dateutil import parser
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
Expand All @@ -23,6 +24,7 @@
from datahub.core.queues.job_scheduler import job_scheduler
from datahub.core.queues.scheduler import LONG_RUNNING_QUEUE
from datahub.core.realtime_messaging import send_realtime_message
from datahub.ingest.models import IngestedObject


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -206,8 +208,16 @@ def ingest(self):
)
return

if IngestedObject.objects.filter(object_key=file_key).exists():
logger.info(
'File %s has already been processed',
file_key,
)
return

try:
self.sync_file_with_database(s3_client, file_key)
IngestedObject.objects.create(object_key=file_key)
except Exception as exc:
logger.exception(
f'Error ingesting contact consent file {file_key}',
Expand Down Expand Up @@ -313,7 +323,12 @@ def sync_file_with_database(self, client, file_key):
consent_data=contact.consent_data,
consent_data_last_modified=contact.consent_data_last_modified,
)
logger.info('Updated contact consent data for email %s in row %s', email, i)
logger.info(
'Updated consent data for contact id %s with email %s in file row %s',
contact.id,
email,
i,
)

logger.info(
'Finished processing total %s rows for contact consent from file %s',
Expand Down
31 changes: 29 additions & 2 deletions datahub/company/test/tasks/test_contact_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
from datahub.company.test.factories import CompanyFactory, ContactFactory
from datahub.core.queues.errors import RetryError
from datahub.core.test_utils import HawkMockJSONResponse
from datahub.ingest.models import IngestedObject
from datahub.ingest.test.factories import IngestedObjectFactory


def generate_hawk_response(payload):
Expand Down Expand Up @@ -527,11 +529,35 @@ def test_ingest_with_empty_s3_bucket_does_not_call_sync(self):

@mock_aws
@override_settings(S3_LOCAL_ENDPOINT_URL=None)
def test_ingest_calls_sync_with_newest_file_order(self, test_files):
def test_ingest_with_newest_file_key_equal_to_existing_file_key_does_not_call_sync(
self,
test_files,
):
"""
Test that the task returns when the latest file is equal to an existing ingested file
"""
setup_s3_bucket(BUCKET, test_files)
IngestedObjectFactory(object_key=test_files[-1])
task = ContactConsentIngestionTask()
with mock.patch.multiple(
task,
sync_file_with_database=mock.DEFAULT,
):
task.ingest()
task.sync_file_with_database.assert_not_called()

@mock_aws
@override_settings(S3_LOCAL_ENDPOINT_URL=None)
def test_ingest_calls_sync_with_newest_file_when_file_is_new(
self,
test_files,
):
"""
Test that the ingest calls the sync with the files in correct order
Test that the ingest calls the sync with the latest file when the file key does
not exist in the list of previously ingested files
"""
setup_s3_bucket(BUCKET, test_files)
IngestedObjectFactory()
task = ContactConsentIngestionTask()
with mock.patch.multiple(
task,
Expand All @@ -542,6 +568,7 @@ def test_ingest_calls_sync_with_newest_file_order(self, test_files):
mock.ANY,
test_files[-1],
)
assert IngestedObject.objects.filter(object_key=test_files[-1]).exists()

@mock_aws
def test_sync_file_without_contacts_stops_job_processing(self):
Expand Down
Loading