Skip to content

Commit

Permalink
Reduce the number of connection to SortingHat server
Browse files Browse the repository at this point in the history
Signed-off-by: Quan Zhou <[email protected]>
  • Loading branch information
zhquan committed Aug 2, 2024
1 parent f2a96e0 commit ab351e0
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 23 deletions.
40 changes: 35 additions & 5 deletions sirmordred/sirmordred.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from sirmordred.task_manager import TasksManager
from sirmordred.task_panels import TaskPanels, TaskPanelsMenu
from sirmordred.task_projects import TaskProjects
from sortinghat.cli.client import SortingHatClient

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -198,15 +199,15 @@ def _split_tasks(tasks_cls):
repos_backend = self._get_repos_by_backend()
for backend in repos_backend:
# Start new Threads and add them to the threads list to complete
t = TasksManager(backend_tasks, backend, stopper, self.config, small_delay)
t = TasksManager(backend_tasks, backend, stopper, self.config, self.client, small_delay)
threads.append(t)
t.start()

# launch thread for global tasks
if len(global_tasks) > 0:
# FIXME timer is applied to all global_tasks, does it make sense?
# All tasks are executed in the same thread sequentially
gt = TasksManager(global_tasks, "Global tasks", stopper, self.config, big_delay)
gt = TasksManager(global_tasks, "Global tasks", stopper, self.config, self.client, big_delay)
threads.append(gt)
gt.start()
if big_delay > 0:
Expand Down Expand Up @@ -248,14 +249,14 @@ def __execute_initial_load(self):
if self.conf['phases']['panels']:
tasks = [TaskPanels, TaskPanelsMenu]
stopper.set()
tm = TasksManager(tasks, "Global tasks", stopper, self.config)
tm = TasksManager(tasks, "Global tasks", stopper, self.config, self.client)
tm.start()
tm.join()

logger.info("Loading projects")
tasks = [TaskProjects]
stopper.set()
tm = TasksManager(tasks, "Global tasks", stopper, self.config)
tm = TasksManager(tasks, "Global tasks", stopper, self.config, self.client)
tm.start()
tm.join()
logger.info("Projects loaded")
Expand All @@ -280,7 +281,7 @@ def start(self):

# check we have access to the needed ES
if not self.check_es_access():
print('Can not access Elasticsearch service. Exiting sirmordred ...')
print('Can not access ElasticSearch/OpenSearch service. Exiting sirmordred ...')
sys.exit(1)

# If bestiary is configured check that it is working
Expand All @@ -289,6 +290,9 @@ def start(self):
print('Can not access bestiary service. Exiting sirmordred ...')
sys.exit(1)

# Create SortingHat Client
self.__create_sh_client(self.config)

# Initial round: panels and projects loading
self.__execute_initial_load()

Expand Down Expand Up @@ -336,3 +340,29 @@ def start(self):
logger.error(var)

logger.info("Finished SirMordred engine ...")

def __create_sh_client(self, config):
self.config = config
self.conf = config.get_conf()

sortinghat = self.conf.get('sortinghat', None)
self.db_sh = sortinghat['database'] if sortinghat else None
self.db_user = sortinghat['user'] if sortinghat else None
self.db_password = sortinghat['password'] if sortinghat else None
self.db_host = sortinghat['host'] if sortinghat else '127.0.0.1'
self.db_path = sortinghat.get('path', None) if sortinghat else None
self.db_port = sortinghat.get('port', None) if sortinghat else None
self.db_ssl = sortinghat.get('ssl', False) if sortinghat else False
self.db_verify_ssl = sortinghat.get('verify_ssl', True) if sortinghat else True
self.db_tenant = sortinghat.get('tenant', True) if sortinghat else None
self.db_unaffiliate_group = sortinghat['unaffiliated_group'] if sortinghat else None
if sortinghat and not hasattr(self, 'client'):
self.client = SortingHatClient(host=self.db_host, port=self.db_port,
path=self.db_path, ssl=self.db_ssl,
user=self.db_user, password=self.db_password,
verify_ssl=self.db_verify_ssl,
tenant=self.db_tenant)
self.client.connect()
logger.info("SORTINGHAT")
elif not sortinghat:
self.client = None
12 changes: 2 additions & 10 deletions sirmordred/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ class Task():
'studies', 'node_regex', 'anonymize']
PARAMS_WITH_SPACES = ['blacklist-jobs']

def __init__(self, config):
def __init__(self, config, sortinghat_client):
self.backend_section = None
self.config = config
self.conf = config.get_conf()
self.client = sortinghat_client

sortinghat = self.conf.get('sortinghat', None)
self.db_sh = sortinghat['database'] if sortinghat else None
Expand All @@ -58,15 +59,6 @@ def __init__(self, config):
self.db_verify_ssl = sortinghat.get('verify_ssl', True) if sortinghat else True
self.db_tenant = sortinghat.get('tenant', True) if sortinghat else None
self.db_unaffiliate_group = sortinghat['unaffiliated_group'] if sortinghat else None
if sortinghat:
self.client = SortingHatClient(host=self.db_host, port=self.db_port,
path=self.db_path, ssl=self.db_ssl,
user=self.db_user, password=self.db_password,
verify_ssl=self.db_verify_ssl,
tenant=self.db_tenant)
self.client.connect()
else:
self.client = None

self.grimoire_con = grimoire_con(conn_retries=12) # 30m retry

Expand Down
4 changes: 2 additions & 2 deletions sirmordred/task_autorefresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
class TaskAutorefresh(Task):
"""Refresh the last modified identities for all the backends."""

def __init__(self, config):
super().__init__(config)
def __init__(self, config, sortinghat_client):
super().__init__(config, sortinghat_client)

self.last_autorefresh_backend = {}

Expand Down
4 changes: 2 additions & 2 deletions sirmordred/task_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
class TaskRawDataCollection(Task):
""" Basic class shared by all collection tasks """

def __init__(self, config, backend_section=None, allowed_repos=None):
super().__init__(config)
def __init__(self, config, sortinghat_client, backend_section=None, allowed_repos=None):
super().__init__(config, sortinghat_client)

self.backend_section = backend_section
self.allowed_repos = set(allowed_repos) if allowed_repos else None
Expand Down
4 changes: 2 additions & 2 deletions sirmordred/task_enrich.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
class TaskEnrich(Task):
""" Basic class shared by all enriching tasks """

def __init__(self, config, backend_section=None, allowed_repos=None):
super().__init__(config)
def __init__(self, config, sortinghat_client, backend_section=None, allowed_repos=None):
super().__init__(config, sortinghat_client)
self.backend_section = backend_section
self.allowed_repos = set(allowed_repos) if allowed_repos else None
# This will be options in next iteration
Expand Down
5 changes: 3 additions & 2 deletions sirmordred/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class TasksManager(threading.Thread):
IDENTITIES_TASKS_ON_LOCK = threading.Lock()
IDENTITIES_TASKS_ON = False

def __init__(self, tasks_cls, backend_section, stopper, config, timer=0):
def __init__(self, tasks_cls, backend_section, stopper, config, sortinghat_client, timer=0):
"""
:tasks_cls : tasks classes to be executed using the backend
:backend_section: perceval backend section name
Expand All @@ -64,6 +64,7 @@ def __init__(self, tasks_cls, backend_section, stopper, config, timer=0):
self.stopper = stopper # To stop the thread from parent
self.timer = timer
self.thread_id = None
self.client = sortinghat_client

def add_task(self, task):
self.tasks.append(task)
Expand All @@ -80,7 +81,7 @@ def __set_thread_id():
logger.debug(self.tasks_cls)
for tc in self.tasks_cls:
# create the real Task from the class
task = tc(self.config)
task = tc(self.config, self.client)
task.set_backend_section(self.backend_section)
self.tasks.append(task)

Expand Down

0 comments on commit ab351e0

Please sign in to comment.