Skip to content

Commit

Permalink
Merge branch 'sh_connection' of 'https://github.com/zhquan/grimoirela…
Browse files Browse the repository at this point in the history
…b-sirmordred'

Merges #624 
Closes #624
  • Loading branch information
jjmerchante authored Aug 30, 2024
2 parents 00e5e1b + c763dbb commit e41bad5
Show file tree
Hide file tree
Showing 13 changed files with 202 additions and 115 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
title: Reduced the number of connections to SortingHat
category: performance
author: Quan Zhou
issue: null
notes: >
Mordred makes a lot of connections to the SortingHat server
which could cause the uWSGI queue to fill up. When the uWSGI
queue is full, Mordred cannot connect to the SortingHat server.
39 changes: 34 additions & 5 deletions sirmordred/sirmordred.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from sirmordred.task_manager import TasksManager
from sirmordred.task_panels import TaskPanels, TaskPanelsMenu
from sirmordred.task_projects import TaskProjects
from sortinghat.cli.client import SortingHatClient

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -198,15 +199,15 @@ def _split_tasks(tasks_cls):
repos_backend = self._get_repos_by_backend()
for backend in repos_backend:
# Start new Threads and add them to the threads list to complete
t = TasksManager(backend_tasks, backend, stopper, self.config, small_delay)
t = TasksManager(backend_tasks, backend, stopper, self.config, self.client, small_delay)
threads.append(t)
t.start()

# launch thread for global tasks
if len(global_tasks) > 0:
# FIXME timer is applied to all global_tasks, does it make sense?
# All tasks are executed in the same thread sequentially
gt = TasksManager(global_tasks, "Global tasks", stopper, self.config, big_delay)
gt = TasksManager(global_tasks, "Global tasks", stopper, self.config, self.client, big_delay)
threads.append(gt)
gt.start()
if big_delay > 0:
Expand Down Expand Up @@ -248,14 +249,14 @@ def __execute_initial_load(self):
if self.conf['phases']['panels']:
tasks = [TaskPanels, TaskPanelsMenu]
stopper.set()
tm = TasksManager(tasks, "Global tasks", stopper, self.config)
tm = TasksManager(tasks, "Global tasks", stopper, self.config, self.client)
tm.start()
tm.join()

logger.info("Loading projects")
tasks = [TaskProjects]
stopper.set()
tm = TasksManager(tasks, "Global tasks", stopper, self.config)
tm = TasksManager(tasks, "Global tasks", stopper, self.config, self.client)
tm.start()
tm.join()
logger.info("Projects loaded")
Expand All @@ -280,7 +281,7 @@ def start(self):

# check we have access to the needed ES
if not self.check_es_access():
print('Can not access Elasticsearch service. Exiting sirmordred ...')
print('Can not access ElasticSearch/OpenSearch service. Exiting sirmordred ...')
sys.exit(1)

# If bestiary is configured check that it is working
Expand All @@ -289,6 +290,9 @@ def start(self):
print('Can not access bestiary service. Exiting sirmordred ...')
sys.exit(1)

# Create SortingHat Client
self.__create_sh_client(self.config)

# Initial round: panels and projects loading
self.__execute_initial_load()

Expand Down Expand Up @@ -336,3 +340,28 @@ def start(self):
logger.error(var)

logger.info("Finished SirMordred engine ...")

def __create_sh_client(self, config):
self.config = config
self.conf = config.get_conf()

sortinghat = self.conf.get('sortinghat', None)
self.db_sh = sortinghat['database'] if sortinghat else None
self.db_user = sortinghat['user'] if sortinghat else None
self.db_password = sortinghat['password'] if sortinghat else None
self.db_host = sortinghat['host'] if sortinghat else '127.0.0.1'
self.db_path = sortinghat.get('path', None) if sortinghat else None
self.db_port = sortinghat.get('port', None) if sortinghat else None
self.db_ssl = sortinghat.get('ssl', False) if sortinghat else False
self.db_verify_ssl = sortinghat.get('verify_ssl', True) if sortinghat else True
self.db_tenant = sortinghat.get('tenant', True) if sortinghat else None
self.db_unaffiliate_group = sortinghat['unaffiliated_group'] if sortinghat else None
if sortinghat and not hasattr(self, 'client'):
self.client = SortingHatClient(host=self.db_host, port=self.db_port,
path=self.db_path, ssl=self.db_ssl,
user=self.db_user, password=self.db_password,
verify_ssl=self.db_verify_ssl,
tenant=self.db_tenant)
self.client.connect()
elif not sortinghat:
self.client = None
13 changes: 2 additions & 11 deletions sirmordred/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from grimoire_elk.elk import get_ocean_backend
from grimoire_elk.utils import get_connector_from_name, get_elastic
from grimoire_elk.enriched.utils import grimoire_con
from sortinghat.cli.client import SortingHatClient

logger = logging.getLogger(__name__)

Expand All @@ -42,10 +41,11 @@ class Task():
'studies', 'node_regex', 'anonymize']
PARAMS_WITH_SPACES = ['blacklist-jobs']

def __init__(self, config):
def __init__(self, config, sortinghat_client=None):
self.backend_section = None
self.config = config
self.conf = config.get_conf()
self.client = sortinghat_client

sortinghat = self.conf.get('sortinghat', None)
self.db_sh = sortinghat['database'] if sortinghat else None
Expand All @@ -58,15 +58,6 @@ def __init__(self, config):
self.db_verify_ssl = sortinghat.get('verify_ssl', True) if sortinghat else True
self.db_tenant = sortinghat.get('tenant', True) if sortinghat else None
self.db_unaffiliate_group = sortinghat['unaffiliated_group'] if sortinghat else None
if sortinghat:
self.client = SortingHatClient(host=self.db_host, port=self.db_port,
path=self.db_path, ssl=self.db_ssl,
user=self.db_user, password=self.db_password,
verify_ssl=self.db_verify_ssl,
tenant=self.db_tenant)
self.client.connect()
else:
self.client = None

self.grimoire_con = grimoire_con(conn_retries=12) # 30m retry

Expand Down
4 changes: 2 additions & 2 deletions sirmordred/task_autorefresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
class TaskAutorefresh(Task):
"""Refresh the last modified identities for all the backends."""

def __init__(self, config):
super().__init__(config)
def __init__(self, config, sortinghat_client):
super().__init__(config, sortinghat_client)

self.last_autorefresh_backend = {}

Expand Down
4 changes: 2 additions & 2 deletions sirmordred/task_enrich.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
class TaskEnrich(Task):
""" Basic class shared by all enriching tasks """

def __init__(self, config, backend_section=None, allowed_repos=None):
super().__init__(config)
def __init__(self, config, sortinghat_client, backend_section=None, allowed_repos=None):
super().__init__(config, sortinghat_client)
self.backend_section = backend_section
self.allowed_repos = set(allowed_repos) if allowed_repos else None
# This will be options in next iteration
Expand Down
4 changes: 2 additions & 2 deletions sirmordred/task_identities.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
class TaskIdentitiesMerge(Task):
""" Task for processing identities in SortingHat """

def __init__(self, conf):
super().__init__(conf)
def __init__(self, conf, soringhat_client):
super().__init__(conf, soringhat_client)
self.last_autorefresh = datetime.utcnow() # Last autorefresh date

def is_backend_task(self):
Expand Down
5 changes: 3 additions & 2 deletions sirmordred/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class TasksManager(threading.Thread):
IDENTITIES_TASKS_ON_LOCK = threading.Lock()
IDENTITIES_TASKS_ON = False

def __init__(self, tasks_cls, backend_section, stopper, config, timer=0):
def __init__(self, tasks_cls, backend_section, stopper, config, sortinghat_client, timer=0):
"""
:tasks_cls : tasks classes to be executed using the backend
:backend_section: perceval backend section name
Expand All @@ -64,6 +64,7 @@ def __init__(self, tasks_cls, backend_section, stopper, config, timer=0):
self.stopper = stopper # To stop the thread from parent
self.timer = timer
self.thread_id = None
self.client = sortinghat_client

def add_task(self, task):
self.tasks.append(task)
Expand All @@ -80,7 +81,7 @@ def __set_thread_id():
logger.debug(self.tasks_cls)
for tc in self.tasks_cls:
# create the real Task from the class
task = tc(self.config)
task = tc(self.config, self.client)
task.set_backend_section(self.backend_section)
self.tasks.append(task)

Expand Down
32 changes: 19 additions & 13 deletions tests/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
from sirmordred.config import Config
from sirmordred.task import Task

from sortinghat.cli.client import SortingHatClient

CONF_FILE = 'test.cfg'
BACKEND_NAME = 'stackexchange'
COLLECTION_URL_STACKEXCHANGE = 'http://127.0.0.1:9200'
Expand All @@ -47,14 +49,23 @@ def read_file(filename, mode='r'):

class TestTask(unittest.TestCase):
"""Task tests"""
def setUp(self):
self.config = Config(CONF_FILE)
self.conf = self.config.get_conf()
sh = self.conf.get('sortinghat')
self.sortinghat_client = SortingHatClient(host=sh['host'], port=sh.get('port', None),
path=sh.get('path', None), ssl=sh.get('ssl', False),
user=sh['user'], password=sh['password'],
verify_ssl=sh.get('verify_ssl', True),
tenant=sh.get('tenant', True))
self.sortinghat_client.connect()

def test_initialization(self):
"""Test whether attributes are initializated"""

config = Config(CONF_FILE)
task = Task(config)
task = Task(self.config, self.sortinghat_client)

self.assertEqual(task.config, config)
self.assertEqual(task.config, self.config)
self.assertEqual(task.db_sh, task.conf['sortinghat']['database'])
self.assertEqual(task.db_user, task.conf['sortinghat']['user'])
self.assertEqual(task.db_password, task.conf['sortinghat']['password'])
Expand All @@ -63,15 +74,13 @@ def test_initialization(self):
def test_run(self):
"""Test whether the Task could be run"""

config = Config(CONF_FILE)
task = Task(config)
task = Task(self.config, self.sortinghat_client)
self.assertEqual(task.execute(), None)

def test_compose_p2o_params(self):
"""Test whether p2o params are built correctly for a backend and a repository"""

config = Config(CONF_FILE)
task = Task(config)
task = Task(self.config, self.sortinghat_client)
params = task._compose_p2o_params("stackexchange", "https://stackoverflow.com/questions/tagged/example")
self.assertDictEqual(params, {'url': "https://stackoverflow.com/questions/tagged/example"})

Expand All @@ -92,8 +101,7 @@ def test_compose_p2o_params(self):
def test_extract_repo_tags(self):
"""Test the extraction of tags in repositories"""

config = Config(CONF_FILE)
task = Task(config)
task = Task(self.config, self.sortinghat_client)
url, tags = task._extract_repo_tags("git", "https://github.com/zhquan_example/repo --labels=[ENG, SUPP]")
self.assertEqual(url, "https://github.com/zhquan_example/repo")
self.assertListEqual(tags, ["ENG", "SUPP"])
Expand All @@ -113,8 +121,7 @@ def test_compose_perceval_params(self):

expected_repo_params = json.loads(read_file('data/task-params-expected'))

config = Config(CONF_FILE)
task = Task(config)
task = Task(self.config, self.sortinghat_client)

for backend in expected_repo_params.keys():
repo = expected_repo_params.get(backend)['repo']
Expand All @@ -126,8 +133,7 @@ def test_compose_perceval_params(self):
def test_get_collection_url(self):
"""Test whether the collection url could be overwritten in a backend"""

config = Config(CONF_FILE)
task = Task(config)
task = Task(self.config, self.sortinghat_client)
task.backend_section = "stackexchange"

self.assertEqual(task._get_collection_url(), COLLECTION_URL_STACKEXCHANGE)
Expand Down
34 changes: 19 additions & 15 deletions tests/test_task_autorefresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from sirmordred.task_enrich import TaskEnrich
from sirmordred.task_projects import TaskProjects

from sortinghat.cli.client import SortingHatSchema
from sortinghat.cli.client import SortingHatClient, SortingHatSchema

from sgqlc.operation import Operation

Expand Down Expand Up @@ -131,8 +131,16 @@ def add_domain(client, args):
client.execute(op)

def setUp(self):
config = Config(CONF_FILE)
task = TaskAutorefresh(config)
self.config = Config(CONF_FILE)
self.conf = self.config.get_conf()
sh = self.conf.get('sortinghat')
self.sortinghat_client = SortingHatClient(host=sh['host'], port=sh.get('port', None),
path=sh.get('path', None), ssl=sh.get('ssl', False),
user=sh['user'], password=sh['password'],
verify_ssl=sh.get('verify_ssl', True),
tenant=sh.get('tenant', True))
self.sortinghat_client.connect()
task = TaskAutorefresh(self.config, self.sortinghat_client)

# Clean database
entities = SortingHat.unique_identities(task.client)
Expand Down Expand Up @@ -162,35 +170,31 @@ def setUp(self):
def test_initialization(self):
"""Test whether attributes are initialized"""

config = Config(CONF_FILE)
task = TaskAutorefresh(config)
task = TaskAutorefresh(self.config, self.sortinghat_client)

self.assertEqual(task.config, config)
self.assertEqual(task.config, self.config)

def test_is_backend_task(self):
"""Test whether the Task is not a backend task"""

config = Config(CONF_FILE)
task = TaskAutorefresh(config)
task = TaskAutorefresh(self.config, self.sortinghat_client)

self.assertFalse(task.is_backend_task())

def test_execute(self):
"""Test whether the Task could be run"""

# Create a raw and enriched indexes
config = Config(CONF_FILE)

TaskProjects(config).execute()
TaskProjects(self.config, self.sortinghat_client).execute()
backend_section = GIT_BACKEND_SECTION

task_collection = TaskRawDataCollection(config, backend_section=backend_section)
task_collection = TaskRawDataCollection(self.config, backend_section=backend_section)
task_collection.execute()

task_enrich = TaskEnrich(config, backend_section=backend_section)
task_enrich = TaskEnrich(self.config, self.sortinghat_client, backend_section=backend_section)
task_enrich.execute()

task_autorefresh = TaskAutorefresh(config)
task_autorefresh = TaskAutorefresh(self.config, self.sortinghat_client)
task_autorefresh.config.set_param('es_enrichment', 'autorefresh', True)
# This does nothing because it uses now as from_date:
task_autorefresh.execute()
Expand All @@ -205,7 +209,7 @@ def test_execute(self):
self.assertIsNone(task_autorefresh.execute())

# Check that the autorefresh went well
cfg = config.get_conf()
cfg = self.conf
es_enrichment = cfg['es_enrichment']['url']
enrich_index = es_enrichment + "/" + cfg[GIT_BACKEND_SECTION]['enriched_index']

Expand Down
6 changes: 4 additions & 2 deletions tests/test_task_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ def test_execute(self):
raw_index = es_collection + "/" + cfg[GIT_BACKEND_SECTION]['raw_index']

r = requests.get(raw_index + "/_search?size=0", verify=False)
raw_items = r.json()['hits']['total']
total = r.json()['hits']['total']
raw_items = total['value'] if isinstance(total, dict) else total
self.assertEqual(raw_items, 3603)

def test_execute_no_collection(self):
Expand All @@ -130,7 +131,8 @@ def test_execute_no_collection(self):
raw_index = es_collection + "/" + cfg[GIT_BACKEND_SECTION]['raw_index']

r = requests.get(raw_index + "/_search?size=0", verify=False)
raw_items = r.json()['hits']['total']
total = r.json()['hits']['total']
raw_items = total['value'] if isinstance(total, dict) else total
self.assertEqual(raw_items, 40)

def test_execute_filter_no_collection(self):
Expand Down
Loading

0 comments on commit e41bad5

Please sign in to comment.