Skip to content

Commit

Permalink
Improve performance of recommender job
Browse files Browse the repository at this point in the history
This commit improves the performance of the recommender job:
- It now only groups individuals that are directly related. This means
that if A = {B} and B = {A, C}, it won't create a recommendation
between A and C.
- The recommendations are now created between individuals instead of
identities. This results in fewer recommendations but is more useful if you
only need to merge individuals. If you need recommendations between
identities, set verbose to True.
- It no longer queries the database when a recommendation is created;
it follows the style "Easier to Ask Forgiveness Than Permission" (EAFP).
- The recommendations internally now return the mk of an identity, avoiding
unnecessary database queries.

Signed-off-by: Jose Javier Merchante <[email protected]>
  • Loading branch information
jjmerchante committed Nov 16, 2023
1 parent 998c9eb commit 13c2d64
Show file tree
Hide file tree
Showing 11 changed files with 330 additions and 305 deletions.
69 changes: 46 additions & 23 deletions sortinghat/core/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@

import django_rq
import django_rq.utils
import pandas
import rq
import redis.exceptions
from django.db import IntegrityError, transaction

from .db import find_individual_by_uuid, find_organization
from .api import enroll, merge, update_profile, add_scheduled_task, delete_scheduled_task
Expand Down Expand Up @@ -255,16 +255,27 @@ def recommend_matches(ctx, source_uuids,
results[rec.key] = list(rec.options)
# Store matches in the database
for match in rec.options:
try:
individual1 = find_individual_by_uuid(rec.key)
individual2 = find_individual_by_uuid(match)
except NotFoundError:
logger.info(f"Job {job.id} 'One individual does not exists'")
if verbose:
try:
match_indiv = find_individual_by_uuid(match)
match = match_indiv.mk
except NotFoundError:
logger.info(f"'Individual {match} does not exists'")
continue

indiv_1, indiv_2 = rec.mk, match

# Use "the smaller string" as individual1 and try to create the recommendation
if indiv_1 == indiv_2:
continue
# Check if the recommendation already exists in any direction
if not MergeRecommendation.objects.filter(individual1=individual1, individual2=individual2).exists() and \
not MergeRecommendation.objects.filter(individual2=individual1, individual1=individual2).exists():
MergeRecommendation.objects.create(individual1=individual1, individual2=individual2)
elif indiv_1 > indiv_2:
indiv_1, indiv_2 = indiv_2, indiv_1

try:
with transaction.atomic():
MergeRecommendation.objects.create(individual1_id=indiv_1, individual2_id=indiv_2)
except IntegrityError:
pass

trxl.close()

Expand Down Expand Up @@ -443,23 +454,34 @@ def unify(ctx, criteria, source_uuids=None, target_uuids=None, exclude=True, str
def _group_recommendations(recs):
"""Calculate unique sets of identities from matching recommendations.
For instance, given a list of matching groups like
A = {A, B}; B = {B,A,C}, C = {C,} and D = {D,} the output
for keys A, B and C will be the group {A, B, C}. As D has no matches,
it won't be included in any group and it won't be returned.
For instance, given a dictionary of matching groups like
{A: [B], B: [A,C], D: [E]} the output will be the groups
[{A, B, C}, {D, E}].
:param recs: recommendations of matching identities
:returns: a list including unique groups of matches
"""
groups = []
for group_key in recs:
g_uuids = pandas.Series(recs[group_key])
g_uuids = g_uuids.append(pandas.Series([group_key]))
g_uuids = list(g_uuids.sort_values().unique())
if (len(g_uuids) > 1) and (g_uuids not in groups):
groups.append(g_uuids)
return groups
visited = set()

def dfs(node, current_set):
# Depth First Search algorithm
# Keep visiting the keys until the end
if node not in visited:
visited.add(node)
current_set.add(node)
for neighbor in recs.get(node, []):
dfs(neighbor, current_set)

result = []
for key in recs:
if key not in visited:
current_set = set()
dfs(key, current_set)
if len(current_set) > 1:
result.append(current_set)

return result

check_criteria(criteria)

Expand Down Expand Up @@ -489,12 +511,13 @@ def _group_recommendations(recs):
exclude=exclude,
strict=strict,
last_modified=last_modified):
match_recs[rec.key] = list(rec.options)
match_recs[rec.mk] = list(rec.options)

match_groups = _group_recommendations(match_recs)

# Apply the merge of the matching identities
for group in match_groups:
group = sorted(group)
uuid = group[0]
result = group[1:]
merged_to, errs = _merge_individuals(job_ctx, uuid, result)
Expand Down
2 changes: 1 addition & 1 deletion sortinghat/core/recommendations/affiliation.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def recommend_affiliations(uuids):
except NotFoundError:
continue
else:
yield (uuid, _suggest_affiliations(individual))
yield (uuid, individual.mk, _suggest_affiliations(individual))

logger.info(f"Affiliation recommendations generated; uuids='{uuids}'")

Expand Down
4 changes: 2 additions & 2 deletions sortinghat/core/recommendations/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

# A single recommendation produced by a recommender: `key` identifies the
# entity the recommendation is for, `mk` is the main key of its individual,
# `type` names the recommender that produced it and `options` carries the
# suggested values.
Recommendation = collections.namedtuple(
    'Recommendation',
    'key mk type options'
)


def _generate_recommendations(name, recommender, *args, **kwargs):
    """Generator of recommendations.

    Runs `recommender` with the given arguments and wraps each raw
    (key, mk, options) result it yields into a `Recommendation`
    tagged with the recommender's `name`.
    """
    for raw in recommender(*args, **kwargs):
        key = raw[0]
        mk = raw[1]
        options = raw[2]
        yield Recommendation(key, mk, name, options)

@classmethod
def types(cls):
Expand Down
2 changes: 1 addition & 1 deletion sortinghat/core/recommendations/gender.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def recommend_gender(uuids, exclude=True, no_strict_matching=False):
logger.warning(message)
continue
else:
yield uuid, (gender, accuracy)
yield uuid, individual.mk, (gender, accuracy)

logger.info(f"Gender recommendations generated; uuids='{uuids}'")

Expand Down
66 changes: 14 additions & 52 deletions sortinghat/core/recommendations/matching.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,22 @@ def _get_identities(uuid):
)

aliases = defaultdict(list)
mk_sources = dict()
input_set = set()
target_set = set()

if source_uuids:
for uuid in source_uuids:
identities = _get_identities(uuid)
aliases[uuid] = [identity.uuid for identity in identities]
mk_sources[uuid] = identities[0].individual.mk if len(identities) > 0 else uuid
input_set.update(identities)
else:
identities = Identity.objects.select_related('individual').filter(individual__last_modified__gte=last_modified)
input_set.update(identities)
for identity in identities:
aliases[identity.individual.mk].append(identity.uuid)
mk_sources[identity.uuid] = identity.individual.mk
source_uuids = aliases.keys()

if target_uuids:
Expand All @@ -140,7 +143,7 @@ def _get_identities(uuid):
result.remove(uuid)
except KeyError:
pass
yield uuid, list(result)
yield uuid, mk_sources[uuid], sorted(result)

logger.info(f"Matching recommendations generated; criteria='{criteria}'")

Expand Down Expand Up @@ -197,52 +200,6 @@ def _filter_criteria(df, c, strict=True):

return cdf

def _calculate_matches_groups(grouped_uids, verbose=False):
    """Calculate groups of matching identities from identity groups.
    For instance, given a list of matched unique identities like
    A = {A, B}; B = {B,A,C}, C = {C,} and D = {D,} the output
    for keys A, B and C will be the set {A, B, C}. As D has no matches,
    it won't be included in any group and it won't be returned.
    :param grouped_uids: groups of unique identities
        (presumably a pandas ``GroupBy`` object, given the
        ``.groups`` / ``.get_group`` usage — TODO confirm with caller)
    :param verbose: if true, the grouping will be calculated using
    unique identities (uuids) instead of main keys from individuals.
    :returns: a dictionary including the set of matches for each
    group key.
    """
    matches = {}
    # Every uuid already assigned to some group; used to detect overlaps.
    processed = set()

    # Group by main keys from Individuals or by uuids from Identities
    col_name = 'uuid_y' if verbose else 'individual_y'

    # Keys are processed in sorted order so the merging below is
    # deterministic across runs.
    sorted_keys = sorted(grouped_uids.groups.keys())

    while sorted_keys:
        group_key = sorted_keys.pop(0)
        uuid_set = set()
        for uuid in grouped_uids.get_group(group_key)[col_name]:
            uuid_set.add(uuid)

        if processed.intersection(uuid_set):
            # There are common identities already seen.
            # Find the common sets

            for key in matches:
                prev_match = matches[key]
                if prev_match == uuid_set:
                    continue
                elif prev_match.intersection(uuid_set):
                    # Merge into the existing set IN PLACE and then alias
                    # `uuid_set` to it, so every dict key whose group
                    # overlaps ends up sharing the same set object.
                    # NOTE(review): this inner scan over all previous
                    # matches makes the loop quadratic in the number of
                    # groups.
                    prev_match.update(uuid_set)
                    uuid_set = prev_match

        processed.update(uuid_set)
        matches[group_key] = uuid_set

    return matches

data_x = [model_to_dict(fl) for fl in set_x]
data_y = [model_to_dict(fl) for fl in set_y]

Expand All @@ -262,15 +219,20 @@ def _calculate_matches_groups(grouped_uids, verbose=False):
cdf_x = _filter_criteria(df_x, c, strict)
cdf_y = _filter_criteria(df_y, c, strict)
cdf = pandas.merge(cdf_x, cdf_y, on=c, how='inner')
cdf = cdf[['individual_y', 'uuid_x', 'uuid_y']]
cdf = cdf[['individual_x', 'uuid_x', 'individual_y', 'uuid_y']]
cdfs.append(cdf)

result = pandas.concat(cdfs)
result = result.drop_duplicates()

g_result = result.groupby(by=['uuid_x'],
as_index=True, sort=True)
col_y_name = 'uuid_y' if verbose else 'individual_y'
col_x_name = 'uuid_x' if verbose else 'individual_x'
result = result[[col_x_name, col_y_name]]

# Remove duplicated
result = result[result[col_x_name] != result[col_y_name]]
result_g = result.groupby(col_x_name, group_keys=False)

matched = _calculate_matches_groups(g_result, verbose=verbose)
# Convert the dataframe to a dict of sets
matched = result_g[col_y_name].apply(set).to_dict()

return matched
17 changes: 16 additions & 1 deletion sortinghat/core/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from django.conf import settings
from django.core.paginator import Paginator
from django.db import IntegrityError
from django.db.models import Q, Subquery

from django.db.models import (JSONField, Count)
Expand Down Expand Up @@ -1254,9 +1255,23 @@ def mutate(self, info, recommendation_id, apply):
ctx = SortingHatContext(user=user, tenant=tenant)

recommendation = MergeRecommendation.objects.get(id=int(recommendation_id))
to_indiv = recommendation.individual1
from_indiv = recommendation.individual2
if apply:
# Update related individual removed
for rec in MergeRecommendation.objects.filter(Q(individual1=from_indiv.mk) | Q(individual2=from_indiv.mk)):
if rec.individual1 == from_indiv:
key = 'individual1'
else:
key = 'individual2'
try:
setattr(rec, key, to_indiv)
rec.save()
except IntegrityError:
rec.delete()

try:
merge(ctx, [recommendation.individual2.mk], recommendation.individual1.mk)
merge(ctx, [from_indiv.mk], to_indiv.mk)
except EqualIndividualError:
pass
# Can't keep a recommendation in which one individual is missing
Expand Down
30 changes: 20 additions & 10 deletions tests/rec/test_affiliations.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,13 @@ def test_recommendations(self):

rec = recs[0]
self.assertEqual(rec[0], jsmith.uuid)
self.assertListEqual(rec[1], ['Example'])
self.assertEqual(rec[1], jsmith.individual.mk)
self.assertListEqual(rec[2], ['Example'])

rec = recs[1]
self.assertEqual(rec[0], jroe.uuid)
self.assertListEqual(rec[1], ['Bitergia', 'Example'])
self.assertEqual(rec[1], jroe.individual.mk)
self.assertListEqual(rec[2], ['Bitergia', 'Example'])

def test_already_enrolled(self):
"""Check if an organization is not included in the recommendation
Expand All @@ -131,8 +133,9 @@ def test_already_enrolled(self):

rec = recs[0]
self.assertEqual(rec[0], jsmith.uuid)
self.assertEqual(rec[1], jsmith.individual.mk)
# Bitergia is not included
self.assertListEqual(rec[1], ['Example'])
self.assertListEqual(rec[2], ['Example'])

def test_multiple_top_domains(self):
"""Check if it chooses the right domain when multiple top
Expand All @@ -150,7 +153,8 @@ def test_multiple_top_domains(self):

rec = recs[0]
self.assertEqual(rec[0], jdoe.uuid)
self.assertListEqual(rec[1], ['Example Int.'])
self.assertEqual(rec[1], jdoe.individual.mk)
self.assertListEqual(rec[2], ['Example Int.'])

def test_no_match(self):
"""Check if empty recommendations are returned when there is
Expand All @@ -171,7 +175,8 @@ def test_no_match(self):

rec = recs[0]
self.assertEqual(rec[0], jsmith.uuid)
self.assertListEqual(rec[1], [])
self.assertEqual(rec[1], jsmith.individual.mk)
self.assertListEqual(rec[2], [])

def test_invalid_identity_email(self):
"""Check if empty recommendations are returned for invalid emails"""
Expand All @@ -189,7 +194,8 @@ def test_invalid_identity_email(self):

rec = recs[0]
self.assertEqual(rec[0], noemail.uuid)
self.assertListEqual(rec[1], [])
self.assertEqual(rec[1], noemail.individual.mk)
self.assertListEqual(rec[2], [])

def test_empty_email(self):
"""Check if empty recommendations are returned for empty
Expand All @@ -209,7 +215,8 @@ def test_empty_email(self):

rec = recs[0]
self.assertEqual(rec[0], jdoe.uuid)
self.assertListEqual(rec[1], [])
self.assertEqual(rec[1], jdoe.individual.mk)
self.assertListEqual(rec[2], [])

def test_wrong_email(self):
"""Check if email ending in a dot doesn't raise an exception"""
Expand All @@ -227,7 +234,8 @@ def test_wrong_email(self):

rec = recs[0]
self.assertEqual(rec[0], wrong_email.uuid)
self.assertListEqual(rec[1], [])
self.assertEqual(rec[1], wrong_email.individual.mk)
self.assertListEqual(rec[2], [])

def test_not_found_individual(self):
"""Check if no recommendations are generated when an
Expand Down Expand Up @@ -272,8 +280,10 @@ def test_top_domain(self):

rec = recs[0]
self.assertEqual(rec[0], jsmith.uuid)
self.assertListEqual(rec[1], ['Bitergia', 'Example'])
self.assertEqual(rec[1], jsmith.individual.mk)
self.assertListEqual(rec[2], ['Bitergia', 'Example'])

rec = recs[1]
self.assertEqual(rec[0], jroe.uuid)
self.assertListEqual(rec[1], ['Example'])
self.assertEqual(rec[1], jroe.individual.mk)
self.assertListEqual(rec[2], ['Example'])
2 changes: 1 addition & 1 deletion tests/rec/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def generate_recommendations():
"""Generate fake recommendations."""

for i in range(len(options)):
yield (i, options[0:i])
yield (i, i, options[0:i])


def generate_error():
Expand Down
Loading

0 comments on commit 13c2d64

Please sign in to comment.