From 7f95bb351f0a413f80caf442cbdc26f6fc9ca4cd Mon Sep 17 00:00:00 2001 From: Norbert Kwizera Date: Tue, 3 Dec 2024 19:38:21 +0200 Subject: [PATCH] Add comment to results sync --- ureport/backend/rapidpro.py | 53 +++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/ureport/backend/rapidpro.py b/ureport/backend/rapidpro.py index 50356c265..c64219325 100644 --- a/ureport/backend/rapidpro.py +++ b/ureport/backend/rapidpro.py @@ -171,9 +171,11 @@ def local_kwargs(self, org, remote): reporter_group = org.get_config("%s.reporter_group" % self.backend.slug, default="") contact_groups_names = [group.name.lower() for group in remote.groups] + # Only sync contact in the configured reporters group, skip others if reporter_group.lower() not in contact_groups_names: return None + # Ignore empty contacts, without URNs set if not remote.urns: return None @@ -313,6 +315,10 @@ def update_required(self, local, remote, local_kwargs): def create_local(self, remote_as_kwargs): obj = super().create_local(remote_as_kwargs) + # Contacts sync only on the reporters group and those are the only with demoegraphic data saved. + # Poll results sycn all poll response including those not in the reporter group, + # Here we update the recent(last 30 days) results to have the demographic data for the contact + # that is registered (part of reporters group) and syncs now one_month_ago = timezone.now() - timedelta(days=30) if obj.registered_on is not None and obj.registered_on > one_month_ago: recent_results = PollResult.objects.filter( @@ -609,6 +615,7 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback= start = time.time() logger.info("Start fetching runs for poll #%d on org #%d" % (poll.pk, org.pk)) + # fetch runs from API, respecting rate limits poll_runs_query = client.get_runs( flow=poll.flow_uuid, after=latest_synced_obj_time, reverse=True, paths=True ) @@ -616,6 +623,7 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback= try: fetch_start = time.time() + for fetch in fetches: logger.info( "RapidPro API fetch for poll #%d " @@ -629,6 +637,7 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback= ) ) + # for each API page fetched, initiate the maps for quick lookups and as cache for the sync task (contacts_map, poll_results_map, poll_results_to_save_map) = self._initiate_lookup_maps( fetch, org, poll ) @@ -642,6 +651,7 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback= ) contact_obj = contacts_map.get(temba_run.contact.uuid, None) + # for each run process, the results we can for the values and path for not yet responded self._process_run_poll_results( org, questions_uuids, @@ -656,6 +666,7 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback= if progress_callback: progress_callback(stats_dict["num_synced"]) + # Save the objects to the DB for new objects in the respective map self._save_new_poll_results_to_database(poll_results_to_save_map) logger.info( @@ -666,12 +677,15 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback= fetch_start = time.time() logger.info("=" * 40) + # Pause the sync for this poll when we have synced Poll.POLL_RESULTS_MAX_SYNC_RUNS runs this time if ( stats_dict["num_synced"] >= Poll.POLL_RESULTS_MAX_SYNC_RUNS or time.time() > lock_expiration ): + # rebuild the aggregated counts poll.rebuild_poll_results_counts() + # mark this poll as paused, so we can resume from the proper time later self._mark_poll_results_sync_paused(org, poll, latest_synced_obj_time) logger.info( @@ -698,8 +712,11 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback= stats_dict["num_path_ignored"], ) except TembaRateExceededError: + + # rebuild the aggregated counts poll.rebuild_poll_results_counts() + # mark this poll as paused, so we can resume from the proper time later self._mark_poll_results_sync_paused(org, poll, latest_synced_obj_time) logger.info( @@ -726,6 +743,7 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback= stats_dict["num_path_ignored"], ) + # mark this poll as completed, so we can fetch from the proper time for future results from that time self._mark_poll_results_sync_completed(poll, org, latest_synced_obj_time) # from django.db import connection as db_connection, reset_queries @@ -760,6 +778,12 @@ def pull_results(self, poll, modified_after, modified_before, progress_callback= ) def _initiate_lookup_maps(self, fetch, org, poll): + """ + - Lookup existing contact from the contact uuid in the runs batch + - Build look up maps (existing contacts, existing results) by contact UUID for quick finding the existing objects + - Make maps for new results object to add to the DB in bulk (by bulk_create). + """ + contact_uuids = [run.contact.uuid for run in fetch] contacts = Contact.objects.filter(org=org, uuid__in=contact_uuids) contacts_map = {c.uuid: c for c in contacts} @@ -783,6 +807,14 @@ def _process_run_poll_results( poll_results_to_save_map, stats_dict, ): + """ + This method is to extract results from the run, we fetch from the RapidPro API. + If uses the maps intiated by each API page request to reduce the DB queries. + - First look on values for results set on the run and save them + - Second loop on the path to save the path for which the contact is waiting for a response/result to be set + - For each case we only update the lookup maps + """ + flow_uuid = temba_run.flow.uuid contact_uuid = temba_run.contact.uuid completed = temba_run.exit_type == "completed" @@ -801,6 +833,8 @@ def _process_run_poll_results( gender = contact_obj.gender scheme = contact_obj.scheme + # Loop on values set for the run to save the results responses, + # used to get the number of responded reporters for a question for temba_value in sorted(temba_run.values.values(), key=lambda val: val.time): ruleset_uuid = temba_value.node category = temba_value.category @@ -916,6 +950,9 @@ def _process_run_poll_results( poll_results_to_save_map[contact_uuid][ruleset_uuid] = result_obj stats_dict["num_val_created"] += 1 + + # Loop on paths to save the results without responses, + # used to get the number of polled reporters for a question for temba_path in temba_run.path: ruleset_uuid = temba_path.node category = None @@ -1015,6 +1052,10 @@ def _process_run_poll_results( def _check_update_required( poll_obj, category, text, state, district, ward, born, gender, scheme, completed, value_date ): + """ + To check whether any value need to be updated in the DB, + if the syncing data is newer/changed to what we have in the DB + """ update_required = poll_obj.category != category or poll_obj.text != text update_required = update_required or poll_obj.state != state update_required = update_required or poll_obj.district != district @@ -1032,6 +1073,9 @@ def _check_update_required( @staticmethod def _save_new_poll_results_to_database(poll_results_to_save_map): + """ + Save all the new objects to create to the DB in bulk, by bulk_create + """ new_poll_results = [] for c_key in poll_results_to_save_map.keys(): for r_key in poll_results_to_save_map.get(c_key, dict()): @@ -1042,6 +1086,10 @@ def _save_new_poll_results_to_database(poll_results_to_save_map): @staticmethod def _mark_poll_results_sync_paused(org, poll, latest_synced_obj_time): + """ + Use redis to set the time when we paused, where we will resume from. + This is a way to allow sharing the API throttle rate to multiple polls, to allow each to progress + """ # update the time for this poll from which we fetch next time cache.set(Poll.POLL_RESULTS_LAST_PULL_CACHE_KEY % (org.pk, poll.flow_uuid), latest_synced_obj_time, None) @@ -1051,6 +1099,11 @@ def _mark_poll_results_sync_paused(org, poll, latest_synced_obj_time): @staticmethod def _mark_poll_results_sync_completed(poll, org, latest_synced_obj_time): + """ + Use Redis key to mark we finished to sync existing results. + And future sync will only look for newer results that the time in redis. + """ + # update the time for this poll from which we fetch next time cache.set(Poll.POLL_RESULTS_LAST_PULL_CACHE_KEY % (org.pk, poll.flow_uuid), latest_synced_obj_time, None)