Skip to content

Commit

Permalink
Add comment to results sync
Browse files Browse the repository at this point in the history
  • Loading branch information
norkans7 committed Dec 3, 2024
1 parent d3f3954 commit ac22f83
Showing 1 changed file with 62 additions and 0 deletions.
62 changes: 62 additions & 0 deletions ureport/backend/rapidpro.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,11 @@ def local_kwargs(self, org, remote):
reporter_group = org.get_config("%s.reporter_group" % self.backend.slug, default="")
contact_groups_names = [group.name.lower() for group in remote.groups]

# Only sync contact in the configured reporters group, skip others
if reporter_group.lower() not in contact_groups_names:
return None

# Ignore empty contacts, without URNs set
if not remote.urns:
return None

Expand Down Expand Up @@ -313,6 +315,10 @@ def update_required(self, local, remote, local_kwargs):
def create_local(self, remote_as_kwargs):
obj = super().create_local(remote_as_kwargs)

# Contacts sync only on the reporters group and those are the only with demographic data saved.
# Poll results sync all poll response including those not in the reporter group,
# Here we update the recent(last 30 days) results to have the demographic data for the contact
# that is registered (part of reporters group) and syncs now
one_month_ago = timezone.now() - timedelta(days=30)
if obj.registered_on is not None and obj.registered_on > one_month_ago:
recent_results = PollResult.objects.filter(
Expand Down Expand Up @@ -609,13 +615,15 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback=
start = time.time()
logger.info("Start fetching runs for poll #%d on org #%d" % (poll.pk, org.pk))

# fetch runs from API, respecting rate limits
poll_runs_query = client.get_runs(
flow=poll.flow_uuid, after=latest_synced_obj_time, reverse=True, paths=True
)
fetches = poll_runs_query.iterfetches(retry_on_rate_exceed=True)

try:
fetch_start = time.time()

for fetch in fetches:
logger.info(
"RapidPro API fetch for poll #%d "
Expand All @@ -629,6 +637,7 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback=
)
)

# for each API page fetched, initiate the maps for quick lookups and as cache for the sync task
(contacts_map, poll_results_map, poll_results_to_save_map) = self._initiate_lookup_maps(
fetch, org, poll
)
Expand All @@ -642,6 +651,7 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback=
)

contact_obj = contacts_map.get(temba_run.contact.uuid, None)
# for each run process, the results we can for the values and path for not yet responded
self._process_run_poll_results(
org,
questions_uuids,
Expand All @@ -656,6 +666,7 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback=
if progress_callback:
progress_callback(stats_dict["num_synced"])

# Save the objects to the DB for new objects in the respective map
self._save_new_poll_results_to_database(poll_results_to_save_map)

logger.info(
Expand All @@ -666,12 +677,15 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback=
fetch_start = time.time()
logger.info("=" * 40)

# Pause the sync for this poll when we have synced Poll.POLL_RESULTS_MAX_SYNC_RUNS runs this time
if (
stats_dict["num_synced"] >= Poll.POLL_RESULTS_MAX_SYNC_RUNS
or time.time() > lock_expiration
):
# rebuild the aggregated counts
poll.rebuild_poll_results_counts()

# mark this poll as paused, so we can resume from the proper time later
self._mark_poll_results_sync_paused(org, poll, latest_synced_obj_time)

logger.info(
Expand All @@ -698,8 +712,11 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback=
stats_dict["num_path_ignored"],
)
except TembaRateExceededError:

# rebuild the aggregated counts
poll.rebuild_poll_results_counts()

# mark this poll as paused, so we can resume from the proper time later
self._mark_poll_results_sync_paused(org, poll, latest_synced_obj_time)

logger.info(
Expand All @@ -726,6 +743,7 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback=
stats_dict["num_path_ignored"],
)

# mark this poll as completed, so we can fetch from the proper time for future results from that time
self._mark_poll_results_sync_completed(poll, org, latest_synced_obj_time)

# from django.db import connection as db_connection, reset_queries
Expand Down Expand Up @@ -760,6 +778,12 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback=
)

def _initiate_lookup_maps(self, fetch, org, poll):
"""
- Lookup existing contact from the contact uuid in the runs batch
- Build look up maps (existing contacts, existing results) by contact UUID for quick finding the existing objects
- Make maps for new results object to add to the DB in bulk (by bulk_create).
"""

contact_uuids = [run.contact.uuid for run in fetch]
contacts = Contact.objects.filter(org=org, uuid__in=contact_uuids)
contacts_map = {c.uuid: c for c in contacts}
Expand All @@ -783,6 +807,14 @@ def _process_run_poll_results(
poll_results_to_save_map,
stats_dict,
):
"""
This method is to extract results from the run, we fetch from the RapidPro API.
If uses the maps intiated by each API page request to reduce the DB queries.
- First look on values for results set on the run and save them
- Second loop on the path to save the path for which the contact is waiting for a response/result to be set
- For each case we only update the lookup maps
"""

flow_uuid = temba_run.flow.uuid
contact_uuid = temba_run.contact.uuid
completed = temba_run.exit_type == "completed"
Expand All @@ -801,6 +833,8 @@ def _process_run_poll_results(
gender = contact_obj.gender
scheme = contact_obj.scheme

# Loop on values set for the run to save the results responses,
# used to get the number of responded reporters for a question
for temba_value in sorted(temba_run.values.values(), key=lambda val: val.time):
ruleset_uuid = temba_value.node
category = temba_value.category
Expand All @@ -812,6 +846,7 @@ def _process_run_poll_results(
poll_result_to_save = poll_results_to_save_map.get(contact_uuid, dict()).get(ruleset_uuid, None)

if existing_poll_result is not None:
# exising obj in the DB, check whether that needs update and update both the DB and the maps obj fields
update_required = self._check_update_required(
existing_poll_result,
category,
Expand Down Expand Up @@ -860,6 +895,7 @@ def _process_run_poll_results(
stats_dict["num_val_ignored"] += 1

elif poll_result_to_save is not None:
# exising obj in the maps, check whether that needs to be replaced in the maps, that will be saved later in bulk to the DB
replace_save_map = self._check_update_required(
poll_result_to_save,
category,
Expand Down Expand Up @@ -896,6 +932,7 @@ def _process_run_poll_results(

stats_dict["num_val_ignored"] += 1
else:
# completely new results to save to the DB, add that to the maps now to be saved in DB later in bulk
result_obj = PollResult(
org=org,
flow=flow_uuid,
Expand All @@ -916,6 +953,9 @@ def _process_run_poll_results(
poll_results_to_save_map[contact_uuid][ruleset_uuid] = result_obj

stats_dict["num_val_created"] += 1

# Loop on paths to save the results without responses,
# used to get the number of polled reporters for a question
for temba_path in temba_run.path:
ruleset_uuid = temba_path.node
category = None
Expand All @@ -928,6 +968,9 @@ def _process_run_poll_results(
poll_result_to_save = poll_results_to_save_map.get(contact_uuid, dict()).get(ruleset_uuid, None)

if existing_poll_result is not None:

# exiting obj in the DB, check whether that need to be updated when the non response happened after 5 seconds
# sometimes the path is the same or close time as the value(result) time
if existing_poll_result.date is None or value_date > (
existing_poll_result.date + timedelta(seconds=5)
):
Expand Down Expand Up @@ -964,6 +1007,8 @@ def _process_run_poll_results(
stats_dict["num_path_ignored"] += 1

elif poll_result_to_save is not None:
# Only replace existing results in maps when the non response happened after 5 seconds
# sometimes the path is the same or close time as the value(result) time
if value_date > (poll_result_to_save.date + timedelta(seconds=5)):
result_obj = PollResult(
org=org,
Expand All @@ -987,6 +1032,7 @@ def _process_run_poll_results(
stats_dict["num_path_ignored"] += 1

else:
# new obj to add, add in the maps first to save to DB later
result_obj = PollResult(
org=org,
flow=flow_uuid,
Expand Down Expand Up @@ -1015,6 +1061,10 @@ def _process_run_poll_results(
def _check_update_required(
poll_obj, category, text, state, district, ward, born, gender, scheme, completed, value_date
):
"""
To check whether any value need to be updated in the DB,
if the syncing data is newer/changed to what we have in the DB
"""
update_required = poll_obj.category != category or poll_obj.text != text
update_required = update_required or poll_obj.state != state
update_required = update_required or poll_obj.district != district
Expand All @@ -1032,6 +1082,9 @@ def _check_update_required(

@staticmethod
def _save_new_poll_results_to_database(poll_results_to_save_map):
"""
Save all the new objects to create to the DB in bulk, by bulk_create
"""
new_poll_results = []
for c_key in poll_results_to_save_map.keys():
for r_key in poll_results_to_save_map.get(c_key, dict()):
Expand All @@ -1042,6 +1095,10 @@ def _save_new_poll_results_to_database(poll_results_to_save_map):

@staticmethod
def _mark_poll_results_sync_paused(org, poll, latest_synced_obj_time):
"""
Use redis to set the time when we paused, where we will resume from.
This is a way to allow sharing the API throttle rate to multiple polls, to allow each to progress
"""
# update the time for this poll from which we fetch next time
cache.set(Poll.POLL_RESULTS_LAST_PULL_CACHE_KEY % (org.pk, poll.flow_uuid), latest_synced_obj_time, None)

Expand All @@ -1051,6 +1108,11 @@ def _mark_poll_results_sync_paused(org, poll, latest_synced_obj_time):

@staticmethod
def _mark_poll_results_sync_completed(poll, org, latest_synced_obj_time):
"""
Use Redis key to mark we finished to sync existing results.
And future sync will only look for newer results that the time in redis.
"""

# update the time for this poll from which we fetch next time
cache.set(Poll.POLL_RESULTS_LAST_PULL_CACHE_KEY % (org.pk, poll.flow_uuid), latest_synced_obj_time, None)

Expand Down

0 comments on commit ac22f83

Please sign in to comment.